### AIT 614 - Big Data Essentials <br>
#### Final Project: House Pricing Analysis
#### Notebook 1

Course Section #: AIT 614-DL2<br>
Team 5: Nafisa Ahmed, Peter Bishay, Charles Gilbertson, Sneha Kumaran, Cora Sula<br>

### Data Cleansing/Exploratory Analysis

In [0]:
# Load the DC properties CSV from DBFS. The first row holds the column names, and
# inferSchema lets Spark assign numeric types to numeric columns so they can be fed
# to the ML pipeline later without manual casting.
housing_data = spark.read.csv(
    "dbfs:/FileStore/shared_uploads/nahmed27@gmu.edu/DC_Properties.csv",
    header=True,
    inferSchema=True,
)

# Preview the loaded rows
display(housing_data)

In [0]:
# Display the total rows of the dataset
housing_data.count()

In [0]:
# display summary/descriptive statistics
display(housing_data.summary())

In [0]:
# display the data schema 
housing_data.printSchema()

In [0]:
# Columns that are not used as model features and are removed up front.
# (WARD and QUADRANT are kept and used as categorical features later.)
column_to_drop = [
    # row index / sale identifiers
    "_c0", "SALE_NUM", "CMPLX_NUM",
    # address fields
    "FULLADDRESS", "CITY", "STATE", "ZIPCODE",
    # coordinate / grid / census / neighborhood fields
    "NATIONALGRID", "LATITUDE", "LONGITUDE", "X", "Y", "SQUARE",
    "CENSUS_TRACT", "CENSUS_BLOCK", "ASSESSMENT_NBHD", "ASSESSMENT_SUBNBHD",
    # other columns not used for modelling
    "USECODE", "GIS_LAST_MOD_DTTM",
]

housing_data = housing_data.drop(*column_to_drop)

In [0]:
# Count NULLs per column: when() yields a non-NULL marker only for rows where the
# column is NULL, and count() counts those markers.
from pyspark.sql.functions import col, count, when

null_exprs = [count(when(col(name).isNull(), name)).alias(name) for name in housing_data.columns]
null_counts = housing_data.select(*null_exprs)

# One row holding the NULL count of every column
display(null_counts)

In [0]:
# drop the NULL values for the 'PRICE' variable
housing_data = housing_data.dropna(subset=["PRICE"])

In [0]:
# Display total rows for the data
housing_data.count()

In [0]:
# check for NULL Values again
null_counts = housing_data.select([count(when(col(c).isNull(), c)).alias(c) for c in housing_data.columns])

# Display the null counts row
display(null_counts)

In [0]:
# display the data schema 
housing_data.printSchema()

In [0]:
# display the first few rows of the dataset
display(housing_data)

### Queries

In [0]:
# Expose the cleaned DataFrame to Spark SQL as `dc_properties`, the name the
# %sql cells below query.
view_name = "dc_properties"
housing_data.createOrReplaceTempView(view_name)

In [0]:
%sql
-- Browse the registered view
SELECT * FROM dc_properties

In [0]:
%sql
-- Overall mean sale price
SELECT AVG(PRICE) AS average_price FROM dc_properties;

In [0]:
%sql
-- Mean sale price per quadrant. ORDER BY makes the row order deterministic
-- (GROUP BY alone leaves it unspecified); most expensive quadrant first.
SELECT QUADRANT, AVG(PRICE) AS average_price
FROM dc_properties
WHERE PRICE IS NOT NULL AND QUADRANT IS NOT NULL
GROUP BY QUADRANT
ORDER BY average_price DESC;

In [0]:
%sql
-- Mean sale price per (quadrant, structure) pair, grouped by quadrant and
-- ordered by price within each quadrant so the output order is deterministic.
SELECT QUADRANT, STRUCT, AVG(PRICE) AS average_price
FROM dc_properties
WHERE PRICE IS NOT NULL AND QUADRANT IS NOT NULL AND STRUCT IS NOT NULL
GROUP BY QUADRANT, STRUCT
ORDER BY QUADRANT, average_price DESC;


### Visualizations

In [0]:
# Collect the Spark DataFrame to the driver as pandas for plotting.
# NOTE: this materialises every row in driver memory.
housing_data_pd = housing_data.toPandas()

# First rows of the pandas copy
housing_data_pd.head(5)

In [0]:
#Correlation matrix
housing_data_pd.corr()

In [0]:
# Correlation Heat Map
import plotly.express as px

fig_cm = px.imshow(
    housing_data_pd.corr().round(2),
    color_continuous_scale= 'sunset',
    text_auto=True,
    template= 'ggplot2',
    title= "House Prediction Correlation Heatmap"
)

# Make the figure bigger
fig_cm.update_layout(
    height=800,  # set the height in pixels
    width=800    # set the width in pixels
)

# Show the figure
fig_cm.show()


In [0]:
# Visualize which Quadrant in DC had the most home buyers
import seaborn as sns
import matplotlib.pyplot as plt

sns.set(style="whitegrid")
plt.figure(figsize=(15, 7))
sns.countplot(data=housing_data_pd, x='QUADRANT', order=housing_data_pd['QUADRANT'].value_counts(ascending=False).index)

plt.xlabel( "Quadrant" , size = 12, weight='bold') # Set label for x-axis 

plt.ylabel( "Count" , size = 12, weight='bold') # Set label for y-axis 
  
plt.title( "Homebuyer Count by Quadrant" , size = 14, weight='bold') # Set title for plot 

plt.show()

In [0]:
# Visualize which Ward in DC had the most home buyers
sns.set(style="whitegrid")
plt.figure(figsize=(15, 7))
sns.countplot(data=housing_data_pd, x='WARD', order=housing_data_pd['WARD'].value_counts(ascending=False).index)

plt.xlabel( "Ward" , size = 12, weight='bold') # Set label for x-axis 

plt.ylabel( "Count" , size = 12, weight='bold') # Set label for y-axis 
  
plt.title( "Homebuyer Count by Ward" , size = 14, weight='bold') # Set title for plot 

plt.show()

In [0]:
# Visualize which Structure in DC had the most home buyers
sns.set(style="whitegrid")
plt.figure(figsize=(15, 7))
sns.countplot(data=housing_data_pd, x='STRUCT', order=housing_data_pd['STRUCT'].value_counts(ascending=False).index)

plt.xlabel( "Structure" , size = 12, weight='bold') # Set label for x-axis 

plt.ylabel( "Count" , size = 12, weight='bold') # Set label for y-axis 
  
plt.title( "Homebuyer Count by Structure" , size = 14, weight='bold') # Set title for plot 

plt.show()

In [0]:
# Time series: mean sale price per sale date.
import pandas as pd

# Work on a copy of the two needed columns. The previous version sorted and
# filtered housing_data_pd in place; later cells reuse that frame (the sklearn
# train_test_split depends on its row order), so their results depended on
# whether this plotting cell had been run.
sales_pd = housing_data_pd[['SALEDATE', 'PRICE']].copy()

# Parse sale dates; coerce PRICE to numeric (unparseable values become NaN)
sales_pd['SALEDATE'] = pd.to_datetime(sales_pd['SALEDATE'])
sales_pd['PRICE'] = pd.to_numeric(sales_pd['PRICE'], errors='coerce')

# Drop rows without a price, then average per date (groupby returns the
# dates in ascending order)
sales_date = (
    sales_pd.dropna(subset=['PRICE'])
    .groupby('SALEDATE')['PRICE']
    .mean()
    .reset_index()
)

# Plot the average price against the sale date
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(sales_date['SALEDATE'], sales_date['PRICE'], marker='o', linestyle='-')
ax.set_title('Average Home Prices Over Time')
ax.set_xlabel('Date')
ax.set_ylabel('Average Price')
plt.show()


In [0]:
# House Prices by Property Grade

plt.figure(figsize=(20, 6))
sns.boxplot(x='GRADE', y='PRICE', data=housing_data_pd)
plt.title('House Prices by Property Grade')
plt.xlabel('Grade')
plt.ylabel('Price')
plt.show()


In [0]:
# Houses Prices by Property Structure

plt.figure(figsize=(20, 6))
sns.boxplot(x='STRUCT', y='PRICE', data=housing_data_pd)
plt.title('House Prices by Property Structure')
plt.xlabel('Structure')
plt.ylabel('Price')
plt.show()

In [0]:
# House Prices by Living Area

plt.figure(figsize=(8, 6))
plt.scatter(housing_data_pd['LIVING_GBA'], housing_data_pd['PRICE'], alpha=0.5)
plt.title('House Prices vs. Living Area')
plt.xlabel('Living Area')
plt.ylabel('Price')
plt.show()


In [0]:
# Count plots for several categorical house features, two per row, showing
# which value of each feature occurs most often.
import math

categorical_columns = ['STYLE', 'STRUCT', 'GRADE', 'CNDTN', 'EXTWALL', 'ROOF', 'INTWALL']
categorical_data = housing_data_pd[categorical_columns]

num_plots = len(categorical_columns)
n_cols = 2
# ceil(num_plots / n_cols) rows. The previous (num_plots // 2) + 1 produced an
# extra empty row whenever the number of columns was even.
n_rows = math.ceil(num_plots / n_cols)

fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 10), squeeze=False)
flat_axes = axes.ravel()

for ax, column in zip(flat_axes, categorical_columns):
    value_order = categorical_data[column].value_counts().index
    sns.countplot(data=categorical_data, x=column, order=value_order, ax=ax)
    ax.set_title(f'Count of {column}')
    ax.set_xlabel('')
    ax.set_ylabel('Count')
    ax.tick_params(axis='x', labelrotation=45)  # rotate x labels for readability

# Hide leftover grid cells (e.g. the 8th slot when there are 7 plots)
for ax in flat_axes[num_plots:]:
    ax.set_visible(False)

plt.tight_layout()
plt.show()

In [0]:
# Remove implausible records before modelling. Rows are kept only when:
#   - ROOMS < MAX_ROOMS,
#   - ROOMS >= BEDRM (no more bedrooms than rooms),
#   - BATHRM < MAX_BATHRM.
# HF_BATHRM is NOT filtered here. Rows with a NULL in ROOMS, BEDRM or BATHRM are
# dropped as well, because the comparisons evaluate to NULL and filter() drops them.
# NOTE: housing_data_pd (used by the plots above) was collected before this
# filter and still contains these rows.
MAX_ROOMS = 100
MAX_BATHRM = 24

housing_data = housing_data.filter(
    (col("ROOMS") < MAX_ROOMS)
    & (col("ROOMS") >= col("BEDRM"))
    & (col("BATHRM") < MAX_BATHRM)
)

display(housing_data)

### Feature Preprocessing/Training & Test Data Split

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer
from pyspark.ml import Pipeline

# Define categorical columns
categorical_cols = ['HEAT', 'AC', 'QUALIFIED', 'STYLE', 'STRUCT', 'GRADE', 'CNDTN', 'EXTWALL', 'ROOF', 'INTWALL', 'SOURCE', 'WARD', 'QUADRANT']

# Create a list of StringIndexers for categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=col + '_index', handleInvalid='keep') for col in categorical_cols]

# Create a list of OneHotEncoders for indexed categorical columns
encoders = [OneHotEncoder(inputCol=col + '_index', outputCol=col + '_encoded') for col in categorical_cols]

# Define the numeric columns
numeric_cols = ['BATHRM', 'HF_BATHRM', 'NUM_UNITS', 'ROOMS', 'BEDRM', 'AYB', 'YR_RMDL', 'EYB', 'STORIES',
                'GBA', 'BLDG_NUM', 'KITCHENS', 'FIREPLACES', 'LANDAREA', 'LIVING_GBA']

# Impute missing values in numeric columns using the mean
imputer = Imputer(inputCols=numeric_cols, outputCols=[col + "_imputed" for col in numeric_cols], strategy="mean")

# Assemble all features into a single vector
assembler = VectorAssembler(inputCols=[col + "_imputed" for col in numeric_cols] + [col + '_encoded' for col in categorical_cols],
                            outputCol='features')

# Define the label column
label = 'PRICE'

# Create a pipeline
pipeline = Pipeline(stages=indexers + encoders + [imputer, assembler])

# Fit the pipeline to the data
pipeline_model = pipeline.fit(housing_data)

# Transform the data
transformed_data = pipeline_model.transform(housing_data)

# Show the transformed data
transformed_data.select('features', label).show()

In [0]:
# display the first few rows of the transformed dataset
display(transformed_data)

In [0]:
# check for NULL Values after feature processing
null_counts = transformed_data.select([count(when(col(c).isNull(), c)).alias(c) for c in transformed_data.columns])

# Display the null counts row
display(null_counts)

In [0]:
# Create training and test data sets
train_df, test_df = transformed_data.randomSplit([0.8, 0.2], seed=42)
print(train_df.cache().count())
print(test_df.count())

In [0]:
# Display training data
display(train_df)

In [0]:
# Display test data
display(test_df)

### Modeling and Prediction

#### Random Forest Prediction

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Random forest regressor on the assembled feature vector: 10 trees, each at most
# 5 levels deep, with a fixed seed so the run is repeatable.
rf_regressor = RandomForestRegressor(
    featuresCol='features',
    labelCol='PRICE',
    numTrees=10,
    maxDepth=5,
    seed=42,
)

# Train on the training split, then score the held-out test split
rf_model = rf_regressor.fit(train_df)
predictions_rf = rf_model.transform(test_df)

# Root mean squared error of the test-set predictions
evaluator = RegressionEvaluator(labelCol='PRICE', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions_rf)
print("Root Mean Squared Error:", rmse)

In [0]:
# RMSE, R-squared and Mean Squared Error of the Random Forest predictions on the test split.
# Each metric gets its own evaluator. The MSE previously reused the global
# `evaluator`, which the Linear Regression cell later rebinds to labelCol='label',
# so re-running this cell afterwards failed: predictions_rf has no 'label' column.

# Evaluate the model - RMSE
rf_evaluator_rmse = RegressionEvaluator(labelCol='PRICE', predictionCol='prediction', metricName='rmse')
rf_rmse = rf_evaluator_rmse.evaluate(predictions_rf)
print("Root Mean Squared Error:", rf_rmse)

# Evaluate the model - R-Squared
rf_evaluator_r2 = RegressionEvaluator(labelCol='PRICE', predictionCol='prediction', metricName='r2')
rf_r2 = rf_evaluator_r2.evaluate(predictions_rf)
print("R-squared:", rf_r2)

# Evaluate the model - Mean Squared Error
rf_evaluator_mse = RegressionEvaluator(labelCol='PRICE', predictionCol='prediction', metricName='mse')
rf_mse = rf_evaluator_mse.evaluate(predictions_rf)
print("Mean Squared Error:", rf_mse)

In [0]:
# Show the predictions for the Random Forest model
predictions_rf.select('features', 'PRICE', 'prediction').show()

In [0]:
#Determine Feature Importance for the Random Forest Model
importances_rf = rf_model.featureImportances.toArray()

# Get feature names from the vector assembler
feature_names = assembler.getInputCols()

# Create a Pandas DataFrame with feature names and importance scores
importances_df_pd_rf = pd.DataFrame(list(zip(feature_names, importances_rf)), columns=["Attribute_rf", "Importance_rf"])

# Create a Spark DataFrame from the Pandas DataFrame
importances_df = spark.createDataFrame(importances_df_pd_rf)

# Sort the DataFrame by importance in descending order
importances_df = importances_df.sort("Importance_rf", ascending=False)

# Show the result
importances_df.show()

In [0]:
# Visualizing the Random Forest feature importances as a bar plot (rows arrive
# already sorted, most important first).
importances_df_pd = importances_df.toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(importances_df_pd['Attribute_rf'], importances_df_pd['Importance_rf'], color='blue')
ax.set_xlabel('Feature')
ax.set_ylabel('Importance')
ax.set_title('Random Forest Feature Importance')
plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
fig.tight_layout()
plt.show()

#### Linear Regression Prediction

In [0]:
# The Linear Regression and Decision Tree cells below use labelCol="label", so
# expose PRICE under that name in both splits (train_df/test_df keep PRICE for
# the Random Forest cells).
train_data, test_data = (
    split.withColumnRenamed('PRICE', 'label') for split in (train_df, test_df)
)

# Preview both renamed splits
display(train_data)
display(test_data)

In [0]:
# Linear Regression using PySpark
from pyspark.ml.regression import LinearRegression

# Linear regression on the assembled features, predicting 'label' (= PRICE)
lr = LinearRegression(featuresCol="features", labelCol="label")

# Fit on the training split and predict the test split
lr_model = lr.fit(train_data)
predictions = lr_model.transform(test_data)

# Test-set RMSE. A dedicated evaluator name is used on purpose: rebinding the
# shared `evaluator` here (with labelCol='label') broke the Random Forest metrics
# cell, which reads `evaluator` and scores predictions that only have 'PRICE'.
lr_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
lr_rmse = lr_evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE): {lr_rmse}")

In [0]:
# Display the RMSE, R-Squared and Mean Squared Error for the Linear Regression Model

# Evaluate the model - RMSE
reg_evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
regression_rmse = reg_evaluator_rmse.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE): {regression_rmse}")

# Evaluate the model - R-Squared
reg_evaluator_r2 = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='r2')
regression_r2 = reg_evaluator_r2.evaluate(predictions)

print("R-squared:", regression_r2)

# Evaluate the model - Mean Squared Error
reg_evaluator_mse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mse")
regression_mse = reg_evaluator_mse.evaluate(predictions)
print(f"Mean Squared Error (MSE): {regression_mse}")

In [0]:
# Show the predictions for the Linear Regression model
predictions.select('features', 'label', 'prediction').show()

In [0]:
# Linear Regression coefficients as a rough "feature importance".
# NOTE: Spark reports coefficients on the original feature scale, so their
# magnitudes also reflect each feature's units; compare them with care.
import pandas as pd

coefficients = lr_model.coefficients.toArray()

# There is one coefficient per slot of the assembled 'features' vector, and one-hot
# encoded columns span several slots. assembler.getInputCols() has only one name
# per input column, which mislabelled every slot after the numeric ones. Take slot
# names from the VectorAssembler's ML attribute metadata instead.
slot_attrs = train_data.schema["features"].metadata["ml_attr"]["attrs"]
slot_names = {attr["idx"]: attr["name"] for group in slot_attrs.values() for attr in group}
feature_names_lr = [slot_names.get(i, f"feature_{i}") for i in range(len(coefficients))]

importances_df_pd_lr = pd.DataFrame({"Attribute_lr": feature_names_lr, "Coefficient_lr": coefficients})

# Rank by absolute coefficient, largest first
importances_df_pd_lr = (
    importances_df_pd_lr
    .assign(Absolute_Coefficient_lr=lambda x: x['Coefficient_lr'].abs())
    .sort_values(by='Absolute_Coefficient_lr', ascending=False)
)

print(importances_df_pd_lr)

In [0]:
# Visualizing the Linear Regression "feature importance": absolute coefficient per
# feature as a bar plot, largest first.
importances_df_pd_lr = importances_df_pd_lr.sort_values('Absolute_Coefficient_lr', ascending=False)

fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(importances_df_pd_lr['Attribute_lr'], importances_df_pd_lr['Absolute_Coefficient_lr'], color='green')
ax.set_xlabel('Feature')
ax.set_ylabel('Absolute Coefficient')
ax.set_title('Linear Regression Feature Importance')
plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
fig.tight_layout()
plt.show()

In [0]:
#Here is a simple linear regression model using sklearn that prints the accuracy and r^2 score in comparison to the other one. It looks like that the linear regression algorithm is not a good fit to this dataset. 

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score

# 'PRICE' is the target variable
#Selected few categorical features based on the dataset
selected_features = ['BATHRM', 'HF_BATHRM', 'ROOMS', 'BEDRM', 'AYB', 'YR_RMDL', 'EYB', 'STORIES', 'GBA', 'FIREPLACES', 'LANDAREA']

# Filtering the PRICE column 
data_for_modeling = housing_data_pd[selected_features + ['PRICE']].copy()

# Dropped the rows with missing values
data_for_modeling.dropna(inplace=True)

# Split the data into training and testing sets (80%/20%)
X = data_for_modeling[selected_features]
y = data_for_modeling['PRICE']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# fitting the training set into the Linear Regression model
model = LinearRegression()
model.fit(X_train, y_train)

# Predictions
y_pred = model.predict(X_test)


#Accuracy
accuracy = model.score(X_test, y_test)
print(f"Model Accuracy: {accuracy}")

# Model evaluation
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)


print(f"Mean Squared Error (MSE): {mse}")
print(f"R-squared (R2): {r2}")

In [0]:
# Predictions for the model
display(y_pred)

#### Decision Tree Prediction

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Create a Decision Tree Regressor
dt = DecisionTreeRegressor(featuresCol="features", labelCol="label")

# Fit the model to the training data
dt_model = dt.fit(train_data)

# Make predictions on the test data
predictions = dt_model.transform(test_data)

# Evaluate RMSE
reg_evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
regression_rmse = reg_evaluator_rmse.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {regression_rmse}")

# Evaluate R-Squared
reg_evaluator_r2 = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='r2')
regression_r2 = reg_evaluator_r2.evaluate(predictions)
print("R-squared:", regression_r2)

# Evaluate Mean Squared Error
reg_evaluator_mse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mse")
regression_mse = reg_evaluator_mse.evaluate(predictions)
print(f"Mean Squared Error (MSE): {regression_mse}")

In [0]:
# Show the predictions for the Decision Tree model
predictions.select('features', 'label', 'prediction').show()

In [0]:
# Feature importances of the Decision Tree model, most important first
import pandas as pd

feature_importances = dt_model.featureImportances.toArray()

# One importance per slot of the assembled 'features' vector (one-hot encoded
# columns span several slots). assembler.getInputCols() has only one name per input
# column and mislabelled every slot after the numeric ones, so take slot names
# from the VectorAssembler's ML attribute metadata instead.
slot_attrs = train_data.schema["features"].metadata["ml_attr"]["attrs"]
slot_names = {attr["idx"]: attr["name"] for group in slot_attrs.values() for attr in group}
feature_names_dt = [slot_names.get(i, f"feature_{i}") for i in range(len(feature_importances))]

importances_df_pd_dt = pd.DataFrame({"Attribute_dt": feature_names_dt, "Importance_dt": feature_importances})

# Sort by importance, descending
importances_df_pd_dt = importances_df_pd_dt.sort_values(by='Importance_dt', ascending=False)

print(importances_df_pd_dt)

In [0]:
# Get feature importances from the model
feature_importances = dt_model.featureImportances.toArray()

# Get feature names from the vector assembler
feature_names_dt = assembler.getInputCols()

# Create a Pandas DataFrame with feature names and importances
importances_df_pd_dt = pd.DataFrame(list(zip(feature_names_dt, feature_importances)), columns=["Attribute", "Importance"])

# Sort the DataFrame by importance values in descending order
top_importances_df_pd_dt = importances_df_pd_dt.sort_values(by='Importance', ascending=False).head(10)

# Plotting
plt.figure(figsize=(10, 6))
plt.bar(top_importances_df_pd_dt['Attribute'], top_importances_df_pd_dt['Importance'], color='blue')
plt.xlabel('Features')
plt.ylabel('Importance')
plt.title('Feature Importances from Decision Tree')
plt.xticks(rotation=45)
plt.show()

In [0]:
# Get the decision tree model
tree_model = dt_model

# Print out the decision tree
print(tree_model.toDebugString)