## Setup
- Let's mount the blob storage that holds the files, so the folder is easier to access. The mount cell uses Scala (%scala).

In [2]:
# Just in case we need to clean up an existing mount first
#dbutils.fs.unmount("/mnt/airdelaysblob")

In [3]:
%scala
dbutils.fs.mount(
  source = "wasbs://[Container]@[Storageaccount].blob.core.windows.net/",
  mountPoint = "/mnt/airdelaysblob",
  extraConfigs = Map("fs.azure.sas.airdelays.[Storageaccount].blob.core.windows.net" -> "[ACCESSKEY]"))

## But let's start engineering. 
We'll create a dataframe from the content of the "airdelays" folder. 

The assumption is that all files share the same structure. (If not, there are methods to handle that ;) )

In [5]:
dfairdelays = sqlContext.read.format('csv').options(header='true', inferSchema='true').load("/mnt/airdelaysblob")
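
The assumption that all files look the same can be checked before loading. The following is a minimal sketch in plain Python, not a Spark method: it compares the header row of each CSV file against the first one. The file names and contents are hypothetical stand-ins for the files in the mounted folder, and the helper names `read_header` and `find_mismatched_headers` are made up for this illustration.

```python
import csv
import io

def read_header(text):
    """Return the first CSV row (the header) of a CSV text."""
    return next(csv.reader(io.StringIO(text)))

def find_mismatched_headers(files):
    """Return the names of files whose header differs from the first file's header."""
    names = sorted(files)
    reference = read_header(files[names[0]])
    return [name for name in names[1:] if read_header(files[name]) != reference]

# Hypothetical file contents, standing in for the CSV files in the mounted folder.
sample_files = {
    "2017_07.csv": "MONTH,ORIGIN,DEP_DELAY\n7,JFK,0.0\n",
    "2017_08.csv": "MONTH,ORIGIN,DEP_DELAY\n8,JFK,3.0\n",
    "2017_09.csv": "MONTH,ORIGIN,ARR_DELAY\n9,JFK,8.0\n",
}

mismatched = find_mismatched_headers(sample_files)
print(mismatched)
```

In this sample, only the third file deviates from the reference header, so it is the only one reported.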

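## The cache function marks the dataframe to be kept in memory.

Caching is lazy: the data is only materialized in memory when the first action (such as the display below) runs.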

In [7]:
dfairdelays.cache()

In [8]:
display(dfairdelays.select("ORIGIN").distinct())

ORIGIN
BGM
DLG
PSE
INL
MSY
GEG
DRT
BUR
SNA
GRB


## Next steps: 

- drop unnecessary columns from the dataframe
- eliminate or replace NULLs in the dataframe
- create a new column with a calculated value
- visually check some values
- persist the dataset into a Databricks table
- persist the dataset into a Parquet file

In [10]:
# Keep only the columns we need (no line continuations required inside parentheses)
dfairdelays = dfairdelays.select(
    "DAY_OF_WEEK",
    "MONTH",
    "UNIQUE_CARRIER",
    "ORIGIN_STATE_ABR",
    "ORIGIN",
    "DEST_STATE_ABR",
    "DEST",
    "DEP_DEL15",
    "DEP_DELAY",
    "ARR_DEL15",
    "ARR_DELAY"
)

## NULL Handling
Do we introduce additional bias into our data?

In [12]:
# Do we replace NULL values? (in this case with Zero = 0)
delaysfilna = dfairdelays.fillna(0)

# Or do we eliminate them?
dfairdelays = dfairdelays.dropna()
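
To make the bias question concrete, here is a minimal sketch in plain Python (no Spark needed) with hypothetical delay values. It mimics the two options on a single column: replacing missing values with 0 pulls the average down, while dropping them leaves the average of the observed values unchanged. Note that dropna() on a dataframe removes the whole row as soon as any column is NULL, which this single-column sketch does not show.

```python
from statistics import mean

# Hypothetical arrival delays in minutes; None marks a missing value.
arr_delay = [8.0, 15.0, None, 31.0, None, 40.0]

# Option 1: replace missing values with 0 (comparable to fillna(0)).
filled = [0.0 if v is None else v for v in arr_delay]

# Option 2: remove missing values (comparable to dropna() on one column).
dropped = [v for v in arr_delay if v is not None]

mean_filled = mean(filled)
mean_dropped = mean(dropped)
print(mean_filled, mean_dropped)
```

Which option is less harmful depends on why the values are missing, for example whether a missing delay means a cancelled flight or simply an unrecorded value.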

In [13]:
from pyspark.sql.functions import isnan, when, count, col
# Count the remaining NULL/NaN values per column (all counts should be 0 after dropna)
dfairdelays.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dfairdelays.columns]).show()

In [14]:
display(dfairdelays)

DAY_OF_WEEK,MONTH,UNIQUE_CARRIER,ORIGIN_STATE_ABR,ORIGIN,DEST_STATE_ABR,DEST,DEP_DEL15,DEP_DELAY,ARR_DEL15,ARR_DELAY
4,7,AA,NY,JFK,CA,LAX,0.0,0.0,0.0,8.0
5,7,AA,NY,JFK,CA,LAX,0.0,-2.0,1.0,15.0
6,7,AA,NY,JFK,CA,LAX,0.0,-6.0,0.0,2.0
7,7,AA,NY,JFK,CA,LAX,0.0,-5.0,0.0,-2.0
1,7,AA,NY,JFK,CA,LAX,0.0,-7.0,0.0,1.0
2,7,AA,NY,JFK,CA,LAX,0.0,3.0,0.0,6.0
3,7,AA,NY,JFK,CA,LAX,0.0,-8.0,0.0,-2.0
4,7,AA,NY,JFK,CA,LAX,0.0,-2.0,1.0,31.0
5,7,AA,NY,JFK,CA,LAX,0.0,-7.0,1.0,20.0
6,7,AA,NY,JFK,CA,LAX,1.0,41.0,1.0,40.0


## But how can we add a new column to the dataframe?

In [16]:
# Note: the multiplication is evaluated first, so this expression equals ARR_DELAY + DEP_DELAY
dfairdelays = dfairdelays.withColumn("DELTA", dfairdelays['ARR_DELAY'] - dfairdelays['DEP_DELAY']*-1)

display(dfairdelays)

DAY_OF_WEEK,MONTH,UNIQUE_CARRIER,ORIGIN_STATE_ABR,ORIGIN,DEST_STATE_ABR,DEST,DEP_DEL15,DEP_DELAY,ARR_DEL15,ARR_DELAY,DELTA
4,7,AA,NY,JFK,CA,LAX,0.0,0.0,0.0,8.0,8.0
5,7,AA,NY,JFK,CA,LAX,0.0,-2.0,1.0,15.0,13.0
6,7,AA,NY,JFK,CA,LAX,0.0,-6.0,0.0,2.0,-4.0
7,7,AA,NY,JFK,CA,LAX,0.0,-5.0,0.0,-2.0,-7.0
1,7,AA,NY,JFK,CA,LAX,0.0,-7.0,0.0,1.0,-6.0
2,7,AA,NY,JFK,CA,LAX,0.0,3.0,0.0,6.0,9.0
3,7,AA,NY,JFK,CA,LAX,0.0,-8.0,0.0,-2.0,-10.0
4,7,AA,NY,JFK,CA,LAX,0.0,-2.0,1.0,31.0,29.0
5,7,AA,NY,JFK,CA,LAX,0.0,-7.0,1.0,20.0,13.0
6,7,AA,NY,JFK,CA,LAX,1.0,41.0,1.0,40.0,81.0
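
The DELTA values can be checked by hand. The following plain-Python sketch applies the same expression to a few (DEP_DELAY, ARR_DELAY, DELTA) triples copied from the output above. The function name `delta` is only used for this illustration.

```python
# (DEP_DELAY, ARR_DELAY, DELTA) copied from rows of the displayed output.
rows = [
    (0.0, 8.0, 8.0),
    (-2.0, 15.0, 13.0),
    (-6.0, 2.0, -4.0),
    (41.0, 40.0, 81.0),
]

def delta(dep_delay, arr_delay):
    # Same arithmetic as in the notebook cell: dep_delay * -1 is evaluated
    # before the subtraction, so the result equals arr_delay + dep_delay.
    return arr_delay - dep_delay * -1

for dep, arr, expected in rows:
    result = delta(dep, arr)
    print(dep, arr, result, result == expected, result == arr + dep)
```

So DELTA as computed here is the sum of both delays, not the difference between arrival and departure delay. If the difference is what is wanted, the expression would need to be ARR_DELAY - DEP_DELAY instead.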


## And there are other programming languages that we could use on the data...

We will create a temporary view to exchange the data between languages.

In [18]:
dfairdelays.createOrReplaceTempView("airdelays_tempview")

## Now we just switch to SQL
(could also be R or Scala, by the way)

In [20]:
%sql
select *
from airdelays_tempview

DAY_OF_WEEK,MONTH,UNIQUE_CARRIER,ORIGIN_STATE_ABR,ORIGIN,DEST_STATE_ABR,DEST,DEP_DEL15,DEP_DELAY,ARR_DEL15,ARR_DELAY,DELTA
4,7,AA,NY,JFK,CA,LAX,0.0,0.0,0.0,8.0,8.0
5,7,AA,NY,JFK,CA,LAX,0.0,-2.0,1.0,15.0,13.0
6,7,AA,NY,JFK,CA,LAX,0.0,-6.0,0.0,2.0,-4.0
7,7,AA,NY,JFK,CA,LAX,0.0,-5.0,0.0,-2.0,-7.0
1,7,AA,NY,JFK,CA,LAX,0.0,-7.0,0.0,1.0,-6.0
2,7,AA,NY,JFK,CA,LAX,0.0,3.0,0.0,6.0,9.0
3,7,AA,NY,JFK,CA,LAX,0.0,-8.0,0.0,-2.0,-10.0
4,7,AA,NY,JFK,CA,LAX,0.0,-2.0,1.0,31.0,29.0
5,7,AA,NY,JFK,CA,LAX,0.0,-7.0,1.0,20.0,13.0
6,7,AA,NY,JFK,CA,LAX,1.0,41.0,1.0,40.0,81.0


In [21]:
%sql
select distinct ORIGIN_STATE_ABR
from airdelays_tempview

ORIGIN_STATE_ABR
AZ
SC
LA
MN
NJ
OR
VA
RI
KY
WY


In [22]:
%sql
select 
  ORIGIN_STATE_ABR,
  avg(DELTA)
from airdelays_tempview
group by
  ORIGIN_STATE_ABR

ORIGIN_STATE_ABR,avg(DELTA)
AZ,13.915584400511106
SC,12.99142922601379
LA,12.197028176900096
MN,13.476952854052517
NJ,21.43110894576146
OR,11.808571116288118
VA,13.122191054927685
RI,10.96060559431336
KY,14.13639068556433
WY,8.972081090174965


## And there is also a way back for data that has been calculated with SQL:

create or replace temporary view [NAME]

as

select ...

and then, back in Python:

dfairdelays = spark.table("[NAME]")

## But now let's persist the data into a Databricks table

In [25]:
dfairdelays.write.saveAsTable('delays_table')