# Project Title
### Data Engineering Capstone Project

#### Project Summary
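This project combines US traffic accident records with city demographics and historical weather data. The data is explored and cleaned with PySpark, modeled as a star schema, and intended for loading into a cloud data warehouse (AWS Redshift) for analytics.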

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
# Note: `sum` and `mean` imported here are Spark column functions and shadow Python's built-ins
from pyspark.sql.functions import (
    array, col, explode, expr, lit, mean, struct, sum, to_date,
)



In [2]:
import networkx as nx
import matplotlib.pyplot as plt
from graphviz import Digraph

In [4]:
# Read in the data here
spark = SparkSession.builder.master("local") \
    .appName("US Accidents and Weather EDA") \
    .getOrCreate()


### Step 1: Scope the Project and Gather Data

#### Scope 
* US Accidents Data: This data can be found on Kaggle. It includes several million records of accidents across the US, with details about the severity, location, and timing of each accident.
* US City Demographic Data: This data comes from OpenSoft and includes information by city, state, age, population, veteran status, and race.
* Weather Data: This data can be found on Kaggle or other open data platforms. It includes historical weather conditions by city, including temperature, precipitation, and weather events.

#### Describe and Gather Data 
The end result will be a data warehouse in the cloud (AWS Redshift). It will be used for analytics purposes, allowing the company to identify accident patterns and correlations with weather and demographic factors.

In [5]:
# Helper: read a CSV file with a header row and inferred column types
def read_csv(path):
    return spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(path)

# Load the accidents data
accidents_df = read_csv('data/US_Accidents_Dec21_updated.csv')

# Load the weather data
city_attributes_df = read_csv('data/city_attributes.csv')
humidity_df = read_csv('data/humidity.csv')
pressure_df = read_csv('data/pressure.csv')
temperature_df = read_csv('data/temperature.csv')
weather_desc_df = read_csv('data/weather_description.csv')
wind_direction_df = read_csv('data/wind_direction.csv')
wind_speed_df = read_csv('data/wind_speed.csv')


In [6]:
# Load the demographics data
demographics_df = spark.read.format('csv').option('header','true').option("delimiter", ";").option('inferSchema', 'true').load('data/us-cities-demographics.csv')


In [7]:
# Show the first few records of each dataframe
accidents_df.show()


+----+--------+-------------------+-------------------+------------------+------------------+------------------+------------------+-------------------+--------------------+-------+--------------+----+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|  ID|Severity|         Start_Time|           End_Time|         Start_Lat|         Start_Lng|           End_Lat|           End_Lng|       Distance(mi)|         Description| Number|        Street|Side|        City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Directi

In [8]:
demographics_df.show()



+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|       Denver|     Colorado|      34.1|         341137|           341408|          682545|             29363|      113222|                  2.33|        CO|Black or African-...|72288|
|        Provo|         Utah|      23.6|          56231|            59027|          115258|              2177|       10925|    3.2800000000000002|        UT|American Indian a...| 1916|
|      Hampton|     Virginia|      35.5|          66214|            70240| 

In [9]:
city_attributes_df.show()


+-------------+-------------+---------+-----------+
|         City|      Country| Latitude|  Longitude|
+-------------+-------------+---------+-----------+
|    Vancouver|       Canada| 49.24966|-123.119339|
|     Portland|United States|45.523449|-122.676208|
|San Francisco|United States|37.774929|-122.419418|
|      Seattle|United States|47.606209|-122.332069|
|  Los Angeles|United States|34.052231|-118.243683|
|    San Diego|United States|32.715328|-117.157257|
|    Las Vegas|United States|36.174969|-115.137222|
|      Phoenix|United States| 33.44838|-112.074043|
|  Albuquerque|United States|35.084492|-106.651138|
|       Denver|United States|39.739151|-104.984703|
|  San Antonio|United States| 29.42412| -98.493629|
|       Dallas|United States|32.783058| -96.806671|
|      Houston|United States|29.763281| -95.363274|
|  Kansas City|United States|39.099731| -94.578568|
|  Minneapolis|United States|44.979969|  -93.26384|
|  Saint Louis|United States| 38.62727| -90.197891|
|      Chica

In [10]:
humidity_df.show()


+-------------------+---------+--------+-------------+-------+-----------+---------+---------+-------+-----------+------+-----------+------+-------+-----------+-----------+-----------+-------+---------+------------+-------+-------+------------+---------+-----+----------+-------+------------+--------+--------+------+---------+-----------------+-----+-----+---------+---------+
|           datetime|Vancouver|Portland|San Francisco|Seattle|Los Angeles|San Diego|Las Vegas|Phoenix|Albuquerque|Denver|San Antonio|Dallas|Houston|Kansas City|Minneapolis|Saint Louis|Chicago|Nashville|Indianapolis|Atlanta|Detroit|Jacksonville|Charlotte|Miami|Pittsburgh|Toronto|Philadelphia|New York|Montreal|Boston|Beersheba|Tel Aviv District|Eilat|Haifa|Nahariyya|Jerusalem|
+-------------------+---------+--------+-------------+-------+-----------+---------+---------+-------+-----------+------+-----------+------+-------+-----------+-----------+-----------+-------+---------+------------+-------+-------+------------+

In [11]:

temperature_df.show()


+-------------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-----------------+-------------+-----+---------+---------+
|           datetime|    Vancouver|     Portland|San Francisco|      Seattle|  Los Angeles|    San Diego|    Las Vegas|      Phoenix|  Albuquerque|       Denver|  San Antonio|       Dallas|      Houston|  Kansas City|  Minneapolis|  Saint Louis|      Chicago|    Nashville| Indianapolis|      Atlanta|      Detroit| Jacksonville|    Charlotte|        Miami|   Pittsburgh|      Toronto| Philadelphia|     New York|     Montreal|       Boston|    Beersheba|Tel Aviv District|        Eilat

In [12]:
weather_desc_df.show()
wind_direction_df.show()
wind_speed_df.show()
pressure_df.show()

+-------------------+-------------+----------------+-------------+----------------+------------+------------+------------+------------+------------+-------------+------------+------------+----------------+------------+----------------+------------+---------------+---------------+---------------+---------------+------------+----------------+---------------+--------------------+----------------+------------+-------------+----------+---------------+------------+---------------+-----------------+-------------+---------------+---------------+---------------+
|           datetime|    Vancouver|        Portland|San Francisco|         Seattle| Los Angeles|   San Diego|   Las Vegas|     Phoenix| Albuquerque|       Denver| San Antonio|      Dallas|         Houston| Kansas City|     Minneapolis| Saint Louis|        Chicago|      Nashville|   Indianapolis|        Atlanta|     Detroit|    Jacksonville|      Charlotte|               Miami|      Pittsburgh|     Toronto| Philadelphia|  New York|      

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [13]:
# Count the number of records in each dataframe
print("Accidents count: ", accidents_df.count())
print("Demographics count: ", demographics_df.count())
print("City attributes count: ", city_attributes_df.count())
print("Humidity count: ", humidity_df.count())
print("Pressure count: ", pressure_df.count())
print("Temperature count: ", temperature_df.count())
print("Weather description count: ", weather_desc_df.count())
print("Wind direction count: ", wind_direction_df.count())
print("Wind speed count: ", wind_speed_df.count())

Accidents count:  2845342
Demographics count:  2891
City attributes count:  36
Humidity count:  45253
Pressure count:  45253
Temperature count:  45253
Weather description count:  45253
Wind direction count:  45253
Wind speed count:  45253


In [14]:

# Print the schema of each dataframe
accidents_df.printSchema()





root
 |-- ID: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Number: double (nullable = true)
 |-- Street: string (nullable = true)
 |-- Side: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(

In [15]:
city_attributes_df.printSchema()


root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [16]:
demographics_df.printSchema()


root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [17]:
humidity_df.printSchema()
pressure_df.printSchema()
temperature_df.printSchema()
weather_desc_df.printSchema()
wind_direction_df.printSchema()
wind_speed_df.printSchema()

root
 |-- datetime: timestamp (nullable = true)
 |-- Vancouver: double (nullable = true)
 |-- Portland: double (nullable = true)
 |-- San Francisco: double (nullable = true)
 |-- Seattle: double (nullable = true)
 |-- Los Angeles: double (nullable = true)
 |-- San Diego: double (nullable = true)
 |-- Las Vegas: double (nullable = true)
 |-- Phoenix: double (nullable = true)
 |-- Albuquerque: double (nullable = true)
 |-- Denver: double (nullable = true)
 |-- San Antonio: double (nullable = true)
 |-- Dallas: double (nullable = true)
 |-- Houston: double (nullable = true)
 |-- Kansas City: double (nullable = true)
 |-- Minneapolis: double (nullable = true)
 |-- Saint Louis: double (nullable = true)
 |-- Chicago: double (nullable = true)
 |-- Nashville: double (nullable = true)
 |-- Indianapolis: double (nullable = true)
 |-- Atlanta: double (nullable = true)
 |-- Detroit: double (nullable = true)
 |-- Jacksonville: double (nullable = true)
 |-- Charlotte: double (nullable = true)
 |-- M

**Identifying Missing Values**

In [18]:
### Identifying Missing Values

# Get list of columns in the DataFrame
columns = accidents_df.columns

# Initialize an empty list to hold column expressions
column_exprs = []

# For each column, create an expression that counts null values
for c in columns:
    column_expr = sum(col(c).isNull().cast("int")).alias(c)
    column_exprs.append(column_expr)

# Select these column expressions from the DataFrame
null_counts_df = accidents_df.select(*column_exprs)

# Display the result
null_counts_df.show()

+---+--------+----------+--------+---------+---------+-------+-------+------------+-----------+-------+------+----+----+------+-----+-------+-------+--------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|End_Lat|End_Lng|Distance(mi)|Description| Number|Street|Side|City|County|State|Zipcode|Country|Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|N

**Identifying Duplicates**

In [19]:
duplicates = accidents_df.count() - accidents_df.distinct().count()
print("Number of duplicate rows: ", duplicates)


Number of duplicate rows:  0


The null counts above show that several columns have missing values. Whether and how to address them depends on the project's requirements and on how the data will be used.

Here are some general considerations:

* Columns with missing values: If a column is not critical to the analysis and has a substantial share of missing values, we might exclude it. For instance, 'Wind_Chill(F)' and 'Precipitation(in)' have many missing values and may not be critical for a traffic accident analysis.
* Rows with missing values: If only a small fraction of rows have missing values, we might remove those rows, particularly if they are not crucial to the analysis.
* Imputation: If a column is critical to the analysis and has some missing values, we might impute them (i.e., fill them in based on other data). For example, we could fill missing values in the 'Temperature(F)' column with the average temperature.
* Data completeness: Many missing values can bias the analysis. For instance, if weather conditions are missing for a certain region or time period, any analysis related to weather conditions might be skewed.
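
One way to make the column-versus-row decision concrete is to turn the null counts into fractions of the total row count. The sketch below is plain Python and works on a dictionary such as the one `null_counts_df.first().asDict()` would return. The helper name `split_by_missing_fraction`, the 0.3 threshold, and the toy counts are illustrative assumptions, not values measured from the dataset.

```python
def split_by_missing_fraction(null_counts, total_rows, threshold=0.3):
    """Split columns into (drop, keep) lists by their fraction of missing values.

    null_counts: dict mapping column name -> number of nulls
    total_rows:  number of rows in the DataFrame
    threshold:   hypothetical cut-off; columns above it become drop candidates
    """
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    drop, keep = [], []
    for name, nulls in null_counts.items():
        fraction = nulls / total_rows
        (drop if fraction > threshold else keep).append(name)
    return drop, keep


# Toy counts for illustration only (not the real output of null_counts_df)
toy_counts = {"ID": 0, "Temperature(F)": 50, "Wind_Chill(F)": 400}
drop_cols, keep_cols = split_by_missing_fraction(toy_counts, total_rows=1000)
print("Candidates to drop:", drop_cols)
print("Columns to keep:", keep_cols)
```

Columns in the keep list with a small but non-zero fraction are the natural candidates for imputation or row-level removal.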

**Cleaning Steps**

In [21]:
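# Remove rows with missing values from the smaller dataframes.
# accidents_df is cleaned column by column in a later cell: dropping every row
# that has any null would also discard rows whose only gaps are in sparse,
# non-critical columns such as 'Wind_Chill(F)' and 'Precipitation(in)'.
demographics_df = demographics_df.na.drop()
city_attributes_df = city_attributes_df.na.drop()
humidity_df = humidity_df.na.drop()
pressure_df = pressure_df.na.drop()
temperature_df = temperature_df.na.drop()
weather_desc_df = weather_desc_df.na.drop()
wind_direction_df = wind_direction_df.na.drop()
wind_speed_df = wind_speed_df.na.drop()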


In [22]:
# Remove duplicates
accidents_df = accidents_df.dropDuplicates()
demographics_df = demographics_df.dropDuplicates()
city_attributes_df = city_attributes_df.dropDuplicates()
humidity_df = humidity_df.dropDuplicates()
pressure_df = pressure_df.dropDuplicates()
temperature_df = temperature_df.dropDuplicates()
weather_desc_df = weather_desc_df.dropDuplicates()
wind_direction_df = wind_direction_df.dropDuplicates()
wind_speed_df = wind_speed_df.dropDuplicates()

In [23]:
# Drop columns with a large number of missing values
accidents_df = accidents_df.drop('Wind_Chill(F)', 'Precipitation(in)')

# Impute missing values in 'Temperature(F)' with the column mean.
# This runs before the row drop below; if rows with a missing temperature
# were dropped first, the fill would have nothing left to fill.
mean_val = accidents_df.select(mean(accidents_df['Temperature(F)'])).collect()[0][0]
accidents_df = accidents_df.na.fill(mean_val, subset=['Temperature(F)'])

# Drop rows with missing values in the remaining critical columns
accidents_df = accidents_df.na.drop(subset=["Humidity(%)", "Pressure(in)", "Visibility(mi)"])


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
A star schema is chosen because it is simple yet effective for this use case.

* Fact table: Accidents data.
* Dimension tables: Demographics and Weather.

#### 3.2 Mapping Out Data Pipelines
* Extract data from all sources using PySpark.
* Transform data (clean, format, and aggregate as necessary).
* Load transformed data into Redshift tables.
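
The load step is not implemented in this notebook. As a rough sketch, assuming the Parquet output is first staged in S3 (an assumption, since the notebook only writes to local disk), the Redshift `COPY` command can ingest it. The helper name `build_copy_statement`, the bucket, and the IAM role ARN below are hypothetical placeholders.

```python
def build_copy_statement(table, s3_path, iam_role):
    """Return a Redshift COPY statement that loads Parquet files from S3."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        "FORMAT AS PARQUET;"
    )


# Hypothetical bucket and role; replace with real values before use
statement = build_copy_statement(
    table="accidents",
    s3_path="s3://example-bucket/accidents.parquet/",
    iam_role="arn:aws:iam::123456789012:role/example-redshift-role",
)
print(statement)
```

The resulting string would be executed against the cluster with any SQL client; the target table must already exist with columns that match the Parquet schema.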

In [6]:
from graphviz import Digraph

# Create a new directed graph
dot = Digraph(comment='Data Model')

# Add nodes (tables)
dot.node('A', 'Accidents')
dot.node('C', 'City_Attributes')
dot.node('W', 'Weather')

# Add edges (relationships)
dot.edge('A', 'C', label='Many-to-One')
dot.edge('A', 'W', label='Many-to-One')

# Visualize the graph
dot.view()


Error: Could not open "Digraph.gv.pdf" for writing : Permission denied


CalledProcessError: Command '[WindowsPath('dot'), '-Kdot', '-Tpdf', '-O', 'Digraph.gv']' returned non-zero exit status 1. [stderr: b'Error: Could not open "Digraph.gv.pdf" for writing : Permission denied\r\n']

In [23]:
# Pass the file name without an extension; graphviz appends '.png' for format='png'
dot.render('data_model', format='png', view=True)


### Step 4: Run Pipelines to Model the Data 



#### 4.1 Create the data model
Build the data pipelines to create the data model.

The data model will be based on the Star Schema, where we will have one Fact table and several Dimension tables.

* Fact Table: accidents
* Dimension Tables: demographics, city_attributes, humidity, pressure, temperature, weather_desc, wind_direction, wind_speed.
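
The source files do not share an explicit key between the accidents fact table and the weather data. One possible approach, assuming the weather readings are hourly and that city names match across datasets (neither is verified here), is to join on the city plus the accident start time truncated to the hour. The sketch below shows the key in plain Python; the helper name `weather_join_key` and the sample timestamp are illustrative. In Spark, `date_trunc("hour", col("Start_Time"))` performs the same truncation.

```python
from datetime import datetime


def weather_join_key(city, start_time):
    """Build a (city, hour) key by truncating a timestamp to the hour."""
    hour = start_time.replace(minute=0, second=0, microsecond=0)
    return (city, hour)


# Illustrative values only
key = weather_join_key("Denver", datetime(2016, 2, 8, 5, 46, 0))
print(key)
```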

In [27]:
def unpivot(df):
    # Get the list of all cities (all column names except 'datetime')
    cities = [x for x in df.columns if x != 'datetime']
    
    # Create an array of struct with two fields: 'City' and 'Value'
    df = df.withColumn("City_Value", explode(array([struct(lit(c).alias("City"), col(c).alias("Value")) for c in cities])))
    
    # Select 'datetime' and fields from the struct
    df = df.select("datetime", "City_Value.City", "City_Value.Value")
    
    return df

# Unpivot all weather dataframes
humidity_df = unpivot(humidity_df)
pressure_df = unpivot(pressure_df)
temperature_df = unpivot(temperature_df)
weather_desc_df = unpivot(weather_desc_df)
wind_direction_df = unpivot(wind_direction_df)
# wind_speed_df must be unpivoted too: it is joined on ["datetime", "City"] later
wind_speed_df = unpivot(wind_speed_df)

# Rename the 'Value' column in each dataframe to correspond to the correct weather attribute
humidity_df = humidity_df.withColumnRenamed('Value', 'Humidity')
pressure_df = pressure_df.withColumnRenamed('Value', 'Pressure')
temperature_df = temperature_df.withColumnRenamed('Value', 'Temperature')
weather_desc_df = weather_desc_df.withColumnRenamed('Value', 'Weather_Description')
wind_direction_df = wind_direction_df.withColumnRenamed('Value', 'Wind_Direction')
wind_speed_df = wind_speed_df.withColumnRenamed('Value', 'Wind_Speed')
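
To see what the unpivot step produces, the following plain-Python sketch applies the same wide-to-long reshaping to two toy rows. It is only an illustration of the output shape, not a replacement for the Spark function; the helper name `unpivot_rows` and the sample readings are made up.

```python
def unpivot_rows(rows, id_field="datetime"):
    """Turn wide rows (one column per city) into long (id, City, Value) tuples."""
    long_rows = []
    for row in rows:
        for column, value in row.items():
            if column != id_field:
                long_rows.append((row[id_field], column, value))
    return long_rows


# Toy readings for illustration only
wide = [
    {"datetime": "2012-10-01 13:00:00", "Portland": 81.0, "Seattle": 77.0},
    {"datetime": "2012-10-01 14:00:00", "Portland": 80.0, "Seattle": 76.0},
]
long_rows = unpivot_rows(wide)
for r in long_rows:
    print(r)
```

Each wide row with N city columns becomes N long rows, so the unpivoted weather dataframes grow by a factor equal to the number of cities.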

In [28]:
# 1. Fact Table - accidents
accidents_table = accidents_df.select("ID", "Severity", "Start_Time", "End_Time", "Start_Lat", "Start_Lng", "Description", "City", "County", "State", "Zipcode", "Temperature(F)", "Humidity(%)", "Pressure(in)", "Visibility(mi)", "Wind_Direction", "Wind_Speed(mph)", "Weather_Condition")

# Write accidents table to parquet file (if not empty)
if accidents_table.rdd.isEmpty():
    print("The accidents_table is empty.")
else:
    accidents_table.write.mode('overwrite').parquet('data/accidents.parquet')

# 2. Dimension Table - city_attributes
city_attributes_table = city_attributes_df.select("City", "Country", "Latitude", "Longitude")

# Write city_attributes table to parquet file (if not empty)
if city_attributes_table.rdd.isEmpty():
    print("The city_attributes_table is empty.")
else:
    city_attributes_table.write.mode('overwrite').parquet('data/city_attributes.parquet')

# 3. Dimension Table - weather
# Merge weather related dataframes into one
weather_df = humidity_df.join(pressure_df, ["datetime", "City"]).join(temperature_df, ["datetime", "City"]).join(weather_desc_df, ["datetime", "City"]).join(wind_direction_df, ["datetime", "City"]).join(wind_speed_df, ["datetime", "City"])

weather_table = weather_df.select("datetime", "City", "Humidity", "Pressure", "Temperature", "Weather_Description", "Wind_Direction", "Wind_Speed")

# Write weather table to parquet file (if not empty)
if weather_table.rdd.isEmpty():
    print("The weather_table is empty.")
else:
    weather_table.write.mode('overwrite').parquet('data/weather.parquet')


Py4JJavaError: An error occurred while calling o1990.parquet.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:640)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:354)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:382)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:789)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
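
A minimal sketch of two of the checks listed above (completeness and key uniqueness), written in plain Python so the logic can be tested without a Spark session. The function names and toy inputs are illustrative assumptions. In the notebook, the counts would come from calls such as `accidents_table.count()`, and key uniqueness on a large table is better checked in Spark by comparing `df.count()` with `df.select("ID").distinct().count()` than by collecting keys.

```python
def check_row_counts(counts):
    """Raise ValueError if any table has zero rows; return the counts otherwise."""
    empty = [name for name, n in counts.items() if n == 0]
    if empty:
        raise ValueError(f"Data quality check failed, empty tables: {empty}")
    return counts


def check_unique_keys(keys):
    """Raise ValueError if the key list contains nulls or duplicates."""
    if any(k is None for k in keys):
        raise ValueError("Data quality check failed: null key found")
    if len(set(keys)) != len(keys):
        raise ValueError("Data quality check failed: duplicate keys found")
    return True


# Toy inputs for illustration only
counts_ok = check_row_counts({"accidents": 3, "weather": 5})
keys_ok = check_unique_keys(["A-1", "A-2", "A-3"])
print("Row counts:", counts_ok)
print("Keys unique:", keys_ok)
```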

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
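
One possible way to keep the data dictionary next to the code is to hold it as a list of tuples and render it as a markdown table. In the sketch below, the field names and types are taken from the accidents schema printed earlier; the descriptions are placeholder wording based on the dataset summary in Step 1, and the helper name `data_dictionary_table` is hypothetical.

```python
def data_dictionary_table(fields):
    """Render (name, type, description, source) tuples as a markdown table."""
    lines = [
        "| Field | Type | Description | Source |",
        "| --- | --- | --- | --- |",
    ]
    for name, dtype, description, source in fields:
        lines.append(f"| {name} | {dtype} | {description} | {source} |")
    return "\n".join(lines)


# Descriptions are placeholders and should be checked against the dataset documentation
fields = [
    ("ID", "string", "Accident record identifier", "US Accidents (Kaggle)"),
    ("Severity", "integer", "Severity of the accident", "US Accidents (Kaggle)"),
    ("Start_Time", "timestamp", "Time the accident started", "US Accidents (Kaggle)"),
]
table = data_dictionary_table(fields)
print(table)
```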

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.