## Setup
Mount the ADLS folder that contains the data written by the ADF pipeline (unmounting it first if it is already mounted).

See https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-create-service-principal-portal for how the Azure Active Directory App ID and its credentials are created. Please note: copy the key value as soon as it is displayed after saving. You won't be able to retrieve it afterwards!

In [2]:
dbutils.fs.unmount("/mnt/airdelays")

In [3]:
configs = {"dfs.adls.oauth2.access.token.provider.type": "ClientCredential",
           "dfs.adls.oauth2.client.id": "[YOURCLIENTID]",
           "dfs.adls.oauth2.credential": "[YOURCREDENTIAL]",
           # The GUID in the refresh URL is the tenant (directory) ID; replace it with your own
           "dfs.adls.oauth2.refresh.url": "https://login.microsoftonline.com/72f988bf-86f1-41af-91ab-2d7cd011db47/oauth2/token"}

dbutils.fs.mount(
  source = "adl://[YOURDATALAKE].azuredatalakestore.net/[YOURFOLDER]/",
  mount_point = "/mnt/airdelays",
  extra_configs = configs)
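
The unmount cell above typically fails when nothing is mounted at `/mnt/airdelays` yet, for example on a fresh workspace. A minimal sketch of a guard, assuming the notebook's `dbutils` object is passed in and that `dbutils.fs.mounts()` returns entries with a `mountPoint` attribute; the helper name `remount` is hypothetical:

```python
def remount(dbutils, source, mount_point, extra_configs):
    """Unmount mount_point only if it is currently mounted, then mount source there."""
    mounted = {m.mountPoint for m in dbutils.fs.mounts()}
    if mount_point in mounted:
        dbutils.fs.unmount(mount_point)
    dbutils.fs.mount(source=source, mount_point=mount_point, extra_configs=extra_configs)
```

In the notebook this would be called as `remount(dbutils, "adl://[YOURDATALAKE].azuredatalakestore.net/[YOURFOLDER]/", "/mnt/airdelays", configs)`.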

## There are all kinds of ways to handle files in the storage layer.
Look familiar to the Linux users?

In [5]:
dbutils.fs.ls("/mnt/airdelays/airdelays")

## But let's start playing around.
We'll create a dataframe from the content of the "airdelays" folder.

The assumption is that all files have the same layout (if not, there are methods for handling that ;) ).

In [7]:
csvFolder = "/mnt/airdelays/airdelays"
df = (spark.read                        # The DataFrameReader
   .option("header", "true")            # Use first line of all files as header
   .option("inferSchema", "true")       # Automatically infer data types
   .csv(csvFolder)                      # Creates a DataFrame from the CSV files in the folder
)

Cache the dataframe to speed up the following steps

In [9]:
df.cache()

Need the distinct values of a column? Select it and call distinct().

Some documentation about PySpark dataframes can be found here: https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html

In [11]:
display(df.select("ORIGIN").distinct())

ORIGIN
BGM
DLG
PSE
INL
MSY
GEG
DRT
BUR
SNA
GTF


## Next steps: 

- drop unnecessary columns from the dataframe
- eliminate or replace Nulls from the dataframe?
- create new column with a new value
- visually check some values
- create a job out of the notebook
- write out the results as a parquet file
- maybe add some streaming data?

In [13]:
# Keep only the columns needed for the delay analysis
delays = df.select(
    "YEAR",
    "MONTH",
    "DAY_OF_MONTH",
    "DAY_OF_WEEK",
    "ORIGIN",
    "UNIQUE_CARRIER",
    "ARR_DELAY",
    "ARR_DEL15",
    "ORIGIN_STATE_ABR",
    "DEST_STATE_ABR",
    "DEP_DELAY",
)

In [14]:
display(delays)

YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,ORIGIN,UNIQUE_CARRIER,ARR_DELAY,ARR_DEL15,ORIGIN_STATE_ABR,DEST_STATE_ABR,DEP_DELAY
2004,7,1,4,JFK,AA,8.0,0.0,NY,CA,0.0
2004,7,2,5,JFK,AA,15.0,1.0,NY,CA,-2.0
2004,7,3,6,JFK,AA,2.0,0.0,NY,CA,-6.0
2004,7,4,7,JFK,AA,-2.0,0.0,NY,CA,-5.0
2004,7,5,1,JFK,AA,1.0,0.0,NY,CA,-7.0
2004,7,6,2,JFK,AA,6.0,0.0,NY,CA,3.0
2004,7,7,3,JFK,AA,-2.0,0.0,NY,CA,-8.0
2004,7,8,4,JFK,AA,31.0,1.0,NY,CA,-2.0
2004,7,9,5,JFK,AA,20.0,1.0,NY,CA,-7.0
2004,7,10,6,JFK,AA,40.0,1.0,NY,CA,41.0


In [15]:
from pyspark.sql.functions import isnan, when, count, col

# For every column, count the rows whose value is NaN or NULL
delays.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in delays.columns]).show()
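
To make the counting logic above explicit: for every column it counts the rows where the value is either NULL or NaN. A plain-Python sketch of the same idea on a few hand-made rows (the rows and the helper name `count_missing` are illustrative assumptions, not taken from the dataset):

```python
import math

def count_missing(rows, columns):
    """Return {column: number of rows whose value is None or NaN}."""
    def is_missing(value):
        return value is None or (isinstance(value, float) and math.isnan(value))
    return {c: sum(is_missing(r.get(c)) for r in rows) for c in columns}

# Made-up rows: one NULL and one NaN in ARR_DELAY, one NULL in DEP_DELAY
sample = [
    {"ORIGIN": "JFK", "ARR_DELAY": 8.0, "DEP_DELAY": 0.0},
    {"ORIGIN": "JFK", "ARR_DELAY": None, "DEP_DELAY": -2.0},
    {"ORIGIN": "JFK", "ARR_DELAY": float("nan"), "DEP_DELAY": None},
]
missing = count_missing(sample, ["ORIGIN", "ARR_DELAY", "DEP_DELAY"])
print(missing)
```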

## NULL Handling
Do we introduce additional bias into our data?

In [17]:
# Do we replace NULL values? (in this case with Zero = 0)
delaysfilna = delays.fillna(0)

# Or do we eliminate them?
delaysnona = delays.dropna()
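
Both options change what later aggregates mean. A small plain-Python sketch with made-up delay values (an assumption for illustration, not data from the files) shows how the average shifts depending on the choice:

```python
from statistics import mean

arr_delay = [10.0, None, 30.0, None, 20.0]   # made-up values; None stands for NULL

filled = [0.0 if v is None else v for v in arr_delay]    # analogous to fillna(0)
dropped = [v for v in arr_delay if v is not None]        # analogous to dropna()

mean_filled = mean(filled)     # missing flights are treated as "exactly on time"
mean_dropped = mean(dropped)   # missing flights are ignored entirely
print(mean_filled, mean_dropped)
```

Filling with zero pulls the average toward zero, while dropping removes information. Keep in mind that `dropna()` with its default settings removes a row as soon as any column in it is NULL.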

In [18]:
delaysnona.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in delays.columns]).show()

In [19]:
display(delaysnona)

YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,ORIGIN,UNIQUE_CARRIER,ARR_DELAY,ARR_DEL15,ORIGIN_STATE_ABR,DEST_STATE_ABR,DEP_DELAY
2004,7,1,4,JFK,AA,8.0,0.0,NY,CA,0.0
2004,7,2,5,JFK,AA,15.0,1.0,NY,CA,-2.0
2004,7,3,6,JFK,AA,2.0,0.0,NY,CA,-6.0
2004,7,4,7,JFK,AA,-2.0,0.0,NY,CA,-5.0
2004,7,5,1,JFK,AA,1.0,0.0,NY,CA,-7.0
2004,7,6,2,JFK,AA,6.0,0.0,NY,CA,3.0
2004,7,7,3,JFK,AA,-2.0,0.0,NY,CA,-8.0
2004,7,8,4,JFK,AA,31.0,1.0,NY,CA,-2.0
2004,7,9,5,JFK,AA,20.0,1.0,NY,CA,-7.0
2004,7,10,6,JFK,AA,40.0,1.0,NY,CA,41.0


Also do some calculations: add a DELTA column derived from the arrival and departure delays.

In [21]:
delaysnona_newc = delaysnona.withColumn("DELTA", delaysnona['ARR_DELAY'] - delaysnona['DEP_DELAY']*-1)


display(delaysnona_newc)


YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,ORIGIN,UNIQUE_CARRIER,ARR_DELAY,ARR_DEL15,ORIGIN_STATE_ABR,DEST_STATE_ABR,DEP_DELAY,DELTA
2004,7,1,4,JFK,AA,8.0,0.0,NY,CA,0.0,8.0
2004,7,2,5,JFK,AA,15.0,1.0,NY,CA,-2.0,13.0
2004,7,3,6,JFK,AA,2.0,0.0,NY,CA,-6.0,-4.0
2004,7,4,7,JFK,AA,-2.0,0.0,NY,CA,-5.0,-7.0
2004,7,5,1,JFK,AA,1.0,0.0,NY,CA,-7.0,-6.0
2004,7,6,2,JFK,AA,6.0,0.0,NY,CA,3.0,9.0
2004,7,7,3,JFK,AA,-2.0,0.0,NY,CA,-8.0,-10.0
2004,7,8,4,JFK,AA,31.0,1.0,NY,CA,-2.0,29.0
2004,7,9,5,JFK,AA,20.0,1.0,NY,CA,-7.0,13.0
2004,7,10,6,JFK,AA,40.0,1.0,NY,CA,41.0,81.0
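
Because `*` binds tighter than `-`, the expression `ARR_DELAY - DEP_DELAY*-1` is evaluated as `ARR_DELAY - (DEP_DELAY * -1)`, which is the sum of the two delays rather than their difference. A quick plain-Python check against the second row of the output above:

```python
arr_delay, dep_delay = 15.0, -2.0                 # second row of the table above

delta_as_written = arr_delay - dep_delay * -1     # same as arr_delay + dep_delay
delta_difference = arr_delay - dep_delay          # delay picked up between departure and arrival

print(delta_as_written, delta_difference)
```

The first value matches the DELTA column shown above. If the difference is what is wanted, `delaysnona['ARR_DELAY'] - delaysnona['DEP_DELAY']` would be the expression to use.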


## Create a temporary view for later use with SQL

In [23]:
delaysnona_newc.createOrReplaceTempView("airdelays_table")

Now we just switch to SQL 
(could also be R or Scala btw)

In [25]:
%sql

select *
from airdelays_table


YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,ORIGIN,UNIQUE_CARRIER,ARR_DELAY,ARR_DEL15,ORIGIN_STATE_ABR,DEST_STATE_ABR,DEP_DELAY,DELTA
2004,7,1,4,JFK,AA,8.0,0.0,NY,CA,0.0,8.0
2004,7,2,5,JFK,AA,15.0,1.0,NY,CA,-2.0,13.0
2004,7,3,6,JFK,AA,2.0,0.0,NY,CA,-6.0,-4.0
2004,7,4,7,JFK,AA,-2.0,0.0,NY,CA,-5.0,-7.0
2004,7,5,1,JFK,AA,1.0,0.0,NY,CA,-7.0,-6.0
2004,7,6,2,JFK,AA,6.0,0.0,NY,CA,3.0,9.0
2004,7,7,3,JFK,AA,-2.0,0.0,NY,CA,-8.0,-10.0
2004,7,8,4,JFK,AA,31.0,1.0,NY,CA,-2.0,29.0
2004,7,9,5,JFK,AA,20.0,1.0,NY,CA,-7.0,13.0
2004,7,10,6,JFK,AA,40.0,1.0,NY,CA,41.0,81.0


In [26]:
%sql

select
  ORIGIN
  , avg(ARR_DELAY)
from airdelays_table
group by ORIGIN

ORIGIN,avg(ARR_DELAY)
BGM,3.338702543309989
DLG,10.86890182694238
PSE,-0.7377800407331976
INL,5.622596153846154
MSY,5.167156959000795
GEG,4.104301394051742
DRT,-6.914364640883978
SNA,4.164027677504417
BUR,4.34880618953682
GRB,4.205632628307179


In [27]:
%sql

select 
  UNIQUE_CARRIER
  , (ARR_DELAY - DEP_DELAY) as DELTA
from airdelays_table

UNIQUE_CARRIER,DELTA
AA,8.0
AA,17.0
AA,8.0
AA,3.0
AA,8.0
AA,3.0
AA,6.0
AA,33.0
AA,27.0
AA,-1.0


## But enough with that, let's output the results. First switch back to a PySpark Dataframe

In [29]:
dfairdelays_new = spark.table("airdelays_table")

## and then remove the file if it exists and write it to the Data Lake Store (the Parquet variant is commented out; the active line writes CSV)

In [31]:
#dbutils.fs.rm("/mnt/airdelays/airdelays/airdelays_table.parquet", True)
#dfairdelays_new.write.parquet("/mnt/airdelays/airdelays/airdelays_table.parquet")
dfairdelays_new.write.csv("/mnt/airdelays/airdelays/airdelays_table.csv")

In [32]:
dfairdelays_new

## But wait, how many rows did we just write to our Data Lake and how long did it take?

In [34]:
dfairdelays_new.count()
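
`count()` is an action, so it triggers an actual computation on the cluster. A minimal sketch of measuring how long such an action takes; `timed` is a hypothetical helper name, and the call on `dfairdelays_new` is shown only as a comment so that the block runs without a cluster:

```python
import time

def timed(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start

# In the notebook this would be: rows, seconds = timed(dfairdelays_new.count)
rows, seconds = timed(lambda: sum(1 for _ in range(1000)))   # stand-in workload
print(rows, "rows counted in", round(seconds, 4), "seconds")
```

This measures the count itself, not the earlier write. To time the write, the same helper could wrap the `write` call instead.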

## OK, then let's play a little with Python
First, the very plain option: show() prints the content of the dataframe as text

In [36]:
delays.show()

Then let's take a more sophisticated view of things: display([YOURDATAFRAME])

In [38]:
display(delays)

YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,ORIGIN,UNIQUE_CARRIER,ARR_DELAY,ARR_DEL15,ORIGIN_STATE_ABR,DEST_STATE_ABR,DEP_DELAY
2004,7,1,4,JFK,AA,8.0,0.0,NY,CA,0.0
2004,7,2,5,JFK,AA,15.0,1.0,NY,CA,-2.0
2004,7,3,6,JFK,AA,2.0,0.0,NY,CA,-6.0
2004,7,4,7,JFK,AA,-2.0,0.0,NY,CA,-5.0
2004,7,5,1,JFK,AA,1.0,0.0,NY,CA,-7.0
2004,7,6,2,JFK,AA,6.0,0.0,NY,CA,3.0
2004,7,7,3,JFK,AA,-2.0,0.0,NY,CA,-8.0
2004,7,8,4,JFK,AA,31.0,1.0,NY,CA,-2.0
2004,7,9,5,JFK,AA,20.0,1.0,NY,CA,-7.0
2004,7,10,6,JFK,AA,40.0,1.0,NY,CA,41.0


Count() the rows in the df

In [40]:
delays.count()

Cache the df into the memory of the cluster to speed things up a little

In [42]:
delays.cache()

Now do some math (the Data Scientists will know where this is heading...)
Notice the small buttons below the output? ==> They let you toggle between table and chart view once the dataframe has been displayed

In [44]:
display(delays.groupby('DAY_OF_WEEK').avg('ARR_DELAY'))

DAY_OF_WEEK,avg(ARR_DELAY)
1,6.365682122137005
6,3.75676157340381
3,6.538510925600507
5,8.977518779083464
4,8.40099970359445
7,6.062001282397349
2,5.472585296998161
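
What `groupby(...).avg(...)` computes can be spelled out in plain Python. The sketch below uses only the ten sample rows displayed earlier (DAY_OF_WEEK and ARR_DELAY), so its averages differ from the full-dataset results above:

```python
from collections import defaultdict

# (DAY_OF_WEEK, ARR_DELAY) pairs from the ten sample rows displayed earlier
sample = [(4, 8.0), (5, 15.0), (6, 2.0), (7, -2.0), (1, 1.0),
          (2, 6.0), (3, -2.0), (4, 31.0), (5, 20.0), (6, 40.0)]

# Collect the delays per weekday, then average each group
groups = defaultdict(list)
for day, delay in sample:
    groups[day].append(delay)

avg_by_day = {day: sum(values) / len(values) for day, values in sorted(groups.items())}
print(avg_by_day)
```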


In [45]:
display(delays.groupby('UNIQUE_CARRIER').avg('ARR_DELAY'))

UNIQUE_CARRIER,avg(ARR_DELAY)
UA,7.807771316077553
EA,7.102667527828459
PI,10.464421039446352
PS,9.261881326479774
AA,6.62380545255037
NW,5.467372338142225
EV,9.340070831564269
B6,8.1544325003662
HP,7.565357564088615
TW,6.856540040794198


But can we create a Python function and use it?

In [47]:
def patsum(arg1, arg2):
  # Add both parameters and return the result.
  total = arg1 + arg2
  print("Inside the function : ", total)
  return total

In [48]:
display(delays.withColumn('Summenfeld', patsum(col('ARR_DELAY'), col('ARR_DEL15'))))

YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,ORIGIN,UNIQUE_CARRIER,ARR_DELAY,ARR_DEL15,ORIGIN_STATE_ABR,DEST_STATE_ABR,DEP_DELAY,Summenfeld
2004,7,1,4,JFK,AA,8.0,0.0,NY,CA,0.0,8.0
2004,7,2,5,JFK,AA,15.0,1.0,NY,CA,-2.0,16.0
2004,7,3,6,JFK,AA,2.0,0.0,NY,CA,-6.0,2.0
2004,7,4,7,JFK,AA,-2.0,0.0,NY,CA,-5.0,-2.0
2004,7,5,1,JFK,AA,1.0,0.0,NY,CA,-7.0,1.0
2004,7,6,2,JFK,AA,6.0,0.0,NY,CA,3.0,6.0
2004,7,7,3,JFK,AA,-2.0,0.0,NY,CA,-8.0,-2.0
2004,7,8,4,JFK,AA,31.0,1.0,NY,CA,-2.0,32.0
2004,7,9,5,JFK,AA,20.0,1.0,NY,CA,-7.0,21.0
2004,7,10,6,JFK,AA,40.0,1.0,NY,CA,41.0,41.0
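
Note that `patsum` is called once with two `Column` objects, not once per row: `arg1 + arg2` builds a column expression that Spark evaluates later, and the `print` runs a single time on the driver. The plain-Python stand-in below mimics that behavior with a tiny hypothetical `Expr` class (not part of PySpark) that records the operation instead of computing it:

```python
class Expr:
    """Hypothetical stand-in for a Spark Column: '+' builds a description, it does not compute."""
    def __init__(self, text):
        self.text = text

    def __add__(self, other):
        return Expr(f"({self.text} + {other.text})")

call_log = []   # one entry per execution of the function body

def patsum(arg1, arg2):
    call_log.append("called")
    return arg1 + arg2

expr = patsum(Expr("ARR_DELAY"), Expr("ARR_DEL15"))
print(expr.text, "| function calls:", len(call_log))
```

A function that really has to run on every row's values would need to be registered as a user-defined function instead, which is a different mechanism from the one used above.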


OK, let's go to where the Data Scientists feel more comfortable...

describe() shows summary statistics (count, mean, stddev, min, max) for the given columns

In [50]:
delays.describe('ARR_DELAY').show()

We can also compute the correlation between two columns. A high correlation is not necessarily meaningful; it can be a coincidence.

In [52]:
delays.corr('ARR_DELAY', 'DEP_DELAY')
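
`corr` returns the Pearson correlation coefficient. As a sketch of what that number measures, here is the formula in plain Python, applied to the ten sample rows displayed earlier (so the value describes the sample only, not the full dataset):

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of the standard deviations."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

# ARR_DELAY and DEP_DELAY from the ten sample rows displayed earlier
arr_delay = [8.0, 15.0, 2.0, -2.0, 1.0, 6.0, -2.0, 31.0, 20.0, 40.0]
dep_delay = [0.0, -2.0, -6.0, -5.0, -7.0, 3.0, -8.0, -2.0, -7.0, 41.0]
sample_corr = pearson(arr_delay, dep_delay)
print(round(sample_corr, 3))
```

The result always lies between -1 and 1. Values near 1 mean the two delays tend to rise together.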

From here on the Data Scientists might take over ==> more to come in the Advanced Analytics and Machine Learning Lab... ;)