# Imports

In [None]:
import pandas as pd 
import numpy as np
import re
import datetime
import dateparser
import pytz

from typing import List
from tzlocal import get_localzone
from pyspark.sql import functions as sqlf
from pyspark.sql import types as sqlTypes
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
from pyspark.sql import DataFrame
from pyspark.sql.functions import pandas_udf, PandasUDFType
from functools import reduce
from datetime import date, timedelta
from dateutil.relativedelta import relativedelta


from pyspark.sql.functions import when

In [None]:
%sql

# --Spark 3.x: read old parquet dates/timestamps as-is (CORRECTED = no rebasing from the legacy hybrid calendar) instead of failing with SparkUpgradeException
set spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED;
# --number of partitions used by shuffles (joins, aggregations, distinct)
set spark.sql.shuffle.partitions=48;

# --disable broadcast joins with larger data sets (-1 turns automatic broadcasting off)
set spark.sql.autoBroadcastJoinThreshold=-1;

# --default table properties so newly created delta tables get optimized writes and auto compaction
set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

# Parameters

In [None]:
# truncate table param: "TRUE" (any case) drops and rebuilds the target table
dbutils.widgets.text("EmptyTable","FALSE")
EmptyTable = dbutils.widgets.get("EmptyTable")


def _get_date_widget(name):
  """Create a YYYY-MM-DD text widget (default: today) and return its value.

  An empty or whitespace-only widget value falls back to today's date.
  Both the fallback and an explicit value are returned as a midnight
  ``datetime.datetime``. The original code returned a str in the empty
  case and a datetime otherwise, so the Spark conf values got
  inconsistent formats.

  Raises:
    ValueError: if the widget holds text that is not a YYYY-MM-DD date.
  """
  today = datetime.datetime.now().strftime('%Y-%m-%d')
  dbutils.widgets.text(name, today)
  value = dbutils.widgets.get(name)
  if value is None or not value.strip():
    value = today
  return datetime.datetime.strptime(value.strip(), '%Y-%m-%d')


#day to start load
sDate = _get_date_widget("sDate")

#day to end load
eDate = _get_date_widget("eDate")

# set variables to be used in sql; both are now always "YYYY-MM-DD 00:00:00"
spark.conf.set('start.date', str(sDate))
spark.conf.set('end.date', str(eDate))

# get rolling 4 years (cutoff computed from the current US/Central date)
rollingDate = (datetime.datetime.now(pytz.timezone('US/Central')) + relativedelta(years=-4)).strftime("%Y-%m-%d")

# Setup Database

In [None]:
# Root of the mounted data lake.
baseDataLakePath = "/mnt/folder_path_name/"

# Folder layout used for reads/writes:
#   FINAL_FOLDER = <lake>/folder_name/file_name/
#   TMP_FOLDER   = FINAL_FOLDER + "DELTA/"  (location of the Delta table data)
STATIC_FOLDER = f"{baseDataLakePath}folder_name/"
TABLE_FOLDER = "file_name/"
FINAL_FOLDER = f"{STATIC_FOLDER}{TABLE_FOLDER}"
TMP_FOLDER = f"{FINAL_FOLDER}DELTA/"

# Databricks database and table that receive the data.
dbName = "schema"
permTable = "table_name"

# Expose the Delta folder and the qualified table name to %sql cells
# as ${delta.folder} and ${db.table}.
spark.conf.set('delta.folder', TMP_FOLDER)
spark.conf.set('db.table', f"{dbName}.{permTable}")

In [None]:
# When the EmptyTable widget says "TRUE" (any case), start from scratch: drop
# the metastore table, then delete its DELTA/ folder and the LOAD/ output folder.
if EmptyTable.upper() == "TRUE":
  spark.sql(f"DROP TABLE IF EXISTS {dbName}.{permTable}")
  for sub_folder in ('DELTA/', 'LOAD/'):
    dbutils.fs.rm(FINAL_FOLDER + sub_folder, recurse=True)

# Get Data From Datalake and Convert to Delta Format

In [None]:
# parquet file
file_type = "parquet"

from_file = SHARED_FOLDER + 'datalake_file_path/filename.parquet'
to_file = FINAL_FOLDER + 'TMP_FILE/'

if file_exists(to_file):
  #delete file
  dbutils.fs.rm(to_file, True)
  
df = spark.read.format(file_type) \
  .load(from_file)
df.write.format("delta").save(to_file)

# csv file
df = spark.read.format("csv") \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(FINAL_FOLDER + 'filename.csv')


to_file = FINAL_FOLDER + 'TMP_FILE/'

if file_exists(to_file):
  #delete file
  dbutils.fs.rm(to_file, True)
#write csv data to delta  
df.write.format("delta").save(to_file)

#clean up
df.unpersist()
spark.sparkContext._jvm.System.gc()

# Read data back in as views

In [None]:
# Re-read the Delta copy written to TMP_FILE/ and expose it to SQL as vw_table_name.
file_location = FINAL_FOLDER + 'TMP_FILE/'
file_type = "delta"

df = spark.read.format(file_type).load(file_location)
# create temp view; createOrReplaceTempView supersedes registerTempTable,
# which has been deprecated since Spark 2.0
df.createOrReplaceTempView("vw_table_name")

# Transformations

In [None]:
%sql

# --Deduplicated view over vw_table_name; the next Python cell selects from vw_final.
DROP VIEW IF EXISTS vw_final;
CREATE TEMPORARY VIEW vw_final
As
Select Distinct *
From vw_table_name

In [None]:
# load up data: distinct rows of vw_final dated on/after the rolling 4-year cutoff
theSQL = "Select distinct * From vw_final Where Date>='" + str(rollingDate) + "'"
tmp_final_df = spark.sql(theSQL)

# dropping one record: remove only the row whose column1..column4 all equal the
# listed values and keep every other row. eqNullSafe never yields null, so a null
# in any of these columns counts as "not matching" and the row is kept. The old
# when(...)-chain compared the string "True" with a boolean and also dropped
# rows whose comparisons were all null.
drop_condition = (
  tmp_final_df.column1.eqNullSafe('value1')
  & tmp_final_df.column2.eqNullSafe('value2')
  & tmp_final_df.column3.eqNullSafe('value3')
  & tmp_final_df.column4.eqNullSafe('value4')
)
tmp_final_df = tmp_final_df.filter(~drop_condition)

# User-Defined Functions (UDFs)

In [None]:
# Input for the UDF step. tmp_final_df is a Python DataFrame variable, not a
# registered view, so querying 'select * from tmp_final_df' through sqlContext
# cannot resolve it. Use the DataFrame directly.
final_df = tmp_final_df


def myFunction(column_value):
  """Placeholder UDF body that transforms a single column value.

  Currently returns None for every input; replace with the real logic.
  """
  return None


In [None]:
# Apply UDF: wrap myFunction as a Spark UDF (pyspark's default return type is
# StringType) and apply it to the 'df_column_name' column. The bare `udf` name
# was never imported, and the lambda called an undefined df_column_name().
get_udf = sqlf.udf(myFunction)
final_df = final_df.withColumn('New_Column_Output_From_UDF', get_udf('df_column_name'))

# Initial Data Load to Delta Folder - Only do on first load of data

In [None]:
# Alternative (disabled) source for final_df: spark.sql("Select * From tmp_site")

# First-load detection: on the first run the DELTA/ folder does not exist yet.
isFirstLoad = False
first_file_location = f"{FINAL_FOLDER}DELTA/"
needs_initial_load = not file_exists(first_file_location)

if needs_initial_load:
  # remove any stale metastore entry before writing the initial Delta data
  spark.sql(f"DROP TABLE IF EXISTS {dbName}.{permTable}")
  final_df.write.format("delta").save(first_file_location)
  spark.sql(f"ALTER TABLE delta.`{first_file_location}` SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)")
  isFirstLoad = True

# Initial Data Load Table Creation - Only do on first load of data

In [None]:
%sql
# --NOTE(review): this creates 'schema_name', but the table below goes into the database named in ${db.table} (dbName = "schema" in Setup Database); confirm these are meant to match.
CREATE DATABASE IF NOT EXISTS schema_name;

# --create the table where the data will go from here forward, backed by the DELTA/ folder (${delta.folder})
CREATE TABLE IF NOT EXISTS ${db.table}
USING DELTA 
LOCATION '${delta.folder}';

# --clean up: no later cell queries vw_final by name
DROP VIEW IF EXISTS vw_final;

# Get Deltas and Write Them to the Databricks Table and the LOAD Folder

In [None]:
# Name of the temp view holding this run's new/changed rows; it is the MERGE
# source. Previously the view was registered as "latestRecords" while the MERGE
# referenced an undefined `tmpTable` variable.
tmpTable = "latestRecords"


def _build_merge_sql(target_table, source_view, key_columns):
  """Build a Delta MERGE statement that upserts ``source_view`` into ``target_table``.

  A row matches when every column in ``key_columns`` is equal in both
  relations. Matched target rows are fully updated (UPDATE SET *) and
  unmatched source rows are inserted (INSERT *).

  Args:
    target_table: qualified target table name, e.g. "db.table".
    source_view: temp view used as the merge source.
    key_columns: join column names; must not be empty.

  Returns:
    The MERGE statement as a string.

  Raises:
    ValueError: if ``key_columns`` is empty (the ON clause would be invalid).
  """
  keys = [str(column) for column in key_columns]
  if not keys:
    raise ValueError("fieldJoins must contain at least one join column")
  on_clause = " AND ".join(f"(tgt.{key} = src.{key})" for key in keys)
  return (
    f"MERGE INTO {target_table} AS tgt USING {source_view} AS src ON ({on_clause}) "
    "WHEN MATCHED THEN UPDATE SET * "
    "WHEN NOT MATCHED THEN INSERT *"
  )


#only do if not first load
if not isFirstLoad:
  fullTableName = dbName + "." + permTable
  # fieldJoins (merge/ZORDER key columns) is not defined in this section; it is
  # expected to be set before this cell runs
  joinColumns = [str(column) for column in fieldJoins]

  # build (and validate) the MERGE up front so an empty fieldJoins fails before any work
  mainSQL = _build_merge_sql(fullTableName, tmpTable, joinColumns)

  #optimize table, z-ordered by the merge keys
  spark.sql("OPTIMIZE " + fullTableName + " ZORDER BY (" + ",".join(joinColumns) + ")")

  # current contents of the target table
  df_HISTORY = spark.sql("Select * From " + fullTableName)

  # match the schema and fields in the existing table
  # (match_Schema is not defined in this file section; presumably from an included notebook)
  final_df = match_Schema(df_HISTORY, final_df)

  # drop the extract timestamp: it is reset to "now" on every run, so keeping it
  # would make every row look different from history
  df_HISTORY = df_HISTORY.drop('EXTRACTUTCDATE')
  final_df = final_df.drop('EXTRACTUTCDATE')

  # deltas: rows that are new or differ from history. drop_duplicates returns a
  # new DataFrame; the original discarded that result.
  # NOTE(review): MERGE fails if two source rows share the same key values, and
  # this only removes fully identical rows; null keys never match and always insert.
  df_deltas = final_df.exceptAll(df_HISTORY).drop_duplicates()

  # add back the extract timestamp (a single driver-side value for the whole batch)
  df_deltas = df_deltas.withColumn("EXTRACTUTCDATE", sqlf.lit(datetime.datetime.now()))

  # write this run's deltas to the LOAD folder
  file_location = FINAL_FOLDER + 'LOAD/'
  df_deltas.write.mode("Overwrite").format("parquet").save(file_location)

  # expose the deltas to SQL as the MERGE source (registerTempTable is deprecated)
  df_deltas.createOrReplaceTempView(tmpTable)

  spark.sql(mainSQL)

  #remove data frames
  final_df.unpersist()
  df_deltas.unpersist()

else:

  # first load: the whole data set is the load file
  file_location = FINAL_FOLDER + 'LOAD/'
  final_df.write.mode("Overwrite").format("parquet").save(file_location)

#clean up memory before closing app
spark.sparkContext._jvm.System.gc()
