In [2]:
!pip install pyspark
!pip install findspark

import findspark
findspark.init()

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=537ad6c91ef41031c67df594493ccdb63a8079efb313c00c31e9aab451ec172b
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [58]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

spark = SparkSession.builder\
                    .master("local[*]")\
                    .appName('example_spark')\
                    .getOrCreate()

spark

### Loading Data

In [12]:
# Reading Data
train_df = spark.read.csv('/content/sample_data/california_housing_train.csv', header=True)
test_df  = spark.read.csv('/content/sample_data/california_housing_test.csv', header=True)

train_df.limit(5).toPandas()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,-114.31,34.19,15.0,5612.0,1283.0,1015.0,472.0,1.4936,66900.0
1,-114.47,34.4,19.0,7650.0,1901.0,1129.0,463.0,1.82,80100.0
2,-114.56,33.69,17.0,720.0,174.0,333.0,117.0,1.6509,85700.0
3,-114.57,33.64,14.0,1501.0,337.0,515.0,226.0,3.1917,73400.0
4,-114.57,33.57,20.0,1454.0,326.0,624.0,262.0,1.925,65500.0


In [30]:
predictors, response = train_df.columns[:-1], train_df.columns[-1]

In [40]:
train_df.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housing_median_age: string (nullable = true)
 |-- total_rooms: string (nullable = true)
 |-- total_bedrooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- median_income: string (nullable = true)
 |-- median_house_value: string (nullable = true)



In [41]:
from pyspark.sql.types import DoubleType

In [44]:
for col in train_df.columns:
  train_df = train_df.withColumn(col, train_df[col].cast(DoubleType()))
  test_df = test_df.withColumn(col, test_df[col].cast(DoubleType()))

In [45]:
train_df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



### Comparing Data Statistics

In [15]:
train_df.summary().toPandas()

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,count,17000.0,17000.0,17000.0,17000.0,17000.0,17000.0,17000.0,17000.0,17000.0
1,mean,-119.56210823529376,35.6252247058827,28.58935294117647,2643.664411764706,539.4108235294118,1429.5739411764705,501.2219411764706,3.883578100000021,207300.9123529412
2,stddev,2.005166408426036,2.1373397946570867,12.586936981660406,2179.947071452777,421.4994515798648,1147.852959159527,384.5208408559016,1.908156518379104,115983.76438720895
3,min,-114.31,32.54,1.0,1000.0,1.0,100.0,1.0,0.4999,100000.0
4,25%,-121.79,33.93,18.0,1462.0,297.0,789.0,282.0,2.5662,119400.0
5,50%,-118.49,34.25,29.0,2127.0,434.0,1167.0,409.0,3.5445,180400.0
6,75%,-118.0,37.72,37.0,3150.0,648.0,1720.0,605.0,4.7639,265000.0
7,max,-124.35,41.95,9.0,9992.0,999.0,999.0,999.0,9.9071,99900.0


In [16]:
test_df.summary().toPandas()

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,count,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0
1,mean,-119.58920000000028,35.63538999999999,28.845333333333333,2599.578666666667,529.9506666666666,1402.7986666666666,489.912,3.807271799999998,205846.275
2,stddev,1.9949362939550168,2.1296695233438334,12.555395554955757,2155.593331625582,415.6543681363231,1030.5430124122424,365.42270980552615,1.8545117296914773,113119.68746964622
3,min,-114.49,32.56,1.0,1000.0,100.0,100.0,10.0,0.4999,100000.0
4,25%,-121.81,33.93,18.0,1401.0,291.0,780.0,273.0,2.5437,121200.0
5,50%,-118.49,34.27,29.0,2105.0,437.0,1155.0,409.0,3.4871,177600.0
6,75%,-118.02,37.69,37.0,3128.0,636.0,1742.0,597.0,4.6556,263900.0
7,max,-124.18,41.92,9.0,9998.0,999.0,999.0,998.0,9.8708,99800.0


### Null Counts

In [25]:
import pyspark.sql.functions as f
get_nulls = lambda df: df.select(*[f.count(f.when(f.isnan(c) | f.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [32]:
get_nulls(train_df)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+



In [27]:
get_nulls(test_df)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+



### Correlation Test

In [46]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

In [85]:
def get_correlation(df, var_names):
  assembler = VectorAssembler(inputCols=var_names, outputCol="features", handleInvalid='keep')
  new_df = assembler.transform(df).select("features")
  correlation = Correlation.corr(new_df, "features", "pearson").collect()[0][0].toArray()
  corr_matrix = pd.DataFrame(correlation, columns=var_names, index=var_names)
  corr_matrix = corr_matrix.reset_index().melt(id_vars="index")
  corr_matrix.columns = ["var_1", "var_2", "correlation"]
  return corr_matrix

In [86]:
# Correlation among variables in training set.
train_corr = get_correlation(train_df, predictors)
train_corr

Unnamed: 0,var_1,var_2,correlation
0,longitude,longitude,1.000000
1,latitude,longitude,-0.925208
2,housing_median_age,longitude,-0.114250
3,total_rooms,longitude,0.047010
4,total_bedrooms,longitude,0.071802
...,...,...,...
59,total_rooms,median_income,0.195383
60,total_bedrooms,median_income,-0.013495
61,population,median_income,-0.000638
62,households,median_income,0.007644


In [99]:
# Hence correaltion nbetween latitute adn longitude is so high in trianing set.
# Lets drop latitute for now.
un_corr_predictors = set(train_corr[train_corr['correlation'].abs() < 0.8][['var_1', 'var_2']].values.reshape(-1))
un_corr_predictors

{'households',
 'housing_median_age',
 'latitude',
 'longitude',
 'median_income',
 'population',
 'total_bedrooms',
 'total_rooms'}

### Modelling

In [101]:
train_df.select(*un_corr_predictors).limit(5).toPandas()

Unnamed: 0,households,population,housing_median_age,longitude,median_income,latitude,total_bedrooms,total_rooms
0,472.0,1015.0,15.0,-114.31,1.4936,34.19,1283.0,5612.0
1,463.0,1129.0,19.0,-114.47,1.82,34.4,1901.0,7650.0
2,117.0,333.0,17.0,-114.56,1.6509,33.69,174.0,720.0
3,226.0,515.0,14.0,-114.57,3.1917,33.64,337.0,1501.0
4,262.0,624.0,20.0,-114.57,1.925,33.57,326.0,1454.0


In [123]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [148]:
evaluator = RegressionEvaluator().setLabelCol('median_house_value').setMetricName('mae')

In [150]:
lr = LinearRegression(featuresCol = 'StandardFeatures', labelCol = response, regParam = 0.01)
pipeline = Pipeline(stages=[
    VectorAssembler(inputCols = list(un_corr_predictors), outputCol="Features"),
    StandardScaler(withMean=True, withStd=True, inputCol="Features", outputCol="StandardFeatures"),
    lr
    ])

In [151]:
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]) \
                              .addGrid(lr.fitIntercept, [False, True])\
                              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
                              .build()

In [152]:
model = TrainValidationSplit(estimator = pipeline,
                             estimatorParamMaps = paramGrid,
                             evaluator = evaluator,
                             trainRatio = 0.8).fit(train_df)

### Performance Evalaution

In [153]:
train_df_preds = model.transform(train_df).select(response, "prediction")
test_df_preds = model.transform(test_df).select(response, "prediction")

In [154]:
evaluator = RegressionEvaluator().setPredictionCol('prediction').setLabelCol('median_house_value').setMetricName('mae')

In [155]:
evaluator.evaluate(train_df_preds), evaluator.evaluate(test_df_preds)

(50800.25930223442, 50355.592475132595)