<a href="https://colab.research.google.com/github/kalyani234/machine-learning-projects/blob/main/Spark_Machine_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [36]:
!pip install pyspark



In [37]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config("spark.ui.port", '4050')\
        .getOrCreate()

spark

In [38]:
# load csv
df = spark.read.format("csv").load("/content/housing.csv", header=True, inferSchema=True)

In [39]:
# Print of the schema

df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [40]:
# print the table
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [41]:
# add new column id
from pyspark.sql.functions import monotonically_increasing_id


df = df.withColumn('id', monotonically_increasing_id())
df = df[['id']+ df.columns[:-1]]

In [42]:
df.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only s

In [43]:
# count the number of rows
df.count()

20640

In [44]:
# Average total
df.select('total_rooms').agg({'total_rooms': 'avg'}).show()

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



In [45]:
# Avg of every col
from pyspark.sql.functions import mean
df.select(*[mean(c) for c in df.columns]).show()

+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|avg(id)|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|10319.5|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                NULL|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+----------

In [46]:
# calculate avg of every column group by ocean_proximity
df.groupby('ocean_proximity').agg({col : 'avg' for col in df.columns[3:-1]}).show()

+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|ocean_proximity|   avg(households)|   avg(population)|avg(total_bedrooms)|avg(median_income)|  avg(total_rooms)|avg(median_house_value)|avg(housing_median_age)|
+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|         ISLAND|             276.6|             668.0|              420.4|2.7444200000000003|            1574.6|               380440.0|                   42.4|
|     NEAR OCEAN|501.24454477050415|1354.0086531226486|  538.6156773211568| 4.005784800601957| 2583.700902934537|     249433.97742663656|     29.347253574115875|
|       NEAR BAY| 488.6161572052402|1230.3174672489083|  514.1828193832599| 4.172884759825336| 2493.589519650655|     259212.31179039303|      37.73013100436681|
|      <1H OCEAN| 517.744964

In [47]:
# squre of total_rooms columns

from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

def squared(value):
  return value * value

squared_udf = udf(squared, FloatType())

df.withColumn('total_rooms_squared',squared_udf('total_rooms')).show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|total_rooms_squared|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|           774400.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|          5.03958E7|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|          2152089.0|
|  3|  -122.25|   37.85|    

In [48]:
df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4| 

In [49]:
# train and test split into 70 30

train, test = df.randomSplit([0.7, 0.3])
train, test

(DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string],
 DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string])

In [50]:
# remove some columns
numerical_features_lst = train.columns
numerical_features_lst.remove('median_house_value')
numerical_features_lst.remove('id')
numerical_features_lst.remove('ocean_proximity')
numerical_features_lst

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [51]:
# imputting missing values

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols= numerical_features_lst,
                  outputCols= numerical_features_lst)

In [52]:
imputer = imputer.fit(train)
train = imputer.transform(train)
test = imputer.transform(test)
train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only s

In [53]:
from pyspark.ml.feature import VectorAssembler

numerical_vector_assembler = VectorAssembler(inputCols=numerical_features_lst,
                                             outputCol= 'numerical_feature_vector')
train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)
train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------

In [54]:
train.select('numerical_feature_vector').take(2)

[Row(numerical_feature_vector=DenseVector([-122.23, 37.88, 41.0, 880.0, 129.0, 322.0, 126.0, 8.3252])),
 Row(numerical_feature_vector=DenseVector([-122.22, 37.86, 21.0, 7099.0, 1106.0, 2401.0, 1138.0, 8.3014]))]

In [55]:
# Standarize
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol= 'numerical_feature_vector',
                        outputCol= 'scaled_numerical_feature_vector',
                        withStd=True , withMean=True)
scaler = scaler.fit(train)
train = scaler.transform(train)
test = scaler.transform(test)

In [56]:
train.select('scaled_numerical_feature_vector').take(3)

[Row(scaled_numerical_feature_vector=DenseVector([-1.3257, 1.0424, 0.9828, -0.7951, -0.9604, -0.9529, -0.9646, 2.351])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3207, 1.0331, -0.6088, 2.0177, 1.3295, 0.8391, 1.6492, 2.3385])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3357, 1.0284, 1.8581, -0.6169, -0.7119, -0.7495, -0.7244, 0.9401]))]

In [57]:
# change  ocean_proximity categorical into numerical
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='ocean_proximity',
                         outputCol='ocean_category_index')
indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

In [58]:
train.select('ocean_category_index').take(3)

[Row(ocean_category_index=3.0),
 Row(ocean_category_index=3.0),
 Row(ocean_category_index=3.0)]

In [59]:
set(train.select('ocean_category_index').collect())

{Row(ocean_category_index=0.0),
 Row(ocean_category_index=1.0),
 Row(ocean_category_index=2.0),
 Row(ocean_category_index=3.0),
 Row(ocean_category_index=4.0)}

In [60]:
# one hot encoder for ocean category
from pyspark.ml.feature import OneHotEncoder

one_hot_encoder = OneHotEncoder(inputCol='ocean_category_index',
                                outputCol='ocean_category_one_hot_label')
one_hot_encoder =one_hot_encoder.fit(train)
train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot_label|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3257292004562...|                 3.0|               (4,[3],[1.0])|
|  1

In [61]:
# combine scaled_vector_col and ocean_category_one_hot_label using assembler

assembler = VectorAssembler(inputCols=['scaled_numerical_feature_vector', 'ocean_category_one_hot_label'],
                            outputCol= 'final_feature_vector')

#assembler = assembler.fit(train)

train = assembler.transform(train)
test = assembler.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot_label|final_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.325729200

In [62]:
train.select('final_feature_vector').take(3)

[Row(final_feature_vector=DenseVector([-1.3257, 1.0424, 0.9828, -0.7951, -0.9604, -0.9529, -0.9646, 2.351, 0.0, 0.0, 0.0, 1.0])),
 Row(final_feature_vector=DenseVector([-1.3207, 1.0331, -0.6088, 2.0177, 1.3295, 0.8391, 1.6492, 2.3385, 0.0, 0.0, 0.0, 1.0])),
 Row(final_feature_vector=DenseVector([-1.3357, 1.0284, 1.8581, -0.6169, -0.7119, -0.7495, -0.7244, 0.9401, 0.0, 0.0, 0.0, 1.0]))]

In [63]:
# Applying Linear Regression Algorithm
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='final_feature_vector',
                      labelCol = 'median_house_value' )
lr

LinearRegression_3daa942a4fe1

In [64]:
lr = lr.fit(train)
lr

LinearRegressionModel: uid=LinearRegression_3daa942a4fe1, numFeatures=12

In [65]:
pred_train_df = lr.transform(train).withColumnRenamed('prediction','predicted_median_house_value')
pred_train_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot_label|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.32

In [66]:
pred_test_df = lr.transform(test).withColumnRenamed('prediction','predicted_median_house_value')
pred_test_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot_label|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+--------------------+----------------------------+
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.25

In [67]:
# convert into Dataframe
pred_test_pd_df = pred_test_df.toPandas()
pred_test_pd_df.head(2)

Unnamed: 0,id,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,numerical_feature_vector,scaled_numerical_feature_vector,ocean_category_index,ocean_category_one_hot_label,final_feature_vector,predicted_median_house_value
0,2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY,"[-122.24, 37.85, 52.0, 1467.0, 190.0, 496.0, 1...","[-1.3307175917974219, 1.028421622267114, 1.858...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3307175917974219, 1.028421622267114, 1.858...",377094.109713
1,4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY,"[-122.25, 37.85, 52.0, 1627.0, 280.0, 565.0, 2...","[-1.335705983138561, 1.028421622267114, 1.8580...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.335705983138561, 1.028421622267114, 1.8580...",255630.273449


In [68]:
# display predicted median house value and median house value

predictions_and_actuals = pred_test_df[['predicted_median_house_value','median_house_value']]
predictions_and_actuals_rdd = predictions_and_actuals.rdd

predictions_and_actuals_rdd.take(2)

[Row(predicted_median_house_value=377094.10971291416, median_house_value=352100.0),
 Row(predicted_median_house_value=255630.27344916924, median_house_value=342200.0)]

In [69]:
# convert into tuple  of predicted median house value and median house value
predictions_and_actuals_rdd = predictions_and_actuals_rdd.map(tuple)
predictions_and_actuals_rdd.take(2)

[(377094.10971291416, 352100.0), (255630.27344916924, 342200.0)]

In [70]:
# Metrics
from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(predictions_and_actuals_rdd)

s = '''
Mean Squared Error:  {0}
Root Mean Squared Error: {1}
Mean Absolute Error: {2}
R**2: {3}
'''.format(metrics.meanAbsoluteError,
           metrics.rootMeanSquaredError,
           metrics.meanSquaredError,
           metrics.r2)
print(s)




Mean Squared Error:  50709.372871131694
Root Mean Squared Error: 69069.64873239127
Mean Absolute Error: 4770616376.015919
R**2: 0.6445314183619053

