In [1]:
# Import the necessary PySpark libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, date_format, regexp_replace

In [2]:
# Create the Spark session used by every later cell, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("RedfinHousingDataETL")
    .getOrCreate()
)

In [3]:
# Source data URL: city-level market tracker published in Redfin's public S3 bucket (gzipped TSV)
housing_by_city = (
    "https://redfin-public-data.s3.us-west-2.amazonaws.com"
    "/redfin_market_tracker/city_market_tracker.tsv000.gz"
)

# Download (once) and read the gzipped TSV file with Spark
def extract_data(url, local_file="city_market_tracker.tsv000.gz"):
    """Fetch the tab-separated source file to local disk and load it with Spark.

    The file is downloaded only when ``local_file`` does not already exist, so
    re-running the cell reuses the cached copy.

    Args:
        url: HTTP(S) location of the gzipped TSV file.
        local_file: Path of the local cache file. Defaults to the name the
            notebook has always used.

    Returns:
        A Spark DataFrame read with the first line as header and tab as the
        separator. No schema is supplied or inferred here.

    Raises:
        Whatever ``urllib.request.urlretrieve`` raises when the download fails;
        in that case no file is left at ``local_file``.
    """
    # You must first download the file because Spark can't read HTTP URLs directly
    import os
    import urllib.request

    if not os.path.exists(local_file):
        # Download to a temporary name and rename on success, so an interrupted
        # download can never be mistaken for a complete cached file on the next run.
        partial_file = local_file + ".part"
        try:
            urllib.request.urlretrieve(url, partial_file)
            os.replace(partial_file, local_file)
        finally:
            # Only true when the download or rename failed part-way through.
            if os.path.exists(partial_file):
                os.remove(partial_file)

    # Load it with Spark (relies on the notebook-level `spark` session)
    df = spark.read.option("header", True) \
                   .option("sep", "\t") \
                   .csv(local_file)

    # Return the dataframe
    return df

In [4]:
# Run the extract step on the data source; the result is kept as `raw_df`
# and is the input to the transform step further down
raw_df = extract_data(housing_by_city)

In [5]:
# Report the size of the raw dataset
# (count() triggers a Spark job over the data; the column count comes from the schema only)
print("Num of rows:", raw_df.count())
print("Num of cols:", len(raw_df.columns))

Num of rows: 5779946
Num of cols: 58


In [6]:
# Preview the first 5 records of the raw data as a text table
raw_df.show(5)

+------------+----------+---------------+-----------+--------------+--------+----------------------+-------------+---------+---------+----------+--------------------+----------------+-----------------+---------------------+---------------------+-----------------+---------------------+---------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+----------+--------------------+--------------------+-------------+--------------------+--------------------+------------+--------------------+--------------------+---------+--------------------+--------------------+------------------+--------------------+--------------------+----------+--------------+--------------+-------------------+--------------------+--------------------+-------------------+--------------------+--------------------+-------------------+--------------------+--------------------+-----------------------+---------------------------+---------------

In [11]:
# The function that would carry out the transformation of the Redfin data
def transform_redfin_data(df):
    """Select, clean and enrich the raw Redfin market-tracker DataFrame.

    Steps, in order:
      1. Keep only the 24 columns listed in ``selected_cols``.
      2. Strip commas from ``city``.
      3. Cast ``period_begin`` and ``period_end`` to ``date``.
      4. Drop every row that has a null in any remaining column.
      5. Add year and full month-name columns derived from both period dates.

    Only the two period columns are cast; every other column keeps the type it
    had in ``df``.

    Args:
        df: Spark DataFrame containing at least the columns in ``selected_cols``.

    Returns:
        A new Spark DataFrame with the 24 selected columns plus
        ``period_begin_in_years``, ``period_end_in_years``,
        ``period_begin_in_months`` and ``period_end_in_months``.
    """
    # Select the 24 desired columns
    selected_cols = [
        'period_begin', 'period_end', 'period_duration', 'region_type', 'region_type_id', 'table_id',
        'is_seasonally_adjusted', 'city', 'state', 'state_code', 'property_type', 'property_type_id',
        'median_sale_price', 'median_list_price', 'median_ppsf', 'median_list_ppsf', 'homes_sold',
        'inventory', 'months_of_supply', 'median_dom', 'avg_sale_to_list', 'sold_above_list',
        'parent_metro_region_metro_code', 'last_updated'
    ]
    df = df.select(selected_cols)

    # Remove commas from 'city' field
    df = df.withColumn("city", regexp_replace(col("city"), ",", ""))

    # Convert 'period_begin' and 'period_end' to proper dates.
    # This happens BEFORE the null filter on purpose: when a value cannot be
    # parsed and the cast yields null (Spark's non-ANSI cast behaviour), the row
    # must be removed by dropna() below rather than slip through with null
    # dates and null derived year/month columns.
    df = df.withColumn("period_begin", col("period_begin").cast("date"))
    df = df.withColumn("period_end", col("period_end").cast("date"))

    # Drop records with any nulls (including dates that failed to cast)
    df = df.dropna()

    # Extract year from period_begin and period_end into new columns
    df = df.withColumn("period_begin_in_years", year(col("period_begin")))
    df = df.withColumn("period_end_in_years", year(col("period_end")))

    # Extract full month name (pattern "MMMM") from period_begin and period_end into new columns
    df = df.withColumn("period_begin_in_months", date_format(col("period_begin"), "MMMM"))
    df = df.withColumn("period_end_in_months", date_format(col("period_end"), "MMMM"))

    # Return the dataframe
    return df

In [12]:
# Run the transform step on the raw data
# NOTE(review): the variable name is misspelled ("trasformed" instead of "transformed");
# it is left as-is here because the following cells refer to it by this name.
trasformed_df = transform_redfin_data(raw_df)

In [13]:
# Preview the first 5 records of the transformed data as a text table
trasformed_df.show(5)

+------------+----------+---------------+-----------+--------------+--------+----------------------+---------+---------+----------+--------------------+----------------+-----------------+-----------------+------------------+------------------+----------+---------+------------------+----------+-------------------+-------------------+------------------------------+-------------------+---------------------+-------------------+----------------------+--------------------+
|period_begin|period_end|period_duration|region_type|region_type_id|table_id|is_seasonally_adjusted|     city|    state|state_code|       property_type|property_type_id|median_sale_price|median_list_price|       median_ppsf|  median_list_ppsf|homes_sold|inventory|  months_of_supply|median_dom|   avg_sale_to_list|    sold_above_list|parent_metro_region_metro_code|       last_updated|period_begin_in_years|period_end_in_years|period_begin_in_months|period_end_in_months|
+------------+----------+---------------+-----------+---

In [14]:
# Report the size of the transformed dataset
# (count() triggers a Spark job that re-runs the transformation; the column count comes from the schema only)
print("Num of rows:", trasformed_df.count())
print("Num of cols:", len(trasformed_df.columns))

Num of rows: 4500063
Num of cols: 28


In [None]:
# Write the transformed dataset to local storage in Parquet format (optional step).
# mode("overwrite") replaces any existing output at this path, so re-running the cell is safe.
trasformed_df.write.mode("overwrite").parquet("Redfin_housing_data_by_City.parquet")