In [1]:
import os

# Local configs for Hadoop and Hive
os.environ['SPARK_HOME'] = '/home/paulnasdaq/spark-3.5.1-bin-without-hadoop'
os.environ['SPARK_DIST_CLASSPATH'] = '/home/paulnasdaq/hadoop-3.4.0/etc/hadoop:/home/paulnasdaq/hadoop-3.4.0/share/hadoop/common/lib/*:/home/paulnasdaq/hadoop-3.4.0/share/hadoop/common/*:/home/paulnasdaq/hadoop-3.4.0/share/hadoop/hdfs:/home/paulnasdaq/hadoop-3.4.0/share/hadoop/hdfs/lib/*:/home/paulnasdaq/hadoop-3.4.0/share/hadoop/hdfs/*:/home/paulnasdaq/hadoop-3.4.0/share/hadoop/mapreduce/*:/home/paulnasdaq/hadoop-3.4.0/share/hadoop/yarn:/home/paulnasdaq/hadoop-3.4.0/share/hadoop/yarn/lib/*:/home/paulnasdaq/hadoop-3.4.0/share/hadoop/yarn/*'
from pyspark.sql import SparkSession




In [2]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# Optional (Load data from Hive)
# housing_df = spark.read.format('parquet').load('/usr/hive/warehouse/unsw_nb15.db/final')

In [3]:
# Load data from HDFS
housing_df = spark.read.csv("/housing.csv", header=True, inferSchema=True)

                                                                                

In [4]:
housing_df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [5]:
from pyspark.sql.functions import monotonically_increasing_id

housing_df = housing_df.withColumn('id', monotonically_increasing_id())

housing_df.count()

20640

In [6]:
housing_df.select('total_rooms').agg({'total_rooms': 'avg'}).show()

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



In [7]:
from pyspark.sql.functions import mean

housing_df.select(*[mean(c) for c in housing_df.columns]).show()

+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+-------+
|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|avg(id)|
+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+-------+
|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                NULL|10319.5|
+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------

In [8]:
housing_df.groupby('ocean_proximity').agg({col: 'avg' for col in housing_df.columns}).show()

+---------------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+-----------------------+-----------------------+--------------------+
|ocean_proximity|   avg(households)|     avg(latitude)|   avg(population)|avg(total_bedrooms)|     avg(longitude)|           avg(id)|avg(median_income)|  avg(total_rooms)|avg(median_house_value)|avg(housing_median_age)|avg(ocean_proximity)|
+---------------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+-----------------------+-----------------------+--------------------+
|         ISLAND|             276.6|33.358000000000004|             668.0|              420.4|           -118.354|            8316.0|2.7444200000000003|            1574.6|               380440.0|                   42.4|                NULL|
|     NEAR OCEAN|501.24454477050415|

In [9]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

def squared(value):
    return value * value

squared_udf = udf(squared, FloatType())

housing_df.withColumn('total_rooms_sq', squared_udf('total_rooms'))
housing_df.count()

20640

In [10]:
train, test = housing_df.randomSplit([0.7, 0.3])

train, test

(DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string, id: bigint],
 DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string, id: bigint])

In [11]:
numerical_features_list = train.columns
numerical_features_list.remove('median_house_value')
numerical_features_list.remove('id')
numerical_features_list.remove('ocean_proximity')

numerical_features_list

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [12]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=numerical_features_list, outputCols=numerical_features_list)

imputer = imputer.fit(train)
train = imputer.transform(train)

test = imputer.transform(test)

train.show()

                                                                                

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|  id|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600.0|     NEAR OCEAN|2655|
|   -124.3|    41.8|              19.0|     2672.0|         552.0|    1298.0|     478.0|       1.9797|           85800.0|     NEAR OCEAN|1851|
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|          103600.0|     NEAR OCEAN|1861|
|  -124.27|   40.69|              36.0|     2349.0|         528.0|    1194.0|     465.0|       2.5179|           79000.0|     NEAR OCEAN|2631|

In [14]:
from pyspark.ml.feature import VectorAssembler

numerical_vector_assembler = VectorAssembler(inputCols=numerical_features_list, outputCol='numerical_feature_vector')

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

train.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|  id|numerical_feature_vector|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+------------------------+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600.0|     NEAR OCEAN|2655|    [-124.35,40.54,52...|
|   -124.3|    41.8|              19.0|     2672.0|         552.0|    1298.0|     478.0|       1.9797|           85800.0|     NEAR OCEAN|1851|    [-124.3,41.8,19.0...|
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|          103600.0|     NEAR OCEAN|1861|    [-124.3,41.84,

In [15]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='numerical_feature_vector', outputCol='scaled_numerical_feature_vector', withStd=True, withMean=True)

scaler = scaler.fit(train)

train = scaler.transform(train)
test = scaler.transform(test)



In [16]:
train.select('scaled_numerical_feature_vector').take(5)

[Row(scaled_numerical_feature_vector=DenseVector([-2.3835, 2.288, 1.8514, -0.3754, -0.5685, -0.5407, -0.6013, -0.4502])),
 Row(scaled_numerical_feature_vector=DenseVector([-2.3585, 2.8757, -0.7697, 0.0178, 0.0334, -0.1103, -0.0577, -0.9941])),
 Row(scaled_numerical_feature_vector=DenseVector([-2.3585, 2.8943, -0.9286, 0.0201, -0.0167, -0.1575, -0.1152, -0.4415])),
 Row(scaled_numerical_feature_vector=DenseVector([-2.3435, 2.3579, 0.5805, -0.1313, -0.0239, -0.2012, -0.0917, -0.7113])),
 Row(scaled_numerical_feature_vector=DenseVector([-2.3336, 2.1667, 0.2628, -0.5553, -0.2843, -0.8661, -0.8182, -1.0141]))]

In [17]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='ocean_proximity', outputCol='ocean_category_index')

indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

In [18]:
train.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+------------------------+-------------------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|  id|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+------------------------+-------------------------------+--------------------+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600.0|     NEAR OCEAN|2655|    [-124.35,40.54,52...|           [-2.3834506612078...|                 2.0|
|   -124.3|    41.8|              19.0|     2672.0|         552.0|    1298.0|     478.0|       1.9797|           858

In [19]:
from pyspark.ml.feature import OneHotEncoder

one_hot_encoder = OneHotEncoder(inputCol='ocean_category_index', outputCol='ocean_category_one_hot')
one_hot_encoder = one_hot_encoder.fit(train)

train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)



In [20]:
train.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+------------------------+-------------------------------+--------------------+----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|  id|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+------------------------+-------------------------------+--------------------+----------------------+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600.0|     NEAR OCEAN|2655|    [-124.35,40.54,52...|           [-2.3834506612078...|                 2.0|         (4,[2],[1.0])|
|   -124.3|    41.8|    

In [21]:
assembler = VectorAssembler(inputCols=['scaled_numerical_feature_vector', 'ocean_category_one_hot'], outputCol='final_feature_vector')
train = assembler.transform(train)
test = assembler.transform(test)

In [40]:
train.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+------------------------+-------------------------------+--------------------+----------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|  id|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+------------------------+-------------------------------+--------------------+----------------------+--------------------+
|   -124.3|    41.8|              19.0|     2672.0|         552.0|    1298.0|     478.0|       1.9797|           85800.0|     NEAR OCEAN|1851|    [-124.3,41.8,19.0...|           [-2.3607740418622...|      

In [22]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='final_feature_vector', labelCol='median_house_value')
lr = lr.fit(train)


                                                                                

In [23]:
pred_train_df = lr.transform(train).withColumnRenamed('prediction', 'predicted_median_house_value')
pred_train_df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|  id|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600

In [24]:
pred_test_df = lr.transform(test).withColumnRenamed('prediction', 'predicted_median_house_value')
pred_test_df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|  id|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  -124.26|   40.58|              52.0|     2217.0|         394.0|     907.0|     369.0|       2.3571|          111400