# Creates Delta Table for YinzCam Realtime API : Actions, Sessions, Hardware, Geo IP Join

## Description
This notebook generates a delta table for "actions_sessions_geo_hardware" to be reused later in the mc pipeline.
This process saves execution time, as this table won't have to be recalculated in each notebook.

## Process

1. Ingest Actions, Sessions, GeoIp, Hardware from ADL
2. Join Actions & Sessions & GeoIp & Hardware tables together
3. Write to a delta table

### Clean up Notes

* spark.sql("DELETE FROM delta. `{}`".format(delta_url))
* spark.sql("VACUUM delta. `{}` RETAIN 168 HOURS".format(delta_url))
* dbutils.fs.rm(delta_url,recurse=True)

## Outputs
Delta Tables
* `actions_sessions_geo_hardware`

## QA

* Nicole Ridout, Data Engineer, Nicole.Ridout@MLSE.com
* Farah Bastien, Manager of Data Science, Farah.Bastien@MLSE.com

#### Load the necessary functions from `PySpark` and `Python`

In [0]:
#SQL-like functions from PySpark
from pyspark.sql.functions import col,date_format,from_utc_timestamp, unix_timestamp, sum, count,countDistinct, month,min,max,when,collect_set
from pyspark.sql.functions import least, greatest, size, isnan, explode, lit, broadcast, concat
from pyspark.sql.types import StructType, StructField, BinaryType, BooleanType, ByteType, DateType, DecimalType, DoubleType, FloatType, IntegerType,\
           LongType, ShortType, StringType, TimestampType
from pyspark.sql import SQLContext

#python packages
from time import time, sleep, mktime
from datetime import datetime, date, timedelta

#### Load ADL credentials

In [0]:
def _adl_secret(key):
    """Return the value stored under `key` in the "adl_cred" secret scope."""
    return dbutils.secrets.get(scope = "adl_cred", key = key)


# OAuth2 token endpoint, parameterised by the directory id kept in the secret scope.
url = "https://login.microsoftonline.com/{0}/oauth2/token".format(_adl_secret("directory_id"))

# Client-credential settings the ADLS connector reads from the Spark conf.
spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
spark.conf.set("dfs.adls.oauth2.client.id", _adl_secret("client_id"))
spark.conf.set("dfs.adls.oauth2.credential", _adl_secret("credential"))
spark.conf.set("dfs.adls.oauth2.refresh.url", url)
# spark.conf.set("spark.sql.broadcastTimeout", "36000")

#### Define the team and url from ADL

In [0]:
# Declare a free-text "team" widget (empty default, empty label) so the team
# can be supplied interactively or by whoever invokes this notebook.
dbutils.widgets.text("team", "", "")

# Read the widget once through dbutils.widgets.get. The previous code threw
# this call's result away and re-read the value through the legacy
# getArgument() helper.
team = dbutils.widgets.get("team")
print("Working with {0} team".format(team))

In [0]:
# Root folder of the selected team's realtime API exports in the data lake;
# the table name ('actions', 'sessions', ...) is appended to it when reading.
adlurl = "".join(["adl://mlse1.azuredatalakestore.net/yinz_cam/", team, "_tor/realtime_api/"])
print(adlurl)

#### Load Realtime API Data from ADL

In [0]:
# Timestamp pattern handed to unix_timestamp below. The literal 'T' separator
# must be quoted: an unquoted T is not a valid pattern letter. Both columns
# are already timestamps when unix_timestamp sees them, so the pattern would
# only matter if they were ever passed in as strings.
timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"

# Actions: one row per action.
#  - exact duplicate rows are dropped
#  - 'id' is renamed to 'action_id'
#  - request/invisible timestamps are cast to timestamp and shifted from UTC
#    to America/Toronto
#  - action_date is the 'yyyy-MM-dd' rendering of the shifted request time
#  - dwell_time_s is invisible time minus request time, in whole seconds
actions  = (spark.read.csv(adlurl + 'actions',header=True)
            .drop_duplicates()
            .withColumnRenamed('id','action_id')
            .withColumn('request_date_time',from_utc_timestamp(col('request_date_time').cast(TimestampType()), "America/Toronto"))
            .withColumn('invisible_date_time',from_utc_timestamp(col('invisible_date_time').cast(TimestampType()), "America/Toronto"))
            .withColumn('action_date',date_format('request_date_time', 'yyyy-MM-dd'))
            .withColumn('dwell_time_s',unix_timestamp("invisible_date_time",timeFmt) - unix_timestamp("request_date_time",timeFmt))
           )

# Imported here so this cell does not depend on edits to the import cell.
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Rank the rows of each session newest-first by end_date_time. A global
# orderBy followed by drop_duplicates(subset=...) does not guarantee which
# duplicate survives the shuffle, so the "keep the latest row" intent is made
# explicit with a window instead.
_latest_session_first = Window.partitionBy('ses_id').orderBy(col('end_date_time').desc())

# Sessions: one row per non-null ses_id (the row with the latest end time).
#  - 'id' is renamed to 'ses_id'; rows without an id are discarded up front
#  - start/end timestamps are cast and shifted from UTC to America/Toronto
#  - session_date is the 'yyyy-MM-dd' rendering of the shifted start time
#  - hardware_device_id is cast to integer so it can be compared with the
#    integer hardware_id of the hardware table
sessions = (spark.read.csv(adlurl + 'sessions',header=True)
            .withColumnRenamed('id','ses_id')
            .where(col('ses_id').isNotNull())
            .withColumn('start_date_time', from_utc_timestamp(col('start_date_time').cast(TimestampType()), "America/Toronto"))
            .withColumn('end_date_time', from_utc_timestamp(col('end_date_time').cast(TimestampType()), "America/Toronto"))
            .withColumn('session_date',date_format('start_date_time', 'yyyy-MM-dd'))
            .withColumn('_dedup_rank', row_number().over(_latest_session_first))
            .where(col('_dedup_rank') == 1)
            .drop('_dedup_rank')
            .withColumn("hardware_device_id",col("hardware_device_id").cast(IntegerType()))
           )

# Hardware lookup: 'id' becomes 'hardware_id', one row is kept per id, rows
# without an id are removed, and the id is finally cast to integer.
hardware = (
    spark.read.option('header', True).csv(adlurl + 'hardware')
    .withColumnRenamed('id', 'hardware_id')
    .dropDuplicates(['hardware_id'])
    .filter(col('hardware_id').isNotNull())
    .withColumn('hardware_id', col('hardware_id').cast(IntegerType()))
)

# Imported here so this cell does not depend on edits to the import cell.
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# GeoIP: 'id' becomes an integer 'geoip_id'; rows whose id is missing or does
# not cast to an integer are removed.
geoip    = (spark.read.csv(adlurl + 'geoip' ,header=True)
            .withColumnRenamed('id','geoip_id')
            .withColumn('geoip_id',col('geoip_id').cast(IntegerType()))
            .where(col('geoip_id').isNotNull())
           )

# Keep exactly one row per geoip_id: the first one when the rows are ordered
# by every column descending. A window rank is used because sorting and then
# calling drop_duplicates(subset=...) does not guarantee which duplicate is
# kept once the data is shuffled.
_geoip_all_desc = Window.partitionBy('geoip_id').orderBy([col(c).desc() for c in geoip.columns])

geoip = (geoip
         .withColumn('_dedup_rank', row_number().over(_geoip_all_desc))
         .where(col('_dedup_rank') == 1)
         .drop('_dedup_rank')
        )

#### Actions Sessions Hardware Geo Join

In [0]:
# Step 1: inner-join every action to its session
# (session_id matched against device_generated_id).
actions_sessions = actions.join(
    sessions,
    on=col('session_id') == col('device_generated_id'),
    how='inner',
)

# Step 2: attach the GeoIP rows
# (session_device_generated_id matched against device_generated_id).
actions_sessions_geo = actions_sessions.join(
    geoip,
    on=col('session_device_generated_id') == col('device_generated_id'),
    how='inner',
)

# Step 3: attach the hardware attributes. Per the original author's note the
# hardware table is tiny (about 5000 records) next to the ~300 million rows on
# the other side, so it is marked for a broadcast join rather than a shuffle.
actions_sessions_geo_hardware = broadcast(hardware).join(
    actions_sessions_geo,
    on=col('hardware_id')==col('hardware_device_id'),
    how='inner',
)

#### Write Actions/Sessions/GeoIp/Hardware to Delta

In [0]:
# Location of the Actions/Sessions/Hardware/Geo Delta table for the selected team.
delta_url = "".join(['/delta/yinzcam/', team, '/actions_sessions_hardware_geo'])

# Housekeeping before the table is rewritten: vacuum the Delta location with a
# 168-hour retention window, then clear Spark's cache.
spark.sql("VACUUM delta. `{}` RETAIN 168 HOURS".format(delta_url))
spark.sql("CLEAR CACHE")

In [0]:
# Overwrite the Delta table with the freshly joined data. The target is the
# delta_url defined in the clean-up cell, so the location that was vacuumed and
# the location written here cannot drift apart through a re-typed path.
(actions_sessions_geo_hardware
 .write
 .format("delta")
 .mode("overwrite")
 .save(delta_url))