# Introduction
This notebook demonstrates Apache Spark with an example adapted from the [TensorFlow tutorial](https://www.tensorflow.org/tutorials/keras/regression). It showcases the following PySpark modules:

- PySpark DataFrame and SQL
- PySpark MLlib

# Environment Setup


In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=3a3463e5740570a4437e13319119bef9e47b64587eb6f61f6ef7781dc7521e28
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

# Dependencies

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.functions import vector_to_array
from pyspark.ml.evaluation import RegressionEvaluator

import pandas as pd

# Linear Regression: Predict fuel efficiency


## Create a Spark Session

In [None]:
# Create a Spark Session
spark = SparkSession.builder.appName("spark demo").getOrCreate()
# Check Spark Session Information
spark

## Load the data

In [None]:
# Download the data
url = 'http://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data'
column_names = ['MPG', 'Cylinders', 'Displacement', 'Horsepower', 'Weight',
                'Acceleration', 'Model Year', 'Origin']

raw_dataset = pd.read_csv(url, names=column_names,
                          na_values='?', comment='\t',
                          sep=' ', skipinitialspace=True)

In [None]:
# Convert to Spark DataFrame
columns = [F.col(name) for name in column_names]
df = spark.createDataFrame(raw_dataset) 
df.printSchema()
df.show()

root
 |-- MPG: double (nullable = true)
 |-- Cylinders: long (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model Year: long (nullable = true)
 |-- Origin: long (nullable = true)

+----+---------+------------+----------+------+------------+----------+------+
| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model Year|Origin|
+----+---------+------------+----------+------+------------+----------+------+
|18.0|        8|       307.0|     130.0|3504.0|        12.0|        70|     1|
|15.0|        8|       350.0|     165.0|3693.0|        11.5|        70|     1|
|18.0|        8|       318.0|     150.0|3436.0|        11.0|        70|     1|
|16.0|        8|       304.0|     150.0|3433.0|        12.0|        70|     1|
|17.0|        8|       302.0|     140.0|3449.0|        10.5|        70|     1|
|15.0|        8|       429.0|     198.0|4341

## Data Preparation

First, we see that the dataset contains 6 N/A values in Horsepower. We can use the DataFrame's dropna() function to delete rows that contain an N/A value.

In [None]:
# Use the Spark SQL module to count the N/A values in each column
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()

print("# of rows before cleaning: ", df.count())

df = df.dropna()

print("# of rows after cleaning: ", df.count())

+---+---------+------------+----------+------+------------+----------+------+
|MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model Year|Origin|
+---+---------+------------+----------+------+------------+----------+------+
|  0|        0|           0|         6|     0|           0|         0|     0|
+---+---------+------------+----------+------+------------+----------+------+

# of rows before cleaning:  398
# of rows after cleaning:  392
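
As a rough illustration of what the cell above computes, the following plain-Python sketch counts missing entries per column and then drops incomplete rows. It treats both `None` and NaN as missing, mirroring the `isNull()` and `isnan` checks in the Spark expression. The rows are made-up sample values, not taken from the dataset, and `is_missing` is a hypothetical helper.

```python
import math

def is_missing(value):
    """Treat None and float NaN as missing (hypothetical helper)."""
    return value is None or (isinstance(value, float) and math.isnan(value))

# Made-up rows for illustration only
rows = [
    {"MPG": 18.0, "Horsepower": 130.0},
    {"MPG": 25.0, "Horsepower": float("nan")},
    {"MPG": 21.0, "Horsepower": None},
]

# Per-column count of missing entries
missing_counts = {
    col: sum(is_missing(row[col]) for row in rows) for col in rows[0]
}

# Keep only rows with no missing entry in any column
cleaned = [row for row in rows if not any(is_missing(v) for v in row.values())]

print(missing_counts)
print(len(rows), "->", len(cleaned))
```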


After dropping N/A values, we perform one-hot encoding on the Origin column. The Origin column currently contains a number from 1 to 3, with 1 meaning USA, 2 meaning Europe, and 3 meaning Japan. We want to flatten the column by going from

Origin | 
-------|
 2     |

to

 USA | Europe | Japan 
-----|--------|------
  0  |   1    |  0   

with 1 meaning the car is made in the corresponding region.
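
The target format can be sketched in plain Python before turning to the Spark implementation. This is only a conceptual illustration: `ORIGIN_NAMES` and `one_hot_origin` are hypothetical names, and the notebook itself uses `StringIndexer` and `OneHotEncoder` for this step.

```python
# Mapping from the numeric Origin code to a region name
ORIGIN_NAMES = {1: "USA", 2: "Europe", 3: "Japan"}

def one_hot_origin(code):
    """Return one 0/1 flag per region; exactly one flag is 1."""
    return {name: int(code == key) for key, name in ORIGIN_NAMES.items()}

print(one_hot_origin(2))  # {'USA': 0, 'Europe': 1, 'Japan': 0}
```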

In [None]:
# Define the transformation function for Origin
# (named origin_map to avoid shadowing Python's built-in map)
origin_map = {1: "USA", 2: "Europe", 3: "Japan"}
def convertToCategory(code):
  return origin_map[code]

# Convert function to UDF
convertUDF = F.udf(convertToCategory, StringType())

In [None]:
# Applies the transformation
df = df.select(*columns[:-1], convertUDF(F.col("Origin")).alias("Origin"))
df.show()

+----+---------+------------+----------+------+------------+----------+------+
| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model Year|Origin|
+----+---------+------------+----------+------+------------+----------+------+
|18.0|        8|       307.0|     130.0|3504.0|        12.0|        70|   USA|
|15.0|        8|       350.0|     165.0|3693.0|        11.5|        70|   USA|
|18.0|        8|       318.0|     150.0|3436.0|        11.0|        70|   USA|
|16.0|        8|       304.0|     150.0|3433.0|        12.0|        70|   USA|
|17.0|        8|       302.0|     140.0|3449.0|        10.5|        70|   USA|
|15.0|        8|       429.0|     198.0|4341.0|        10.0|        70|   USA|
|14.0|        8|       454.0|     220.0|4354.0|         9.0|        70|   USA|
|14.0|        8|       440.0|     215.0|4312.0|         8.5|        70|   USA|
|14.0|        8|       455.0|     225.0|4425.0|        10.0|        70|   USA|
|15.0|        8|       390.0|     190.0|3850.0|     

In [None]:
# Index the Origin column
origin_indexer = StringIndexer(inputCol="Origin", outputCol="Origin_Index")
origin_indexer_fitted = origin_indexer.fit(df)
df = origin_indexer_fitted.transform(df)
df = df.drop("Origin") # Clean up the old column

# Applies one-hot encoding. Note the encoding in vector format
encoder = OneHotEncoder(inputCols=["Origin_Index"], outputCols=['Origin_OneHot'], dropLast=False)
df = encoder.fit(df).transform(df)
df = df.drop("Origin_Index") # Clean up the old column
df.show()

+----+---------+------------+----------+------+------------+----------+-------------+
| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model Year|Origin_OneHot|
+----+---------+------------+----------+------+------------+----------+-------------+
|18.0|        8|       307.0|     130.0|3504.0|        12.0|        70|(3,[0],[1.0])|
|15.0|        8|       350.0|     165.0|3693.0|        11.5|        70|(3,[0],[1.0])|
|18.0|        8|       318.0|     150.0|3436.0|        11.0|        70|(3,[0],[1.0])|
|16.0|        8|       304.0|     150.0|3433.0|        12.0|        70|(3,[0],[1.0])|
|17.0|        8|       302.0|     140.0|3449.0|        10.5|        70|(3,[0],[1.0])|
|15.0|        8|       429.0|     198.0|4341.0|        10.0|        70|(3,[0],[1.0])|
|14.0|        8|       454.0|     220.0|4354.0|         9.0|        70|(3,[0],[1.0])|
|14.0|        8|       440.0|     215.0|4312.0|         8.5|        70|(3,[0],[1.0])|
|14.0|        8|       455.0|     225.0|4425.0|       

After one-hot encoding, we want to explode the one-hot column into the format presented previously:

 USA | Europe | Japan 
-----|--------|------
  0  |   1    |  0   

In [None]:
# Converts the vector notation to array
df = df.select('*', vector_to_array("Origin_OneHot").alias("Origin_OneHot_Vector"))
df = df.drop("Origin_OneHot") # Clean up the old column

df.show()

+----+---------+------------+----------+------+------------+----------+--------------------+
| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model Year|Origin_OneHot_Vector|
+----+---------+------------+----------+------+------------+----------+--------------------+
|18.0|        8|       307.0|     130.0|3504.0|        12.0|        70|     [1.0, 0.0, 0.0]|
|15.0|        8|       350.0|     165.0|3693.0|        11.5|        70|     [1.0, 0.0, 0.0]|
|18.0|        8|       318.0|     150.0|3436.0|        11.0|        70|     [1.0, 0.0, 0.0]|
|16.0|        8|       304.0|     150.0|3433.0|        12.0|        70|     [1.0, 0.0, 0.0]|
|17.0|        8|       302.0|     140.0|3449.0|        10.5|        70|     [1.0, 0.0, 0.0]|
|15.0|        8|       429.0|     198.0|4341.0|        10.0|        70|     [1.0, 0.0, 0.0]|
|14.0|        8|       454.0|     220.0|4354.0|         9.0|        70|     [1.0, 0.0, 0.0]|
|14.0|        8|       440.0|     215.0|4312.0|         8.5|        70

In [None]:
# Explodes the array into individual columns
num_class = len(df.first()["Origin_OneHot_Vector"])
cols_expanded = [(F.col("Origin_OneHot_Vector")[i].alias(f"{origin_indexer_fitted.labels[i]}")) for i in range(num_class)]

df = df.select("*", *cols_expanded)
df = df.drop("Origin_OneHot_Vector")  # Clean up the old column
df.show()

+----+---------+------------+----------+------+------------+----------+---+-----+------+
| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model Year|USA|Japan|Europe|
+----+---------+------------+----------+------+------------+----------+---+-----+------+
|18.0|        8|       307.0|     130.0|3504.0|        12.0|        70|1.0|  0.0|   0.0|
|15.0|        8|       350.0|     165.0|3693.0|        11.5|        70|1.0|  0.0|   0.0|
|18.0|        8|       318.0|     150.0|3436.0|        11.0|        70|1.0|  0.0|   0.0|
|16.0|        8|       304.0|     150.0|3433.0|        12.0|        70|1.0|  0.0|   0.0|
|17.0|        8|       302.0|     140.0|3449.0|        10.5|        70|1.0|  0.0|   0.0|
|15.0|        8|       429.0|     198.0|4341.0|        10.0|        70|1.0|  0.0|   0.0|
|14.0|        8|       454.0|     220.0|4354.0|         9.0|        70|1.0|  0.0|   0.0|
|14.0|        8|       440.0|     215.0|4312.0|         8.5|        70|1.0|  0.0|   0.0|
|14.0|        8|     
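
The column order in the output above (USA, Japan, Europe) comes from the label order of the fitted `StringIndexer`, which by default indexes labels by descending frequency. The plain-Python sketch below imitates that ordering on a made-up sample; `frequency_order` is a hypothetical helper, not part of PySpark.

```python
from collections import Counter

def frequency_order(values):
    """Order distinct labels by descending count (ties broken alphabetically)."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))

# Made-up sample for illustration only
sample = ["USA", "Japan", "USA", "Europe", "USA", "Japan"]

labels = frequency_order(sample)
index_of = {label: i for i, label in enumerate(labels)}

print(labels)    # ['USA', 'Japan', 'Europe']
print(index_of)  # {'USA': 0, 'Japan': 1, 'Europe': 2}
```

Because the order depends on the data, reading the names from `origin_indexer_fitted.labels`, as the cell above does, is safer than hard-coding them.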

The last step in data preparation is to condense all the individual feature columns into a single **Features** column.

In [None]:
features_col = ["Cylinders", "Displacement", "Horsepower", "Weight",
                "Acceleration", "Model Year", "USA", "Japan", "Europe"]

vector_assembler = VectorAssembler(inputCols=features_col, outputCol="Features")
df_ml = vector_assembler.transform(df)
df_ml = df_ml.select(["Features", "MPG"])
df_ml.show(3)

+--------------------+----+
|            Features| MPG|
+--------------------+----+
|[8.0,307.0,130.0,...|18.0|
|[8.0,350.0,165.0,...|15.0|
|[8.0,318.0,150.0,...|18.0|
+--------------------+----+
only showing top 3 rows



## Train the Model
At this point, the dataset is cleaned and ready to be used in training.

In [None]:
# Split the dataset into training and test sets
splits = df_ml.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
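
Note that `randomSplit` assigns rows at random, so the split sizes only approximate the 70/30 weights and change between runs unless a seed is passed as the second argument. The sketch below is a conceptual plain-Python imitation of that behavior, not Spark's actual implementation; `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)

    # Cumulative upper bounds of each split on the interval [0, 1)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding gap
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

# 392 is the row count after cleaning; the row contents are placeholders
train, test = random_split(list(range(392)), [0.7, 0.3], seed=42)
print(len(train), len(test))
```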

In [None]:
# Train the model
lr = LinearRegression(featuresCol="Features", labelCol="MPG", maxIter=10, regParam=0.3, elasticNetParam=0.8, solver="normal")
lr_model = lr.fit(train_df)

print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.27598848582287533,0.0,-0.025860170807309374,-0.004052156755356032,0.0,0.7099642180636758,-1.0738335574042681,0.8442398631833453,0.0]
Intercept: -13.599886568971387
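
Several coefficients above are exactly 0.0. This is the expected effect of the L1 part of the elastic net penalty: in Spark's formulation, `regParam` is the overall strength λ and `elasticNetParam` is the mixing weight α, giving a penalty of λ(α‖w‖₁ + (1 − α)/2 ‖w‖₂²). The sketch below evaluates that expression for the printed coefficients. It ignores the feature standardization that `LinearRegression` applies by default, so the number is illustrative only; `elastic_net_penalty` is a hypothetical helper.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """lambda * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (
        elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_squared
    )

# Coefficients copied from the training output above
coefficients = [
    -0.27598848582287533, 0.0, -0.025860170807309374,
    -0.004052156755356032, 0.0, 0.7099642180636758,
    -1.0738335574042681, 0.8442398631833453, 0.0,
]

penalty = elastic_net_penalty(coefficients, reg_param=0.3, elastic_net_param=0.8)
num_zero = sum(1 for w in coefficients if w == 0.0)

print("Penalty:", penalty)
print("Coefficients driven to zero:", num_zero)
```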


After the model is trained, we can use it to make predictions.
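
For a linear model, each prediction is the dot product of the coefficient vector and the feature vector, plus the intercept. As a sanity check, the sketch below applies the coefficients and intercept printed above to the first row of the dataset shown earlier. `predict` is a hypothetical helper, and the fitted values will differ for another random split.

```python
def predict(coefficients, intercept, features):
    """Linear model prediction: w . x + b."""
    return sum(w * x for w, x in zip(coefficients, features)) + intercept

# Coefficients and intercept copied from the training output above
coefficients = [
    -0.27598848582287533, 0.0, -0.025860170807309374,
    -0.004052156755356032, 0.0, 0.7099642180636758,
    -1.0738335574042681, 0.8442398631833453, 0.0,
]
intercept = -13.599886568971387

# First row of the dataset, in the order of features_col:
# Cylinders, Displacement, Horsepower, Weight, Acceleration,
# Model Year, USA, Japan, Europe
first_row = [8.0, 307.0, 130.0, 3504.0, 12.0, 70.0, 1.0, 0.0, 0.0]

prediction = predict(coefficients, intercept, first_row)
print("Predicted MPG:", prediction, "(actual MPG: 18.0)")
```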

The training set predictions:

In [None]:
train_predictions = lr_model.transform(train_df)
train_predictions.show()


+--------------------+----+------------------+
|            Features| MPG|        prediction|
+--------------------+----+------------------+
|[4.0,68.0,49.0,18...|29.0| 28.29102237457758|
|[4.0,71.0,65.0,18...|32.0|29.557080582323685|
|[4.0,76.0,52.0,16...|31.0| 30.65101611607028|
|[4.0,79.0,67.0,19...|31.0|29.043414370598477|
|[4.0,79.0,67.0,19...|26.0|28.146496469595505|
|[4.0,83.0,61.0,20...|32.0| 28.98381108740847|
|[4.0,85.0,52.0,20...|29.0|28.588738624042605|
|[4.0,88.0,76.0,20...|30.0|25.370542289092377|
|[4.0,90.0,70.0,19...|29.0|28.884236250876505|
|[4.0,90.0,71.0,22...|25.0| 27.69945924803737|
|[4.0,90.0,75.0,21...|24.0|27.352052373610405|
|[4.0,90.0,75.0,21...|28.0|26.209332151365082|
|[4.0,91.0,53.0,17...|33.0| 31.45346949510835|
|[4.0,91.0,70.0,19...|26.0|24.897606999621132|
|[4.0,97.0,46.0,18...|26.0| 26.36837924897987|
|[4.0,97.0,46.0,19...|26.0|28.032273876304956|
|[4.0,97.0,54.0,22...|23.0| 25.88357263815457|
|[4.0,97.0,60.0,18...|27.0| 26.72035323249657|
|[4.0,97.0,75

The test set predictions:

In [None]:
test_predictions = lr_model.transform(test_df)
test_predictions.show()

+--------------------+----+------------------+
|            Features| MPG|        prediction|
+--------------------+----+------------------+
|[3.0,70.0,90.0,21...|18.0|  27.3095794343576|
|[3.0,70.0,97.0,23...|19.0| 25.58384972903943|
|[4.0,71.0,65.0,17...|31.0|27.682473803720093|
|[4.0,72.0,69.0,16...|35.0| 28.22737820134782|
|[4.0,79.0,67.0,20...|31.0|27.996566669647336|
|[4.0,79.0,70.0,20...|30.0| 25.48923390313803|
|[4.0,90.0,70.0,19...|29.0|29.594200468940187|
|[4.0,91.0,53.0,17...|33.0| 30.74350527704467|
|[4.0,96.0,69.0,21...|26.0|25.759060265143077|
|[4.0,97.0,88.0,21...|27.0|26.472598834214224|
|[4.0,97.0,88.0,21...|27.0| 24.93110569542619|
|[4.0,98.0,80.0,21...|28.0|  24.5020687477423|
|[4.0,101.0,83.0,2...|27.0| 28.18419670827582|
|[4.0,110.0,87.0,2...|25.0| 21.91645704164719|
|[4.0,113.0,95.0,2...|25.0|25.062937355813816|
|[4.0,114.0,91.0,2...|20.0| 24.30760312059102|
|[4.0,115.0,95.0,2...|23.0|25.170249316889254|
|[4.0,119.0,97.0,2...|24.0|26.566540195006034|
|[4.0,120.0,8

Now we can evaluate our model using the RegressionEvaluator.

In [None]:
evaluator = RegressionEvaluator(predictionCol="prediction",
                                labelCol="MPG", metricName="r2")

print("Train R2:", evaluator.evaluate(train_predictions))
print("Test R2:", evaluator.evaluate(test_predictions))

Train R2: 0.8075420726689316
Test R2: 0.818453171334369
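
The `r2` metric is the coefficient of determination: one minus the ratio of the residual sum of squares to the total sum of squares around the label mean. A minimal plain-Python sketch of that definition follows; `r2_score` is a hypothetical helper and the label and prediction values are made up for illustration.

```python
def r2_score(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

# Made-up values for illustration only
labels = [18.0, 15.0, 31.0, 26.0]
predictions = [17.0, 16.0, 29.0, 27.0]

print("R2:", r2_score(labels, predictions))
```

A value of 1.0 means the predictions match the labels exactly, while a model that always predicts the label mean scores 0.0.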
