# Azure Discovery Day 2019
## Analytics with NRT Intelligence on Azure

#### Summary
In this Python Jupyter notebook, you will:
1. Connect to Azure storage
2. Ingest data from CSV files in Azure storage to Spark dataframes
3. Conform and merge heterogeneous data sets using the Spark dataframe API
4. Emit data to Azure storage in Parquet file format

Additionally, there are optional steps to create Hive tables on the data, query them with Spark SQL, and perform some exploratory data analysis (EDA).

In [2]:
## Library imports (col and substring are needed by the schema homogenization function further down)

from pyspark.sql.types import *
from pyspark.sql.functions import broadcast, col, lit, substring

### Functions

In [4]:
## Function to get a Spark DataFrame from a CSV source file

def GetDataFrameFromCsvFile(schema, sourceFilePath, delimiter):
  df = spark\
    .read\
    .format("csv")\
    .option("header", "true")\
    .option("delimiter", delimiter)\
    .schema(schema)\
    .load(sourceFilePath)
  
  return df

In [5]:
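## Given a small reference dataframe (this would not make sense for large transaction dataframes), lazy-cache it and return the row count; count() is the action that materializes the cache
## Note: broadcast() does not modify df in place. It returns a new dataframe carrying a broadcast-join hint, so the call below has no lasting effect on its own.
## To benefit from the hint, apply broadcast(df) to the reference dataframe inside the join expression itself.

def HandleReferenceDataFrame(df):
  broadcast(df)
  df.cache()
  count = df.count()
  
  return count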

In [6]:
## Delete Spark job residual files (_SUCCESS, _start*, _committed*) down the folder/file hierarchy

import os

def CleanupSparkJobFiles(parquetFolderPath):
  file_paths = GetFilesRecursive(parquetFolderPath)
  
  for file_path in file_paths:
    # Get just the file name
    file_name = os.path.basename(file_path)
    # print(file_name)
    
    if file_name.startswith("_"):
      # Temp job file - delete it
      dbutils.fs.rm(file_path)
    # elif file_name.endswith(".parquet"):
      # Data file - no op
    # else:
      # Something else - no op

In [7]:
## Get iterable file list. Flattens hierarchical folder/file structure.

def GetFilesRecursive(rootPath):
  final_list = []

  for directoryItem in dbutils.fs.ls(rootPath):
    directoryItemPathClean = directoryItem.path.replace("%25", "%").replace("%25", "%")
    
    if directoryItem.isDir():
      final_list = final_list + GetFilesRecursive(directoryItemPathClean)
    else:
      final_list.append(directoryItemPathClean)
  
  return final_list
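
The two functions above depend on `dbutils.fs`, which exists only inside Databricks. As a rough local analogue, the sketch below applies the same flatten-then-filter idea to an ordinary directory using only the Python standard library. The names `list_files_recursive`, `cleanup_job_files`, and `demo` are illustrative stand-ins and are not part of the lab.

```python
import os
import tempfile


def list_files_recursive(root_path):
    """Return a flat list of every file path under root_path."""
    files = []
    for entry in os.scandir(root_path):
        if entry.is_dir():
            # Descend into the sub-folder and collect whatever it contains
            files.extend(list_files_recursive(entry.path))
        else:
            files.append(entry.path)
    return files


def cleanup_job_files(root_path):
    """Delete files whose name starts with "_" and return the deleted paths."""
    removed = []
    for file_path in list_files_recursive(root_path):
        if os.path.basename(file_path).startswith("_"):
            os.remove(file_path)
            removed.append(file_path)
    return removed


def demo():
    """Build a throwaway folder tree, clean it, and return the surviving file names."""
    with tempfile.TemporaryDirectory() as root:
        sub = os.path.join(root, "ref-vendor")
        os.makedirs(sub)
        for name in ("_SUCCESS", "_committed_123", "part-00000.parquet"):
            open(os.path.join(sub, name), "w").close()
        cleanup_job_files(root)
        return sorted(os.path.basename(p) for p in list_files_recursive(root))


print(demo())
```

Only the data file should survive the cleanup; the marker files are removed because their names start with an underscore.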

### Connect to Azure Storage

In [9]:
# Define some variables to minimize "hard-coding" in the cells below. Note that variables can also be defined in a separate notebook.

# Azure storage account information
storageAcctName = "PROVIDE"
storageAcctKey = "PROVIDE"
containerName = "PROVIDE"

# The mount point in the DBFS file system - this will look like a local folder but points to the Azure storage location
mountPoint = "/mnt/" + containerName + "/"

In [10]:
## Use the Databricks file system utilities to mount a Databricks file system location (/mnt/YOUR CONTAINER NAME) that points to the Azure storage account where data files are located
## We use variables defined above and string concatenation here so that no "hard-coding" is needed

dbutils.fs.mount(
  source = "wasbs://" + containerName + "@" + storageAcctName + ".blob.core.windows.net",
  mount_point = mountPoint,
  extra_configs = {"fs.azure.account.key." + storageAcctName + ".blob.core.windows.net":storageAcctKey}
)
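
Running the mount cell a second time typically fails when the mount point is already in use. One way to make it re-runnable is to check the existing mounts first. The sketch below keeps that check in plain Python so it can be read in isolation: `build_mount_args` and `is_mounted` are hypothetical helper names, and the container, account, and key values are placeholders. The commented lines at the end assume that `dbutils.fs.mounts()` returns entries with a `mountPoint` attribute.

```python
def build_mount_args(container_name, storage_acct_name, storage_acct_key):
    """Assemble the three keyword arguments that the mount call expects."""
    host = storage_acct_name + ".blob.core.windows.net"
    return {
        "source": "wasbs://" + container_name + "@" + host,
        "mount_point": "/mnt/" + container_name + "/",
        "extra_configs": {"fs.azure.account.key." + host: storage_acct_key},
    }


def is_mounted(mount_point, existing_mount_points):
    """True if mount_point is already listed, ignoring any trailing slash."""
    wanted = mount_point.rstrip("/")
    return any(p.rstrip("/") == wanted for p in existing_mount_points)


# Placeholder values for illustration only
args = build_mount_args("taxidata", "mystorageacct", "not-a-real-key")
print(args["source"])

# Inside Databricks the pieces could be combined like this (not run here):
#   existing = [m.mountPoint for m in dbutils.fs.mounts()]
#   if not is_mounted(args["mount_point"], existing):
#       dbutils.fs.mount(**args)
```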

In [11]:
## This is included to remove the Azure storage mount
## Commented out since not needed for the lab, but included here "just in case" for debugging/experimenting - for example, mount, unmount, try something different, mount again

#dbutils.fs.unmount(mountPoint)

In [12]:
## List the contents of the Azure storage container to validate that the connection and mount succeeded
## We use the Databricks display() function here to render the output as a formatted table

display(dbutils.fs.ls(mountPoint))

### Load Reference Data Files into DataFrames

##### Define variables to hold the source path for each of the reference data files

In [15]:
src_file_ref_payment_type = mountPoint + "reference-data/payment_type_lookup.csv"
src_file_ref_rate_code = mountPoint + "reference-data/rate_code_lookup.csv"
src_file_ref_taxi_zone = mountPoint + "reference-data/taxi_zone_lookup.csv"
src_file_ref_trip_month = mountPoint + "reference-data/trip_month_lookup.csv"
src_file_ref_trip_type = mountPoint + "reference-data/trip_type_lookup.csv"
src_file_ref_vendor = mountPoint + "reference-data/vendor_lookup.csv"

##### Define explicit schemas for each of the reference data files

We could also ingest files with schema inference (i.e. tell Spark to try to figure it out) but let's be explicit here for greater control.

In [17]:
## Payment type
schema_ref_payment_type = StructType([
    StructField("payment_type", IntegerType(), True),
    StructField("abbreviation", StringType(), True),
    StructField("description", StringType(), True)
])

## Rate code ID
schema_ref_rate_code = StructType([
    StructField("rate_code_id", IntegerType(), True),
    StructField("description", StringType(), True)
])

## Taxi zone
schema_ref_taxi_zone = StructType([
    StructField("location_id", StringType(), True),
    StructField("borough", StringType(), True),
    StructField("zone", StringType(), True),
    StructField("service_zone", StringType(), True)
])

## Trip month
schema_ref_trip_month = StructType([
    StructField("trip_month", StringType(), True),
    StructField("month_name_short", StringType(), True),
    StructField("month_name_full", StringType(), True)
])

## Trip type
schema_ref_trip_type = StructType([
    StructField("trip_type", IntegerType(), True),
    StructField("description", StringType(), True)
])

## Vendor ID
schema_ref_vendor = StructType([
    StructField("vendor_id", IntegerType(), True),
    StructField("abbreviation", StringType(), True),
    StructField("description", StringType(), True)
])

##### Load each reference data set into a Spark DataFrame

We load each source file into a dataframe using the GetDataFrameFromCsvFile function defined above.

Then we do some more optimizations for the reference dataframes:
1. Broadcast the dataframe. These are small dataframes with reference data. Broadcasting means we replicate a dataframe to each worker node in a Spark cluster, so that cross-node (cross-network) joins are avoided.
2. Lazy-cache the dataframe into memory as another performance optimization.

Last, we print the number of rows in the dataframe.

In [19]:
df_ref_payment_type = GetDataFrameFromCsvFile(schema_ref_payment_type, src_file_ref_payment_type, "|")

print(HandleReferenceDataFrame(df_ref_payment_type))
display(df_ref_payment_type)

In [20]:
df_ref_rate_code = GetDataFrameFromCsvFile(schema_ref_rate_code, src_file_ref_rate_code, "|")

print(HandleReferenceDataFrame(df_ref_rate_code))
display(df_ref_rate_code)

In [21]:
df_ref_taxi_zone = GetDataFrameFromCsvFile(schema_ref_taxi_zone, src_file_ref_taxi_zone, ",")

print(HandleReferenceDataFrame(df_ref_taxi_zone))
display(df_ref_taxi_zone)

In [22]:
df_ref_trip_month = GetDataFrameFromCsvFile(schema_ref_trip_month, src_file_ref_trip_month, ",")

print(HandleReferenceDataFrame(df_ref_trip_month))
display(df_ref_trip_month)

In [23]:
df_ref_trip_type = GetDataFrameFromCsvFile(schema_ref_trip_type, src_file_ref_trip_type, "|")

print(HandleReferenceDataFrame(df_ref_trip_type))
display(df_ref_trip_type)

In [24]:
df_ref_vendor = GetDataFrameFromCsvFile(schema_ref_vendor, src_file_ref_vendor, "|")

print(HandleReferenceDataFrame(df_ref_vendor))
display(df_ref_vendor)

### Write reference data out to Parquet files

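Parquet is a columnar format that stores the schema alongside the data, so it is generally faster to load than CSV. It also supports partitioning, but for the small reference data files we coalesce each dataframe to a single partition, which yields a single data file, and we do not partition.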

In [26]:
## Define the root path where we will write all Parquet data files
path_parquet = mountPoint + "parquet/"

## Cleanup - delete the Parquet folder if it's present
dbutils.fs.rm(path_parquet, True)

In [27]:
df_ref_payment_type.coalesce(1).write.parquet(path_parquet + "ref-payment-type/")

In [28]:
df_ref_rate_code.coalesce(1).write.parquet(path_parquet + "ref-rate-code/")

In [29]:
df_ref_taxi_zone.coalesce(1).write.parquet(path_parquet + "ref-taxi-zone/")

In [30]:
df_ref_trip_month.coalesce(1).write.parquet(path_parquet + "ref-trip-month/")

In [31]:
df_ref_trip_type.coalesce(1).write.parquet(path_parquet + "ref-trip-type/")

In [32]:
df_ref_vendor.coalesce(1).write.parquet(path_parquet + "ref-vendor/")

In [33]:
## Delete Spark job files recursively

CleanupSparkJobFiles(path_parquet)

### Load transactions (trip data) into DataFrames

In [35]:
## Define variable for source data path
src_path_data = mountPoint + "transactional-data-small/"

In [36]:
## Canonical ordered column list to homogenize schema

schema_rides_canonical = [
	"trip_year",
	"trip_month",
	"taxi_type",
	"vendor_id",
	"pickup_datetime",
	"dropoff_datetime",
	"passenger_count",
	"trip_distance",
	"rate_code_id",
	"store_and_fwd_flag",
	"pickup_location_id",
	"dropoff_location_id",
	"pickup_longitude",
	"pickup_latitude",
	"dropoff_longitude",
	"dropoff_latitude",
	"payment_type",
	"fare_amount",
	"extra",
	"mta_tax",
	"tip_amount",
	"tolls_amount",
	"improvement_surcharge",
	"total_amount"
]
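
A canonical column list like this one is typically used in two ways: to check that a homogenized dataframe has every expected column, and to put the columns in a fixed order before dataframes are merged. The sketch below shows the check in plain Python on lists of column names (in Spark these would come from `df.columns`). `missing_and_extra` is a hypothetical helper, and the short `canonical` list is a truncated stand-in for the full list above.

```python
def missing_and_extra(actual_columns, canonical_columns):
    """Compare a dataframe's column names with the canonical list.

    Returns (missing, extra): canonical columns that are absent, and
    actual columns that are not part of the canonical list.
    """
    actual = set(actual_columns)
    canonical = set(canonical_columns)
    missing = [c for c in canonical_columns if c not in actual]
    extra = [c for c in actual_columns if c not in canonical]
    return missing, extra


# Shortened stand-in for schema_rides_canonical
canonical = ["trip_year", "trip_month", "taxi_type", "vendor_id", "extra"]

# Column names as they might look part-way through homogenizing an older file
actual = ["vendor_id", "surcharge", "trip_year", "trip_month"]

missing, extra = missing_and_extra(actual, canonical)
print("missing:", missing)
print("extra:", extra)

# Once nothing is missing, the columns can be put in canonical order with:
#   df = df.select(*schema_rides_canonical)
```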

#### Load yellow taxi rides

In [38]:
## Function to add columns to dataframe as required to homogenize schema
## Input:  Dataframe, year and month
## Output: Dataframe with homogenized schema 
## Sample call: print(GetSchemaHomogenizedDataframe(DF, 2014, 6))

def GetSchemaHomogenizedDataframe(sourceDF, tripYear, tripMonth):
  years10To14 = [2010, 2011, 2012, 2013, 2014]
  
  df = None

  if (tripYear >= 2017 or (tripYear == 2016 and tripMonth > 6)):
    df = sourceDF\
      .withColumn("trip_year", substring(col("tpep_pickup_datetime"), 0, 4))\
      .withColumn("trip_month", substring(col("tpep_pickup_datetime"), 6, 2))\
      .withColumn("taxi_type", lit("yellow"))\
      .withColumn("temp_vendor_id", col("VendorID").cast(StringType())).drop("VendorID").withColumnRenamed("temp_vendor_id", "vendor_id")\
      .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")\
      .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")\
      .withColumnRenamed("RatecodeID", "rate_code_id")\
      .withColumnRenamed("PULocationID", "pickup_location_id")\
      .withColumnRenamed("DOLocationID", "dropoff_location_id")\
      .withColumn("pickup_longitude", lit(""))\
      .withColumn("pickup_latitude", lit(""))\
      .withColumn("dropoff_longitude", lit(""))\
      .withColumn("dropoff_latitude", lit(""))\
      .withColumn("temp_payment_type", col("payment_type").cast(StringType())).drop("payment_type").withColumnRenamed("temp_payment_type", "payment_type")

      # passenger_count
      # trip_distance
      # store_and_fwd_flag
      # fare_amount
      # extra
      # mta_tax
      # tip_amount
      # tolls_amount
      # improvement_surcharge
      # total_amount
  elif ((tripYear == 2016 and tripMonth <= 6) or (tripYear == 2015)):
    df = sourceDF\
      .withColumn("trip_year", substring(col("tpep_pickup_datetime"), 0, 4))\
      .withColumn("trip_month", substring(col("tpep_pickup_datetime"), 6, 2))\
      .withColumn("taxi_type", lit("yellow"))\
      .withColumn("temp_vendor_id", col("VendorID").cast(StringType())).drop("VendorID").withColumnRenamed("temp_vendor_id", "vendor_id")\
      .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")\
      .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")\
      .withColumnRenamed("RatecodeID", "rate_code_id")\
      .withColumn("pickup_location_id", lit(0).cast(IntegerType()))\
      .withColumn("dropoff_location_id", lit(0).cast(IntegerType()))\
      .withColumn("temp_pickup_longitude", col("pickup_longitude").cast(StringType())).drop("pickup_longitude").withColumnRenamed("temp_pickup_longitude", "pickup_longitude")\
      .withColumn("temp_pickup_latitude", col("pickup_latitude").cast(StringType())).drop("pickup_latitude").withColumnRenamed("temp_pickup_latitude", "pickup_latitude")\
      .withColumn("temp_dropoff_longitude", col("dropoff_longitude").cast(StringType())).drop("dropoff_longitude").withColumnRenamed("temp_dropoff_longitude", "dropoff_longitude")\
      .withColumn("temp_dropoff_latitude", col("dropoff_latitude").cast(StringType())).drop("dropoff_latitude").withColumnRenamed("temp_dropoff_latitude", "dropoff_latitude")\
      .withColumn("temp_payment_type", col("payment_type").cast(StringType())).drop("payment_type").withColumnRenamed("temp_payment_type", "payment_type")

      # passenger_count
      # trip_distance
      # store_and_fwd_flag
      # fare_amount
      # extra
      # mta_tax
      # tip_amount
      # tolls_amount
      # improvement_surcharge
      # total_amount
  elif (tripYear in years10To14):
    df = sourceDF\
      .withColumn("trip_year", substring(col("pickup_datetime"), 0, 4))\
      .withColumn("trip_month", substring(col("pickup_datetime"), 6, 2))\
      .withColumn("taxi_type", lit("yellow"))\
      .withColumn("temp_vendor_id", col("vendor_id").cast(StringType())).drop("vendor_id").withColumnRenamed("temp_vendor_id", "vendor_id")\
      .withColumnRenamed("rate_code", "rate_code_id")\
      .withColumn("pickup_location_id", lit(0).cast(IntegerType()))\
      .withColumn("dropoff_location_id", lit(0).cast(IntegerType()))\
      .withColumn("temp_pickup_longitude", col("pickup_longitude").cast(StringType())).drop("pickup_longitude").withColumnRenamed("temp_pickup_longitude", "pickup_longitude")\
      .withColumn("temp_pickup_latitude", col("pickup_latitude").cast(StringType())).drop("pickup_latitude").withColumnRenamed("temp_pickup_latitude", "pickup_latitude")\
      .withColumn("temp_dropoff_longitude", col("dropoff_longitude").cast(StringType())).drop("dropoff_longitude").withColumnRenamed("temp_dropoff_longitude", "dropoff_longitude")\
      .withColumn("temp_dropoff_latitude", col("dropoff_latitude").cast(StringType())).drop("dropoff_latitude").withColumnRenamed("temp_dropoff_latitude", "dropoff_latitude")\
      .withColumn("temp_payment_type", col("payment_type").cast(StringType())).drop("payment_type").withColumnRenamed("temp_payment_type", "payment_type")\
      .withColumnRenamed("surcharge", "extra")\
      .withColumn("improvement_surcharge",lit(0).cast(DoubleType()))

      # pickup_datetime
      # dropoff_datetime
      # passenger_count
      # trip_distance 
      # store_and_fwd_flag
      # fare_amount
      # mta_tax
      # tip_amount
      # tolls_amount
      # total_amount

  return df

In [39]:
## Function to return schema for a given year and month
## Input:  Year and month
## Output: StructType for applicable schema 
## Sample call: print(GetTaxiSchema(2009,1))
## Note: returns None for periods with no matching schema (for example, anything before 2010)
## The schema_rides_yellow_* variables are defined in the cell below, which must be run before this function is called

def GetTaxiSchema(tripYear, tripMonth):
  taxiSchema = None

  years10To14 = [2010, 2011, 2012, 2013, 2014]
  
  if (tripYear >= 2017 or (tripYear == 2016 and tripMonth > 6)):
    taxiSchema = schema_rides_yellow_16H2to18
  elif ((tripYear == 2016 and tripMonth <= 6) or tripYear == 2015):
    taxiSchema = schema_rides_yellow_15to16H1
  elif (tripYear in years10To14):
    taxiSchema = schema_rides_yellow_10to14

  return taxiSchema
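
The branching above maps a (year, month) pair to one of three source file layouts and leaves the result unset for any other period. The sketch below isolates that decision so it can be tested without Spark: it returns a label rather than a StructType and raises an error for unsupported periods instead of passing an empty value downstream. The function name `get_schema_era` and the label strings are illustrative assumptions.

```python
def get_schema_era(trip_year, trip_month):
    """Return a label for the yellow-taxi file layout used in a given month."""
    if trip_year >= 2017 or (trip_year == 2016 and trip_month > 6):
        return "16H2to18"
    if trip_year == 2015 or (trip_year == 2016 and trip_month <= 6):
        return "15to16H1"
    if 2010 <= trip_year <= 2014:
        return "10to14"
    # Fail loudly rather than handing an empty schema to the CSV reader
    raise ValueError("No schema defined for %d-%02d" % (trip_year, trip_month))


for year, month in [(2014, 6), (2016, 6), (2016, 7), (2018, 1)]:
    print(year, month, get_schema_era(year, month))
```

The boundary months are the cases worth checking by hand: June 2016 still uses the older layout, while July 2016 uses the newer one.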

##### Define source file schemas

These vary by year. We have to define several schemas to fit the different source file layouts.

In [41]:
## 2016H2, 2017, 2018
schema_rides_yellow_16H2to18 = StructType([
    StructField("VendorID", StringType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("RatecodeID", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("PULocationID", IntegerType(), True),
    StructField("DOLocationID", IntegerType(), True),
    StructField("payment_type", StringType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True)
])

## 2015 and 2016H1
schema_rides_yellow_15to16H1 = StructType([
    StructField("VendorID", StringType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("pickup_longitude", DoubleType(), True),
    StructField("pickup_latitude", DoubleType(), True),
    StructField("RatecodeID", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("dropoff_longitude", DoubleType(), True),
    StructField("dropoff_latitude", DoubleType(), True),
    StructField("payment_type", StringType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True)
])

## 2010 through 2014
schema_rides_yellow_10to14 = StructType([
    StructField("vendor_id", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("pickup_longitude", DoubleType(), True),
    StructField("pickup_latitude", DoubleType(), True),
    StructField("rate_code", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("dropoff_longitude", DoubleType(), True),
    StructField("dropoff_latitude", DoubleType(), True),
    StructField("payment_type", StringType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("surcharge", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("total_amount", DoubleType(), True)
])