## Cleaning the Weekly Rental Data

### Create Spark Session

In [14]:
# Import necessary libraries
from pyspark.sql import SparkSession, functions as F

# Create Spark session
spark = (
    SparkSession.builder.appName('Weekly Rental Listings Preprocessing')
    .config("spark.sql.repl.eagerEval.enabled", True)  # display full dataframe in console
    .config("spark.sql.parquet.cacheMetadata", "true") # cache parquet metadata
    .config("spark.sql.session.timeZone", "Etc/UTC") # set timezone to UTC
    # memory configurations - hopefully will reduce crashing
    .config("spark.driver.memory", "4g") # set driver memory
    .config("spark.executor.memory", "4g") # set executor memory
    .getOrCreate()
)

In [15]:
# set download path
data_path = "../data/raw/domain/rental_listings_*.csv"

# download data from raw/domain
sdf = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
    .withColumn("source_file", F.input_file_name()) # add source file column
)

25/09/22 00:34:27 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: ../data/raw/domain/rental_listings_*.csv.
java.io.FileNotFoundException: File ../data/raw/domain/rental_listings_*.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache

In [16]:
# extract year and quarter from filename using regex
sdf = (
    sdf.withColumn("year", F.regexp_extract(F.col("source_file"), r"_(\d{4})_", 1).cast("int"))
    .withColumn("quarter", F.regexp_extract(F.col("source_file"), r"_(\d{2}).csv", 1).cast("int"))
    .drop("source_file")
)

In [17]:
sdf.cache

<bound method DataFrame.cache of +-----------+------------+------------+------------+--------------------+-----------------+----------------+------------------+---------+--------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+----+-------+
|age_0_to_19|age_20_to_39|age_40_to_59| age_60_plus|        

In [18]:
sdf.printSchema()

root
 |-- age_0_to_19: string (nullable = true)
 |-- age_20_to_39: string (nullable = true)
 |-- age_40_to_59: string (nullable = true)
 |-- age_60_plus: string (nullable = true)
 |-- agency_name: string (nullable = true)
 |-- agent_name: string (nullable = true)
 |-- appointment_only: string (nullable = true)
 |-- avg_days_on_market: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- car_spaces: string (nullable = true)
 |-- description: string (nullable = true)
 |-- family_percentage: string (nullable = true)
 |-- features_list: string (nullable = true)
 |-- first_listed_date: string (nullable = true)
 |-- full_address: string (nullable = true)
 |-- image_urls: string (nullable = true)
 |-- inspection_text: string (nullable = true)
 |-- land_area: string (nullable = true)
 |-- last_sold_date: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- listing_status: string (nullable = true)
 |-- listing_tag: strin

In [19]:
sdf.show(1, truncate=100, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------
 age_0_to_19         | 0.142523363                                                                                          
 age_20_to_39        | 0.5                                                                                                  
 age_40_to_59        | 0.226635516                                                                                          
 age_60_plus         | 0.130841121                                                                                          
 agency_name         | RT Edgar - Northside                                                                                 
 agent_name          | Lily Passarelli                                                                                      
 appointment_only    | False                                                                                                


### Schemas

We need to first fix the schemas, as spark inferred that most are string types. Let's fix this by casting the features to the correct datatypes

In [None]:
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, DoubleType, 
    DateType, BooleanType, TimestampType
)

# define the expected schema
defined_schema = StructType([
    StructField("age_0_to_19", DoubleType(), True),
    StructField("age_20_to_39", DoubleType(), True),
    StructField("age_40_to_59", DoubleType(), True),
    StructField("age_60_plus", DoubleType(), True),
    StructField("agency_name", StringType(), True),
    StructField("agent_name", StringType(), True),
    StructField("appointment_only", BooleanType(), True),
    StructField("avg_days_on_market", DoubleType(), True),
    StructField("bathrooms", IntegerType(), True),
    StructField("bedrooms", IntegerType(), True),
    StructField("car_spaces", IntegerType(), True),
    StructField("description", StringType(), True),
    StructField("family_percentage", DoubleType(), True),
    StructField("features_list", StringType(), True),
    StructField("first_listed_date", TimestampType(), True),
    StructField("full_address", StringType(), True),
    StructField("image_urls", StringType(), True),
    StructField("inspection_text", StringType(), True),
    StructField("land_area", DoubleType(), True),
    StructField("last_sold_date", DateType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("listing_status", StringType(), True),
    StructField("listing_tag", StringType(), True),
    StructField("listing_url", StringType(), True),
    StructField("long_term_resident", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("median_rent_price", IntegerType(), True),
    StructField("median_sold_price", IntegerType(), True),
    StructField("number_of_photos", IntegerType(), True),
    StructField("number_sold", IntegerType(), True),
    StructField("owner_percentage", DoubleType(), True),
    StructField("postcode", IntegerType(), True),
    StructField("property_features", StringType(), True),
    StructField("property_id", IntegerType(), True),
    StructField("property_type", StringType(), True),
    StructField("rental_price", StringType(), True),
    StructField("renter_percentage", DoubleType(), True),
    StructField("schools", StringType(), True),
    StructField("single_percentage", DoubleType(), True),
    StructField("state_abbreviation", StringType(), True),
    StructField("street", StringType(), True),
    StructField("street_number", StringType(), True),
    StructField("structured_features", StringType(), True),
    StructField("suburb", StringType(), True),
    StructField("unit_number", StringType(), True),
    StructField("updated_date", DateType(), True),
    StructField("url", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("quarter", IntegerType(), True)
])

In [25]:
# Cast columns to the defined schema where possible
# if casting fails, the value will be set to null
casts = []
for field in defined_schema.fields:
    if isinstance(field.dataType, (StringType, IntegerType, DoubleType, DateType, BooleanType, TimestampType)):
        casts.append(
            F.expr(
                f"try_cast(`{field.name}` AS {field.dataType.simpleString()})"
            ).alias(field.name)
        )
    else:
        casts.append(F.col(field.name))

In [26]:
sdf = sdf.select(*casts)
required_cols = [field.name for field in defined_schema.fields if not field.nullable]
sdf = sdf.dropna(subset=required_cols)

In [29]:
sdf.count()

31289

In [27]:
sdf.printSchema()

root
 |-- age_0_to_19: double (nullable = true)
 |-- age_20_to_39: double (nullable = true)
 |-- age_40_to_59: double (nullable = true)
 |-- age_60_plus: double (nullable = true)
 |-- agency_name: string (nullable = true)
 |-- agent_name: string (nullable = true)
 |-- appointment_only: boolean (nullable = true)
 |-- avg_days_on_market: double (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- car_spaces: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- family_percentage: double (nullable = true)
 |-- features_list: string (nullable = true)
 |-- first_listed_date: timestamp (nullable = true)
 |-- full_address: string (nullable = true)
 |-- image_urls: string (nullable = true)
 |-- inspection_text: string (nullable = true)
 |-- land_area: double (nullable = true)
 |-- last_sold_date: date (nullable = true)
 |-- latitude: double (nullable = true)
 |-- listing_status: string (nullable = true)
 |-- listing_tag: 

In [28]:
sdf.show(1, truncate=100, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------
 age_0_to_19         | 0.142523363                                                                                          
 age_20_to_39        | 0.5                                                                                                  
 age_40_to_59        | 0.226635516                                                                                          
 age_60_plus         | 0.130841121                                                                                          
 agency_name         | RT Edgar - Northside                                                                                 
 agent_name          | Lily Passarelli                                                                                      
 appointment_only    | false                                                                                                
