In [1]:
# Import SparkSession
from pyspark.sql import SparkSession

In [2]:
# Create SparkSession
spark = SparkSession.builder\
        .master('local')\
        .appName('Pyspark_ML_Lib_Introduction')\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [3]:
spark

In [4]:
df = spark.read.format('csv').load('./housing.csv', header=True, inferSchema=True)

df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [5]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [6]:
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn('id', monotonically_increasing_id())

df = df[['id'] + df.columns[:-1]]  # re-ordering the column. put the id in the first column and keep the rest of the columns

df.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only s

In [7]:
df.count()

20640

In [8]:
df.select('total_rooms').agg({'total_rooms': 'avg'}).show()

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



In [9]:
from pyspark.sql.functions import mean

df.select(*[mean(c) for c in df.columns]).show()

+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|avg(id)|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|10319.5|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                null|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+----------

In [10]:
df.groupby('ocean_proximity').agg({col: 'avg' for col in df.columns[3:-1]}).show()

+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|ocean_proximity|   avg(households)|   avg(population)|avg(total_bedrooms)|avg(median_income)|  avg(total_rooms)|avg(median_house_value)|avg(housing_median_age)|
+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|       NEAR BAY| 488.6161572052402|1230.3174672489083|  514.1828193832599| 4.172884759825336| 2493.589519650655|     259212.31179039303|      37.73013100436681|
|      <1H OCEAN| 517.7449649737302|1520.2904991243433|  546.5391852999778|4.2306819176882655|2628.3435858143607|     240084.28546409807|     29.279225043782837|
|         INLAND|477.44756525721266|1391.0462524805373|  533.8816194581281| 3.208996382231716|2717.7427873607085|     124805.39200122119|      24.27186689055106|
|     NEAR OCEAN|501.2445447

In [11]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

def squared(value):
    return value * value;

squared_udf = udf(squared, FloatType())

df.withColumn('total_rooms_squared', squared_udf('total_rooms')).show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|total_rooms_squared|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|           774400.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|          5.03958E7|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|          2152089.0|
|  3|  -122.25|   37.85|    

ML Lib

In [12]:
train, test = df.randomSplit([0.7, 0.3]) # 70% train 30% test

train, test

(DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string],
 DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string])

First deal with continuous varialble then deal with the categorial variable (ocean_proximity)

In [13]:
numerical_features_lst = train.columns
numerical_features_lst.remove('median_house_value')
numerical_features_lst.remove('id')
numerical_features_lst.remove('ocean_proximity')

numerical_features_lst

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

deal with missing values by imputing

In [14]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=numerical_features_lst,
                 outputCols=numerical_features_lst) # override the column names

imputer = imputer.fit(train) # me imputation by default

train = imputer.transform(train)

test = imputer.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  4|  -122.25|   37.85|              52.0|     1627.0|         280.0|     565.0|     259.0|       3.8462|          342200.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only s

pyspark to work on ML, it doesnt work on the current columns, it doesnt like, what it want is all the columns and put it on a vector

In [15]:
from pyspark.ml.feature import VectorAssembler

numerical_vector_assembler = VectorAssembler(inputCols=numerical_features_lst,
                                            outputCol='numerical_feature_vector')

train = numerical_vector_assembler.transform(train)

test = numerical_vector_assembler.transform(test)

train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    [-122.24,37.85,52...|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------

In [16]:
train.select('numerical_feature_vector').take(2)

[Row(numerical_feature_vector=DenseVector([-122.23, 37.88, 41.0, 880.0, 129.0, 322.0, 126.0, 8.3252])),
 Row(numerical_feature_vector=DenseVector([-122.24, 37.85, 52.0, 1467.0, 190.0, 496.0, 177.0, 7.2574]))]

standardasized or perform some preprocessing function to get those contineous variable in some sort of range and style 

In [17]:
from pyspark.ml.feature import StandardScaler

scalar = StandardScaler(inputCol='numerical_feature_vector',
                       outputCol='scaled_numerical_feature_vector',
                       withStd=True,
                       withMean=True)

scalar = scalar.fit(train)

train = scalar.transform(train)
test = scalar.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3308192295028...|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    [-122.24,37.85,52...|           [-1.3358020518939...|
|  4|

In [18]:
train.select('scaled_numerical_feature_vector').take(3)

[Row(scaled_numerical_feature_vector=DenseVector([-1.3308, 1.0576, 0.9837, -0.8139, -0.9794, -0.9747, -0.9846, 2.3346])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3358, 1.0435, 1.8586, -0.5425, -0.8336, -0.8212, -0.8503, 1.775])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3408, 1.0435, 1.8586, -0.4685, -0.6186, -0.7603, -0.6343, -0.0124]))]

Categorical variable

In [19]:
from pyspark.ml.feature import StringIndexer # take of each string of categorical variable (NEAR BAY, NEAR ISLAND etc) and map it in particular row

indexer = StringIndexer(inputCol='ocean_proximity',
                       outputCol='ocean_category_index')

indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

train.show(3)


+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3308192295028...|                 3.0|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          3521

In [20]:
set(train.select('ocean_category_index').collect()) # set to see the categories created

{Row(ocean_category_index=0.0),
 Row(ocean_category_index=1.0),
 Row(ocean_category_index=2.0),
 Row(ocean_category_index=3.0),
 Row(ocean_category_index=4.0)}

In [21]:
# use one hot encoding
from pyspark.ml.feature import OneHotEncoder

one_hot_encoder = OneHotEncoder(inputCol='ocean_category_index',
                               outputCol='ocean_category_index_one_hot')

one_hot_encoder = one_hot_encoder.fit(train)

train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_index_one_hot|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3308192295028...|                 3.0|               (4,[3],[1.0])|
|  2

In [22]:
# combine our one hot vector (categorical) and numerical feature

assembler = VectorAssembler(inputCols=['scaled_numerical_feature_vector', 'ocean_category_index_one_hot'],
                           outputCol='final_feature_vector')

train = assembler.transform(train)
test = assembler.transform(test)

train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_index_one_hot|final_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.330819229

In [23]:
train.select('final_feature_vector').take(2)

[Row(final_feature_vector=DenseVector([-1.3308, 1.0576, 0.9837, -0.8139, -0.9794, -0.9747, -0.9846, 2.3346, 0.0, 0.0, 0.0, 1.0])),
 Row(final_feature_vector=DenseVector([-1.3358, 1.0435, 1.8586, -0.5425, -0.8336, -0.8212, -0.8503, 1.775, 0.0, 0.0, 0.0, 1.0]))]

In [24]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='final_feature_vector',
                     labelCol='median_house_value')

lr

LinearRegression_ba68b37d8fa4

In [25]:
lr = lr.fit(train)

lr

LinearRegressionModel: uid=LinearRegression_ba68b37d8fa4, numFeatures=12

In [26]:
pred_train_df = lr.transform(train).withColumnRenamed('prediction', 'predicted_median_house_value') # returns train but add a prediction column for each row

pred_train_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_index_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.32

In [27]:
pred_test_df = lr.transform(test).withColumnRenamed('prediction', 'predicted_median_house_value') # returns train but add a prediction column for each row

pred_test_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_index_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------------+--------------------+----------------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.30

In [28]:
# convert spark dataframe into pandas dataframe since the heavy lifting was already done by spark
pred_test_pd_df = pred_test_df.toPandas()

pred_test_pd_df.head(2)

Unnamed: 0,id,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,numerical_feature_vector,scaled_numerical_feature_vector,ocean_category_index,ocean_category_index_one_hot,final_feature_vector,predicted_median_house_value
0,1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY,"[-122.22, 37.86, 21.0, 7099.0, 1106.0, 2401.0,...","[-1.3258364071117357, 1.0482215687072243, -0.6...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3258364071117357, 1.0482215687072243, -0.6...",421649.078477
1,3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY,"[-122.25, 37.85, 52.0, 1274.0, 235.0, 558.0, 2...","[-1.3407848742851176, 1.0435336653939329, 1.85...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3407848742851176, 1.0435336653939329, 1.85...",318463.828988


In [29]:
predictions_and_actuals = pred_test_df[['predicted_median_house_value', 'median_house_value']]

predictions_and_actuals_rdd = predictions_and_actuals.rdd # have to convert to rdd

predictions_and_actuals_rdd.take(2)

[Row(predicted_median_house_value=421649.0784774951, median_house_value=358500.0),
 Row(predicted_median_house_value=318463.828987621, median_house_value=341300.0)]

In [30]:
predictions_and_actuals_rdd = predictions_and_actuals_rdd.map(tuple)

predictions_and_actuals_rdd.take(2)

[(421649.0784774951, 358500.0), (318463.828987621, 341300.0)]

In [31]:
from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(predictions_and_actuals_rdd)

s = '''
Mean Squared Error:      {0}
Root Mean Squared Error: {1}
Mean Absolute Error:     {2}
R**2:                    {3}
'''.format(metrics.meanSquaredError,
          metrics.rootMeanSquaredError,
          metrics.meanAbsoluteError,
          metrics.r2)

print(s)


Mean Squared Error:      4711799460.4714
Root Mean Squared Error: 68642.54847010999
Mean Absolute Error:     49781.8138471449
R**2:                    0.6491560071406317

