In [1]:
import os
import datetime
import re
import tarfile

from pyspark.sql.types import *
from pyspark.sql.functions import broadcast, lit

In [2]:
%run ./0-Config

In [3]:
%run ./0-Functions

#### DataSet1 from Blob

In [5]:
# Field separator (a tab character) for dataset_1 files; used as the "sep" option of the CSV reader in this notebook
delimiter_dataset_1 = "\t"

#### Lookups

These appear to be strictly incrementing type 1 SCDs, i.e. each successive file retains the records of the previous one (including their unique IDs) and only adds new records. The browsers file is one example.

Lookups are delivered hourly, each as a tar.gz which in turn contains all the individual lookup files. All the .tar.gz files contain all the .tsv lookups.

As the .tsv files appear to be cumulative and non-incremental (i.e. each is complete), we'll get the last file for each day. We also do not need to do an initial load, as each file contains all previous lookup data.

In [7]:
# Derive the processing date and the matching folder path fragment.
# ASSUMPTION - the input data sits in a folder named for YESTERDAY'S date. Change the offset if that is not the case.
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

# Drop the time-of-day so only the calendar date (at midnight) remains
datetime_yesterday = datetime.datetime.combine(yesterday.date(), datetime.time.min)

print("Start | " + str(datetime_yesterday))

# Folder fragment: year, then zero-padded month and day, separated by "/"
path_chunk_yesterday = "{}/{:02d}/{:02d}".format(
  datetime_yesterday.year,
  datetime_yesterday.month,
  datetime_yesterday.day
)
print("Input | " + path_chunk_yesterday)

##### Lookup Schemas

In [9]:
def _two_column_lookup_schema(column_prefix):
  """Build the schema shared by every lookup: <prefix>_id (integer) and <prefix>_name (string), both nullable."""
  return StructType([
    StructField(column_prefix + "_id", IntegerType(), True),
    StructField(column_prefix + "_name", StringType(), True)
  ])

# One schema per lookup; they differ only in the column name prefix
schema_dataset_1_lookup_browsers = _two_column_lookup_schema("browser")
schema_dataset_1_lookup_color_depths = _two_column_lookup_schema("color_depth")
schema_dataset_1_lookup_countries = _two_column_lookup_schema("country")
schema_dataset_1_lookup_events = _two_column_lookup_schema("event")
schema_dataset_1_lookup_languages = _two_column_lookup_schema("language")
schema_dataset_1_lookup_operating_systems = _two_column_lookup_schema("operating_system")
schema_dataset_1_lookup_resolutions = _two_column_lookup_schema("resolution")
schema_dataset_1_lookup_search_engines = _two_column_lookup_schema("search_engine")

In [10]:
def GetLookupSchema(lookup_name):
  """Return the schema registered for a lookup name, or None when the name is not recognised.

  lookup_name is matched verbatim. This notebook passes the lookup file's base name
  without its extension, so the keys are presumably the delivered file names
  (note the mix of singular and plural forms) - confirm against a sample archive.
  """
  schemas_by_lookup_name = {
    "browser": schema_dataset_1_lookup_browsers,
    "color_depth": schema_dataset_1_lookup_color_depths,
    "country": schema_dataset_1_lookup_countries,
    "event": schema_dataset_1_lookup_events,
    "languages": schema_dataset_1_lookup_languages,
    "operating_systems": schema_dataset_1_lookup_operating_systems,
    "resolution": schema_dataset_1_lookup_resolutions,
    "search_engines": schema_dataset_1_lookup_search_engines,
  }

  # Unknown names fall through to None, which callers use as "no schema available"
  return schemas_by_lookup_name.get(lookup_name)

In [11]:
# Get all lookup files for this day
dataset_1_lookup_files_yesterday = GetFilesRecursive(adls2uri_raw + "dataset_1/" + path_chunk_yesterday)

# Filter down to just the lookup archives.
# The dots are escaped so only a literal ".tar.gz" matches (an unescaped "." matches any character).
lookup_archive_pattern = re.compile(r"dataset_1[0-9]{8}-[0-9]{6}-lookup_data\.tar\.gz")
dataset_1_lookup_files_yesterday_we_care_about = [
  path for path in dataset_1_lookup_files_yesterday if lookup_archive_pattern.search(path)
]

# Fail with an actionable message instead of a bare IndexError when no archive was delivered for the day
if not dataset_1_lookup_files_yesterday_we_care_about:
  raise FileNotFoundError(
    "No lookup_data.tar.gz archive found under " + adls2uri_raw + "dataset_1/" + path_chunk_yesterday
  )

# Sort descending (i.e. reverse sort). Paths are compared as plain strings, so this relies on the
# timestamp embedded in the file name ordering the paths chronologically.
dataset_1_lookup_files_yesterday_we_care_about.sort(reverse = True)

# Grab the first path, which will be the last lookup file path for the day, i.e. the most recent one with all additions for the day
dataset_1_lookup_path_yesterday = dataset_1_lookup_files_yesterday_we_care_about[0]

# Get just the filename
dataset_1_lookup_filename_yesterday_tar_gz = os.path.basename(dataset_1_lookup_path_yesterday)

# Local folder (on the Databricks file system) the archive is copied to and extracted in
dataset_1_lookup_path_local = '/Shared/dataset_1/lookup/' + path_chunk_yesterday + '/'

print("dataset_1_lookup_path_yesterday = " + dataset_1_lookup_path_yesterday)
print("dataset_1_lookup_filename_yesterday_tar_gz = " + dataset_1_lookup_filename_yesterday_tar_gz)
print("dataset_1_lookup_path_local = " + dataset_1_lookup_path_local)

In [12]:
# Stage the lookup archive on the local Databricks file system: it has to be unpacked
# before the individual lookup files inside it can be read.
dbutils.fs.cp(
  dataset_1_lookup_path_yesterday,
  dataset_1_lookup_path_local + dataset_1_lookup_filename_yesterday_tar_gz,
  False
)

In [13]:
# List the local lookup folder (the target of the archive copy) so its contents can be checked
display(dbutils.fs.ls(dataset_1_lookup_path_local))

In [14]:
# Extract all the lookup files from the archive into the local lookup folder.
# The '/dbfs' prefix is used here because tarfile works on ordinary file paths rather than dbutils paths.
# NOTE(review): extractall() trusts the member paths stored in the archive - only use it on archives from a trusted source.

# The with-block closes the archive even when extraction raises
with tarfile.open('/dbfs' + dataset_1_lookup_path_local + dataset_1_lookup_filename_yesterday_tar_gz) as tf:
  tf.extractall(path = '/dbfs' + dataset_1_lookup_path_local)

In [15]:
# Delete the .tar.gz file from the local lookup folder; its contents were extracted in the previous step

dbutils.fs.rm(dataset_1_lookup_path_local + dataset_1_lookup_filename_yesterday_tar_gz)

In [16]:
# List the local lookup folder again: with the archive extracted and removed,
# the folder should now contain ONLY .tsv files

display(dbutils.fs.ls(dataset_1_lookup_path_local))

In [17]:
# Get a list of all the .tsv file paths
# GetFilesRecursive is defined outside this notebook section (presumably by the %run of 0-Functions - confirm).
# No extension filter is applied here, so anything else found under the folder is returned as well.

lookup_tsv_files = GetFilesRecursive(dataset_1_lookup_path_local)

In [18]:
# Iterate through the files. For each, get the corresponding schema; ingest to dataframe; then write out to Parquet.
# As these are small lookups, we're coalescing to one file and not partitioning. That will enable us later to easily broadcast these to all workers as needed.

for tsv_file in lookup_tsv_files:
  print(tsv_file)

  # Lookup name = file name without its folder and extension
  lookup_name = os.path.splitext(os.path.basename(tsv_file))[0]
  print(lookup_name)

  lookup_schema = GetLookupSchema(lookup_name)

  if lookup_schema is None:
    # No schema is registered for this file: report it instead of skipping it silently
    print("Skipped | no schema defined for lookup '" + lookup_name + "'")
    continue

  # Tab-separated, quoting disabled (empty quote character), no header row.
  # "header" is set to an explicit False rather than None, which would hand the reader a null option value.
  df_lookup = (
    spark
      .read
      .format("csv")
      .schema(lookup_schema)
      .option("sep", delimiter_dataset_1)
      .option("quote", "")
      .option("header", False)
      .load(tsv_file)
  )

  path_output_lookup = adls2uri_staging1 + "dataset_1/lookup/" + path_chunk_yesterday + "/" + lookup_name
  print("Output | " + path_output_lookup)

  # Write output for this lookup as a single Parquet file.
  # NOTE(review): no save mode is set, so Spark's default applies and the write fails if the output
  # path already exists - re-running for the same day needs the earlier output removed first.
  df_lookup.coalesce(1).write.parquet(path_output_lookup)

  # Clean up job files
  CleanupSparkJobFiles(path_output_lookup)

In [19]:
# Delete the local extracted lookup files folder
# The second argument (True) is the recursive flag of dbutils.fs.rm, so the folder's contents are removed with it

dbutils.fs.rm(dataset_1_lookup_path_local, True)