# Introduction to Spark
Spark is a computing framework for handling and analysing big data, much like Hadoop. However, whereas Hadoop MapReduce writes intermediate results to disk between steps, Spark keeps data in memory wherever it can. This makes Hadoop a good fit for (slow) batch processing, and Spark better suited to faster, or even near-real-time, analysis.

More than anything else, the goal of this notebook is to understand how we would use Spark if we wished to, be it in a professional or personal context. Since it's a massive framework with a multitude of functionalities, this will only cover the building blocks of Spark, the basics of reading and transforming data, and making predictions (and evaluating those predictions) using the PySpark ML module.

<br>

<u>**Note**</u> <i>Spark itself, and therefore coding in Spark is above and beyond the BCS and EMC curricula. To reiterate the above, this is all extra material that only serves to give us a flavour of how Spark (in Python) would work if we wished to apply it.</i> 

### Business problem
We will attempt to go through the data pipeline by addressing a churn prediction problem: we've been asked by a marketing agency to predict their customer churn so they could pre-empt with appropriate action. 

## Libraries
First, let's import the necessary libraries. Today, all we'll really be needing is `pyspark`, which is Spark's API in Python. We also import pandas, in case we want at any point to convert our Spark dataframe into the more familiar pandas equivalent. 

In [1]:
# Pandas for manipulation, transformation, and conversion from Spark dataframe to pandas if we so wish
import pandas as pd 

# Spark API -- can be used for data reading, transformation, machine learning, SQL and streaming at scale.  
import pyspark

## Initiating SparkSession

In order to use Spark -- in Python or otherwise -- we first need to set up a 'session' or a connection. 

In [2]:
# Initiating a 'sparksession' or a connection to our spark cluster
spark = pyspark.sql.SparkSession.builder.master('local[*]').appName('customer_churn').getOrCreate()

# Conventionally, we'd call the connection, 'spark'

Notes from the above line:

- We build the SparkSession from the `sql` module of PySpark.
- In `master()` we specify the cluster we're connecting to -- this could be a URL, for example, for a remote cluster. In this case, however, we'll stick to running Spark locally. To do this we specify _local_, and we then put __*__ within square brackets to tell it that we want to use all the cores in our computer. We could have instead said _local[2]_ for instance to tell it to use only 2 cores.
- With `appName()` we give our Spark app an arbitrary name; this is the name shown in the Spark UI, which helps us identify the app later. 
- Finally, `getOrCreate()` returns the existing SparkSession if one is already running, and otherwise creates a new one with the minimal configuration we've set (there are many more parameters we could have tweaked, but for now, we're happy with the above). 

In [3]:
# Let's see what this looks like:
spark

Note that it simply returns the configurations we've specified -- e.g. AppName, and Master. We also see that it returned to us the version of Spark we're using, and confirmed that this SparkSession is running in-memory. 


Also note that clicking on the `Spark UI` link takes us to a 'job' tracker page. Feel free to explore the other tabs as well to get a better understanding of what's under the hood!

## Resilient Distributed Datasets (RDDs)

Before Spark Dataframes, which we'll look at in a minute, there were RDDs. In fact RDDs are the building blocks of Spark. They have the following features: 

- **Resilient** -- they are fault-tolerant: RDDs are immutable, i.e. they cannot be changed, only new RDDs can be created from old ones, and Spark keeps track of how each RDD was derived so that a lost partition can be recomputed. <br> <br>
- **Distributed** -- Spark handles data somewhat like HDFS and MapReduce (but not exactly) in that it also stores and processes data in a distributed manner. An RDD is split into partitions, and those partitions are spread across the nodes of the cluster. <br> <br>
- **Datasets** -- because they hold data! <br><br>

__________
#### **Important Note**:    <i>'Lazy evaluation'</i>
Spark is fast in general because it executes operations in-memory, as mentioned. However, it's actually even faster because of a built-in feature called 'lazy evaluation' (sometimes loosely called 'lazy loading').

All this means is that Spark does not execute an operation the moment we write it; it only runs when we ask it to return a value/result. Asking for a result is, in Spark terminology, an **`action`** -- this includes things like counting the number of rows, showing the dataframe, or writing it to a file system. In contrast, a **`transformation`** is also a function, but it doesn't trigger any computation; it just records the task to be done. 

For instance, whenever you create a new dataset from the original one by transforming it using, say, a groupBy, or by taking a sample, that is a transformation and won't actually run. Only when you tell Spark to return the output will that operation (and every operation leading up to it) actually run. Because Spark sees the whole chain of transformations before executing anything, it can plan the work and avoid unnecessary computation, which makes it run even quicker.  

For a more comprehensive list of examples of transformations vs. actions, check out [this documentation for RDDs](http://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) and [this documentation](https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/sql/Dataset.html) for dataframes (explained below!).
__________
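To get a feel for the transformation/action distinction without a Spark cluster, here is a minimal plain-Python sketch. It is only an analogy, not Spark code: Python's built-in `map()` and `filter()` happen to be lazy too, so they stand in for transformations, and `list()` stands in for an action. The helper `double` and the `calls` log are made up for this illustration.

```python
# Plain-Python analogy (no Spark needed): map() and filter() are lazy,
# much as Spark transformations are.
calls = []  # records every value the "transformation" actually touches


def double(x):
    calls.append(x)
    return x * 2


numbers = range(5)

# "Transformations": these only describe the work; nothing runs yet.
doubled = map(double, numbers)
multiples_of_four = filter(lambda v: v % 4 == 0, doubled)
calls_before_action = len(calls)

# "Action": asking for a concrete result triggers the whole chain.
result = list(multiples_of_four)
calls_after_action = len(calls)

print(calls_before_action, result, calls_after_action)
```

No call to `double` is logged until `list()` asks for the result, at which point every step leading to the output runs in one go.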



Now, let's see if we can load in our data as an RDD first!

In [4]:
# To do this, we load up a SparkContext
sc = spark.sparkContext

In [5]:
# Let's see what this looks like
sc

Note the only difference between this and our SparkSession above is that here we're missing the top bit that says `SparkSession - in-memory`. 

**Note**: Before Spark 2.0, we would have had to set up SparkContexts for using Spark syntax, SQLContexts for running SQL queries, StreamingContexts for streaming etc. From 2.0 onwards, Spark provides SparkSessions, which serve as an 'umbrella' context for the other contexts -- so we don't need to worry about creating separate contexts for separate Spark features anymore!

In [6]:
# We read in the CSV file using 'textFile'
rdd = sc.textFile('agency_churn.csv')


# Note: of course we didn't need to call it rdd, it's just a variable name!

In [7]:
# Check the type of the object:
type(rdd)

pyspark.rdd.RDD

In [8]:
# Return the first row/element in this RDD
rdd.first()

# Note: first() is an action

'Names,Age,Total_Purchase,Account_Manager,Years,Onboard_date,Location,Company,Churn'

In [9]:
# More generalised form of first() -- i.e. will show more than the first row, in this case the first 4 rows 
rdd.take(4)

# Note: take() is an action

['Names,Age,Total_Purchase,Account_Manager,Years,Onboard_date,Location,Company,Churn',
 'Cameron Williams,42,11066.8,0,7.22,2013-08-30 7:00:40,"10265 Elizabeth Mission Barkerburgh, AK 89518",Harvey LLC,1',
 'Kevin Mueller,41,11916.22,0,6.5,2013-08-13 0:38:46,"6157 Frank Gardens Suite 019 Carloshaven, RI 17756",Wilson PLC,1',
 'Eric Lozano,38,12884.75,0,6.67,2016-06-29 6:20:07,"1331 Keith Court Alyssahaven, DE 90114","Miller, Johnson and Wallace",1']

In [12]:
# Return all row objects (an action)
rdd.collect()

['Names,Age,Total_Purchase,Account_Manager,Years,Onboard_date,Location,Company,Churn',
 'Cameron Williams,42,11066.8,0,7.22,2013-08-30 7:00:40,"10265 Elizabeth Mission Barkerburgh, AK 89518",Harvey LLC,1',
 'Kevin Mueller,41,11916.22,0,6.5,2013-08-13 0:38:46,"6157 Frank Gardens Suite 019 Carloshaven, RI 17756",Wilson PLC,1',
 'Eric Lozano,38,12884.75,0,6.67,2016-06-29 6:20:07,"1331 Keith Court Alyssahaven, DE 90114","Miller, Johnson and Wallace",1',
 'Phillip White,42,8010.76,0,6.71,2014-04-22 12:43:12,"13120 Daniel Mount Angelabury, WY 30645-4695",Smith Inc,1',
 'Cynthia Norton,37,9191.58,0,5.56,2016-01-19 15:31:15,"765 Tricia Row Karenshire, MH 71730",Love-Jones,1',
 'Jessica Williams,48,10356.02,0,5.12,2009-03-03 23:13:37,"6187 Olson Mountains East Vincentborough, PR 74359",Kelly-Warren,1',
 'Eric Butler,,11331.58,,5.23,2016-12-05 3:35:43,"4846 Savannah Road West Justin, IA 87713-3460",Reynolds-Sheppard,1',
 'Zachary Walsh,32,9885.12,1,6.92,2006-03-09 14:50:20,"25271 Roy Expressway 

**Important note on RDDs**<br>
As we can see from the output above, actions on this RDD return plain Python objects: `first` returns a single element, while `take` and `collect` return a list of elements. Because we loaded the file with `textFile`, each element is simply a string holding one line of the CSV. 

It's critical to note here that RDDs do not have any columnar structure -- i.e. whilst we can output the first element, we still have no schema. 
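To see why the missing schema matters, the sketch below takes one of the raw lines from the output above and tries to split it into fields by hand, in plain Python. It is an illustration only; with an RDD we would have to apply a function like this to every line ourselves (for example via `rdd.map`), which is exactly the work a dataframe reader does for us.

```python
import csv

# One raw element of the RDD, copied from the take(4) output above
line = ('Cameron Williams,42,11066.8,0,7.22,2013-08-30 7:00:40,'
        '"10265 Elizabeth Mission Barkerburgh, AK 89518",Harvey LLC,1')

# Naive approach: split on every comma, including the one inside the quoted address
naive_fields = line.split(',')

# CSV-aware approach: the standard-library parser respects the quotes
parsed_fields = next(csv.reader([line]))

print(len(naive_fields), len(parsed_fields))
print(parsed_fields[6])
```

The naive split produces one field too many because the address contains a comma, and either way every value is still a string rather than a number or a date.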

## Spark Dataframe
Enter the Spark dataframe -- an object with the same features as an RDD, but which also has a schema! This makes it our data structure of choice. 

In [13]:
# We use the read.csv function that we can access from our SparkSession object, 'spark'
df = spark.read.csv('agency_churn.csv', 
                     header = True, inferSchema = True)

# Note: by default, header is False and the first row is read in as data, so we set header = True. 

In [14]:
# Check the type of object we have
type(df)

pyspark.sql.dataframe.DataFrame

In [15]:
# As with Pandas dataframes, we can use .head()
df.head(2)

# Note the output is still row objects

[Row(Names='Cameron Williams', Age=42, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Onboard_date=datetime.datetime(2013, 8, 30, 7, 0, 40), Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1),
 Row(Names='Kevin Mueller', Age=41, Total_Purchase=11916.22, Account_Manager=0, Years=6.5, Onboard_date=datetime.datetime(2013, 8, 13, 0, 38, 46), Location='6157 Frank Gardens Suite 019 Carloshaven, RI 17756', Company='Wilson PLC', Churn=1)]

In [16]:
# We can also check the columns using the columns attribute (like with Pandas)
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [17]:
# Show the first 4 rows
df.show(4)

# Note: show() is an action for dataframes

+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|           Names|Age|Total_Purchase|Account_Manager|Years|       Onboard_date|            Location|             Company|Churn|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|Cameron Williams| 42|       11066.8|              0| 7.22|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller| 41|      11916.22|              0|  6.5|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano| 38|      12884.75|              0| 6.67|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|   Phillip White| 42|       8010.76|              0| 6.71|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+----

### Select columns

Notice that the output of the dataframe is far less visually appealing than that of pandas! We have two options here: 
- Forget that Spark (and distributed computing) exists! Let's just stick to pandas.
- Select columns to display instead of all columns, until Spark fixes the display in the next iteration. 

We're going to go with the latter suggestion. To select columns of a Spark dataframe, we can use syntax we're familiar with from Pandas:

In [18]:
# Select columns with [] as with Pandas
df[['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years', 
 'Onboard_date',
 'Location']].show(2)

+----------------+---+--------------+---------------+-----+-------------------+--------------------+
|           Names|Age|Total_Purchase|Account_Manager|Years|       Onboard_date|            Location|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+
|Cameron Williams| 42|       11066.8|              0| 7.22|2013-08-30 07:00:40|10265 Elizabeth M...|
|   Kevin Mueller| 41|      11916.22|              0|  6.5|2013-08-13 00:38:46|6157 Frank Garden...|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+
only showing top 2 rows



We can also select columns with SQL queries!

In [19]:
# First we have to register the DataFrame as a SQL 'temporary view' i.e.
df.createOrReplaceTempView("churn_table")

# We now have a table called 'churn_table' which we can query as we would a standard SQL database table

In [20]:
spark.sql("SELECT Names, Age, Total_Purchase FROM churn_table LIMIT 4").show()

+----------------+---+--------------+
|           Names|Age|Total_Purchase|
+----------------+---+--------------+
|Cameron Williams| 42|       11066.8|
|   Kevin Mueller| 41|      11916.22|
|     Eric Lozano| 38|      12884.75|
|   Phillip White| 42|       8010.76|
+----------------+---+--------------+



Finally, we can use the standard Spark syntax as so:

In [21]:
df.select('Names', 'Age', 'Total_Purchase').show(4)

+----------------+---+--------------+
|           Names|Age|Total_Purchase|
+----------------+---+--------------+
|Cameron Williams| 42|       11066.8|
|   Kevin Mueller| 41|      11916.22|
|     Eric Lozano| 38|      12884.75|
|   Phillip White| 42|       8010.76|
+----------------+---+--------------+
only showing top 4 rows



### Investigating dataframe schema and data types
As mentioned, the difference between a Spark dataframe and an RDD is that the former has a schema or a column structure.

In [22]:
# We can investigate our schema by using the printSchema() function
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [23]:
# We can also just choose to look at data types
df.dtypes

[('Names', 'string'),
 ('Age', 'int'),
 ('Total_Purchase', 'double'),
 ('Account_Manager', 'int'),
 ('Years', 'double'),
 ('Onboard_date', 'timestamp'),
 ('Location', 'string'),
 ('Company', 'string'),
 ('Churn', 'int')]

**Note** They seem to all make sense, which is great! However, remember that we specified the `inferSchema = True` way back in the read.csv() function, which tells the function to estimate - or 'infer' - the data types in each of our columns. 


By default however, the _inferSchema_ parameter is set to False. This would have read in all columns as _strings_, and we would have had to change this manually. Hypothetically, if we did want to change the structure manually or if we were dealing with data from which it was difficult to infer a schema (correctly), we could do the following:

In [24]:
# First import necessary functions from the 'types' submodule.
from pyspark.sql.types import (StructField, StructType, 
                               IntegerType, StringType, 
                              FloatType, DateType)

# Note, we imported the following data types here: IntegerType, StringType, FloatType, and DateType as that's all
# we need. The module has other data types as well though.

We create a new data schema for our data as a list of `StructField` objects, where each element in the list represents a field (column) in our dataframe. To dissect each of those `StructField` objects: 

- <u>name</u>: This is the name we'll give our column.
- <u>dataType</u>: The data type of the column. This takes an instance of one of the types imported above, e.g. `StringType()`.
- <u>nullable</u>: This specifies if the column can take Null values or not. 

Note how this is very similar to creating a database in SQL! Perhaps it's no big surprise though as after all, the work we're doing here is all within the SQL module of Pyspark.

In [None]:
# We create a new schema - one StructField for each field, in the same order as the columns in the CSV.
new_data_schema = [StructField(name = 'Names', dataType = StringType(), nullable = True), 
              StructField(name = 'Age', dataType = IntegerType(), nullable = True), 
              StructField(name = 'Total_Purchase', dataType = FloatType(), nullable = True), 
              StructField(name = 'Account_Manager', dataType = IntegerType(), nullable = True), 
              StructField(name = 'Years', dataType = FloatType(), nullable = True), 
              StructField(name = 'Onboard_date', dataType = DateType(), nullable = True),
              StructField(name = 'Location', dataType = StringType(), nullable = True),
              StructField(name = 'Company', dataType = StringType(), nullable = True), 
              StructField(name = 'Churn', dataType = IntegerType(), nullable = True)]


# We create a StructType object from the above.
final_struc = StructType(fields = new_data_schema)

# We re-read our CSV, this time assigning our StructType object to the 'schema' parameter as so:
new_df = spark.read.csv('agency_churn.csv', header = True, schema = final_struc)

### Other useful dataframe functions
Namely:
- Summary statistics
- Finding unique values of a column
- Getting the shape

In [25]:
# To get summary stats, we can use the describe() function as with pandas 
df.describe().show()

# again, you'll see a crowded output, so we can select a few columns for the sake of clarity.

+-------+-------------+-----------------+------------------+-------------------+-----------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|    Total_Purchase|    Account_Manager|            Years|            Location|             Company|              Churn|
+-------+-------------+-----------------+------------------+-------------------+-----------------+--------------------+--------------------+-------------------+
|  count|          900|              898|               899|                898|              900|                 900|                 899|                900|
|   mean|         null|41.81069042316258|10064.888242491663| 0.4799554565701559| 5.27315555555555|                null|                null|0.16666666666666666|
| stddev|         null| 6.13303075073418| 2409.188637458988|0.49987645989755647|1.274449013194616|                null|                null| 0.3728852122772358|
|    min|   Aaron King|           

In [27]:
# Let's instead pick a few columns
df.select('Names', 'Age', 'Total_Purchase', 'Churn').describe().show()

+-------+-------------+-----------------+------------------+-------------------+
|summary|        Names|              Age|    Total_Purchase|              Churn|
+-------+-------------+-----------------+------------------+-------------------+
|  count|          900|              898|               899|                900|
|   mean|         null|41.81069042316258|10064.888242491663|0.16666666666666666|
| stddev|         null| 6.13303075073418| 2409.188637458988| 0.3728852122772358|
|    min|   Aaron King|               22|             100.0|                  0|
|    max|Zachary Walsh|               65|          18026.01|                  1|
+-------+-------------+-----------------+------------------+-------------------+



Note here that the describe() function in Spark dataframes is also including a string variable, Names. In fact it's calculating 'minimum' and 'maximum' values for it too. What is it doing here?
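One way to make sense of the 'minimum' and 'maximum' of a string column is that strings can be ordered character by character, much like words in a dictionary. The plain-Python sketch below applies that idea to a few names from our data. It is an analogy under the assumption that Spark orders these simple ASCII names the same way Python does, which matches the `Aaron King` and `Zachary Walsh` values in the output above.

```python
# A handful of names taken from the outputs in this notebook
names = ['Zachary Walsh', 'Cameron Williams', 'Aaron King', 'Kevin Mueller']

# min() and max() on strings compare them character by character
smallest = min(names)
largest = max(names)
print(smallest, '|', largest)

# Caveat of this kind of ordering: uppercase letters sort before lowercase ones
case_sensitive_min = min(['aaron king', 'Zachary Walsh'])
print(case_sensitive_min)
```

So the 'min' and 'max' of `Names` are simply the first and last names in this ordering; they say nothing about magnitude.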

In [28]:
# To get the unique values of a column, we use the distinct() method after selecting the column in question:
df.select('Churn').distinct().show()

+-----+
|Churn|
+-----+
|    1|
|    0|
+-----+



In [116]:
# And the same for the Account_Manager feature
df.select('Account_Manager').distinct().show()

+---------------+
|Account_Manager|
+---------------+
|           null|
|              1|
|              0|
+---------------+



In [42]:
# To get the number of unique values -- i.e. nunique() in pandas -- we simply add the count() function at the end:
df.select('Age').distinct().count()

37

In [43]:
# Finally, to get the shape, we need to apply the count() to get the number of rows:
df.count()

900

In [44]:
# And to get the number of columns, we need to apply the built-in Python function, len(), to the list of columns:
len(df.columns)

9

In [47]:
# Together = Shape!
df.count(), len(df.columns)

(900, 9)

### Filtering
In other words, 'selecting' by rows. To do this, we simply use the `filter()` function.

In [48]:
# Let's check out all customers above the age of 40.

# Syntax 1:
df.filter('Age > 40').show(4)

# (and only show us 4 rows for now)

+----------------+---+--------------+---------------+-----+-------------------+--------------------+------------+-----+
|           Names|Age|Total_Purchase|Account_Manager|Years|       Onboard_date|            Location|     Company|Churn|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+------------+-----+
|Cameron Williams| 42|       11066.8|              0| 7.22|2013-08-30 07:00:40|10265 Elizabeth M...|  Harvey LLC|    1|
|   Kevin Mueller| 41|      11916.22|              0|  6.5|2013-08-13 00:38:46|6157 Frank Garden...|  Wilson PLC|    1|
|   Phillip White| 42|       8010.76|              0| 6.71|2014-04-22 12:43:12|13120 Daniel Moun...|   Smith Inc|    1|
|Jessica Williams| 48|      10356.02|              0| 5.12|2009-03-03 23:13:37|6187 Olson Mounta...|Kelly-Warren|    1|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+------------+-----+
only showing top 4 rows



In [50]:
# Syntax 2: This is similar to pandas filtering
df.filter(df['Age'] > 40).show(4)

+----------------+---+--------------+---------------+-----+-------------------+--------------------+------------+-----+
|           Names|Age|Total_Purchase|Account_Manager|Years|       Onboard_date|            Location|     Company|Churn|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+------------+-----+
|Cameron Williams| 42|       11066.8|              0| 7.22|2013-08-30 07:00:40|10265 Elizabeth M...|  Harvey LLC|    1|
|   Kevin Mueller| 41|      11916.22|              0|  6.5|2013-08-13 00:38:46|6157 Frank Garden...|  Wilson PLC|    1|
|   Phillip White| 42|       8010.76|              0| 6.71|2014-04-22 12:43:12|13120 Daniel Moun...|   Smith Inc|    1|
|Jessica Williams| 48|      10356.02|              0| 5.12|2009-03-03 23:13:37|6187 Olson Mounta...|Kelly-Warren|    1|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+------------+-----+
only showing top 4 rows



To create multiple conditions, it's best if we stick to syntax 2, as so:

In [51]:
# Filter for customers who are older than 40, and have an account manager assigned. 

# Only show Names, and Total Purchase, and for now only show 3 rows.
df.select('Names', 'Total_Purchase').filter((df['Age'] > 40) & (df['Account_Manager'] == 1)).show(3)

+--------------+--------------+
|         Names|Total_Purchase|
+--------------+--------------+
|   Ashlee Carr|       14062.6|
|Bruce Phillips|       8771.02|
|  Craig Garner|       8988.67|
+--------------+--------------+
only showing top 3 rows



In [52]:
# How big is this subset?
df.select('Names', 'Total_Purchase').filter((df['Age'] > 40) & (df['Account_Manager'] == 1)).count()

247
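The parentheses around each condition in the filters above are not optional. In Python, `&` binds more tightly than comparison operators such as `>` and `==`, so leaving the parentheses out changes what is being compared. The sketch below shows this with ordinary integers standing in for one customer's values (the variable names are made up for the illustration); with Spark columns the unparenthesised form would likewise not express the intended test.

```python
# One hypothetical customer: aged 45, with an account manager assigned
age, account_manager = 45, 1

# Without parentheses Python groups this as: age > (40 & account_manager) == 1,
# i.e. a chained comparison on the bitwise result, not the test we meant.
without_parens = age > 40 & account_manager == 1

# With parentheses each comparison is evaluated first, then combined with &.
with_parens = (age > 40) & (account_manager == 1)

print(without_parens, with_parens)
```

The customer clearly meets both conditions, yet only the parenthesised version says so.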

We can also filter string values quite easily, if we wanted to: 

In [129]:
# Find all customers whose names start with 'Eric' (note that this also matches first names such as 'Erica'). 

# Show only the Names, Age, and Total_Purchase columns 
df.select('Names', 'Age', 'Total_Purchase').filter(df['Names'].startswith('Eric')).show()

# Could also use .endswith('Eric') for last names for instance.

+------------+----+--------------+
|       Names| Age|Total_Purchase|
+------------+----+--------------+
| Eric Lozano|  38|      12884.75|
| Eric Butler|null|      11331.58|
|Erica Flores|  38|       9624.18|
| Eric Kelley|  43|      10271.19|
|  Eric Smith|  37|      11416.56|
| Eric Castro|  38|      10770.82|
| Eric Martin|  44|      14155.97|
|  Eric Terry|  42|      16371.42|
+------------+----+--------------+



In [53]:
# What if we want to find any match regardless of whether the string starts or ends with the name in question?

# Let's search for Terry in first, middle or last name:
df.select('Names', 'Age', 'Total_Purchase').filter(df['Names'].contains('Terry')).show()

+---------------+---+--------------+
|          Names|Age|Total_Purchase|
+---------------+---+--------------+
|    Diane Terry| 53|      13173.82|
|    Kelly Terry| 45|       3689.95|
|     Terry Diaz| 46|      11401.63|
|   Robert Terry| 44|       6517.22|
|  Theresa Terry| 44|       8819.13|
|   Terry Kelley| 37|       9788.78|
|Terry Curtis MD| 48|       8403.02|
|  Kristen Terry| 39|      10292.19|
|     Eric Terry| 42|      16371.42|
+---------------+---+--------------+



We can also filter dates easily using the `between()` function. 

In [55]:
# Filter rows for those customers who were onboarded at any point between 2006 and the very beginning of 2008.
df.select('Names','Age','Onboard_date', 'Churn') \
.filter(df['Onboard_date'].between('2006-01-01', '2008-01-01')) \
.count()

# Note on syntax above: we can use the backslash \ to continue our code on the line below for readability. 

192

In [57]:
# Show the result rather than count no. of rows
df.select('Names','Age','Onboard_date', 'Churn') \
.filter(df['Onboard_date'].between('2006-01-01', '2008-01-01')) \
.show(3)

+--------------+----+-------------------+-----+
|         Names| Age|       Onboard_date|Churn|
+--------------+----+-------------------+-----+
| Zachary Walsh|  32|2006-03-09 14:50:20|    1|
|Jennifer Lynch|  40|2006-03-28 15:42:45|    1|
| Doris Wilkins|null|2006-09-03 06:13:55|    1|
+--------------+----+-------------------+-----+
only showing top 3 rows
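A detail worth knowing about `between()` is that both bounds are inclusive, i.e. it is equivalent to `lower <= value <= upper`. The plain-Python sketch below mimics that test with `datetime` objects, assuming the date strings are read as midnight on the given day. The helper `is_between` and the two boundary timestamps are made up for the illustration; the other two timestamps come from the data above.

```python
from datetime import datetime

lower = datetime(2006, 1, 1)  # '2006-01-01', assumed to mean midnight
upper = datetime(2008, 1, 1)  # '2008-01-01', assumed to mean midnight


def is_between(value, low, high):
    """Inclusive range test: low <= value <= high."""
    return low <= value <= high


onboarded = {
    'Zachary Walsh': datetime(2006, 3, 9, 14, 50, 20),
    'Cameron Williams': datetime(2013, 8, 30, 7, 0, 40),
    'midnight on 2008-01-01 (hypothetical)': datetime(2008, 1, 1, 0, 0, 0),
    'one second later (hypothetical)': datetime(2008, 1, 1, 0, 0, 1),
}

matches = {name: is_between(ts, lower, upper) for name, ts in onboarded.items()}
for name, matched in matches.items():
    print(name, matched)
```

Under that assumption, a customer onboarded later in the day on 2008-01-01 would fall outside the range, which is why the comment above describes the upper bound as 'the very beginning of 2008'.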



## Cleaning data
As we all know, data is messy! Of course, with large volumes of data, the problem of messy/incorrect data is amplified. We can, however, deal with this in Spark dataframes. 

<u>Note:</u> As with filtering, lots of data cleaning functions are similar to their Pandas equivalents.

In [29]:
# To begin with, let's import the 'functions' submodule as 'f' in case we need any of the handy functions it brings.
import pyspark.sql.functions as f

# Try f.[PRESS TAB] to see what functions are included!

In [31]:
# Let's see where the Null values are in the Age column:
df.filter(f.isnull(df.Age)).show()

# We could also in this case use the following, as there's a Spark dataframe-specific function that checks for Nulls:
# df.filter(df.Age.isNull()).show()

+-------------+----+--------------+---------------+-----+-------------------+--------------------+-----------------+-----+
|        Names| Age|Total_Purchase|Account_Manager|Years|       Onboard_date|            Location|          Company|Churn|
+-------------+----+--------------+---------------+-----+-------------------+--------------------+-----------------+-----+
|  Eric Butler|null|      11331.58|           null| 5.23|2016-12-05 03:35:43|4846 Savannah Roa...|Reynolds-Sheppard|    1|
|Doris Wilkins|null|       8213.41|              1| 7.35|2006-09-03 06:13:55|28216 Wright Moun...|             null|    1|
+-------------+----+--------------+---------------+-----+-------------------+--------------------+-----------------+-----+



To view the rows where there is a Null value in any of our columns, we could string several conditions together with the `|` (or) logical operator between them -- i.e. 

`df.filter((f.isnull(df['Age'])) | (f.isnull(df['Total_Purchase'])) | ....)`

(Joining them with `&` instead would only return rows where every listed column is Null.) However, we can quickly check how many such rows there are by counting what is left after the familiar `dropna()` function removes them. 

In [33]:
# Count the rows that remain after dropping every row containing at least one Null value 
df.dropna(how = 'any', subset= None).count()

896

In [135]:
# We can see that only 4 rows out of 900 have Null values.
df.count()

900
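The `how = 'any'` argument decides which rows count as 'having Nulls': a row is dropped if at least one of its values is Null, whereas `how = 'all'` would only drop rows in which every value is Null. The plain-Python sketch below mimics both rules on three rows taken from the outputs above, trimmed to four columns and with `None` standing in for Null. It is an illustration of the rule only, not of how Spark implements it.

```python
# Three rows from the notebook's data, reduced to a few columns
rows = [
    {'Names': 'Cameron Williams', 'Age': 42, 'Account_Manager': 0, 'Company': 'Harvey LLC'},
    {'Names': 'Eric Butler', 'Age': None, 'Account_Manager': None, 'Company': 'Reynolds-Sheppard'},
    {'Names': 'Doris Wilkins', 'Age': None, 'Account_Manager': 1, 'Company': None},
]

# how = 'any': keep a row only if none of its values is missing
kept_how_any = [r for r in rows if not any(v is None for v in r.values())]

# how = 'all': keep a row unless every one of its values is missing
kept_how_all = [r for r in rows if not all(v is None for v in r.values())]

print([r['Names'] for r in kept_how_any])
print(len(kept_how_all))
```

Subtracting the `how = 'any'` count from the total row count gives the number of rows with at least one Null, which is the 900 - 896 = 4 seen above.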

In [34]:
# Due to the small number, we'll just go ahead and drop all of them
df2 = df.dropna(how = 'any', subset=None)

Alternatively, if we had wanted to replace the Null values, we could have used .fillna() as follows:
`df.fillna(value = ..., subset = 'Age')`

In [54]:
# Check the count after dropping Null rows:
df2.count()

896

### Adding, removing, and extracting columns

In [55]:
# We can easily add a column as follows:
df2 = df2.withColumn(colName= 'Age_times_2', col= df2['Age']*2)

## Parameters: 
# colName = the name of our new column
# col = the values of this new column

In [56]:
# Now check columns
df2.columns


['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Onboard_date',
 'Location',
 'Company',
 'Churn',
 'Age_times_2']

In [57]:
# This column adds no value here, so we can drop it:
df2 = df2.drop('Age_times_2')

df2.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

We can use this same functionality to extract Month and Year values from the __Onboard_date__ variable, using the `month` and `year` functions respectively from the functions submodule we imported earlier. 

In [58]:
# Extract Month
df2 = df2.withColumn('Month', f.month(df2['Onboard_date']))

# Extract Year
df2 = df2.withColumn('Year', f.year(df2['Onboard_date']))

In [59]:
df2.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Onboard_date',
 'Location',
 'Company',
 'Churn',
 'Month',
 'Year']

### GroupBy 
As we've seen so far, Spark dataframes have lots of shared functions with standard SQL queries -- e.g. `select`, `distinct` -- and we can add GroupBy to this list! Let's explore how we would do a groupBy in Spark. 

In [72]:
# Let's try grouping by Year (of onboarding)

df2.groupBy('Year').mean().select('Year', 'avg(Total_Purchase)', 'avg(Account_Manager)').show()

# Note: We select a few columns here just to get neater output. 

+----+-------------------+--------------------+
|Year|avg(Total_Purchase)|avg(Account_Manager)|
+----+-------------------+--------------------+
|2007| 10389.766777777775|                 0.5|
|2015| 10043.000810810809|  0.5135135135135135|
|2006| 10061.851584158412| 0.42574257425742573|
|2013|  9588.432499999999|              0.4375|
|2014|  9914.628543689323|  0.5048543689320388|
|2012|  9656.126716417912|  0.4626865671641791|
|2009|  9957.127857142857| 0.47619047619047616|
|2016| 10188.949846153848| 0.47692307692307695|
|2010|  10607.28940476191|  0.4642857142857143|
|2011| 10514.843888888892|  0.5277777777777778|
|2008|  9746.088289473686|                 0.5|
+----+-------------------+--------------------+



In [75]:
# Order in descending order of 'Year' value

df2.groupBy('Year').mean() \
.select('Year', 'avg(Total_Purchase)', 'avg(Account_Manager)') \
.orderBy('Year', ascending = False) \
.show()

+----+-------------------+--------------------+
|Year|avg(Total_Purchase)|avg(Account_Manager)|
+----+-------------------+--------------------+
|2016| 10188.949846153848| 0.47692307692307695|
|2015| 10043.000810810809|  0.5135135135135135|
|2014|  9914.628543689323|  0.5048543689320388|
|2013|  9588.432499999999|              0.4375|
|2012|  9656.126716417912|  0.4626865671641791|
|2011| 10514.843888888892|  0.5277777777777778|
|2010|  10607.28940476191|  0.4642857142857143|
|2009|  9957.127857142857| 0.47619047619047616|
|2008|  9746.088289473686|                 0.5|
|2007| 10389.766777777775|                 0.5|
|2006| 10061.851584158412| 0.42574257425742573|
+----+-------------------+--------------------+



We can also groupby several columns in Spark dataframes, somewhat like complex pivot tables, or multi-indexing in Pandas dataframes.

In [78]:
# Let's groupby Year and Month 

df2.groupBy('Year', 'Month').mean() \
.select('Year', 'Month', 'avg(Total_Purchase)', 'avg(Account_Manager)') \
.orderBy(df2['Year'].desc(), 'Month') \
.show()

# Note that each sort column can have its own direction -- in this case, we sort 'Year' in descending
# order and 'Month' in the default ascending order.

+----+-----+-------------------+--------------------+
|Year|Month|avg(Total_Purchase)|avg(Account_Manager)|
+----+-----+-------------------+--------------------+
|2016|    1| 11082.634285714286| 0.14285714285714285|
|2016|    2| 10367.696249999999|                 0.5|
|2016|    3| 11216.039999999999|                 0.5|
|2016|    4|          10101.122|                 0.6|
|2016|    5|           9476.245|  0.8333333333333334|
|2016|    6| 11068.869999999999|                 0.4|
|2016|    7|            9050.66|                 0.5|
|2016|    8|          10704.825|                0.25|
|2016|    9|            9151.57|                0.25|
|2016|   10|            8690.08|                 0.6|
|2016|   11|          10502.965|  0.6666666666666666|
|2016|   12|  9076.523333333333|  0.3333333333333333|
|2015|    1|  9767.253999999999|                 0.4|
|2015|    2|  9737.191666666666|  0.3333333333333333|
|2015|    3|          10244.714|                 0.6|
|2015|    4|           11081

# Machine learning

A supervised machine learning project typically has four stages, and PySpark ML follows the same structure:
- Feature engineering 
- Train/test split, as it's a supervised problem
- Applying the algorithm (modelling)
- Evaluation

### Feature engineering

Like with Sklearn models, we have to transform the categorical features into numbers before running them through an ML model. This can be done in two main ways: 
- <u>One-hot encoding:</u> Creating a column for each distinct value of a feature, holding either a 1 or a 0 for each row. I.e. <i>Year = 2006</i> would be a column in its own right, with each row being a 1 (Yes) or a 0 (No). This would be applied to every categorical feature and every distinct value it can hold. 
- <u>Label encoding:</u> Labelling each distinct value in a column with a distinct number -- i.e. <i>Company X</i> might now be labelled 1, <i>Company Y</i> might be 2 etc. As numerical features, they would then be valid input for a PySpark ML model. <br>

However, purely for simplicity, we'll be sticking to numerical features in this workshop.
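
For reference, the two encodings described above can be sketched in plain Python. The company names are hypothetical, and the labels here start at 0 rather than 1. In PySpark itself, the `pyspark.ml.feature` module offers `StringIndexer` and `OneHotEncoder` for this job; this sketch only illustrates the idea and does not use them.

```python
# Hypothetical values of a categorical column such as Company
companies = ["Company X", "Company Y", "Company X", "Company Z"]

# Distinct values in a fixed (sorted) order
categories = sorted(set(companies))

# Label encoding: each distinct value is mapped to an integer (starting at 0 here)
label_of = {value: index for index, value in enumerate(categories)}
label_encoded = [label_of[value] for value in companies]

# One-hot encoding: one 0/1 column per distinct value
one_hot_encoded = [
    [1 if value == category else 0 for category in categories]
    for value in companies
]

print(label_encoded)
print(one_hot_encoded)
```

One thing to keep in mind: label encoding gives the categories an arbitrary numerical order, which some models may read as meaningful, whereas one-hot encoding does not.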

In [79]:
df2.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)



In [84]:
# We only want to model on numerical features for now, so there's no need for one-hot encoding. We can specify a list
# of column names:

keep_cols = [
    'Age',
    'Total_Purchase',
    'Account_Manager',
    'Years',
    'Year',
    'Month',
]

# Output
keep_cols

['Age', 'Total_Purchase', 'Account_Manager', 'Years', 'Year', 'Month']

In [85]:
# Check data types in our columns
df2[keep_cols].dtypes

[('Age', 'int'),
 ('Total_Purchase', 'double'),
 ('Account_Manager', 'int'),
 ('Years', 'double'),
 ('Year', 'int'),
 ('Month', 'int')]

Unlike Sklearn however, where a set of separate columns would work, to do any ML with PySpark we first need to combine the feature columns into a single vector column, as that is the only input the models accept. 

In [83]:
# To do this, we'll use the handy VectorAssembler transformer:
from pyspark.ml.feature import VectorAssembler

In [86]:
# It takes 2 main parameters: 
## 1) inputCols -- these are our input features.
## 2) outputCol -- this is the name of our output vector that is a combination of the input features we specify.

assembler = VectorAssembler(inputCols = keep_cols, outputCol = 'features')

In [87]:
# Just a VectorAssembler object. We haven't applied the transformation yet.
assembler

VectorAssembler_aae4b179f702

In [88]:
# We create the output dataframe:
output_df = assembler.transform(df2)

In [89]:
# let's see the output_dataframe columns
output_df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Onboard_date',
 'Location',
 'Company',
 'Churn',
 'Month',
 'Year',
 'features']

We can see that it has just added a <i>features</i> column at the end.

In [92]:
# Let's see what that actually looks like:
output_df.select('Age', 'Total_Purchase', 'Account_Manager', 'features').show(2)

# It has simply combined the values of the inputCols into one vector per row (show() cuts long values short with '...').

+---+--------------+---------------+--------------------+
|Age|Total_Purchase|Account_Manager|            features|
+---+--------------+---------------+--------------------+
| 42|       11066.8|              0|[42.0,11066.8,0.0...|
| 41|      11916.22|              0|[41.0,11916.22,0....|
+---+--------------+---------------+--------------------+
only showing top 2 rows



In [94]:
# As mentioned, we only need the 'features' column for our modelling, plus the 'Churn' label we want to predict
final_data = output_df.select('features', 'Churn')

In [95]:
final_data.show(3)

+--------------------+-----+
|            features|Churn|
+--------------------+-----+
|[42.0,11066.8,0.0...|    1|
|[41.0,11916.22,0....|    1|
|[38.0,12884.75,0....|    1|
+--------------------+-----+
only showing top 3 rows



### Train/test split
Since it's a supervised problem, we first train our model on a training set and then evaluate it on a held-out test set. 

In [97]:
# Train/test split - 70% training, 30% testing
train, test = final_data.randomSplit([0.7, 0.3], seed=123)

<u>Notes:</u> 
- Unlike Sklearn's __train_test_split__, we don't end up with four pieces (x_train, x_test, y_train, y_test). Here we only get back two dataframes, train and test, each holding both of our columns -- <i>features</i> and <i>Churn</i>.
- We set a seed -- here, the arbitrary 123 -- to ensure reproducible results.

In [98]:
train.show(3)

+--------------------+-----+
|            features|Churn|
+--------------------+-----+
|[22.0,11254.38,1....|    0|
|[27.0,8628.8,1.0,...|    0|
|[28.0,8670.98,0.0...|    0|
+--------------------+-----+
only showing top 3 rows



In [100]:
train.count(), test.count()

(629, 267)
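
Note that 629 out of 896 rows is roughly 70.2%, not exactly 70%. As far as we understand it, `randomSplit` decides row by row which split each record falls into, so the weights are targets rather than exact proportions. The sketch below is a simplified plain-Python imitation of that idea (the `random_split` helper is hypothetical, and Spark's actual sampler differs in detail). It also shows why fixing the seed gives the same split every time.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test independently, using a seeded generator."""
    rng = random.Random(seed)
    train_part, test_part = [], []
    for row in rows:
        # Each row lands in the training set with probability train_fraction
        if rng.random() < train_fraction:
            train_part.append(row)
        else:
            test_part.append(row)
    return train_part, test_part

rows = list(range(896))  # same number of rows as train + test above
train_rows, test_rows = random_split(rows, 0.7, seed=123)
print(len(train_rows), len(test_rows))  # close to, but not exactly, 70/30

# Re-running with the same seed reproduces exactly the same split
train_again, test_again = random_split(rows, 0.7, seed=123)
print(train_again == train_rows)
```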

### Classification
Now for the actual modelling! We'll try decision trees and random forests for now, but the module has many more algorithms that we can use -- albeit not as comprehensive a selection as sklearn's. You can read more on functionality in the [Pyspark ML documentation here](https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html).

In [102]:
# We import the classifier classes from the classification submodule.
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier

1) Decision Tree

In [103]:
# We first create the classifier and assign it to a variable, specifying the 'labelCol' (the target column we want
# to predict) as Churn. We'll leave model parameters at their default values for now, as we can always come back to tweak them.
dt = DecisionTreeClassifier(labelCol = 'Churn')


# We then fit the model to our training set
fitted_tree = dt.fit(train)

In [104]:
# Just a model object
fitted_tree

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_0309654a9410) of depth 5 with 35 nodes

2) Random Forest

In [105]:
# We do the same with the random forest:
rf = RandomForestClassifier(labelCol = 'Churn')

# Again, we fit this model to our training set
fitted_rf = rf.fit(train)

### Evaluation
To see how our models have done, we can use the handy `BinaryClassificationEvaluator` and `MulticlassClassificationEvaluator` classes.

In [106]:
# For binary class problems (like ours)
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# For multiclass problems (not like ours, but we'll need it to calculate accuracy & F1)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [108]:
# First, let's make predictions 
predictions_dt = fitted_tree.transform(test)

# This returns a dataframe with our labels (Churn) plus three new columns: rawPrediction, probability and prediction
predictions_dt.show(3)

+--------------------+-----+-------------+--------------------+----------+
|            features|Churn|rawPrediction|         probability|prediction|
+--------------------+-----+-------------+--------------------+----------+
|[25.0,9672.03,0.0...|    0| [134.0,27.0]|[0.83229813664596...|       0.0|
|[26.0,8787.39,1.0...|    1| [134.0,27.0]|[0.83229813664596...|       0.0|
|[26.0,8939.61,0.0...|    0|   [51.0,5.0]|[0.91071428571428...|       0.0|
+--------------------+-----+-------------+--------------------+----------+
only showing top 3 rows
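
The `rawPrediction` and `probability` columns are related in a simple way here. For a decision tree, `rawPrediction` appears to hold the number of training rows of each class in the leaf that the row landed in, and `probability` is those counts divided by their total. A small sketch reproducing the figures in the table above (the `to_probability` helper is ours, not part of PySpark):

```python
def to_probability(raw_prediction):
    """Normalise per-class counts so that they sum to 1."""
    total = sum(raw_prediction)
    return [count / total for count in raw_prediction]

# First row of the table: 134 non-churners and 27 churners in the leaf
print(to_probability([134.0, 27.0]))

# Third row of the table: 51 non-churners and 5 churners in the leaf
print(to_probability([51.0, 5.0]))
```

The first entry of each result matches the start of the truncated `probability` values shown above (0.8322... and 0.9107...). The `prediction` column is then simply the class with the highest probability.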



In [110]:
# We can do the same with the random forest:
predictions_rf = fitted_rf.transform(test)

#predictions_rf.show(3)

In [112]:
# To evaluate using the BinaryClassificationEvaluator, we pass the label column and the metric name
# ('areaUnderROC' is the default).
binary_evaluator = BinaryClassificationEvaluator(labelCol = 'Churn', 
                                                    metricName = 'areaUnderROC')

In [113]:
# We use the evaluate() method to actually do the calculation.
binary_evaluator.evaluate(predictions_dt)

0.5800529100529099

In [117]:
# We also do the same with our random forest to compare.
binary_evaluator.evaluate(predictions_rf)

0.6467195767195761

<u>Note:</u> The area under the ROC (Receiver Operating Characteristic) curve, or AUC, essentially measures how well our model ranks churners above non-churners compared with random guessing. An AUC score of 0.5 represents randomness. Therefore,
- If our AUC > 0.5 - great, we're doing better than random chance (1.0 would be a perfect ranking)!
- If our AUC = 0.5 - just as good as random chance. 
- If our AUC < 0.5 - worse than randomness!

The ROC curve is plotted on a grid where the X-axis is the False Positive Rate (FPR) and the Y-axis is the True Positive Rate (also called Recall). To read more about AUC/ROC or other classification metrics, check out this excellent [Wikipedia article on the subject](https://en.wikipedia.org/wiki/Receiver_operating_characteristic).

In [121]:
# We do the same with the MulticlassClassificationEvaluator.
churn_eval_multi = MulticlassClassificationEvaluator(predictionCol = 'prediction', labelCol = 'Churn', 
                                                    metricName = 'accuracy')

# The metrics this evaluator takes are more familiar - to name a few, 'accuracy' and 'f1'

In [118]:
# Accuracy of the decision tree
churn_eval_multi.evaluate(predictions_dt)

0.8089887640449438

In [120]:
# Accuracy of our random forest.
churn_eval_multi.evaluate(predictions_rf)

0.8352059925093633
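
These accuracy figures look much healthier than the AUC scores above. A likely reason is class imbalance: if most customers don't churn, a model that mostly predicts 0 is right most of the time. The sketch below shows how accuracy and a class-weighted F1 are calculated on a tiny made-up example. The helper functions are ours, and we are assuming that the evaluator's 'f1' metric is this class-weighted average.

```python
def accuracy(labels, predictions):
    """Share of rows where the prediction matches the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def f1_for_class(labels, predictions, cls):
    """F1 score treating `cls` as the positive class."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == cls and p == cls)
    fp = sum(1 for y, p in pairs if y != cls and p == cls)
    fn = sum(1 for y, p in pairs if y == cls and p != cls)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)

def weighted_f1(labels, predictions):
    """Per-class F1, weighted by how often each class occurs in the labels."""
    return sum(
        f1_for_class(labels, predictions, cls) * labels.count(cls) / len(labels)
        for cls in sorted(set(labels))
    )

# Hypothetical labels and predictions: one of the two churners is missed
labels = [0, 0, 0, 0, 1, 1]
predictions = [0, 0, 0, 0, 0, 1]

print(accuracy(labels, predictions))     # 5 of 6 correct
print(weighted_f1(labels, predictions))  # 22/27, pulled down by the missed churner
```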

## End the session
It's not the case here, but if we were connected to a cluster, it would most likely incur costs to keep it running and keep resources allocated for our session. 

We can end the session using the `stop()` method:

In [122]:
spark.stop()