# Spark

## Basic Commands/Examples

In [1]:
### FOR NON-ANACONDA PYSPARK
# import findspark
# findspark.init()

In [2]:
# Imports
import random
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Create Spark session
spark = SparkSession.builder.getOrCreate()

# Create rudimentary dataframe
df = spark.sql("""select 'spark' as hello """)

# Display dataframe
df.show()

# Stop Spark session
spark.stop()

+-----+
|hello|
+-----+
|spark|
+-----+



In [4]:
# Function to compute pi
def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

num_samples = 10000

sc = pyspark.SparkContext()
count = sc.parallelize(range(0, num_samples)).filter(inside).count()

pi = 4 * count / num_samples
print(pi)

sc.stop()

3.1412


## Dataframe Basics

Material from [TowardsDataScience](https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53).

In [5]:
# Imports
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date, timedelta, datetime
import time

In [6]:
# Initialize SparkSession
sc = SparkSession.builder.appName("PySparkExample")\
    .config("spark.sql.shuffle.partitions", "50")\
    .config("spark.driver.maxResultSize", "5g")\
    .config("spark.sql.execution.arrow.enabled", "true")\
    .getOrCreate()

In [7]:
# Read JSON file to dataframe
dataframe = sc.read.json(os.path.join("02-raw-data", "nyt2.json"))

# Show first 10 rows
dataframe.show(10)

+--------------------+--------------------+--------------------+-----------------+--------------------+--------+-----------------+-------------+----+--------------+--------------------+-------------+
|                 _id|  amazon_product_url|              author| bestsellers_date|         description|   price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|
+--------------------+--------------------+--------------------+-----------------+--------------------+--------+-----------------+-------------+----+--------------+--------------------+-------------+
|[5b4aa4ead3089013...|http://www.amazon...|       Dean R Koontz|[[1211587200000]]|Odd Thomas, who c...|  [, 27]|[[1212883200000]]|       Bantam| [1]|           [0]|           ODD HOURS|          [1]|
|[5b4aa4ead3089013...|http://www.amazon...|     Stephenie Meyer|[[1211587200000]]|Aliens have taken...|[25.99,]|[[1212883200000]]|Little, Brown| [2]|           [1]|            THE HOST|          [3]|


In [8]:
# JSON
# dataframe = sc.read.json('dataset/nyt2.json')

# TXT FILES
# dataframe_txt = sc.read.text('text_data.txt')

# CSV FILES
# dataframe_csv = sc.read.csv('csv_data.csv')

# PARQUET FILES
# dataframe_parquet = sc.read.load('parquet_data.parquet')

In [9]:
# Drop duplicates
dataframe_dropdup = dataframe.dropDuplicates()
dataframe_dropdup.show(10)

+--------------------+--------------------+--------------------+-----------------+--------------------+--------+-----------------+----------------+----+--------------+--------------------+-------------+
|                 _id|  amazon_product_url|              author| bestsellers_date|         description|   price|   published_date|       publisher|rank|rank_last_week|               title|weeks_on_list|
+--------------------+--------------------+--------------------+-----------------+--------------------+--------+-----------------+----------------+----+--------------+--------------------+-------------+
|[5b4aa4ead3089013...|http://www.amazon...|       John Sandford|[[1212192000000]]|The Minneapolis d...|[26.95,]|[[1213488000000]]|          Putnam| [9]|           [7]|        PHANTOM PREY|          [4]|
|[5b4aa4ead3089013...|http://www.amazon...|    Sebastian Faulks|[[1212796800000]]|James Bond tracks...|[24.95,]|[[1214092800000]]|       Doubleday|[11]|           [8]|      DEVIL MAY CARE|

In [10]:
# Select author data
dataframe.select("author").show(10)

+--------------------+
|              author|
+--------------------+
|       Dean R Koontz|
|     Stephenie Meyer|
|        Emily Giffin|
|   Patricia Cornwell|
|     Chuck Palahniuk|
|James Patterson a...|
|       John Sandford|
|       Jimmy Buffett|
|    Elizabeth George|
|      David Baldacci|
+--------------------+
only showing top 10 rows



In [11]:
# Select title, author, rank, and price data
dataframe.select("author", "title", "rank", "price").show(10)

+--------------------+--------------------+----+--------+
|              author|               title|rank|   price|
+--------------------+--------------------+----+--------+
|       Dean R Koontz|           ODD HOURS| [1]|  [, 27]|
|     Stephenie Meyer|            THE HOST| [2]|[25.99,]|
|        Emily Giffin|LOVE THE ONE YOU'...| [3]|[24.95,]|
|   Patricia Cornwell|           THE FRONT| [4]|[22.95,]|
|     Chuck Palahniuk|               SNUFF| [5]|[24.95,]|
|James Patterson a...|SUNDAYS AT TIFFANY’S| [6]|[24.99,]|
|       John Sandford|        PHANTOM PREY| [7]|[26.95,]|
|       Jimmy Buffett|          SWINE NOT?| [8]|[21.99,]|
|    Elizabeth George|     CARELESS IN RED| [9]|[27.95,]|
|      David Baldacci|     THE WHOLE TRUTH|[10]|[26.99,]|
+--------------------+--------------------+----+--------+
only showing top 10 rows



In [12]:
# Show title and when associated with title
dataframe.select("title", when(dataframe.title !=
                               'ODD HOURS', 1).otherwise(0)).show(10)

+--------------------+-----------------------------------------------------+
|               title|CASE WHEN (NOT (title = ODD HOURS)) THEN 1 ELSE 0 END|
+--------------------+-----------------------------------------------------+
|           ODD HOURS|                                                    0|
|            THE HOST|                                                    1|
|LOVE THE ONE YOU'...|                                                    1|
|           THE FRONT|                                                    1|
|               SNUFF|                                                    1|
|SUNDAYS AT TIFFANY’S|                                                    1|
|        PHANTOM PREY|                                                    1|
|          SWINE NOT?|                                                    1|
|     CARELESS IN RED|                                                    1|
|     THE WHOLE TRUTH|                                                    1|

In [13]:
# Show isin operation
dataframe[dataframe.author.isin("John Sandfor", "Emily Giffin")].show(5)

+--------------------+--------------------+------------+-----------------+--------------------+--------+-----------------+------------+----+--------------+--------------------+-------------+
|                 _id|  amazon_product_url|      author| bestsellers_date|         description|   price|   published_date|   publisher|rank|rank_last_week|               title|weeks_on_list|
+--------------------+--------------------+------------+-----------------+--------------------+--------+-----------------+------------+----+--------------+--------------------+-------------+
|[5b4aa4ead3089013...|http://www.amazon...|Emily Giffin|[[1211587200000]]|A woman's happy m...|[24.95,]|[[1212883200000]]|St. Martin's| [3]|           [2]|LOVE THE ONE YOU'...|          [2]|
|[5b4aa4ead3089013...|http://www.amazon...|Emily Giffin|[[1212192000000]]|A woman’s happy m...|[24.95,]|[[1213488000000]]|St. Martin’s| [4]|           [3]|LOVE THE ONE YOU'...|          [3]|
|[5b4aa4ead3089013...|http://www.amazon...|Em

In [14]:
# Show like operation
dataframe.select("author", "title", dataframe.title.like("% THE %")).show(15)

+--------------------+--------------------+------------------+
|              author|               title|title LIKE % THE %|
+--------------------+--------------------+------------------+
|       Dean R Koontz|           ODD HOURS|             false|
|     Stephenie Meyer|            THE HOST|             false|
|        Emily Giffin|LOVE THE ONE YOU'...|              true|
|   Patricia Cornwell|           THE FRONT|             false|
|     Chuck Palahniuk|               SNUFF|             false|
|James Patterson a...|SUNDAYS AT TIFFANY’S|             false|
|       John Sandford|        PHANTOM PREY|             false|
|       Jimmy Buffett|          SWINE NOT?|             false|
|    Elizabeth George|     CARELESS IN RED|             false|
|      David Baldacci|     THE WHOLE TRUTH|             false|
|        Troy Denning|          INVINCIBLE|             false|
|          James Frey|BRIGHT SHINY MORNING|             false|
|         Garth Stein|THE ART OF RACING...|            

In [15]:
# Startswith
dataframe.select("author", "title", dataframe.title.startswith("THE")).show(5)

+-----------------+--------------------+----------------------+
|           author|               title|startswith(title, THE)|
+-----------------+--------------------+----------------------+
|    Dean R Koontz|           ODD HOURS|                 false|
|  Stephenie Meyer|            THE HOST|                  true|
|     Emily Giffin|LOVE THE ONE YOU'...|                 false|
|Patricia Cornwell|           THE FRONT|                  true|
|  Chuck Palahniuk|               SNUFF|                 false|
+-----------------+--------------------+----------------------+
only showing top 5 rows



In [16]:
# Endswith
dataframe.select("author", "title", dataframe.title.endswith("NT")).show(5)

+-----------------+--------------------+-------------------+
|           author|               title|endswith(title, NT)|
+-----------------+--------------------+-------------------+
|    Dean R Koontz|           ODD HOURS|              false|
|  Stephenie Meyer|            THE HOST|              false|
|     Emily Giffin|LOVE THE ONE YOU'...|              false|
|Patricia Cornwell|           THE FRONT|               true|
|  Chuck Palahniuk|               SNUFF|              false|
+-----------------+--------------------+-------------------+
only showing top 5 rows



In [17]:
# Substring (start index, length)
dataframe.select(dataframe.author.substr(2, 10).alias("title")).show(5)

+----------+
|     title|
+----------+
|ean R Koon|
|tephenie M|
|mily Giffi|
|atricia Co|
|huck Palah|
+----------+
only showing top 5 rows



In [18]:
# Substring
dataframe.select(dataframe.author.substr(3, 6).alias("title")).show(5)

+------+
| title|
+------+
|an R K|
|epheni|
|ily Gi|
|tricia|
|uck Pa|
+------+
only showing top 5 rows



In [19]:
# Substring
dataframe.select(dataframe.author.substr(1, 6).alias("title")).show(5)

+------+
| title|
+------+
|Dean R|
|Stephe|
|Emily |
|Patric|
|Chuck |
+------+
only showing top 5 rows



In [20]:
# Substring
dataframe.select(dataframe.author.substr(0, 1).alias("title")).show(5)

+-----+
|title|
+-----+
|    D|
|    S|
|    E|
|    P|
|    C|
+-----+
only showing top 5 rows



In [21]:
# Add new column
dataframe = dataframe.withColumn("new_column", lit("This is a new column"))

In [22]:
# Update column name
dataframe = dataframe.withColumnRenamed('amazon_product_url', 'url')
dataframe.show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+--------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 url|           author| bestsellers_date|         description|   price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+--------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|[5b4aa4ead3089013...|http://www.amazon...|    Dean R Koontz|[[1211587200000]]|Odd Thomas, who c...|  [, 27]|[[1212883200000]]|       Bantam| [1]|           [0]|           ODD HOURS|          [1]|This is a new column|
|[5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|[[1211587200000]]|Aliens have taken...|[25.99,]|[[1212883200000]]|L

In [23]:
# Remove column
dataframe_remove = dataframe.drop("publisher", "published_date").show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+--------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 url|           author| bestsellers_date|         description|   price|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+--------+----+--------------+--------------------+-------------+--------------------+
|[5b4aa4ead3089013...|http://www.amazon...|    Dean R Koontz|[[1211587200000]]|Odd Thomas, who c...|  [, 27]| [1]|           [0]|           ODD HOURS|          [1]|This is a new column|
|[5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|[[1211587200000]]|Aliens have taken...|[25.99,]| [2]|           [1]|            THE HOST|          [3]|This is a new column|
|[5b4aa4ead3089013...|http://www.amazon...|     Emily Giffin|[[1211587

In [24]:
# Remove another way
dataframe_remove_2 = dataframe.drop(dataframe.publisher).drop(dataframe.published_date).show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+--------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 url|           author| bestsellers_date|         description|   price|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+--------+----+--------------+--------------------+-------------+--------------------+
|[5b4aa4ead3089013...|http://www.amazon...|    Dean R Koontz|[[1211587200000]]|Odd Thomas, who c...|  [, 27]| [1]|           [0]|           ODD HOURS|          [1]|This is a new column|
|[5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|[[1211587200000]]|Aliens have taken...|[25.99,]| [2]|           [1]|            THE HOST|          [3]|This is a new column|
|[5b4aa4ead3089013...|http://www.amazon...|     Emily Giffin|[[1211587

In [25]:
# Columns and dtypes
dataframe.dtypes

[('_id', 'struct<$oid:string>'),
 ('url', 'string'),
 ('author', 'string'),
 ('bestsellers_date', 'struct<$date:struct<$numberLong:string>>'),
 ('description', 'string'),
 ('price', 'struct<$numberDouble:string,$numberInt:string>'),
 ('published_date', 'struct<$date:struct<$numberLong:string>>'),
 ('publisher', 'string'),
 ('rank', 'struct<$numberInt:string>'),
 ('rank_last_week', 'struct<$numberInt:string>'),
 ('title', 'string'),
 ('weeks_on_list', 'struct<$numberInt:string>'),
 ('new_column', 'string')]

In [26]:
# Contents
dataframe.show()

+--------------------+--------------------+--------------------+-----------------+--------------------+--------+-----------------+--------------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 url|              author| bestsellers_date|         description|   price|   published_date|           publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+--------------------+-----------------+--------------------+--------+-----------------+--------------------+----+--------------+--------------------+-------------+--------------------+
|[5b4aa4ead3089013...|http://www.amazon...|       Dean R Koontz|[[1211587200000]]|Odd Thomas, who c...|  [, 27]|[[1212883200000]]|              Bantam| [1]|           [0]|           ODD HOURS|          [1]|This is a new column|
|[5b4aa4ead3089013...|http://www.amazon...|     Stephenie Meyer|[[1211587200000]]|Aliens

In [27]:
# First n rows
dataframe.head()

Row(_id=Row($oid='5b4aa4ead3089013507db18b'), url='http://www.amazon.com/Odd-Hours-Dean-Koontz/dp/0553807056?tag=NYTBS-20', author='Dean R Koontz', bestsellers_date=Row($date=Row($numberLong='1211587200000')), description='Odd Thomas, who can communicate with the dead, confronts evil forces in a California coastal town.', price=Row($numberDouble=None, $numberInt='27'), published_date=Row($date=Row($numberLong='1212883200000')), publisher='Bantam', rank=Row($numberInt='1'), rank_last_week=Row($numberInt='0'), title='ODD HOURS', weeks_on_list=Row($numberInt='1'), new_column='This is a new column')

In [28]:
# First row
dataframe.first()

Row(_id=Row($oid='5b4aa4ead3089013507db18b'), url='http://www.amazon.com/Odd-Hours-Dean-Koontz/dp/0553807056?tag=NYTBS-20', author='Dean R Koontz', bestsellers_date=Row($date=Row($numberLong='1211587200000')), description='Odd Thomas, who can communicate with the dead, confronts evil forces in a California coastal town.', price=Row($numberDouble=None, $numberInt='27'), published_date=Row($date=Row($numberLong='1212883200000')), publisher='Bantam', rank=Row($numberInt='1'), rank_last_week=Row($numberInt='0'), title='ODD HOURS', weeks_on_list=Row($numberInt='1'), new_column='This is a new column')

In [29]:
# Summary statistics
dataframe.describe().show()

+-------+--------------------+---------------+--------------------+---------+------------------+--------------------+
|summary|                 url|         author|         description|publisher|             title|          new_column|
+-------+--------------------+---------------+--------------------+---------+------------------+--------------------+
|  count|               10195|          10195|               10195|    10195|             10195|               10195|
|   mean|                null|           null|                null|     null|1877.7142857142858|                null|
| stddev|                null|           null|                null|     null| 370.9760613506458|                null|
|    min|http://www.amazon...|        AJ Finn|                    |      ACE|  10TH ANNIVERSARY|This is a new column|
|    max|https://www.amazo...|various authors|’Tis for the Rebe...|allantine|               ZOO|This is a new column|
+-------+--------------------+---------------+----------

In [30]:
# Columns
dataframe.columns

['_id',
 'url',
 'author',
 'bestsellers_date',
 'description',
 'price',
 'published_date',
 'publisher',
 'rank',
 'rank_last_week',
 'title',
 'weeks_on_list',
 'new_column']

In [31]:
# Number of rows
dataframe.count()

10195

In [32]:
# Number of distinct rows
dataframe.distinct().count()

10195

In [33]:
# Physical plan
dataframe.explain()

== Physical Plan ==
*(1) Project [_id#14, amazon_product_url#15 AS url#369, author#16, bestsellers_date#17, description#18, price#19, published_date#20, publisher#21, rank#22, rank_last_week#23, title#24, weeks_on_list#25, This is a new column AS new_column#355]
+- FileScan json [_id#14,amazon_product_url#15,author#16,bestsellers_date#17,description#18,price#19,published_date#20,publisher#21,rank#22,rank_last_week#23,title#24,weeks_on_list#25] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex[file:/C:/Spark/02-raw-data/nyt2.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_id:struct<$oid:string>,amazon_product_url:string,author:string,bestsellers_date:struct<$d...




In [34]:
# Group by
dataframe.groupBy("author").count().show(10)

+--------------------+-----+
|              author|count|
+--------------------+-----+
|          James Frey|    2|
|    Elin Hilderbrand|   58|
|   Sharon Kay Penman|    2|
|         Kate Jacobs|    3|
|       Karen Robards|    6|
|     Gary Shteyngart|    3|
|         Lisa Genova|    7|
|James Patterson a...|   30|
|         Ruth Reichl|    3|
|         JRR Tolkien|    2|
+--------------------+-----+
only showing top 10 rows



In [35]:
# Filter
dataframe.filter(dataframe["title"] == 'THE HOST').show(5)

+--------------------+--------------------+---------------+-----------------+--------------------+--------+-----------------+-------------+----+--------------+--------+-------------+--------------------+
|                 _id|                 url|         author| bestsellers_date|         description|   price|   published_date|    publisher|rank|rank_last_week|   title|weeks_on_list|          new_column|
+--------------------+--------------------+---------------+-----------------+--------------------+--------+-----------------+-------------+----+--------------+--------+-------------+--------------------+
|[5b4aa4ead3089013...|http://www.amazon...|Stephenie Meyer|[[1211587200000]]|Aliens have taken...|[25.99,]|[[1212883200000]]|Little, Brown| [2]|           [1]|THE HOST|          [3]|This is a new column|
|[5b4aa4ead3089013...|http://www.amazon...|Stephenie Meyer|[[1212192000000]]|Aliens have taken...|[25.99,]|[[1213488000000]]|Little, Brown| [2]|           [2]|THE HOST|          [4]|Th

In [36]:
# Dataframe with 10 partitions
dataframe.repartition(10).rdd.getNumPartitions()

10

In [37]:
# Dataframe with 1 partition
dataframe.coalesce(1).rdd.getNumPartitions()

1

In [38]:
# Registering a table
dataframe.registerTempTable("df")
sc.sql("select * from df").show(3)

+--------------------+--------------------+---------------+-----------------+--------------------+--------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 url|         author| bestsellers_date|         description|   price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+---------------+-----------------+--------------------+--------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|[5b4aa4ead3089013...|http://www.amazon...|  Dean R Koontz|[[1211587200000]]|Odd Thomas, who c...|  [, 27]|[[1212883200000]]|       Bantam| [1]|           [0]|           ODD HOURS|          [1]|This is a new column|
|[5b4aa4ead3089013...|http://www.amazon...|Stephenie Meyer|[[1211587200000]]|Aliens have taken...|[25.99,]|[[1212883200000]]|Little, Bro

In [39]:
# Converting dataframe into an RDD
rdd_convert = dataframe.rdd
type(rdd_convert)

pyspark.rdd.RDD

In [40]:
type(dataframe)

pyspark.sql.dataframe.DataFrame

In [41]:
help(rdd_convert)

Help on RDD in module pyspark.rdd object:

class RDD(builtins.object)
 |  RDD(jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))
 |  
 |  A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
 |  Represents an immutable, partitioned collection of elements that can be
 |  operated on in parallel.
 |  
 |  Methods defined here:
 |  
 |  __add__(self, other)
 |      Return the union of this RDD and another one.
 |      
 |      >>> rdd = sc.parallelize([1, 1, 2, 3])
 |      >>> (rdd + rdd).collect()
 |      [1, 1, 2, 3, 1, 1, 2, 3]
 |  
 |  __getnewargs__(self)
 |  
 |  __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  __repr__(self)
 |      Return repr(self).
 |  
 |  aggregate(self, zeroValue, seqOp, combOp)
 |      Aggregate the elements of each partition, and then the results for all
 |      the partitions, using a given combine fun

In [42]:
# Converting dataframe into a RDD of string 
dataframe.toJSON().first()

'{"_id":{"$oid":"5b4aa4ead3089013507db18b"},"url":"http://www.amazon.com/Odd-Hours-Dean-Koontz/dp/0553807056?tag=NYTBS-20","author":"Dean R Koontz","bestsellers_date":{"$date":{"$numberLong":"1211587200000"}},"description":"Odd Thomas, who can communicate with the dead, confronts evil forces in a California coastal town.","price":{"$numberInt":"27"},"published_date":{"$date":{"$numberLong":"1212883200000"}},"publisher":"Bantam","rank":{"$numberInt":"1"},"rank_last_week":{"$numberInt":"0"},"title":"ODD HOURS","weeks_on_list":{"$numberInt":"1"},"new_column":"This is a new column"}'

In [43]:
# Obtaining contents of df as Pandas 
dataframe.toPandas()

  Nested StructType not supported in conversion to Arrow
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,_id,url,author,bestsellers_date,description,price,published_date,publisher,rank,rank_last_week,title,weeks_on_list,new_column
0,"(5b4aa4ead3089013507db18b,)",http://www.amazon.com/Odd-Hours-Dean-Koontz/dp...,Dean R Koontz,"((1211587200000,),)","Odd Thomas, who can communicate with the dead,...","(None, 27)","((1212883200000,),)",Bantam,"(1,)","(0,)",ODD HOURS,"(1,)",This is a new column
1,"(5b4aa4ead3089013507db18c,)",http://www.amazon.com/The-Host-Novel-Stephenie...,Stephenie Meyer,"((1211587200000,),)",Aliens have taken control of the minds and bod...,"(25.99, None)","((1212883200000,),)","Little, Brown","(2,)","(1,)",THE HOST,"(3,)",This is a new column
2,"(5b4aa4ead3089013507db18d,)",http://www.amazon.com/Love-Youre-With-Emily-Gi...,Emily Giffin,"((1211587200000,),)",A woman's happy marriage is shaken when she en...,"(24.95, None)","((1212883200000,),)",St. Martin's,"(3,)","(2,)",LOVE THE ONE YOU'RE WITH,"(2,)",This is a new column
3,"(5b4aa4ead3089013507db18e,)",http://www.amazon.com/The-Front-Garano-Patrici...,Patricia Cornwell,"((1211587200000,),)",A Massachusetts state investigator and his tea...,"(22.95, None)","((1212883200000,),)",Putnam,"(4,)","(0,)",THE FRONT,"(1,)",This is a new column
4,"(5b4aa4ead3089013507db18f,)",http://www.amazon.com/Snuff-Chuck-Palahniuk/dp...,Chuck Palahniuk,"((1211587200000,),)",An aging porn queens aims to cap her career by...,"(24.95, None)","((1212883200000,),)",Doubleday,"(5,)","(0,)",SNUFF,"(1,)",This is a new column
...,...,...,...,...,...,...,...,...,...,...,...,...,...
10190,"(5b4aa4ead3089013507dd959,)",https://www.amazon.com/Clancy-Line-Sight-Jack-...,Mike Maden,"((1530921600000,),)",Jack Ryan Jr. risks his life to protect a woma...,"(None, 0)","((1532217600000,),)",Putnam,"(11,)","(6,)",TOM CLANCY LINE OF SIGHT,"(4,)",This is a new column
10191,"(5b4aa4ead3089013507dd95a,)",https://www.amazon.com/Something-Water-Novel-C...,Catherine Steadman,"((1530921600000,),)",A documentary filmmaker and an investment bank...,"(None, 0)","((1532217600000,),)",Ballantine,"(12,)","(11,)",SOMETHING IN THE WATER,"(5,)",This is a new column
10192,"(5b4aa4ead3089013507dd95b,)",https://www.amazon.com/Little-Fires-Everywhere...,Celeste Ng,"((1530921600000,),)",An artist upends a quiet town outside Cleveland.,"(None, 0)","((1532217600000,),)",Penguin Press,"(13,)","(12,)",LITTLE FIRES EVERYWHERE,"(41,)",This is a new column
10193,"(5b4aa4ead3089013507dd95c,)",https://www.amazon.com/Shelter-Place-Nora-Robe...,Nora Roberts,"((1530921600000,),)",Survivors of a mass shooting outside a mall in...,"(None, 0)","((1532217600000,),)",St. Martin's,"(14,)","(5,)",SHELTER IN PLACE,"(6,)",This is a new column


In [44]:
# Change dicectory for output
os.chdir("03-processed-data")

In [45]:
# Write & Save File in .parquet format
dataframe.select("author", "title", "rank", "description") \
    .write \
    .save("Rankings_Descriptions.parquet")

In [46]:
# Write & Save File in .json format
dataframe.select("author", "title") \
    .write \
    .save("Authors_Titles.json", format="json")

In [47]:
# End Spark Session
sc.stop()

## Machine Learning

### Regression

Material from [Towards Data Science](https://towardsdatascience.com/apache-spark-mllib-tutorial-ec6f1cb336a9).

In [48]:
# Imports
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [49]:
# Create Spark sessions
spark = SparkSession.builder.getOrCreate()
type(spark)

pyspark.sql.session.SparkSession

In [50]:
# Read data from CSV
data = spark.read.csv(
    os.path.join("02-raw-data", "boston_housing.csv"),
    header=True,
    inferSchema=True)

# Show data
data.show()

+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio|     b|lstat|medv|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|
|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311|   15.2| 396.9|19.15|27.1|
|0.21124|12.5| 7.87|   0|0.524|5.631|100.0|6.0821|  5|311|   15.2

In [51]:
# Define features (used to predict)
feature_columns = data.columns[:-1]

In [52]:
# Create features arrays
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features"
)

In [53]:
# Create features columns
data_2 = assembler.transform(data)
data_2.show()

+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+--------------------+
|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio|     b|lstat|medv|            features|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+--------------------+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|[0.00632,18.0,2.3...|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|[0.02731,0.0,7.07...|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|[0.02729,0.0,7.07...|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|[0.03237,0.0,2.18...|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|[0.06905,0.0,2.18...|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|[0.02985,0.0,2.18...|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5

In [54]:
# Split data into training and testing samples
train, test = data_2.randomSplit([0.7, 0.3])

In [55]:
# Training data count
train.count()

356

In [56]:
# Testing data count
test.count()

150

In [57]:
# Define ML algorithm
algo = LinearRegression(
    featuresCol="features",
    labelCol="medv"
)

In [58]:
# Train the Linear Regression model
model = algo.fit(train)

In [59]:
# Model type
type(model)

pyspark.ml.regression.LinearRegressionModel

In [60]:
# Model
model

LinearRegressionModel: uid=LinearRegression_5a659fe74ef4, numFeatures=13

In [61]:
# Evaluate model performance
evaluation_summary = model.evaluate(test)

In [62]:
# Check evaluation metrics
print(evaluation_summary.meanAbsoluteError)
print(evaluation_summary.rootMeanSquaredError)
print(evaluation_summary.r2)

3.198216982300582
5.13518205657268
0.7113180904343133


In [63]:
# Predict values for test data
predictions = model.transform(test)

In [64]:
# Show predictions
predictions.show()

+-------+----+-----+----+------+-----+----+-------+---+---+-------+------+-----+----+--------------------+------------------+
|   crim|  zn|indus|chas|   nox|   rm| age|    dis|rad|tax|ptratio|     b|lstat|medv|            features|        prediction|
+-------+----+-----+----+------+-----+----+-------+---+---+-------+------+-----+----+--------------------+------------------+
|0.01439|60.0| 2.93|   0| 0.401|6.604|18.8| 6.2196|  1|265|   15.6| 376.7| 4.38|29.1|[0.01439,60.0,2.9...|31.628181777527303|
|0.01778|95.0| 1.47|   0| 0.403|7.135|13.9| 7.6534|  3|402|   17.0| 384.3| 4.45|32.9|[0.01778,95.0,1.4...| 30.92063961116464|
|0.02009|95.0| 2.68|   0|0.4161|8.034|31.9|  5.118|  4|224|   14.7|390.55| 2.88|50.0|[0.02009,95.0,2.6...|43.512009757821474|
|0.02543|55.0| 3.78|   0| 0.484|6.696|56.4| 5.7321|  5|370|   17.6| 396.9| 7.18|23.9|[0.02543,55.0,3.7...| 27.52185599746684|
|0.02763|75.0| 2.95|   0| 0.428|6.595|21.8| 5.4011|  3|252|   18.3|395.63| 4.32|30.8|[0.02763,75.0,2.9...|31.109022066

In [65]:
# Show predictions
predictions.select(predictions.columns[13:]).show()

+----+--------------------+------------------+
|medv|            features|        prediction|
+----+--------------------+------------------+
|29.1|[0.01439,60.0,2.9...|31.628181777527303|
|32.9|[0.01778,95.0,1.4...| 30.92063961116464|
|50.0|[0.02009,95.0,2.6...|43.512009757821474|
|23.9|[0.02543,55.0,3.7...| 27.52185599746684|
|30.8|[0.02763,75.0,2.9...|31.109022066544988|
|25.0|[0.02875,28.0,15....| 29.05631321228384|
|26.6|[0.02899,40.0,1.2...|22.469556035838405|
|33.4|[0.03237,0.0,2.18...| 28.46487023588844|
|24.1|[0.03445,82.5,2.0...| 29.10041637787782|
|19.4|[0.03466,35.0,6.0...|23.512649664060277|
|35.4|[0.03705,20.0,3.3...|34.288687285335975|
|34.6|[0.03768,80.0,1.5...|  35.1578983176606|
|21.1|[0.03961,0.0,5.19...| 20.35751775166594|
|24.8|[0.04297,52.5,5.3...|27.306144300895312|
|18.2|[0.04301,80.0,1.9...|14.329032960176498|
|23.9|[0.04462,25.0,4.8...| 26.77590071317647|
|20.6|[0.04527,0.0,11.9...|21.952062176439433|
|22.3|[0.0459,52.5,5.32...|27.219063012425163|
|23.9|[0.0505

### Feature Transformation

Material from [Towards Data Science](https://towardsdatascience.com/apache-spark-mllib-tutorial-7aba8a1dce6e).

In [66]:
# Imports
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import PCA

In [67]:
# Create Spaek sessuin
spark = SparkSession.builder.getOrCreate()

# Read data from CSV
data = spark.read.csv(
    os.path.join("02-raw-data", "colors.csv"),
    header=True,
    inferSchema=True)

data.show()

+---+------+
| id| color|
+---+------+
|  1|   red|
|  2|  blue|
|  3|orange|
|  4| white|
|  5|   red|
|  6|orange|
|  7|   red|
|  8| white|
|  9|   red|
+---+------+



In [68]:
# Create indexer
indexer = StringIndexer(inputCol="color", outputCol="color_indexed")
type(indexer)

pyspark.ml.feature.StringIndexer

In [69]:
# Learn mappings from color label to color index
indexer_model = indexer.fit(data)

# Created indexed data
indexed_data = indexer_model.transform(data)

indexed_data.show()

+---+------+-------------+
| id| color|color_indexed|
+---+------+-------------+
|  1|   red|          0.0|
|  2|  blue|          3.0|
|  3|orange|          1.0|
|  4| white|          2.0|
|  5|   red|          0.0|
|  6|orange|          1.0|
|  7|   red|          0.0|
|  8| white|          2.0|
|  9|   red|          0.0|
+---+------+-------------+



In [70]:
# Create OneHotEncoder
ohe = OneHotEncoder(
    inputCols=["color_indexed"],
    outputCols=["color_ohe"]) #,
    #dropLast=False)

In [71]:
# Run the estimator
ohe_model = ohe.fit(indexed_data)
ohe_model

OneHotEncoderModel: uid=OneHotEncoder_31816ce18f35, dropLast=true, handleInvalid=error, numInputCols=1, numOutputCols=1

In [72]:
# Apply trained model to data
encoded_data = ohe_model.transform(indexed_data)
encoded_data.show()

+---+------+-------------+-------------+
| id| color|color_indexed|    color_ohe|
+---+------+-------------+-------------+
|  1|   red|          0.0|(3,[0],[1.0])|
|  2|  blue|          3.0|    (3,[],[])|
|  3|orange|          1.0|(3,[1],[1.0])|
|  4| white|          2.0|(3,[2],[1.0])|
|  5|   red|          0.0|(3,[0],[1.0])|
|  6|orange|          1.0|(3,[1],[1.0])|
|  7|   red|          0.0|(3,[0],[1.0])|
|  8| white|          2.0|(3,[2],[1.0])|
|  9|   red|          0.0|(3,[0],[1.0])|
+---+------+-------------+-------------+



In [73]:
# Read data from CSV
data = spark.read.csv(
    os.path.join("02-raw-data", "wine.csv"),
    header=False, inferSchema=True)

data.show()

+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+
|_c0|  _c1| _c2| _c3| _c4|_c5| _c6| _c7| _c8| _c9|_c10|_c11|_c12|_c13|
+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+
|  1|14.23|1.71|2.43|15.6|127| 2.8|3.06|0.28|2.29|5.64|1.04|3.92|1065|
|  1| 13.2|1.78|2.14|11.2|100|2.65|2.76|0.26|1.28|4.38|1.05| 3.4|1050|
|  1|13.16|2.36|2.67|18.6|101| 2.8|3.24| 0.3|2.81|5.68|1.03|3.17|1185|
|  1|14.37|1.95| 2.5|16.8|113|3.85|3.49|0.24|2.18| 7.8|0.86|3.45|1480|
|  1|13.24|2.59|2.87|21.0|118| 2.8|2.69|0.39|1.82|4.32|1.04|2.93| 735|
|  1| 14.2|1.76|2.45|15.2|112|3.27|3.39|0.34|1.97|6.75|1.05|2.85|1450|
|  1|14.39|1.87|2.45|14.6| 96| 2.5|2.52| 0.3|1.98|5.25|1.02|3.58|1290|
|  1|14.06|2.15|2.61|17.6|121| 2.6|2.51|0.31|1.25|5.05|1.06|3.58|1295|
|  1|14.83|1.64|2.17|14.0| 97| 2.8|2.98|0.29|1.98| 5.2|1.08|2.85|1045|
|  1|13.86|1.35|2.27|16.0| 98|2.98|3.15|0.22|1.85|7.22|1.01|3.55|1045|
|  1| 14.1|2.16| 2.3|18.0|105|2.95|3.32|0.22|2.38|5.75|1.25|3.17|1510|
|  1|1

In [74]:
# Create assembler
assembler = VectorAssembler(
    inputCols=data.columns[1:],
    outputCol="features")

data_2 = assembler.transform(data)
data_2.show()

+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+--------------------+
|_c0|  _c1| _c2| _c3| _c4|_c5| _c6| _c7| _c8| _c9|_c10|_c11|_c12|_c13|            features|
+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+--------------------+
|  1|14.23|1.71|2.43|15.6|127| 2.8|3.06|0.28|2.29|5.64|1.04|3.92|1065|[14.23,1.71,2.43,...|
|  1| 13.2|1.78|2.14|11.2|100|2.65|2.76|0.26|1.28|4.38|1.05| 3.4|1050|[13.2,1.78,2.14,1...|
|  1|13.16|2.36|2.67|18.6|101| 2.8|3.24| 0.3|2.81|5.68|1.03|3.17|1185|[13.16,2.36,2.67,...|
|  1|14.37|1.95| 2.5|16.8|113|3.85|3.49|0.24|2.18| 7.8|0.86|3.45|1480|[14.37,1.95,2.5,1...|
|  1|13.24|2.59|2.87|21.0|118| 2.8|2.69|0.39|1.82|4.32|1.04|2.93| 735|[13.24,2.59,2.87,...|
|  1| 14.2|1.76|2.45|15.2|112|3.27|3.39|0.34|1.97|6.75|1.05|2.85|1450|[14.2,1.76,2.45,1...|
|  1|14.39|1.87|2.45|14.6| 96| 2.5|2.52| 0.3|1.98|5.25|1.02|3.58|1290|[14.39,1.87,2.45,...|
|  1|14.06|2.15|2.61|17.6|121| 2.6|2.51|0.31|1.25|5.05|1.06|3.58|1295|[14.06,2.1

In [75]:
# Create scaler
scaler = StandardScaler(
    inputCol="features", 
    outputCol="scaled_features")

In [76]:
# Scaler model
scaler_model = scaler.fit(data_2)

In [77]:
# Scaled data
scaled_data = scaler_model.transform(data_2)
scaled_data.show()

+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+--------------------+--------------------+
|_c0|  _c1| _c2| _c3| _c4|_c5| _c6| _c7| _c8| _c9|_c10|_c11|_c12|_c13|            features|     scaled_features|
+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+--------------------+--------------------+
|  1|14.23|1.71|2.43|15.6|127| 2.8|3.06|0.28|2.29|5.64|1.04|3.92|1065|[14.23,1.71,2.43,...|[17.5283750084766...|
|  1| 13.2|1.78|2.14|11.2|100|2.65|2.76|0.26|1.28|4.38|1.05| 3.4|1050|[13.2,1.78,2.14,1...|[16.2596310690015...|
|  1|13.16|2.36|2.67|18.6|101| 2.8|3.24| 0.3|2.81|5.68|1.03|3.17|1185|[13.16,2.36,2.67,...|[16.2103594597015...|
|  1|14.37|1.95| 2.5|16.8|113|3.85|3.49|0.24|2.18| 7.8|0.86|3.45|1480|[14.37,1.95,2.5,1...|[17.7008256410266...|
|  1|13.24|2.59|2.87|21.0|118| 2.8|2.69|0.39|1.82|4.32|1.04|2.93| 735|[13.24,2.59,2.87,...|[16.3089026783015...|
|  1| 14.2|1.76|2.45|15.2|112|3.27|3.39|0.34|1.97|6.75|1.05|2.85|1450|[14.2,1.76,2.45,1...|[17.4

In [78]:
# Min/max scaler
scaler = MinMaxScaler(min=0, max=1, inputCol='features',
                      outputCol='features_minmax')

scaler_model = scaler.fit(data_2)
data_3 = scaler_model.transform(data_2)
data_3.show()

+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+--------------------+--------------------+
|_c0|  _c1| _c2| _c3| _c4|_c5| _c6| _c7| _c8| _c9|_c10|_c11|_c12|_c13|            features|     features_minmax|
+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+--------------------+--------------------+
|  1|14.23|1.71|2.43|15.6|127| 2.8|3.06|0.28|2.29|5.64|1.04|3.92|1065|[14.23,1.71,2.43,...|[0.84210526315789...|
|  1| 13.2|1.78|2.14|11.2|100|2.65|2.76|0.26|1.28|4.38|1.05| 3.4|1050|[13.2,1.78,2.14,1...|[0.57105263157894...|
|  1|13.16|2.36|2.67|18.6|101| 2.8|3.24| 0.3|2.81|5.68|1.03|3.17|1185|[13.16,2.36,2.67,...|[0.56052631578947...|
|  1|14.37|1.95| 2.5|16.8|113|3.85|3.49|0.24|2.18| 7.8|0.86|3.45|1480|[14.37,1.95,2.5,1...|[0.87894736842105...|
|  1|13.24|2.59|2.87|21.0|118| 2.8|2.69|0.39|1.82|4.32|1.04|2.93| 735|[13.24,2.59,2.87,...|[0.58157894736842...|
|  1| 14.2|1.76|2.45|15.2|112|3.27|3.39|0.34|1.97|6.75|1.05|2.85|1450|[14.2,1.76,2.45,1...|[0.83

In [79]:
# Read data from CSV
data = spark.read.csv(
    os.path.join("02-raw-data", "digits.csv"),
    header=True,
    inferSchema=True)

In [80]:
# Create assembler
assembler = VectorAssembler(inputCols=data.columns[1:], outputCol='features')
data_2 = assembler.transform(data)

In [81]:
# Principal component analysis 
pca = PCA(k=2, inputCol='features', outputCol='features_pca')
pca_model = pca.fit(data_2)
pca_data = pca_model.transform(data_2).select('features_pca')
pca_data.show()

+--------------------+
|        features_pca|
+--------------------+
|[103.738813757982...|
|[2466.78627830941...|
|[-121.55984060478...|
|[599.578991071953...|
|[2689.04430947598...|
|[1253.08650413365...|
|[93.0114290617962...|
|[650.952778816163...|
|[1115.56395904828...|
|[1062.72668192116...|
|[1029.01690081557...|
|[458.805321389768...|
|[-200.34133976162...|
|[751.263926957183...|
|[1265.44211418056...|
|[-199.11010313256...|
|[762.715694923041...|
|[1744.79986516159...|
|[128.314928856543...|
|[1731.44148649029...|
+--------------------+
only showing top 20 rows



### Classification

Material from [Towards Data Science](https://towardsdatascience.com/apache-spark-mllib-tutorial-part-3-complete-classification-workflow-a1eb430ad069).

In [82]:
# Imports
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [83]:
# Read data from CSV
data = spark.read.csv(
    os.path.join("02-raw-data", "titanic.csv"),
    header=True, inferSchema=True)

data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|Gender| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [84]:
# Summary stats in Pandas
data.describe().toPandas()

Unnamed: 0,summary,PassengerId,Survived,Pclass,Name,Gender,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
1,mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
2,stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
3,min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
4,max,891.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


In [85]:
# Select columns
data = data.select(['Survived', 'Pclass', 'Gender',
                    'Age', 'SibSp', 'Parch', 'Fare'])

In [86]:
# Imputer (missing values)
imputer = Imputer(strategy='mean', inputCols=[
                  'Age'], outputCols=['AgeImputed'])
imputer_model = imputer.fit(data)
data = imputer_model.transform(data)
data.show()

+--------+------+------+----+-----+-----+-------+-----------------+
|Survived|Pclass|Gender| Age|SibSp|Parch|   Fare|       AgeImputed|
+--------+------+------+----+-----+-----+-------+-----------------+
|       0|     3|  male|22.0|    1|    0|   7.25|             22.0|
|       1|     1|female|38.0|    1|    0|71.2833|             38.0|
|       1|     3|female|26.0|    0|    0|  7.925|             26.0|
|       1|     1|female|35.0|    1|    0|   53.1|             35.0|
|       0|     3|  male|35.0|    0|    0|   8.05|             35.0|
|       0|     3|  male|null|    0|    0| 8.4583|29.69911764705882|
|       0|     1|  male|54.0|    0|    0|51.8625|             54.0|
|       0|     3|  male| 2.0|    3|    1| 21.075|              2.0|
|       1|     3|female|27.0|    0|    2|11.1333|             27.0|
|       1|     2|female|14.0|    1|    0|30.0708|             14.0|
|       1|     3|female| 4.0|    1|    1|   16.7|              4.0|
|       1|     1|female|58.0|    0|    0|  26.55

In [87]:
# Index data
gender_indexer = StringIndexer(
    inputCol='Gender', outputCol='GenderIndexed'
)
gender_indexer_model = gender_indexer.fit(data)
data = gender_indexer_model.transform(data)
data.show()

+--------+------+------+----+-----+-----+-------+-----------------+-------------+
|Survived|Pclass|Gender| Age|SibSp|Parch|   Fare|       AgeImputed|GenderIndexed|
+--------+------+------+----+-----+-----+-------+-----------------+-------------+
|       0|     3|  male|22.0|    1|    0|   7.25|             22.0|          0.0|
|       1|     1|female|38.0|    1|    0|71.2833|             38.0|          1.0|
|       1|     3|female|26.0|    0|    0|  7.925|             26.0|          1.0|
|       1|     1|female|35.0|    1|    0|   53.1|             35.0|          1.0|
|       0|     3|  male|35.0|    0|    0|   8.05|             35.0|          0.0|
|       0|     3|  male|null|    0|    0| 8.4583|29.69911764705882|          0.0|
|       0|     1|  male|54.0|    0|    0|51.8625|             54.0|          0.0|
|       0|     3|  male| 2.0|    3|    1| 21.075|              2.0|          0.0|
|       1|     3|female|27.0|    0|    2|11.1333|             27.0|          1.0|
|       1|     2

In [88]:
# Assembler
assembler = VectorAssembler(
    inputCols=['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed'], outputCol='features')
data = assembler.transform(data)
data.show()

+--------+------+------+----+-----+-----+-------+-----------------+-------------+--------------------+
|Survived|Pclass|Gender| Age|SibSp|Parch|   Fare|       AgeImputed|GenderIndexed|            features|
+--------+------+------+----+-----+-----+-------+-----------------+-------------+--------------------+
|       0|     3|  male|22.0|    1|    0|   7.25|             22.0|          0.0|[3.0,1.0,0.0,7.25...|
|       1|     1|female|38.0|    1|    0|71.2833|             38.0|          1.0|[1.0,1.0,0.0,71.2...|
|       1|     3|female|26.0|    0|    0|  7.925|             26.0|          1.0|[3.0,0.0,0.0,7.92...|
|       1|     1|female|35.0|    1|    0|   53.1|             35.0|          1.0|[1.0,1.0,0.0,53.1...|
|       0|     3|  male|35.0|    0|    0|   8.05|             35.0|          0.0|[3.0,0.0,0.0,8.05...|
|       0|     3|  male|null|    0|    0| 8.4583|29.69911764705882|          0.0|[3.0,0.0,0.0,8.45...|
|       0|     1|  male|54.0|    0|    0|51.8625|             54.0|      

In [89]:
# Algorithm and model
algo = RandomForestClassifier(featuresCol='features', labelCol='Survived')
model = algo.fit(data)

In [90]:
# Predictions
predictions = model.transform(data)
predictions.select(['Survived','prediction', 'probability']).show()

+--------+----------+--------------------+
|Survived|prediction|         probability|
+--------+----------+--------------------+
|       0|       0.0|[0.89551394491524...|
|       1|       1.0|[0.03340837227568...|
|       1|       1.0|[0.43427464885979...|
|       1|       1.0|[0.05957664990639...|
|       0|       0.0|[0.87195589265551...|
|       0|       0.0|[0.87195589265551...|
|       0|       0.0|[0.75135888964194...|
|       0|       0.0|[0.71124488011266...|
|       1|       1.0|[0.43979222491577...|
|       1|       1.0|[0.09010631586934...|
|       1|       1.0|[0.39443761678913...|
|       1|       1.0|[0.09000718096115...|
|       0|       0.0|[0.87825208756381...|
|       0|       0.0|[0.86089348748872...|
|       0|       1.0|[0.30549999071718...|
|       1|       1.0|[0.28043408460639...|
|       0|       0.0|[0.75132804256424...|
|       1|       0.0|[0.84628105917486...|
|       0|       0.0|[0.53498851756044...|
|       1|       1.0|[0.32864593954239...|
+--------+-

In [91]:
# Evaluate predictions
evaluator = BinaryClassificationEvaluator(
    labelCol='Survived', metricName='areaUnderROC')
evaluator.evaluate(predictions)

0.898313254295423

In [92]:
# Isolate data for use with scitkit-learn
y_true = predictions.select(['Survived']).collect()
y_pred = predictions.select(['prediction']).collect()

In [93]:
type(y_true[0])

pyspark.sql.types.Row

In [94]:
# True
y_true

[Row(Survived=0),
 Row(Survived=1),
 Row(Survived=1),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=1),
 Row(Survived=1),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=1),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=0),
 Row(Survived=1),
 Row(Survived=1),
 Row(Survived=0),
 Row(Survi

In [95]:
# Predictions
y_pred

[Row(prediction=0.0),
 Row(prediction=1.0),
 Row(prediction=1.0),
 Row(prediction=1.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=1.0),
 Row(prediction=1.0),
 Row(prediction=1.0),
 Row(prediction=1.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=1.0),
 Row(prediction=1.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=1.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=1.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=1.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=1.0),
 Row(prediction=1.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=0.0),
 Row(prediction=1.0),
 Row(prediction=1.0),
 Row(prediction=0.0),
 Row(prediction=1.0),
 Row(prediction=0.0),
 Row(prediction=1.0),
 Row(prediction=1.0),
 Row(predi

In [96]:
# Scikit-learn
# from sklearn.metrics import classification_report, confusion_matrix
# print(classification_report(y_true, y_pred))
# print(confusion_matrix(y_true, y_pred))

## Resources

https://spark.apache.org/docs/latest/api/python/index.html

https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes

https://spark.apache.org/docs/latest/sql-getting-started.html

https://spark.apache.org/docs/latest/ml-guide.html

https://spark.apache.org/docs/latest/sql-data-sources.html

https://github.com/apache/spark/tree/master/examples/src/main/python

https://spark.apache.org/docs/latest/

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html

https://spark.apache.org/docs/latest/quick-start.html

https://spark.apache.org/docs/latest/mllib-data-types.html

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html

https://medium.com/@jaafarbenabderrazak.info/spark-for-machine-learning-using-python-and-mllib-435efdc3f708

https://www.sicara.ai/blog/2017-05-02-get-started-pyspark-jupyter-notebook-3-minutes

https://spark.apache.org/docs/latest/api/python/pyspark.html

https://www.sicara.ai/blog/2017-05-02-get-started-pyspark-jupyter-notebook-3-minutes

https://medium.com/sicara/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f

https://phoenixnap.com/kb/install-spark-on-windows-10

https://github.com/steveloughran/winutils

https://github.com/apache/spark/tree/master/examples/src/main/python

https://spark.apache.org/examples.html

https://github.com/apache/spark/blob/master/examples/src/main/python/sql/basic.py

https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession

https://spark.apache.org/docs/latest/sql-data-sources.html

https://spark.apache.org/docs/latest/ml-guide.html

https://spark.apache.org/docs/latest/ml-statistics.html

https://spark.apache.org/docs/latest/ml-guide.html

https://towardsdatascience.com/spark-on-windows-a-getting-started-guide-11dc44412164

https://towardsdatascience.com/a-brief-introduction-to-pyspark-ff4284701873

https://www.datacamp.com/community/blog/pyspark-cheat-sheet-python

https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf

https://deelesh.github.io/pyspark-windows.html

https://changhsinlee.com/install-pyspark-windows-jupyter/

https://medium.com/@naomi.fridman/install-pyspark-to-run-on-jupyter-notebook-on-windows-4ec2009de21f

https://www.dezyre.com/apache-spark-tutorial/tutorial-introduction-to-apache-spark

https://medium.com/@naomi.fridman/install-pyspark-to-run-on-jupyter-notebook-on-windows-4ec2009de21f

https://github.com/naomifridman

https://medium.com/big-data-engineering/how-to-install-apache-spark-2-x-in-your-pc-e2047246ffc3