In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("version_check").getOrCreate()

# Print the PySpark version
print("PySpark version:", spark.version)

In [3]:
df = spark.read.format("csv").load("housing.csv", header=True, inferSchema=True)

df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



#### Rearrange the DataFrame columns according to the newly created 'id' column at the beginning.

In [4]:
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn('id', monotonically_increasing_id())

df = df[['id'] + df.columns[:-1]]

df.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only s

#### Split the dataframe into training and testing sets with a ratio of 70% for training and 30% for testing.

In [5]:
train, test = df.randomSplit([0.7, 0.3])

train, test

(DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string],
 DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string])

#### Retrieve the numerical and categorical values seperately
### Numerical values

In [6]:
numerical_features_lst = train.columns
numerical_features_lst.remove('median_house_value')
numerical_features_lst.remove('id')
numerical_features_lst.remove('ocean_proximity')

numerical_features_lst

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

####  Impute missing values in the dataframe: filling in missing or null values in a dataset with some estimated values.

In [7]:
from pyspark.ml.feature import Imputer

# Create an Imputer instance
imputer = Imputer(inputCols=numerical_features_lst,
                  outputCols=numerical_features_lst)

imputer = imputer.fit(train)

train = imputer.transform(train)
test = imputer.transform(test)

train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 2 rows



###  Create a single vector column from multiple numerical feature columns. 
##### Machine learning algorithms in PySpark expect the input features to be in vector format. By using VectorAssembler, I can create a single feature column that can be used as input to these algorithms.

In [8]:
from pyspark.ml.feature import VectorAssembler

numerical_vector_assembler = VectorAssembler(inputCols=numerical_features_lst,
                                             outputCol='numerical_feature_vector')

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    [-122.24,37.85,52...|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------

In [9]:
train.select('numerical_feature_vector').take(2)

[Row(numerical_feature_vector=DenseVector([-122.23, 37.88, 41.0, 880.0, 129.0, 322.0, 126.0, 8.3252])),
 Row(numerical_feature_vector=DenseVector([-122.24, 37.85, 52.0, 1467.0, 190.0, 496.0, 177.0, 7.2574]))]

#### Fit the scale on training set to memorize the std and mean. The standardization process involves subtracting the mean value of the feature and dividing by its standard deviation.

In [10]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='numerical_feature_vector',
                        outputCol='scaled_numerical_feature_vector',
                        withStd=True, withMean=True)

scaler = scaler.fit(train)

train = scaler.transform(train)
test = scaler.transform(test)

train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3189315963185...|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    [-122.24,37.85,52...|           [-1.3239057253870...|
+---+

In [11]:
train.select('scaled_numerical_feature_vector').take(2)

[Row(scaled_numerical_feature_vector=DenseVector([-1.3189, 1.0447, 0.9848, -0.8074, -0.9714, -0.9613, -0.9732, 2.3336])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3239, 1.0308, 1.8606, -0.537, -0.8268, -0.8103, -0.8407, 1.7751]))]

### Categorical values

In [12]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='ocean_proximity',
                        outputCol='ocean_category_index')

indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3189315963185...|                 3.0|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          3521

In [13]:
set(train.select('ocean_category_index').collect())

{Row(ocean_category_index=0.0),
 Row(ocean_category_index=1.0),
 Row(ocean_category_index=2.0),
 Row(ocean_category_index=3.0),
 Row(ocean_category_index=4.0)}

In [14]:
from pyspark.ml.feature import OneHotEncoder

one_hot_encoder = OneHotEncoder(inputCol='ocean_category_index',
                                outputCol='ocean_category_one_hot')

one_hot_encoder = one_hot_encoder.fit(train)

train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3189315963185...|                 3.0|         (4,[3],[1.0])|
|  2|  -122.24|   37.85|    

#### COMBINE FEATURES: Assemble numerical and categorical (one-hot encoded) features into a single vector

In [15]:
assembler = VectorAssembler(inputCols=['scaled_numerical_feature_vector',
                                       'ocean_category_one_hot'],
                            outputCol='final_feature_vector')

train = assembler.transform(train)
test = assembler.transform(test)
train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3189315963185...|          

In [16]:
train.select('final_feature_vector').take(2)

[Row(final_feature_vector=DenseVector([-1.3189, 1.0447, 0.9848, -0.8074, -0.9714, -0.9613, -0.9732, 2.3336, 0.0, 0.0, 0.0, 1.0])),
 Row(final_feature_vector=DenseVector([-1.3239, 1.0308, 1.8606, -0.537, -0.8268, -0.8103, -0.8407, 1.7751, 0.0, 0.0, 0.0, 1.0]))]

#### Train a linear regression model on the training data and use that model to predict the target variable 

In [17]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='final_feature_vector',
                      labelCol='median_house_value')

lr = lr.fit(train)
pred_train_df = lr.transform(train).withColumnRenamed('prediction',
                                                      'predicted_median_house_value')

pred_train_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          45260

In [18]:
pred_test_df = lr.transform(test).withColumnRenamed('prediction', 'predicted_median_house_value')

pred_test_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          35850

In [19]:
pred_test_pd_df = pred_test_df.toPandas()

pred_test_pd_df.head(2)

Unnamed: 0,id,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,numerical_feature_vector,scaled_numerical_feature_vector,ocean_category_index,ocean_category_one_hot,final_feature_vector,predicted_median_house_value
0,1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY,"[-122.22, 37.86, 21.0, 7099.0, 1106.0, 2401.0,...","[-1.3139574672501082, 1.0354255364586464, -0.6...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3139574672501082, 1.0354255364586464, -0.6...",421919.104408
1,12,-122.26,37.85,52.0,2491.0,474.0,1098.0,468.0,3.075,213500.0,NEAR BAY,"[-122.26, 37.85, 52.0, 2491.0, 474.0, 1098.0, ...","[-1.3338539835240164, 1.0307644190375265, 1.86...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3338539835240164, 1.0307644190375265, 1.86...",232342.533921
