In [1]:
import os
import configparser
from pathlib import Path
from pyspark.sql import SparkSession

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1633831269192_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Build (or fetch the already-running) Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("accident analysis")
    .getOrCreate()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Data Quality check on transformed load to S3 bucket

### 1. The schema of every dimension table matches the data model

In [3]:
# S3 locations used by the checks below: raw input files and the ETL output.
_S3_BUCKET_ROOT = "s3://spark-project-kolusu/"
S3_BUCKET_SOURCE_PATH = _S3_BUCKET_ROOT + "input/"
S3_BUCKET_DEST_PATH = _S3_BUCKET_ROOT + "output/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Print the schema of every dimension table written to S3 so it can be
# compared against the data model.
modules = ["Crashes", "Vehicles", "Persons", "Time"]
for module in modules:
    table_name = "dim_" + module
    dest_path = S3_BUCKET_DEST_PATH + table_name + "/"
    df = spark.read.parquet(dest_path)
    print("Table: " + table_name)
    # printSchema() writes the tree to stdout and returns None, so its
    # result is not stored.
    df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Table: dim_Crashes
root
 |-- COLLISION_ID: long (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- CONTRIBUTING_FACTOR_VEHICLE: string (nullable = true)
 |-- NUMBER_PEOPLE_INJURED: long (nullable = true)
 |-- NUMBER_PEOPLE_KILLED: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)

Table: dim_Vehicles
root
 |-- COLLISION_ID: long (nullable = true)
 |-- VEHICLE_ID: long (nullable = true)
 |-- STATE_REGISTRATION: string (nullable = true)
 |-- VEHICLE_TYPE: string (nullable = true)
 |-- VEHICLE_MAKE: string (nullable = true)
 |-- VEHICLE_MODEL: string (nullable = true)
 |-- VEHICLE_YEAR: long (nullable = true)
 |-- DRIVER_SEX: string (nullable = true)
 |-- POINT_OF_IMPACT: string (nullable = true)
 |-- VEHICLE_DAMAGE: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)

T

### 2. No empty table after running ETL data pipeline

In [5]:
# Fail fast if any dimension table written by the ETL pipeline has no rows.
modules = ["Crashes", "Vehicles", "Persons", "Time"]
for module in modules:
    table_name = "dim_" + module
    dest_path = S3_BUCKET_DEST_PATH + table_name + "/"
    df = spark.read.parquet(dest_path)
    record_num = df.count()
    if record_num <= 0:
        # Name the offending table so the failure is actionable without
        # re-running the loop by hand.
        raise ValueError(f"Table {table_name} at {dest_path} is empty!")
    print("Table: " + table_name + f" is not empty: total {record_num} records.")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Table: dim_Crashes is not empty: total 1827986 records.
Table: dim_Vehicles is not empty: total 3664982 records.
Table: dim_Persons is not empty: total 4453244 records.
Table: dim_Time is not empty: total 1827986 records.

### 3. Source/Dest count checks to ensure completeness

In [6]:
def get_src_dest_table_count(src_path, dest_path):
    """Return the ``(source, destination)`` row counts for one table.

    The source is read from ``src_path`` as CSV with a header row, and only
    rows whose ``collision_id`` is not null are counted. The destination is
    read from ``dest_path`` as Parquet and counted as-is.
    """
    source_rows = spark.read.csv(src_path, header=True)
    keyed_rows = source_rows.filter(source_rows.collision_id.isNotNull())
    dest_rows = spark.read.parquet(dest_path)
    src_total = keyed_rows.count()
    dest_total = dest_rows.count()
    return src_total, dest_total

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Compare source and destination row counts for each table. Every table is
# reported first; the cell then fails if any pair of counts disagrees, so a
# mismatch cannot slip through as just another printed line.
modules = ["Crashes", "Vehicles", "Persons"]
mismatched_modules = []
for module in modules:
    src_path = S3_BUCKET_SOURCE_PATH + module + "/"
    dest_path = S3_BUCKET_DEST_PATH + "dim_" + module + "/"
    df_src_count, df_dest_count = get_src_dest_table_count(src_path, dest_path)
    print("%s: The source table record count is %d and destination table record count is %d" %(module,df_src_count, df_dest_count))
    if df_src_count != df_dest_count:
        mismatched_modules.append(module)

if mismatched_modules:
    raise ValueError(
        "Source/destination record counts differ for: " + ", ".join(mismatched_modules)
    )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Crashes: The source table record count is 1827986 and destination table record count is 1827986
Vehicles: The source table record count is 3664982 and destination table record count is 3664982
Persons: The source table record count is 4453244 and destination table record count is 4453244

## Data Quality check on data in Redshift

In [14]:
#sc.install_pypi_package("psycopg2")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
import psycopg2

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

  """)

In [10]:
# Redshift connection settings.
host = 'redshift-cluster-collision.cnufhcq0yteu.us-west-2.redshift.amazonaws.com'
dbname = 'dev'
user = 'awsuser'
db_port = 5439

# The password must never be stored in the notebook: read it from the
# environment of the process that executes this cell and stop with a clear
# message if it has not been provided.
db_password = os.environ.get("REDSHIFT_DB_PASSWORD")
if not db_password:
    raise RuntimeError(
        "Set the REDSHIFT_DB_PASSWORD environment variable before connecting to Redshift."
    )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Open the Redshift connection and the cursor shared by the checks below.
# Keyword arguments are used instead of a hand-formatted DSN string so that
# values containing spaces, quotes or backslashes are passed through intact.
conn = psycopg2.connect(
    host=host,
    dbname=dbname,
    user=user,
    password=db_password,
    port=db_port,
)
cur = conn.cursor()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
def get_table_count(tablename):
    """Print the number of rows in ``tablename``.

    The query runs on the notebook-level cursor ``cur``. ``tablename`` is
    interpolated directly into the SQL text, so only pass trusted names.
    """
    count_sql = "Select count(*) from %s" % (tablename)
    cur.execute(count_sql)
    first_row = cur.fetchone()
    total = first_row[0]
    print ("The number of records in table %s is %d" % (tablename, total))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Report the row count of each Redshift table, in the same order as before.
for redshift_table in (
    "dim_collisions",
    "dim_vehicles",
    "dim_persons",
    "dim_time",
    "fact_crashes",
):
    get_table_count(redshift_table)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The number of records in table dim_collisions is 1827986
The number of records in table dim_vehicles is 3664982
The number of records in table dim_persons is 4453244
The number of records in table dim_time is 1827986
The number of records in table fact_crashes is 9395799

##### We can see that the number of records in each Redshift table matches that of its corresponding source table