<strong>Scope of the Project and Data Sources</strong>

<p>There are <a href="https://en.wikipedia.org/wiki/Motor_vehicle_fatality_rate_in_U.S._by_year">millions</a> of car crashes in the United States every year, <a href="https://en.wikipedia.org/wiki/Motor_vehicle_fatality_rate_in_U.S._by_year">thousands</a> of which are deadly. One dataset that collects information on more than 3 million accidents in 49 states between 2016 and 2020 is <a href="https://www.kaggle.com/sobhanmoosavi/us-accidents">US Accidents: A Countrywide Traffic Accident Dataset</a>. According to its description on Kaggle, the dataset was gathered from a variety of APIs that capture and stream traffic events from a number of sources, including federal and local bureaus of transportation, traffic sensors, traffic cameras, and police and other law enforcement agencies. The dataset itself contains a great deal of useful information about the accidents, including when and where they happened, their severity, and the weather at the time they occurred. Combining it with another dataset could provide even more insight into the background of each accident and its possible contributing factors.</p>
<p>In this project, the accident data from 2016 and 2017 was combined with Population Estimates data from another dataset, <a href="https://data.world/nrippner/county-level-data-usda-economic-research-service">County-Level-Data-USDA-Economic-Research-Service</a>. According to the dataset's description on data.world, the original source of the data is the U.S. Census Bureau. The data includes the population of United States counties, as well as net population changes, urban-rural characteristics, and economic system types. The purpose of combining these datasets is to add this population information as additional factors that can be analyzed in the study of accidents, alongside the factors already in the accident dataset. The hope is that analyzing large datasets like this will reveal larger patterns in what causes accidents, and that this information can be used to anticipate and prevent them in the future.</p>


<strong>Uses of the Data</strong>

<p>The data in this project could be used by researchers in academia and elsewhere who analyze accident data. An example of this type of analysis is <a href="https://whyy.org/articles/study-city-driving-often-safer-than-the-burbs/">this study done by researchers at the University of Pennsylvania</a>, which used accident data and population data to show that denser areas of cities have lower accident rates than areas with more urban sprawl. This data could also be useful for urban planners; <a href="https://www.researchgate.net/publication/285736707_Relationships_between_urban_planning_variables_and_traffic_crashes_in_Damascus">in this study</a>, traffic accident, population, and other data were analyzed to find “relationships that could have practical applications for use by city planners and traffic safety engineers.” Another group that could be interested in this type of accident data is the insurance industry. <a href="https://insurify.com/insights/states-with-most-car-accidents-2020/">Insurify</a>, an auto insurance quotes comparison site, had a team of analysts examine state car accident data because analyses of this kind help determine insurance costs.</p>

<strong>Data Exploration and Data Cleaning</strong> 

An important aspect of this project was joining the Accident Table with the Population Table. The tables need to be joined on both the county and state fields, since there are counties with the same name in different states. An exploration of the data revealed that the Population Dataset had the word "County" after the county names, while the Accident Dataset did not, so matching on the county names would not work. To fix this, the word "County" was removed from the Population Table using a regular expression, and the remaining whitespace was trimmed.
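
As a minimal sketch of this normalization in plain Python (not the PySpark code used in the notebook), the helper below removes the word "County" and trims the leftover whitespace. The function name strip_county is hypothetical and exists only for illustration.

```python
import re


def strip_county(area_name):
    """Remove the word 'County' and trim surrounding whitespace."""
    return re.sub(r"County", "", area_name).strip()


print(strip_county("Westchester County"))  # Westchester
print(strip_county("New York County"))     # New York
```

A name that never contained the word, such as "New York", passes through unchanged, which is why the state-level rows still need separate handling.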

An exploration of the data also revealed that the Population Dataset included rows with state-level population information as well as county rows. This could cause problems when joining the Population and Accident Tables, because some counties have the same name as the state they are in. Accidents that took place in such a county could be assigned the population of the entire state. To remedy this, the state records needed to be removed. In the FIPS coding system, state-level codes end in 000, so all records with a FIPS code ending in three zeroes were dropped.
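
The filter can be sketched in plain Python as follows. This is an illustration under stated assumptions, not the notebook's PySpark code: the helper name is_county_fips and the sample rows are hypothetical, and the check for "0" mirrors the additional pop_id != "0" filter applied in the processing function.

```python
def is_county_fips(fips):
    """Return True for county-level codes.

    Codes ending in '000' are state-level rows, and '0' is treated as a
    non-county row, mirroring the two filters in the processing function.
    """
    return fips != "0" and fips[-3:] != "000"


# Illustrative sample rows: (FIPS code, area name)
rows = [
    ("0", "United States"),
    ("36000", "New York"),
    ("36061", "New York County"),
    ("36119", "Westchester County"),
]

counties = [name for fips, name in rows if is_county_fips(fips)]
print(counties)  # ['New York County', 'Westchester County']
```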

Another data cleaning step that used regular expressions was the removal of the commas in the population numbers. This was required so that the values could be cast correctly to integers.
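
A minimal plain-Python sketch of this step is shown below. The helper name parse_population and the sample values are hypothetical; returning None for unparsable input is an assumption chosen to resemble a null result from a failed cast.

```python
def parse_population(raw):
    """Convert a string such as '1,234,567' to an int.

    Returns None when the value is missing or not numeric.
    """
    if raw is None:
        return None
    cleaned = raw.replace(",", "").strip()
    try:
        return int(cleaned)
    except ValueError:
        return None


print(parse_population("1,234,567"))  # 1234567
print(parse_population("-1,024"))     # -1024
print(parse_population("n/a"))        # None
```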


<strong>Import Statements</strong>

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, to_timestamp, concat, lit, to_date, date_format, year, month, dayofweek, regexp_replace, trim, substring, isnan, when, count, desc
from pyspark.sql.types import FloatType, TimestampType, IntegerType
import datetime

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1626369817567_0002,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


<strong>Create Spark Session</strong>

In [2]:
def create_spark_session():
    """starts spark session"""
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark


<strong>Utility Function to Convert Epoch Milliseconds to a Datetime String</strong>

In [3]:
def get_timestamp(ts):
    """converts epoch time in milliseconds to a 'YYYY-MM-DD HH:MM:SS' string in local time"""
    return datetime.datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d %H:%M:%S')


<strong>Function to Clean, Model, and Pipeline Population Data from Population Dataset</strong>

In [4]:
def process_population_data(spark, input_data, output_data):
    """
    Takes spark session, location of population csv file, location where parquet file should be saved on s3
    Reads in population csv file, processes it to create Population table, saves table as parquet file.

    """
    
    #read in population csv file
    df_population = spark.read.option("header",True).csv(input_data)
   
    #create population table columns: clean the county names and cast the numeric fields
    population_table=df_population.withColumn("pop_id",col("FIPS")) \
    .withColumn("state",col("State")) \
    .withColumn("area_name",col("Area_Name")) \
    .withColumn("county", trim(regexp_replace("Area_Name", 'County', ''))) \
    .withColumn("Pop_2016_raw",col("POP_ESTIMATE_2016")) \
    .withColumn("Economic_typology",col("Economic_typology_2015").cast(IntegerType())) \
    .withColumn("Rural_Urban_Code",col("Rural-urban_Continuum Code_2013").cast(IntegerType())) \
    .withColumn("Pop_2016",regexp_replace("Pop_2016_raw", ',', '').cast(IntegerType())) \
    .withColumn("Pop_2017_raw",col("POP_ESTIMATE_2017")) \
    .withColumn("Pop_2017",regexp_replace("Pop_2017_raw", ',', '').cast(IntegerType())) \
    .withColumn("Net_Pop_Change_2016_raw",col("N_POP_CHG_2016")) \
    .withColumn("Net_Pop_Change_2016",regexp_replace("Net_Pop_Change_2016_raw", ',', '').cast(IntegerType())) \
    .withColumn("Net_Pop_Change_2017_raw",col("N_POP_CHG_2017")) \
    .withColumn("Net_Pop_Change_2017",regexp_replace("Net_Pop_Change_2017_raw", ',', '').cast(IntegerType()))
    
    
    #check if a county with the same name as a state has 2 listings.
    population_table.filter(population_table.county == "New York").show()
    
    #remove the row with pop_id "0" and the state level rows from population table, since we only want counties
    population_table=population_table.filter(population_table.pop_id != "0")
    population_table=population_table.filter(substring(population_table.pop_id,-3, 3) != "000")
    
    #confirm that a county with the same name as a state now only has 1 listing
    population_table.filter(population_table.county == "New York").show()
    
    #test for duplicate pop_id; an empty result means every pop_id is unique
    population_table.groupBy("pop_id").count().filter("count > 1").show()
    
    #drop extraneous columns from population table
    population_table=population_table.drop("FIPS","Rural-urban_Continuum Code_2003", \
    "Rural-urban_Continuum Code_2013", "Urban_Influence_Code_2003", "Urban_Influence_Code_2013", \
    "Economic_typology_2015","CENSUS_2010_POP","ESTIMATES_BASE_2010","POP_ESTIMATE_2010","POP_ESTIMATE_2011", \
    "POP_ESTIMATE_2012","POP_ESTIMATE_2013","POP_ESTIMATE_2014","POP_ESTIMATE_2015", "POP_ESTIMATE_2016", \
    "POP_ESTIMATE_2017","N_POP_CHG_2010","N_POP_CHG_2011","N_POP_CHG_2012","N_POP_CHG_2013","N_POP_CHG_2014", \
    "N_POP_CHG_2015","N_POP_CHG_2016", "N_POP_CHG_2017", "Births_2010", "Births_2011", "Births_2012","Births_2013", \
    "Births_2014", "Births_2015","Births_2016","Births_2017","Deaths_2010", "Deaths_2011", "Deaths_2012", \
    "Deaths_2013","Deaths_2014","Deaths_2015","Deaths_2016","Deaths_2017", "NATURAL_INC_2010","NATURAL_INC_2011", \
    "NATURAL_INC_2012","NATURAL_INC_2013","NATURAL_INC_2014","NATURAL_INC_2015","NATURAL_INC_2016","NATURAL_INC_2017", \
    "INTERNATIONALMIG_2010","INTERNATIONALMIG-2011","INTERNATIONAL_MIG_2012","INTERNATIONAL_MIG_2013", \
    "INTERNATIONAL_MIG_2014","INTERNATIONAL_MIG_2015","INTERNATIONAL_MIG_2016","INTERNATIONAL_MIG_2017", \
    "DOMESTIC_MIG_2010","DOMESTIC_MIG_2011","DOMESTIC_MIG_2012","DOMESTIC_MIG_2013","DOMESTIC_MIG_2014","DOMESTIC_MIG_2015", \
    "DOMESTIC_MIG_2016","DOMESTIC_MIG_2017","NET_MIG_2010", "NET_MIG_2011","NET_MIG_2012", "NET_MIG_2013","NET_MIG_2014", \
    "NET_MIG_2015","NET_MIG_2016","NET_MIG_2017","RESIDUAL_2010","RESIDUAL_2011","RESIDUAL_2012","RESIDUAL_2013", \
    "RESIDUAL_2014","RESIDUAL_2015","RESIDUAL_2016","RESIDUAL_2017","GQ_ESTIMATES_BASE_2010","GQ_ESTIMATES_2010", \
    "GQ_ESTIMATES_2011","GQ_ESTIMATES_2012","GQ_ESTIMATES_2013","GQ_ESTIMATES_2014","GQ_ESTIMATES_2015","GQ_ESTIMATES_2016", \
    "GQ_ESTIMATES_2017","R_birth_2011","R_birth_2012","R_birth_2013","R_birth_2014","R_birth_2015","R_birth_2016", \
    "R_birth_2017","R_death_2011","R_death_2012","R_death_2013","R_death_2014","R_death_2015","R_death_2016", \
    "R_death_2017","R_NATURAL_INC_2011","R_NATURAL_INC_2012","R_NATURAL_INC_2013","R_NATURAL_INC_2014","R_NATURAL_INC_2015", \
    "R_NATURAL_INC_2016","R_NATURAL_INC_2017","R_INTERNATIONAL_MIG_2011","R_INTERNATIONAL_MIG_2012", \
    "R_INTERNATIONAL_MIG_2013","R_INTERNATIONAL_MIG_2014","R_INTERNATIONAL_MIG_2015","R_INTERNATIONAL_MIG_2016", \
    "R_INTERNATIONAL_MIG_2017","R_DOMESTIC_MIG_2011","R_DOMESTIC_MIG_2012","R_DOMESTIC_MIG_2013","R_DOMESTIC_MIG_2014", \
    "R_DOMESTIC_MIG_2015","R_DOMESTIC_MIG_2016","R_DOMESTIC_MIG_2017","R_NET_MIG_2011","R_NET_MIG_2012","R_NET_MIG_2013", \
    "R_NET_MIG_2014","R_NET_MIG_2015","R_NET_MIG_2016","R_NET_MIG_2017","Pop_2016_raw", "Pop_2017_raw", \
    "Net_Pop_Change_2016_raw","Net_Pop_Change_2017_raw","Int_mig_2016_raw","Int_mig_2017_raw","Dom_mig_2016_raw", \
    "Dom_mig_2017_raw","Net_mig_2016_raw","Net_mig_2017_raw")              
    
   
    #write final population_table to parquet file 
    population_table.write.parquet(output_data, mode="overwrite")
    
    
    


<strong>Function to Clean, Model, and Pipeline Accident Data from Accident Dataset</strong>

In [5]:
def process_accident_data(spark, input_data, output_data_acc, output_table_loc, output_table_time, output_table_weather):
    """
    Takes spark session, location of accident csv file, locations where parquet files should be saved on s3
    Reads in accident csv file, processes it to create Accident_Population 2016_2017 table, Weather Table, Time Table, and Location tables, saves tables as parquet files
    """
    
    #read in accident csv file
    df_accident = spark.read.csv(input_data)
   
    #extract columns to create weather table
    weather_table=df_accident.withColumn("weather_timestamp", to_timestamp(col("_c20"),"MM/dd/yyyy HH:mm")) \
    .withColumn("Weather_Condition",col("_c29")) \
    .withColumn("Wind_Speed",col("_c27").cast(FloatType())) \
    .withColumn("Visibility",col("_c25").cast(FloatType())) \
    .withColumn("Humidity",col("_c23").cast(FloatType())) \
    .withColumn("Precipitation",col("_c28").cast(FloatType())) \
    .withColumn("Temperature",col("_c21").cast(FloatType())) 
    
    #remove duplicate weather timestamps
    weather_table=weather_table.dropDuplicates(["weather_timestamp"])
    
    #drop the raw csv columns (_c0 through _c46), keeping only the derived weather columns
    weather_table=weather_table.drop(*df_accident.columns)
   
    #extract columns to create location table 
    location_table = df_accident.withColumn('lat_log', concat(col('_c4'), lit(','), col('_c5'), \
    lit('_'), col('_c6'), lit('_'), col('_c7')))  \
    .withColumn("Number",col("_c10")) \
    .withColumn("Street",col("_c11")) \
    .withColumn("City",col("_c13")) \
    .withColumn("County_name",col("_c14")) \
    .withColumn("State",col("_c15")) \
    .withColumn("ZipCode",col("_c16")) 
    
    #remove duplicate latitude/longitudes
    location_table=location_table.dropDuplicates(["lat_log"])
    
    #drop the raw csv columns (_c0 through _c46), keeping only the derived location columns
    location_table=location_table.drop(*df_accident.columns)
    
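    #UDF to convert epoch milliseconds to a timestamp string
    #(defined here but not applied below; the csv timestamps are parsed with to_timestamp)
    timestampUDF = udf(lambda ts:get_timestamp(ts))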
    
    #extract columns to create time table
    time_table=df_accident.withColumn("start_timestamp", to_timestamp(col("_c2"),"MM/dd/yyyy HH:mm")) \
    .withColumn("end_timestamp", to_timestamp(col("_c3"),"MM/dd/yyyy HH:mm")) 
    
    time_table=time_table.withColumn("start_date",to_date(col("start_timestamp"))) \
    .withColumn("end_date",to_date(col("end_timestamp"))) \
    .withColumn('start_time', date_format(col("start_timestamp"), 'HH:mm:ss')) \
    .withColumn('end_time', date_format(col("end_timestamp"), 'HH:mm:ss')) \
    .withColumn("start_month", month("start_date")) \
    .withColumn("start_year", year("start_date")) \
    .withColumn("start_weekday", dayofweek("start_date")) \
    .withColumn("end_month", month("end_date")) \
    .withColumn("end_year", year("end_date")) \
    .withColumn("end_weekday", dayofweek("end_date")) 
    
    #remove duplicate start-end timestamps    
    time_table=time_table.dropDuplicates(['start_timestamp','end_timestamp']) 
    
    #drop the raw csv columns (_c0 through _c46), keeping only the derived time columns
    time_table=time_table.drop(*df_accident.columns)
    
    #extract columns to create accident detail table
    accident_detail_table=df_accident.withColumn("Accident_Id",col("_c0")) \
    .withColumn("Description",col("_c9")) \
    .withColumn("Severity",col("_c1").cast(IntegerType())) \
    .withColumn("start", to_timestamp(col("_c2"),"MM/dd/yyyy HH:mm")) \
    .withColumn("end", to_timestamp(col("_c3"),"MM/dd/yyyy HH:mm")) \
    .withColumn("weather_timestamp", to_timestamp(col("_c20"),"MM/dd/yyyy HH:mm")) \
    .withColumn('latlog', concat(col('_c4'), lit(','), col('_c5'), \
    lit('_'), col('_c6'), lit('_'), col('_c7')))
    
    #test for duplicate accident ids; an empty result means every Accident_Id is unique
    test=accident_detail_table.groupBy("Accident_Id").count().filter("count > 1")
    test.drop('count').show()
    
    #drop the raw csv columns (_c0 through _c46), keeping only the derived accident columns
    accident_detail_table=accident_detail_table.drop(*df_accident.columns)
                
    #read in population data from parquet file
    df_pop_parquet = spark.read.parquet('s3://population-kb789/population_table/')
    
    #create intermediate population-location dataframe
    pop_location_df = df_pop_parquet.join(location_table,(location_table.State==df_pop_parquet.state) & \
    (location_table.County_name==df_pop_parquet.county), how="inner")
    
    #drop duplicate latitude-longitudes from population-location dataframe
    pop_location_df=pop_location_df.dropDuplicates(["lat_log"])
     
    #filter time table to 2016 and 2017
    time_table_2016_2017=time_table.filter((time_table.start_year == 2016) | (time_table.start_year == 2017))
    
    #create accident_detail_2016_2017 by joining it with time table filtered to 2016_2017
    accident_detail_2016_2017=accident_detail_table.join(time_table_2016_2017,((time_table_2016_2017.start_timestamp== \
    accident_detail_table.start) & (time_table_2016_2017.end_timestamp== \
    accident_detail_table.end)), how="inner")
    
    #drop extraneous columns from accident_detail_2016_2017
    accident_detail_2016_2017=accident_detail_2016_2017.drop("start_date", "end_date", \
    "start_time", "end_time","start_month","start_year", "start_weekday", "end_month", "end_year", "end_weekday")
    
    #join accident_detail_2016_2017 with population-location dataframe
    accident_detail_2016_2017=accident_detail_2016_2017.join(pop_location_df,(pop_location_df.lat_log== \
    accident_detail_2016_2017.latlog), how="left")
    
    #drop extraneous columns from accident_detail_2016_2017, keeping pop_id as the link to the population table
    accident_detail_2016_2017=accident_detail_2016_2017.drop("state", "area_name", "county", "Pop_2017", "Net_Pop_Change_2017", \
    "Int_mig_2017", "Dom_mig_2017","lat_log","Number","Street", "City", "County_name", "State", "ZipCode", "start", "end", \
    "Economic_typology","Rural_Urban_Code", "Pop_2016", "Net_Pop_Change_2016")
       
    #write final accident_detail_2016_2017 table to parquet file
    accident_detail_2016_2017.write.parquet(output_data_acc, mode="overwrite")
    
    #write final time table to parquet file 
    time_table.write.parquet(output_table_time, mode="overwrite")
    
    #write final location table to parquet file 
    location_table.write.parquet(output_table_loc, mode="overwrite")
    
    #write final weather table to parquet file 
    weather_table.write.parquet(output_table_weather, mode="overwrite")
    


<strong>Start Spark Session</strong>

In [6]:
spark = create_spark_session()


<strong>Call Function to Clean, Model, and Pipeline Population Data </strong>

In [7]:

input_data_pop = "s3://population-kb789/PopulationEstimates.csv"
output_data_pop= "s3://population-kb789/population_table/"
process_population_data(spark, input_data_pop, output_data_pop)


<strong>Call Function to Clean, Model, and Pipeline Accident Data </strong>

In [8]:

input_data_acc = "s3://accidents-kb789/US_Accidents_Dec20_Updated.csv"
output_data_acc = "s3://accidents-kb789/accident_tables/"
output_table_loc = "s3://kb789-location/location-table/"
output_table_time = "s3://kb789-time/time-table/"
output_table_weather = "s3://kb789-weather/weather-table/"
process_accident_data(spark, input_data_acc, output_data_acc, output_table_loc, output_table_time, output_table_weather )


<strong>Data Model</strong>

<p>This data model was chosen so that the dataset could be used by researchers to analyze accident data. The Accident-Population Table 2016-2017 is at the center. It holds the unique id, the textual description, and the severity of each accident, along with the foreign keys that connect it to the other tables: the weather timestamp, the latitude and longitude, the start and end timestamps, and the population id (pop_id). This way, additional details about the accidents can be analyzed.</p>
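<p>The latitude and longitude key that links the accident and location tables is a single string built from four coordinate fields. The sketch below reproduces that key format in plain Python. The function and parameter names are hypothetical (the notebook refers to the raw columns as _c4 through _c7), the sample coordinates are made up, and returning None when any part is missing is meant to mirror Spark's concat, which yields null if any input is null.</p>

```python
def make_location_key(start_lat, start_lng, end_lat, end_lng):
    """Build a key in the form 'start_lat,start_lng_end_lat_end_lng'."""
    parts = (start_lat, start_lng, end_lat, end_lng)
    if any(part is None for part in parts):
        # A missing coordinate leaves the whole key undefined
        return None
    return f"{start_lat},{start_lng}_{end_lat}_{end_lng}"


# Illustrative coordinates only
print(make_location_key("40.94003", "-73.85560", "40.94100", "-73.85400"))
# 40.94003,-73.85560_40.94100_-73.85400
```

<p>One consequence of this design is that a row with any missing coordinate has no key, so it cannot be matched to a location.</p>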
<p>The Weather table includes information about the weather at the time the accident occurred. Its fields include the temperature, the precipitation, and the wind speed. The data could be used to do a study very much like <a href="https://ncics.org/cics-news/precipitation-and-fatal-motor-vehicle-crashes/">this one</a>, which found “that precipitation events increase the likelihood of a fatal car crash in the continental United States by about 34%.”</p>
<p>The Time table includes information about the day, time, date, and timestamp that the accident occurred. This is the sort of data that could be used to do a study similar to this one, titled <a href="https://crashstats.nhtsa.dot.gov/Api/Public/ViewPublication/810637">“Passenger Vehicle Occupant Fatalities by Day and Night – A Contrast.”</a></p>
<p>The Location table includes data about the location at which the accident took place, such as the address and the latitude and longitude. Studies like <a href="https://www.iihs.org/topics/fatality-statistics/detail/state-by-state">this one</a>, which looks at traffic accident fatalities by state, show that traffic accidents are often analyzed by location attributes like state.</p>
<p>Finally, the Population table includes population information about the area in which the accident occurred—the population number, whether the area is experiencing a population increase or decrease, whether the area is urban or rural, and the area’s economic characteristics. This data could support studies similar to <a href="https://bmcpublichealth.biomedcentral.com/articles/10.1186/s12889-019-7809-7">this one</a>, which looks at traffic accidents and economic development in Thailand, and studies like <a href="https://royalsocietypublishing.org/doi/10.1098/rsos.191739">this one,</a> which compares “the incidence of road accidents in urban as opposed to rural areas” in England and Wales.</p>


<strong>Data Pipeline</strong>

<p>Data Pipeline Steps:</p>
<ol>
    <li>Start a Spark Session</li>
    <li>Read in the population csv file.</li>
    <li>Create the Population table, adjust the datatypes as needed to match the data dictionary, drop unneeded columns, save it to a parquet file.</li>
    <li>Read in the accident csv file.</li>
    <li>Create the Weather table.</li>
    <li>Create the Location table.</li>
    <li>Create the Time table.</li>
    <li>Create the Accident Detail table.</li>
    <li>Load the Population table from the parquet file.</li>
    <li>Join the Population Table with the Location table on state name and county name to create Population-Location dataframe</li>
    <li>Create Accident Detail Table 2016_2017 by filtering using the Time Table</li>
    <li>Join the Accident Detail Table 2016_2017 Table with the Population-Location Dataframe on Latitude and Longitude to create final Accident_Detail_2016_2017 table.</li>
    <li>Save the Weather, Location, Time and Accident_Detail_2016_2017 tables as parquet files </li>
</ol>
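<p>The effect of the two join steps can be sketched in plain Python as a lookup on (state, county) that leaves pop_id empty when no population row matches. This is a simplified illustration, not the PySpark implementation: the function name attach_pop_id and the sample records are hypothetical, and the real pipeline matches accidents to locations through the latitude and longitude key.</p>

```python
def attach_pop_id(accidents, population):
    """Left-join accidents to population rows on (state, county)."""
    lookup = {(p["state"], p["county"]): p["pop_id"] for p in population}
    return [
        dict(acc, pop_id=lookup.get((acc["state"], acc["county"])))
        for acc in accidents
    ]


# Illustrative records only
population = [{"state": "NY", "county": "Westchester", "pop_id": "36119"}]
accidents = [
    {"accident_id": "A-1", "state": "NY", "county": "Westchester"},
    {"accident_id": "A-2", "state": "NY", "county": "Unmatched Name"},
]

for row in attach_pop_id(accidents, population):
    print(row["accident_id"], row["pop_id"])
# A-1 36119
# A-2 None
```

<p>Keeping unmatched accidents with an empty pop_id, rather than dropping them, makes it possible to count how many records lack population data.</p>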


<strong>Data Quality Checks</strong>

<p>The population table was checked for duplicate county-state pairings, since these are the fields the accident dataset is joined on. No duplicates were found.</p>
<p>The accident_detail_2016_2017 table was checked for null values in the pop_id field. Of 259,254 records, 9,700 have a null pop_id, which means that about 3.7% of the records lack population data. This is probably due to county names being listed slightly differently in the two datasets. If time permitted, the datasets could be explored further to look for additional discrepancies.</p>
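<p>The percentage follows directly from the two counts printed by the quality check. A short sketch of the arithmetic, with a hypothetical helper name null_share:</p>

```python
def null_share(null_count, total_count):
    """Return the share of records with a null value, as a percentage."""
    return 100.0 * null_count / total_count


# Counts taken from the data quality check output
print(round(null_share(9700, 259254), 1))  # 3.7
```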
<p>Finally, a random row from the raw accident csv file was selected. Its accident_id and county were passed to the check, and the matching records in the accident_detail_2016_2017 table and the population table were printed so they could be compared. The data matched, so the record was loaded correctly.</p>


In [9]:
def data_quality_check(population_table, accident_table, test_record_id, test_record_county):
    """
    Takes location of population table on s3, accident_population_2016_2017 table on s3, 
    the accident_id and county of a test record from the raw csv files. 
    
    Tests for duplicate county-state records in the population table and prints results,
    tests how many records in the accident_population_2016_2017 table have null values in the foreign key pop_id field 
    and prints results, 
    prints how the data for the test row appears in the population table and the accident_population_2016_2017 tables.
    """
    #spark is the session created earlier in the notebook
    population_table = spark.read.parquet(population_table)
    accident_table = spark.read.parquet(accident_table)

    #check for duplicate county-state rows in population table
    pop_test=population_table.groupBy("county","state").count().filter("count > 1")
    pop_test.drop('count').show()
        
    #print total number of rows in accident table
    record_count=accident_table.count()
    print("Record Count:" + str(record_count))
    
    #print how many of them have null values for population_id
    #code from https://www.datasciencemadesimple.com/count-of-missing-nanna-and-null-values-in-pyspark/
    accident_table.select([count(when(isnan('pop_id') | col('pop_id').isNull() , True))]).show()
    
    #show the test record selected from the raw csv file as it appears in the accident and population tables
    accident_table.filter(accident_table.Accident_Id == test_record_id).show()
    population_table.filter(population_table.county==test_record_county).show()



In [10]:
data_quality_check('s3://population-kb789/population_table/', 's3://accidents-kb789/accident_tables/', 'A-2755', 'Westchester')


+------+-----+
|county|state|
+------+-----+
+------+-----+

Record Count:259254
+------------------------------------------------------------------+
|count(CASE WHEN (isnan(pop_id) OR (pop_id IS NULL)) THEN true END)|
+------------------------------------------------------------------+
|                                                              9700|
+------------------------------------------------------------------+

+-----------+--------------------+--------+-------------------+--------------------+-------------------+-------------------+------+
|Accident_Id|         Description|Severity|  weather_timestamp|              latlog|    start_timestamp|      end_timestamp|pop_id|
+-----------+--------------------+--------+-------------------+--------------------+-------------------+-------------------+------+
|     A-2755|Between Cross Cou...|       2|2017-01-26 19:51:00|40.94003,-73.8556...|2017-01-26 19:44:00|2017-01-27 01:44:00| 36119|
+-----------+--------------------+--------+--

<strong>Data Analysis-Questions Answered By the Data</strong>

<p><strong>Which counties had the most severe accidents? Do they have a higher than average population? Which counties had the highest number of accidents? Do they have a higher than average population?</strong></p>
<p>The counties with the most severe accidents do not have higher than average populations. The counties with the highest number of accidents do. One possible explanation is that in uncongested areas most accidents happen on high-speed highways and tend to be more serious, while minor fender benders are less common.</p>
<p><strong>What day of the week do most accidents occur?</strong></p>
<p>The day of the week on which the most accidents occurred was Thursday.</p>
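<p>The start_weekday column is produced by Spark's dayofweek function, which returns an integer from 1 (Sunday) through 7 (Saturday), so the grouped counts need to be translated into day names when they are read. A small sketch of that translation, using a hypothetical helper name weekday_name:</p>

```python
WEEKDAY_NAMES = [
    "Sunday", "Monday", "Tuesday", "Wednesday",
    "Thursday", "Friday", "Saturday",
]


def weekday_name(spark_dayofweek):
    """Map a dayofweek value (1 = Sunday ... 7 = Saturday) to its name."""
    if not 1 <= spark_dayofweek <= 7:
        raise ValueError("dayofweek must be between 1 and 7")
    return WEEKDAY_NAMES[spark_dayofweek - 1]


print(weekday_name(5))  # Thursday
```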
<p><strong>What was the average temperature at which accidents occurred in New York? In North Dakota?</strong></p>
<p>For New York the average temperature was 61°F; for North Dakota it was 55°F.</p>


In [11]:
#read in parquet files
accident_table_2016_2017 = spark.read.parquet('s3://accidents-kb789/accident_tables/')
population_table = spark.read.parquet('s3://population-kb789/population_table/')
location_table = spark.read.parquet('s3://kb789-location/location-table/')
weather_table = spark.read.parquet('s3://kb789-weather/weather-table/')
time_table = spark.read.parquet('s3://kb789-time/time-table/')


#find average severity of accidents by county, sort by counties with the highest severity, indicate if their population
#is above or below the average county population
accident_location_2016_2017=accident_table_2016_2017.join(location_table,((location_table.lat_log== \
    accident_table_2016_2017.latlog)), how="inner")
#average 2016 county population, computed once so the threshold is not hardcoded
average_population=population_table.groupBy().avg('Pop_2016').first()[0]
accident_severity_2016_2017=accident_location_2016_2017.groupBy("pop_id") \
                      .avg("Severity")
severity_pop=accident_severity_2016_2017.join(population_table,(population_table.pop_id== \
    accident_severity_2016_2017.pop_id))
severity_pop=severity_pop.withColumn('Above Average 2016 Population?', \
    when(col('Pop_2016') > average_population, "Yes").otherwise("No"))
severity_pop=severity_pop.drop('pop_id','area_name','economic_typology', 'Rural_Urban_Code', \
    'Pop_2016','Pop_2017','Net_Pop_Change_2016','Net_Pop_Change_2017')
severity_pop.sort(desc("avg(Severity)")).show()

#find total number of accidents by county, sort by counties with the most accidents, indicate if their population
#is above or below the average county population
accident_count_2016_2017=accident_location_2016_2017.groupBy("pop_id") \
                      .count() 
count_pop=accident_count_2016_2017.join(population_table,(population_table.pop_id== \
    accident_count_2016_2017.pop_id))
count_pop=count_pop.withColumn('Above Average 2016 Population?', \
    when(col('Pop_2016') > average_population, "Yes").otherwise("No"))
count_pop=count_pop.drop('pop_id','area_name','economic_typology', 'Rural_Urban_Code', \
    'Pop_2016','Pop_2017','Net_Pop_Change_2016','Net_Pop_Change_2017')
count_pop.sort(desc("count")).show()

#find what day of the week the most accidents occur (start_weekday: 1 = Sunday ... 7 = Saturday)
accident_time=accident_table_2016_2017.join(time_table,((time_table.start_timestamp== \
    accident_table_2016_2017.start_timestamp) & (time_table.end_timestamp== \
    accident_table_2016_2017.end_timestamp)), how="inner")
accident_day=accident_time.groupBy("start_weekday") \
                      .count()
accident_day.sort(desc("count")).show(20)

#Find the average temperature at which accidents occurred in a particular state (New York here)
acc_loc_ny=accident_location_2016_2017.filter(accident_location_2016_2017.State=="NY")
acc_loc_weather_ny=acc_loc_ny.join(weather_table,((weather_table.weather_timestamp== \
    acc_loc_ny.weather_timestamp)), how="inner")
acc_loc_weather_ny.groupBy().avg('Temperature').show()

#Same query for North Dakota
acc_loc_nd=accident_location_2016_2017.filter(accident_location_2016_2017.State=="ND")
acc_loc_weather_nd=acc_loc_nd.join(weather_table,((weather_table.weather_timestamp== \
    acc_loc_nd.weather_timestamp)), how="inner")
acc_loc_weather_nd.groupBy().avg('Temperature').show()


+-------------+-----+----------+------------------------------+
|avg(Severity)|state|    county|Above Average 2016 Population?|
+-------------+-----+----------+------------------------------+
|          4.0|   GA|    Clinch|                            No|
|          4.0|   KY|     Floyd|                            No|
|          4.0|   NY|    Fulton|                            No|
|          4.0|   GA|  Crawford|                            No|
|          4.0|   AL|Lauderdale|                            No|
|          4.0|   OK|  Pontotoc|                            No|
|          4.0|   MS|     Leake|                            No|
|          4.0|   NC|    Greene|                            No|
|          4.0|   PA|   Montour|                            No|
|          4.0|   IA|Winneshiek|                            No|
|          4.0|   IN|     Union|                            No|
|          4.0|   OK|    Pawnee|                            No|
|          4.0|   GA|  Colquitt|        

<strong>Conclusion</strong>

<p>This project is research oriented: it is about asking questions and getting answers that may lead to further questions. Not every question is known ahead of time, and the same queries are not run again and again, as they would be at a company that checks product inventory with the same query many times a day. For that reason, a data lake seemed like a better choice than a Redshift database, because of the flexibility it provides. PySpark appeared to be the right tool, because it made it easy to import the large unstructured datasets, examine them, and then apply structure. S3 was a good place to house the large CSV files, because they could be accessed directly from a PySpark notebook running on an EMR cluster. Storing the final, structured tables as Parquet files worked well because they could easily be read back in for analysis. PySpark was very effective at querying the large tables to perform that analysis.</p>

<p>Since the census data is annual, that dataset would ideally be updated yearly. The accident dataset, however, could be updated more frequently, especially if we wanted to see emerging patterns. If the data were increased by 100x, <a href="https://towardsdatascience.com/apache-spark-performance-boosting-e072a3ec1179">some coding practices that improve performance could be implemented to handle this increase.</a> Possibilities include replacing joins and aggregations with window functions, using alternatives to count() when the exact number of rows is not needed, and avoiding .show() in production. If the pipeline needed to run daily by 7 AM, Apache Airflow could be used to schedule the Spark application. <a href="https://medium.com/analytics-vidhya/a-gentle-introduction-to-data-workflows-with-apache-airflow-and-apache-spark-6c2cd9aee573">Airflow and Spark can be set up to work together</a>, and Airflow will not only schedule a Spark job but also notify users if the job fails and retry it. If the dataset needed to be accessed by 100+ people, PySpark might become problematic because <a href="https://databricks.com/session/not-your-fathers-database-how-to-use-apache-spark-properly-in-your-big-data-architecture">“Spark is not meant to be optimal for supporting many concurrent requests.”</a> At that point, it might be better to build an ETL pipeline that loads the data into a Redshift database.</p>
