In [60]:
import configparser
from etl import (create_spark_session)
# Load the project configuration (AWS credentials and S3 locations).
# ConfigParser.read() silently skips files it cannot open, which would only
# surface later as a confusing NoSectionError on the first config.get().
# read_file() on an explicitly opened file fails right away with
# FileNotFoundError when the path is wrong.
config = configparser.ConfigParser()
with open("../dwh.cfg") as config_file:
    config.read_file(config_file)

# create_spark_session comes from the project's etl module; it is given the
# KEY and SECRET values of the [AWS] section.
spark = create_spark_session(config.get("AWS", "KEY"), config.get("AWS", "SECRET"))

# Crash Data

In [4]:
# Read the crash CSV from the location configured under [S3] CRASH_DATA_LOC,
# using the first line as the header, and mark the DataFrame for caching since
# the cells below query it several times.
df_accidents = (
    spark.read
    .option("header", True)
    .csv(config.get("S3", "CRASH_DATA_LOC"))
    .cache()
)

# Total number of rows in the raw crash data (shown as the cell output).
df_accidents.count()

1729860

In [6]:
# Print the column names and data types of the crash DataFrame as a tree.
df_accidents.printSchema()

root
 |-- CRASH DATE: string (nullable = true)
 |-- CRASH TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- CROSS STREET NAME: string (nullable = true)
 |-- OFF STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: string (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: string (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: string (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: string (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 2: string (nu

## Data Quality Issues:

1) `CRASH DATE` column is read as a string type instead of a date type

2) `NUMBER OF ...` columns are read as string types instead of integer types

3) 2 records with COLLISION_ID = NULL

In [15]:
# Import only the functions this notebook uses. A wildcard import from
# pyspark.sql.functions shadows Python builtins such as sum, min, max and round.
from pyspark.sql.functions import array, array_distinct, col, explode, expr

# Show every COLLISION_ID value that occurs more than once
# (presumably COLLISION_ID is meant to be a unique key - confirm with the data owner).
(
    df_accidents
    .groupBy("COLLISION_ID")
    .count()
    .where(col("count") > 1)
    .show()
)

+------------+-----+
|COLLISION_ID|count|
+------------+-----+
|        null|    2|
+------------+-----+



4) Required columns `CRASH DATE`, `LATITUDE` and `LONGITUDE` contain null values

Total rows: 1,729,860; rows with a null in at least one required column: 206,273

In [16]:
# Count the rows in which at least one of the required columns is null.
df_accidents.where(
    col("CRASH DATE").isNull()
    | col("LATITUDE").isNull()
    | col("LONGITUDE").isNull()
).count()

206273

5) Inconsistent letter casing in the `VEHICLE TYPE CODE 1`–`5` columns splits one vehicle type into several distinct values.

Example: Taxi, TAXI, or taxi

In [59]:
# Gather the five vehicle type columns into one de-duplicated array per row,
# flatten it to one value per row, and list the distinct spellings that match
# "taxi" case-insensitively.
(
    df_accidents
    .select(
        array_distinct(
            array(
                col("VEHICLE TYPE CODE 1"),
                col("VEHICLE TYPE CODE 2"),
                col("VEHICLE TYPE CODE 3"),
                col("VEHICLE TYPE CODE 4"),
                col("VEHICLE TYPE CODE 5"),
            )
        ).alias("vechicle_type_list")
    )
    .select(explode("vechicle_type_list").alias("factor"))
    .filter(col("factor").rlike("(?i)^taxi$"))
    .distinct()
    .show(20, False)
)
    

+------+
|factor|
+------+
|TAXI  |
|Taxi  |
|taxi  |
+------+



6) contributing_factor_list contains 2 unexpected numerical values: 1 and 80. Needs to be fixed in the source system

In [46]:
# Find contributing-factor values that are numbers rather than descriptions.
# The five factor columns are gathered into one de-duplicated array per row
# and flattened to one value per row.
(
    df_accidents
    .select(
        array_distinct(
            array(
                col("CONTRIBUTING FACTOR VEHICLE 1"),
                col("CONTRIBUTING FACTOR VEHICLE 2"),
                col("CONTRIBUTING FACTOR VEHICLE 3"),
                col("CONTRIBUTING FACTOR VEHICLE 4"),
                col("CONTRIBUTING FACTOR VEHICLE 5"),
            )
        ).alias("contributing_factor_list")
    )
    .select(explode("contributing_factor_list").alias("factor"))
    # is_numerical holds the integer value of the factor, or NULL when the
    # string is not a number (assumes non-ANSI SQL mode, where an invalid cast
    # yields NULL instead of raising - confirm for the Spark version in use).
    .withColumn("is_numerical", expr("cast(factor as int)"))
    # Keep every value that parsed as a number. The previous "> 0" test would
    # have missed "0" and negative numbers.
    .filter(col("is_numerical").isNotNull())
    .distinct()
    .show()
)

+------+------------+
|factor|is_numerical|
+------+------------+
|     1|           1|
|    80|          80|
+------+------------+



## Data Quality fixing strategy in ETL

1)`CRASH DATE` -  cast to `DateType()`

2) All `NUMBER OF ...` - cast to `IntegerType()`

3) Required columns `CRASH DATE`, `LATITUDE` and `LONGITUDE` contain null values - drop rows where any required column is missing

4) 2 records with COLLISION_ID = NULL - drop rows where COLLISION_ID = NULL

5) Inconsistent letter casing in the vehicle type values - convert all vehicle type values to uppercase

6) contributing_factor_list contains 2 unexpected numerical values - N/A Should be fixed at the source level


# NYC Taxi Data

In [61]:
# Read the taxi trip CSV from the location configured under
# [S3] NYC_TAXI_DATA_LOC, using the first line as the header.
df_taxi = (
    spark.read
    .option("header", True)
    .csv(config.get("S3", "NYC_TAXI_DATA_LOC"))
)

# Print the column names and data types of the taxi DataFrame as a tree.
df_taxi.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



In [64]:
# Total number of rows in the raw taxi data (shown as the cell output).
df_taxi.count()

16847778

## Data Quality Issues:

1) `VendorID` data type inferred incorrectly

2) 280,883 NULL Values in `VendorID`

3) `tpep_pickup_datetime` and `tpep_dropoff_datetime` data type inferred incorrectly

4) `total_amount` data type inferred incorrectly

5) `PULocationID` and `DOLocationID` data types inferred incorrectly

In [63]:
# Count the taxi rows that have no VendorID.
df_taxi.where(
    col("VendorID").isNull()
).count()

280883

## Data Quality fixing strategy in ETL

- Correct type casting 
- Replace missing `VendorID` values with -1.