
# Spark MLlib

## Install Libraries

In [45]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## Create Spark session

In [46]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master('local')\
        .appName('colab')\
        .config('spark.ui.port','4050')\
        .getOrCreate()


In [47]:
spark

## Load Data

In [48]:
df = spark.read.format('csv').load('housing.csv',header = True, inferSchema = True)
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



## EDA

In [49]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [50]:
df.count()

20640

## Add an id column to the existing DataFrame

In [51]:
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn('id', monotonically_increasing_id())

In [52]:
df.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity',
 'id']
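
The ids produced by `monotonically_increasing_id` are unique and increasing, but not necessarily consecutive. The Spark documentation describes the current layout as the partition index in the upper bits and the row position within the partition in the lower 33 bits. The plain-Python sketch below mimics that layout for two hypothetical partitions; `spark_style_id` is an illustrative helper, not a Spark function.

```python
# Sketch of the documented id layout: partition index in the upper bits,
# row position within the partition in the lower 33 bits.
def spark_style_id(partition_index: int, row_in_partition: int) -> int:
    return (partition_index << 33) + row_in_partition

# Two hypothetical partitions with three rows each
ids = [spark_style_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

With a single partition, as is likely for a small file read on a `local` master, the ids simply run 0, 1, 2, and so on.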

## Rearrange columns to make id the first column

In [53]:
df.columns[-1:]

['id']

In [54]:
df.columns[:-1]

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity']

In [55]:
df = df[['id']+df.columns[:-1]]

In [56]:
df.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows

## Basic Aggregations

Get the average of the total_rooms column

In [57]:
df.select('total_rooms').agg({'total_rooms':'avg'}).show()

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



Another syntax for the same operation

In [58]:
from pyspark.sql.functions import mean

df.select(mean('total_rooms')).show()

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



Get the mean of each column (the non-numeric ocean_proximity column yields null)

In [59]:
df.select(*[mean(c) for c in df.columns]).show()

+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|avg(id)|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|10319.5|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                null|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+----------

Group By

In [60]:
df.groupBy('ocean_proximity').agg({'population':'avg'}).show()

+---------------+------------------+
|ocean_proximity|   avg(population)|
+---------------+------------------+
|         ISLAND|             668.0|
|     NEAR OCEAN|1354.0086531226486|
|       NEAR BAY|1230.3174672489083|
|      <1H OCEAN|1520.2904991243433|
|         INLAND|1391.0462524805373|
+---------------+------------------+
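
As a rough picture of what a grouped average computes, here is a plain-Python sketch over a few made-up `(ocean_proximity, population)` pairs. It only illustrates the arithmetic; Spark performs the same aggregation in a distributed way, and `group_average` is a hypothetical helper.

```python
from collections import defaultdict

# Made-up (ocean_proximity, population) pairs, for illustration only
rows = [("INLAND", 100.0), ("NEAR BAY", 300.0), ("INLAND", 200.0), ("NEAR BAY", 500.0)]

def group_average(pairs):
    """Average the values per key by tracking a running total and count."""
    totals = defaultdict(lambda: [0.0, 0])
    for key, value in pairs:
        totals[key][0] += value
        totals[key][1] += 1
    return {key: total / count for key, (total, count) in totals.items()}

averages = group_average(rows)
print(averages)  # {'INLAND': 150.0, 'NEAR BAY': 400.0}
```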



Group by with aggregation on more than one column

In [61]:
df.groupBy('ocean_proximity').agg({col:'avg' for col in df.columns[3:-1]}).show()

+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|ocean_proximity|   avg(households)|   avg(population)|avg(total_bedrooms)|avg(median_income)|  avg(total_rooms)|avg(median_house_value)|avg(housing_median_age)|
+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|         ISLAND|             276.6|             668.0|              420.4|2.7444200000000003|            1574.6|               380440.0|                   42.4|
|     NEAR OCEAN|501.24454477050415|1354.0086531226486|  538.6156773211568| 4.005784800601957| 2583.700902934537|     249433.97742663656|     29.347253574115875|
|       NEAR BAY| 488.6161572052402|1230.3174672489083|  514.1828193832599| 4.172884759825336| 2493.589519650655|     259212.31179039303|      37.73013100436681|
|      <1H OCEAN| 517.744964

## User-Defined Functions (UDFs)

In [62]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf
def squarred(value):
  return value*value
  
squarred_udf = udf(squarred,FloatType())
df.withColumn('total_rooms_squarred',squarred_udf('total_rooms')).show(5)


+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|total_rooms_squarred|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|            774400.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|           5.03958E7|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|           2152089.0|
|  3|  -122.25|   37.8
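
One detail worth noting about the UDF above: `FloatType` is a 32-bit type, so large squares lose precision. The output row showing `5.03958E7` is consistent with that. The sketch below uses only the standard library to round the exact square of 7099 to 32 bits; `to_float32` is an illustrative helper. Declaring the UDF with `DoubleType` instead would likely keep the exact value.

```python
import struct

def to_float32(value: float) -> float:
    """Round a 64-bit Python float to the nearest 32-bit float."""
    return struct.unpack("f", struct.pack("f", value))[0]

exact = 7099.0 * 7099.0      # 50395801.0, representable exactly in 64 bits
rounded = to_float32(exact)  # nearest value a 32-bit float can hold

print(exact, rounded)  # 50395801.0 50395800.0
```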

## MLlib

### Split data

In [63]:
train, test = df.randomSplit([0.7,0.3])
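
`randomSplit` also accepts an optional `seed` argument. Without one, the split differs between runs, so the outputs shown in this notebook will not reproduce exactly. Conceptually each row is assigned to a split at random, which is why the resulting sizes are only approximately 70/30. The plain-Python sketch below illustrates both points; `bernoulli_split` is a hypothetical helper and not Spark's implementation.

```python
import random

def bernoulli_split(rows, train_fraction, seed):
    """Assign each row independently to train or test using a seeded generator."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))
train_a, test_a = bernoulli_split(rows, 0.7, seed=42)
train_b, test_b = bernoulli_split(rows, 0.7, seed=42)

# Same seed, same split; sizes are close to 700/300 but not guaranteed exact
print(train_a == train_b, len(train_a), len(test_a))
```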

### Get continuous (numerical) columns

In [64]:
non_categorical_cols = df.columns
non_categorical_cols.remove('ocean_proximity')
non_categorical_cols.remove('median_house_value')
non_categorical_cols.remove('id')

In [65]:
non_categorical_cols

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

### Deal with missing values

In [66]:
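from pyspark.ml.feature import Imputer

# The default strategy is 'mean'; the numerical columns are overwritten in place
imputer = Imputer(inputCols = non_categorical_cols , outputCols = non_categorical_cols)

# Fit on the training split only, then apply the same fill values to both splits
imputer = imputer.fit(train)

train = imputer.transform(train)
test = imputer.transform(test)

train.show(3)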


+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows
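
The key point of the imputation step is that the fill value is learned from the training split and then reused on the test split. A plain-Python sketch of mean imputation on made-up values follows; `fit_mean` and `impute` are illustrative helpers rather than Spark APIs.

```python
from statistics import mean

def fit_mean(values):
    """Learn the fill value from the observed (non-missing) entries."""
    return mean(v for v in values if v is not None)

def impute(values, fill):
    """Replace missing entries with the learned fill value."""
    return [fill if v is None else v for v in values]

# Made-up total_bedrooms values, with None marking a missing entry
train_bedrooms = [100.0, None, 200.0, 300.0]
test_bedrooms = [None, 280.0]

fill_value = fit_mean(train_bedrooms)            # learned from train only
train_filled = impute(train_bedrooms, fill_value)
test_filled = impute(test_bedrooms, fill_value)  # reuses the train statistic

print(fill_value, train_filled, test_filled)
# 200.0 [100.0, 200.0, 200.0, 300.0] [200.0, 280.0]
```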

### Make vectors for each row

In [67]:
from pyspark.ml.feature import VectorAssembler

numerical_vector_assembler = VectorAssembler(inputCols = non_categorical_cols ,outputCol = 'numerical_feature_vector')

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

In [69]:
train.select('numerical_feature_vector').take(2)

[Row(numerical_feature_vector=DenseVector([-122.23, 37.88, 41.0, 880.0, 129.0, 322.0, 126.0, 8.3252])),
 Row(numerical_feature_vector=DenseVector([-122.24, 37.85, 52.0, 1467.0, 190.0, 496.0, 177.0, 7.2574]))]

### Preprocessing - Standardization

In [70]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol = 'numerical_feature_vector', outputCol = 'scaled_numerical_feature_vector',withMean = True, withStd = True )

scaler = scaler.fit(train)
train = scaler.transform(train)
test = scaler.transform(test)

In [71]:
train.select('numerical_feature_vector','scaled_numerical_feature_vector').take(2)

[Row(numerical_feature_vector=DenseVector([-122.23, 37.88, 41.0, 880.0, 129.0, 322.0, 126.0, 8.3252]), scaled_numerical_feature_vector=DenseVector([-1.3355, 1.056, 0.9918, -0.7907, -0.9637, -0.9527, -0.9623, 2.3496])),
 Row(numerical_feature_vector=DenseVector([-122.24, 37.85, 52.0, 1467.0, 190.0, 496.0, 177.0, 7.2574]), scaled_numerical_feature_vector=DenseVector([-1.3405, 1.042, 1.8663, -0.5281, -0.8208, -0.8031, -0.8314, 1.7869]))]
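
With `withMean = True` and `withStd = True`, each feature is centred on its training mean and divided by its training standard deviation. Spark documents the latter as the corrected sample standard deviation. Below is a one-feature sketch in plain Python with made-up numbers; `fit_scaler` and `scale` are hypothetical helpers.

```python
from statistics import mean, stdev

def fit_scaler(values):
    """Return the mean and the sample (n - 1) standard deviation."""
    return mean(values), stdev(values)

def scale(values, mu, sigma):
    """Standardize values with previously fitted statistics."""
    return [(v - mu) / sigma for v in values]

train_values = [2.0, 4.0, 6.0]   # made-up feature column
test_values = [8.0]

mu, sigma = fit_scaler(train_values)           # fitted on train only
scaled_train = scale(train_values, mu, sigma)
scaled_test = scale(test_values, mu, sigma)    # reuses the train statistics

print(mu, sigma, scaled_train, scaled_test)
# 4.0 2.0 [-1.0, 0.0, 1.0] [2.0]
```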

### Categorical Columns

In [74]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol = 'ocean_proximity', outputCol = 'ocean_proximity_index')

indexer = indexer.fit(train)

train = indexer.transform(train)
test = indexer.transform(test)

In [76]:
set(train.select('ocean_proximity_index').collect())

{Row(ocean_proximity_index=0.0),
 Row(ocean_proximity_index=1.0),
 Row(ocean_proximity_index=2.0),
 Row(ocean_proximity_index=3.0),
 Row(ocean_proximity_index=4.0)}
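
By default `StringIndexer` orders labels by descending frequency, so the most common category receives index 0.0. The sketch below reproduces that ordering in plain Python on made-up labels. Breaking ties alphabetically is an assumption made for this sketch, and `fit_string_index` is a hypothetical helper.

```python
from collections import Counter

def fit_string_index(labels):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; ties are broken alphabetically (assumption)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

labels = ["INLAND", "<1H OCEAN", "<1H OCEAN", "NEAR BAY", "<1H OCEAN", "INLAND"]
index = fit_string_index(labels)

print(index)  # {'<1H OCEAN': 0.0, 'INLAND': 1.0, 'NEAR BAY': 2.0}
```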

### One-hot encoding to convert category indices to binary vectors

In [79]:
from pyspark.ml.feature import OneHotEncoder
one_hot_encoder= OneHotEncoder(inputCol = 'ocean_proximity_index', outputCol = 'ocean_proximity_index_one_hot')

one_hot_encoder = one_hot_encoder.fit(train)
train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

In [80]:
train.select('ocean_proximity_index_one_hot').take(2)

[Row(ocean_proximity_index_one_hot=SparseVector(4, {3: 1.0})),
 Row(ocean_proximity_index_one_hot=SparseVector(4, {3: 1.0}))]
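
The vectors above have length 4 even though there are 5 categories because `OneHotEncoder` drops the last category by default (`dropLast=True`); that category is represented by an all-zero vector. Here is a dense plain-Python sketch of the same encoding, where `one_hot_drop_last` is an illustrative helper.

```python
def one_hot_drop_last(index: int, num_categories: int) -> list:
    """One-hot encode an index, representing the last category as all zeros."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector

print(one_hot_drop_last(3, 5))  # [0.0, 0.0, 0.0, 1.0]
print(one_hot_drop_last(4, 5))  # [0.0, 0.0, 0.0, 0.0]
```

The first call corresponds to `SparseVector(4, {3: 1.0})` in the output above.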

### Combine scaled numerical columns with the one-hot encoded categorical column

In [81]:
assembler = VectorAssembler(inputCols=['scaled_numerical_feature_vector','ocean_proximity_index_one_hot'],outputCol='final_feature_vector')

train = assembler.transform(train)
test = assembler.transform(test)

In [83]:
train.select('final_feature_vector').take(2)

[Row(final_feature_vector=DenseVector([-1.3355, 1.056, 0.9918, -0.7907, -0.9637, -0.9527, -0.9623, 2.3496, 0.0, 0.0, 0.0, 1.0])),
 Row(final_feature_vector=DenseVector([-1.3405, 1.042, 1.8663, -0.5281, -0.8208, -0.8031, -0.8314, 1.7869, 0.0, 0.0, 0.0, 1.0]))]

## Create the model and train

In [84]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='final_feature_vector', labelCol='median_house_value')
lr

LinearRegression_ae58f90a1038

In [85]:
lr = lr.fit(train)
lr

LinearRegressionModel: uid=LinearRegression_ae58f90a1038, numFeatures=12
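
The fitted model exposes its learned weights as `lr.coefficients` and `lr.intercept`. A prediction is the dot product of the feature vector with the coefficients, plus the intercept. The sketch below shows that arithmetic with made-up numbers; `predict` is a hypothetical helper, and the values are not taken from this model.

```python
def predict(features, coefficients, intercept):
    """Linear prediction: dot(features, coefficients) + intercept."""
    return sum(x * w for x, w in zip(features, coefficients)) + intercept

# Hypothetical weights and one hypothetical feature vector
coefficients = [2.0, -1.0, 0.5]
intercept = 10.0
features = [1.0, 3.0, 4.0]

prediction = predict(features, coefficients, intercept)
print(prediction)  # 11.0
```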

In [91]:
predict_train_df = lr.transform(train).withColumnRenamed('prediction','predicted_median_house_values')

In [92]:
predict_train_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+---------------------+-----------------------------+--------------------+-----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_proximity_index|ocean_proximity_index_one_hot|final_feature_vector|predicted_median_house_values|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+---------------------+-----------------------------+--------------------+-----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|  

In [93]:
predict_test_df = lr.transform(test).withColumnRenamed('prediction','predicted_median_house_values')

## Evaluation

In [94]:
predict_and_actuals = predict_test_df[['predicted_median_house_values','median_house_value']]
predict_and_actuals_rdd = predict_and_actuals.rdd
predict_and_actuals_rdd.take(2)

[Row(predicted_median_house_values=422898.38361259905, median_house_value=358500.0),
 Row(predicted_median_house_values=260703.10126451481, median_house_value=269700.0)]

### Convert RDD rows to tuples

In [96]:
predict_and_actuals_rdd = predict_and_actuals_rdd.map(tuple)
predict_and_actuals_rdd.take(2)

[(422898.38361259905, 358500.0), (260703.10126451481, 269700.0)]

In [98]:
from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(predict_and_actuals_rdd)

s = '''
Mean Absolute Error - {0},
Mean Squared Error - {1},
Root Mean Squared Error - {2}
R Squared - {3}

'''.format(metrics.meanAbsoluteError,
           metrics.meanSquaredError,
           metrics.rootMeanSquaredError,
           metrics.r2)

print(s)




Mean Absolute Error - 50305.16022442479,
Mean Squared Error - 4772251794.711922,
Root Mean Squared Error - 69081.48662783628
R Squared - 0.6445463527677707
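
For reference, the four metrics can be computed by hand from `(prediction, actual)` pairs, the same order passed to `RegressionMetrics` above. The following plain-Python sketch uses made-up pairs and assumes the usual definition of R squared as one minus the ratio of residual to total sum of squares. `regression_metrics` is an illustrative helper.

```python
from math import sqrt

def regression_metrics(pairs):
    """Compute MAE, MSE, RMSE and R squared from (prediction, actual) pairs."""
    n = len(pairs)
    errors = [actual - predicted for predicted, actual in pairs]
    ss_res = sum(e * e for e in errors)
    mean_actual = sum(actual for _, actual in pairs) / n
    ss_tot = sum((actual - mean_actual) ** 2 for _, actual in pairs)
    mse = ss_res / n
    return {
        "mae": sum(abs(e) for e in errors) / n,
        "mse": mse,
        "rmse": sqrt(mse),
        "r2": 1 - ss_res / ss_tot,
    }

# Made-up (prediction, actual) pairs, for illustration only
pairs = [(2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)]
result = regression_metrics(pairs)
print(result)
```

Note that RMSE is simply the square root of MSE, which matches the values printed above (the square root of 4772251794.71 is about 69081.49).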


