<a href="https://colab.research.google.com/github/markumreed/colab_pyspark/blob/main/pyspark_in_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preamble

The following three cells must be ran in order to use PySpark in Google Colab. 

### Spark 3.0.2

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz

In [2]:
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [23]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"

In [24]:
import findspark
findspark.init()

# Spark DataFrame Basics

Spark DataFrames allow for easy handling of large datasets. 

* Easy syntax
* Ability to use SQL directly in the dataframe
* Operations are automatically distributed across RDDs

## Create a DataFrame


In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("pyspark_basics").getOrCreate()

In [None]:
%%writefile user_simple.json
{"name":"Bob"}
{"name":"Jim", "age":40}
{"name":"Mary", "age": 24}

Writing user_simple.json


In [None]:
df = spark.read.json("user_simple.json")

In [None]:
df

DataFrame[age: bigint, name: string]

## Show DataFrame


In [None]:
df.show()

+----+----+
| age|name|
+----+----+
|null| Bob|
|  40| Jim|
|  24|Mary|
+----+----+



In [None]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [None]:
df.columns

['age', 'name']

In [None]:
df.describe()

DataFrame[summary: string, age: string, name: string]

In [None]:
df.describe().show()

+-------+------------------+----+
|summary|               age|name|
+-------+------------------+----+
|  count|                 2|   3|
|   mean|              32.0|null|
| stddev|11.313708498984761|null|
|    min|                24| Bob|
|    max|                40|Mary|
+-------+------------------+----+



## Specifying Schema Structure

- Some data types make it easier to infer schema. 

- Often have to set the schema yourself

- Spark has tools to help specify the structure

Next we need to create the list of Structure fields
  * :param name: string, name of the field.
  * :param dataType: :class:`DataType` of the field.
  * :param nullable: boolean, whether the field can be null (None) 

In [None]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [None]:
data_schema = [StructField("age", IntegerType(), True), StructField("name",StringType(), True)]

In [None]:
final_struc = StructType(fields=data_schema)

In [None]:
df = spark.read.json("user_simple.json", schema=final_struc)

In [None]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [None]:
df.show()

+----+----+
| age|name|
+----+----+
|null| Bob|
|  40| Jim|
|  24|Mary|
+----+----+



## Grab Data

In [None]:
df['age']

Column<b'age'>

In [None]:
type(df['age'])

pyspark.sql.column.Column

In [None]:
df.select("age")

DataFrame[age: int]

In [None]:
type(df.select("age"))

pyspark.sql.dataframe.DataFrame

In [None]:
df.select("age").show()

+----+
| age|
+----+
|null|
|  40|
|  24|
+----+



In [None]:
df.head(2)

[Row(age=None, name='Bob'), Row(age=40, name='Jim')]

In [None]:
df.select(["name","age"])

DataFrame[name: string, age: int]

In [None]:
df.select(["name","age"]).show()

+----+----+
|name| age|
+----+----+
| Bob|null|
| Jim|  40|
|Mary|  24|
+----+----+



## Create New Columns

In [None]:
df.withColumn("newAge", df['age']).show()

+----+----+------+
| age|name|newAge|
+----+----+------+
|null| Bob|  null|
|  40| Jim|    40|
|  24|Mary|    24|
+----+----+------+



In [None]:
df.show()

+----+----+
| age|name|
+----+----+
|null| Bob|
|  40| Jim|
|  24|Mary|
+----+----+



In [None]:
df.withColumnRenamed("name","firstName").show()

+----+---------+
| age|firstName|
+----+---------+
|null|      Bob|
|  40|      Jim|
|  24|     Mary|
+----+---------+



In [None]:
df.show()

+----+----+
| age|name|
+----+----+
|null| Bob|
|  40| Jim|
|  24|Mary|
+----+----+



In [None]:
df.withColumn("agePlusTen", df['age']+10).show()

+----+----+----------+
| age|name|agePlusTen|
+----+----+----------+
|null| Bob|      null|
|  40| Jim|        50|
|  24|Mary|        34|
+----+----+----------+



In [None]:
df.withColumn("age_minus_5", df['age']-5).show()

+----+----+-----------+
| age|name|age_minus_5|
+----+----+-----------+
|null| Bob|       null|
|  40| Jim|         35|
|  24|Mary|         19|
+----+----+-----------+



## Using SQL

In [None]:
df.createOrReplaceTempView("custmers")

In [None]:
sql_results = spark.sql("SELECT * from custmers")

In [None]:
sql_results

DataFrame[age: int, name: string]

In [None]:
sql_results.show()

+----+----+
| age|name|
+----+----+
|null| Bob|
|  40| Jim|
|  24|Mary|
+----+----+



In [None]:
spark.sql("SELECT * FROM custmers WHERE age=24").show()

+---+----+
|age|name|
+---+----+
| 24|Mary|
+---+----+



## DataFrame Operations

- Cover basic operations with Spark DataFrames.
- Use stock data from Walmart.

In [None]:
!curl https://raw.githubusercontent.com/markumreed/colab_pyspark/main/WMT.csv >> WMT.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100 89556  100 89556    0     0   392k      0 --:--:-- --:--:-- --:--:--  392k


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("operations").getOrCreate()
df = spark.read.csv('WMT.csv',inferSchema=True,header=True)

In [None]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: string (nullable = true)



In [None]:
df.head(5)

[Row(Date='2016-01-20', Open=61.799999, High=62.330002, Low=60.200001, Close=60.84, Adj Close=53.990601, Volume='17369100'),
 Row(Date='2016-01-21', Open=60.98, High=62.790001, Low=60.91, Close=61.880001, Adj Close=54.913509, Volume='12089200'),
 Row(Date='2016-01-22', Open=62.439999, High=63.259998, Low=62.130001, Close=62.689999, Adj Close=55.632324, Volume='9197500'),
 Row(Date='2016-01-25', Open=62.779999, High=63.82, Low=62.549999, Close=63.450001, Adj Close=56.306763, Volume='12823400'),
 Row(Date='2016-01-26', Open=63.360001, High=64.470001, Low=63.259998, Close=64.0, Adj Close=56.794834, Volume='9441200')]

## Filtering Data

- DataFrames allow for quick filtering of data based on conditions 


In [None]:
df.filter('Close<62').show()

+----------+---------+---------+---------+---------+---------+--------+
|      Date|     Open|     High|      Low|    Close|Adj Close|  Volume|
+----------+---------+---------+---------+---------+---------+--------+
|2016-01-20|61.799999|62.330002|60.200001|    60.84|53.990601|17369100|
|2016-01-21|    60.98|62.790001|    60.91|61.880001|54.913509|12089200|
|2016-01-20|61.799999|62.330002|60.200001|    60.84|53.990601|17369100|
|2016-01-21|    60.98|62.790001|    60.91|61.880001|54.913509|12089200|
+----------+---------+---------+---------+---------+---------+--------+



In [None]:
df.filter('Close<62').select('Open').show()

+---------+
|     Open|
+---------+
|61.799999|
|    60.98|
|61.799999|
|    60.98|
+---------+



In [None]:
df.filter('Close<62').select(['Date','Open']).show()

+----------+---------+
|      Date|     Open|
+----------+---------+
|2016-01-20|61.799999|
|2016-01-21|    60.98|
|2016-01-20|61.799999|
|2016-01-21|    60.98|
+----------+---------+



## Using Comparison Operators
- Using comparison operators will look similar to SQL operators
- Make to call the entire column within the dataframe

In [None]:
df.filter(df['Close'] < 62).show()

+----------+---------+---------+---------+---------+---------+--------+
|      Date|     Open|     High|      Low|    Close|Adj Close|  Volume|
+----------+---------+---------+---------+---------+---------+--------+
|2016-01-20|61.799999|62.330002|60.200001|    60.84|53.990601|17369100|
|2016-01-21|    60.98|62.790001|    60.91|61.880001|54.913509|12089200|
|2016-01-20|61.799999|62.330002|60.200001|    60.84|53.990601|17369100|
|2016-01-21|    60.98|62.790001|    60.91|61.880001|54.913509|12089200|
+----------+---------+---------+---------+---------+---------+--------+



In [None]:
df.filter((df['Close'] < 62) & ~(df['Open'] > 60)).show()

+----+----+----+---+-----+---------+------+
|Date|Open|High|Low|Close|Adj Close|Volume|
+----+----+----+---+-----+---------+------+
+----+----+----+---+-----+---------+------+



In [None]:
df.filter(df['Open'] == 60.98).show(1)

+----------+-----+---------+-----+---------+---------+--------+
|      Date| Open|     High|  Low|    Close|Adj Close|  Volume|
+----------+-----+---------+-----+---------+---------+--------+
|2016-01-21|60.98|62.790001|60.91|61.880001|54.913509|12089200|
+----------+-----+---------+-----+---------+---------+--------+
only showing top 1 row



In [None]:
df.filter(df['Open'] == 60.98).collect()

[Row(Date='2016-01-21', Open=60.98, High=62.790001, Low=60.91, Close=61.880001, Adj Close=54.913509, Volume='12089200'),
 Row(Date='2016-01-21', Open=60.98, High=62.790001, Low=60.91, Close=61.880001, Adj Close=54.913509, Volume='12089200')]

In [None]:
res =df.filter(df['Open'] == 60.98).collect()

In [None]:
type(res[0])

pyspark.sql.types.Row

In [None]:
res[0].asDict()

{'Adj Close': 54.913509,
 'Close': 61.880001,
 'Date': '2016-01-21',
 'High': 62.790001,
 'Low': 60.91,
 'Open': 60.98,
 'Volume': '12089200'}

In [None]:
for item in res[0]:
  print(item)

2016-01-21
60.98
62.790001
60.91
61.880001
54.913509
12089200


In [None]:
import pandas as pd

In [None]:
pd.Series(res[0].asDict())

Date         2016-01-21
Open              60.98
High              62.79
Low               60.91
Close             61.88
Adj Close       54.9135
Volume         12089200
dtype: object

# GroupBy and Aggregate Functions
- `GroupBy` allows you to group rows together based off some column value
- Once you've performed the `GroupBy` operation you can use an aggregate function off that data. 
- An aggregate function aggregates multiple rows of data into a single output, such as taking the sum of inputs, or counting the number of inputs.



In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("groupbyagg").getOrCreate()

## Import Data


In [None]:
!curl https://raw.githubusercontent.com/markumreed/colab_pyspark/main/sales_data.csv >> sales_data.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100   202  100   202    0     0   1836      0 --:--:-- --:--:-- --:--:--  1836


In [None]:
df = spark.read.csv("sales_data.csv", inferSchema=True, header=True)

In [None]:
df.printSchema()

root
 |-- company: string (nullable = true)
 |-- representative: string (nullable = true)
 |-- num_sales: double (nullable = true)



In [None]:
df.show()

+-------+--------------+---------+
|company|representative|num_sales|
+-------+--------------+---------+
|    XYZ|           Bob|    200.0|
|    XYZ|           Tom|    120.0|
|    XYZ|         Frank|    340.0|
|   ABCD|         Jerry|    600.0|
|   ABCD|           Amy|    124.0|
|   ABCD|       Vanessa|    243.0|
|     OK|          Carl|    870.0|
|     OK|         Sarah|    350.0|
|   BLAH|          John|    250.0|
|   BLAH|         Linda|    130.0|
|   BLAH|          Mike|    750.0|
|   BLAH|         Chris|    350.0|
+-------+--------------+---------+



## Grouping Data
- Group the data by company

In [None]:
df.groupBy("company")

<pyspark.sql.group.GroupedData at 0x7fe2f0e67ef0>

## Aggregate Functions
- mean, count, max, min, sum...

In [None]:
df.groupBy("company").mean().show()

+-------+-----------------+
|company|   avg(num_sales)|
+-------+-----------------+
|   BLAH|            370.0|
|    XYZ|            220.0|
|     OK|            610.0|
|   ABCD|322.3333333333333|
+-------+-----------------+



In [None]:
df.groupBy("company").count().show()

+-------+-----+
|company|count|
+-------+-----+
|   BLAH|    4|
|    XYZ|    3|
|     OK|    2|
|   ABCD|    3|
+-------+-----+



In [None]:
df.groupBy("company").min().show()

+-------+--------------+
|company|min(num_sales)|
+-------+--------------+
|   BLAH|         130.0|
|    XYZ|         120.0|
|     OK|         350.0|
|   ABCD|         124.0|
+-------+--------------+



In [None]:
df.groupBy("company").max().show()

+-------+--------------+
|company|max(num_sales)|
+-------+--------------+
|   BLAH|         750.0|
|    XYZ|         340.0|
|     OK|         870.0|
|   ABCD|         600.0|
+-------+--------------+



In [None]:
df.groupBy("company").sum().show()

+-------+--------------+
|company|sum(num_sales)|
+-------+--------------+
|   BLAH|        1480.0|
|    XYZ|         660.0|
|     OK|        1220.0|
|   ABCD|         967.0|
+-------+--------------+



## Aggregating

- Not all methods need a groupby call, instead you can just call the generalized `.agg()` method, that will call the aggregate across all rows in the dataframe column specified. 
- It can take in arguments as a single column, or create multiple aggregate calls all at once using dictionary notation.


In [None]:
df.agg({"num_sales":"max"}).show()

+--------------+
|max(num_sales)|
+--------------+
|         870.0|
+--------------+



In [None]:
df.groupBy("company").agg({"num_sales":"mean"}).show()

+-------+-----------------+
|company|   avg(num_sales)|
+-------+-----------------+
|   BLAH|            370.0|
|    XYZ|            220.0|
|     OK|            610.0|
|   ABCD|322.3333333333333|
+-------+-----------------+



In [None]:
company_groups = df.groupBy("company")

In [None]:
company_groups.min().show()

+-------+--------------+
|company|min(num_sales)|
+-------+--------------+
|   BLAH|         130.0|
|    XYZ|         120.0|
|     OK|         350.0|
|   ABCD|         124.0|
+-------+--------------+



## Functions
There are a variety of functions you can import from pyspark.sql.functions.

In [None]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [None]:
df.select(countDistinct("num_sales")).show()

+-------------------------+
|count(DISTINCT num_sales)|
+-------------------------+
|                       11|
+-------------------------+



In [None]:
df.select(avg("num_sales")).show()

+-----------------+
|   avg(num_sales)|
+-----------------+
|360.5833333333333|
+-----------------+



In [None]:
df.select(stddev("num_sales")).show()

+----------------------+
|stddev_samp(num_sales)|
+----------------------+
|    250.08742410799007|
+----------------------+



### Alias
- To change the name, use the `.alias()` method for this:

In [None]:
df.select(countDistinct("num_sales").alias("ANYTHING WE WANT")).show()

+----------------+
|ANYTHING WE WANT|
+----------------+
|              11|
+----------------+



### Precision
- Use the `format_number` to change precision


In [None]:
from pyspark.sql.functions import format_number

In [None]:
sales_std = df.select(stddev("num_sales").alias("stddev"))

In [None]:
sales_std.show()

+------------------+
|            stddev|
+------------------+
|250.08742410799007|
+------------------+



In [None]:
sales_std.select(format_number("stddev",2)).show()

+------------------------+
|format_number(stddev, 2)|
+------------------------+
|                  250.09|
+------------------------+



## Order By


In [None]:
df.orderBy("num_sales").show() # Ascending Order

+-------+--------------+---------+
|company|representative|num_sales|
+-------+--------------+---------+
|    XYZ|           Tom|    120.0|
|   ABCD|           Amy|    124.0|
|   BLAH|         Linda|    130.0|
|    XYZ|           Bob|    200.0|
|   ABCD|       Vanessa|    243.0|
|   BLAH|          John|    250.0|
|    XYZ|         Frank|    340.0|
|     OK|         Sarah|    350.0|
|   BLAH|         Chris|    350.0|
|   ABCD|         Jerry|    600.0|
|   BLAH|          Mike|    750.0|
|     OK|          Carl|    870.0|
+-------+--------------+---------+



In [None]:
df.orderBy(df['num_sales'].desc()).show()

+-------+--------------+---------+
|company|representative|num_sales|
+-------+--------------+---------+
|     OK|          Carl|    870.0|
|   BLAH|          Mike|    750.0|
|   ABCD|         Jerry|    600.0|
|     OK|         Sarah|    350.0|
|   BLAH|         Chris|    350.0|
|    XYZ|         Frank|    340.0|
|   BLAH|          John|    250.0|
|   ABCD|       Vanessa|    243.0|
|    XYZ|           Bob|    200.0|
|   BLAH|         Linda|    130.0|
|   ABCD|           Amy|    124.0|
|    XYZ|           Tom|    120.0|
+-------+--------------+---------+



# Missing Data

- Often data sources are incomplete
- There are 3 options for filling in missing data:
  1. Just keep the missing data points.
  1. Drop them missing data points/row
  1. Fill them in with some other value.

## Keeping the missing data
A few machine learning algorithms can easily deal with missing data, let's see what it looks like:

In [None]:
!curl https://raw.githubusercontent.com/markumreed/colab_pyspark/main/missing_data.csv >> missing_data.csv

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("missing_data").getOrCreate()

In [None]:
df = spark.read.csv("missing_data.csv", header=True, inferSchema=True)

In [None]:
df.show()

+-----+-----+-----+
|   id| name|sales|
+-----+-----+-----+
|id001|  Bob| null|
|id002| null| null|
|id003| null|585.0|
|id004|Karen|404.0|
+-----+-----+-----+



In [None]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sales: double (nullable = true)



## Drop the missing data

You can use the `.na` functions for missing data. The `drop` command has the following parameters:

```df.na.drop(how='any', thresh=None, subset=None)```
    
    * param how: 'any' or 'all'.
    
        If 'any', drop a row if it contains any nulls.
        If 'all', drop a row only if all its values are null.
    
    * param thresh: int, default None
    
        If specified, drop rows that have less than `thresh` non-null values.
        This overwrites the `how` parameter.
        
    * param subset: 
        optional list of column names to consider.


In [None]:
df.na.drop().show()

+-----+-----+-----+
|   id| name|sales|
+-----+-----+-----+
|id004|Karen|404.0|
+-----+-----+-----+



In [None]:
df.na.drop(thresh=2).show()

+-----+-----+-----+
|   id| name|sales|
+-----+-----+-----+
|id001|  Bob| null|
|id003| null|585.0|
|id004|Karen|404.0|
+-----+-----+-----+



In [None]:
df.na.drop(subset=['sales']).show()

+-----+-----+-----+
|   id| name|sales|
+-----+-----+-----+
|id003| null|585.0|
|id004|Karen|404.0|
+-----+-----+-----+



In [None]:
df.na.drop(how='any').show()

+-----+-----+-----+
|   id| name|sales|
+-----+-----+-----+
|id004|Karen|404.0|
+-----+-----+-----+



In [None]:
df.na.drop(how='all').show()

+-----+-----+-----+
|   id| name|sales|
+-----+-----+-----+
|id001|  Bob| null|
|id002| null| null|
|id003| null|585.0|
|id004|Karen|404.0|
+-----+-----+-----+



## Fill the missing values

We can also fill the missing values with new values. If you have multiple nulls across multiple data types, Spark is actually smart enough to match up the data types. For example:


In [None]:
df.na.fill('SOME VALUE').show()

+-----+----------+-----+
|   id|      name|sales|
+-----+----------+-----+
|id001|       Bob| null|
|id002|SOME VALUE| null|
|id003|SOME VALUE|585.0|
|id004|     Karen|404.0|
+-----+----------+-----+



In [None]:
df.na.fill(999).show()

+-----+-----+-----+
|   id| name|sales|
+-----+-----+-----+
|id001|  Bob|999.0|
|id002| null|999.0|
|id003| null|585.0|
|id004|Karen|404.0|
+-----+-----+-----+



In [None]:
df.na.fill("Missing Name", subset=["name"]).show()

+-----+------------+-----+
|   id|        name|sales|
+-----+------------+-----+
|id001|         Bob| null|
|id002|Missing Name| null|
|id003|Missing Name|585.0|
|id004|       Karen|404.0|
+-----+------------+-----+



In [None]:
from pyspark.sql.functions import mean

In [None]:
mean_value = df.select(mean(df['sales'])).collect()

In [None]:
mean_sales_value = mean_value[0][0]

In [None]:
df.na.fill(mean_sales_value, ["sales"]).show()

+-----+-----+-----+
|   id| name|sales|
+-----+-----+-----+
|id001|  Bob|494.5|
|id002| null|494.5|
|id003| null|585.0|
|id004|Karen|404.0|
+-----+-----+-----+



In [None]:
# DON'T DO THIS
df.na.fill(df.select(mean(df['sales'])).collect()[0][0] ,['sales']).show() # NOT EASY TO READ

+-----+-----+-----+
|   id| name|sales|
+-----+-----+-----+
|id001|  Bob|494.5|
|id002| null|494.5|
|id003| null|585.0|
|id004|Karen|404.0|
+-----+-----+-----+



# Dates and Timestamps

You will often find yourself working with Time and Date information


In [None]:
!curl https://raw.githubusercontent.com/markumreed/colab_pyspark/main/WMT.csv >> WMT.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 89556  100 89556    0     0   352k      0 --:--:-- --:--:-- --:--:--  351k


In [None]:
spark = SparkSession.builder.appName('walmart_dates').getOrCreate()

In [None]:
df = spark.read.csv('WMT.csv', header=True, inferSchema=True)

In [None]:
df.show()

+----------+---------+---------+---------+---------+---------+--------+
|      Date|     Open|     High|      Low|    Close|Adj Close|  Volume|
+----------+---------+---------+---------+---------+---------+--------+
|2016-01-20|61.799999|62.330002|60.200001|    60.84|53.990601|17369100|
|2016-01-21|    60.98|62.790001|    60.91|61.880001|54.913509|12089200|
|2016-01-22|62.439999|63.259998|62.130001|62.689999|55.632324| 9197500|
|2016-01-25|62.779999|    63.82|62.549999|63.450001|56.306763|12823400|
|2016-01-26|63.360001|64.470001|63.259998|     64.0|56.794834| 9441200|
|2016-01-27|64.099998|    65.18|63.889999|63.950001|56.750477|10214300|
|2016-01-28|64.029999|64.510002|    63.43|64.220001| 56.99007|11278300|
|2016-01-29|    64.75|66.529999|64.739998|66.360001|58.889149|16439100|
|2016-02-01|65.910004|    67.93|65.889999|     67.5| 59.90081|14728400|
|2016-02-02|67.300003|67.839996|66.279999|66.860001|59.332867|13585900|
|2016-02-03|67.309998|     67.5|    65.07|66.269997| 58.80928|12

In [None]:
from pyspark.sql.functions import format_number, dayofmonth, hour, dayofyear, month, year, weekofyear, date_format

In [None]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
|               2|
|               3|
|               4|
|               5|
|               8|
|               9|
|              10|
|              11|
|              12|
|              16|
|              17|
+----------------+
only showing top 20 rows



In [None]:
df.select(hour(df['Date'])).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [None]:
df.select(dayofyear(df['Date'])).show()

+---------------+
|dayofyear(Date)|
+---------------+
|             20|
|             21|
|             22|
|             25|
|             26|
|             27|
|             28|
|             29|
|             32|
|             33|
|             34|
|             35|
|             36|
|             39|
|             40|
|             41|
|             42|
|             43|
|             47|
|             48|
+---------------+
only showing top 20 rows



In [None]:
df.select(month(df['Date'])).show()

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
+-----------+
only showing top 20 rows



Find Avg Close Price per month.

In [None]:
df.select(month(df['Date'])).show()

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
|          2|
+-----------+
only showing top 20 rows



In [None]:
df.withColumn("Month", month(df['Date'])).show()

+----------+---------+---------+---------+---------+---------+--------+-----+
|      Date|     Open|     High|      Low|    Close|Adj Close|  Volume|Month|
+----------+---------+---------+---------+---------+---------+--------+-----+
|2016-01-20|61.799999|62.330002|60.200001|    60.84|53.990601|17369100|    1|
|2016-01-21|    60.98|62.790001|    60.91|61.880001|54.913509|12089200|    1|
|2016-01-22|62.439999|63.259998|62.130001|62.689999|55.632324| 9197500|    1|
|2016-01-25|62.779999|    63.82|62.549999|63.450001|56.306763|12823400|    1|
|2016-01-26|63.360001|64.470001|63.259998|     64.0|56.794834| 9441200|    1|
|2016-01-27|64.099998|    65.18|63.889999|63.950001|56.750477|10214300|    1|
|2016-01-28|64.029999|64.510002|    63.43|64.220001| 56.99007|11278300|    1|
|2016-01-29|    64.75|66.529999|64.739998|66.360001|58.889149|16439100|    1|
|2016-02-01|65.910004|    67.93|65.889999|     67.5| 59.90081|14728400|    2|
|2016-02-02|67.300003|67.839996|66.279999|66.860001|59.332867|13

In [None]:
df2 = df.withColumn("Month", month(df['Date']))

In [None]:
df2.groupBy("Month").mean()[['avg(Month)', 'avg(Close)']].show()

+----------+------------------+
|avg(Month)|        avg(Close)|
+----------+------------------+
|      12.0|106.02932022330099|
|       1.0| 98.94980368627448|
|       6.0|  92.2302801401869|
|       3.0| 87.44880724770645|
|       5.0| 90.54859816822429|
|       9.0|100.69396066336634|
|       4.0| 91.55893247572816|
|       8.0| 96.97705391071432|
|       7.0| 96.65647596190469|
|      10.0|102.74810810810811|
|      11.0|105.59009729126215|
|       2.0| 89.16364570833336|
+----------+------------------+



In [None]:
res = df2.groupBy("Month").mean()[['avg(Month)', 'avg(Close)']]
res = res.withColumnRenamed("avg(Month)", "Month")
res = res.select("Month", format_number('avg(Close)',2).alias("Mean Close")).show()

+-----+----------+
|Month|Mean Close|
+-----+----------+
| 12.0|    106.03|
|  1.0|     98.95|
|  6.0|     92.23|
|  3.0|     87.45|
|  5.0|     90.55|
|  9.0|    100.69|
|  4.0|     91.56|
|  8.0|     96.98|
|  7.0|     96.66|
| 10.0|    102.75|
| 11.0|    105.59|
|  2.0|     89.16|
+-----+----------+



# Spark DataFrames Review

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("walmart_stock").getOrCreate()

In [None]:
df = spark.read.csv("walmart_stock.csv", header=True, inferSchema=True)

In [None]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [None]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [None]:
df.head(5)

[Row(Date='2012-01-03', Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date='2012-01-04', Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475),
 Row(Date='2012-01-05', Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539),
 Row(Date='2012-01-06', Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922),
 Row(Date='2012-01-09', Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)]

In [None]:
df.describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      null|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|2016-12-30|         90.800003|        90.970001|            89.25|        90.4700

In [None]:
df.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [None]:
from pyspark.sql.functions import format_number

In [None]:
res = df.describe()

In [None]:
df.describe().columns

['summary', 'Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [None]:
res.select(res["summary"],
             format_number(res['Open'].cast('float'), 2).alias('Open'),
             format_number(res['High'].cast('float'), 2).alias('High'),
             format_number(res['Low'].cast('float'), 2).alias('Low'),
             format_number(res['Close'].cast('float'), 2).alias('Close'),
             res['Volume'] .cast('int').alias('Volume')
             ).show()

+-------+--------+--------+--------+--------+--------+
|summary|    Open|    High|     Low|   Close|  Volume|
+-------+--------+--------+--------+--------+--------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519780|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|
+-------+--------+--------+--------+--------+--------+



In [None]:
# High vs Volume
df2 = df.withColumn("HV Ratio", df['High']/df['Volume'])

In [None]:
df2.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|            HV Ratio|
+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|7.367338463826307E-6|
|2012-01-09|         59.029999|   

In [None]:
df2.select('HV Ratio').show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



In [None]:
df.orderBy(df['High'].desc()).head(1)[0][0]

'2015-01-13'

In [None]:
from pyspark.sql.functions import mean
df.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



In [None]:
from pyspark.sql.functions import max, min

In [None]:
df.select(max('Volume'), min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



In [None]:
df.filter("Close < 60").count()

81

In [None]:
from pyspark.sql.functions import count

In [None]:
res = df.filter('Close < 60')
res.select(count('Close')).show()

+------------+
|count(Close)|
+------------+
|          81|
+------------+



In [None]:
(df.filter('High > 80').count() * 1.0/df.count()) * 100

9.141494435612083

In [None]:
from pyspark.sql.functions import corr

In [None]:
df.select(corr('High', 'Volume')).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



In [None]:
from pyspark.sql.functions import year
yeardf = df.withColumn("Year", year(df['Date']))

In [None]:
max_df = yeardf.groupBy('Year').max()

In [None]:
max_df.select('Year', 'max(High)').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



In [None]:
max_df.show()

+----+-----------------+---------+---------+----------+-----------+-----------------+---------+
|Year|        max(Open)|max(High)| max(Low)|max(Close)|max(Volume)|   max(Adj Close)|max(Year)|
+----+-----------------+---------+---------+----------+-----------+-----------------+---------+
|2015|        90.800003|90.970001|    89.25| 90.470001|   80898100|84.91421600000001|     2015|
|2013|        81.209999|81.370003|    80.82| 81.209999|   25683700|        73.929868|     2013|
|2014|87.08000200000001|88.089996|86.480003| 87.540001|   22812400|81.70768000000001|     2014|
|2012|        77.599998|77.599998|76.690002| 77.150002|   38007300|        68.568371|     2012|
|2016|             74.5|75.190002|73.629997| 74.300003|   35076700|        73.233524|     2016|
+----+-----------------+---------+---------+----------+-----------+-----------------+---------+



In [None]:
from pyspark.sql.functions import month

In [None]:
monthdf = df.withColumn("Month", month("Date"))
monthavgs = monthdf.select("Month", "Close").groupBy("Month").mean()
monthavgs.select("Month", "avg(Close)").orderBy('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+



## Spark DataFrame Review 02


In [None]:
!curl https://raw.githubusercontent.com/markumreed/colab_pyspark/main/appl_stock.csv >> apple_stock.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0  5  139k    5  7753    0     0  53102      0  0:00:02 --:--:--  0:00:02 52741100  139k  100  139k    0     0   817k      0 --:--:-- --:--:-- --:--:--  817k


# Linear Regression with PySpark

- Based on the Official Spark Documentation for PySpark

In [None]:
!curl https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_linear_regression_data.txt >> sample_linear_regression_data.txt

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  116k  100  116k    0     0  1306k      0 --:--:-- --:--:-- --:--:-- 1306k


In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("lr_ex").getOrCreate()

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
training = spark.read.format("libsvm").load("sample_linear_regression_data.txt")

In [None]:
training.show()

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
| -9.490009878824548|(10,[0,1,2,3,4,5,...|
| 0.2577820163584905|(10,[0,1,2,3,4,5,...|
| -4.438869807456516|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -7.966593841555266|(10,[0,1,2,3,4,5,...|
| -7.896274316726144|(10,[0,1,2,3,4,5,...|
| -8.464803554195287|(10,[0,1,2,3,4,5,...|
| 2.1214592666251364|(10,[0,1,2,3,4,5,...|
| 1.0720117616524107|(10,[0,1,2,3,4,5,...|
|-13.772441561702871|(10,[0,1,2,3,4,5,...|
| -5.082010756207233|(10,[0,1,2,3,4,5,...|
|  7.887786536531237|(10,[0,1,2,3,4,5,...|
| 14.323146365332388|(10,[0,1,2,3,4,5,...|
|-20.057482615789212|(10,[0,1,2,3,4,5,...|
|-0.8995693247765151|(10,[0,1,2,3,4,5,...|
| -19.16829262296376|(10,[0,1,2,3,4,5,...|
|  5.601801561245534|(10,[0,1,2,3,4,5,...|
|-3.2256352187273354|(10,[0,1,2,3,4,5,...|
| 1.5299675726687754|(10,[0,1,2,3,4,5,...|
| -0.250102447941961|(10,[0,1,2,3,4,5,...|
+----------

In [None]:
lr = LinearRegression(featuresCol="features", labelCol="label", predictionCol="prediction")

In [None]:
lrModel = lr.fit(training)

In [None]:
print("Coefficients:", str(lrModel.coefficients))
print("Intercept:", str(lrModel.intercept))

Coefficients: [0.0073350710225801715,0.8313757584337543,-0.8095307954684084,2.441191686884721,0.5191713795290003,1.1534591903547016,-0.2989124112808717,-0.5128514186201779,-0.619712827067017,0.6956151804322931]
Intercept: 0.14228558260358093


In [None]:
trainSummary = lrModel.summary

In [None]:
print("MAE: ", trainSummary.meanAbsoluteError)
print("MSE: ", trainSummary.meanSquaredError)
print("RMSE: ", trainSummary.rootMeanSquaredError)
print("R2: ", trainSummary.r2)
print("Adj R2: ", trainSummary.r2adj)


MAE:  8.145215527783876
MSE:  103.28843028724194
RMSE:  10.16309157133015
R2:  0.027839179518600154
Adj R2:  0.007999162774081858


## Train Test Split with PySpark
- Pass in the split between training/test as a list.
-  No correct, but generally 70/30 or 60/40 splits are used.
-  Depending on how much data you have and how unbalanced it is.

In [None]:
df = spark.read.format("libsvm").load("sample_linear_regression_data.txt") # FULL DATASET

In [None]:
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

In [None]:
test_data.show()

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
|-26.805483428483072|(10,[0,1,2,3,4,5,...|
|-22.949825936196074|(10,[0,1,2,3,4,5,...|
|-21.432387764165806|(10,[0,1,2,3,4,5,...|
|-20.212077258958672|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -19.66731861537172|(10,[0,1,2,3,4,5,...|
|-19.402336030214553|(10,[0,1,2,3,4,5,...|
|-17.803626188664516|(10,[0,1,2,3,4,5,...|
|-17.428674570939506|(10,[0,1,2,3,4,5,...|
|-17.065399625876015|(10,[0,1,2,3,4,5,...|
|-17.026492264209548|(10,[0,1,2,3,4,5,...|
|-16.151349351277112|(10,[0,1,2,3,4,5,...|
| -16.08565904102149|(10,[0,1,2,3,4,5,...|
|-15.951512565794573|(10,[0,1,2,3,4,5,...|
|-15.780685032623301|(10,[0,1,2,3,4,5,...|
| -15.72351561304857|(10,[0,1,2,3,4,5,...|
|-15.437384793431217|(10,[0,1,2,3,4,5,...|
|-15.334767479922341|(10,[0,1,2,3,4,5,...|
|-14.822152909751189|(10,[0,1,2,3,4,5,...|
|-14.762758252931127|(10,[0,1,2,3,4,5,...|
+----------

In [None]:
unlabeled_data = test_data.select('features')

In [None]:
corrected_model = lr.fit(train_data) 

In [None]:
res = corrected_model.evaluate(test_data)

In [None]:
print("MAE: ", res.meanAbsoluteError)
print("MSE: ", res.meanSquaredError)
print("RMSE: ", res.rootMeanSquaredError)
print("R2: ", res.r2)
print("Adj R2: ", res.r2adj)

MAE:  9.855750048378727
MSE:  142.31866794563598
RMSE:  11.929738804585622
R2:  -0.14679155085585793
Adj R2:  -0.24651255527810645


In [None]:
predictions = corrected_model.transform(unlabeled_data)

In [None]:
predictions.show()

+--------------------+--------------------+
|            features|          prediction|
+--------------------+--------------------+
|(10,[0,1,2,3,4,5,...|   1.500419302439231|
|(10,[0,1,2,3,4,5,...|   6.540721556576252|
|(10,[0,1,2,3,4,5,...|  1.4369775273526635|
|(10,[0,1,2,3,4,5,...|  1.3156052948594428|
|(10,[0,1,2,3,4,5,...|-0.09510236182489817|
|(10,[0,1,2,3,4,5,...| 0.12648407749270263|
|(10,[0,1,2,3,4,5,...|-0.40745999229762575|
|(10,[0,1,2,3,4,5,...| -1.3827504557268635|
|(10,[0,1,2,3,4,5,...|  2.6965070486236957|
|(10,[0,1,2,3,4,5,...|    2.42284270742401|
|(10,[0,1,2,3,4,5,...|-0.33620505674116263|
|(10,[0,1,2,3,4,5,...|  1.5811910073932323|
|(10,[0,1,2,3,4,5,...| -0.9126865153126812|
|(10,[0,1,2,3,4,5,...| -2.4337353560269603|
|(10,[0,1,2,3,4,5,...|  4.7238640017384945|
|(10,[0,1,2,3,4,5,...|  1.7972086764514907|
|(10,[0,1,2,3,4,5,...| -0.3727532193177282|
|(10,[0,1,2,3,4,5,...|   3.393593882956883|
|(10,[0,1,2,3,4,5,...|   1.173823533651508|
|(10,[0,1,2,3,4,5,...| 0.4009232

# Data Transformations with PySpark

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('data_transformer').getOrCreate()

In [None]:
df = spark.read.csv('customers.csv', inferSchema=True, header=True)

In [None]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Phone: long (nullable = true)
 |-- Group: string (nullable = true)



In [None]:
df.show()

+------+----------+-----+
|  Name|     Phone|Group|
+------+----------+-----+
|  John|4085552424|    I|
|  Mike|3105552738|   II|
|Cassie|4085552424|   II|
| Laura|3105552438|   II|
| Sarah|4085551234|    I|
| David|3105557463|  III|
|  Zach|4085553987|  III|
| Kiera|3105552938|    I|
| Alexa|4085559467|  III|
+------+----------+-----+



## Data Features
### StringIndexer
- Convert string data into numerical (categorical feature)
- Encode as dummy variables/OneHotEncoder
- `StringIndexer`

In [None]:
from pyspark.ml.feature import StringIndexer

df2 = spark.createDataFrame(
    [(0,"a"), (1, "b"), (2, "c"), (3, "a"), (4, "b"), (5, "c")],
    ["user_id", "category"]
)

In [None]:
df2.show()

+-------+--------+
|user_id|category|
+-------+--------+
|      0|       a|
|      1|       b|
|      2|       c|
|      3|       a|
|      4|       b|
|      5|       c|
+-------+--------+



In [None]:
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")

In [None]:
indexed = indexer.fit(df2).transform(df2)

In [None]:
indexed.show()

+-------+--------+-------------+
|user_id|category|categoryIndex|
+-------+--------+-------------+
|      0|       a|          0.0|
|      1|       b|          1.0|
|      2|       c|          2.0|
|      3|       a|          0.0|
|      4|       b|          1.0|
|      5|       c|          2.0|
+-------+--------+-------------+



## VectorIndexer
- **VectorAssembler** is a transformer that combines a given list of columns into a single vector column. 
- **VectorAssembler** accepts the following input column types: 
  - all numeric types, boolean type, and vector type.  

---

- Assume that we have a DataFrame with the columns id, hour, mobile, userFeatures, and clicked:

id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0
     
- userFeatures is a vector column that contains three user features.  
- After transformation we should get the following DataFrame:

id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

In [None]:
from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import VectorAssembler

In [None]:
df3 = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"]
)
df3.show()

+---+----+------+--------------+-------+
| id|hour|mobile|  userFeatures|clicked|
+---+----+------+--------------+-------+
|  0|  18|   1.0|[0.0,10.0,0.5]|    1.0|
+---+----+------+--------------+-------+



In [None]:
assembler = VectorAssembler(
    inputCols = ["hour", "mobile", "userFeatures"],
    outputCol = "features"
)
output = assembler.transform(df3)

In [None]:
output.select("features", "clicked").show()

+--------------------+-------+
|            features|clicked|
+--------------------+-------+
|[18.0,1.0,0.0,10....|    1.0|
+--------------------+-------+



## Example with Customer Data

In [None]:
df.show()

+------+----------+-----+
|  Name|     Phone|Group|
+------+----------+-----+
|  John|4085552424|    I|
|  Mike|3105552738|   II|
|Cassie|4085552424|   II|
| Laura|3105552438|   II|
| Sarah|4085551234|    I|
| David|3105557463|  III|
|  Zach|4085553987|  III|
| Kiera|3105552938|    I|
| Alexa|4085559467|  III|
+------+----------+-----+



In [None]:
indexer = StringIndexer(inputCol="Group", outputCol="groupIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

+------+----------+-----+----------+
|  Name|     Phone|Group|groupIndex|
+------+----------+-----+----------+
|  John|4085552424|    I|       0.0|
|  Mike|3105552738|   II|       1.0|
|Cassie|4085552424|   II|       1.0|
| Laura|3105552438|   II|       1.0|
| Sarah|4085551234|    I|       0.0|
| David|3105557463|  III|       2.0|
|  Zach|4085553987|  III|       2.0|
| Kiera|3105552938|    I|       0.0|
| Alexa|4085559467|  III|       2.0|
+------+----------+-----+----------+



In [None]:
assembler = VectorAssembler(
    inputCols = ["Phone", "groupIndex"],
    outputCol = "features"
)
output = assembler.transform(indexed)
output.select("Name", "features").show()

+------+-------------------+
|  Name|           features|
+------+-------------------+
|  John|[4.085552424E9,0.0]|
|  Mike|[3.105552738E9,1.0]|
|Cassie|[4.085552424E9,1.0]|
| Laura|[3.105552438E9,1.0]|
| Sarah|[4.085551234E9,0.0]|
| David|[3.105557463E9,2.0]|
|  Zach|[4.085553987E9,2.0]|
| Kiera|[3.105552938E9,0.0]|
| Alexa|[4.085559467E9,2.0]|
+------+-------------------+



## Linear Regression with PySpark 2

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("lin_reg").getOrCreate()

In [None]:
df = spark.read.csv("Ecommerce_Customers.csv", inferSchema=True, header=True)

In [None]:
df.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [None]:
df.show()

+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug...|   

In [None]:
df.head()

Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005)

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
df.columns

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [None]:
assembler = VectorAssembler(inputCols=['Avg Session Length', 'Time on App',
                                       'Time on Website','Length of Membership'],
                            outputCol='features')

In [None]:
output = assembler.transform(df)

In [None]:
output.select("features").show()

+--------------------+
|            features|
+--------------------+
|[34.4972677251122...|
|[31.9262720263601...|
|[33.0009147556426...|
|[34.3055566297555...|
|[33.3306725236463...|
|[33.8710378793419...|
|[32.0215955013870...|
|[32.7391429383803...|
|[33.9877728956856...|
|[31.9365486184489...|
|[33.9925727749537...|
|[33.8793608248049...|
|[29.5324289670579...|
|[33.1903340437226...|
|[32.3879758531538...|
|[30.7377203726281...|
|[32.1253868972878...|
|[32.3388993230671...|
|[32.1878120459321...|
|[32.6178560628234...|
+--------------------+
only showing top 20 rows



In [None]:
final_data = output.select("features", "Yearly Amount Spent")

In [None]:
train_data, test_data = final_data.randomSplit([0.7, 0.3])

In [None]:
train_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                362|
|   mean|  501.1491393902461|
| stddev|  77.10133843526118|
|    min| 256.67058229005585|
|    max|  744.2218671047146|
+-------+-------------------+



In [None]:
test_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                138|
|   mean|   494.500222246567|
| stddev|  84.95181183261874|
|    min|   266.086340948469|
|    max|  765.5184619388373|
+-------+-------------------+



In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
lr = LinearRegression(labelCol='Yearly Amount Spent')

In [None]:
model = lr.fit(train_data)

In [None]:
import pandas as pd

In [None]:
pd.DataFrame({"Coefficients":model.coefficients}, index=['Avg Session Length', 'Time on App',
                                       'Time on Website','Length of Membership'])

Unnamed: 0,Coefficients
Avg Session Length,25.752205
Time on App,39.396649
Time on Website,0.491886
Length of Membership,61.227496


In [None]:
res = model.evaluate(test_data)

In [None]:
res.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| 11.652868881650193|
|-11.675731134906414|
| -5.196849520818262|
|-17.438580116879734|
|  -4.65535635127452|
|  -4.35657976156466|
| -3.733584859298844|
| 3.2279193114615055|
|  2.454174490866592|
| -1.727017481807593|
|  18.20670747878171|
|  16.59363435001461|
|-26.212788151766404|
| -2.528969527062827|
|-19.182971363148056|
|-1.2422155981684568|
| -9.576885889320181|
| 12.941876205740868|
|-1.8250912153936838|
| 12.448230647836965|
+-------------------+
only showing top 20 rows



In [None]:
unlabeled_data = test_data.select("features")

In [None]:
predictions = model.transform(unlabeled_data)

In [None]:
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[29.5324289670579...| 396.9874821909773|
|[30.3931845423455...|    331.6046009381|
|[30.4925366965402...| 287.6680952407328|
|[30.8162006488763...|283.52492106534874|
|[30.8364326747734...| 472.1572567782641|
|[30.8794843441274...|494.56317974641934|
|[31.2681042107507...|427.20411803312277|
|[31.3091926408918...| 429.4927985284721|
|[31.5316044825729...|434.06143123849597|
|[31.5761319713222...| 542.9536014711359|
|[31.6005122003032...| 460.9661440123152|
|[31.6098395733896...|427.95191530109355|
|[31.6739155032749...| 501.9378560616476|
|[31.8124825597242...|395.33931451086005|
|[31.8164283341993...| 520.3054628668044|
|[31.8186165667690...| 447.6608889683041|
|[31.8279790554652...| 449.5796334362617|
|[31.9096268275227...| 550.5041594674983|
|[31.9120759292006...| 389.3598075211014|
|[31.9262720263601...|379.75670279648944|
+--------------------+------------

In [None]:
print("MAE:", res.meanAbsoluteError)
print("MSE:", res.meanSquaredError)
print("RMSE:", res.rootMeanSquaredError)
print("R2", res.r2)
print("Adj R2", res.r2adj)

MAE: 7.489850335633214
MSE: 84.76608914377711
RMSE: 9.206850120631763
R2 0.9881686207931215
Adj R2 0.9878127898395311


# Logistic Regression with PySpark

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("log_reg").getOrCreate()

In [None]:
!curl https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_libsvm_data.txt >> sample_libsvm_data_2.txt

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  102k  100  102k    0     0   349k      0 --:--:-- --:--:-- --:--:--  350k


In [None]:
df = spark.read.format("libsvm").load("sample_libsvm_data.txt")

In [None]:
df.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
lr = LogisticRegression()

model = lr.fit(df)

summary = model.summary

In [None]:
summary.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[127,128,129...|[19.8534775947478...|[0.99999999761359...|       0.0|
|  1.0|(692,[158,159,160...|[-20.377398194908...|[1.41321555111056...|       1.0|
|  1.0|(692,[124,125,126...|[-27.401459284891...|[1.25804865126979...|       1.0|
|  1.0|(692,[152,153,154...|[-18.862741612668...|[6.42710509170303...|       1.0|
|  1.0|(692,[151,152,153...|[-20.483011833009...|[1.27157209200604...|       1.0|
|  0.0|(692,[129,130,131...|[19.8506078990277...|[0.99999999760673...|       0.0|
|  1.0|(692,[158,159,160...|[-20.337256674833...|[1.47109814695581...|       1.0|
|  1.0|(692,[99,100,101,...|[-19.595579753418...|[3.08850168102631...|       1.0|
|  0.0|(692,[154,155,156...|[19.2708803215613...|[0.99999999572670...|       0.0|
|  0.0|(692,[127

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
model.evaluate(df)

<pyspark.ml.classification.BinaryLogisticRegressionSummary at 0x7fbe44cae400>

In [None]:
pred_and_labels = model.evaluate(df)

In [None]:
pred_and_labels.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[127,128,129...|[19.8534775947478...|[0.99999999761359...|       0.0|
|  1.0|(692,[158,159,160...|[-20.377398194908...|[1.41321555111056...|       1.0|
|  1.0|(692,[124,125,126...|[-27.401459284891...|[1.25804865126979...|       1.0|
|  1.0|(692,[152,153,154...|[-18.862741612668...|[6.42710509170303...|       1.0|
|  1.0|(692,[151,152,153...|[-20.483011833009...|[1.27157209200604...|       1.0|
|  0.0|(692,[129,130,131...|[19.8506078990277...|[0.99999999760673...|       0.0|
|  1.0|(692,[158,159,160...|[-20.337256674833...|[1.47109814695581...|       1.0|
|  1.0|(692,[99,100,101,...|[-19.595579753418...|[3.08850168102631...|       1.0|
|  0.0|(692,[154,155,156...|[19.2708803215613...|[0.99999999572670...|       0.0|
|  0.0|(692,[127

In [None]:
pred_and_labels = pred_and_labels.predictions.select("label", "prediction")

In [None]:
pred_and_labels.show()

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
+-----+----------+
only showing top 20 rows



# Evaluation

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [None]:
eval = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label")

In [None]:
eval_multi = MulticlassClassificationEvaluator(predictionCol="prediction", 
                                               labelCol="label", 
                                               metricName="accuracy")

In [None]:
acc = eval.evaluate(pred_and_labels)

In [None]:
acc

1.0

# Logistic Regression: Titantic Dataset

In [None]:
!curl https://raw.githubusercontent.com/markumreed/colab_pyspark/main/titanic.csv >> titanic_2.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100 60302  100 60302    0     0   303k      0 --:--:-- --:--:-- --:--:--  303k


In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("titanic").getOrCreate()

In [None]:
df = spark.read.csv("titanic.csv", inferSchema=True, header=True)

In [None]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [None]:
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [None]:
data = df.select([
 'Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked'])

In [None]:
data.head()

Row(Survived=0, Pclass=3, Sex='male', Age=22.0, SibSp=1, Parch=0, Fare=7.25, Embarked='S')

In [None]:
data_final = data.na.drop()

# Categorical Data with PySpark

In [None]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer,
                                OneHotEncoder, StringIndexer)

In [None]:
gender_indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
gender_ecoder = OneHotEncoder(inputCol="SexIndex", outputCol="SexVec")

embark_indexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkIndex")
embark_ecoder = OneHotEncoder(inputCol="EmbarkIndex", outputCol="EmbarkVec")


In [None]:
assembler = VectorAssembler(inputCols=["Pclass", "SexVec", "Age", "SibSp",
                                       "Parch", "Fare", "EmbarkVec"],
                            outputCol="features")

In [None]:
from pyspark.ml.classification import LogisticRegression

# Pipelines

In [None]:
from pyspark.ml import Pipeline

In [None]:
lr = LogisticRegression(featuresCol='features', labelCol="Survived")

In [None]:
pipeline = Pipeline(stages=[
                            gender_indexer,embark_indexer,
                            gender_ecoder,embark_ecoder,
                            assembler, lr
])

In [None]:
train, test = data_final.randomSplit([0.7, 0.3], seed=42)

In [None]:
model_fit = pipeline.fit(train)
res = model_fit.transform(test)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                     labelCol='Survived')

In [None]:
res.select('Survived', 'prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
+--------+----------+
only showing top 20 rows



In [None]:
auc = eval.evaluate(res)

In [None]:
auc

0.7747561675272518

# Clustering with PySpark


In [None]:
!curl https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_kmeans_data.txt >> sample_kmeans_data.txt

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   120  100   120    0     0    357      0 --:--:-- --:--:-- --:--:--   357


In [None]:
!curl https://archive.ics.uci.edu/ml/machine-learning-databases/00236/seeds_dataset.txt >> seeds_dataset.txt

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  9300  100  9300    0     0  14975      0 --:--:-- --:--:-- --:--:-- 14975


# K-means


In [None]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("sample_cluster").getOrCreate()

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
df = spark.read.format("libsvm").load("sample_kmeans_data.txt")

In [None]:
df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [None]:
kmeans = KMeans().setK(2).setSeed(42)
model = kmeans.fit(df)

In [None]:
pred = model.transform(df)

In [None]:
eval = ClusteringEvaluator()

In [None]:
silhouette = eval.evaluate(pred)
print(f"Silhouette with squared euclidean distance: {silhouette}")

Silhouette with squared euclidean distance: 0.9997530305375207


In [None]:
centers = model.clusterCenters()
print("Cluster Centers:")
print("=================")
for center in centers:
  print(center)

Cluster Centers:
[0.1 0.1 0.1]
[9.1 9.1 9.1]


# Seeds Clustering Data from UCI

Attribute Information:

To construct the data, seven geometric parameters of wheat kernels were measured: 
1. area A, 
2. perimeter P, 
3. compactness C = 4*pi*A/P^2, 
4. length of kernel, 
5. width of kernel, 
6. asymmetry coefficient 
7. length of kernel groove. 

All of these parameters were real-valued continuous.

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("seeds").getOrCreate()

In [None]:
from pyspark.ml.clustering import KMeans

In [None]:
df = spark.read.csv("seeds_dataset.csv", header=True, inferSchema=True)

In [None]:
df.show()

+-----+---------+-----------+------------------+------------------+---------------------+------------------+
| area|perimeter|compactness|  length_of_kernel|   width_of_kernel|asymmetry_coefficient|  length_of_groove|
+-----+---------+-----------+------------------+------------------+---------------------+------------------+
|15.26|    14.84|      0.871|             5.763|             3.312|                2.221|              5.22|
|14.88|    14.57|     0.8811| 5.553999999999999|             3.333|                1.018|             4.956|
|14.29|    14.09|      0.905|             5.291|3.3369999999999997|                2.699|             4.825|
|13.84|    13.94|     0.8955|             5.324|3.3789999999999996|                2.259|             4.805|
|16.14|    14.99|     0.9034|5.6579999999999995|             3.562|                1.355|             5.175|
|14.38|    14.21|     0.8951|             5.386|             3.312|   2.4619999999999997|             4.956|
|14.69|    14.49|  

In [None]:
df.describe().show()

+-------+------------------+------------------+--------------------+-------------------+------------------+---------------------+-------------------+
|summary|              area|         perimeter|         compactness|   length_of_kernel|   width_of_kernel|asymmetry_coefficient|   length_of_groove|
+-------+------------------+------------------+--------------------+-------------------+------------------+---------------------+-------------------+
|  count|               210|               210|                 210|                210|               210|                  210|                210|
|   mean|14.847523809523816|14.559285714285718|  0.8709985714285714|  5.628533333333335| 3.258604761904762|   3.7001999999999997|  5.408071428571429|
| stddev|2.9096994306873647|1.3059587265640225|0.023629416583846364|0.44306347772644983|0.3777144449065867|   1.5035589702547392|0.49148049910240543|
|    min|             10.59|             12.41|              0.8081|              4.899|            

## Format Data

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
df.columns

['area',
 'perimeter',
 'compactness',
 'length_of_kernel',
 'width_of_kernel',
 'asymmetry_coefficient',
 'length_of_groove']

In [None]:
assembler = VectorAssembler(inputCols=df.columns, outputCol='features')

In [None]:
df_final = assembler.transform(df)

In [None]:
df_final.show()

+-----+---------+-----------+------------------+------------------+---------------------+------------------+--------------------+
| area|perimeter|compactness|  length_of_kernel|   width_of_kernel|asymmetry_coefficient|  length_of_groove|            features|
+-----+---------+-----------+------------------+------------------+---------------------+------------------+--------------------+
|15.26|    14.84|      0.871|             5.763|             3.312|                2.221|              5.22|[15.26,14.84,0.87...|
|14.88|    14.57|     0.8811| 5.553999999999999|             3.333|                1.018|             4.956|[14.88,14.57,0.88...|
|14.29|    14.09|      0.905|             5.291|3.3369999999999997|                2.699|             4.825|[14.29,14.09,0.90...|
|13.84|    13.94|     0.8955|             5.324|3.3789999999999996|                2.259|             4.805|[13.84,13.94,0.89...|
|16.14|    14.99|     0.9034|5.6579999999999995|             3.562|                1.355| 

## Scaling

In [None]:
from pyspark.ml.feature import StandardScaler

In [None]:
scaler = StandardScaler(inputCol='features',outputCol='scaledFeatures', withStd=True, withMean=False)

In [None]:
scaledModel = scaler.fit(df_final)

In [None]:
df_final = scaledModel.transform(df_final)

In [None]:
df_final.show()

+-----+---------+-----------+------------------+------------------+---------------------+------------------+--------------------+--------------------+
| area|perimeter|compactness|  length_of_kernel|   width_of_kernel|asymmetry_coefficient|  length_of_groove|            features|      scaledFeatures|
+-----+---------+-----------+------------------+------------------+---------------------+------------------+--------------------+--------------------+
|15.26|    14.84|      0.871|             5.763|             3.312|                2.221|              5.22|[15.26,14.84,0.87...|[5.24452795332028...|
|14.88|    14.57|     0.8811| 5.553999999999999|             3.333|                1.018|             4.956|[14.88,14.57,0.88...|[5.11393027165175...|
|14.29|    14.09|      0.905|             5.291|3.3369999999999997|                2.699|             4.825|[14.29,14.09,0.90...|[4.91116018695588...|
|13.84|    13.94|     0.8955|             5.324|3.3789999999999996|                2.259|     

## Train and Eval

In [None]:
kmeans = KMeans(featuresCol='scaledFeatures', k=3)
model = kmeans.fit(df_final)

In [None]:
pred = model.transform(df_final)

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
eval = ClusteringEvaluator()

In [None]:
silhouette = eval.evaluate(pred)
print(f"Silhouette with squared euclidean distance: {silhouette}")

Silhouette with squared euclidean distance: 0.6300001033389961


In [None]:
centers = model.clusterCenters()
print("Cluster Centers:")
for center in centers:
  print(center)

Cluster Centers:
[ 4.07497225 10.14410142 35.89816849 11.80812742  7.54416916  3.15410901
 10.38031464]
[ 6.35645488 12.40730852 37.41990178 13.93860446  9.7892399   2.41585013
 12.29286107]
[ 4.96198582 10.97871333 37.30930808 12.44647267  8.62880781  1.80061978
 10.41913733]


# Random Forest Classifier with PySpark

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("rf").getOrCreate()

In [None]:
df = spark.read.format("libsvm").load("sample_libsvm_data.txt")

In [None]:
df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



# Train Test Split

In [None]:
(train, test) = df.randomSplit([0.7, 0.3], seed=42)

In [None]:
test.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[100,101,102...|
|  0.0|(692,[123,124,125...|
|  0.0|(692,[123,124,125...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[125,126,127...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[127,128,129...|
|  0.0|(692,[129,130,131...|
|  0.0|(692,[150,151,152...|
|  0.0|(692,[151,152,153...|
|  0.0|(692,[152,153,154...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[234,235,237...|
|  1.0|(692,[97,98,99,12...|
+-----+--------------------+
only showing top 20 rows



In [None]:
train.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



# Train RF Model

In [None]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=20,seed=42)

In [None]:
model = rf.fit(train)

In [None]:
pred = model.transform(test)

In [None]:
pred.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [None]:
pred.select("prediction", "label", "features").show(5)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|(692,[100,101,102...|
|       0.0|  0.0|(692,[123,124,125...|
|       0.0|  0.0|(692,[123,124,125...|
|       0.0|  0.0|(692,[124,125,126...|
|       0.0|  0.0|(692,[124,125,126...|
+----------+-----+--------------------+
only showing top 5 rows



In [None]:
eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

In [None]:
acc = eval.evaluate(pred)

In [None]:
print("Test Error = %g" % (1.0 - acc))

Test Error = 0


In [None]:
model.featureImportances

SparseVector(692, {183: 0.0041, 272: 0.0463, 299: 0.0091, 300: 0.0441, 327: 0.0083, 351: 0.05, 373: 0.0403, 397: 0.003, 399: 0.037, 400: 0.0338, 405: 0.0397, 406: 0.05, 407: 0.1575, 412: 0.0428, 413: 0.091, 426: 0.0072, 429: 0.0028, 430: 0.0069, 435: 0.0163, 455: 0.0548, 460: 0.0031, 468: 0.0061, 469: 0.0037, 483: 0.0472, 510: 0.0409, 511: 0.0912, 518: 0.005, 568: 0.0371, 603: 0.009, 606: 0.0027, 634: 0.0089})

# Gradient Boosted Trees

In [None]:
from pyspark.ml.classification import GBTClassifier

In [None]:
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10, seed=42)

In [None]:
model = gbt.fit(train)

In [None]:
pred = model.transform(test)

In [None]:
pred.select("prediction", "label", "features").show(5)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|(692,[100,101,102...|
|       0.0|  0.0|(692,[123,124,125...|
|       0.0|  0.0|(692,[123,124,125...|
|       0.0|  0.0|(692,[124,125,126...|
|       0.0|  0.0|(692,[124,125,126...|
+----------+-----+--------------------+
only showing top 5 rows



In [None]:
eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc = eval.evaluate(pred)
print("Test Error = %g" % (1.0 - acc))

Test Error = 0.0571429


## Tree Methods with PySpark
1. Single Decision Tree
1. Random Forest
1. Gradient Boosted Tree Classifier

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("trees").getOrCreate()

In [6]:
df = spark.read.csv("College.csv", inferSchema=True, header=True)

In [7]:
df.printSchema()

root
 |-- School: string (nullable = true)
 |-- Private: string (nullable = true)
 |-- Apps: integer (nullable = true)
 |-- Accept: integer (nullable = true)
 |-- Enroll: integer (nullable = true)
 |-- Top10perc: integer (nullable = true)
 |-- Top25perc: integer (nullable = true)
 |-- F_Undergrad: integer (nullable = true)
 |-- P_Undergrad: integer (nullable = true)
 |-- Outstate: integer (nullable = true)
 |-- Room_Board: integer (nullable = true)
 |-- Books: integer (nullable = true)
 |-- Personal: integer (nullable = true)
 |-- PhD: integer (nullable = true)
 |-- Terminal: integer (nullable = true)
 |-- S_F_Ratio: double (nullable = true)
 |-- perc_alumni: integer (nullable = true)
 |-- Expend: integer (nullable = true)
 |-- Grad_Rate: integer (nullable = true)



In [9]:
df.head(2)

[Row(School='Abilene Christian University', Private='Yes', Apps=1660, Accept=1232, Enroll=721, Top10perc=23, Top25perc=52, F_Undergrad=2885, P_Undergrad=537, Outstate=7440, Room_Board=3300, Books=450, Personal=2200, PhD=70, Terminal=78, S_F_Ratio=18.1, perc_alumni=12, Expend=7041, Grad_Rate=60),
 Row(School='Adelphi University', Private='Yes', Apps=2186, Accept=1924, Enroll=512, Top10perc=16, Top25perc=29, F_Undergrad=2683, P_Undergrad=1227, Outstate=12280, Room_Board=6450, Books=750, Personal=1500, PhD=29, Terminal=30, S_F_Ratio=12.2, perc_alumni=16, Expend=10527, Grad_Rate=56)]

In [8]:
df.columns

['School',
 'Private',
 'Apps',
 'Accept',
 'Enroll',
 'Top10perc',
 'Top25perc',
 'F_Undergrad',
 'P_Undergrad',
 'Outstate',
 'Room_Board',
 'Books',
 'Personal',
 'PhD',
 'Terminal',
 'S_F_Ratio',
 'perc_alumni',
 'Expend',
 'Grad_Rate']

# Formatting for Spark

In [16]:
# "label", "features"
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [13]:
df.printSchema()

root
 |-- School: string (nullable = true)
 |-- Private: string (nullable = true)
 |-- Apps: integer (nullable = true)
 |-- Accept: integer (nullable = true)
 |-- Enroll: integer (nullable = true)
 |-- Top10perc: integer (nullable = true)
 |-- Top25perc: integer (nullable = true)
 |-- F_Undergrad: integer (nullable = true)
 |-- P_Undergrad: integer (nullable = true)
 |-- Outstate: integer (nullable = true)
 |-- Room_Board: integer (nullable = true)
 |-- Books: integer (nullable = true)
 |-- Personal: integer (nullable = true)
 |-- PhD: integer (nullable = true)
 |-- Terminal: integer (nullable = true)
 |-- S_F_Ratio: double (nullable = true)
 |-- perc_alumni: integer (nullable = true)
 |-- Expend: integer (nullable = true)
 |-- Grad_Rate: integer (nullable = true)



In [11]:
df.columns

['School',
 'Private',
 'Apps',
 'Accept',
 'Enroll',
 'Top10perc',
 'Top25perc',
 'F_Undergrad',
 'P_Undergrad',
 'Outstate',
 'Room_Board',
 'Books',
 'Personal',
 'PhD',
 'Terminal',
 'S_F_Ratio',
 'perc_alumni',
 'Expend',
 'Grad_Rate']

In [18]:
assembler = VectorAssembler(
    inputCols=['Apps',
 'Accept',
 'Enroll',
 'Top10perc',
 'Top25perc',
 'F_Undergrad',
 'P_Undergrad',
 'Outstate',
 'Room_Board',
 'Books',
 'Personal',
 'PhD',
 'Terminal',
 'S_F_Ratio',
 'perc_alumni',
 'Expend',
 'Grad_Rate'          
    ],
    outputCol="features"
)

In [19]:
output = assembler.transform(df)

# String Variables (Private)

In [20]:
from pyspark.ml.feature import StringIndexer

In [25]:
indexer = StringIndexer(inputCol="Private", outputCol="PrivateIndexer")
output_fixed = indexer.fit(output).transform(output)

In [26]:
df_final = output_fixed.select("features", "PrivateIndexer")

In [27]:
train, test = df_final.randomSplit([0.7, 0.3], seed=42)

# Tree Classifiers

In [28]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline

## CREATE Models

In [29]:
dtc = DecisionTreeClassifier(labelCol="PrivateIndexer", featuresCol="features")
rfc = RandomForestClassifier(labelCol="PrivateIndexer", featuresCol="features")
gbt = GBTClassifier(labelCol="PrivateIndexer", featuresCol="features")

In [30]:
dtc_model = dtc.fit(train)
rfc_model = rfc.fit(train)
gbt_model = gbt.fit(train)

# Predictions

In [31]:
dtc_pred = dtc_model.transform(test)
rfc_pred = rfc_model.transform(test)
gbt_pred = gbt_model.transform(test)

# Eval

In [32]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [33]:
evaluator = MulticlassClassificationEvaluator(labelCol="PrivateIndexer", predictionCol="prediction", metricName="accuracy")

In [34]:
dtc_acc = evaluator.evaluate(dtc_pred)
rfc_acc = evaluator.evaluate(rfc_pred)
gbt_acc = evaluator.evaluate(gbt_pred)

In [38]:
print("-"*10)
print(f"DT Acc: {dtc_acc}")
print("-"*10)
print(f"RFC Acc: {rfc_acc}")
print("-"*10)
print(f"GBT Acc: {gbt_acc}")
print("-"*10)

----------
DT Acc: 0.925
----------
RFC Acc: 0.945
----------
GBT Acc: 0.94
----------
