<h2 id="myheading"> Table of Contents </h2>

1. [DataFrame Basics](#dataframebasics)
* [Spark DataFrame Operations](#dataframeops)
* [GroupBy and Aggregate](#groupby)
* [Missing data](#missingdata)
* [Dates and timestamps](#dates)
* [Project exercise](#exercise)
* [Machine Learning](#mllib)
    * [Linear regression](#lr)
    * [Logistic regression](#logreg)
        * [Pipelines](#pipelines)
    * [Tree based methods](#treemethods)
        * [Decision tree](#dt)
        * [Random forest](#rf)
        * [Gradient boosted methods](#gbm)
        * [K-means clustering](#kmeans)
        * [Recommender system](#recsys)
        * [NLP](#nlp)
* [Spark streaming](#streaming)

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Basics').getOrCreate() # Call the appName whatver you want

[http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark-sql-module]

## <a name="dataframebasics"> DataFrame basics </a>

Spark DataFrames are the workhouse and main way of working with Spark and Python post Spark 2.0. DataFrames act as powerful versions of tables, with rows and columns, easily handling large datasets. The shift to DataFrames provides many advantages:
* A much simpler syntax
* Ability to use SQL directly in the dataframe
* Operations are automatically distributed across RDDs
    
If you've used R or even the pandas library with Python you are probably already familiar with the concept of DataFrames. Spark DataFrame expand on a lot of these concepts, allowing you to transfer that knowledge easily by understanding the simple syntax of Spark DataFrames. Remember that the main advantage to using Spark DataFrames vs those other programs is that Spark can handle data across many RDDs, huge data sets that would never fit on a single computer. That comes at a slight cost of some "peculiar" syntax choices, but after this course you will feel very comfortable with all those topics!


In [3]:
df = spark.read.json('people.json')

In [13]:
type(df)

pyspark.sql.dataframe.DataFrame

In [4]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [6]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [7]:
df.columns

['age', 'name']

**Customizing schema**

Some data types make it easier to infer schema (like tabular formats such as csv which we will show later). 

However you often have to set the schema yourself if you aren't dealing with a .read method that doesn't have inferSchema() built-in.

Spark has all the tools you need for this, it just requires a very specific structure:

In [8]:
from pyspark.sql.types import StructField, IntegerType, StringType, StructType

In [9]:
data_schema = [StructField('age', IntegerType(), True),
              StructField('name', StringType(), True)]

In [10]:
final_struc = StructType(fields=data_schema)

In [11]:
df = spark.read.json('people.json', schema=final_struc)

In [12]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



#### Retrieving data

In [13]:
df['age'] #This returns a column object

Column<b'age'>

In [14]:
type(df['age'])

pyspark.sql.column.Column

To get a dataframe with that column:

In [15]:
df.select('age').show() # This returns a dataframe with that column

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [16]:
df.select(['age', 'name']).show() # Selecting multiple columns

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [17]:
df.head(2) # First two rows

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [18]:
df.head(2)[0]

Row(age=None, name='Michael')

In [19]:
# Adding new columns
df.withColumn('newage', df['age']).show()

+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+



In [20]:
df.withColumn('newerage', df['age']*2).show()

+----+-------+--------+
| age|   name|newerage|
+----+-------+--------+
|null|Michael|    null|
|  30|   Andy|      60|
|  19| Justin|      38|
+----+-------+--------+



In [21]:
df.withColumnRenamed('age', 'my_new_age').show() # Renaming a colunm

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



### Using SQL to interact with the data

To use SQL queries directly with the dataframe, you will need to register it to a temporary view:

In [8]:
df.createOrReplaceTempView('people')

In [9]:
new_results = spark.sql("SELECT * FROM people")
new_results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [10]:
results = spark.sql("SELECT * FROM people WHERE age=30")

In [11]:
results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



## <a name="dataframeops"> Basic operations </a>

In [15]:
from pyspark.sql import SparkSession

In [16]:
spark = SparkSession.builder.appName('basics').getOrCreate()

In [17]:
df = spark.read.csv('appl_stock.csv', inferSchema=True, header=True)

In [23]:
df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [18]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



**Filtering the data**

A large part of working with DataFrames is the ability to quickly filter out data based on conditions. Spark DataFrames are built on top of the Spark SQL platform, which means that is you already know SQL, you can quickly and easily grab that data using SQL commands, or using the DataFram methods (which is what we focus on in this course).

In [25]:
df.filter("Close<500").show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [26]:
df.filter("Close<500").select("Open").show()

+------------------+
|              Open|
+------------------+
|        213.429998|
|        214.599998|
|        214.379993|
|            211.75|
|        210.299994|
|212.79999700000002|
|209.18999499999998|
|        207.870005|
|210.11000299999998|
|210.92999500000002|
|        208.330002|
|        214.910006|
|        212.079994|
|206.78000600000001|
|202.51000200000001|
|205.95000100000001|
|        206.849995|
|        204.930004|
|        201.079996|
|192.36999699999998|
+------------------+
only showing top 20 rows



In [27]:
df.filter("Close<500").select(["Open", "Close"]).show() # Filter on close and select two columns open and close

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [28]:
df.filter(df['Close']>500).show()

+-------------------+------------------+------------------+------------------+------------------+---------+-----------------+
|               Date|              Open|              High|               Low|             Close|   Volume|        Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+-----------------+
|2012-02-13 00:00:00|        499.529991|503.83000899999996|497.08998899999995|502.60002099999997|129304000|        65.116633|
|2012-02-14 00:00:00|        504.659988|         509.56002|        502.000008|        509.459991|115099600|        66.005408|
|2012-02-16 00:00:00|        491.500008|        504.890007|         486.62999|502.20999900000004|236138000|        65.066102|
|2012-02-17 00:00:00|        503.109993|507.77002000000005|        500.299995|         502.12001|133951300|        65.054443|
|2012-02-21 00:00:00|506.88001299999996|        514.850021|504.12000300000005|        514.850021|151398800|        66.

In [29]:
df.filter(df['Close']<500).select(['Volume', 'Close']).show()

+---------+------------------+
|   Volume|             Close|
+---------+------------------+
|123432400|        214.009998|
|150476200|        214.379993|
|138040000|        210.969995|
|119282800|            210.58|
|111902700|211.98000499999998|
|115557400|210.11000299999998|
|148614900|        207.720001|
|151473000|        210.650002|
|108223500|            209.43|
|148516900|            205.93|
|182501900|        215.039995|
|153038200|            211.73|
|152038600|        208.069996|
|220441900|            197.75|
|266424900|        203.070002|
|466777500|        205.940001|
|430642100|        207.880005|
|293375600|        199.289995|
|311488100|        192.060003|
|187469100|        194.729998|
+---------+------------------+
only showing top 20 rows



**Using and operator**

In [31]:
df.filter((df['Close']<200) & (df['Open']>200)).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [33]:
df.filter((df['Close']<200) & ~(df['Open']>200)).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-02-01 00:00:00|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02 00:00:00|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.375532999999997|
|2010-02-03 00:00:00|        195.169994|        200.200003|        194.420004|        199.229994|153832000|25.812148999999998|
|2010-02-04 00:00:00|        196.730003|        198.370001|        191.570005|        192.050003|189413000|         24.881912|
|2010-02-05 00:00:00|192.63000300000002|             196.0|        190.850002|        195.460001|212576700|25.3

In [34]:
df.filter(df['Low'] == 197.16).show()

+-------------------+------------------+----------+------+------+---------+---------+
|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+-------------------+------------------+----------+------+------+---------+---------+



If we want to use this output instead of just displaying it, we can use .collect() method instead of .show() method

In [35]:
df.filter(df['Low']==197.16).collect()

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [36]:
result = df.filter(df['Low']==197.16).collect()

In [39]:
row = result[0]

In [40]:
row.asDict()['Volume']

220441900

### <a name='groupby'> GroupBy and Aggregate </a>

Let's learn how to use GroupBy and Aggregate methods on a DataFrame. GroupBy allows you to group rows together based off some column value, for example, you could group together sales data by the day the sale occured, or group repeast customer data based off the name of the customer. Once you've performed the GroupBy operation you can use an aggregate function off that data. An aggregate function aggregates multiple rows of data into a single output, such as taking the sum of inputs, or counting the number of inputs.

In [42]:
from pyspark.sql import SparkSession

In [43]:
spark = SparkSession.builder.appName('groupby').getOrCreate()

In [44]:
df = spark.read.csv('6.3.sales_info.csv', inferSchema=True, header = True)

In [45]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [46]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [49]:
df.groupBy('Company') #This returns a GroupedData object, off of which you can all various methods

<pyspark.sql.group.GroupedData at 0x1d2a68f3ef0>

In [53]:
df.groupBy('Company').mean()

DataFrame[Company: string, avg(Sales): double]

In [52]:
df.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [55]:
df.agg({'Sales':'sum'}).show() # SUm of all sales in the DataFrame

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [56]:
df.agg({'Sales':'max'}).show() # Max of all sales in the DataFrame

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



**Functions**

There are a variety of functions you can import from pyspark.sql.functions. Check out the documentation for the full list available:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [57]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [59]:
df.select(countDistinct('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [60]:
df.select(avg('Sales').alias('Avg Sales')).show() # Rename the column

+-----------------+
|        Avg Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [61]:
df.select(stddev('Sales')).show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [62]:
# Lets format this better
from pyspark.sql.functions import format_number

In [63]:
sales_std = df.select(stddev('Sales').alias('std'))

In [64]:
sales_std.select(format_number('std', 2)).show()

+---------------------+
|format_number(std, 2)|
+---------------------+
|               250.09|
+---------------------+



In [66]:
sales_std.select(format_number('std', 2).alias('std dev')).show()

+-------+
|std dev|
+-------+
| 250.09|
+-------+



*Ordering and sorting*

In [67]:
df.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [69]:
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



### <a name='missingdata'> Missing data </a>

3 basic options for filling in missing data:

* Just keep the missing data points.
* Drop them missing data points (including the entire row)
* Fill them in with some other value.

In [70]:
from pyspark.sql import SparkSession

In [71]:
spark = SparkSession.builder.appName('missing').getOrCreate()

In [72]:
df = spark.read.csv('6.4.ContainsNull.csv', header=True, inferSchema=True)

In [73]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



You can use the .na functions for missing data. The drop command has the following parameters:

    df.na.drop(how='any', thresh=None, subset=None)
    
    * param how: 'any' or 'all'.
    
        If 'any', drop a row if it contains any nulls.
        If 'all', drop a row only if all its values are null.
    
    * param thresh: int, default None
    
        If specified, drop rows that have less than `thresh` non-null values.
        This overwrites the `how` parameter.
        
    * param subset: 
        optional list of column names to consider.

In [75]:
df.na.drop().show() # Drops all rows with any missing data

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [76]:
df.na.drop(thresh=2).show() #Drop rows that have less than 'thresh' number of non-null values 

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [78]:
df.na.drop(how='any').show() # How can be 'any' or 'all'

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [79]:
df.na.drop(subset='Sales').show() #Only consider Sales column for NAs 

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



**Fill missing values**

We can also fill the missing values with new values. If you have multiple nulls across multiple data types, Spark is actually smart enough to match up the data types. For example:

In [80]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [82]:
df.na.fill('FILL VALUE').show() # Only fills it in string columns

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



In [83]:
df.na.fill(0).show() # Fills in 0s only for numeric columns, not for strings

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [84]:
df.na.fill('No name', subset='Name').show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No name| null|
|emp3|No name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [85]:
from pyspark.sql.functions import mean

In [86]:
mean_val = df.select(mean(df['Sales'])).collect()
mean_val

[Row(avg(Sales)=400.5)]

In [88]:
mean_val[0][0]

400.5

In [89]:
mean_sales = mean_val[0][0]

In [91]:
# Now fill in the missing sales with mean_sales
df.na.fill(mean_sales, subset='Sales').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [92]:
# Doing it all in one line
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0], subset='Sales').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



## <a name='dates'> Dates and timestamps </a>

In [94]:
from pyspark.sql import SparkSession

In [95]:
spark = SparkSession.builder.appName('dates').getOrCreate()

In [96]:
df = spark.read.csv('6.1.appl_stock.csv', header=True, inferSchema=True)

In [99]:
df.select(['Date', 'Open']).show()

+-------------------+------------------+
|               Date|              Open|
+-------------------+------------------+
|2010-01-04 00:00:00|        213.429998|
|2010-01-05 00:00:00|        214.599998|
|2010-01-06 00:00:00|        214.379993|
|2010-01-07 00:00:00|            211.75|
|2010-01-08 00:00:00|        210.299994|
|2010-01-11 00:00:00|212.79999700000002|
|2010-01-12 00:00:00|209.18999499999998|
|2010-01-13 00:00:00|        207.870005|
|2010-01-14 00:00:00|210.11000299999998|
|2010-01-15 00:00:00|210.92999500000002|
|2010-01-19 00:00:00|        208.330002|
|2010-01-20 00:00:00|        214.910006|
|2010-01-21 00:00:00|        212.079994|
|2010-01-22 00:00:00|206.78000600000001|
|2010-01-25 00:00:00|202.51000200000001|
|2010-01-26 00:00:00|205.95000100000001|
|2010-01-27 00:00:00|        206.849995|
|2010-01-28 00:00:00|        204.930004|
|2010-01-29 00:00:00|        201.079996|
|2010-02-01 00:00:00|192.36999699999998|
+-------------------+------------------+
only showing top

In [97]:
df.head(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [100]:
from pyspark.sql.functions import (dayofmonth, hour,
                                   dayofyear, month,
                                   year, weekofyear,
                                   format_number, date_format)


In [101]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [102]:
df.select(hour(df['Date'])).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



If we want to know avg closing price per year

In [103]:
from pyspark.sql.functions import avg

In [104]:
df.select(year(df['Date'])).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



Use `withColumn()` to create a new column in teh DataFrame

In [105]:
df.withColumn('Year', year(df['Date'])).show() #Returns a new :class:`DataFrame` by adding a column or replacing the existing column that has the same name.


+-------------------+------------------+------------------+------------------+------------------+---------+------------------+----+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.0600050000000

In [106]:
new_df = df.withColumn('Year', year(df['Date']))

In [108]:
new_df.head(2)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039, Year=2010),
 Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002, Year=2010)]

In [114]:
new_df.groupBy('Year').agg({'Close':'mean'}).show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



Do this again to rename the column

In [119]:
new = new_df.groupBy('Year').agg({'Close':'mean'}).withColumnRenamed('avg(Close)', 'Avg Closing Price').\
orderBy('Avg Closing Price')
new.show()

+----+------------------+
|Year| Avg Closing Price|
+----+------------------+
|2016|104.60400786904763|
|2015|120.03999980555547|
|2010| 259.8424600000002|
|2014| 295.4023416507935|
|2011|364.00432532142867|
|2013| 472.6348802857143|
|2012| 576.0497195640002|
+----+------------------+



Lets round it off to 2 digits

In [121]:
new.select(['Year', format_number('Avg Closing Price', 2)]).show()

+----+-----------------------------------+
|Year|format_number(Avg Closing Price, 2)|
+----+-----------------------------------+
|2016|                             104.60|
|2015|                             120.04|
|2010|                             259.84|
|2014|                             295.40|
|2011|                             364.00|
|2013|                             472.63|
|2012|                             576.05|
+----+-----------------------------------+



## <a name='mllib'> MLLib </a>

### <a name='lr'> Linear regression </a>

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('linreg').getOrCreate()

In [3]:
from pyspark.ml.regression import LinearRegression

In [5]:
df_train = spark.read.format('libsvm').load('8.1.sample_linear_regression_data.txt')

In [6]:
df_train.show()

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
| -9.490009878824548|(10,[0,1,2,3,4,5,...|
| 0.2577820163584905|(10,[0,1,2,3,4,5,...|
| -4.438869807456516|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -7.966593841555266|(10,[0,1,2,3,4,5,...|
| -7.896274316726144|(10,[0,1,2,3,4,5,...|
| -8.464803554195287|(10,[0,1,2,3,4,5,...|
| 2.1214592666251364|(10,[0,1,2,3,4,5,...|
| 1.0720117616524107|(10,[0,1,2,3,4,5,...|
|-13.772441561702871|(10,[0,1,2,3,4,5,...|
| -5.082010756207233|(10,[0,1,2,3,4,5,...|
|  7.887786536531237|(10,[0,1,2,3,4,5,...|
| 14.323146365332388|(10,[0,1,2,3,4,5,...|
|-20.057482615789212|(10,[0,1,2,3,4,5,...|
|-0.8995693247765151|(10,[0,1,2,3,4,5,...|
| -19.16829262296376|(10,[0,1,2,3,4,5,...|
|  5.601801561245534|(10,[0,1,2,3,4,5,...|
|-3.2256352187273354|(10,[0,1,2,3,4,5,...|
| 1.5299675726687754|(10,[0,1,2,3,4,5,...|
| -0.250102447941961|(10,[0,1,2,3,4,5,...|
+----------

This is the format that Spark expects: Two columns with the names "label" and "features". 
* The "label" column needs to have the numerical label, either a regression numerical value, or a numerical value that matches to a classification grouping.
* Unsupervised learning algorithms do not use or require a label.
* The feature column contains a vector of all the features that belong to that row
* Usually what we end up doing is combining the various feature columns we have into a single 'features' column using the data transformations we've learned about.

In [8]:
lr = LinearRegression(featuresCol = 'features', labelCol = 'label', predictionCol = 'prediction')

In [9]:
# Fit the model 
lr_model = lr.fit(df_train)

In [25]:
import numpy as np
# Print the coefficients and intercept for linear regression
print(f"Model coefficients: {np.round(lr_model.coefficients,2)}", "\n ",
      f"Intercept: {np.round(lr_model.intercept, 2)}")

Model coefficients: [ 0.01  0.83 -0.81  2.44  0.52  1.15 -0.3  -0.51 -0.62  0.7 ] 
  Intercept: 0.14


In [33]:
print(f"Model r2: {np.round(lr_model.summary.r2, 2)}", "\n",
     f"Model RMSE: {np.round(lr_model.summary.rootMeanSquaredError, 2)}")

Model r2: 0.03 
 Model RMSE: 10.16


In [27]:
lr_model.summary.rootMeanSquaredError

10.16309157133015

In [34]:
lr_model.summary.residuals.show()

+-------------------+
|          residuals|
+-------------------+
|-11.011130022096554|
| 0.9236590911176538|
|-4.5957401897776675|
|  -20.4201774575836|
|-10.339160314788181|
|-5.9552091439610555|
|-10.726906349283922|
|  2.122807193191233|
|  4.077122222293811|
|-17.316168071241652|
| -4.593044343959059|
|  6.380476690746936|
| 11.320566035059846|
|-20.721971774534094|
| -2.736692773777401|
| -16.66886934252847|
|  8.242186378876315|
|-1.3723486332690233|
|-0.7060332131264666|
|-1.1591135969994064|
+-------------------+
only showing top 20 rows



#### <a name='lr_traintestsplit'> Train test split </a>

In [36]:
all_data =  spark.read.format('libsvm').load('8.1.sample_linear_regression_data.txt')

In [37]:
split_object = all_data.randomSplit([0.7, 0.3]) # Returns a list of 2 DFs, with 70% and 30% of data each

In [38]:
split_object

[DataFrame[label: double, features: vector],
 DataFrame[label: double, features: vector]]

In [39]:
df_train, df_test = all_data.randomSplit([0.7, 0.3])

In [40]:
df_train.describe().show() # Has 342 rows. 70% of data

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                342|
|   mean| 0.5171471810392639|
| stddev| 10.490443714083256|
|    min|-28.046018037776633|
|    max|  27.78383192005107|
+-------+-------------------+



In [41]:
df_test.describe().show() # Has 159 rows, 30% of data  

+-------+--------------------+
|summary|               label|
+-------+--------------------+
|  count|                 159|
|   mean|-0.30291217607383364|
| stddev|   9.945905817201263|
|    min| -28.571478869743427|
|    max|   23.52945433069272|
+-------+--------------------+



In [42]:
lr = LinearRegression(featuresCol = 'features', labelCol = 'label', predictionCol = 'prediction')
lr_model = lr.fit(df_train)

In [43]:
test_evaluate = lr_model.evaluate(df_test)

In [44]:
test_evaluate.rootMeanSquaredError

10.283647371758052

In [45]:
test_evaluate.r2

-0.07583508915497639

**How would this work with unlabeled data** 

When we don't have labels to compare against 

In [47]:
unlabeled_data = df_test.select('features')

In [48]:
unlabeled_data.show()

+--------------------+
|            features|
+--------------------+
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
+--------------------+
only showing top 20 rows



In [50]:
lr_model.transform(unlabeled_data).show()

+--------------------+-------------------+
|            features|         prediction|
+--------------------+-------------------+
|(10,[0,1,2,3,4,5,...|-1.4696422017863662|
|(10,[0,1,2,3,4,5,...| 1.9218345645429515|
|(10,[0,1,2,3,4,5,...|0.00946913732419391|
|(10,[0,1,2,3,4,5,...|  4.780559704926512|
|(10,[0,1,2,3,4,5,...| -3.955345441651352|
|(10,[0,1,2,3,4,5,...|  1.036012195961091|
|(10,[0,1,2,3,4,5,...|  2.658295934376892|
|(10,[0,1,2,3,4,5,...|-2.6631900901089574|
|(10,[0,1,2,3,4,5,...| 2.3010719867388825|
|(10,[0,1,2,3,4,5,...| 2.4431412778903563|
|(10,[0,1,2,3,4,5,...|  4.477613433727788|
|(10,[0,1,2,3,4,5,...| 1.7221658420412012|
|(10,[0,1,2,3,4,5,...| 0.9753413267158018|
|(10,[0,1,2,3,4,5,...| 3.2838125674415166|
|(10,[0,1,2,3,4,5,...|0.12281228289991947|
|(10,[0,1,2,3,4,5,...| 0.4171019222216994|
|(10,[0,1,2,3,4,5,...| 1.8231174859757484|
|(10,[0,1,2,3,4,5,...| 3.5678446649223785|
|(10,[0,1,2,3,4,5,...|0.11122643551813005|
|(10,[0,1,2,3,4,5,...| 3.5740928780155263|
+----------

** Code along example** 
Trying to predict a customer's total expenditure

In [52]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('lr_example').getOrCreate()

In [53]:
from pyspark.ml.regression import LinearRegression

In [54]:
df = spark.read.csv('8.2.Ecommerce-Customers.csv', inferSchema=True, header=True)

In [56]:
df.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [57]:
df.show()

+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug...|   

In [61]:
df.head(5)

[Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005),
 Row(Email='hduke@hotmail.com', Address='4547 Archer CommonDiazchester, CA 06566-8576', Avatar='DarkGreen', Avg Session Length=31.92627202636016, Time on App=11.109460728682564, Time on Website=37.268958868297744, Length of Membership=2.66403418213262, Yearly Amount Spent=392.2049334443264),
 Row(Email='pallen@yahoo.com', Address='24645 Valerie Unions Suite 582Cobbborough, DC 99414-7564', Avatar='Bisque', Avg Session Length=33.000914755642675, Time on App=11.330278057777512, Time on Website=37.110597442120856, Length of Membership=4.104543202376424, Yearly Amount Spent=487.54750486747207),
 Row(Email='riverarebecca@gmail.com', Address='1414 David ThroughwayPort Jason, OH 22070-1220', Avatar='Sad

In [60]:
for item in df.head(2)[1]:
    print(item)

hduke@hotmail.com
4547 Archer CommonDiazchester, CA 06566-8576
DarkGreen
31.92627202636016
11.109460728682564
37.268958868297744
2.66403418213262
392.2049334443264


In [64]:
df.columns

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [62]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

We'll use numeric columns (Avg session length, Time on app, Time on website, Length of membership) to predict Yearly Amount spent.

Ignoring categorical columns (Email, address, avatar) for now

In [65]:
assembler = VectorAssembler(inputCols=['Avg Session Length', 'Time on App', 'Time on Website', 'Length of Membership'],
                           outputCol = 'features') # Transforms a bunch of cols into features 

In [66]:
output = assembler.transform(df)

In [67]:
output.head(2) # Can see features as a dense vector 

[Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005, features=DenseVector([34.4973, 12.6557, 39.5777, 4.0826])),
 Row(Email='hduke@hotmail.com', Address='4547 Archer CommonDiazchester, CA 06566-8576', Avatar='DarkGreen', Avg Session Length=31.92627202636016, Time on App=11.109460728682564, Time on Website=37.268958868297744, Length of Membership=2.66403418213262, Yearly Amount Spent=392.2049334443264, features=DenseVector([31.9263, 11.1095, 37.269, 2.664]))]

In [68]:
# Select final data 
df_final = output.select('features', 'Yearly Amount Spent')
df_final.show()

+--------------------+-------------------+
|            features|Yearly Amount Spent|
+--------------------+-------------------+
|[34.4972677251122...|  587.9510539684005|
|[31.9262720263601...|  392.2049334443264|
|[33.0009147556426...| 487.54750486747207|
|[34.3055566297555...|  581.8523440352177|
|[33.3306725236463...|  599.4060920457634|
|[33.8710378793419...|   637.102447915074|
|[32.0215955013870...|  521.5721747578274|
|[32.7391429383803...|  549.9041461052942|
|[33.9877728956856...|  570.2004089636196|
|[31.9365486184489...|  427.1993848953282|
|[33.9925727749537...|  492.6060127179966|
|[33.8793608248049...|  522.3374046069357|
|[29.5324289670579...|  408.6403510726275|
|[33.1903340437226...|  573.4158673313865|
|[32.3879758531538...|  470.4527333009554|
|[30.7377203726281...|  461.7807421962299|
|[32.1253868972878...| 457.84769594494855|
|[32.3388993230671...| 407.70454754954415|
|[32.1878120459321...|  452.3156754800354|
|[32.6178560628234...|   605.061038804892|
+----------

In [69]:
df_train, df_test = df_final.randomSplit([0.7, 0.3])

In [70]:
df_train.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                343|
|   mean|  502.3919600224409|
| stddev|  80.01763222417557|
|    min|   266.086340948469|
|    max|  744.2218671047146|
+-------+-------------------+



In [71]:
df_test.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                157|
|   mean|  492.5896614114529|
| stddev|  77.58423697497992|
|    min| 256.67058229005585|
|    max|  765.5184619388373|
+-------+-------------------+



In [72]:
lr = LinearRegression(featuresCol = 'features', labelCol = 'Yearly Amount Spent')

In [73]:
lr_model = lr.fit(df_train)

In [74]:
lr_model.evaluate(df_test).residuals.show()

+-------------------+
|          residuals|
+-------------------+
|   13.2893607485762|
| 2.2032319271501706|
|  5.463836506198163|
|-11.088652227986756|
| -6.804997704875177|
|  11.56668191355061|
| 0.4808219876441626|
| 5.3135365521760605|
|-3.3965356117164447|
|-6.5350832533171115|
|  3.599959549007565|
|-13.547191295395805|
| 18.735862068698054|
| -3.434350391780356|
| 0.7247785049299864|
| -5.032695833692912|
|  9.784056535022557|
|-1.7207055223631187|
| 12.685084425780701|
|  5.663096428559925|
+-------------------+
only showing top 20 rows



In [76]:
lr_model.evaluate(df_test).r2

0.9811971405785709

In [78]:
lr_model.evaluate(df_test).rootMeanSquaredError

10.60468849128742

In [79]:
df_unlabelled = df_test.select('features')

In [80]:
df_unlabelled.show()

+--------------------+
|            features|
+--------------------+
|[29.5324289670579...|
|[30.5743636841713...|
|[31.0472221394875...|
|[31.0662181616375...|
|[31.1280900496166...|
|[31.1695067987115...|
|[31.2606468698795...|
|[31.3584771924370...|
|[31.4252268808548...|
|[31.4474464941278...|
|[31.5316044825729...|
|[31.5741380228732...|
|[31.6098395733896...|
|[31.6253601348306...|
|[31.6610498227460...|
|[31.7242025238451...|
|[31.8512531286083...|
|[31.8627411090001...|
|[31.9096268275227...|
|[31.9480174211613...|
+--------------------+
only showing top 20 rows



In [81]:
predictions = lr_model.transform(df_unlabelled)
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[29.5324289670579...| 395.3509903240513|
|[30.5743636841713...| 439.8611818309155|
|[31.0472221394875...|387.03356268282323|
|[31.0662181616375...| 460.0219454356611|
|[31.1280900496166...| 564.0576844519298|
|[31.1695067987115...| 415.7898488887422|
|[31.2606468698795...| 420.8458092693072|
|[31.3584771924370...|489.86241389729935|
|[31.4252268808548...| 534.1632542664784|
|[31.4474464941278...| 425.1378253485411|
|[31.5316044825729...|  432.915646180355|
|[31.5741380228732...| 557.9564634559827|
|[31.6098395733896...| 425.8096875824101|
|[31.6253601348306...|379.77125114870455|
|[31.6610498227460...|415.63357507497085|
|[31.7242025238451...| 508.4205831216534|
|[31.8512531286083...|463.20819013177584|
|[31.8627411090001...| 558.0188466964098|
|[31.9096268275227...| 550.7609512474585|
|[31.9480174211613...| 456.2577804643379|
+--------------------+------------

In [83]:
print("RMSE: {}".format(lr_model.evaluate(df_test).rootMeanSquaredError))
print("MSE: {}".format(lr_model.evaluate(df_test).meanSquaredError))

RMSE: 10.60468849128742
MSE: 112.45941799724386


### <a name='logreg'> Logistic regression </a>

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('titanic').getOrCreate()

In [3]:
df = spark.read.csv('9.1.titanic.csv', header=True, inferSchema=True)

In [4]:
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [5]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
df_final = df.select(['Survived', 'Pclass', 'Sex', 'Age', 'SibSp','Parch', 'Fare', 'Embarked']).na.drop()

In [7]:
df_final.show()

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|
|       1|     3|female|27.0|    0|    2|11.1333|       S|
|       1|     2|female|14.0|    1|    0|30.0708|       C|
|       1|     3|female| 4.0|    1|    1|   16.7|       S|
|       1|     1|female|58.0|    0|    0|  26.55|       S|
|       0|     3|  male|20.0|    0|    0|   8.05|       S|
|       0|     3|  male|39.0|    1|    5| 31.275|       S|
|       0|     3|female|14.0|    0|    0| 7.8542|       

#### Working with Categorical Columns

In [8]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer, StringIndexer, OneHotEncoder)

In [9]:
sex_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')
sex_encoder = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

In [10]:
embark_indexer = StringIndexer(inputCol='Embarked',outputCol='EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex',outputCol='EmbarkVec')

In [11]:
assembler = VectorAssembler(inputCols=['Pclass', 'SexVec', 'Age', 'SibSp',
                                       'Parch', 'Fare', 'EmbarkVec'],
                            outputCol='features')

In [12]:
from pyspark.ml.classification import LogisticRegression

#### <a name='pipelines'> Pipelines </a>

A pipeline chains multiple Transformers and Estimators together to specify an ML workflow. 

* Transformers: an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions
* Estimators: an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.

In [15]:
from pyspark.ml import Pipeline

In [14]:
log_reg_titanic = LogisticRegression(featuresCol = 'features', labelCol = 'Survived')

In [16]:
pipeline = Pipeline(stages = [sex_indexer, embark_indexer,
                              sex_encoder, embark_encoder,
                             assembler, log_reg_titanic])

In [17]:
df_train, df_test = df_final.randomSplit([0.7, 0.3])

In [18]:
log_reg_model = pipeline.fit(df_train)

In [19]:
results = log_reg_model.transform(df_test)

In [21]:
results.show()

+--------+------+------+----+-----+-----+-------+--------+--------+-----------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|SexIndex|EmbarkIndex|       SexVec|    EmbarkVec|            features|       rawPrediction|         probability|prediction|
+--------+------+------+----+-----+-----+-------+--------+--------+-----------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|       0|     1|  male|19.0|    1|    0|   53.1|       S|     0.0|        0.0|(1,[0],[1.0])|(2,[0],[1.0])|[1.0,1.0,19.0,1.0...|[-0.2996241574486...|[0.42564936359351...|       1.0|
|       0|     1|  male|29.0|    0|    0|   30.0|       S|     0.0|        0.0|(1,[0],[1.0])|(2,[0],[1.0])|[1.0,1.0,29.0,0.0...|[0.03709923300845...|[0.50927374461425...|       0.0|
|       0|     1|  male|31.0|    1|    0|   52.0|       S|     0.0|        0.0|(1,[0],[1.0

#### Evaluating the test data results

In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [22]:
eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

In [24]:
eval

BinaryClassificationEvaluator_4658e6beaa61

In [25]:
eval.evaluate(results) # Displays the AUC

0.7980922098569158

### <a name='treemethods'> Tree based methods </a>

We will be using a college dataset to try to classify colleges as Private or Public based off these features:

    Private A factor with levels No and Yes indicating private or public university
    Apps Number of applications received
    Accept Number of applications accepted
    Enroll Number of new students enrolled
    Top10perc Pct. new students from top 10% of H.S. class
    Top25perc Pct. new students from top 25% of H.S. class
    F.Undergrad Number of fulltime undergraduates
    P.Undergrad Number of parttime undergraduates
    Outstate Out-of-state tuition
    Room.Board Room and board costs
    Books Estimated book costs
    Personal Estimated personal spending
    PhD Pct. of faculty with Ph.D.’s
    Terminal Pct. of faculty with terminal degree
    S.F.Ratio Student/faculty ratio
    perc.alumni Pct. alumni who donate
    Expend Instructional expenditure per student
    Grad.Rate Graduation rate

In [27]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('trees').getOrCreate()

In [28]:
df = spark.read.csv('10.1.College.csv', header=True, inferSchema=True)

In [30]:
df.head(2)

[Row(School='Abilene Christian University', Private='Yes', Apps=1660, Accept=1232, Enroll=721, Top10perc=23, Top25perc=52, F_Undergrad=2885, P_Undergrad=537, Outstate=7440, Room_Board=3300, Books=450, Personal=2200, PhD=70, Terminal=78, S_F_Ratio=18.1, perc_alumni=12, Expend=7041, Grad_Rate=60),
 Row(School='Adelphi University', Private='Yes', Apps=2186, Accept=1924, Enroll=512, Top10perc=16, Top25perc=29, F_Undergrad=2683, P_Undergrad=1227, Outstate=12280, Room_Board=6450, Books=750, Personal=1500, PhD=29, Terminal=30, S_F_Ratio=12.2, perc_alumni=16, Expend=10527, Grad_Rate=56)]

In [32]:
df.printSchema()

root
 |-- School: string (nullable = true)
 |-- Private: string (nullable = true)
 |-- Apps: integer (nullable = true)
 |-- Accept: integer (nullable = true)
 |-- Enroll: integer (nullable = true)
 |-- Top10perc: integer (nullable = true)
 |-- Top25perc: integer (nullable = true)
 |-- F_Undergrad: integer (nullable = true)
 |-- P_Undergrad: integer (nullable = true)
 |-- Outstate: integer (nullable = true)
 |-- Room_Board: integer (nullable = true)
 |-- Books: integer (nullable = true)
 |-- Personal: integer (nullable = true)
 |-- PhD: integer (nullable = true)
 |-- Terminal: integer (nullable = true)
 |-- S_F_Ratio: double (nullable = true)
 |-- perc_alumni: integer (nullable = true)
 |-- Expend: integer (nullable = true)
 |-- Grad_Rate: integer (nullable = true)



In [33]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [34]:
assembler = VectorAssembler(inputCols = ['Apps', 'Accept', 'Enroll', 'Top10perc', 
                                         'Top25perc', 'F_Undergrad', 'P_Undergrad', 'Outstate',
                                        'Room_Board', 'Books', 'Personal', 'PhD',
                                        'Terminal', 'S_F_Ratio', 'perc_alumni', 'Expend',
                                        'Grad_Rate'],
                           outputCol = 'features')

In [35]:
df_output = assembler.transform(df)

Private column is 'Yes' or 'No', make it 0 or 1 

In [36]:
from pyspark.ml.feature import StringIndexer

In [37]:
indexer = StringIndexer(inputCol = 'Private', outputCol='PrivateIndex')

In [38]:
df_output_2 = indexer.fit(df_output).transform(df_output)

In [39]:
df_output_2.head(2)

[Row(School='Abilene Christian University', Private='Yes', Apps=1660, Accept=1232, Enroll=721, Top10perc=23, Top25perc=52, F_Undergrad=2885, P_Undergrad=537, Outstate=7440, Room_Board=3300, Books=450, Personal=2200, PhD=70, Terminal=78, S_F_Ratio=18.1, perc_alumni=12, Expend=7041, Grad_Rate=60, features=DenseVector([1660.0, 1232.0, 721.0, 23.0, 52.0, 2885.0, 537.0, 7440.0, 3300.0, 450.0, 2200.0, 70.0, 78.0, 18.1, 12.0, 7041.0, 60.0]), PrivateIndex=0.0),
 Row(School='Adelphi University', Private='Yes', Apps=2186, Accept=1924, Enroll=512, Top10perc=16, Top25perc=29, F_Undergrad=2683, P_Undergrad=1227, Outstate=12280, Room_Board=6450, Books=750, Personal=1500, PhD=29, Terminal=30, S_F_Ratio=12.2, perc_alumni=16, Expend=10527, Grad_Rate=56, features=DenseVector([2186.0, 1924.0, 512.0, 16.0, 29.0, 2683.0, 1227.0, 12280.0, 6450.0, 750.0, 1500.0, 29.0, 30.0, 12.2, 16.0, 10527.0, 56.0]), PrivateIndex=0.0)]

In [40]:
df_output_2.columns

['School',
 'Private',
 'Apps',
 'Accept',
 'Enroll',
 'Top10perc',
 'Top25perc',
 'F_Undergrad',
 'P_Undergrad',
 'Outstate',
 'Room_Board',
 'Books',
 'Personal',
 'PhD',
 'Terminal',
 'S_F_Ratio',
 'perc_alumni',
 'Expend',
 'Grad_Rate',
 'features',
 'PrivateIndex']

In [41]:
df_final = df_output_2.select(['features', 'PrivateIndex'])

In [42]:
df_train, df_test = df_final.randomSplit([0.7, 0.3])

**Importing the 3 classifiers**

In [43]:
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier, RandomForestClassifier

In [44]:
from pyspark.ml import Pipeline

#### <a name='dt'> Decision tree </a>

In [48]:
dtc = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'PrivateIndex')

In [49]:
dtc_model = dtc.fit(df_final)

#### <a name='rf'> Random forest </a>

In [50]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'PrivateIndex')

In [51]:
rf_model = rf.fit(df_final)

#### <a name='gbm'> Gradient boosted tree classifier </a>

In [52]:
gbt = GBTClassifier(featuresCol = 'features', labelCol='PrivateIndex')

In [53]:
gbt_model = gbt.fit(df_final)

#### Model comparison

In [57]:
dtc_predictions = dtc_model.transform(df_test)
rfc_predictions = rf_model.transform(df_test)
gbt_predictions = gbt_model.transform(df_test)

In [59]:
dtc_predictions.show()

+--------------------+------------+-------------+--------------------+----------+
|            features|PrivateIndex|rawPrediction|         probability|prediction|
+--------------------+------------+-------------+--------------------+----------+
|[141.0,118.0,55.0...|         0.0|  [346.0,0.0]|           [1.0,0.0]|       0.0|
|[174.0,146.0,88.0...|         0.0|  [346.0,0.0]|           [1.0,0.0]|       0.0|
|[193.0,146.0,55.0...|         0.0|   [11.0,0.0]|           [1.0,0.0]|       0.0|
|[202.0,184.0,122....|         0.0|  [346.0,0.0]|           [1.0,0.0]|       0.0|
|[212.0,197.0,91.0...|         0.0|  [346.0,0.0]|           [1.0,0.0]|       0.0|
|[232.0,216.0,106....|         0.0|   [31.0,1.0]|   [0.96875,0.03125]|       0.0|
|[233.0,233.0,153....|         1.0|    [4.0,1.0]|           [0.8,0.2]|       0.0|
|[244.0,198.0,82.0...|         0.0|  [346.0,0.0]|           [1.0,0.0]|       0.0|
|[245.0,208.0,125....|         0.0|  [346.0,0.0]|           [1.0,0.0]|       0.0|
|[257.0,183.0,10

#### Evaluation metrics

In [58]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [60]:
eval = MulticlassClassificationEvaluator(labelCol='PrivateIndex', predictionCol='prediction', metricName='accuracy')

In [63]:
dtc_acc = eval.evaluate(dtc_predictions)
rf_acc = eval.evaluate(rfc_predictions)
gbt_acc = eval.evaluate(gbt_predictions)

In [66]:
print('Results')
print("_"*80)
print('A single decision tree had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print("_"*80)
print('A random forest ensemble had an accuracy of: {0:2.2f}%'.format(rf_acc*100))
print("_"*80)
print('An ensemble using GBT had an accuracy of: {0:2.2f}%'.format(gbt_acc*100))

Results
________________________________________________________________________________
A single decision tree had an accuracy of: 97.88%
________________________________________________________________________________
A random forest ensemble had an accuracy of: 97.88%
________________________________________________________________________________
An ensemble using GBT had an accuracy of: 100.00%


For feature importance:

`rf_model.feature_importance`

#### <a name='kmeans'> K-means clustering </a>

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('kmeans').getOrCreate()

In [4]:
df = spark.read.format('libsvm').load('11.1.sample_kmeans_data.txt')

In [5]:
df.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [6]:
from pyspark.ml.clustering import KMeans

In [10]:
# Trains a k-means model
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(df)

In [11]:
# Evaluate cluster by computing within group sums of squares 
wsse = model.computeCost(df)

In [12]:
wsse

0.11999999999994547

In [13]:
model.clusterCenters()

[array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]

Working with the seeds dataset 
https://archive.ics.uci.edu/ml/datasets/seeds.

The examined group comprised kernels belonging to 3 different varieties of wheat: Kama, Rosa and Canadian, 70 elements each, randomly selected for the experiment.

The data set can be used for the tasks of classification and cluster analysis.


Attribute Information:

To construct the data, seven geometric parameters of wheat kernels were measured: 
1. area A, 
2. perimeter P, 
3. compactness C = 4*pi*A/P^2, 
4. length of kernel, 
5. width of kernel, 
6. asymmetry coefficient 
7. length of kernel groove. 
All of these parameters were real-valued continuous.

Let's see if we can cluster them in to 3 groups with K-means!\

In [14]:
df = spark.read.csv('11.2.seeds_dataset.csv', header=True, inferSchema=True)

In [15]:
df.head()

Row(area=15.26, perimeter=14.84, compactness=0.871, length_of_kernel=5.763, width_of_kernel=3.312, asymmetry_coefficient=2.221, length_of_groove=5.22)

In [16]:
df.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)



In [17]:
df.describe().show()

+-------+------------------+------------------+--------------------+-------------------+------------------+---------------------+-------------------+
|summary|              area|         perimeter|         compactness|   length_of_kernel|   width_of_kernel|asymmetry_coefficient|   length_of_groove|
+-------+------------------+------------------+--------------------+-------------------+------------------+---------------------+-------------------+
|  count|               210|               210|                 210|                210|               210|                  210|                210|
|   mean|14.847523809523816|14.559285714285718|  0.8709985714285714|  5.628533333333335| 3.258604761904762|   3.7001999999999997|  5.408071428571429|
| stddev|2.9096994306873647|1.3059587265640225|0.023629416583846364|0.44306347772644983|0.3777144449065867|   1.5035589702547392|0.49148049910240543|
|    min|             10.59|             12.41|              0.8081|              4.899|            

#### Format the data

In [19]:
from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import VectorAssembler

In [20]:
vector_assembler = VectorAssembler(inputCols = df.columns, outputCol = 'features') #Since all columns go into the vectorassembler

In [28]:
df_final = vector_assembler.transform(df)

In [22]:
df_final.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)
 |-- features: vector (nullable = true)



#### Scaling the data 

In [23]:
from pyspark.ml.feature import StandardScaler

In [29]:
scaler = StandardScaler(inputCol = 'features', outputCol = 'scaledFeatures', withStd=True, withMean = False)

In [30]:
scaler_Model = scaler.fit(df_final)

In [31]:
df_final = scaler_Model.transform(df_final)

#### Train and evaluate the model

In [32]:
# Train the model 
kmeans = KMeans(featuresCol = 'scaledFeatures', k=3)
kmeans_model = kmeans.fit(df_final)

In [33]:
# Evaluate clustering by computing Within Set Sum of Squared Errors.
wssse = kmeans_model.computeCost(df_final)
print("Within Set Sum of Squared Errors = " + str(wssse))

Within Set Sum of Squared Errors = 429.07559671506715


In [34]:
# Shows the result.
centers = kmeans_model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[ 4.87257659 10.88120146 37.27692543 12.3410157   8.55443412  1.81649011
 10.32998598]
[ 4.06105916 10.13979506 35.80536984 11.82133095  7.50395937  3.27184732
 10.42126018]
[ 6.31670546 12.37109759 37.39491396 13.91155062  9.748067    2.39849968
 12.2661748 ]


In [36]:
kmeans_model.transform(df_final).select('prediction').show()

+----------+
|prediction|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         2|
|         2|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         1|
+----------+
only showing top 20 rows



### <a name='recsys'> Recommender system </a>

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('rec').getOrCreate()

In [3]:
df = spark.read.csv('12.1.movielens_ratings.csv', header=True, inferSchema=True)

In [4]:
df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



In [5]:
df.head(4)

[Row(movieId=2, rating=3.0, userId=0),
 Row(movieId=3, rating=1.0, userId=0),
 Row(movieId=5, rating=2.0, userId=0),
 Row(movieId=9, rating=4.0, userId=0)]

Row 1 meaning: User 0 watched movie 2 and rated it 3

In [7]:
df.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



#### Train-test split 

In [8]:
df_train, df_test = df.randomSplit([0.8, 0.2])

#### ALS Model

In [9]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [10]:
als = ALS(maxIter=5, regParam=0.01, userCol='userId', itemCol='movieId', ratingCol='rating')
model = als.fit(df_train)

In [11]:
# Evaluate the model by computing RMSE on test data
predictions = model.transform(df_test)

In [12]:
predictions.head()

Row(movieId=31, rating=1.0, userId=27, prediction=1.84310781955719)

In [13]:
evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol='rating', metricName='rmse')
rmse = evaluator.evaluate(predictions)

In [14]:
print('Root mean square error: %.2f'%(rmse))

Root mean square error: 1.73


To recommend an item to a user: 

In [15]:
user_11 = df_test.filter(df_test['userId']==11).select(['movieId', 'userId'])

In [19]:
model.transform(user_11).orderBy('prediction', ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     82|    11| 2.6380394|
|     11|    11| 1.8249062|
|     13|    11| 1.7102478|
|     79|    11|-0.4021929|
|     51|    11|-0.8936707|
|     72|    11|-1.0260999|
|     69|    11|-1.7409068|
+-------+------+----------+



Recommend movie 82

### <a name='nlp'> NLP </a>

In [21]:
from pyspark.sql import SparkSession

In [23]:
spark = SparkSession.builder.appName('nlp').getOrCreate()

In [24]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

In [25]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [39]:
sen_df = spark.createDataFrame([
    (0, 'Hi I heard about Spark'),
    (1, 'I wish Java could use case classes'),
    (2, 'Logistic,regression,models,are,neat')
],['id', 'sentence']) #Allows us to pass any list of tuples where actual entries are the rows 
# First argument is list of entries, 2nd entry is column names

In [27]:
sen_df.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|Hi I heard about ...|
|  1|I wish Java could...|
|  2|Logistic regressi...|
+---+--------------------+



In [40]:
tokenizer = Tokenizer(inputCol = 'sentence', outputCol = 'words')

In [41]:
regextokenizer = RegexTokenizer(inputCol = 'sentence', outputCol = 'words',
                               pattern = '\\W')

In [42]:
count_tokens = udf(lambda words: len(words), IntegerType()) #user defined function

In [43]:
tokenized = tokenizer.transform(sen_df)

In [44]:
tokenized.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|
|  1|I wish Java could...|[i, wish, java, c...|
|  2|Logistic,regressi...|[logistic,regress...|
+---+--------------------+--------------------+



Third sentence not clear, since we created it with commas to begin with. Lets count. 

In [45]:
tokenized.withColumn('tokens', count_tokens(col('words'))).show()

+---+--------------------+--------------------+------+
| id|            sentence|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|
|  1|I wish Java could...|[i, wish, java, c...|     7|
|  2|Logistic,regressi...|[logistic,regress...|     1|
+---+--------------------+--------------------+------+



In [46]:
rg_tokenized = regextokenizer.transform(sen_df)

In [47]:
rg_tokenized.withColumn('tokens', count_tokens(col('words'))).show()

+---+--------------------+--------------------+------+
| id|            sentence|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|
|  1|I wish Java could...|[i, wish, java, c...|     7|
|  2|Logistic,regressi...|[logistic, regres...|     5|
+---+--------------------+--------------------+------+



Here it splits on whitespace as well as commas

**Stopwords removal**

In [48]:
from pyspark.ml.feature import StopWordsRemover

In [52]:
sentence_df = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])], ["id", "tokens"])

In [53]:
sentence_df.show()

+---+--------------------+
| id|              tokens|
+---+--------------------+
|  0|[I, saw, the, red...|
|  1|[Mary, had, a, li...|
+---+--------------------+



In [54]:
remover = StopWordsRemover(inputCol = 'tokens', outputCol='filtered')

In [55]:
remover.transform(sentence_df).show()

+---+--------------------+--------------------+
| id|              tokens|            filtered|
+---+--------------------+--------------------+
|  0|[I, saw, the, red...| [saw, red, balloon]|
|  1|[Mary, had, a, li...|[Mary, little, lamb]|
+---+--------------------+--------------------+



#### n-grams

An n-gram is a sequence of nn tokens (typically words) for some integer nn. The NGram class can be used to transform input features into nn-grams.

<p><code>NGram</code> takes as input a sequence of strings (e.g. the output of a <a href="ml-features.html#tokenizer">Tokenizer</a>).  The parameter <code>n</code> is used to determine the number of terms in each $n$-gram. The output will consist of a sequence of $n$-grams where each $n$-gram is represented by a space-delimited string of $n$ consecutive words.  If the input sequence contains fewer than <code>n</code> strings, no output is produced.</p>


In [57]:
from pyspark.ml.feature import NGram

In [58]:
wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



_______
#### Feature Extractors
_______

<h2 id="tf-idf">TF-IDF</h2>

<p><a href="http://en.wikipedia.org/wiki/Tf%E2%80%93idf">Term frequency-inverse document frequency (TF-IDF)</a> 
is a feature vectorization method widely used in text mining to reflect the importance of a term 
to a document in the corpus. Denote a term by <code>$t$</code>, a document by  d , and the corpus by D.
Term frequency <code>$TF(t, d)$</code> is the number of times that term <code>$t$</code> appears in document <code>$d$</code>, while 
document frequency <code>$DF(t, D)$</code> is the number of documents that contains term <code>$t$</code>. If we only use 
term frequency to measure the importance, it is very easy to over-emphasize terms that appear very 
often but carry little information about the document, e.g. &#8220;a&#8221;, &#8220;the&#8221;, and &#8220;of&#8221;. If a term appears 
very often across the corpus, it means it doesn&#8217;t carry special information about a particular document.
Inverse document frequency is a numerical measure of how much information a term provides:

$$ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1} $$

where |D| is the total number of documents in the corpus. Since logarithm is used, if a term 
appears in all documents, its IDF value becomes 0. Note that a smoothing term is applied to avoid 
dividing by zero for terms outside the corpus. The TF-IDF measure is simply the product of TF and IDF:
$$ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). $$


In [59]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

sentenceData.show()

+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|  0.0|Hi I heard about ...|
|  0.0|I wish Java could...|
|  1.0|Logistic regressi...|
+-----+--------------------+



In [61]:
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show(truncate=False)

+-----+-----------------------------------+------------------------------------------+
|label|sentence                           |words                                     |
+-----+-----------------------------------+------------------------------------------+
|0.0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |
|0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|
|1.0  |Logistic regression models are neat|[logistic, regression, models, are, neat] |
+-----+-----------------------------------+------------------------------------------+



In [63]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show(truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                              |
+-----+----------------------------------------------------------------------------------------------------------------------+
|0.0  |(20,[0,5,9,17],[0.6931471805599453,0.6931471805599453,0.28768207245178085,1.3862943611198906])                        |
|0.0  |(20,[2,7,9,13,15],[0.6931471805599453,0.6931471805599453,0.8630462173553426,0.28768207245178085,0.28768207245178085]) |
|1.0  |(20,[4,6,13,15,18],[0.6931471805599453,0.6931471805599453,0.28768207245178085,0.28768207245178085,0.6931471805599453])|
+-----+----------------------------------------------------------------------------------------------------------------------+



#### CountVectorizer
CountVectorizer and CountVectorizerModel aim to help convert a collection of text documents to vectors of token counts. When an a-priori dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary, and generates a CountVectorizerModel. The model produces sparse representations for the documents over the vocabulary, which can then be passed to other algorithms like LDA.

During the fitting process, CountVectorizer will select the top vocabSize words ordered by term frequency across the corpus. An optional parameter minDF also affects the fitting process by specifying the minimum number (or fraction if < 1.0) of documents a term must appear in to be included in the vocabulary. Another optional binary toggle parameter controls the output vector. If set to true all nonzero counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.

In [64]:
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])
df.show()

+---+---------------+
| id|          words|
+---+---------------+
|  0|      [a, b, c]|
|  1|[a, b, b, c, a]|
+---+---------------+



In [65]:
# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0) #minDf sepcifies min documents a term must appear in 

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



#### NLP Code Along

For this code along we will build a spam filter! We'll use the various NLP tools we learned about as well as a new classifier, Naive Bayes.

We'll use a classic dataset for this - UCI Repository SMS Spam Detection: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection

In [66]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('nlp').getOrCreate()
data = spark.read.csv("13.1.smsspamcollection/SMSSpamCollection",inferSchema=True,sep='\t')

In [67]:
data.show() # No headers 

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [68]:
data = data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')

In [69]:
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



**Create a new length feature**

In [71]:
from pyspark.sql.functions import length

In [72]:
data = data.withColumn('length',length(data['text']))

In [73]:
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [74]:
# Pretty Clear Difference in avg length of ham vs spam messages 
data.groupby('class').mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



#### Feature Transformations

In [75]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer

tokenizer = Tokenizer(inputCol="text", outputCol="token_text")

#Take list of tokens and remove common stopwords 
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')

# Take list of non-stopword words and extract vocabulary
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')

idf = IDF(inputCol="c_vec", outputCol="tf_idf")

In [76]:
# Convert ham-spam to 0-1s integers
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')

In [77]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [78]:
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

#### The Model

We'll use Naive Bayes, but feel free to play around with this choice!

In [79]:
from pyspark.ml.classification import NaiveBayes

In [80]:
# Use defaults
nb = NaiveBayes()

In [81]:
#### Pipeline

In [82]:
from pyspark.ml import Pipeline

In [83]:
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])

In [84]:
cleaner = data_prep_pipe.fit(data)

In [85]:
clean_data = cleaner.transform(data)

In [88]:
clean_data.show(5)

+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|class|                text|length|label|          token_text|         stop_tokens|               c_vec|              tf_idf|            features|
+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|  ham|Go until jurong p...|   111|  0.0|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(13423,[7,11,31,6...|(13424,[7,11,31,6...|
|  ham|Ok lar... Joking ...|    29|  0.0|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,297,...|(13423,[0,24,297,...|(13424,[0,24,297,...|
| spam|Free entry in 2 a...|   155|  1.0|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(13423,[2,13,19,3...|(13424,[2,13,19,3...|
|  ham|U dun say so earl...|    49|  0.0|[u, dun, say, so,...|[u, dun, say, ear...|(13423,[0,70,80,1...|(13423,[0,70,8

#### Training and Evaluation

In [90]:
clean_data = clean_data.select(['label','features'])
clean_data.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
+-----+--------------------+
only showing top 5 rows



In [91]:
(training,testing) = clean_data.randomSplit([0.7,0.3])

In [92]:
spam_predictor = nb.fit(training)

In [93]:
data.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)



In [94]:
test_results = spam_predictor.transform(testing)

In [95]:
test_results.show(5)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,13,...|[-609.33361438322...|[0.99999999999999...|       0.0|
|  0.0|(13424,[0,1,3,9,1...|[-569.62548887717...|[1.0,5.6693468385...|       0.0|
|  0.0|(13424,[0,1,14,18...|[-1377.0247368010...|[1.0,6.6716179396...|       0.0|
|  0.0|(13424,[0,1,14,31...|[-219.22736498980...|[1.0,2.8468641014...|       0.0|
|  0.0|(13424,[0,1,27,35...|[-1476.9344966802...|[0.99999999999999...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [96]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [98]:
import numpy as np

In [100]:
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(np.round(acc,2)))

Accuracy of model at predicting spam was: 0.92


## <a name='streaming'> Spark streaming </a>

* Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. 
* Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window.
* Finally, processed data can be pushed out to filesystems, databases, and live dashboards. 
* In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.

<img src='http://spark.apache.org/docs/latest/img/streaming-arch.png'/>


There are SparkSQL modules for streaming: 

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=streaming#module-pyspark.sql.streaming

But they are all still listed as experimental, so instead of showing you somethign that might break in the future, we'll stick to the RDD methods (which is what the documentation also currently shows for streaming).

Internally, it works as follows: 

<img src='http://spark.apache.org/docs/latest/img/streaming-flow.png'/>
* Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.
* Because we will be using Spark Streaming and not structured streaming (still experimental and in Alpha) we need to use some older “RDD” syntax.
* This stems from using a <font color='red'> **SparkContext** </font> instead of a SparkSession.
* We will be building a very simple application that connects to a local stream of data (an open terminal) through a socket connection. It will then count the words for each line that we type in.


#### Steps for streaming 
1. Create a SparkContext
* Create a StreamingContext
* Create a Socket Text Stream
* Read in the lines as a “DStream”

#### The steps for working with the data:
1. Split the input line into a list of words
* Map each word to a tuple: (word,1)
* Then group (reduce)  the tuples by the word (key) and sum up the second argument (the number one)
* That will then provide us with a word count in the form (‘hello’,3) for each line.As a quick note, the RDD syntax relies heavily on lambda expressions, which are just quick anonymous functions.


### Note on  Streaming
Streaming is something that is rapidly advancing and changing fast, there are multipl enew libraries every year, new and different services always popping up, and what is in this notebook may or may not apply to you. Maybe your looking for something specific on Kafka, or maybe you are looking for streaming about twitter, in which case Spark might be overkill for what you really want. Realistically speaking each situation is going to require a customized solution and this course is never going to be able to supply a one size fits all solution. Because of this, I wanted to point out some great resources for Python and Spark StreamingL

* [The Official Documentation is great. This should be your first go to.](http://spark.apache.org/docs/latest/streaming-programming-guide.html#spark-streaming-programming-guide)

* [Fantastic Guide to Spark Streaming with Kafka](https://www.rittmanmead.com/blog/2017/01/getting-started-with-spark-streaming-with-python-and-kafka/)

* [Another Spark Streaming Example with Geo Plotting](http://nbviewer.jupyter.org/github/ibm-cds-labs/spark.samples/blob/master/notebook/DashDB%20Twitter%20Car%202015%20Python%20Notebook.ipynb)

<font color='red'> Example only on Linux .</font>
    
Refer to notebook 'Introduction to Spark Streaming' 