<a href="https://colab.research.google.com/github/ywchanna2001/Big-Data-Processing/blob/main/Spark_ML_Lib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [51]:
!pip install pyspark



In [52]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

spark

In [53]:
# Creating a data frame

df = spark.read.format("csv").load("housing.csv", header=True, inferSchema=True)

df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [54]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [55]:
# Add an index column

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("id", monotonically_increasing_id())

df = df[['id'] + df.columns[:-1]]

df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4| 

In [56]:
df.count()

20640

In [57]:
df.select('total_rooms').agg({'total_rooms': 'avg'}).show()

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



In [58]:
# Calculating average for each column

from pyspark.sql.functions import mean

df.select(*[mean(c) for c in df.columns]).show()

+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|avg(id)|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|10319.5|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                NULL|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+----------

In [59]:
df.groupby('ocean_proximity').agg({col: 'avg' for col in df.columns[3:-1]}).show()

+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|ocean_proximity|   avg(households)|   avg(population)|avg(total_bedrooms)|avg(median_income)|  avg(total_rooms)|avg(median_house_value)|avg(housing_median_age)|
+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|         ISLAND|             276.6|             668.0|              420.4|2.7444200000000003|            1574.6|               380440.0|                   42.4|
|     NEAR OCEAN|501.24454477050415|1354.0086531226486|  538.6156773211568| 4.005784800601957| 2583.700902934537|     249433.97742663656|     29.347253574115875|
|       NEAR BAY| 488.6161572052402|1230.3174672489083|  514.1828193832599| 4.172884759825336| 2493.589519650655|     259212.31179039303|      37.73013100436681|
|      <1H OCEAN| 517.744964

In [60]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

def squared(value):
  return value * value

squared_udf = udf(squared, FloatType())

df.withColumn('total_rooms_squared', squared_udf('total_rooms')).show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|total_rooms_squared|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|           774400.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|          5.03958E7|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|          2152089.0|
|  3|  -122.25|   37.85|    

In [61]:
 train, test = df.randomSplit([0.7, 0.3])

 train, test

(DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string],
 DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string])

In [62]:
numerical_features_list = train.columns
numerical_features_list.remove('median_house_value')
numerical_features_list.remove('ocean_proximity')
numerical_features_list.remove('id')


numerical_features_list

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [63]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=numerical_features_list, outputCols=numerical_features_list)

model = imputer.fit(train)

train = model.transform(train)
test = model.transform(test)

train.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4| 

In [65]:
from pyspark.ml.feature import VectorAssembler

numerical_vector_assembler = VectorAssembler(inputCols=numerical_features_list, outputCol='numerical_feature_vector')

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

train.show(5)

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

train.show(5)

IllegalArgumentException: Output column numerical_feature_vector already exists.

In [66]:
train.select('numerical_feature_vector').take(5)

[Row(numerical_feature_vector=DenseVector([-122.23, 37.88, 41.0, 880.0, 129.0, 322.0, 126.0, 8.3252])),
 Row(numerical_feature_vector=DenseVector([-122.22, 37.86, 21.0, 7099.0, 1106.0, 2401.0, 1138.0, 8.3014])),
 Row(numerical_feature_vector=DenseVector([-122.24, 37.85, 52.0, 1467.0, 190.0, 496.0, 177.0, 7.2574])),
 Row(numerical_feature_vector=DenseVector([-122.25, 37.85, 52.0, 1274.0, 235.0, 558.0, 219.0, 5.6431])),
 Row(numerical_feature_vector=DenseVector([-122.25, 37.85, 52.0, 1627.0, 280.0, 565.0, 259.0, 3.8462]))]

In [67]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='numerical_feature_vector',
                        outputCol='scaled_numerical_feature_vector',
                        withStd=True, withMean=True)

scaler = scaler.fit(train)

train = scaler.transform(train)
test = scaler.transform(test)

train.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3268860115768...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|           [-1.3218976373770...|
|  2|

In [68]:
train.select('scaled_numerical_feature_vector').take(5)

[Row(scaled_numerical_feature_vector=DenseVector([-1.3269, 1.055, 0.9796, -0.8035, -0.9737, -0.963, -0.9781, 2.3254])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3219, 1.0456, -0.6081, 2.0317, 1.3521, 0.8467, 1.6683, 2.3129])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3319, 1.0409, 1.8528, -0.5359, -0.8285, -0.8115, -0.8447, 1.7663])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3369, 1.0409, 1.8528, -0.6239, -0.7213, -0.7576, -0.7349, 0.9211])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3369, 1.0409, 1.8528, -0.4629, -0.6142, -0.7515, -0.6303, -0.0198]))]

In [69]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='ocean_proximity',
                        outputCol='ocean_category_index',
                        handleInvalid="keep")

indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

train.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3268860115768...|                 3.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          3585

In [70]:
set(train.select('ocean_category_index').collect())

{Row(ocean_category_index=0.0),
 Row(ocean_category_index=1.0),
 Row(ocean_category_index=2.0),
 Row(ocean_category_index=3.0),
 Row(ocean_category_index=4.0)}

In [71]:
from pyspark.ml.feature import OneHotEncoder

one_hot_encoder = OneHotEncoder(inputCol='ocean_category_index',
                                outputCol='ocean_category_one_hot')

one_hot_encoder = one_hot_encoder.fit(train)

train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

train.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3268860115768...|                 3.0|         (5,[3],[1.0])|
|  1|  -122.22|   37.86|    

In [72]:
 assembler = VectorAssembler(inputCols=['scaled_numerical_feature_vector',
                                        'ocean_category_one_hot'],
                             outputCol='final_feature_vector')

train = assembler.transform(train)
test = assembler.transform(test)

train.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3268860115768...|          

In [73]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='final_feature_vector',
                       labelCol='median_house_value')

lr

LinearRegression_c63645d40ace

In [74]:
lr = lr.fit(train)
lr

LinearRegressionModel: uid=LinearRegression_c63645d40ace, numFeatures=13

In [75]:
pred_train_df = lr.transform(train).withColumnRenamed('prediction',
                                                      'predicted_median_house_value')

pred_train_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          45260

In [76]:
pred_test_df = lr.transform(test).withColumnRenamed('prediction',
                                                      'predicted_median_house_value')

pred_test_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  7|  -122.25|   37.84|              52.0|     3104.0|         687.0|    1157.0|     647.0|         3.12|          24140

In [77]:
pred_test_pd_df = pred_test_df.toPandas()

pred_test_pd_df.head(5)

Unnamed: 0,id,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,numerical_feature_vector,scaled_numerical_feature_vector,ocean_category_index,ocean_category_one_hot,final_feature_vector,predicted_median_house_value
0,7,-122.25,37.84,52.0,3104.0,687.0,1157.0,647.0,3.12,241400.0,NEAR BAY,"[-122.25, 37.84, 52.0, 3104.0, 687.0, 1157.0, ...","[-1.3368627599763308, 1.036262043393782, 1.852...",3.0,"(0.0, 0.0, 0.0, 1.0, 0.0)","[-1.3368627599763308, 1.036262043393782, 1.852...",254235.874612
1,11,-122.26,37.85,52.0,3503.0,752.0,1504.0,734.0,3.2705,241800.0,NEAR BAY,"[-122.26, 37.85, 52.0, 3503.0, 752.0, 1504.0, ...","[-1.341851134176084, 1.0409405699339969, 1.852...",3.0,"(0.0, 0.0, 0.0, 1.0, 0.0)","[-1.341851134176084, 1.0409405699339969, 1.852...",256146.866065
2,14,-122.26,37.85,52.0,2643.0,626.0,1212.0,620.0,1.9167,159200.0,NEAR BAY,"[-122.26, 37.85, 52.0, 2643.0, 626.0, 1212.0, ...","[-1.341851134176084, 1.0409405699339969, 1.852...",3.0,"(0.0, 0.0, 0.0, 1.0, 0.0)","[-1.341851134176084, 1.0409405699339969, 1.852...",201572.073279
3,16,-122.27,37.85,52.0,1966.0,347.0,793.0,331.0,2.775,152500.0,NEAR BAY,"[-122.27, 37.85, 52.0, 1966.0, 347.0, 793.0, 3...","[-1.34683950837583, 1.0409405699339969, 1.8527...",3.0,"(0.0, 0.0, 0.0, 1.0, 0.0)","[-1.34683950837583, 1.0409405699339969, 1.8527...",213258.953882
4,17,-122.27,37.85,52.0,1228.0,293.0,648.0,303.0,2.1202,155500.0,NEAR BAY,"[-122.27, 37.85, 52.0, 1228.0, 293.0, 648.0, 3...","[-1.34683950837583, 1.0409405699339969, 1.8527...",3.0,"(0.0, 0.0, 0.0, 1.0, 0.0)","[-1.34683950837583, 1.0409405699339969, 1.8527...",190688.218317


In [79]:
predictions_and_actuals = pred_test_df[['predicted_median_house_value', 'median_house_value']]

predictions_and_actuals_rdd = predictions_and_actuals.rdd

predictions_and_actuals_rdd.take(5)

[Row(predicted_median_house_value=254235.87461183761, median_house_value=241400.0),
 Row(predicted_median_house_value=256146.86606467972, median_house_value=241800.0),
 Row(predicted_median_house_value=201572.07327886118, median_house_value=159200.0),
 Row(predicted_median_house_value=213258.95388209628, median_house_value=152500.0),
 Row(predicted_median_house_value=190688.21831668215, median_house_value=155500.0)]

In [82]:
from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(predictions_and_actuals_rdd)

s = '''
Mean Squared Error:      {0}
Root Mean Squared Error: {1}
Mean Absolute Error:     {2}
R Squared:                {3}
'''.format(metrics.meanSquaredError,
           metrics.rootMeanSquaredError,
           metrics.meanAbsoluteError,
           metrics.r2)

print(s)




Mean Squared Error:      4571226671.342767
Root Mean Squared Error: 67610.84729052555
Mean Absolute Error:     49552.30054890808
R Squared:                0.6554004874829733

