# Databricks Coding Assignment

The assignment is graded for a total of **125 points**. The grade is determined by a combination of correctness, conciseness, and organization. You are expected to perform research online for many of these questions, so please note any resources you referred to with a link or comment.

Your answers do not need to fit into one cell. Just ensure that your answers are neat, labelled, and clearly explained. Additionally, this notebook should be able to run top-to-bottom, and it should not produce any errors.

There are two sections: data engineering/ETL and machine learning. You may write your code answers in Scala or Python unless otherwise directed. Each code answer should be accompanied by a short description that outlines what your code does and why you chose to implement it that way.

## Using Airlines Data Set
The following questions use the airlines dataset located at `dbfs:/databricks-datasets/airlines`. 

**Note**: The full dataset is 20+ GB and will take too long to read on a small cluster, but you must read in **at least 5 files**.

#### Airlines Question 1 [5 Points]
Write code that uses the DataFrame API to read in the airlines data set with clearly named columns.

In [0]:
# Get an overview of the files contained in the airlines dataset
display(dbutils.fs.ls("/databricks-datasets/airlines"))

path,name,size
dbfs:/databricks-datasets/airlines/README.md,README.md,1089
dbfs:/databricks-datasets/airlines/_SUCCESS,_SUCCESS,0
dbfs:/databricks-datasets/airlines/part-00000,part-00000,67108879
dbfs:/databricks-datasets/airlines/part-00001,part-00001,67108862
dbfs:/databricks-datasets/airlines/part-00002,part-00002,67108930
dbfs:/databricks-datasets/airlines/part-00003,part-00003,67108804
dbfs:/databricks-datasets/airlines/part-00004,part-00004,67108908
dbfs:/databricks-datasets/airlines/part-00005,part-00005,67108890
dbfs:/databricks-datasets/airlines/part-00006,part-00006,67108825
dbfs:/databricks-datasets/airlines/part-00007,part-00007,67108880


In [0]:
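# Only part-00000 carries the header row, so it is read separately with header="true"
df_1 = spark.read.load("/databricks-datasets/airlines/part-00000", format="csv", sep=",", inferSchema="true", header="true")
# The other parts have no header. Note: this glob needs the fourth digit to be 1-6,
# so it matches names such as part-00010 through part-00069, not part-00001 through part-00006
df_2 = spark.read.load("/databricks-datasets/airlines/part-000[1-6]*", format="csv", sep=",", inferSchema="true")
# union() matches columns by position, so df_2 takes over the column names of df_1
df = df_1.union(df_2)
print(df.count())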


In [0]:
# Display the dataframe in tabular format
display(df)

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,IsArrDelayed,IsDepDelayed
1987,10,14,3,741.0,730,912.0,849,PS,1451,,91.0,79,,23.0,11.0,SAN,SFO,447,,,0,,0,,,,,,YES,YES
1987,10,15,4,729.0,730,903.0,849,PS,1451,,94.0,79,,14.0,-1.0,SAN,SFO,447,,,0,,0,,,,,,YES,NO
1987,10,17,6,741.0,730,918.0,849,PS,1451,,97.0,79,,29.0,11.0,SAN,SFO,447,,,0,,0,,,,,,YES,YES
1987,10,18,7,729.0,730,847.0,849,PS,1451,,78.0,79,,-2.0,-1.0,SAN,SFO,447,,,0,,0,,,,,,NO,NO
1987,10,19,1,749.0,730,922.0,849,PS,1451,,93.0,79,,33.0,19.0,SAN,SFO,447,,,0,,0,,,,,,YES,YES
1987,10,21,3,728.0,730,848.0,849,PS,1451,,80.0,79,,-1.0,-2.0,SAN,SFO,447,,,0,,0,,,,,,NO,NO
1987,10,22,4,728.0,730,852.0,849,PS,1451,,84.0,79,,3.0,-2.0,SAN,SFO,447,,,0,,0,,,,,,YES,NO
1987,10,23,5,731.0,730,902.0,849,PS,1451,,91.0,79,,13.0,1.0,SAN,SFO,447,,,0,,0,,,,,,YES,YES
1987,10,24,6,744.0,730,908.0,849,PS,1451,,84.0,79,,19.0,14.0,SAN,SFO,447,,,0,,0,,,,,,YES,YES
1987,10,25,7,729.0,730,851.0,849,PS,1451,,82.0,79,,2.0,-1.0,SAN,SFO,447,,,0,,0,,,,,,YES,NO


Explain your answer to Question 1 here.

1.1

I first need to understand the DBFS directory structure to see what the airlines dataset contains, so I use dbutils.fs.ls() to list the files.

1.2 

I first tried spark.read.load() with a single wildcard over several files. The data loads, but only the first file, part-00000, contains a header row, so the header handling went wrong once the other parts were included in the same read.
Instead, I read part-00000 into one dataframe with the header enabled, then read further parts into a second dataframe with the glob part-000[1-6]*. Note that this glob requires the fourth digit to be 1 through 6, so it matches names such as part-00010 through part-00069 rather than part-00001 through part-00006; either way, it satisfies the five-file minimum.
I then use union(), which matches columns by position, to combine the two dataframes into a new dataframe and count the rows.

1.3

I view the data in tabular format with display() to verify that the header row was applied and that the data looks reasonable.
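
The bracket pattern used in the read above is worth checking against the file names. The following is a minimal sketch that uses Python's `fnmatch` as a stand-in for the Hadoop glob syntax Spark applies to paths (an assumption; the two agree on `[...]` and `*` for names like these). The file listing is generated, not read from DBFS.

```python
from fnmatch import fnmatchcase

# Hypothetical listing: part-00000 .. part-00099, named like the files in the dataset
names = [f"part-{i:05d}" for i in range(100)]

# The pattern from the read above: "part-000", then one character in 1-6, then anything
wide = [n for n in names if fnmatchcase(n, "part-000[1-6]*")]

# A tighter pattern that pins all four leading zeros and selects exactly five parts
tight = [n for n in names if fnmatchcase(n, "part-0000[1-5]")]

print(wide[0], wide[-1], len(wide))
print(tight)
```

Under these assumptions the wider pattern selects sixty files, which explains the large row counts in the later questions, while the tighter one selects the five files that follow part-00000.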

#### Airlines Question 2 [5 Points]
How many unique airlines are present in this dataset?

In [0]:
# Please provide your code answer for Question 2 here.

# Create a sql-table from the airlines data

df.createOrReplaceTempView("airlines")

In [0]:
%sql
select count(distinct `UniqueCarrier`) as UniqueCarrierCount from airlines

UniqueCarrierCount
15


Explain your answer to Question 2 here.

2.1

First, I create a table from my previous dataframe with createOrReplaceTempView() and name it "airlines".

2.2

Then I use %sql to run a SQL query against the table. I use count(distinct ...) on the "UniqueCarrier" column to count each unique airline.

#### Airlines Question 3 [10 Points]
Which airline is delayed on departure most often? Show a bar graph of the top five most delayed airlines.

In [0]:
%sql
select `UniqueCarrier`, count(case when `IsDepDelayed` = 'YES' then 1 else 0 end)
  as delayedCount 
  from airlines
  group by `UniqueCarrier` 
  order by delayedCount desc 
  limit 5

UniqueCarrier,delayedCount
DL,6530112
US,6348244
AA,5515582
UA,4914249
NW,3661404


Please provide your brief, written description of your code here.

3.1

I start off by selecting UniqueCarrier and then apply a CASE expression to the "IsDepDelayed" column, intending to count only the rows where the value is "YES". Note that count() counts every non-null value, and the CASE expression returns 0 rather than null for the other rows, so the query as written counts all flights per carrier; replacing count() with sum(), or dropping the "else 0" branch, counts only the delayed departures. I group by "UniqueCarrier", order by "delayedCount" in descending order, limit the result to 5 rows, and use the notebook's built-in bar chart to display the data.
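
The difference between count() and sum() over a CASE expression is easy to check on a handful of rows. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL (an assumption made so the example runs anywhere; the treatment of non-null values by COUNT is standard SQL). The table and its five rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (UniqueCarrier TEXT, IsDepDelayed TEXT)")
conn.executemany(
    "INSERT INTO flights VALUES (?, ?)",
    [("AA", "YES"), ("AA", "NO"), ("AA", "NO"), ("DL", "YES"), ("DL", "YES")],
)

query = """
SELECT UniqueCarrier,
       COUNT(CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END) AS counted,
       SUM(CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END)   AS delayed
FROM flights
GROUP BY UniqueCarrier
ORDER BY UniqueCarrier
"""

# COUNT sees a non-null value (1 or 0) on every row; SUM adds up only the 1s
rows = conn.execute(query).fetchall()
for carrier, counted, delayed in rows:
    print(carrier, counted, delayed)
```

Here `counted` equals the number of flights per carrier, while `delayed` equals the number of delayed departures.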

Sources:

https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-syntax-qry-select-case.html

#### Airlines Question 4 [15 Points]
What was the average arrival delay per airline, and on average, did flights arrive early or late? 

Add a column to this new dataframe (containing the grouped averages) that contains the string "Late" if the average arrival for that airline is >15 minutes late, "Early" if the average arrival is <0 minutes late, and "On-time" if the average arrival is between 0 and 15 minutes late.

To add the additional column, use a Spark UDF. Additionally, make sure to filter out or fill in null values in your dataframe (if there are any) prior to applying the UDF.

In [0]:
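# Create a new df with an ArrDelayInt column: ArrDelay cast from string to int (unparseable values become null)
df_ArrDelayInt = df.withColumn("ArrDelayInt", df["ArrDelay"].cast("int"))
# Filter out rows without a usable arrival delay, as the question asks
df_ArrDelayInt = df_ArrDelayInt.na.drop(subset=["ArrDelayInt"])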

In [0]:
# Import modules
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Create a new dataframe that groups by UniqueCarrier and averages the arrival delay
df_arrDelay = df_ArrDelayInt.groupBy("UniqueCarrier").agg(F.avg("ArrDelayInt"))

# Function to classify an average arrival delay
def ArrTimeCalc(ArrDelayInt):
  """Return "Late", "Early" or "On-time" for an average arrival delay in minutes"""
  if ArrDelayInt is None:
    # No usable delay data for this airline; comparing None with a number would raise
    return None
  if ArrDelayInt < 0:
    return "Early"
  elif ArrDelayInt > 15:
    return "Late"
  else:
    return "On-time"

# Convert the function to a UDF and declare that the output is String-type
udfArrTimeCalc = F.udf(ArrTimeCalc, StringType())

# Create a new dataframe based on df_arrDelay and add the column AvgArrTime, computed by the UDF
airlines_ArrTimeCalc = df_arrDelay.withColumn("AvgArrTime", udfArrTimeCalc("avg(ArrDelayInt)"))

# Show the dataframe
airlines_ArrTimeCalc.show()

Please provide your brief, written description of your code here.

4.1

I first tried to classify arrivals (on-time, early, late) using the ArrDelay value from the initial load. However, that column was read as a string, so I needed to convert it to a number before I could work with it.
The simplest (though probably not the most efficient) way I found was to create a new dataframe that adds a column, ArrDelayInt, holding ArrDelay converted with cast("int").

4.2 

With ArrDelay in a numeric type, I create another dataframe that groups by UniqueCarrier and computes the mean with agg(F.avg(...)).
Then I define a Python function that classifies the average arrival delay computed in the previous step. This only works once ArrDelay has been converted from string to a numeric type such as int or float.
Next, I use F.udf() (from the functions module imported as F) to wrap ArrTimeCalc as a UDF that returns a string.
The UDF is then used to create a new dataframe with an additional column, AvgArrTime.

I then display the dataframe.
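
The classification rule can be checked outside Spark before it is wrapped in a UDF. The sketch below is a plain-Python version under the thresholds stated in the question; the name `classify_arrival` and the explicit `None` branch are illustrative assumptions, not part of the notebook code.

```python
from typing import Optional


def classify_arrival(avg_delay: Optional[float]) -> Optional[str]:
    """Classify an average arrival delay (in minutes) as Early, On-time or Late."""
    if avg_delay is None:
        # A null average would otherwise raise a TypeError on comparison
        return None
    if avg_delay < 0:
        return "Early"
    if avg_delay > 15:
        return "Late"
    # Between 0 and 15 minutes late, both bounds included
    return "On-time"


for value in (-1.5, 0, 15, 15.1, None):
    print(value, classify_arrival(value))
```

Checking the boundary values 0 and 15 directly makes it clear which side of each threshold counts as on time.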


Sources:
https://docs.databricks.com/spark/latest/spark-sql/udf-python.html
https://towardsdatascience.com/5-ways-to-add-a-new-column-in-a-pyspark-dataframe-4e75c2fd8c08
https://stackoverflow.com/questions/46956026/how-to-convert-column-with-string-type-to-int-form-in-pyspark-data-frame

#### Airlines Question 5 [15 Points]
What file format is airlines data stored in, and was this the most optimal format for the questions asked above?


What format would you store this data in if you frequently queried only the UniqueCarr and CancellationCode columns? 


What if you frequently read entire rows of the dataset? 


**Note:** Cite any sources used. You do not need a code answer for this question.

Please write your answer in this cell. 

1. 
The data is stored as CSV files. On a small cluster like the one used here, anything that improves read/write performance is welcome, so CSV was not the optimal format for the questions above.
Parquet would probably be a better fit: it is a columnar storage format designed for complex data processing, so queries that touch only a few columns read less data.
ORC is another columnar storage format that would also improve performance.

Sources: 
https://oswinrh.medium.com/parquet-avro-or-orc-47b4802b4bcb

https://dzone.com/articles/how-to-be-a-hero-with-powerful-parquet-google-and

https://www.datanami.com/2018/05/16/big-data-file-formats-demystified/

https://blog.openbridge.com/how-to-be-a-hero-with-powerful-parquet-google-and-amazon-f2ae0f35ee04

2. 
If you frequently read only a few columns, Parquet is a good option. Parquet reads only the required columns, which reduces disk I/O.
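
The effect of the layout on a two-column query can be illustrated with a toy in-memory model. This is only a sketch: real Parquet and Avro files add encoding, compression and metadata, and the three records below are made up.

```python
# Toy model of the two layouts; not how Parquet or Avro store bytes on disk
header = ["Year", "UniqueCarrier", "ArrDelay", "CancellationCode"]
records = [
    (1987, "PS", 23.0, None),
    (1987, "PS", 14.0, None),
    (1987, "AA", -2.0, "A"),
]

# Row-oriented layout: each record is stored together, so a scan walks whole rows
row_layout = list(records)

# Column-oriented layout: the values of each column are stored together
column_layout = {name: [rec[i] for rec in records] for i, name in enumerate(header)}

wanted = ["UniqueCarrier", "CancellationCode"]

# Number of values that must be read to answer a query on the two wanted columns
row_values_read = sum(len(rec) for rec in row_layout)
column_values_read = sum(len(column_layout[name]) for name in wanted)

print(row_values_read, column_values_read)
```

In this model the row layout reads every field of every record, while the column layout reads only the two requested columns; reading entire rows reverses the advantage, because a row layout returns each record in one piece.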

Sources:

https://oswinrh.medium.com/parquet-avro-or-orc-47b4802b4bcb

3.
If you frequently read entire rows of a dataset, Avro could be a good option. It is row-based (it stores data row by row), and row-based formats are best for write-heavy transactional workloads. Avro can also be read and written from Spark SQL.

Sources:

https://oswinrh.medium.com/parquet-avro-or-orc-47b4802b4bcb

https://en.wikipedia.org/wiki/Apache_Avro

#### Airlines Question 6 [5 Points]
If you needed to keep multiple versions of this dataset, why might you use the [Delta format](https://delta.io/) to do this efficiently?

Answer goes here

Delta Lake stores data as versioned Parquet files. Alongside the data files, Delta Lake keeps a transaction log that tracks every commit made to the table or blob store directory, which provides ACID transactions.

This allows you to keep multiple versions of the same data and query each of them efficiently.

This is useful for audits, for experimenting with different versions, for scenario testing, and for rolling back after failed data pipelines.

Sources:
https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html
https://docs.delta.io/latest/delta-faq.html

## Using Baby Names Data Set

This dataset comes from a website referenced by [Data.gov](http://catalog.data.gov/dataset/baby-names-beginning-2007). It lists baby names used in the state of NY from 2007 to 2012.

Run the following three cells to copy this file to the cluster.

In [0]:
%fs rm dbfs:/tmp/rows.json

In [0]:
%scala

import java.net.URL
import java.io.File
import org.apache.commons.io.FileUtils

val tmpFile = new File("/tmp/rows.json")
FileUtils.copyURLToFile(new URL("https://health.data.ny.gov/api/views/jxy9-yhdk/rows.json?accessType=DOWNLOAD"), tmpFile)

In [0]:
%fs mv file:/tmp/rows.json dbfs:/tmp/rows.json

In [0]:
display(dbutils.fs.ls("/tmp/rows.json"))

path,name,size
dbfs:/tmp/rows.json,rows.json,11651510


In [0]:
%fs head dbfs:/tmp/rows.json

#### Baby Names Question 1 - Nested Data [15 Points]


Use Spark SQL's native JSON support to read the baby names file into a dataframe. Use this dataframe to create a temporary table containing all the nested data columns ("sid", "id", "position", "created_at", "created_meta", "updated_at", "updated_meta", "meta", "year", "first_name", "county", "sex", "count") so that they can be queried using SQL. 

Hint: you can use ```%fs head dbfs:/tmp/rows.json``` to take a look at the dataset before reading it in. 

Suggested Steps:
1. Read in the JSON data
2. Pull all columns in the nested data column to top level, following the schema specified above
3. Create a temp table from this expanded dataframe using createOrReplaceTempView()

In [0]:
# Please provide your code answer for Question 1 here
from pyspark.sql.functions import explode

# Create a temporary view from the rows.json file. The file is nested, so the data must be flattened to pull all the columns into a tabular format while keeping the schema.
# The file is read with multiLine=True, then explode() returns one row for each element of the "data" array. The result is registered as the temporary view tempData.

spark.read.json('/tmp/rows.json', multiLine=True).select(explode("data").alias("data")).createOrReplaceTempView("tempData")

# I use sqlContext.sql() to select the fields I am interested in from the tempData view by position, give each one its column name in the required order, and store the result as a dataframe.
namesDF = sqlContext.sql("SELECT data[0] AS sid, data[1] AS id, data[2] AS position, data[3] AS created_at, data[4] AS created_meta, data[5] AS updated_at, data[6] AS updated_meta, data[7] AS meta, data[8] AS year, data[9] AS first_name, data[10] AS county, data[11] AS sex, data[12] AS count FROM tempData")

# I use namesDF to create a TempView with name "babynames"
namesDF.createOrReplaceTempView("babynames")

# I display the ten most recent rows of the table to verify that my data is correct
display(sqlContext.sql("select * from babynames order by year desc limit 10"))

sid,id,position,created_at,created_meta,updated_at,updated_meta,meta,year,first_name,county,sex,count
row-brkm-7izk-trjm,00000000-0000-0000-4F52-2DEB83640FD1,0,1611674742,,1611674742,,{ },2018,OLIVIA,Albany,F,17
row-fcck.7stn~2bpy,00000000-0000-0000-492D-E5EE5BEF7EA9,0,1611674742,,1611674742,,{ },2018,AMELIA,Albany,F,9
row-2m5x_rpr2.gwvc,00000000-0000-0000-1EA7-DD209008AB39,0,1611674742,,1611674742,,{ },2018,AVA,Albany,F,17
row-xcx9~hw65_ib5p,00000000-0000-0000-B9C1-8C79EAD9373A,0,1611674742,,1611674742,,{ },2018,ISABELLA,Albany,F,15
row-684m~4agu.tevb,00000000-0000-0000-2B4E-17FEA39BCFA9,0,1611674742,,1611674742,,{ },2018,EMMA,Albany,F,14
row-jgz3~57z7_bxvk,00000000-0000-0000-148F-D9BDB1F240A9,0,1611674742,,1611674742,,{ },2018,ELLA,Albany,F,14
row-wd5t.hgpt_y3ca,00000000-0000-0000-B33D-FBFDDD4A2ED9,0,1611674742,,1611674742,,{ },2018,SOPHIA,Albany,F,13
row-nx5n_6sgb-9zw6,00000000-0000-0000-E6B5-81172AF1194C,0,1611674742,,1611674742,,{ },2018,CHARLOTTE,Albany,F,13
row-a7si_ssat_idv7,00000000-0000-0000-5404-3A8960FEC50A,0,1611674742,,1611674742,,{ },2018,ABIGAIL,Albany,F,13
row-2wv2_td5s.4ez7,00000000-0000-0000-28D7-F41FABE3E982,0,1611674742,,1611674742,,{ },2018,EVELYN,Albany,F,11


See code comments
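
The positional mapping used in the query (data[0] AS sid, data[1] AS id, and so on) can be sketched in plain Python. The two-row sample below is made up and only mirrors the shape seen in the file, a top-level "data" array whose elements are arrays of values; the column list comes from the question.

```python
import json

columns = ["sid", "id", "position", "created_at", "created_meta", "updated_at",
           "updated_meta", "meta", "year", "first_name", "county", "sex", "count"]

# Made-up sample in the same shape as rows.json: "data" is an array of arrays
sample = json.dumps({
    "meta": {},
    "data": [
        ["row-1", "id-1", 0, 1611674742, None, 1611674742, None, "{ }",
         "2018", "OLIVIA", "Albany", "F", "17"],
        ["row-2", "id-2", 0, 1611674742, None, 1611674742, None, "{ }",
         "2018", "EMMA", "Albany", "F", "14"],
    ],
})

doc = json.loads(sample)

# zip() pairs each positional value with its column name, one dict per inner array
flat = [dict(zip(columns, row)) for row in doc["data"]]

print(flat[0]["first_name"], flat[0]["count"])
```

Each inner array becomes one record, which is the role explode() plays in the Spark version.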

#### Baby Names Question 2 - Multiple Languages [10 Points]

Using the temp table you created in the question above, write a SQL query that gives the most popular baby name for each year in the dataset. Then, write the same query using either the Scala or Python dataframe APIs.

In [0]:
%sql
SELECT
  first_name,
  count,
  year
FROM (
  SELECT
    first_name,
    count,
    year,
    dense_rank() OVER (PARTITION BY year ORDER BY count DESC) as rank
  FROM babynames) tmp
WHERE
  rank <= 1

first_name,count,year
JOHN,99,2007
JAKE,99,2007
JEREMIAH,99,2008
MATTHEW,99,2008
JACK,99,2009
SOPHIA,99,2009
MADISON,99,2009
CHAIM,99,2010
JUSTIN,99,2010
LOGAN,99,2010


In [0]:
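from pyspark.sql import Window
from pyspark.sql.functions import col, dense_rank

# Rank the names within each year by count, highest first, mirroring the SQL query above
df2 = namesDF.withColumn("rank", dense_rank().over(Window.partitionBy("year").orderBy(col("count").desc())))
print("Printing the dataframe df2")
df2.show()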


In [0]:
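# Keep only the top-ranked rows per year, as the WHERE clause of the SQL query does
df2.filter(df2["rank"] <= 1).select("first_name", "count", "year").show()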
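
Every top count in the SQL result above is 99, which suggests that count is being compared as text rather than as a number, and that counts are ranked per county row rather than per name. The sketch below shows both effects on made-up rows in plain Python; casting count to an integer and summing per year and name before ranking would be the corresponding change in SQL or the dataframe API (an assumption about the intended result, not the notebook's query).

```python
from collections import defaultdict

# Made-up rows: (year, first_name, county, count), with count stored as text
rows = [
    ("2018", "OLIVIA", "Albany", "17"),
    ("2018", "OLIVIA", "Kings", "120"),
    ("2018", "LIAM", "Kings", "99"),
]

# Sorting the text values in descending order puts "99" ahead of "120"
as_text = sorted((r[3] for r in rows), reverse=True)

# Cast to int first, then sum per (year, name) across counties
totals = defaultdict(int)
for year, name, _county, count in rows:
    totals[(year, name)] += int(count)

best = max(totals.items(), key=lambda kv: kv[1])

print(as_text)
print(best)
```

With text comparison, "99" outranks any three-digit count, so 99 is the largest value a two-digit-leading sort can surface.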

Sources:

https://www.linkedin.com/pulse/window-function-spark-sql-sunita-sharma/?articleId=6648094047598632960

https://issuu.com/aegiscanada/docs/nested_json_data_processing_using_spark

https://learnsql.com/blog/how-to-rank-rows-in-sql/

https://www.sqlshack.com/overview-of-sql-rank-functions/

https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html

https://github.com/PhantomInsights/baby-names-analysis

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

https://medium.com/@AustenCMyers/an-ongoing-pyspark-reference-guide-4d7dd1342ef2

https://www.sqltutorial.org/sql-window-functions/sql-rank/

Please provide your brief, written description of your code here.

#### Baby Names Question 3 - Performance [10 Points]

Are there any performance considerations when choosing a language API (SQL vs Python vs Scala)

Are there any performance considerations when using different data representations (RDD, Dataframe) in Spark? Please explain, and provide references if possible. No code answer is required.

Please write your written answer here.

#### Baby Names Question 4 - Nested XML [15 Points]
Imagine that a new upstream system now automatically adds an XML field to the JSON baby dataset.  The added field is called visitors. It contains an XML string with visitor information for a given birth. We have simulated this upstream system by creating another JSON file with the additional field.  

Using the JSON dataset at https://raw.githubusercontent.com/jservin/scratch-pad/master/rows-with-xml.json, do the following:
0. read the rows-with-xml.json file into a dataframe and parse the nested XML fields into columns
0. Find the county with the highest average number of visitors across all births in that county
0. Find the average visitor age for a birth in the county of KINGS
0. Find the most common birth visitor age in the county of KINGS

In [0]:
%sh wget https://raw.githubusercontent.com/jservin/scratch-pad/master/births-with-visitor-data.json && cp births-with-visitor-data.json /dbfs/tmp

In [0]:
## Hint: the code below will read in the downloaded JSON files. However, the xml column needs to be given structure. Consider using a UDF.
#df = spark.read.option("inferSchema", True).json("/tmp/births-with-visitor-data.json")
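
The hint above suggests giving the XML column structure with a UDF. The following is a minimal sketch of the parsing step only, in plain Python. The XML shape (a `visitors` root with `visitor` elements carrying an `age` attribute), the sample string and the name `parse_visitor_ages` are all assumptions for illustration; the real field should be inspected before relying on them.

```python
import xml.etree.ElementTree as ET
from collections import Counter


def parse_visitor_ages(xml_string):
    """Return the visitor ages found in one XML string (assumed shape, see above)."""
    if not xml_string:
        # Treat a missing or empty field as a birth with no visitors
        return []
    root = ET.fromstring(xml_string)
    return [int(v.attrib["age"]) for v in root.iter("visitor") if "age" in v.attrib]


# Hypothetical sample value of the visitors field
sample = (
    "<visitors>"
    "<visitor id='1' age='68' sex='F' />"
    "<visitor id='2' age='34' sex='M' />"
    "<visitor id='3' age='34' sex='F' />"
    "</visitors>"
)

ages = parse_visitor_ages(sample)
visitor_count = len(ages)
most_common_age = Counter(ages).most_common(1)[0][0]

print(ages, visitor_count, most_common_age)
```

A function like this could be wrapped with `F.udf` and a return type of `ArrayType(IntegerType())`, after which the visitor count, the average age and the most common age per county become ordinary aggregations.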


Please provide your written answer for Question 4 here

### ML Question 1 [20 Points]
Choose a data set, then demonstrate the training and testing of a Spark MLlib algorithm. You may use any dataset contained in dbfs:/databricks-datasets, or you may use a publicly available dataset of your choice. However, if you choose a public data set, please ensure your code downloads the dataset from a publicly available source. The grader needs to be able to run all your code. Please cite any sources/guides/blogs used. 

Show and discuss an example of tuning the algorithm to improve prediction accuracy. Please describe in detail what algorithm you chose and why.

In [0]:
# Please provide your code answer for Question 1 Here

Please provide your brief, written description of your code here