In [194]:
!pip install pyspark


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [234]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local") \
        .appName("Spark MLlib") \
        .config('spark.ui.port', '4040') \
        .getOrCreate()

In [257]:
df = spark.read.format("csv").load("ResaleflatpricesbasedonregistrationdatefromJan2017onwards.csv", header=True, inferSchema=True)

In [258]:
df.printSchema()

root
 |-- month: timestamp (nullable = true)
 |-- town: string (nullable = true)
 |-- flat_type: string (nullable = true)
 |-- block: string (nullable = true)
 |-- street_name: string (nullable = true)
 |-- storey_range: string (nullable = true)
 |-- floor_area_sqm: double (nullable = true)
 |-- flat_model: string (nullable = true)
 |-- lease_commence_date: integer (nullable = true)
 |-- remaining_lease: string (nullable = true)
 |-- resale_price: double (nullable = true)



In [259]:
df.show(10)

+-------------------+----------+---------+-----+-----------------+------------+--------------+--------------+-------------------+------------------+------------+
|              month|      town|flat_type|block|      street_name|storey_range|floor_area_sqm|    flat_model|lease_commence_date|   remaining_lease|resale_price|
+-------------------+----------+---------+-----+-----------------+------------+--------------+--------------+-------------------+------------------+------------+
|2017-01-01 00:00:00|ANG MO KIO|   2 ROOM|  406|ANG MO KIO AVE 10|    10 TO 12|          44.0|      Improved|               1979|61 years 04 months|    232000.0|
|2017-01-01 00:00:00|ANG MO KIO|   3 ROOM|  108| ANG MO KIO AVE 4|    01 TO 03|          67.0|New Generation|               1978|60 years 07 months|    250000.0|
|2017-01-01 00:00:00|ANG MO KIO|   3 ROOM|  602| ANG MO KIO AVE 5|    01 TO 03|          67.0|New Generation|               1980|62 years 05 months|    262000.0|
|2017-01-01 00:00:00|ANG MO 

In [260]:
from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn('id', monotonically_increasing_id())
df = df[['id'] + df.columns[:-1]]
# df.show(10)
df.count()

188947

In [261]:
df.show(5)

+---+-------------------+----------+---------+-----+-----------------+------------+--------------+--------------+-------------------+------------------+------------+
| id|              month|      town|flat_type|block|      street_name|storey_range|floor_area_sqm|    flat_model|lease_commence_date|   remaining_lease|resale_price|
+---+-------------------+----------+---------+-----+-----------------+------------+--------------+--------------+-------------------+------------------+------------+
|  0|2017-01-01 00:00:00|ANG MO KIO|   2 ROOM|  406|ANG MO KIO AVE 10|    10 TO 12|          44.0|      Improved|               1979|61 years 04 months|    232000.0|
|  1|2017-01-01 00:00:00|ANG MO KIO|   3 ROOM|  108| ANG MO KIO AVE 4|    01 TO 03|          67.0|New Generation|               1978|60 years 07 months|    250000.0|
|  2|2017-01-01 00:00:00|ANG MO KIO|   3 ROOM|  602| ANG MO KIO AVE 5|    01 TO 03|          67.0|New Generation|               1980|62 years 05 months|    262000.0|
|  3

In [262]:
from pyspark.sql.functions import regexp_extract, col, expr, coalesce, lit

# Data transformation
df = df.withColumn("remaining_lease_years", regexp_extract(col("remaining_lease"), r"(\d+) years", 1).cast("int")) \
  .withColumn("remaining_lease_months", regexp_extract(col("remaining_lease"), r"(\d+) month", 1).cast("int"))

df = df.withColumn("remaining_lease_months", coalesce(col("remaining_lease_months"), lit(0)))

df = df.withColumn("remaining_lease_in_months", expr("remaining_lease_years * 12 + remaining_lease_months"))


In [263]:
df = df.drop('month', 'block', 'street_name', 'lease_commence_date', 'remaining_lease', 'remaining_lease_years', 'remaining_lease_months')

In [264]:
# Variables used for training: 
    # Categorical: town, flat_type, storey_range, floor_area_sql, flat_model
    # Numerical: floor_area_sql, remaining_lease_in_months
# Predict resale_price
df.show(5)

+---+----------+---------+------------+--------------+--------------+------------+-------------------------+
| id|      town|flat_type|storey_range|floor_area_sqm|    flat_model|resale_price|remaining_lease_in_months|
+---+----------+---------+------------+--------------+--------------+------------+-------------------------+
|  0|ANG MO KIO|   2 ROOM|    10 TO 12|          44.0|      Improved|    232000.0|                      736|
|  1|ANG MO KIO|   3 ROOM|    01 TO 03|          67.0|New Generation|    250000.0|                      727|
|  2|ANG MO KIO|   3 ROOM|    01 TO 03|          67.0|New Generation|    262000.0|                      749|
|  3|ANG MO KIO|   3 ROOM|    04 TO 06|          68.0|New Generation|    265000.0|                      745|
|  4|ANG MO KIO|   3 ROOM|    01 TO 03|          67.0|New Generation|    265000.0|                      749|
+---+----------+---------+------------+--------------+--------------+------------+-------------------------+
only showing top 5 

In [265]:
train, test = df.randomSplit([0.7, 0.3])
train, test 

(DataFrame[id: bigint, town: string, flat_type: string, storey_range: string, floor_area_sqm: double, flat_model: string, resale_price: double, remaining_lease_in_months: int],
 DataFrame[id: bigint, town: string, flat_type: string, storey_range: string, floor_area_sqm: double, flat_model: string, resale_price: double, remaining_lease_in_months: int])

In [266]:
numerical_features_lst = train.columns
numerical_features_lst.remove('id')
numerical_features_lst.remove('town')
numerical_features_lst.remove('flat_type')
numerical_features_lst.remove('storey_range')
numerical_features_lst.remove('flat_model')
numerical_features_lst.remove('resale_price')
numerical_features_lst


['floor_area_sqm', 'remaining_lease_in_months']

In [267]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=numerical_features_lst, outputCols=numerical_features_lst)
imputer = imputer.fit(train)
train = imputer.transform(train)
test = imputer.transform(test)

train.show(3)

+---+----------+---------+------------+--------------+--------------+------------+-------------------------+
| id|      town|flat_type|storey_range|floor_area_sqm|    flat_model|resale_price|remaining_lease_in_months|
+---+----------+---------+------------+--------------+--------------+------------+-------------------------+
|  1|ANG MO KIO|   3 ROOM|    01 TO 03|          67.0|New Generation|    250000.0|                      727|
|  2|ANG MO KIO|   3 ROOM|    01 TO 03|          67.0|New Generation|    262000.0|                      749|
|  5|ANG MO KIO|   3 ROOM|    01 TO 03|          68.0|New Generation|    275000.0|                      756|
+---+----------+---------+------------+--------------+--------------+------------+-------------------------+
only showing top 3 rows



In [268]:
from pyspark.ml.feature import VectorAssembler 
numerical_vector_assembler = VectorAssembler(inputCols=numerical_features_lst, outputCol='numerical_feature_vector')
train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)
train.show(3)

+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+
| id|      town|flat_type|storey_range|floor_area_sqm|    flat_model|resale_price|remaining_lease_in_months|numerical_feature_vector|
+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+
|  1|ANG MO KIO|   3 ROOM|    01 TO 03|          67.0|New Generation|    250000.0|                      727|            [67.0,727.0]|
|  2|ANG MO KIO|   3 ROOM|    01 TO 03|          67.0|New Generation|    262000.0|                      749|            [67.0,749.0]|
|  5|ANG MO KIO|   3 ROOM|    01 TO 03|          68.0|New Generation|    275000.0|                      756|            [68.0,756.0]|
+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+
only showing top 3 rows



In [269]:
from pyspark.ml.feature import StandardScaler
# Normal distribution
scaler = StandardScaler(inputCol='numerical_feature_vector', outputCol='scaled_numerical_feature_vector', withStd=True, withMean=True)
scaler = scaler.fit(train)
train = scaler.transform(train)
test = scaler.transform(test)
train.show(3)

+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+-------------------------------+
| id|      town|flat_type|storey_range|floor_area_sqm|    flat_model|resale_price|remaining_lease_in_months|numerical_feature_vector|scaled_numerical_feature_vector|
+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+-------------------------------+
|  1|ANG MO KIO|   3 ROOM|    01 TO 03|          67.0|New Generation|    250000.0|                      727|            [67.0,727.0]|           [-1.2519466170961...|
|  2|ANG MO KIO|   3 ROOM|    01 TO 03|          67.0|New Generation|    262000.0|                      749|            [67.0,749.0]|           [-1.2519466170961...|
|  5|ANG MO KIO|   3 ROOM|    01 TO 03|          68.0|New Generation|    275000.0|                      756|            [68.0,756.0]|           [-1.2103596230898...|
+---

In [270]:
train.select('scaled_numerical_feature_vector').take(5)

[Row(scaled_numerical_feature_vector=DenseVector([-1.2519, -0.9974])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.2519, -0.8666])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.2104, -0.8249])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.2104, -0.932])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.2519, -0.9439]))]

In [271]:
categorical_features_lst = train.columns
categorical_features_lst.remove('id')
categorical_features_lst.remove('floor_area_sqm')
categorical_features_lst.remove('remaining_lease_in_months')
categorical_features_lst.remove('resale_price')
categorical_features_lst.remove('numerical_feature_vector')
categorical_features_lst.remove('scaled_numerical_feature_vector')
categorical_features_lst


['town', 'flat_type', 'storey_range', 'flat_model']

In [272]:
from pyspark.ml.feature import StringIndexer
# Convert categorical variable to a particular value
# Not sure why using a Pipeline doesn't work -> Workaround is to do manually since categorically variable is quite small
indexer = StringIndexer(inputCol='town', outputCol='town_index')
indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

indexer = StringIndexer(inputCol='flat_type', outputCol='flat_type_index')
indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

indexer = StringIndexer(inputCol='storey_range', outputCol='storey_range_index')
indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

indexer = StringIndexer(inputCol='flat_model', outputCol='flat_model_index')
indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

train.show(3)


+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+-------------------------------+----------+---------------+------------------+----------------+
| id|      town|flat_type|storey_range|floor_area_sqm|    flat_model|resale_price|remaining_lease_in_months|numerical_feature_vector|scaled_numerical_feature_vector|town_index|flat_type_index|storey_range_index|flat_model_index|
+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+-------------------------------+----------+---------------+------------------+----------------+
|  1|ANG MO KIO|   3 ROOM|    01 TO 03|          67.0|New Generation|    250000.0|                      727|            [67.0,727.0]|           [-1.2519466170961...|       9.0|            2.0|               3.0|             2.0|
|  2|ANG MO KIO|   3 ROOM|    01 TO 03|          67.0|New Generation|    262000.0|  

In [273]:
set(train.select('flat_model_index').collect())

{Row(flat_model_index=0.0),
 Row(flat_model_index=1.0),
 Row(flat_model_index=2.0),
 Row(flat_model_index=3.0),
 Row(flat_model_index=4.0),
 Row(flat_model_index=5.0),
 Row(flat_model_index=6.0),
 Row(flat_model_index=7.0),
 Row(flat_model_index=8.0),
 Row(flat_model_index=9.0),
 Row(flat_model_index=10.0),
 Row(flat_model_index=11.0),
 Row(flat_model_index=12.0),
 Row(flat_model_index=13.0),
 Row(flat_model_index=14.0),
 Row(flat_model_index=15.0),
 Row(flat_model_index=16.0),
 Row(flat_model_index=17.0),
 Row(flat_model_index=18.0),
 Row(flat_model_index=19.0),
 Row(flat_model_index=20.0)}

In [274]:
from pyspark.ml.feature import OneHotEncoder
one_hot_encoder = OneHotEncoder(inputCol='town_index', outputCol='town_one_hot')
one_hot_encoder = one_hot_encoder.fit(train)
train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

one_hot_encoder = OneHotEncoder(inputCol='flat_type_index', outputCol='flat_type_one_hot')
one_hot_encoder = one_hot_encoder.fit(train)
train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

one_hot_encoder = OneHotEncoder(inputCol='storey_range_index', outputCol='storey_range_one_hot')
one_hot_encoder = one_hot_encoder.fit(train)
train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

one_hot_encoder = OneHotEncoder(inputCol='flat_model_index', outputCol='flat_model_one_hot')
one_hot_encoder = one_hot_encoder.fit(train)
train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

train.show(3)

+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+-------------------------------+----------+---------------+------------------+----------------+--------------+-----------------+--------------------+------------------+
| id|      town|flat_type|storey_range|floor_area_sqm|    flat_model|resale_price|remaining_lease_in_months|numerical_feature_vector|scaled_numerical_feature_vector|town_index|flat_type_index|storey_range_index|flat_model_index|  town_one_hot|flat_type_one_hot|storey_range_one_hot|flat_model_one_hot|
+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+-------------------------------+----------+---------------+------------------+----------------+--------------+-----------------+--------------------+------------------+
|  1|ANG MO KIO|   3 ROOM|    01 TO 03|          67.0|New Generation|    250000.0|            

In [277]:
# Combine numerical and categorical

assembler = VectorAssembler(inputCols=['scaled_numerical_feature_vector', 'town_one_hot', 'flat_type_one_hot', 'storey_range_one_hot', 'flat_model_one_hot'],
                            outputCol='final_feature_vector')
train = assembler.transform(train)
test = assembler.transform(test)
train.show(3)

+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+-------------------------------+----------+---------------+------------------+----------------+--------------+-----------------+--------------------+------------------+--------------------+
| id|      town|flat_type|storey_range|floor_area_sqm|    flat_model|resale_price|remaining_lease_in_months|numerical_feature_vector|scaled_numerical_feature_vector|town_index|flat_type_index|storey_range_index|flat_model_index|  town_one_hot|flat_type_one_hot|storey_range_one_hot|flat_model_one_hot|final_feature_vector|
+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+-------------------------------+----------+---------------+------------------+----------------+--------------+-----------------+--------------------+------------------+--------------------+
|  1|ANG MO KIO|   3 ROOM|    0

In [278]:
train.select('final_feature_vector').take(3)

[Row(final_feature_vector=SparseVector(69, {0: -1.2519, 1: -0.9974, 11: 1.0, 29: 1.0, 36: 1.0, 51: 1.0})),
 Row(final_feature_vector=SparseVector(69, {0: -1.2519, 1: -0.8666, 11: 1.0, 29: 1.0, 36: 1.0, 51: 1.0})),
 Row(final_feature_vector=SparseVector(69, {0: -1.2104, 1: -0.8249, 11: 1.0, 29: 1.0, 36: 1.0, 51: 1.0}))]

In [280]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='final_feature_vector', 
                      labelCol='resale_price')
lr

LinearRegression_ea2e8f85e9c9

In [281]:
lr = lr.fit(train)

24/09/07 12:43:01 WARN Instrumentation: [9c5f1c4e] regParam is zero, which might cause numerical instability and overfitting.


In [282]:
pred_train_df = lr.transform(train).withColumnRenamed('prediction', 'predicted_resale_price')
pred_train_df.show(5)

+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+-------------------------------+----------+---------------+------------------+----------------+--------------+-----------------+--------------------+------------------+--------------------+----------------------+
| id|      town|flat_type|storey_range|floor_area_sqm|    flat_model|resale_price|remaining_lease_in_months|numerical_feature_vector|scaled_numerical_feature_vector|town_index|flat_type_index|storey_range_index|flat_model_index|  town_one_hot|flat_type_one_hot|storey_range_one_hot|flat_model_one_hot|final_feature_vector|predicted_resale_price|
+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+-------------------------------+----------+---------------+------------------+----------------+--------------+-----------------+--------------------+------------------+-------

In [283]:
pred_test_df = lr.transform(test).withColumnRenamed('prediction', 'predicted_resale_price')
pred_test_df.show(5)

+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+-------------------------------+----------+---------------+------------------+----------------+--------------+-----------------+--------------------+------------------+--------------------+----------------------+
| id|      town|flat_type|storey_range|floor_area_sqm|    flat_model|resale_price|remaining_lease_in_months|numerical_feature_vector|scaled_numerical_feature_vector|town_index|flat_type_index|storey_range_index|flat_model_index|  town_one_hot|flat_type_one_hot|storey_range_one_hot|flat_model_one_hot|final_feature_vector|predicted_resale_price|
+---+----------+---------+------------+--------------+--------------+------------+-------------------------+------------------------+-------------------------------+----------+---------------+------------------+----------------+--------------+-----------------+--------------------+------------------+-------

In [290]:
predictions_and_actuals = pred_test_df[['predicted_resale_price', 'resale_price']]

predictions_and_actuals_rdd = predictions_and_actuals.rdd
predictions_and_actuals_rdd.take(2)


                                                                                

[Row(predicted_resale_price=247241.81254870223, resale_price=232000.0),
 Row(predicted_resale_price=352425.65344206576, resale_price=265000.0)]

In [291]:
predictions_and_actuals_rdd = predictions_and_actuals_rdd.map(tuple)
predictions_and_actuals_rdd.take(2)

[(247241.81254870223, 232000.0), (352425.65344206576, 265000.0)]

In [296]:
from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(predictions_and_actuals_rdd)
s = '''
Mean Squared Error:         {0}
Root Mean Squared Error:    {1}
Mean Absolute Error:        {2}
R**2:                       {3}
'''.format(metrics.meanSquaredError, metrics.rootMeanSquaredError, metrics.meanAbsoluteError, metrics.r2)

print(s)

ModuleNotFoundError: No module named 'distutils'