# Data Cleaning and Preprocessing
In this notebook, we focus on the data cleaning and preprocessing stages of our project. We handle missing values, outliers, and categorical variables in our dataset, using the Spark DataFrame API for scalable data transformation and processing.

## Loading the data

In [1]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("HousingDataAnalysis").getOrCreate()

# Load data into Spark DataFrame
df = spark.read.csv("housing.csv", header=True, inferSchema=True)

In the above code, we initialize a Spark Session and load our dataset into a Spark DataFrame.

## Initial Data Inspection

In [2]:
# Display the first few rows of the DataFrame
df.show(5)

# Print the schema of the DataFrame
df.printSchema()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

The above code snippets display the first few rows of the DataFrame and print out the schema of our data, which allows us to see the data types for each column.

## Processing

In [3]:
# Get numerical column names only
numerical_columns = [name for name, dtype in df.dtypes if dtype in ('int', 'double')]

# Replace null values with the median
for col_name in numerical_columns:
    # A small relative error (0.01) keeps the approximate median close to the true one
    median_value = df.stat.approxQuantile(col_name, [0.5], 0.01)
    if median_value:  # guard against an empty result (no non-null values in the column)
        df = df.fillna(median_value[0], subset=[col_name])

To deal with missing values in our dataset, we replace them with the median of the respective column. We choose the median because it is a robust measure that is less influenced by outliers than the mean.

To do this, we first create a list of the numerical columns in our DataFrame. Then we iterate through these columns, computing an approximate median with the `approxQuantile` function and replacing any null values with it.

Note that the `approxQuantile` function takes three parameters: the column name, a list of quantile probabilities, and a relative error. Here, we set the quantile probability to 0.5 to compute the median and the relative error to 0.01. The relative error trades accuracy for speed: 0 computes exact quantiles at a higher cost, while larger values are cheaper but looser. A loose setting such as 0.25 would let the returned value rank anywhere between roughly the 25th and 75th percentiles, which is too coarse to call a median.

Also note that we're only doing this for numerical columns. This is because calculating the median for categorical or string columns is not meaningful. For these types of columns, a different strategy would be needed, such as replacing nulls with the most frequent category.
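
Returning to the relative error parameter: the Spark documentation states that, for N rows, probability p, and error err, the returned value has a rank between floor((p - err) * N) and ceil((p + err) * N). The plain-Python sketch below only evaluates that bound; the helper name `rank_window` and the row count of 1000 are assumptions made for illustration, not part of the notebook.

```python
import math


def rank_window(p, err, n):
    """Return the (lowest, highest) rank that an approximate quantile may have.

    Evaluates floor((p - err) * n) and ceil((p + err) * n), clamped to [0, n].
    """
    low = max(0, math.floor((p - err) * n))
    high = min(n, math.ceil((p + err) * n))
    return low, high


n_rows = 1000  # hypothetical row count, for illustration only

for err in (0.25, 0.05, 0.01, 0.0):
    low, high = rank_window(0.5, err, n_rows)
    print(f"relativeError={err}: rank between {low} and {high} of {n_rows}")
```

The wider the window, the further the returned value can sit from the true median, which is why a small error is preferable when the result is used as a fill value.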


In [4]:
# Remove outliers using the interquartile range
cols = ['median_income', 'housing_median_age', 'total_rooms', 'total_bedrooms',
        'population', 'households', 'median_house_value']
for col_name in cols:
    quantiles = df.stat.approxQuantile(col_name, [0.25, 0.75], 0.05)
    IQR = quantiles[1] - quantiles[0]
    bounds = [quantiles[0] - 1.5 * IQR, quantiles[1] + 1.5 * IQR]
    df = df.where((df[col_name] >= bounds[0]) & (df[col_name] <= bounds[1]))

In the second step, we handle outliers, which can skew our analysis and model predictions. To remove them, we use the interquartile range (IQR) method, which treats a value as an outlier if it lies more than 1.5*IQR below the first quartile or more than 1.5*IQR above the third quartile. The `approxQuantile` function calculates the quartiles, and the `where` function keeps only the rows inside those bounds. Because the filter is applied inside the loop, each column's quartiles are computed on the rows that survived the filters of the preceding columns.
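
The fence arithmetic can be checked on a handful of numbers. This is a minimal plain-Python sketch, assuming made-up values and the standard library's `statistics.quantiles` with the inclusive method, which will not match Spark's approximate quartiles exactly; `iqr_bounds` is a hypothetical helper name.

```python
from statistics import quantiles


def iqr_bounds(values, k=1.5):
    """Return the (lower, upper) fences Q1 - k*IQR and Q3 + k*IQR."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


# Hypothetical values for illustration only; 100 is the obvious outlier.
values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 100]

lower, upper = iqr_bounds(values)
kept = [v for v in values if lower <= v <= upper]

print(f"fences: [{lower}, {upper}]")
print(f"kept:   {kept}")
```

Only the value 100 falls outside the fences here; the nine remaining values are kept.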

In [5]:
from pyspark.sql.functions import col
# Create new feature 'rooms_per_household'
df = df.withColumn("rooms_per_household", col("total_rooms")/col("households"))

# Create new feature 'population_per_household'
df = df.withColumn("population_per_household", col("population")/col("households"))

# Create new feature 'bedrooms_per_room'
df = df.withColumn("bedrooms_per_room", col("total_bedrooms")/col("total_rooms"))

Next, we're going to create some new features that might help improve the performance of our future model. We're doing this by combining existing columns in ways that might better capture the underlying patterns in the data.

- `rooms_per_household`: This feature represents the average number of rooms per household in a district. We calculate this by dividing the total number of rooms by the total number of households.

- `population_per_household`: This feature represents the average population per household in a district. We calculate this by dividing the total population by the total number of households.

- `bedrooms_per_room`: This feature represents the proportion of bedrooms among total rooms in a district. We calculate this by dividing the total number of bedrooms by the total number of rooms.

We create these new features using the `withColumn` function, which returns a new DataFrame with the column added, or replaced if a column of the same name already exists. The `col` function returns a `Column` expression that refers to an existing column by name, so the division is evaluated row by row.
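
The three ratios can be verified by hand against one row. The sketch below is plain Python rather than Spark, and it uses the values of the district with 1467 rooms that appears in the notebook's printed output; `safe_ratio` is a hypothetical helper that returns `None` for a zero or missing denominator, which resembles how Spark SQL yields null on division by zero when ANSI mode is off.

```python
def safe_ratio(numerator, denominator):
    """Divide two numbers, returning None if either is missing or the divisor is zero."""
    if numerator is None or not denominator:
        return None
    return numerator / denominator


# Values copied from a row shown in the notebook output (total_rooms = 1467.0).
row = {
    "total_rooms": 1467.0,
    "total_bedrooms": 190.0,
    "population": 496.0,
    "households": 177.0,
}

rooms_per_household = safe_ratio(row["total_rooms"], row["households"])
population_per_household = safe_ratio(row["population"], row["households"])
bedrooms_per_room = safe_ratio(row["total_bedrooms"], row["total_rooms"])

print(rooms_per_household, population_per_household, bedrooms_per_room)
```

The results agree with the `rooms_per_household`, `population_per_household`, and `bedrooms_per_room` values printed for that row.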

In [6]:
# Convert categorical feature 'ocean_proximity' to numerical using StringIndexer
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_proximity_index")
df = indexer.fit(df).transform(df)

df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+------------------------+-------------------+---------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|rooms_per_household|population_per_household|  bedrooms_per_room|ocean_proximity_index|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+------------------------+-------------------+---------------------+
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|  8.288135593220339|      2.8022598870056497|0.12951601908657123|                  3.0|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.643

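Next, we handle categorical features. Most machine learning algorithms require numerical input, so we need to convert categorical features to numbers. Here, we use the `StringIndexer` estimator to convert the 'ocean_proximity' column: each unique category is mapped to a different index, stored in a new 'ocean_proximity_index' column. By default the indices are assigned in order of descending frequency, so the most common category receives 0.0. The indices carry no meaningful order, so models that treat them as magnitudes usually need a further step such as one-hot encoding.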
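
To show what a frequency-ordered label index looks like, here is a plain-Python sketch of the mapping idea. It is not the `StringIndexer` implementation: `frequency_index` is a hypothetical helper, the sample counts are made up, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter


def frequency_index(labels):
    """Map each label to a float index, most frequent label first.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}


# Hypothetical sample; the category counts are made up for illustration.
sample = ["INLAND", "<1H OCEAN", "NEAR BAY", "<1H OCEAN", "INLAND", "<1H OCEAN"]

mapping = frequency_index(sample)
encoded = [mapping[label] for label in sample]

print(mapping)
print(encoded)
```

The category that occurs most often in the sample gets index 0.0, and rarer categories get larger indices.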

In [7]:
from pyspark.sql.functions import corr

# Calculate correlation between `median_house_value` and `median_income`
df.select(corr('median_house_value', 'median_income')).show()

+---------------------------------------+
|corr(median_house_value, median_income)|
+---------------------------------------+
|                     0.6289971929399653|
+---------------------------------------+



One important aspect of feature engineering is understanding the relationships between different features. Here, we calculate the correlation between the `median_house_value` and `median_income` columns using the `corr` function from `pyspark.sql.functions`, which computes the Pearson correlation coefficient. This coefficient measures the strength and direction of the linear relationship between two variables. It ranges from -1.0 to 1.0, where 1 means a perfect positive linear relationship, -1 means a perfect negative linear relationship, and 0 means no linear relationship (the variables may still be related in a non-linear way). The value of about 0.63 obtained here indicates a moderately strong positive relationship between income and house value.
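
The Pearson formula is short enough to write out directly. The sketch below is plain Python with made-up numbers, and `pearson` is a hypothetical helper rather than a Spark function; the third case shows that a coefficient of 0 rules out only a linear relationship.

```python
import math


def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of the standard deviations."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of equal length with at least 2 values")
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    covariance = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    spread_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
    spread_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
    return covariance / (spread_x * spread_y)


# Hypothetical values for illustration only.
income = [1.0, 2.0, 3.0, 4.0, 5.0]
value_up = [10.0, 20.0, 30.0, 40.0, 50.0]    # rises in step with income
value_down = [50.0, 40.0, 30.0, 20.0, 10.0]  # falls in step with income
value_curved = [4.0, 1.0, 0.0, 1.0, 4.0]     # U-shaped: related, but not linearly

r_up = pearson(income, value_up)
r_down = pearson(income, value_down)
r_curved = pearson(income, value_curved)

print(r_up, r_down, r_curved)
```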


In [8]:
# Create a pivot table
pivot_df = df.groupBy("ocean_proximity").pivot("housing_median_age").mean("median_house_value")
pivot_df.show()

[output truncated]

Pivot tables are a useful tool to summarize and analyze large datasets. Here, we create a pivot table using the `groupBy` and `pivot` methods on our DataFrame, with `ocean_proximity` as the index, `housing_median_age` as the columns, and the average `median_house_value` as the values. The resulting DataFrame has one row per ocean proximity category and one column per distinct housing median age. Each cell holds the average median house value for that combination, or null where no district has that combination.
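
The reshaping that a pivot performs can be sketched in plain Python on a few made-up rows. `pivot_mean` is a hypothetical helper, not a Spark API; combinations with no rows are simply absent from the result, where Spark would show null.

```python
from collections import defaultdict


def pivot_mean(rows, index_key, column_key, value_key):
    """Average value_key for every (index, column) pair and nest the result by index."""
    cells = defaultdict(lambda: [0.0, 0])  # (index, column) -> [running sum, count]
    for row in rows:
        cell = cells[(row[index_key], row[column_key])]
        cell[0] += row[value_key]
        cell[1] += 1

    table = defaultdict(dict)
    for (index, column), (total, count) in cells.items():
        table[index][column] = total / count
    return dict(table)


# Hypothetical rows for illustration only.
rows = [
    {"ocean_proximity": "INLAND", "housing_median_age": 10.0, "median_house_value": 100000.0},
    {"ocean_proximity": "INLAND", "housing_median_age": 10.0, "median_house_value": 200000.0},
    {"ocean_proximity": "INLAND", "housing_median_age": 20.0, "median_house_value": 150000.0},
    {"ocean_proximity": "NEAR BAY", "housing_median_age": 20.0, "median_house_value": 300000.0},
]

pivot = pivot_mean(rows, "ocean_proximity", "housing_median_age", "median_house_value")
print(pivot)
```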


In [9]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

# Define a window partitioned by 'housing_median_age'
window = Window.partitionBy('housing_median_age')

# Add a new column 'avg_price_by_age' which is the average price for each 'housing_median_age'
df = df.withColumn('avg_price_by_age', avg('median_house_value').over(window))

# Show some data
df.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+------------------------+-------------------+---------------------+-----------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|rooms_per_household|population_per_household|  bedrooms_per_room|ocean_proximity_index| avg_price_by_age|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+------------------------+-------------------+---------------------+-----------------+
|  -117.95|   35.08|               1.0|       83.0|          15.0|      32.0|      15.0|        4.875|          141700.0|         INLAND|  5.533333333333333|      2.1333333333333333|0.18072289156626506|                  1.0|         144300.0|
|  -116.95|   33.86|        

Here we use Spark's window functions to calculate the average price for each unique 'housing_median_age' group. We define a window partitioned by 'housing_median_age' and then calculate the average 'median_house_value' over this window. Unlike `groupBy`, which collapses each group into a single row, a window function keeps every row and attaches the group-level result to it.

The resulting 'avg_price_by_age' column shows the average median house value across all districts that share the same housing median age, giving us more insight into the relationship between the age of the housing and house prices.
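
The difference between a partitioned window average and a grouped aggregate is that the row count does not change. A minimal plain-Python sketch of that behavior, with made-up rows and a hypothetical helper `add_partition_average`:

```python
from collections import defaultdict


def add_partition_average(rows, partition_key, value_key, output_key):
    """Return copies of the rows, each with the mean of value_key within its partition."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        totals[row[partition_key]] += row[value_key]
        counts[row[partition_key]] += 1
    return [
        {**row, output_key: totals[row[partition_key]] / counts[row[partition_key]]}
        for row in rows
    ]


# Hypothetical rows for illustration only.
rows = [
    {"housing_median_age": 1.0, "median_house_value": 100000.0},
    {"housing_median_age": 1.0, "median_house_value": 200000.0},
    {"housing_median_age": 2.0, "median_house_value": 300000.0},
]

with_average = add_partition_average(
    rows, "housing_median_age", "median_house_value", "avg_price_by_age"
)
for row in with_average:
    print(row)
```

Every input row is still present in the output, and rows that share a partition value carry the same average.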


## Save processed dataset

In [11]:
# Write the processed DataFrame to a new CSV file
df.write.csv("processed_housing.csv", header=True)

## Summary

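After performing all the necessary steps to clean, transform, and enrich our data using Spark, we save our processed DataFrame in CSV format at the path "processed_housing.csv" using Spark's `write.csv` function. Note that Spark writes this path as a directory containing one part file per partition rather than as a single CSV file, and by default the write fails if the path already exists.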

This marks the end of our data processing phase. In this notebook, we loaded the California Housing dataset into a Spark DataFrame, inspected the data, and made some initial transformations. We handled missing values and outliers, encoded categorical variables, created some new features, and performed other transformations that will help in building our predictive model.

Because every step uses the Apache Spark DataFrame API, the same pipeline can scale to datasets much larger than a single machine's memory. The transformed dataset is now ready for further analysis and machine learning modeling, which will be performed in the next notebooks.

From a business perspective, all of these transformations were made to ensure that the dataset accurately represents the problem at hand - predicting housing prices in California. Understanding the data, making necessary adjustments, and creating new meaningful features are all important steps towards creating an accurate and robust predictive model that can aid various stakeholders in making informed decisions.