# Spark Structured Streaming to Databricks Delta Lake

In this project, we set up a streaming ETL pipeline from object storage to Databricks Delta Lake. Data is first read from files stored in DBFS, transformed with Spark SQL, and loaded into a Delta table. The data is the cleaned NHTSA data from the prior *Data Engineering Capstone* project. We join location information (state, county, and city names from glcc.csv) to accident data (accident2016c.csv, accident2017c.csv, accident2018c.csv) and calculate aggregate fatalities (deaths) by location. These grouped summary statistics are then loaded into a Delta table.

***Preview accident/glc data***

In [0]:
accident2016df = (spark.read
                       .option("header", True)
                       .option("inferSchema", True)
                       .csv("/FileStore/tables/accident_data/accident2016c.csv"))

glcdf = (spark.read
              .option("header", True)
              .option("inferSchema", True)
              .csv("/FileStore/tables/glc_data/glcc.csv"))

display(accident2016df)
display(glcdf)

state,st_case,county,city,day,month,year,day_week,hour,minute,latitude,longitud,fatals
1,10001,73,1716,1,1,2016,6,9,25,33.42645833,-86.8197,1
1,10002,73,1716,10,1,2016,1,19,59,33.40030556,-86.7872,1
1,10003,73,0,1,1,2016,6,2,20,33.64924444,-86.6073,1
1,10004,73,0,17,1,2016,1,21,45,33.42995556,-86.8222,1
1,10005,73,3125,26,1,2016,3,20,7,33.41205,-86.8056,1
1,10006,109,0,1,1,2016,6,1,10,31.72063333,-85.96,1
1,10007,117,530,1,1,2016,6,23,34,33.14415,-86.7928,1
1,10008,83,0,4,1,2016,2,17,30,34.96508056,-87.0148,1
1,10009,103,2545,4,1,2016,2,5,37,34.50008611,-86.8537,2
1,10010,69,0,5,1,2016,3,15,18,31.31075,-85.2521,2


state_name,state_code,city_code,city_name,county_code,county_name
ALABAMA,1,10,ABBEVILLE,67,HENRY
ALABAMA,1,50,ALBERTVILLE,95,MARSHALL
ALABAMA,1,60,ALEXANDER CITY,123,TALLAPOOSA
ALABAMA,1,70,ALICEVILLE,107,PICKENS
ALABAMA,1,90,ANDALUSIA,39,COVINGTON
ALABAMA,1,96,ANDERSON,77,LAUDERDALE
ALABAMA,1,100,ANNISTON,15,CALHOUN
ALABAMA,1,101,ANNISTON ARMY DEPOT,15,CALHOUN
ALABAMA,1,103,ANSLEY,109,PIKE
ALABAMA,1,110,ARAB,95,MARSHALL


***Save schema from accident table***

In [0]:
accident_schema = accident2016df.schema
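
The schema is saved because streaming file sources need it before the query starts (unless streaming schema inference is enabled). As an alternative to borrowing it from a static read, the schema can be declared by hand. The sketch below builds a DDL-formatted schema string from the column names in the preview. The types are an assumption about what `inferSchema` picks for this file (integers, with doubles for the coordinates), and `ACCIDENT_COLUMNS` and `accident_schema_ddl` are names introduced here for illustration.

```python
# Column names come from the accident preview above; the types are an
# assumption about what inferSchema picks (INT everywhere except coordinates).
ACCIDENT_COLUMNS = [
    ("state", "INT"), ("st_case", "INT"), ("county", "INT"), ("city", "INT"),
    ("day", "INT"), ("month", "INT"), ("year", "INT"), ("day_week", "INT"),
    ("hour", "INT"), ("minute", "INT"),
    ("latitude", "DOUBLE"), ("longitud", "DOUBLE"),
    ("fatals", "INT"),
]

# DDL-formatted schema string of the form "state INT, st_case INT, ..."
accident_schema_ddl = ", ".join(f"{name} {dtype}" for name, dtype in ACCIDENT_COLUMNS)

print(accident_schema_ddl)

# In the notebook this string could be passed where the saved schema is used:
#   spark.readStream.format("csv").schema(accident_schema_ddl)
```

Declaring the schema this way removes the dependency on a 2016 file being present when the notebook starts, at the cost of keeping the column list in sync by hand.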

***Set up streaming accident dataframe***

In [0]:
accident_sdf = (spark.readStream
                     .format("csv")
                     .option("header", True)  # treat the first line of each CSV file as a header, as in the static read
                     .schema(accident_schema)
                     .load("/FileStore/tables/accident_data/"))

***Join with static glc dataframe (created above)***

In [0]:
accident_sdf.createOrReplaceTempView("accident_sdf")
glcdf.createOrReplaceTempView("glc_df")

accident_sdf2 = spark.sql("""SELECT g.state_name, g.city_name, g.county_name, a.fatals fatalities 
                             FROM accident_sdf a
                             JOIN glc_df g
                             ON a.state = g.state_code AND a.county = g.county_code AND a.city = g.city_code""")

***Calculate aggregate deaths***

In [0]:
accident_sdf2.createOrReplaceTempView("accident_sdf2")

accident_sdf3 = spark.sql("""SELECT state_name, city_name, county_name, SUM(fatalities) fatalities
                             FROM accident_sdf2
                             GROUP BY state_name, city_name, county_name""")
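
The two SQL steps above (an inner join on state, county, and city codes, followed by a grouped sum) can be mirrored in plain Python on a handful of rows. This is only a sketch of the semantics, not how Spark executes the query. The lookup rows are copied from the glc preview and the first two accident rows come from the accident preview. The last three accident rows are hypothetical, added so that something matches, and `join_and_sum` is a helper name introduced here.

```python
from collections import defaultdict

# (state_code, county_code, city_code) -> (state_name, city_name, county_name),
# copied from the glc preview above.
glc_lookup = {
    (1, 67, 10): ("ALABAMA", "ABBEVILLE", "HENRY"),
    (1, 95, 50): ("ALABAMA", "ALBERTVILLE", "MARSHALL"),
    (1, 15, 100): ("ALABAMA", "ANNISTON", "CALHOUN"),
}

# (state, county, city, fatals). The first two rows come from the accident
# preview; the last three are hypothetical rows added for illustration.
accidents = [
    (1, 73, 1716, 1),
    (1, 73, 0, 1),
    (1, 67, 10, 1),
    (1, 95, 50, 1),
    (1, 95, 50, 1),
]


def join_and_sum(accident_rows, lookup):
    """Inner-join accidents to locations, then sum fatalities per location."""
    totals = defaultdict(int)
    for state, county, city, fatals in accident_rows:
        location = lookup.get((state, county, city))
        if location is None:
            continue  # inner join: no matching location row, so the accident is dropped
        totals[location] += fatals
    return dict(totals)


fatalities = join_and_sum(accidents, glc_lookup)
for location, total in sorted(fatalities.items()):
    print(*location, total, sep=",")
```

Because the join is an inner join, any accident whose code triple has no row in the location table contributes nothing to the totals. In this three-row lookup that applies to the two preview rows.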

***Write to Delta table***

In [0]:
(accident_sdf3.writeStream
              .format("delta")
              .outputMode("complete")
              .option("checkpointLocation", "/FileStore/tables/checkpoint/")
              .queryName("stream1")
              .start("/FileStore/tables/delta_save/")
)

***Register "accident" Delta table***

In [0]:
%sql
CREATE TABLE accident
USING DELTA
LOCATION "/FileStore/tables/delta_save/"

***Check table data***

In [0]:
%sql
SELECT *
FROM accident
ORDER BY state_name, city_name, county_name

state_name,city_name,county_name,fatalities
ALABAMA,ABBEVILLE,HENRY,1
ALABAMA,ALBERTVILLE,MARSHALL,2
ALABAMA,ALEXANDER CITY,TALLAPOOSA,2
ALABAMA,ALICEVILLE,PICKENS,1
ALABAMA,ANNISTON,CALHOUN,3
ALABAMA,ARITON,DALE,1
ALABAMA,ASHFORD,HOUSTON,1
ALABAMA,ATTALLA,ETOWAH,3
ALABAMA,AUBURN,LEE,5
ALABAMA,BAILEYTON,CULLMAN,1


***Check table data after the update: the table above holds only 2016 data; uploading the 2017 and 2018 CSV files to "/FileStore/tables/accident_data/" adds them to the stream***

In [0]:
%sql
SELECT *
FROM accident
ORDER BY state_name, city_name, county_name

state_name,city_name,county_name,fatalities
ALABAMA,ABBEVILLE,HENRY,1
ALABAMA,ALBERTVILLE,MARSHALL,8
ALABAMA,ALEXANDER CITY,TALLAPOOSA,8
ALABAMA,ALICEVILLE,PICKENS,2
ALABAMA,ANDALUSIA,COVINGTON,1
ALABAMA,ANNISTON,CALHOUN,11
ALABAMA,ARAB,MARSHALL,1
ALABAMA,ARDMORE,LIMESTONE,1
ALABAMA,ARITON,DALE,1
ALABAMA,ASHFORD,HOUSTON,2


***Stop the stream***

In [0]:
for stream in spark.streams.active:
  stream.stop()
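
The loop above stops every active stream in the session. If other streams might be running, it may be preferable to stop only the query started here, which was named `stream1`. A minimal sketch, assuming the objects passed in expose `.name` and `.stop()` as streaming queries do; `stop_streams` is a hypothetical helper, not part of Spark.

```python
def stop_streams(active_queries, name=None):
    """Stop streaming queries, optionally only those with a matching name.

    `active_queries` is any iterable of objects exposing `.name` and `.stop()`,
    such as `spark.streams.active` in the notebook.
    """
    stopped = []
    for query in active_queries:
        if name is None or query.name == name:
            query.stop()
            stopped.append(query.name)
    return stopped


# In the notebook: stop_streams(spark.streams.active, name="stream1")
```

Returning the names of the stopped queries makes it easy to confirm in the cell output that the intended stream was the one shut down.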