In [0]:
from datetime import date, timedelta, datetime
import hashlib 
import stringcase
import pandas as pd
from pyspark.sql.functions import col,concat,lit,current_date, lag, lead, first, last, desc, hash, date_format,coalesce
from pyspark.sql import *
from delta.tables import *
from pyspark.sql.types import TimestampType, LongType, StringType
import os
from concurrent.futures import ThreadPoolExecutor
from traitlets.config.loader import Config

def encrypt_value(encrvalue):
  """Return the SHA-256 hex digest of a text value.

  Args:
    encrvalue: String to hash; it is UTF-8 encoded before hashing.
      ``None`` is passed through so that null column values do not abort
      a Spark job when this function is used as a UDF.

  Returns:
    The 64-character lowercase hexadecimal digest, or ``None`` when
    ``encrvalue`` is ``None``.
  """
  if encrvalue is None:
    return None
  return hashlib.sha256(encrvalue.encode()).hexdigest()

# Wrap encrypt_value with udf() so it can be applied to DataFrame columns.
# No return type is passed to udf(), so its default return type applies.
generate_hash = udf(encrypt_value)

def getNotebookUser():
  """Return the value of the 'user' tag from the current notebook context."""
  notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
  context_tags = notebook_context.tags()
  return context_tags.apply('user')

def getNotebookPath():
  """Return the 'notebook_path' entry from the notebook context's extra context."""
  notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
  extra_context = notebook_context.extraContext()
  return extra_context.apply('notebook_path')

def getUtcTimeNow():
  """Return the current UTC time formatted as 'YYYY-MM-DD HH:MM:SSZ'."""
  utc_now = datetime.utcnow()
  return utc_now.strftime('%Y-%m-%d %H:%M:%SZ')

def push_to_csv(environment, storage_name, folder_path, file_name):
    """Move a single-part Spark CSV export from its temp folder to its final blob name.

    Looks under ``<container>/curated/temp/<file_name without .csv>`` for the
    file whose name starts with ``part-00000``, copies it to
    ``<container>/<folder_path>/<file_name>`` and then deletes the temp folder.

    Args:
        environment: Blob container name.
        storage_name: Storage account name.
        folder_path: Target folder inside the container. Leading and trailing
            slashes are ignored so the target path never contains ``//``.
        file_name: Target file name, e.g. ``"sales.csv"``.

    Raises:
        IndexError: No ``part-00000*`` file exists in the temp folder. The
            exception type is unchanged from the previous behaviour; only the
            message is new. The temp folder is left in place in this case.
    """
    container_root = "wasbs://{}@{}.blob.core.windows.net".format(environment, storage_name)
    temp_folder = "{}/curated/temp/{}".format(container_root, file_name.replace(".csv", ""))

    # Build the target from non-empty segments only, so "/a/b/" and "a/b" behave the same.
    clean_folder = "{}".format(folder_path).strip("/")
    target_segments = [container_root] + [part for part in (clean_folder, file_name) if part]
    target_blob = "/".join(target_segments)

    # Locate the data file among the files written into the temp folder.
    part_files = [entry for entry in dbutils.fs.ls(temp_folder) if entry.name.startswith("part-00000")]
    if not part_files:
        raise IndexError("No part-00000 file found under {}".format(temp_folder))

    # Copy the data file to its final name, then remove the temp folder recursively.
    dbutils.fs.cp(part_files[0].path, target_blob)
    dbutils.fs.rm(temp_folder, True)

def push_to_sql(environment,df,table_name,user=None,password=None):
  """Overwrite an Azure SQL table with the contents of a DataFrame via JDBC.

  Args:
    environment: "dev" or "prod"; selects the connection settings.
    df: DataFrame to write; ``df.write.jdbc`` is called with mode "overwrite".
    table_name: Target name; any ".csv" is removed and the result is wrapped
      in square brackets to form the SQL table name.
    user: Optional SQL login. Defaults to the built-in value for the environment.
    password: Optional SQL password. Defaults to the built-in value for the
      environment. Callers can pass a value read from a secret scope.

  Raises:
    ValueError: ``environment`` has no connection settings. Previously this
      produced a JDBC URL containing "None" and failed later in the driver.
  """
  # SECURITY NOTE(review): the default credentials below are hard-coded exactly
  # as before so existing callers keep working. They should be moved to a
  # secret scope and supplied through the `user` / `password` arguments.
  # NOTE(review): "prod" uses the same devqa server and database as "dev";
  # that is how the original was written — confirm it is intended.
  connections = {
    "dev": {
      "host": "d-win-sql-azwu-01.database.windows.net",
      "database": "sqldb-bimodel-devqa-01",
      "port": 1433,
      "user": 'sqladmin',
      "password": 'serveradmin@123',
    },
    "prod": {
      "host": "d-win-sql-azwu-01.database.windows.net",
      "database": "sqldb-bimodel-devqa-01",
      "port": 1433,
      "user": 'sqladmin',
      "password": 'serveradmin@123',
    },
  }

  if environment not in connections:
    raise ValueError("push_to_sql: no SQL connection configured for environment '{}'".format(environment))
  target = connections[environment]

  # Credentials go through JDBC connection properties rather than the URL, so a
  # password containing ';' cannot break URL parsing and the URL carries no secret.
  url = "jdbc:sqlserver://{0}:{1};database={2}".format(target["host"], target["port"], target["database"])
  properties = {
    "user": target["user"] if user is None else user,
    "password": target["password"] if password is None else password,
  }

  sql_table_name = "["+table_name.replace(".csv","")+"]"
  df.write.jdbc(url, sql_table_name, mode="overwrite", properties=properties)
  print(sql_table_name)
  
def process_curated(environment,system_name,table_name = ""):
  """Export platinum Delta tables to Azure SQL and, for "bimodelapi", to curated CSV.

  Reads the two gold config tables under ``/mnt/<environment>/gold``, resolves
  the curated and platinum folder paths and the storage account name for
  ``system_name``, registers the storage key with Spark, and then processes
  every configured table on a thread pool. Each table is loaded from the
  platinum folder, its columns are renamed from the config mapping, and it is
  written to SQL through ``push_to_sql``. When ``system_name`` is
  "bimodelapi" it is first written as a single CSV through ``push_to_csv``.

  Args:
    environment: "dev" or "prod"; used in mount paths, the secret key name and
      the downstream push_to_csv / push_to_sql calls.
    system_name: SystemName used to filter the config tables.
    table_name: Optional SourceTableName. When non-empty only that table is
      processed; otherwise every table configured for ``system_name``.

  Raises:
    ValueError: ``environment`` is not "dev" or "prod".
    IndexError: A required config parameter has no row for ``system_name``.
    RuntimeError: One or more tables failed. Previously worker exceptions were
      silently discarded because the ``pool.map`` result was never consumed.
  """
  #initializing variables
  g_bi_config_parameters_path = "/mnt/"+ environment + "/gold/g_bi_config_parameters"
  g_bi_config_curated_tables_columns_path = "/mnt/"+ environment + "/gold/g_bi_config_curated_tables_columns"

  #reading config tables
  df_bi_configuration = spark.read.format("delta").load(g_bi_config_parameters_path)
  df_bi_configuration = df_bi_configuration.filter(df_bi_configuration.SystemName == lit(system_name))
  df_bi_config_curated_tables_columns  = spark.read.format("delta").load(g_bi_config_curated_tables_columns_path)

  def get_parameter(parameter_name):
    # Return the ParameterValue of the first config row with this ParameterName.
    rows = df_bi_configuration.filter(df_bi_configuration.ParameterName == parameter_name)\
                              .select("ParameterValue")\
                              .collect()
    if not rows:
      # Same exception type as the original collect()[0][0], with a usable message.
      raise IndexError("Config parameter '{}' not found for system '{}'".format(parameter_name, system_name))
    return rows[0][0]

  #initializing config parameter values
  curated_folder_path  = get_parameter("curated_folder_path")
  platinum_folder_path = get_parameter("platinum_folder_path")

  #initialize storage account
  storage_name = get_parameter("storage_name")

  if environment not in ("dev", "prod"):
    raise ValueError("process_curated: unsupported environment '{}'".format(environment))
  # NOTE(review): both environments read the key from the "kv-bi-devqa-01-secrets"
  # scope, exactly as the original did — confirm this is intended for prod.
  storage_key = dbutils.secrets.get(scope="kv-bi-devqa-01-secrets", key="databricks-{}-storage-key".format(environment))

  spark.conf.set("fs.azure.account.key.{}.blob.core.windows.net".format(storage_name),storage_key)

  if table_name:
      df_bi_config_curated_tables  = df_bi_config_curated_tables_columns.filter(df_bi_config_curated_tables_columns.SourceTableName == lit(table_name))
  else:
      df_bi_config_curated_tables  = df_bi_config_curated_tables_columns.filter(df_bi_config_curated_tables_columns.SystemName == lit(system_name))

  table_rows  = df_bi_config_curated_tables.select("SourceTableName","TargetFilePath","TargetFileName").distinct().collect()

  def getTable(i):
    # Process one (SourceTableName, TargetFilePath, TargetFileName) config row.
    source_table_name = i[0]
    target_file_path = i[1]
    target_file_name = i[2]

    p_table_name_path = platinum_folder_path + "/{}".format(source_table_name)
    curated_folder_path_temp = curated_folder_path + "/temp/{}".format(target_file_name.replace(".csv",""))

    # Fetch the whole source->target column mapping for this table in a single
    # Spark job instead of one collect() per column.
    mapping_rows = df_bi_config_curated_tables_columns\
                   .filter(df_bi_config_curated_tables_columns.SourceTableName == lit(source_table_name))\
                   .select("SourceFieldName","TargetFieldName")\
                   .collect()
    target_names = {}
    for mapping_row in mapping_rows:
      # First row wins, matching the original collect()[0] lookup.
      target_names.setdefault(mapping_row[0], mapping_row[1])

    #initialize platinum dataframe
    p_df_table_name = spark.read.format("delta").load(p_table_name_path)

    # Re-select every column by name, as the original did.
    p_df_table_name = p_df_table_name.select(p_df_table_name.select("*").schema.fieldNames())

    # `source_col` rather than `col`, so the imported pyspark `col` is not shadowed.
    for source_col in p_df_table_name.columns:
      if source_col not in target_names:
        # Unmapped column: report it and stop renaming, as the original did.
        print(source_table_name + "-> Column Name: " + source_col)
        break
      p_df_table_name = p_df_table_name.withColumnRenamed(source_col, target_names[source_col])

    if (system_name == "bimodelapi"):  # TS - for csv output added if condition
      p_df_table_name\
       .coalesce(1)\
       .write\
       .mode("overwrite")\
       .option("header", "true")\
       .format("com.databricks.spark.csv")\
       .save(curated_folder_path_temp)

      push_to_csv(environment, storage_name, target_file_path, target_file_name)

    #writing dataframe to Azure sql server
    push_to_sql(environment,p_df_table_name,target_file_name)

  # Run every table, then report all failures together. Exceptions raised in a
  # worker thread are only visible when the future's result is inspected.
  failures = []
  with ThreadPoolExecutor(max_workers=20) as pool:
    pending = [(row[0], pool.submit(getTable, row)) for row in table_rows]
    for pending_table, future in pending:
      error = future.exception()  # blocks until this table has finished
      if error is not None:
        failures.append("{}: {!r}".format(pending_table, error))

  if failures:
    raise RuntimeError("process_curated failed for {} table(s): {}".format(len(failures), "; ".join(failures)))

#   writing dataframe to temp  folder
#   p_df_table_name\
#      .coalesce(1)\
#      .write\
#      .mode("overwrite")\
#      .option("header", "true")\
#      .format("com.databricks.spark.csv")\
#      .save(curated_folder_path_temp)

#   push_to_csv(environment, storage_name, target_file_path, target_file_name)



In [0]:
# ===================================== Common Helper Functions ==================================== #
import hashlib
import os

# save file as json.
def write_to_json(filepath, data):
  """Serialise ``data`` as JSON and write it to ``filepath`` through dbutils.

  The payload is serialised before the file system is touched, so a
  serialisation failure no longer deletes an existing file. The write uses
  the overwrite flag of ``dbutils.fs.put`` instead of a separate delete.

  Args:
    filepath: Destination path passed to ``dbutils.fs.put``.
    data: JSON-serialisable object.

  Returns:
    ``filepath`` on success. On any error the error is reported through
    ``log_error`` and ``None`` is returned.
  """
  import json  # local import: json is not imported in the visible import cells
  try:
    payload = json.dumps(data)
    dbutils.fs.put(filepath, payload, True)  # True = overwrite an existing file
    return filepath
  except Exception as error:
    log_error("{} {}".format(notebook, error), filepath)
    return None

# save file as csv.
def _write_to_csv(filepath, data):
  """Placeholder for CSV export: ignores both arguments and returns a fixed message."""
  placeholder_message = "waiting for csv file"
  return placeholder_message

# hash code generator.
def encrypt_value(encrvalue):
  """Return the SHA-256 hex digest of a text value.

  Args:
    encrvalue: String to hash; it is UTF-8 encoded before hashing.
      ``None`` is passed through so that null column values do not abort
      a Spark job when this function is used as a UDF.

  Returns:
    The 64-character lowercase hexadecimal digest, or ``None`` when
    ``encrvalue`` is ``None``.
  """
  if encrvalue is None:
    return None
  return hashlib.sha256(encrvalue.encode()).hexdigest()
# Wrap encrypt_value with udf() so it can be applied to DataFrame columns.
# No return type is passed to udf(), so its default return type applies.
generate_hash = udf(encrypt_value)

# return bronze path
def get_bronze_path(tablename):
  """Build the bronze mount path for a table, using the notebook-level ``environment``."""
  path_template = "/mnt/{env}/bronze/{table}"
  return path_template.format(env=environment, table=tablename)

# return dataload audit fields
def get_dl_audit_fields(dataframe):
  """Return ``dataframe`` with the three data-load audit columns added.

  Adds ``IBIBatchID`` (the notebook-level ``batch_id`` cast to LongType) and
  two columns, ``IBIBatchStartDateTime`` and ``dl_updated_datetime``, each
  set from its own ``datetime.utcnow()`` call.
  """
  return (
    dataframe
    .withColumn("IBIBatchID", lit(batch_id).cast(LongType()))
    .withColumn("IBIBatchStartDateTime", lit(datetime.utcnow()))
    .withColumn("dl_updated_datetime", lit(datetime.utcnow()))
  )

# get file path
def get_filepath(source_path, system, entity, batch_id):
  """Build the JSON file path for an entity and batch.

  The path is ``<source_path>/<system>/<entity>/<YYYY>/<MM>/<DD>/<entity>_<batch_id>.json``,
  where the date parts come from the notebook-level ``batch_start_datetime``.
  """
  date_folder = batch_start_datetime.strftime("%Y/%m/%d")
  raw_path = "{}/{}/{}/{}".format(source_path, system, entity, date_folder)
  file_name = "{}_{}.json".format(entity, batch_id)
  return "{}/{}".format(raw_path, file_name)

def convert_str_to_timestamp(date_col):
    """Convert an 'MM/DD/YYYY' string to 'YYYY-MM-DD 00:00:00'.

    ``None`` is treated as '01/01/1900'.
    """
    source_text = "01/01/1900" if date_col is None else date_col
    parsed = datetime.strptime(source_text, '%m/%d/%Y')
    return parsed.strftime('%Y-%m-%d 00:00:00')

# Wrap convert_str_to_timestamp with udf() so it can be applied to DataFrame columns.
# No return type is passed to udf(), so its default return type applies.
timestamp_udf = udf(convert_str_to_timestamp)

In [0]:
def generate_automl_input_files(df, environment, storage_name, target_file_name, curated_path=None):
    """Write ``df`` as a single CSV and publish it to the AutoML inputs folder.

    The DataFrame is written with a header to
    ``<curated path>/temp/<target_file_name without .csv>`` and then handed to
    ``push_to_csv``, which places it under ``/automl_rev_region_forecast/inputs/``.

    Args:
        df: DataFrame to export.
        environment: Passed through to ``push_to_csv``.
        storage_name: Passed through to ``push_to_csv``.
        target_file_name: Final file name, e.g. ``"forecast.csv"``.
        curated_path: Optional curated folder path. When omitted, the
            notebook-level ``curated_folder_path`` is used as before. Passing
            it explicitly removes the dependency on that global being set.
    """
    target_file_path = "/automl_rev_region_forecast/inputs/"

    # Read the global only when no explicit path is given. A missing global
    # still raises NameError, as it did before.
    base_path = curated_folder_path if curated_path is None else curated_path
    curated_folder_path_temp = base_path + "/temp/{}".format(target_file_name.replace(".csv",""))

    df\
       .coalesce(1)\
       .write\
       .mode("overwrite")\
       .option("header", "true")\
       .format("com.databricks.spark.csv")\
       .save(curated_folder_path_temp)

    push_to_csv(environment, storage_name, target_file_path, target_file_name)
    

In [0]:
def check_audit_contorls(api_name,record_count,min_records,max_records,pipeline_type_null_count = 0):
  """Raise when an API load fails its audit checks.

  Checks, in order: min vs max record count, null pipeline types (only when
  ``api_name`` is "pipeline"), and loaded record count vs ``max_records``.

  Raises:
    Exception: With args ``("Data Error", message)``, where ``message`` is
      ``api_name`` followed by one fragment per failed check. Nothing is
      raised and ``None`` is returned when every check passes.
  """
  findings = []

  if min_records != max_records:
    findings.append(" Min/Max record not matched {}/{} ....".format(str(min_records),str(max_records)))

  if api_name == "pipeline" and pipeline_type_null_count > 0:
    findings.append(" Null Pipeline Type found in API data....")

  if record_count != max_records:
    findings.append(" Rrecord count not matched with total records {}/{}....".format(str(record_count),str(max_records)))

  if findings:
    raise Exception("Data Error", api_name + "".join(findings))