In [1]:
import findspark
findspark.init()
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from pyspark.sql.functions import col, to_date, unix_timestamp
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, confusion_matrix, accuracy_score, classification_report
import numpy as np
from pyspark.sql import SparkSession
import time

In [2]:
# Start a Spark session for this notebook, or attach to the one that is
# already running under the same application name.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/29 20:30:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Download the realtor listings CSV through Spark and load it as a DataFrame.
from pyspark import SparkFiles

url = "http://abc11274.sg-host.com/wp-content/uploads/2024/07/realtor-data.zip.csv"

# Register the remote file with the Spark context so it is fetched locally.
spark.sparkContext.addFile(url)

# Resolve the local copy, then parse it. No schema is supplied or inferred,
# so every column is loaded as a string.
local_csv_path = SparkFiles.get("realtor-data.zip.csv")
df = spark.read.csv(local_csv_path, sep=",", header=True)

# Preview the first rows.
df.show()

24/07/29 20:31:27 WARN SparkContext: The path http://abc11274.sg-host.com/wp-content/uploads/2024/07/realtor-data.zip.csv has been added already. Overwriting of added paths is not supported in the current version.
                                                                                

+-----------+--------+--------+---+----+--------+---------+-------------+-----------+--------+----------+--------------+
|brokered_by|  status|   price|bed|bath|acre_lot|   street|         city|      state|zip_code|house_size|prev_sold_date|
+-----------+--------+--------+---+----+--------+---------+-------------+-----------+--------+----------+--------------+
|   103378.0|for_sale|105000.0|  3|   2|    0.12|1962661.0|     Adjuntas|Puerto Rico|   00601|     920.0|          null|
|    52707.0|for_sale| 80000.0|  4|   2|    0.08|1902874.0|     Adjuntas|Puerto Rico|   00601|    1527.0|          null|
|   103379.0|for_sale| 67000.0|  2|   1|    0.15|1404990.0|   Juana Diaz|Puerto Rico|   00795|     748.0|          null|
|    31239.0|for_sale|145000.0|  4|   2|     0.1|1947675.0|        Ponce|Puerto Rico|   00731|    1800.0|          null|
|    34632.0|for_sale| 65000.0|  6|   2|    0.05| 331151.0|     Mayaguez|Puerto Rico|   00680|      null|          null|
|   103378.0|for_sale|179000.0| 

## Data Cleaning

In [6]:
# Register the DataFrame as the temporary view "real_estate_sales" so it can
# be queried with spark.sql(...). Re-running this cell replaces the view.
df.createOrReplaceTempView("real_estate_sales")

In [7]:
# Print the column names and types. The CSV was read without a schema or
# schema inference, so every column (including numeric-looking ones such as
# price, bed and bath) is reported as a string.
df.printSchema()

root
 |-- brokered_by: string (nullable = true)
 |-- status: string (nullable = true)
 |-- price: string (nullable = true)
 |-- bed: string (nullable = true)
 |-- bath: string (nullable = true)
 |-- acre_lot: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- house_size: string (nullable = true)
 |-- prev_sold_date: string (nullable = true)



In [None]:
# Convert the Spark DataFrame to a pandas DataFrame.
# toPandas() collects every row to the driver, so the whole dataset must fit in memory.
pandas_df = df.toPandas()

# Preview the first rows (no columns are dropped here).
pandas_df.head()


In [None]:
# Check the pandas dtype of each column after the conversion from Spark.
print(pandas_df.dtypes)

## Machine Learning

### Encoding

In [None]:
# Build and export the model-ready (encoded) data.
#
# Every column of pandas_df is a string because the CSV was read without a
# schema. Calling pd.get_dummies on the whole frame would therefore one-hot
# encode numeric fields too (one indicator column per distinct price, bed
# count, ...), which removes the numeric target and makes the frame grow by
# one column per distinct value. Numeric fields are parsed to numbers instead,
# and only the listed text fields are one-hot encoded.

# Numeric fields stored as text; parsed to numbers and kept as single columns.
numeric_columns = ['price', 'bed', 'bath', 'acre_lot', 'house_size']

# Text fields that are one-hot encoded.
# NOTE(review): brokered_by, street, city, zip_code and prev_sold_date are left
# out because they look like identifiers / many-valued fields that would each
# add one column per distinct value -- confirm, and extend this list if needed.
categorical_columns = ['status', 'state']

model_df = pandas_df[numeric_columns + categorical_columns].copy()

# errors='coerce' turns missing or unparseable entries into NaN instead of raising.
model_df[numeric_columns] = model_df[numeric_columns].apply(pd.to_numeric, errors='coerce')

# Drop rows that lack a usable value in any numeric field (including the target).
model_df = model_df.dropna(subset=numeric_columns)

# Indicator columns are emitted as 0/1 integers; numeric fields keep their
# parsed values (no truncation of fractional values such as acre_lot).
encoded_df_full = pd.get_dummies(model_df, columns=categorical_columns, dtype=int)
encoded_df_full.to_csv('Resources/encoded_data_full.csv', index=False)

# Creating & exporting sample encoded data for testing
# NOTE(review): head() keeps the first rows in file order, not a random sample.
encoded_df_sample = encoded_df_full.head(50000) # keeping first 50,000 rows
encoded_df_sample.to_csv('Resources/encoded_df_sample.csv', index=False)

### Training

In [None]:
# Prepare the data
X = encoded_df_sample.drop(columns='Sale Amount') # swap with encoded_df_full to improve accuracy
y = encoded_df_sample['Sale Amount']

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Standardize the features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Create a random forest classifier
rf_model = RandomForestRegressor(n_estimators=100, random_state=42)

# Fitting the model
rf_model.fit(X_train_scaled, y_train)

### Making Predictions

In [None]:
# Predict the target for the held-out test rows, using the features scaled
# with the scaler that was fitted on the training split.
predictions = rf_model.predict(X_test_scaled)

### Model Evaluation

In [None]:
# Score the held-out predictions against the true test targets.
mae = mean_absolute_error(y_true=y_test, y_pred=predictions)
mse = mean_squared_error(y_true=y_test, y_pred=predictions)
r2 = r2_score(y_true=y_test, y_pred=predictions)

# Report the three metrics, one per line.
print(
    f"Mean Absolute Error (MAE): {mae}",
    f"Mean Squared Error (MSE): {mse}",
    f"R-squared (R²): {r2}",
    sep="\n",
)