# PySpark tutorial

## What is PySpark?


PySpark is a Python API for working with Apache Spark.

* **Python API**: you can use the syntax and agility of Python to interact with, and send commands to, a system that is not built on Python at its core (Spark itself runs on the JVM).

* **Apache Spark**: a system for processing, analyzing and modeling immense amounts of data on many computers at the same time. Put differently, Apache Spark lets you run computations in parallel instead of sequentially: it divides one very large task into many smaller tasks and runs each of them on a different machine. This lets you reach your analysis goals in a reasonable time, which would not be possible on a single machine.

As a rule of thumb, PySpark suits data that would not fit in a single machine's storage (let alone its RAM).

**Important related concepts:**
1. Distributed computing: splitting a task into several smaller tasks that run at the same time. PySpark does this across many machines, but it can also be done on a single machine, for example with several threads.
2. Cluster: a network of machines that can take on tasks from a user, interact with one another and return results. Clusters provide the computing resources PySpark uses for its computations.
3. Resilient Distributed Dataset (RDD): an immutable, distributed collection of data. It is not tabular and has no schema, so for tabular data wrangling DataFrames offer a richer API and under-the-hood optimizations. Still, you will encounter RDDs as you learn more about Spark and should be aware that they exist.

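The idea behind distributed computing can be sketched on a single machine. The toy example below is plain Python (no Spark), and the function names are made up for illustration: it splits a list into slices, much like Spark splits a dataset into partitions, runs one small task per slice in a thread pool, and combines the partial results. Because of Python's global interpreter lock, threads will not actually speed up this CPU-bound work; the point is the split, process, combine structure, which Spark applies across many machines.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(data, n_chunks):
    """Split a list into at most n_chunks slices of roughly equal size (like partitions)."""
    size = max(1, -(-len(data) // n_chunks))  # ceiling division, but never 0
    return [data[i:i + size] for i in range(0, len(data), size)]


def sum_of_squares(chunk):
    """The small task each worker runs on its own slice of the data."""
    return sum(x * x for x in chunk)


def parallel_sum_of_squares(data, n_workers=4):
    """Split the data, run one task per slice concurrently, then combine the results."""
    chunks = split_into_chunks(data, n_workers)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partial_results = list(pool.map(sum_of_squares, chunks))
    return sum(partial_results)


numbers = list(range(1000))
print(parallel_sum_of_squares(numbers))  # 332833500
```
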
**Parts of PySpark we will cover:**
1. PySpark SQL - contains commands for data processing and manipulation.
2. PySpark MLlib - includes a variety of models, model training and related commands.

**Spark Architecture:**
To send commands to a cluster and receive results, you need to create a Spark session. This object is your entry point to Spark: each user of the cluster works through their own Spark session, which lets them use the cluster in isolation from other users. Behind the session:

* **SparkContext (driver)**: each session communicates with a SparkContext, which runs in the driver program and acts as the master of the application: it splits the work into tasks, assigns them to the machines in the cluster and coordinates them.
* **Cluster manager**: distributes the cluster's resources. Before the driver can use a machine, the cluster manager must allocate that machine's compute power to it.
* **Worker nodes**: the machines in the cluster that perform tasks for the driver.
* **Executors**: processes inside each worker node that run the tasks. An executor can run several tasks simultaneously and has its own cache for storing results.

So one driver can use many worker nodes, and each worker node can run many tasks.

In [None]:
!uname -a

In [None]:
!pip install pyspark # ~ 1 min

In [None]:
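# Alternative to `pip install pyspark`: install Java and a Spark build manually.
# Old Spark releases such as 3.0.2 are kept on archive.apache.org.
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz
# !tar xf spark-3.0.2-bin-hadoop2.7.tgz
# !pip install -q findspark
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"
# import findspark
# findspark.init()  # makes the pyspark under SPARK_HOME importable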

# Spark DataFrame Basics

Spark DataFrames make large datasets easy to handle:

* Easy syntax
* SQL queries can be run directly against a DataFrame
* Operations are automatically distributed across the cluster (DataFrames are built on top of RDDs)

## Create a DataFrame


In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("pyspark_basics").getOrCreate()

In [None]:
%%writefile user_simple.json
{"name":"Bob"}
{"name":"Jim", "age":40}
{"name":"Mary", "age": 24}

In [None]:
df = spark.read.json("user_simple.json")

In [None]:
!pwd

In [None]:
!ls

In [None]:
df

## Show DataFrame


In [None]:
df.show()

In [None]:
df.printSchema()

In [None]:
df.columns

In [None]:
df.describe()

In [None]:
df.describe().show()

In [None]:
# Colab only: mount Google Drive (the ecommerce.csv example later reads from it)
from google.colab import drive
drive.mount('/content/drive')

## Specifying Schema Structure

- Spark can infer the schema for some formats (such as the JSON file above).

- Often, though, you have to set the schema yourself.

- Spark provides tools in `pyspark.sql.types` to specify the structure.

Next, create the list of `StructField`s. Each `StructField` takes:
  * `name`: string, the name of the field.
  * `dataType`: the `DataType` of the field (e.g. `IntegerType()`).
  * `nullable`: boolean, whether the field can be null (`None`).

In [None]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [None]:
data_schema = [StructField("age", IntegerType(), True), StructField("name",StringType(), True)]

In [None]:
final_struc = StructType(fields=data_schema)

In [None]:
df = spark.read.json("user_simple.json", schema=final_struc)

In [None]:
df.printSchema()

In [None]:
df.show()

## Grab Data

In [None]:
df['age']

In [None]:
type(df['age'])

In [None]:
df.select("age")

In [None]:
type(df.select("age"))

In [None]:
df.select("age").show()

In [None]:
df.head(2)

In [None]:
df.select(["name","age"])

In [None]:
df.select(["name","age"]).show()

## Create New Columns

In [None]:
df.withColumn("newAge", df['age']).show()

In [None]:
# df itself is unchanged: withColumn returns a new DataFrame
df.show()

In [None]:
df.withColumnRenamed("name","firstName").show()

In [None]:
df.show()

In [None]:
df.withColumn("agePlusTen", df['age']+10).show()

In [None]:
df.withColumn("age_minus_5", df['age']-5).show()

## Using SQL

In [None]:
df.createOrReplaceTempView("customers")

In [None]:
sql_results = spark.sql("SELECT * FROM customers")

In [None]:
sql_results

In [None]:
sql_results.show()

In [None]:
spark.sql("SELECT * FROM customers WHERE age = 24").show()

## DataFrame Operations

- Cover basic operations with Spark DataFrames.
- Use stock data from Walmart.

In [None]:
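# -o writes (or overwrites) the file; '>> WMT.csv' would append a duplicate copy on every re-run
!curl -L https://raw.githubusercontent.com/markumreed/colab_pyspark/main/WMT.csv -o WMT.csv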

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("operations").getOrCreate()
df = spark.read.csv('WMT.csv',inferSchema=True,header=True)

In [None]:
df.printSchema()

In [None]:
df.head(5)

## Filtering Data

- DataFrames allow for quick filtering of data based on conditions


In [None]:
df.filter('Close<62').show()

In [None]:
df.filter('Close<62').select('Open').show()

In [None]:
df.filter('Close<62').select(['Date','Open']).show()

## Using Comparison Operators
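- Comparison operators look similar to their SQL counterparts.
- Make sure to reference the full column through the DataFrame (e.g. `df['Close']`).
- Combine conditions with `&` (and), `|` (or) and `~` (not), and wrap each condition in parentheses.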

In [None]:
df.filter(df['Close'] < 62).show()

In [None]:
df.filter((df['Close'] < 62) & ~(df['Open'] > 60)).show()
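
Why the parentheses matter: in Python, `&`, `|` and `~` bind more tightly than comparison operators such as `<`. The sketch below uses only the standard `ast` module (no Spark; `outermost_operation` is a made-up helper) to show how Python parses the condition above with and without parentheses. Without them, the expression becomes a chained comparison, which on PySpark columns typically fails with a "Cannot convert column into bool" error.

```python
import ast


def outermost_operation(expr):
    """Return the type of the top-level node Python builds when parsing expr."""
    return type(ast.parse(expr, mode="eval").body).__name__


unparenthesized = "Close < 62 & ~Open > 60"
parenthesized = "(Close < 62) & ~(Open > 60)"

# & binds tighter than <, so this parses as the chained comparison
# Close < (62 & ~Open) > 60, not as "condition AND condition".
print(outermost_operation(unparenthesized))  # Compare

# With parentheses, the top-level operation is the intended &.
print(outermost_operation(parenthesized))    # BinOp
```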

In [None]:
df.filter(df['Open'] == 60.98).show(1)

In [None]:
df.filter(df['Open'] == 60.98).collect()

In [None]:
res = df.filter(df['Open'] == 60.98).collect()

In [None]:
type(res[0])

In [None]:
res[0].asDict()

In [None]:
for item in res[0]:
  print(item)

In [None]:
import pandas as pd

In [None]:
pd.Series(res[0].asDict())

# GroupBy and Aggregate Functions
- `groupBy` lets you group rows together based on the values of some column.
- Once you have grouped the data, you can apply an aggregate function to each group.
- An aggregate function combines multiple rows of data into a single output, such as the sum of the inputs or the number of inputs.

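Conceptually, `groupBy(...).mean()` is split-apply-combine: split the rows by the key, apply the aggregate to each group, and combine the results into one row per group. The plain-Python sketch below (no Spark; `group_by_mean` is a hypothetical helper and the rows are made up, not the contents of `sales_data.csv`) mimics this for a single column and skips nulls the way Spark's `avg` does. Spark additionally computes partial aggregates on each partition before merging them, which this single-process sketch does not model.

```python
def group_by_mean(rows, key, value):
    """Average rows[value] per distinct rows[key]; None values are skipped."""
    totals, counts = {}, {}
    for row in rows:
        group = row[key]
        totals.setdefault(group, 0.0)
        counts.setdefault(group, 0)
        if row[value] is not None:
            totals[group] += row[value]
            counts[group] += 1
    # A group containing only nulls has no mean (Spark returns null as well).
    return {g: (totals[g] / counts[g] if counts[g] else None) for g in totals}


rows = [  # hypothetical sales rows
    {"company": "A", "num_sales": 200},
    {"company": "A", "num_sales": 100},
    {"company": "B", "num_sales": 50},
    {"company": "B", "num_sales": None},
    {"company": "C", "num_sales": None},
]
print(group_by_mean(rows, "company", "num_sales"))
# {'A': 150.0, 'B': 50.0, 'C': None}
```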


In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("groupbyagg").getOrCreate()

## Import Data


In [None]:
!curl -L https://raw.githubusercontent.com/markumreed/colab_pyspark/main/sales_data.csv -o sales_data.csv

In [None]:
df = spark.read.csv("sales_data.csv", inferSchema=True, header=True)

In [None]:
df.printSchema()

In [None]:
df.show()

## Grouping Data
- Group the data by company

In [None]:
# Returns a GroupedData object; nothing is computed until an aggregation is applied
df.groupBy("company")

## Aggregate Functions
- mean, count, max, min, sum...

In [None]:
df.groupBy("company").mean().show()

In [None]:
df.groupBy("company").count().show()

In [None]:
df.groupBy("company").min().show()

In [None]:
df.groupBy("company").max().show()

In [None]:
df.groupBy("company").sum().show()

## Aggregating

- Not every aggregation needs a `groupBy` call: the generalized `.agg()` method applies the aggregate across all rows of the specified DataFrame column.
- It accepts a single column, or several aggregate calls at once using dictionary notation (`{"column": "function"}`).


In [None]:
df.agg({"num_sales":"max"}).show()

In [None]:
df.groupBy("company").agg({"num_sales":"mean"}).show()

In [None]:
company_groups = df.groupBy("company")

In [None]:
company_groups.min().show()

## Functions
There are a variety of functions you can import from pyspark.sql.functions.

In [None]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [None]:
df.select(countDistinct("num_sales")).show()

In [None]:
df.select(avg("num_sales")).show()

In [None]:
df.select(stddev("num_sales")).show()

### Alias
- To rename the resulting column, use the `.alias()` method:

In [None]:
df.select(countDistinct("num_sales").alias("ANYTHING WE WANT")).show()

### Precision
- Use `format_number` to control the number of decimal places (note that it returns a formatted string, not a number).


In [None]:
from pyspark.sql.functions import format_number

In [None]:
sales_std = df.select(stddev("num_sales").alias("stddev"))

In [None]:
sales_std.show()

In [None]:
sales_std.select(format_number("stddev",2)).show()

## Order By


In [None]:
df.orderBy("num_sales").show() # Ascending Order

In [None]:
df.orderBy(df['num_sales'].desc()).show()

# Missing Data

- Data sources are often incomplete.
- There are 3 options for handling missing data:
  1. Keep the missing data points.
  1. Drop the missing data points (rows).
  1. Fill them in with some other value.

## Keeping the missing data
A few machine learning algorithms can handle missing data directly. Let's see what it looks like:

In [None]:
!curl -L https://raw.githubusercontent.com/markumreed/colab_pyspark/main/missing_data.csv -o missing_data.csv

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("missing_data").getOrCreate()

In [None]:
df = spark.read.csv("missing_data.csv", header=True, inferSchema=True)

In [None]:
df.show()

In [None]:
df.printSchema()

## Drop the missing data

You can use the `.na` functions for missing data. The `drop` method has the following parameters:

`df.na.drop(how='any', thresh=None, subset=None)`

* `how`: `'any'` or `'all'`.
  * If `'any'`, drop a row if it contains any nulls.
  * If `'all'`, drop a row only if all its values are null.
* `thresh`: int, default `None`. If specified, drop rows that have fewer than `thresh` non-null values. This overrides the `how` parameter.
* `subset`: optional list of column names to consider.


In [None]:
df.na.drop().show()

In [None]:
df.na.drop(thresh=2).show()

In [None]:
df.na.drop(subset=['sales']).show()

In [None]:
df.na.drop(how='any').show()

In [None]:
df.na.drop(how='all').show()
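
To make the `how`/`thresh`/`subset` rules concrete, here is a plain-Python sketch of the documented behavior (no Spark; `na_drop` is a hypothetical helper, rows are dicts with `None` as null, and the rows are made up rather than taken from `missing_data.csv`).

```python
def na_drop(rows, how="any", thresh=None, subset=None):
    """Keep the rows that df.na.drop would keep (rows are dicts, None means null)."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        non_null = sum(row[c] is not None for c in cols)
        if thresh is not None:
            keep = non_null >= thresh     # thresh overrides how
        elif how == "any":
            keep = non_null == len(cols)  # any null -> drop
        else:
            keep = non_null > 0           # drop only if every value is null
        if keep:
            kept.append(row)
    return kept


rows = [  # hypothetical rows with an id, a name and a sales figure
    {"id": "emp1", "name": "John", "sales": None},
    {"id": "emp2", "name": None, "sales": None},
    {"id": "emp3", "name": None, "sales": 345.0},
    {"id": "emp4", "name": "Cindy", "sales": 456.0},
]
print([r["id"] for r in na_drop(rows)])                    # ['emp4']
print([r["id"] for r in na_drop(rows, thresh=2)])          # ['emp1', 'emp3', 'emp4']
print([r["id"] for r in na_drop(rows, subset=["sales"])])  # ['emp3', 'emp4']
```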

## Fill the missing values

We can also fill the missing values with new values. If you have nulls in columns of several data types, Spark matches the fill value to the column types: a string fills only string columns, and a number fills only numeric columns. For example:


In [None]:
df.na.fill('سهیل').show()

In [None]:
df.na.fill(999).show()

In [None]:
df.na.fill("Missing Name", subset=["name"]).show()
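
The type matching described above can be sketched in plain Python (no Spark; `na_fill` is a hypothetical helper, and the `schema` dict and rows are made up). A string value only fills string columns, a number only fills numeric columns, and columns listed in `subset` whose type does not match are simply ignored.

```python
NUMERIC_TYPES = (int, float)


def na_fill(rows, value, schema, subset=None):
    """Replace None with value, but only in columns whose declared type matches value."""
    if isinstance(value, str):
        matching = (str,)
    elif isinstance(value, NUMERIC_TYPES) and not isinstance(value, bool):
        matching = NUMERIC_TYPES
    else:
        raise TypeError("this sketch only handles string and numeric fill values")
    candidates = subset if subset is not None else list(schema)
    # Columns whose type does not match the fill value are silently skipped.
    to_fill = {col for col in candidates if schema[col] in matching}
    return [
        {col: (value if col in to_fill and v is None else v) for col, v in row.items()}
        for row in rows
    ]


schema = {"id": str, "name": str, "sales": float}  # hypothetical column types
rows = [
    {"id": "emp1", "name": "John", "sales": None},
    {"id": "emp2", "name": None, "sales": None},
]
print(na_fill(rows, "Missing Name", schema))
# [{'id': 'emp1', 'name': 'John', 'sales': None}, {'id': 'emp2', 'name': 'Missing Name', 'sales': None}]
print(na_fill(rows, 999, schema))
# [{'id': 'emp1', 'name': 'John', 'sales': 999}, {'id': 'emp2', 'name': None, 'sales': 999}]
```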

In [None]:
from pyspark.sql.functions import mean

In [None]:
mean_value = df.select(mean(df['sales'])).collect()

In [None]:
mean_sales_value = mean_value[0][0]

In [None]:
df.na.fill(mean_sales_value, ["sales"]).show()

In [None]:
# Avoid this: same result as above, but crammed into one hard-to-read line
df.na.fill(df.select(mean(df['sales'])).collect()[0][0], ['sales']).show()

# Dates and Timestamps

You will often find yourself working with time and date information.


In [None]:
# WMT.csv was already downloaded above; -o overwrites it instead of appending a second copy
!curl -L https://raw.githubusercontent.com/markumreed/colab_pyspark/main/WMT.csv -o WMT.csv

In [None]:
spark = SparkSession.builder.appName('walmart_dates').getOrCreate()

In [None]:
df = spark.read.csv('WMT.csv', header=True, inferSchema=True)

In [None]:
df.show()

In [None]:
from pyspark.sql.functions import format_number, dayofmonth, hour, dayofyear, month, year, weekofyear, date_format

In [None]:
df.select(dayofmonth(df['Date'])).show()

In [None]:
# The dates carry no time of day, so hour() is 0 for every row
df.select(hour(df['Date'])).show()

In [None]:
df.select(dayofyear(df['Date'])).show()

In [None]:
df.select(month(df['Date'])).show()

Find the average closing price per month.

In [None]:
df.withColumn("Month", month(df['Date'])).show()

In [None]:
df2 = df.withColumn("Month", month(df['Date']))

In [None]:
df2.groupBy("Month").mean()[['avg(Month)', 'avg(Close)']].show()

In [None]:
res = df2.groupBy("Month").mean()[['avg(Month)', 'avg(Close)']]
res = res.withColumnRenamed("avg(Month)", "Month")
# Keep the DataFrame in res; .show() only prints and returns None
res = res.select("Month", format_number('avg(Close)', 2).alias("Mean Close"))
res.orderBy("Month").show()

# Spark DataFrames Review

In [None]:
!curl -L https://raw.githubusercontent.com/markumreed/colab_pyspark/main/appl_stock.csv -o apple_stock.csv

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("apple_stock").getOrCreate()

In [None]:
df = spark.read.csv("apple_stock.csv", header=True, inferSchema=True)

In [None]:
df.columns

In [None]:
df.printSchema()

In [None]:
df.head(5)

In [None]:
df.describe().show()

In [None]:
df.describe().printSchema()

In [None]:
from pyspark.sql.functions import format_number

In [None]:
res = df.describe()

In [None]:
df.describe().columns

In [None]:
res.select(res["summary"],
             format_number(res['Open'].cast('float'), 2).alias('Open'),
             format_number(res['High'].cast('float'), 2).alias('High'),
             format_number(res['Low'].cast('float'), 2).alias('Low'),
             format_number(res['Close'].cast('float'), 2).alias('Close'),
             res['Volume'].cast('int').alias('Volume')
             ).show()

In [None]:
# High vs Volume
df2 = df.withColumn("HV Ratio", df['High']/df['Volume'])

In [None]:
df2.show()

In [None]:
df2.select('HV Ratio').show()

In [None]:
# First column (the date) of the row with the highest 'High' price
df.orderBy(df['High'].desc()).head(1)[0][0]

In [None]:
from pyspark.sql.functions import mean
df.select(mean('Close')).show()

In [None]:
# Import the module rather than max/min themselves, so Python's built-in max/min are not shadowed
import pyspark.sql.functions as F

In [None]:
df.select(F.max('Volume'), F.min('Volume')).show()

In [None]:
df.filter("Close < 120").count()

In [None]:
from pyspark.sql.functions import count

In [None]:
res = df.filter('Close < 120')
res.select(count('Close')).show()

In [None]:
(df.filter('High > 180').count() * 1.0/df.count()) * 100

In [None]:
from pyspark.sql.functions import corr

In [None]:
df.select(corr('High', 'Volume')).show()

In [None]:
from pyspark.sql.functions import year
yeardf = df.withColumn("Year", year(df['Date']))

In [None]:
max_df = yeardf.groupBy('Year').max()

In [None]:
max_df.select('Year', 'max(High)').show()

In [None]:
max_df.show()

In [None]:
from pyspark.sql.functions import month

In [None]:
monthdf = df.withColumn("Month", month("Date"))
monthavgs = monthdf.select("Month", "Close").groupBy("Month").mean()
monthavgs.select("Month", "avg(Close)").orderBy('Month').show()

# Linear Regression with PySpark

- Based on the Official Spark Documentation for PySpark

In [None]:
!curl -L https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_linear_regression_data.txt -o sample_linear_regression_data.txt

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("lr_ex").getOrCreate()

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
training = spark.read.format("libsvm").load("sample_linear_regression_data.txt")

In [None]:
training.show()

In [None]:
lr = LinearRegression(featuresCol="features", labelCol="label", predictionCol="prediction")

In [None]:
lrModel = lr.fit(training)

In [None]:
print("Coefficients:", str(lrModel.coefficients))
print("Intercept:", str(lrModel.intercept))

In [None]:
trainSummary = lrModel.summary

In [None]:
print("MAE: ", trainSummary.meanAbsoluteError)
print("MSE: ", trainSummary.meanSquaredError)
print("RMSE: ", trainSummary.rootMeanSquaredError)
print("R2: ", trainSummary.r2)
print("Adj R2: ", trainSummary.r2adj)
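
The summary metrics printed above follow the standard definitions. The plain-Python sketch below (no Spark; `regression_metrics` is a hypothetical helper and the four labels and predictions are made up) computes them from labels and predictions. Residuals are label minus prediction, and the adjusted R2 formula assumes the model was fit with an intercept, with `n_features` the number of features p.

```python
import math


def regression_metrics(labels, predictions, n_features):
    """MAE, MSE, RMSE, R2 and adjusted R2 for a model fit with an intercept."""
    n = len(labels)
    residuals = [y - y_hat for y, y_hat in zip(labels, predictions)]
    mse = sum(r * r for r in residuals) / n
    mean_label = sum(labels) / n
    total_sum_of_squares = sum((y - mean_label) ** 2 for y in labels)
    r2 = 1 - (mse * n) / total_sum_of_squares  # 1 - SSE / SST
    return {
        "MAE": sum(abs(r) for r in residuals) / n,
        "MSE": mse,
        "RMSE": math.sqrt(mse),
        "R2": r2,
        "Adj R2": 1 - (1 - r2) * (n - 1) / (n - n_features - 1),
    }


metrics = regression_metrics([3.0, 5.0, 7.0, 9.0], [2.5, 5.5, 7.0, 8.0], n_features=1)
for name, value in metrics.items():
    print(f"{name}: {value:.4f}")
# MAE: 0.5000, MSE: 0.3750, RMSE: 0.6124, R2: 0.9250, Adj R2: 0.8875
```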


## Train Test Split with PySpark
- Pass the training/test split to `randomSplit` as a list of weights.
- There is no single correct ratio, but 70/30 or 60/40 splits are common, depending on how much data you have and how imbalanced it is.

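A rough plain-Python model of how `randomSplit` behaves (no Spark; `random_split` is a hypothetical helper): the weights are normalized to sum to 1, and each row is assigned independently by a random draw. That is why a `[0.7, 0.3]` split gives approximately, not exactly, 70/30, and why a fixed `seed` makes the split reproducible as long as the input data does not change. Spark actually samples partition by partition, so the same seed will not select the same rows as this sketch.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 0.3] -> [0.7, 1.0]
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
        else:  # only reachable through floating-point rounding in the last bound
            splits[-1].append(row)
    return splits


toy_train, toy_test = random_split(range(1000), [0.7, 0.3], seed=42)
print(len(toy_train), len(toy_test))  # close to 700 and 300, but rarely exact
```
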
In [None]:
df = spark.read.format("libsvm").load("sample_linear_regression_data.txt") # FULL DATASET

In [None]:
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

In [None]:
test_data.show()

In [None]:
unlabeled_data = test_data.select('features')

In [None]:
corrected_model = lr.fit(train_data)

In [None]:
res = corrected_model.evaluate(test_data)

In [None]:
print("MAE: ", res.meanAbsoluteError)
print("MSE: ", res.meanSquaredError)
print("RMSE: ", res.rootMeanSquaredError)
print("R2: ", res.r2)
print("Adj R2: ", res.r2adj)

In [None]:
predictions = corrected_model.transform(unlabeled_data)

In [None]:
predictions.show()

# Data Transformations with PySpark

## Data Features
### StringIndexer
- `StringIndexer` converts a string column into numeric category indices; by default the most frequent label gets index `0.0`.
- The indices can then be encoded as dummy variables with `OneHotEncoder`.

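A plain-Python sketch of StringIndexer's default ordering (no Spark; `string_index` is a hypothetical helper). With the default `stringOrderType="frequencyDesc"`, the most frequent label gets `0.0`, and in recent Spark versions ties are broken alphabetically. That is why `a`, `b` and `c`, which occur twice each in the example that follows, come out as 0.0, 1.0 and 2.0.

```python
from collections import Counter


def string_index(values):
    """Map labels to float indices: most frequent first, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    mapping = {label: float(i) for i, label in enumerate(ordered)}
    return mapping, [mapping[v] for v in values]


mapping, indices = string_index(["a", "b", "c", "a", "b", "c"])
print(mapping)  # {'a': 0.0, 'b': 1.0, 'c': 2.0}
print(indices)  # [0.0, 1.0, 2.0, 0.0, 1.0, 2.0]

# Frequency wins over alphabetical order: "y" appears twice, so it gets 0.0.
print(string_index(["x", "y", "y"])[0])  # {'y': 0.0, 'x': 1.0}
```
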
In [None]:
from pyspark.ml.feature import StringIndexer

df2 = spark.createDataFrame(
    [(0,"a"), (1, "b"), (2, "c"), (3, "a"), (4, "b"), (5, "c")],
    ["user_id", "category"]
)

In [None]:
df2.show()

+-------+--------+
|user_id|category|
+-------+--------+
|      0|       a|
|      1|       b|
|      2|       c|
|      3|       a|
|      4|       b|
|      5|       c|
+-------+--------+



In [None]:
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")

In [None]:
indexed = indexer.fit(df2).transform(df2)

In [None]:
indexed.show()

+-------+--------+-------------+
|user_id|category|categoryIndex|
+-------+--------+-------------+
|      0|       a|          0.0|
|      1|       b|          1.0|
|      2|       c|          2.0|
|      3|       a|          0.0|
|      4|       b|          1.0|
|      5|       c|          2.0|
+-------+--------+-------------+



## VectorAssembler
- **VectorAssembler** is a transformer that combines a given list of columns into a single vector column.
- **VectorAssembler** accepts the following input column types:
  - all numeric types, boolean type, and vector type.  

---

- Assume that we have a DataFrame with the columns id, hour, mobile, userFeatures, and clicked:

id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0
     
- userFeatures is a vector column that contains three user features.  
- After transformation we should get the following DataFrame:

id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]
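
The transformation in the table above amounts to concatenating the input columns in order. A plain-Python sketch (no Spark; `assemble` is a hypothetical helper, and Python lists stand in for Spark vectors): numeric and boolean values contribute one element each, and vector columns contribute all of their elements. By default Spark's VectorAssembler raises an error on null inputs (`handleInvalid="error"`), which the sketch imitates.

```python
def assemble(row, input_cols):
    """Concatenate the given columns of a row into one flat list of floats."""
    features = []
    for col in input_cols:
        value = row[col]
        if value is None:
            raise ValueError(f"null value in column {col!r}")
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column: every element
        else:
            features.append(float(value))              # numeric or boolean: one element
    return features


row = {"id": 0, "hour": 18, "mobile": 1.0, "userFeatures": [0.0, 10.0, 0.5], "clicked": 1.0}
print(assemble(row, ["hour", "mobile", "userFeatures"]))
# [18.0, 1.0, 0.0, 10.0, 0.5]
```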

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
df3 = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"]
)
df3.show()

+---+----+------+--------------+-------+
| id|hour|mobile|  userFeatures|clicked|
+---+----+------+--------------+-------+
|  0|  18|   1.0|[0.0,10.0,0.5]|    1.0|
+---+----+------+--------------+-------+



In [None]:
assembler = VectorAssembler(
    inputCols = ["hour", "mobile", "userFeatures"],
    outputCol = "features"
)
output = assembler.transform(df3)

In [None]:
output.select("features", "clicked").show()

+--------------------+-------+
|            features|clicked|
+--------------------+-------+
|[18.0,1.0,0.0,10....|    1.0|
+--------------------+-------+



## PySpark - Linear Regression (Another example)

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("lin_reg").getOrCreate()

In [None]:
df = spark.read.csv("/content/drive/MyDrive/PySpark/ecommerce.csv", inferSchema=True, header=True)

In [None]:
df.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [None]:
df.show()

+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug...|   

In [None]:
df.head()

Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005)

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
df.columns

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [None]:
assembler = VectorAssembler(inputCols=['Avg Session Length', 'Time on App',
                                       'Time on Website','Length of Membership'],
                            outputCol='features')

In [None]:
output = assembler.transform(df)

In [None]:
output.select("features").show()

+--------------------+
|            features|
+--------------------+
|[34.4972677251122...|
|[31.9262720263601...|
|[33.0009147556426...|
|[34.3055566297555...|
|[33.3306725236463...|
|[33.8710378793419...|
|[32.0215955013870...|
|[32.7391429383803...|
|[33.9877728956856...|
|[31.9365486184489...|
|[33.9925727749537...|
|[33.8793608248049...|
|[29.5324289670579...|
|[33.1903340437226...|
|[32.3879758531538...|
|[30.7377203726281...|
|[32.1253868972878...|
|[32.3388993230671...|
|[32.1878120459321...|
|[32.6178560628234...|
+--------------------+
only showing top 20 rows



In [None]:
final_data = output.select("features", "Yearly Amount Spent")

In [None]:
train_data, test_data = final_data.randomSplit([0.7, 0.3])

In [None]:
train_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                364|
|   mean|  502.9869765747741|
| stddev|  78.20081373178272|
|    min| 298.76200786180766|
|    max|  765.5184619388373|
+-------+-------------------+



In [None]:
test_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                136|
|   mean|  489.4835268829236|
| stddev|  81.70383291336739|
|    min| 256.67058229005585|
|    max|  669.9871405017029|
+-------+-------------------+



In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
lr = LinearRegression(labelCol='Yearly Amount Spent')

In [None]:
model = lr.fit(train_data)

In [None]:
import pandas as pd

In [None]:
pd.DataFrame({"Coefficients":model.coefficients}, index=['Avg Session Length', 'Time on App',
                                       'Time on Website','Length of Membership'])

Feature | Coefficients
---------------------|-------------
Avg Session Length | 25.528189
Time on App | 38.739248
Time on Website | 0.381506
Length of Membership | 61.116148


In [None]:
res = model.evaluate(test_data)

In [None]:
res.residuals.show()

+--------------------+
|           residuals|
+--------------------+
|   9.669149274977144|
| -13.253743124870539|
|  -7.014907673268283|
|  -18.93401211450589|
|   3.666788132691522|
| -13.506343132392146|
| -7.6506770546077405|
|   8.515390472010267|
|  18.287686613130802|
|  3.2878745843414663|
|-0.09345137585660268|
| -5.8919286856935855|
|  -6.045290947261833|
| -17.994114323187887|
| -14.543665761366128|
| -2.7649679910356895|
|   6.936300111304206|
| -10.178724246025183|
|0.017993930797047142|
|   7.475084221903728|
+--------------------+
only showing top 20 rows



In [None]:
unlabeled_data = test_data.select("features")

In [None]:
predictions = model.transform(unlabeled_data)

In [None]:
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[29.5324289670579...|398.97120179765034|
|[30.3931845423455...|333.18261292806415|
|[30.4925366965402...| 289.4861533931828|
|[30.8162006488763...| 285.0203530629749|
|[31.0472221394875...| 388.8306110563299|
|[31.0662181616375...| 462.4396363400665|
|[31.1280900496166...| 564.9033638016624|
|[31.1695067987115...|418.84114033028254|
|[31.3123495994443...| 445.3037314148098|
|[31.3584771924370...|491.88807586513394|
|[31.3895854806643...| 410.1630624358395|
|[31.5171218025062...| 281.8103493360793|
|[31.5257524169682...| 450.0109177571437|
|[31.5702008293202...| 563.9396064645928|
|[31.5741380228732...|  558.952937921953|
|[31.8186165667690...|449.18364136117134|
|[31.8209982016720...|417.73898090190914|
|[31.8279790554652...| 450.1814717929667|
|[31.8293464559211...|385.13434405717794|
|[31.8512531286083...|465.51716244489467|
+--------------------+------------------+

In [None]:
print("MAE:", res.meanAbsoluteError)
print("MSE:", res.meanSquaredError)
print("RMSE:", res.rootMeanSquaredError)
print("R2", res.r2)
print("Adj R2", res.r2adj)

MAE: 7.7682724106361
MSE: 92.46118689887506
RMSE: 9.615674022078487
R2 0.9860466096979039
Adj R2 0.985620551978756


# Logistic Regression with PySpark

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("log_reg").getOrCreate()

In [None]:
!curl -L https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_libsvm_data.txt -o sample_libsvm_data_2.txt



In [None]:
df = spark.read.format("libsvm").load("sample_libsvm_data_2.txt")

In [None]:
df.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
lr = LogisticRegression()

model = lr.fit(df)

summary = model.summary

In [None]:
summary.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[127,128,129...|[20.3777627514862...|[0.99999999858729...|       0.0|
|  1.0|(692,[158,159,160...|[-21.114014198867...|[6.76550380001560...|       1.0|
|  1.0|(692,[124,125,126...|[-23.743613234676...|[4.87842678715831...|       1.0|
|  1.0|(692,[152,153,154...|[-19.192574012719...|[4.62137287298722...|       1.0|
|  1.0|(692,[151,152,153...|[-20.125398874697...|[1.81823629113437...|       1.0|
|  0.0|(692,[129,130,131...|[20.4890549504187...|[0.99999999873608...|       0.0|
|  1.0|(692,[158,159,160...|[-21.082940212813...|[6.97903542824686...|       1.0|
|  1.0|(692,[99,100,101,...|[-19.622713503566...|[3.00582577441380...|       1.0|
|  0.0|(692,[154,155,156...|[21.1594863606570...|[0.99999999935352...|       0.0|
|  0.0|(692,[127

In [None]:
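# RDD-based API (pyspark.mllib); the DataFrame-based evaluators in pyspark.ml.evaluation are used below instead
from pyspark.mllib.evaluation import MulticlassMetrics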

In [None]:
model.evaluate(df)

<pyspark.ml.classification.BinaryLogisticRegressionSummary at 0x7d884b11d540>

In [None]:
pred_and_labels = model.evaluate(df)

In [None]:
pred_and_labels.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[127,128,129...|[20.3777627514862...|[0.99999999858729...|       0.0|
|  1.0|(692,[158,159,160...|[-21.114014198867...|[6.76550380001560...|       1.0|
|  1.0|(692,[124,125,126...|[-23.743613234676...|[4.87842678715831...|       1.0|
|  1.0|(692,[152,153,154...|[-19.192574012719...|[4.62137287298722...|       1.0|
|  1.0|(692,[151,152,153...|[-20.125398874697...|[1.81823629113437...|       1.0|
|  0.0|(692,[129,130,131...|[20.4890549504187...|[0.99999999873608...|       0.0|
|  1.0|(692,[158,159,160...|[-21.082940212813...|[6.97903542824686...|       1.0|
|  1.0|(692,[99,100,101,...|[-19.622713503566...|[3.00582577441380...|       1.0|
|  0.0|(692,[154,155,156...|[21.1594863606570...|[0.99999999935352...|       0.0|
|  0.0|(692,[127

In [None]:
pred_and_labels = pred_and_labels.predictions.select("label", "prediction")

In [None]:
pred_and_labels.show()

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
+-----+----------+
only showing top 20 rows



# Evaluation

In [None]:
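from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [None]:
# Area under the ROC curve (the default metric), not accuracy. Passing the hard 0/1
# "prediction" column works, but the default "rawPrediction" column gives a more informative AUC.
auc_eval = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label")

In [None]:
eval_multi = MulticlassClassificationEvaluator(predictionCol="prediction",
                                               labelCol="label",
                                               metricName="accuracy")

In [None]:
auc = auc_eval.evaluate(pred_and_labels)
acc = eval_multi.evaluate(pred_and_labels)

In [None]:
acc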

1.0
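
The 1.0 above reflects perfect predictions on the training data, where accuracy and area under the ROC curve agree; in general they differ. `BinaryClassificationEvaluator` reports area under the ROC curve by default, and when it receives hard 0/1 predictions instead of scores, the curve is built from a single threshold. The plain-Python sketch below (no Spark; the helpers, labels, predictions and scores are all made up) computes accuracy and the rank formulation of AUC: the probability that a random positive scores higher than a random negative, with ties counting one half. With hard predictions, AUC works out to the average of the true-positive rate and the true-negative rate.

```python
def accuracy(labels, predictions):
    """Fraction of predictions that equal the label."""
    return sum(y == p for y, p in zip(labels, predictions)) / len(labels)


def roc_auc(labels, scores):
    """P(score of a random positive > score of a random negative), ties count 1/2."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(
        1.0 if p > n else 0.5 if p == n else 0.0
        for p in positives
        for n in negatives
    )
    return wins / (len(positives) * len(negatives))


labels = [1, 1, 1, 0, 0, 0, 0, 0]
hard_predictions = [1, 1, 0, 0, 0, 0, 0, 1]
scores = [0.9, 0.8, 0.4, 0.3, 0.2, 0.1, 0.05, 0.6]

print(accuracy(labels, hard_predictions))  # 0.75
print(roc_auc(labels, hard_predictions))   # 0.7333... (= (2/3 + 4/5) / 2)
print(roc_auc(labels, scores))             # 0.9333... (= 14/15)
```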

# Logistic Regression: Titanic Dataset

In [None]:
!curl -L https://raw.githubusercontent.com/markumreed/colab_pyspark/main/titanic.csv -o titanic.csv



In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("titanic").getOrCreate()

In [None]:
df = spark.read.csv("titanic.csv", inferSchema=True, header=True)

In [None]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [None]:
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [None]:
data = df.select([
    'Survived',
    'Pclass',
    'Sex',
    'Age',
    'SibSp',
    'Parch',
    'Fare',
    'Embarked',
])

In [None]:
data.head()

Row(Survived=0, Pclass=3, Sex='male', Age=22.0, SibSp=1, Parch=0, Fare=7.25, Embarked='S')

In [None]:
data_final = data.na.drop()
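With no arguments, `data.na.drop()` uses `how="any"`. That means it removes every row containing a null or NaN value in any column. Below is a minimal pure-Python sketch of the `"any"` and `"all"` behaviors. It represents rows as dicts and uses toy values, apart from the first row, which mirrors `data.head()`. The helper names are hypothetical, and the sketch ignores the `thresh` and `subset` options.

```python
import math


def is_missing(value):
    """Treat None (null) and float NaN as missing."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def drop_rows_with_missing(rows, how="any"):
    """Mimic DataFrame.na.drop(how=...) on a list of dict rows."""
    if how == "any":   # drop a row if any column is missing (the default)
        return [r for r in rows if not any(is_missing(v) for v in r.values())]
    if how == "all":   # drop a row only if every column is missing
        return [r for r in rows if not all(is_missing(v) for v in r.values())]
    raise ValueError("how must be 'any' or 'all'")


rows = [
    {"Survived": 0, "Age": 22.0, "Embarked": "S"},
    {"Survived": 1, "Age": None, "Embarked": "C"},          # null Age
    {"Survived": 1, "Age": float("nan"), "Embarked": "S"},  # NaN Age
    {"Survived": 0, "Age": 35.0, "Embarked": None},         # null Embarked
]
kept = drop_rows_with_missing(rows)
print(len(rows), "->", len(kept))
```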

# Categorical Data with PySpark

In [None]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer,
                                OneHotEncoder, StringIndexer)

In [None]:
gender_indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
gender_ecoder = OneHotEncoder(inputCol="SexIndex", outputCol="SexVec")

embark_indexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkIndex")
embark_ecoder = OneHotEncoder(inputCol="EmbarkIndex", outputCol="EmbarkVec")
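`StringIndexer` maps each distinct string to a numeric index. Under the default `stringOrderType="frequencyDesc"`, the most frequent value gets index 0.0. `OneHotEncoder` then turns that index into a vector. With its default `dropLast=True`, a column with k categories becomes a vector of length k - 1, and the last category is encoded as all zeros. A two-category column such as `Sex` therefore ends up as a `SexVec` with a single slot.

The pure-Python sketch below shows both steps. It assumes ties in frequency are broken alphabetically, and it builds dense lists where Spark produces sparse vectors. The helper names and the toy `embarked` values are hypothetical.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct string to an index, most frequent first (frequencyDesc)."""
    counts = Counter(values)
    # Descending count; alphabetical order breaks ties (assumption).
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot list; with drop_last the final category maps to all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


embarked = ["S", "C", "S", "Q", "S", "C"]
mapping = fit_string_indexer(embarked)  # {'S': 0.0, 'C': 1.0, 'Q': 2.0}
vectors = [one_hot(mapping[e], len(mapping)) for e in embarked]
print(mapping)
print(vectors[:4])
```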


In [None]:
assembler = VectorAssembler(inputCols=["Pclass", "SexVec", "Age", "SibSp",
                                       "Parch", "Fare", "EmbarkVec"],
                            outputCol="features")
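`VectorAssembler` concatenates its input columns, in the order given in `inputCols`, into one flat vector. Numeric columns contribute one slot each. Vector columns such as `SexVec` and `EmbarkVec` contribute all of their slots. By default (`handleInvalid="error"`), it fails on null or NaN inputs, which is one reason rows with missing values were dropped earlier.

Below is a pure-Python sketch of the flattening. The passenger row is illustrative, and the `assemble` helper is hypothetical rather than a Spark function.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued (vector) columns into one flat list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column: all slots
        else:
            features.append(float(value))            # numeric column: one slot
    return features


input_cols = ["Pclass", "SexVec", "Age", "SibSp", "Parch", "Fare", "EmbarkVec"]
# Illustrative row: SexVec has 1 slot (2 categories), EmbarkVec has 2 (3 categories).
row = {"Pclass": 3, "SexVec": [1.0], "Age": 22.0, "SibSp": 1,
       "Parch": 0, "Fare": 7.25, "EmbarkVec": [1.0, 0.0]}
features = assemble(row, input_cols)
print(len(features), features)
```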

In [None]:
from pyspark.ml.classification import LogisticRegression

# Pipelines

In [None]:
from pyspark.ml import Pipeline

In [None]:
lr = LogisticRegression(featuresCol='features', labelCol="Survived")

In [None]:
pipeline = Pipeline(stages=[
    gender_indexer, embark_indexer,
    gender_ecoder, embark_ecoder,
    assembler, lr,
])
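When `pipeline.fit(train)` runs, the stages are processed in order. Each estimator stage (the indexers, the encoders and `lr`) is fitted on the output of the earlier stages, and the model it produces takes its place. Transformer stages such as `VectorAssembler` are used as they are. The resulting `PipelineModel` then applies every fitted stage in sequence when `transform` is called.

The toy pure-Python sketch below shows only that control flow. The `Toy*` classes and the helper functions are hypothetical stand-ins, not Spark classes.

```python
class ToyTransformer:
    """Stateless stage: has transform() but no fit()."""
    def __init__(self, fn):
        self.fn = fn

    def transform(self, rows):
        return [self.fn(r) for r in rows]


class ToyMeanCenterer:
    """Estimator stage: fit() learns a mean and returns a transformer (the model)."""
    def fit(self, rows):
        mean = sum(rows) / len(rows)
        return ToyTransformer(lambda x: x - mean)


def fit_pipeline(stages, rows):
    """Fit stages in order, feeding each one the output of the previous stages."""
    estimator_idx = [i for i, s in enumerate(stages) if hasattr(s, "fit")]
    last_estimator = estimator_idx[-1] if estimator_idx else -1
    fitted, data = [], rows
    for i, stage in enumerate(stages):
        model = stage.fit(data) if hasattr(stage, "fit") else stage
        fitted.append(model)
        if i < last_estimator:  # later stages still need this stage's output
            data = model.transform(data)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in sequence, like PipelineModel.transform."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


stages = [ToyTransformer(lambda x: x * 2), ToyMeanCenterer()]
fitted = fit_pipeline(stages, [1.0, 2.0, 3.0])
print(transform_pipeline(fitted, [1.0, 2.0, 3.0]))
```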

In [None]:
train, test = data_final.randomSplit([0.7, 0.3], seed=42)
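`randomSplit([0.7, 0.3], seed=42)` normalizes the weights into cumulative boundaries (here 0.0, 0.7 and 1.0). Each row goes to the split whose interval contains its uniform random draw. As a result, the split sizes are only roughly 70/30. The plain-Python sketch below shows the idea on a single list. Spark samples each partition separately with its own seeded sampler, so this will not reproduce Spark's exact rows. `random_split` is a hypothetical helper.

```python
import random
from itertools import accumulate


def random_split(rows, weights, seed=None):
    """Assign each row to one split by comparing a uniform draw to cumulative weights."""
    total = float(sum(weights))
    bounds = [c / total for c in accumulate(weights)]  # e.g. [0.7, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform in [0.0, 1.0)
        for i, upper in enumerate(bounds):
            # first interval whose upper bound exceeds x; last split catches rounding
            if x < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


toy_train, toy_test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(toy_train), len(toy_test))
```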

In [None]:
model_fit = pipeline.fit(train)
res = model_fit.transform(test)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
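# rawPredictionCol='prediction' computes the AUC from hard 0/1 predictions;
# the default 'rawPrediction' column from LogisticRegression would rank rows by the model's margin instead
eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                     labelCol='Survived')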

In [None]:
res.select('Survived', 'prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
+--------+----------+
only showing top 20 rows



In [None]:
auc = eval.evaluate(res)

In [None]:
auc

0.7747561675272518
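This evaluator was given `rawPredictionCol='prediction'`, so the ROC curve is built from hard 0/1 predictions. With only two distinct scores, the area under that curve reduces to the balanced accuracy, (true positive rate + true negative rate) / 2. Passing a continuous score instead, such as the `rawPrediction` column that LogisticRegression adds (the evaluator's default input), ranks rows more finely and usually gives a different AUC.

The pure-Python sketch below computes AUC as the probability that a random positive is scored above a random negative, with ties counting one half. The scores are made up for illustration.

```python
def roc_auc(labels, scores):
    """AUC as P(score of a random positive > score of a random negative); ties count 0.5."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


labels = [0, 0, 0, 1, 1, 1]
probs = [0.1, 0.4, 0.6, 0.35, 0.8, 0.9]            # made-up probability of class 1
hard = [1.0 if p > 0.5 else 0.0 for p in probs]  # what a "prediction" column holds

tpr = sum(1 for y, h in zip(labels, hard) if y == 1 and h == 1) / labels.count(1)
tnr = sum(1 for y, h in zip(labels, hard) if y == 0 and h == 0) / labels.count(0)

print(roc_auc(labels, hard), (tpr + tnr) / 2)  # AUC of hard labels = balanced accuracy
print(roc_auc(labels, probs))                  # AUC of the underlying scores
```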

# Clustering with PySpark


In [None]:
!curl -o sample_kmeans_data.txt https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_kmeans_data.txt


# K-means


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sample_cluster").getOrCreate()

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
df = spark.read.format("libsvm").load("sample_kmeans_data.txt")

In [None]:
df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [None]:
kmeans = KMeans().setK(2).setSeed(42)
model = kmeans.fit(df)

In [None]:
pred = model.transform(df)

In [None]:
eval = ClusteringEvaluator()

In [None]:
silhouette = eval.evaluate(pred)
print(f"Silhouette with squared euclidean distance: {silhouette}")

Silhouette with squared euclidean distance: 0.9997530305375207
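`ClusteringEvaluator` defaults to the silhouette metric with squared Euclidean distance. For each point:

* a is the mean squared distance to the other points in its own cluster (0 for a single-point cluster).
* b is the smallest mean squared distance to the points of any other cluster.
* The point's score is (b - a) / max(a, b).

The reported value is the mean over all points. The pure-Python sketch below applies that formula to the six points of `sample_kmeans_data.txt`, using the two clusters k-means found, and lands on the value printed above. The helper names are hypothetical.

```python
def sq_dist(u, v):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((a - b) ** 2 for a, b in zip(u, v))


def mean_silhouette(points, assignments):
    """Mean silhouette with squared Euclidean distance; needs at least two clusters."""
    clusters = {}
    for p, c in zip(points, assignments):
        clusters.setdefault(c, []).append(p)
    if len(clusters) < 2:
        raise ValueError("silhouette needs at least two clusters")
    scores = []
    for p, c in zip(points, assignments):
        own = clusters[c]
        if len(own) == 1:
            scores.append(0.0)  # convention for single-point clusters
            continue
        # p's distance to itself is 0, so dividing by n - 1 averages over the others
        a = sum(sq_dist(p, q) for q in own) / (len(own) - 1)
        b = min(
            sum(sq_dist(p, q) for q in members) / len(members)
            for label, members in clusters.items()
            if label != c
        )
        scores.append(0.0 if max(a, b) == 0 else (b - a) / max(a, b))
    return sum(scores) / len(scores)


points = [[0.0] * 3, [0.1] * 3, [0.2] * 3, [9.0] * 3, [9.1] * 3, [9.2] * 3]
assignments = [0, 0, 0, 1, 1, 1]  # the two clusters found by k-means
print(mean_silhouette(points, assignments))
```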


In [None]:
centers = model.clusterCenters()
print("Cluster Centers:")
print("=================")
for center in centers:
    print(center)

Cluster Centers:
[0.1 0.1 0.1]
[9.1 9.1 9.1]
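KMeans alternates two steps until the centers stop moving: assign every point to its nearest center, then move each center to the mean of its assigned points (Lloyd's algorithm). Spark seeds the centers with the k-means|| scheme by default (`initMode="k-means||"`). To keep the example short, the pure-Python sketch below instead starts deterministically from the first point plus the point farthest from it. The `max_iter` and `tol` defaults mirror Spark's `maxIter=20` and `tol=1e-4`. `kmeans_sketch` is a hypothetical helper. On the six sample points, it converges to the same two centers printed above.

```python
def sq_dist(u, v):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((a - b) ** 2 for a, b in zip(u, v))


def kmeans_sketch(points, k, max_iter=20, tol=1e-4):
    """Lloyd's algorithm with farthest-point initialization (not k-means||)."""
    centers = [list(points[0])]
    while len(centers) < k:
        # next center: the point farthest from all centers chosen so far
        farthest = max(points, key=lambda p: min(sq_dist(p, c) for c in centers))
        centers.append(list(farthest))
    for _ in range(max_iter):
        groups = [[] for _ in range(k)]
        for p in points:  # assignment step
            nearest = min(range(k), key=lambda i: sq_dist(p, centers[i]))
            groups[nearest].append(p)
        new_centers = [  # update step; an empty cluster keeps its old center
            [sum(col) / len(g) for col in zip(*g)] if g else centers[i]
            for i, g in enumerate(groups)
        ]
        shift = max(sq_dist(c, n) for c, n in zip(centers, new_centers))
        centers = new_centers
        if shift < tol ** 2:  # every center moved less than tol
            break
    return centers


points = [[0.0] * 3, [0.1] * 3, [0.2] * 3, [9.0] * 3, [9.1] * 3, [9.2] * 3]
toy_centers = kmeans_sketch(points, k=2)
for center in toy_centers:
    print([round(x, 6) for x in center])
```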


In [None]:
!curl -o seeds_dataset.txt https://archive.ics.uci.edu/ml/machine-learning-databases/00236/seeds_dataset.txt


# Random Forest Classifier with PySpark

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("rf").getOrCreate()

In [None]:
df = spark.read.format("libsvm").load("sample_libsvm_data.txt")

In [None]:
df.show()

# Train Test Split

In [None]:
(train, test) = df.randomSplit([0.7, 0.3], seed=42)

In [None]:
test.show()

In [None]:
train.printSchema()

# Train RF Model

In [None]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=20, seed=42)

In [None]:
model = rf.fit(train)

In [None]:
pred = model.transform(test)

In [None]:
pred.printSchema()

In [None]:
pred.select("prediction", "label", "features").show(5)

In [None]:
eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

In [None]:
acc = eval.evaluate(pred)

In [None]:
print("Test Error = %g" % (1.0 - acc))

In [None]:
model.featureImportances

# Gradient Boosted Trees

In [None]:
from pyspark.ml.classification import GBTClassifier

In [None]:
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10, seed=42)

In [None]:
model = gbt.fit(train)

In [None]:
pred = model.transform(test)

In [None]:
pred.select("prediction", "label", "features").show(5)

In [None]:
eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc = eval.evaluate(pred)
print("Test Error = %g" % (1.0 - acc))
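Gradient-boosted trees build an additive model one small tree at a time. Each new tree is fitted to the negative gradient of the loss (the pseudo-residuals) of the current ensemble. Its output is then added, scaled by a shrinkage factor (`stepSize`, 0.1 by default in Spark), and `maxIter=10` above means ten such trees.

The pure-Python sketch below is heavily simplified. It handles binary labels with one-split regression stumps on a single feature and uses the standard logistic loss, whose pseudo-residual is y - sigmoid(F). Spark's `GBTClassifier` uses full regression trees and its own scaling of the log loss, so this illustrates the idea rather than Spark's computation. All names are hypothetical.

```python
import math


def fit_stump(xs, targets):
    """Best single-threshold regression stump by squared error."""
    best = None
    for t in sorted(set(xs))[:-1]:  # thresholds that leave both sides non-empty
        left = [r for x, r in zip(xs, targets) if x <= t]
        right = [r for x, r in zip(xs, targets) if x > t]
        lm, rm = sum(left) / len(left), sum(right) / len(right)
        sse = sum((r - lm) ** 2 for r in left) + sum((r - rm) ** 2 for r in right)
        if best is None or sse < best[0]:
            best = (sse, t, lm, rm)
    if best is None:  # all x identical: fall back to a constant prediction
        mean = sum(targets) / len(targets)
        return lambda x: mean
    _, t, lm, rm = best
    return lambda x: lm if x <= t else rm


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def boost(xs, ys, n_trees=10, step_size=0.1):
    """Fit F(x) = sum(step_size * tree(x)) under logistic loss; return a 0/1 predictor."""
    trees = []

    def score(x):
        return sum(step_size * tree(x) for tree in trees)

    for _ in range(n_trees):
        residuals = [y - sigmoid(score(x)) for x, y in zip(xs, ys)]  # negative gradient
        trees.append(fit_stump(xs, residuals))
    return lambda x: 1.0 if sigmoid(score(x)) > 0.5 else 0.0


xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
ys = [0, 0, 0, 1, 1, 1]
predict = boost(xs, ys)
print([predict(x) for x in xs])
```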

## Tree Methods with PySpark
1. Single Decision Tree
2. Random Forest
3. Gradient Boosted Tree Classifier
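All three methods are built from decision trees. `DecisionTreeClassifier`, like the trees inside `RandomForestClassifier`, picks each split by the largest decrease in impurity, with Gini impurity as the default (`impurity="gini"`). The Gini impurity of a node is 1 minus the sum of the squared class proportions.

The pure-Python sketch below picks the best threshold on one numeric feature from toy data. Spark additionally bins continuous features (`maxBins`) and searches over every feature, which this sketch omits. The helper names are hypothetical.

```python
from collections import Counter


def gini(labels):
    """Gini impurity: 1 - sum over classes of (class proportion) ** 2."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((c / n) ** 2 for c in Counter(labels).values())


def best_split(xs, ys):
    """Return (threshold, impurity decrease) of the best 'x <= threshold' split."""
    parent = gini(ys)
    best_t, best_gain = None, 0.0
    for t in sorted(set(xs))[:-1]:  # keep both children non-empty
        left = [y for x, y in zip(xs, ys) if x <= t]
        right = [y for x, y in zip(xs, ys) if x > t]
        # child impurities weighted by the share of rows each child receives
        child = (len(left) * gini(left) + len(right) * gini(right)) / len(ys)
        if parent - child > best_gain:
            best_t, best_gain = t, parent - child
    return best_t, best_gain


xs = [5.0, 7.0, 8.0, 12.0, 15.0, 18.0]  # toy feature values
ys = [0, 0, 1, 1, 1, 1]                 # toy 0/1 labels
print(gini(ys), best_split(xs, ys))
```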

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("trees").getOrCreate()

In [None]:
df = spark.read.csv("College.csv", inferSchema=True, header=True)

In [None]:
df.printSchema()

In [None]:
df.head(2)

In [None]:
df.columns

# Formatting for Spark

In [None]:
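# By default, Spark ML estimators read the target from a "label" column and the inputs from one vector column, "features"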
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
df.printSchema()

In [None]:
df.columns

In [None]:
assembler = VectorAssembler(
    inputCols=[
        'Apps',
        'Accept',
        'Enroll',
        'Top10perc',
        'Top25perc',
        'F_Undergrad',
        'P_Undergrad',
        'Outstate',
        'Room_Board',
        'Books',
        'Personal',
        'PhD',
        'Terminal',
        'S_F_Ratio',
        'perc_alumni',
        'Expend',
        'Grad_Rate',
    ],
    outputCol="features",
)

In [None]:
output = assembler.transform(df)

# String Variables (Private)

In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:
indexer = StringIndexer(inputCol="Private", outputCol="PrivateIndexer")
output_fixed = indexer.fit(output).transform(output)

In [None]:
df_final = output_fixed.select("features", "PrivateIndexer")

In [None]:
train, test = df_final.randomSplit([0.7, 0.3], seed=42)

# Tree Classifiers

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline

## Create Models

In [None]:
dtc = DecisionTreeClassifier(labelCol="PrivateIndexer", featuresCol="features")
rfc = RandomForestClassifier(labelCol="PrivateIndexer", featuresCol="features")
gbt = GBTClassifier(labelCol="PrivateIndexer", featuresCol="features")

In [None]:
dtc_model = dtc.fit(train)
rfc_model = rfc.fit(train)
gbt_model = gbt.fit(train)

# Predictions

In [None]:
dtc_pred = dtc_model.transform(test)
rfc_pred = rfc_model.transform(test)
gbt_pred = gbt_model.transform(test)

# Eval

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="PrivateIndexer", predictionCol="prediction", metricName="accuracy")

In [None]:
dtc_acc = evaluator.evaluate(dtc_pred)
rfc_acc = evaluator.evaluate(rfc_pred)
gbt_acc = evaluator.evaluate(gbt_pred)

In [None]:
print("-"*10)
print(f"DT Acc: {dtc_acc}")
print("-"*10)
print(f"RFC Acc: {rfc_acc}")
print("-"*10)
print(f"GBT Acc: {gbt_acc}")
print("-"*10)