In [23]:
from pyspark.sql.functions import isnan, when, count, col, to_date, lit
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import date_format
from pyspark.sql.types import StructType, StructField, DoubleType
import datetime
from pyspark.sql.functions import expr
import os

In [2]:
from pyspark.sql import SparkSession, functions as F

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Tutorial 2")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config('spark.driver.memory', '4g')
    .config('spark.executor.memory', '2g')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/23 02:31:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/23 02:31:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [29]:
sdf1 = spark.read.parquet('../data/weather/weather_data.parquet')
sdf1.printSchema()
sdf1.show(1, vertical=True, truncate=100)
# check if there are any null values
missing_values = sdf1.agg(*[F.sum(F.when(F.col(c).isNull(), 1).otherwise(0))
                          .alias(c) for c in sdf1.columns])
missing_values.show()
# check if there are any null values

# cast date column to date type
sdf1 = sdf1.withColumn("DATE", col("DATE").cast("date"))

# also drop the PGTM column as it is not useful
sdf1 = sdf1.drop('PGTM')

# also as per industry practice convert to consistent casing
consistent_casing = [F.col(col_name).alias(col_name.lower())
                    for col_name in sdf1.columns]
sdf1 = sdf1.select(*consistent_casing)
sdf1.printSchema()
sdf1.show(1, vertical=True, truncate=100)

sdf1 = sdf1.withColumnRenamed('awnd', 'avg_wind_speed') \
            .withColumnRenamed('tavg', 'avg_temp') \
            .withColumnRenamed('tmax', 'max_temp') \
            .withColumnRenamed('tmin', 'min_temp') \
            .withColumnRenamed('prcp', 'precipitation') \
            .withColumnRenamed('snow', 'snowfall')

# create directory for raw weather data and store it there
if not os.path.exists('../data/weather_curated/'):
    os.mkdir('../data/weather_curated/')

# now write this file and if any other present overwrite it
sdf1.write.parquet('../data/weather_curated/weather_data.parquet',\
                 mode ='overwrite')

# there is no gap between weather raw and curated layer beacuse donot require
# much transformation

root
 |-- STATION: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- AWND: double (nullable = true)
 |-- PGTM: double (nullable = true)
 |-- PRCP: double (nullable = true)
 |-- SNOW: double (nullable = true)
 |-- TAVG: double (nullable = true)
 |-- TMAX: double (nullable = true)
 |-- TMIN: double (nullable = true)

-RECORD 0-----------------------------------
 STATION | USW00094789                      
 NAME    | JFK INTERNATIONAL AIRPORT, NY US 
 DATE    | 2022-01-01                       
 AWND    | 2.8                              
 PGTM    | null                             
 PRCP    | 31.0                             
 SNOW    | 0.0                              
 TAVG    | 10.1                             
 TMAX    | 11.7                             
 TMIN    | 8.9                              
only showing top 1 row

+-------+----+----+----+----+----+----+----+----+----+
|STATION|NAME|DATE|AWND|PGTM|PRCP|SNOW|TAVG|TMAX|TMIN|
+-

## Lets begin some preprocessing on yellow taxi data

In [4]:
sdf = spark.read.parquet('../data/tlc/')
sdf_jan = spark.read.parquet('../data/tlc/2022-01.parquet')
sdf_jan.printSchema()
sdf_feb_23 = spark.read.parquet('../data/tlc/2023-02.parquet')
sdf_feb_23.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetim

In [5]:
def cast_to_schema(year, start, end, temp_schema):
    """ This function takes in the year, start and end month and the schema
    and casts the columns to the data type and saves it to the data/raw/
    directory. `year` is a string, `start` and `end` are integers months and
    `temp_schema` is a spark schema.
    This function is specific for the TLC data to store data to raw layer.
    """
    for month in range(start, end+1):
        input_path = f'../data/tlc/{year}-{str(month).zfill(2)}.parquet'
        output_path = f'../data/raw/{year}-{str(month).zfill(2)}.parquet'
        sdf_malformed = spark.read.parquet(input_path)
        sdf_malformed = sdf_malformed \
                        .select([F.col(c).cast(temp_schema[i].dataType)\
                        for i, c in enumerate(sdf_malformed.columns)])
        sdf_malformed.coalesce(1).write.mode('overwrite').parquet(output_path)

In [6]:
# see if there is any non-whole number passenger_count. So it can be removed
# before any proper data conversion can be made. Lets cast all data to 
# jan 2022 schema which contains passenger_count as "double"
tem_schema = sdf_jan.schema
tem_schema
cast_to_schema("2022", 1, 12, tem_schema)
cast_to_schema("2023", 1, 5, tem_schema)

                                                                                

In [7]:
# see if there is any non-whole number passenger_count
sdf = spark.read.parquet('../data/raw/*')
non_whole_count = sdf.filter(col("passenger_count") % 1 != 0).count()
print(f"Number of rows with non-whole number passenger : {non_whole_count}")



Number of rows with non-whole number passenger : 0


                                                                                

In [8]:
# well we are now sure there wasn't any non-whole number passenger_count
# so lets try to cast all datatype to 2023 february schema (proper schema)
sdf_feb_23.printSchema()

# also ensuring we have consistent casing
consistent_col_casing = [F.col(col_name).alias(col_name.lower())\
                        for col_name in sdf_feb_23.columns]
sdf_feb_23 = sdf_feb_23.select(*consistent_col_casing)
sdf_schema = sdf_feb_23.schema
sdf_schema

cast_to_schema(2022, 1, 12, sdf_schema)
cast_to_schema(2023, 1, 5, sdf_schema)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



                                                                                

In [9]:
# read raw data to verify
sdf = spark.read.parquet('../data/raw/*')
sdf.printSchema()
sdf.show(1, vertical=True, truncate=100)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2022-10-

## Done with raw data &#128512;

In [10]:
# according to data dictionary store_and_fwd_flag represents boolean condition
# but currently have N and Y to represent No and Yes respectively
sdf = sdf.withColumn('store_and_fwd_flag', 
    (F.col('store_and_fwd_flag') == 'Y').cast('boolean'))
sdf.printSchema()
sdf.show(1, vertical=True, truncate = 100)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2022-10

In [11]:
# lets see the datashape before doing any further preprocessing
tot_rows = sdf.count()
tot_cols = len(sdf.columns)
print("Total rows: ", tot_rows, "Total columns: ", tot_cols)


Total rows:  55842484 Total columns:  19


In [12]:
# lets see which columns have missing values
missing_values = sdf.agg(*[F.sum(F.when(F.col(c).isNull(), 1)\
                        .otherwise(0)).alias(c) for c in sdf.columns])
missing_values.show()



+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       0|                   0|                    0|        1796968|            0|   1796968|           1796968|           0|           0|           0|          0|    0|      0|         

                                                                                

In [13]:
# well there is immense amount of missing passenger data. 
# (imputation doesn't makes sense here) so let drop those (1796968) rows
missing_passenger = (missing_values.collect()[0]['passenger_count'] 
                    / tot_rows * 100)
print(f'percentage of missing passenger_count data: {missing_passenger}%')

sdf_clean = sdf.filter(col("passenger_count").isNotNull())
missing_values2 = sdf_clean.agg(*[F.sum(F.when(F.col(c).isNull(), 1)\
                            .otherwise(0)).alias(c) for c in sdf_clean.columns])
missing_values2.show()

                                                                                

percentage of missing passenger_count data: 3.217922755728416%




+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       0|                   0|                    0|              0|            0|         0|                 0|           0|           0|           0|          0|    0|      0|         

                                                                                

`after removing misssing passenger_count no other values seems to be missing` \
`How nice is this dataset? No missing values at all.`
## lets do some outlier detection


In [14]:
# Add a new temporary column to record trip distance so can remove those
# trips whcih span more than 5 hours
sdf_with_difference = sdf_clean.withColumn("time_difference", 
                                          col("tpep_dropoff_datetime")
                                          - col("tpep_pickup_datetime"))

# Count rows where the time_difference is more than 5 hours and remove them
count_greater_than_5_hours = sdf_with_difference \
                             .filter(expr("time_difference "
                             "> interval 5 hours")).count()

print(f"Number of rows with time_difference greater than 5 hours: "
      f"{count_greater_than_5_hours}, percentage: " 
      f"{count_greater_than_5_hours/sdf_with_difference.count() * 100}")
                                         
sdf_clean2 = sdf_with_difference \
            .filter(expr("time_difference <= interval 5 hours"))
# no further need of time_difference column
sdf_clean2 = sdf_clean2.drop("time_difference")

# also remove those rows that records drop off time before pick up time
invalid_dropoff = sdf_clean2.filter(expr("tpep_dropoff_datetime"
                                    " <= tpep_pickup_datetime")).count()

print(f"Number of rows with invalid drop off time: {invalid_dropoff}," 
      f"percentage: {invalid_dropoff/sdf_clean2.count() * 100}")

sdf_clean2 = sdf_clean2.filter(expr("tpep_dropoff_datetime"
                                    " > tpep_pickup_datetime"))
print(f'remaining rows {sdf_clean2.count()}')


                                                                                

Number of rows with time_difference greater than 5 hours: 62764, percentage: 0.1161317434734086


                                                                                

Number of rows with invalid drop off time: 23308,percentage: 0.043176753937998566




remaining rows 53959444


                                                                                

In [15]:
# reomove Trips with a pick-up/drop-off location ID out of the range 1-263
sdf_clean3 = sdf_clean2.filter(expr("PULocationID >= 1 "
                                    "AND PULocationID <= 263 "
                                    "AND DOLocationID >= 1 "
                                    "AND DOLocationID <= 263"))

invalid_id = sdf_clean2.count() - sdf_clean3.count()
print(f"Number of invalid location IDs: {invalid_id}",
      f"percentage: {invalid_id/sdf_clean2.count()*100}%")
print(sdf_clean3.count())

                                                                                

Number of invalid location IDs: 943953 percentage: 1.749374956495104%




53015491


                                                                                

In [16]:
# remove rows with passenger count 0 or less
sdf_clean4 = sdf_clean3.filter(expr("passenger_count > 0"))
data_count = sdf_clean3.count()
invalid_passenger_count = data_count - sdf_clean4.count()
print(f"invalid passenger count: {invalid_passenger_count}"
      f" percentage: {invalid_passenger_count/data_count*100}%")



invalid passenger count: 1019697 percentage: 1.923394428243624%


                                                                                

In [17]:
# remove rows with trip_distance 0 or less
sdf_clean5 = sdf_clean4.filter(expr("trip_distance > 0"))
data_count = sdf_clean4.count()
invalid_trip_dis = data_count - sdf_clean5.count()
print(f"Number of invalid trip_distance records: {invalid_trip_dis}"
      f" percentage: {invalid_trip_dis/data_count*100}%")



Number of invalid trip_distance records: 577220 percentage: 1.1101282538353008%


                                                                                

In [18]:
# see if other outliers present according to data dictionary
print(sdf_clean5.filter(expr('VendorID > 2 AND vendorID < 1')).count())
print(sdf_clean5.filter(expr('RateCodeID > 6 AND RateCodeID < 1')).count())
print(sdf_clean5.filter(expr('payment_type > 6 AND payment_type < 1')).count())
print(sdf_clean5.filter(expr("improvement_surcharge < 0 OR fare_amount < 0"
                            " OR extra < 0 OR mta_tax < 0 or tip_amount < 0"
                            " OR total_amount < 0 OR congestion_surcharge < 0"
                            " OR airport_fee < 0 OR tip_amount < 0"
                            " OR tolls_amount < 0")).count())

0
0
0




333367


                                                                                

In [19]:
# so we have some outliers in above cell lets remove those
sdf_clean6 = sdf_clean5.filter(expr(" improvement_surcharge >= 0"
                                    " AND fare_amount >= 0 AND extra >= 0"
                                    " AND mta_tax >= 0 AND tip_amount >= 0"
                                    " AND total_amount >= 0"
                                    " AND congestion_surcharge >= 0"
                                    " AND airport_fee >= 0 AND tip_amount >= 0"
                                    " AND tolls_amount >= 0"))
prev = sdf_clean5.count()
removed = prev - sdf_clean6.count()
print(f"rows removed {removed} "
      f"percentage: {removed/prev*100}%")



rows removed 333367 percentage: 0.6483396447361609%


                                                                                

In [26]:
# also we might have some wrong date recorded in the data, we should remove them
from pyspark.sql.functions import col, to_date, lit, date_format


sdf_clean7 = sdf_clean6 \
            .filter((date_format(col("tpep_pickup_datetime"), 'yyyy-MM-dd') \
                        >= lit('2022-01-01')) \
                    & (date_format(col("tpep_pickup_datetime"), 'yyyy-MM-dd') \
                        <= lit('2023-05-31')) \
                    & (date_format(col("tpep_dropoff_datetime"), 'yyyy-MM-dd') \
                        >= lit('2022-01-01')) \
                    & (date_format(col("tpep_dropoff_datetime"), 'yyyy-MM-dd') \
                        <= lit('2023-05-31')))

prev = sdf_clean6.count()
removed = prev - sdf_clean7.count()
print(f"rows removed {removed} "
      f"percentage: {removed/prev*100}%")



rows removed 1126 percentage: 0.002204160589972749%


                                                                                

## PHEWWW! Done with all yellow taxi data preprocessing
## and outlier detection &#128512;

In [27]:
# now store data in curated layer
sdf_clean7.write.parquet('../data/curated/yellow_taxi.parquet',\
                        mode ='overwrite')

                                                                                