In [60]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, desc, col, to_date, year

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [73]:
# Start (or reuse) the Spark session used by every later cell.
print("Creating spark session")
spark = (
    SparkSession.builder
    .appName("RedfinCityMarketAnalysis")
    .getOrCreate()
)

# Switch Spark SQL's datetime parser policy to LEGACY: date conversion raised
# errors under the default policy when this notebook was written.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Creating spark session

In [None]:
# Load the Redfin city market tracker: tab-separated, with a header row, and
# gzip-compressed judging by the .gz extension. No schema or inferSchema
# option is given, so every column comes back as a string.
s3_data = "s3://<s3-bucket>/input/city_market_tracker.tsv000.gz"
rf_df = (
    spark.read
    .option("header", True)
    .option("sep", "\t")
    .csv(s3_data)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Sanity-check the load by printing the first 10 rows as a text table.
rf_df.show(n=10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+----------+---------------+-----------+--------------+--------+----------------------+--------------------+-----------------+--------------+----------+--------------------+----------------+-----------------+---------------------+---------------------+-----------------+---------------------+---------------------+-------------+---------------+---------------+----------------+--------------------+--------------------+----------+--------------+--------------+-------------+-----------------+-----------------+------------+----------------+----------------+---------+--------------+-------------+----------------+--------------------+--------------------+----------+--------------+--------------+----------------+--------------------+--------------------+---------------+-------------------+-------------------+-------------+---------------+---------------+-----------------------+---------------------------+---------------------------+-------------------+------------------------------

In [55]:
# Print the column names and types of the raw dataframe. The load cell sets no
# schema/inferSchema option, so every column (including the price columns
# averaged later) is reported as a string.
rf_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- PERIOD_BEGIN: string (nullable = true)
 |-- PERIOD_END: string (nullable = true)
 |-- PERIOD_DURATION: string (nullable = true)
 |-- REGION_TYPE: string (nullable = true)
 |-- REGION_TYPE_ID: string (nullable = true)
 |-- TABLE_ID: string (nullable = true)
 |-- IS_SEASONALLY_ADJUSTED: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- STATE_CODE: string (nullable = true)
 |-- PROPERTY_TYPE: string (nullable = true)
 |-- PROPERTY_TYPE_ID: string (nullable = true)
 |-- MEDIAN_SALE_PRICE: string (nullable = true)
 |-- MEDIAN_SALE_PRICE_MOM: string (nullable = true)
 |-- MEDIAN_SALE_PRICE_YOY: string (nullable = true)
 |-- MEDIAN_LIST_PRICE: string (nullable = true)
 |-- MEDIAN_LIST_PRICE_MOM: string (nullable = true)
 |-- MEDIAN_LIST_PRICE_YOY: string (nullable = true)
 |-- MEDIAN_PPSF: string (nullable = true)
 |-- MEDIAN_PPSF_MOM: string (nullable = true)
 |-- MEDIAN_PPSF_YOY: string (nul

In [77]:
# Keep only the columns used downstream, parse period_end ("yyyy-MM-dd") into a
# date, and derive its calendar year for grouping.
# NOTE: this reassigns rf_df, and the drop cell later removes period_end, so
# re-running this cell after the drop cell fails; re-run from the load cell.
rf_df = (
    rf_df.select(
        "state_code",
        "state",
        "region",
        "city",
        "property_type",
        "median_sale_price",
        "median_list_price",
        "period_end",
    )
    .withColumn("parsed_end", to_date(col("period_end"), "yyyy-MM-dd"))
    .withColumn("year", year(col("parsed_end")))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [82]:
# The derived year column is all the date information the analysis needs, so
# discard the raw and parsed end-date columns.
rf_df = rf_df.drop("period_end").drop("parsed_end")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [93]:
# Average the median sale and list prices per state and year, for years after
# 2022 only.
# - The year filter runs before the aggregation. Year is a grouping key, so the
#   result is unchanged, and Spark aggregates fewer rows.
# - The sort runs last so the final frame is ordered by state, then year.
# - The price columns were read as strings (see the printed schema), so they
#   are cast to double explicitly rather than relying on avg()'s implicit cast.
START_AFTER_YEAR = 2022
grouped_by_state = (
    rf_df
    .filter(col("year") > START_AFTER_YEAR)
    .groupBy("state_code", "year")
    .agg(
        avg(col("median_sale_price").cast("double")).alias("avg_median_sale"),
        avg(col("median_list_price").cast("double")).alias("avg_median_list"),
    )
    .orderBy("state_code", "year")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [98]:
# avg_diff = average sale price minus average list price; a positive value means
# homes sold above list price on average for that state and year.
grouped_by_state = grouped_by_state.withColumn(
    "avg_diff",
    grouped_by_state["avg_median_sale"] - grouped_by_state["avg_median_list"],
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [101]:
# Show the state/year rows (years after 2022) where the average median sale
# price exceeded the average median list price, i.e. avg_diff is positive.
grouped_by_state.filter(grouped_by_state["avg_diff"] > 0).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----+------------------+------------------+------------------+
|state_code|year|   avg_median_sale|   avg_median_list|          avg_diff|
+----------+----+------------------+------------------+------------------+
|        CO|2023| 756466.0459969474| 735015.7816880406| 21450.26430890686|
|        CO|2024| 778642.3273206605| 748402.3110195674| 30240.01630109304|
|        CO|2025| 785511.0419508867| 768311.8050554016|17199.236895485083|
|        CT|2023|  654396.900865113| 601351.6179937952| 53045.28287131782|
|        CT|2024| 638235.5990566037| 628795.9568158168| 9439.642240786925|
|        DC|2023|        759530.275| 639512.4166666666| 120017.8583333334|
|        DC|2024| 768338.1083333333| 627062.4166666666|141275.69166666665|
|        DC|2025|          813661.8|          661389.9|152271.90000000002|
|        HI|2024| 1096955.230442426|1032818.2059026579|   64137.024539768|
|        HI|2025|1063005.5293066476| 987196.4507989908| 75809.07850765681|
|        NE|2025| 537441.

In [None]:
# Save both dataframes to S3, replacing any existing output (mode "overwrite"):
#   - grouped_by_state (per-state/year aggregates) as CSV with a header row;
#   - rf_df (the cleaned row-level data) as Parquet, matching the path name.
# Parquet stores its own schema, so the CSV-only "header" option is not passed.
s3_output_csv = "s3://<bucket>/output/grouped_by_state.csv"
s3_output_parquet = "s3://<bucket>/output/rf_df.parquet"
grouped_by_state.write.mode("overwrite").option("header", True).csv(s3_output_csv)
rf_df.write.mode("overwrite").parquet(s3_output_parquet)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…