In [1]:
import os
# Find the latest version of spark 3.x  from https://downloads.apache.org/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.5.1'
spark_version = 'spark-3.5.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:6 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:7 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:9 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:10 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [874 kB]
Get:12 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [1,857 kB]
Get:13 http://archive.ubuntu.c

In [2]:
 # Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
 # Read in real_estate_sales.csv data from the S3 Bucket realestateproj
from pyspark import SparkFiles
url ="https://realestateproj.s3.ap-southeast-2.amazonaws.com/real-estate-sales.csv"
spark.sparkContext.addFile(url)
df_real_estate_sales = spark.read.csv(SparkFiles.get("real-estate-sales.csv"), header=True, inferSchema=True)

# Show DataFrame
df_real_estate_sales.show(3)

+-------------+---------+-------------+-------+---------------+--------------+-----------+-----------+-------------+----------------+------------+----------------+-----------+--------------------+
|Serial Number|List Year|Date Recorded|   Town|        Address|Assessed Value|Sale Amount|Sales Ratio|Property Type|Residential Type|Non Use Code|Assessor Remarks|OPM remarks|            Location|
+-------------+---------+-------------+-------+---------------+--------------+-----------+-----------+-------------+----------------+------------+----------------+-----------+--------------------+
|      2020348|     2020|   09/13/2021|Ansonia|230 WAKELEE AVE|      150500.0|   325000.0|      0.463|   Commercial|            NULL|        NULL|            NULL|       NULL|                NULL|
|        20002|     2020|   10/02/2020|Ashford|390 TURNPIKE RD|      253000.0|   430000.0|     0.5883|  Residential|   Single Family|        NULL|            NULL|       NULL|                NULL|
|       210317|

In [18]:
# check the dataframe and drop any with null values
print("total rows in the original dataset:", df_real_estate_sales.count())
df_real_estate_sales_clean = df_real_estate_sales[["List Year", "Town","Assessed Value","Sale Amount","Sales Ratio",
                                                  "Property Type", "Residential Type"]].dropna(how='any')
print("total non_null rows:", df_real_estate_sales_clean.count())
# rename the column names
columns_to_rename = {"List Year":"list_year", "Assessed Value":"assessed_value", "Sale Amount":"sale_amount", "Sales Ratio":"sales_ratio", "Property Type":"property_type", "Residential Type":"residential_type"}
for old_name, new_name in columns_to_rename.items():
  df_real_estate_sales_clean=df_real_estate_sales_clean.withColumnRenamed(old_name, new_name)
df_real_estate_sales_clean.show(3)

total rows in the original dataset: 1054159
total non_null rows: 660274
+---------+-------+--------------+-----------+-----------+-------------+----------------+
|list_year|   Town|assessed_value|sale_amount|sales_ratio|property_type|residential_type|
+---------+-------+--------------+-----------+-----------+-------------+----------------+
|     2020|Ashford|      253000.0|   430000.0|     0.5883|  Residential|   Single Family|
|     2021|   Avon|      329730.0|   805000.0|     0.4096|  Residential|   Single Family|
|     2020|   Avon|      130400.0|   179900.0|     0.7248|  Residential|           Condo|
+---------+-------+--------------+-----------+-----------+-------------+----------------+
only showing top 3 rows



In [7]:
 # Read in .csv data from the S3 Bucket realestateproj
url_affordableHousing ="https://realestateproj.s3.ap-southeast-2.amazonaws.com/Affordable_Housing_by_Town_2011-2023.csv"
spark.sparkContext.addFile(url_affordableHousing)
df_affordable_housing = spark.read.csv(SparkFiles.get("Affordable_Housing_by_Town_2011-2023.csv"), sep=",", header=True)

# Show DataFrame
df_affordable_housing.show(5)

+----+---------+-----------+------------+-------------------+------------------------+-----------------------------------+---------------------+--------------------+------------------+
|Year|Town Code|       Town|Census Units|Government Assisted|Tenant Rental Assistance| Single Family CHFA/ USDA Mortgages|Deed Restricted Units|Total Assisted Units|Percent Affordable|
+----+---------+-----------+------------+-------------------+------------------------+-----------------------------------+---------------------+--------------------+------------------+
|2020|        1|    Andover|        1317|                 18|                       1|                                 32|                    0|                  51|              3.87|
|2020|        2|    Ansonia|        8148|                349|                     764|                                147|                    0|                1260|             15.46|
|2020|        3|    Ashford|        1903|                 32|              

In [8]:
# check the dataframes and drop any with null values
print("total rows in the original dataset:", df_affordable_housing.count())
df_affordable_housing_clean = df_affordable_housing[["Year", "Town","Percent Affordable"]].dropna(how='any')
print("total non_null rows:", df_affordable_housing_clean.count())

total rows in the original dataset: 2194
total non_null rows: 2194


In [19]:
# rename the column names
columns_to_rename_2 = {"Year":"year", "Percent Affordable":"percent_affordable"}
for old_name, new_name in columns_to_rename_2.items():
  df_affordable_housing_clean=df_affordable_housing_clean.withColumnRenamed(old_name, new_name)
df_affordable_housing_clean.show(3)

+----+-------+------------------+
|year|   Town|percent_affordable|
+----+-------+------------------+
|2020|Andover|              3.87|
|2020|Ansonia|             15.46|
|2020|Ashford|              3.57|
+----+-------+------------------+
only showing top 3 rows



In [21]:
# Create a temporary view for the df_real_estate_sales_clean
df_real_estate_sales_clean.createOrReplaceTempView("real_estate_sales_table")
df_affordable_housing_clean.createOrReplaceTempView("affordable_housing_table")

In [29]:
# Join the tables to get the Percent Affordable into the real estate sales table
import time
start_time = time.time()

joined_table = spark.sql("""
select t1.*, t2.percent_affordable
from real_estate_sales_table t1
left join affordable_housing_table t2
    on t1.list_year=t2.year
    AND t1.Town=t2.Town
""")

print("--- %s seconds ---" % (time.time() - start_time))


--- 0.03372764587402344 seconds ---


In [30]:
# check the joined_table
joined_table.show(3)

+---------+-------+--------------+-----------+-----------+-------------+----------------+------------------+
|list_year|   Town|assessed_value|sale_amount|sales_ratio|property_type|residential_type|percent_affordable|
+---------+-------+--------------+-----------+-----------+-------------+----------------+------------------+
|     2020|Ashford|      253000.0|   430000.0|     0.5883|  Residential|   Single Family|              3.57|
|     2021|   Avon|      329730.0|   805000.0|     0.4096|  Residential|   Single Family|              4.09|
|     2020|   Avon|      130400.0|   179900.0|     0.7248|  Residential|           Condo|              4.11|
+---------+-------+--------------+-----------+-----------+-------------+----------------+------------------+
only showing top 3 rows



In [33]:
# Check the joined table
non_null_count = joined_table.filter(joined_table["percent_affordable"].isNotNull()).count()
print(non_null_count)
joined_table_clean = joined_table.dropna(how='any')
print("total non_null rows:", joined_table_clean.count())

485285
total non_null rows: 485285


In [34]:
# convert the joined table to Pandas DataFrame to use sklearn for modelling
joined_table_df = joined_table.toPandas()
joined_table_df.head(3)

Unnamed: 0,list_year,Town,assessed_value,sale_amount,sales_ratio,property_type,residential_type,percent_affordable
0,2020,Ashford,253000.0,430000.0,0.5883,Residential,Single Family,3.57
1,2021,Avon,329730.0,805000.0,0.4096,Residential,Single Family,4.09
2,2020,Avon,130400.0,179900.0,0.7248,Residential,Condo,4.11
