In [0]:
# Import pandas dataframes for transformation
import pandas as pd
# Import pandas schema library for datatype and schema validation
import pandas_schema
# Import schema validation libraries
from pandas_schema import Column
# Import Custom validation pandas schema libraries
from pandas_schema.validation import CustomElementValidation,CustomSeriesValidation
import numpy as np
#Import decimal Library
from decimal import *
import sys
import math
from csv import writer
# contains string manipulation libraries
import re
#import datetime library
from datetime import datetime
#import pyspark libraries
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark.sql.functions import max as max_
from pyspark.sql.functions import col
# import time library to perform time related validations.
import time

In [0]:
# Hand the storage-account access key to Spark; the key itself is read from
# the Databricks secret scope rather than being written in the notebook.
spark.conf.set(
    "fs.azure.account.key.metadatapocsalesforce",
    dbutils.secrets.get(key="storageaccesskey", scope="metadatapocsalesforce"),
)


In [0]:
# Mount locations and derived folders used by the rest of the notebook.
# Raw zone mount location.
raw_zone_mnt = "/mnt/rawzone/"
# Stage zone mount location.
stage_zone_mnt = "/mnt/stagezone/"
# Error-log mount location (a folder under the stage zone).
errorlog_mnt = "".join([stage_zone_mnt, "Temptable/"])
# Qualified folder location.
qffilepath = "".join([stage_zone_mnt, "Qualified", "/"])
# Rejected folder location.
rjfilepath = "".join([stage_zone_mnt, "Rejected", "/"])
# Table that write_log() appends validation errors to.
errorlogtable = 'pocdwh.ErrorLog'

In [0]:
%sql
-- Truncate the error log table before each execution.
-- NOTE(review): this statement empties pocdwh.SAP_ErrorLog, while the Python cells of this
-- notebook write to and query pocdwh.ErrorLog (the `errorlogtable` variable). Confirm which
-- table is intended; if it is pocdwh.ErrorLog, errors from earlier runs are never cleared.
TRUNCATE TABLE pocdwh.SAP_ErrorLog

In [0]:
# JDBC connection settings for the SQL Server metadata database (MetadataDB).
# `jdbcUrl` and `connectionProperties` are used by every spark.read.jdbc / DataFrameWriter.jdbc
# call in this notebook.
# NOTE(review): the user name and password below are hardcoded in plain text. The storage key
# in this notebook is already fetched through dbutils.secrets.get — consider storing these
# credentials in a secret scope as well and rotating the exposed password.
jdbcHostname = "poc-metadatadriven.database.windows.net"
jdbcDatabase = "MetadataDB"
jdbcUsername="vsagala"
jdbcPassword="Pass@123"
jdbcPort = 1433
# Build the SQL Server JDBC URL from host, port and database name.
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
# Properties passed alongside the URL on every JDBC read/write.
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

In [0]:
#print(qffilepath)
#dbutils.fs.ls(rjfilepath)
#dbutils.fs.ls("/mnt/material_sap_raw/2020/05/21")
#display(dbutils.fs.ls("abfss://edna-materialdatadomain@[REDACTED].dfs.core.windows.net/Raw/SAP/2020/05/21/Prod_Master_Material_Attr_05-21-2020_03_04_06.csv"))
#filepath = "/mnt/material_sap_raw/2020/05/21"

In [0]:
#dbutils.widgets.text("sourcetype", "","")
#source_type = dbutils.widgets.get("sourcetype")
#df_datatype_details_query = "(SELECT SAP_DataType as sapdatatype,SPARK_DataType as sparkdatatype,ValidationFunction as validationfn from dbo.Conf_SAPtoSPARKDataTypeMapping where Source = '" + source_type + "') conf"
#df_datatype_details_spk = spark.read.jdbc(url=jdbcUrl, table=df_datatype_details_query, properties=connectionProperties)
#df_datatype_details = df_datatype_details_spk.toPandas() 
#display(df_datatype_details)

#dbutils.widgets.removeAll()


In [0]:
# Read the SAP -> Spark datatype mapping (and the validation function name configured for each
# SAP type) from the metadata database, then keep it as a pandas dataframe for later merges.
df_datatype_details_query = "(SELECT SAP_DataType as sapdatatype,SPARK_DataType as sparkdatatype,ValidationFunction as validationfn from dbo.Conf_SAPtoSPARKDataTypeMapping) conf"
df_datatype_details_spk = spark.read.jdbc(
    url=jdbcUrl,
    table=df_datatype_details_query,
    properties=connectionProperties,
)
df_datatype_details = df_datatype_details_spk.toPandas()
#display(df_datatype_details)

In [0]:
def generate_schemafile_df(sfp):
  """Build the schema dataframe for the schema file at `sfp`.

  The file is read through Spark as a header-less CSV, the text in its first
  column (`_c0`) is split on '|', and the first seven resulting fields are
  named Column, FieldName, Key, sapdatatype, Length, outputlen and decimals.
  Each row is then inner-joined to the notebook-level `df_datatype_details`
  mapping on `sapdatatype`, and rows whose `Key` is 'X' get
  '+empty_validation' appended to their validation function name.

  Returns a pandas dataframe sorted by `Column`, with the pre-reset index kept
  as an `index` column and the new 0-based row position stored in `indexno`.
  """
  schema_lines = spark.read.format('csv').options(header='false',inferSchema='true').load(sfp).toPandas()
  schema_fields = schema_lines._c0.str.split("|",expand=True)
  field_labels = {0: "Column", 1: "FieldName",2: "Key",3: "sapdatatype",4: "Length",5: "outputlen",6: "decimals"}
  schema_named = schema_fields.rename(columns=field_labels)
  # NOTE(review): 'Column' comes out of a string split, so this sort is textual
  # ('10' sorts before '2') — confirm the values sort in file-column order.
  schema_typed = pd.merge(schema_named, df_datatype_details, on='sapdatatype', how='inner').sort_values(by=['Column'])
  # Key columns (Key == 'X') additionally get the empty-value check.
  key_suffix = schema_typed['Key'].eq('X').map({True: '+empty_validation', False: ''})
  validation_with_key_check = schema_typed['validationfn'] + key_suffix
  # Replace the plain validation name with the combined one, keeping it as the last column.
  schema_final = schema_typed.drop(columns=['validationfn']).assign(validationfn=validation_with_key_check)
  # reset_index() keeps the previous index as an 'index' column and renumbers rows from 0.
  schema_indexed = schema_final.reset_index()
  schema_indexed['indexno'] = schema_indexed.index
  return schema_indexed

In [0]:
def generate_datafile_df(dfp):
  """Load the pipe-delimited data file at `dfp` into a pandas dataframe.

  Spark's default column names are converted with `rename_col_fun`, an
  `indexno` column holding each row's index is added, nulls are replaced by
  empty strings, and the rows are returned ordered by `indexno`.
  """
  raw_frame = spark.read.option("delimiter", "|").csv(dfp).toPandas()
  # Column labels become integer positions via rename_col_fun.
  positional = raw_frame.rename(columns=rename_col_fun)
  # Remember the original row order before any later processing.
  positional['indexno'] = positional.index
  # Nulls and empty strings are treated alike from here on.
  filled = positional.fillna("")
  return filled.sort_values(by=['indexno'])

In [0]:
def rename_col_fun(x):
    """Convert a column label such as '_c3' (or a plain '3') to the integer 3."""
    # Labels with and without the '_c' prefix go through the same conversion.
    position_text = x.replace("_c", "")
    return int(position_text)

In [0]:
  def merge_schemaanddatafiles(sdf,ddf) :
    """Return the data rows of `ddf` with the schema's field names as column headers.

    sdf -- schema dataframe; it must contain an 'indexno' column, and once that
           column is dropped its third column (position 2) supplies the header
           values (FieldName in the frames built by generate_schemafile_df).
    ddf -- data dataframe whose column labels match the schema row labels.

    Neither input is modified; earlier versions deleted 'indexno' from the
    caller's `sdf`, which broke any later step that still needed it.
    """
    # Work on a copy without 'indexno' instead of deleting it from the caller's frame.
    schema_without_indexno = sdf.drop(columns=['indexno'])
    # After transposing, row 2 holds one field name per schema row.
    header_row = schema_without_indexno.transpose().iloc[[2], :]
    # Stack the header row on top of the data, then promote it to column labels.
    stacked = pd.concat([header_row, ddf], axis=0)
    new_header = stacked.iloc[0]
    data_rows = stacked.iloc[1:].copy()
    data_rows.columns = new_header
    return data_rows

In [0]:
def column_count_check(sdf,ddf,flname):
  """Compare the number of schema rows with the number of data columns.

  'indexno' is deleted from `ddf` in place before counting, so the caller's
  frame no longer has that column afterwards. When both frames are non-empty
  and the counts differ, one 'SchemaValidation' row is sent to write_log();
  otherwise only a message is printed. Nothing is returned.
  """
  del ddf['indexno']
  if sdf.empty or ddf.empty:
      print('Schema file or DataFile is empty')
      return
  data_attr_cnt = len(ddf.columns)
  schema_attr_cnt = len(sdf)
  if data_attr_cnt == schema_attr_cnt:
      print('Schema and File structure match')
      return
  error_desc='Schema file has '+ str(schema_attr_cnt) + ' attributes and data file has '+ str(data_attr_cnt) + ' attributes.Schema mismatch'
  logged_at = datetime.now()
  # Reuse the active Spark session, or create one if none exists yet.
  spark = SparkSession.builder.getOrCreate()
  log_columns = ['error_desc', 'error_file', 'error_time','validationtype']
  log_rows = [(error_desc,flname,str(logged_at),'SchemaValidation')]
  write_log(spark.createDataFrame(log_rows, log_columns))

In [0]:
def check_duplicates(ddf,flname):
  """Log every fully duplicated row of `ddf` (ignoring the 'indexno' column).

  ddf    -- data dataframe; it is not modified.
  flname -- file name recorded with each logged error.

  When duplicates exist, each duplicated row is written to the error log via
  write_log() as one pipe-joined string whose last field is the row's index.
  When there are none, a message is printed. Nothing is returned.
  """
  payload = ddf.loc[:, ddf.columns != 'indexno']
  # .copy() gives an independent frame, so adding helper columns below does not
  # write onto a filtered slice (which raised SettingWithCopyWarning before).
  duplicate_rows = payload[payload.duplicated(keep=False)].copy()
  if duplicate_rows.empty:
      print ("No Duplicate records")
      return
  # Carry the row number along so it ends up as the last pipe-separated field.
  duplicate_rows['index'] = duplicate_rows.index
  duplicate_rows['concatenate_row'] = duplicate_rows.apply(lambda row: '|'.join(row.values.astype(str)), axis=1)
  error_df = pd.DataFrame({
      'error_desc': duplicate_rows['concatenate_row'],
      'error_file': flname,
      'error_time': str(datetime.now()),
      'validationtype': 'check_duplicates',
  })
  # Convert to a Spark dataframe of strings and append it to the error log table.
  write_log(spark.createDataFrame(error_df.astype(str)))

In [0]:
def check_businesskeyduplicates(sdf,ddf,flname):
  """Log rows of `ddf` whose business-key values occur more than once.

  sdf    -- schema dataframe; rows with Key == 'X' mark the key columns and
            their 'indexno' values are the matching column labels in `ddf`.
  ddf    -- data dataframe including an 'indexno' column equal to its index.
  flname -- file name recorded with each logged error.

  If the schema defines no key columns, the check falls back to
  check_duplicates() (exact-row duplicates). Nothing is returned.
  """
  key_rows = sdf[sdf['Key']=='X']
  if key_rows.empty:
    print ("No Business keys defined")
    # When no keys are defined to validate the uniqueness of the record, check for exact duplicates.
    check_duplicates(ddf,flname)
    return
  # Build the key projection once; it used to be rebuilt on every loop iteration.
  key_columns = key_rows['indexno'].tolist()
  key_values = pd.DataFrame(ddf, columns=key_columns)
  duplicated_keys = key_values[key_values.duplicated(keep=False)]
  if duplicated_keys.empty:
    print ("No Duplicate records")
    return
  # Row numbers of the offending rows, joined back to get the complete records.
  duplicate_index = pd.DataFrame({'indexno': duplicated_keys.index})
  duplicate_records = pd.merge(ddf,duplicate_index,on='indexno')
  # One pipe-joined string per record; 'indexno' is part of the joined values.
  duplicate_records['concatenate_row'] = duplicate_records.apply(lambda row: '|'.join(row.values.astype(str)), axis=1)
  error_df = pd.DataFrame({
    'error_desc': duplicate_records['concatenate_row'],
    'error_file': flname,
    'error_time': str(datetime.now()),
    'validationtype': 'check_businesskey_duplicates',
  })
  # Convert to a Spark dataframe of strings and append it to the error log table.
  write_log(spark.createDataFrame(error_df.astype(str)))

In [0]:
def file_exists(path):
  """Tell whether `path` can be listed through dbutils.

  Returns True when the listing succeeds and False when it fails with a
  message containing 'java.io.FileNotFoundException'; any other failure is
  re-raised unchanged.
  """
  try:
    dbutils.fs.ls(path)
  except Exception as listing_error:
    if 'java.io.FileNotFoundException' not in str(listing_error):
      raise
    return False
  return True

In [0]:
def df_isempty(dfp):
  """Return True when the header-less CSV at `dfp` loads as an empty pandas dataframe."""
  # The whole file is read through Spark and converted to pandas just to test emptiness.
  loaded = spark.read.format('csv').options(header='false',inferSchema='true').load(dfp).toPandas()
  return bool(loaded.empty)


In [0]:
def check_int(num):
    """Return True when `num` converts with int(); None and '' count as valid (treated as 0)."""
    try:
        # Missing values are validated as the default 0.
        candidate = 0 if (num is None or num == '') else num
        int(candidate)
        return True
    except ValueError:
        return False
  
def check_string(st):
    """Return True when `st` converts with str(); False only if str() raises ValueError."""
    try:
        str(st)
        return True
    except ValueError:
        return False
  
def check_decimal(dec):
  """Return True when `dec` parses as a Decimal once thousands commas are removed.

  None and '' count as valid (treated as 0).
  """
  try:
    if dec is None or dec == '':
      candidate = 0
    elif "," in dec:
      # Commas are accepted as thousands separators and stripped before parsing.
      candidate = dec.replace(',','')
    else:
      candidate = dec
    Decimal(candidate)
    return True
  except InvalidOperation:
    return False
  
def check_float(flt):
    """Return True when `flt` converts with float() once thousands commas are removed.

    None and '' count as valid (treated as 0). Values that float() cannot
    convert, whether by value or by type, return False.

    Fixes two defects of the previous version: it tested the undefined name
    `dec` (NameError on every non-empty value) and it caught InvalidOperation,
    which float() never raises, instead of ValueError.
    """
    try:
        if flt is None or flt == '':
            float(0)
        elif isinstance(flt, str):
            # Commas are accepted as thousands separators and stripped before parsing.
            float(flt.replace(',', ''))
        else:
            float(flt)
    except (TypeError, ValueError):
        return False
    return True
  
def check_missingvalues(ip_val):
   """Return False when the string form of `ip_val` is empty, True otherwise."""
   return str(ip_val) != ''

def validate_date(date_text):
    """Return True when `date_text` is '', '00000000', or parses as a YYYYMMDD date."""
    try:
        # Empty and all-zero dates are accepted as-is; everything else must parse.
        is_placeholder = date_text == '' or date_text == '00000000'
        if not is_placeholder:
            datetime.strptime(date_text, "%Y%m%d")
        return True
    except ValueError:
        return False

def validate_time(time_text):
    """Return True when `time_text` is '' or parses as an HHMMSS time."""
    try:
        # An empty value is accepted as-is; everything else must parse.
        if time_text != '':
            time.strptime(time_text, "%H%M%S")
        return True
    except ValueError:
        return False

In [0]:
# Validation lists, one per check. validate_datatype() resolves a configured expression such as
# 'date_validation+empty_validation' with eval(), so these names are part of the configuration
# contract and each value must stay a list (lists are combined with '+').
decimal_validation = [CustomElementValidation(lambda d: check_decimal(d), 'is not decimal')]
int_validation = [CustomElementValidation(lambda i: check_int(i), 'is not integer')]
string_validation = [CustomElementValidation(lambda s: check_string(s), 'is not string')]
null_validation = [CustomElementValidation(lambda d: d is not np.nan, 'this field cannot be null')]
empty_validation = [CustomElementValidation(lambda ev: check_missingvalues(ev), 'this field cannot have empty values')]
date_validation = [CustomElementValidation(lambda dt: validate_date(dt), 'Incorrect date format, should be in YYYYMMDD format')]
# validate_time() checks against '%H%M%S', so the message names HHMMSS (it previously said HHMMDD).
time_validation = [CustomElementValidation(lambda tm: validate_time(tm), 'Incorrect time format, should be HHMMSS format')]

In [0]:
def write_csv(df,qffilepath,flname,fltimestamp):
  """Append `df` to the Delta location `qffilepath`, partitioned by load date.

  Year, Month and Day columns (strings taken from the current time, month and
  day zero-padded to two digits) are added to `df` in place. The frame is then
  converted to a Spark dataframe of strings and written together with
  StageZoneTimestamp, File_Name and File_Time columns.
  """
  loaded_at = datetime.now()
  df['Year'] = str(loaded_at.year)
  df['Month'] = str(loaded_at.month).zfill(2)
  df['Day'] = str(loaded_at.day).zfill(2)
  staged = spark.createDataFrame(df.astype(str))
  # Audit columns appended in a fixed order.
  for audit_col, audit_val in (("StageZoneTimestamp", loaded_at), ("File_Name", flname), ("File_Time", fltimestamp)):
    staged = staged.withColumn(audit_col, lit(audit_val))
  delta_writer = staged.write.format("delta").mode("append").partitionBy('Year','Month','Day')
  delta_writer.option("mergeSchema", "true").save(qffilepath)

In [0]:
def write_log(error_df):
  """Append the Spark dataframe `error_df` to the Delta table named by `errorlogtable`."""
  delta_appender = error_df.write.format("delta").mode("append")
  # mergeSchema lets the append succeed when the incoming columns differ from the table's.
  delta_appender.option("mergeSchema", "true").saveAsTable(errorlogtable)

In [0]:
def write_log_todb(error_df):
  """Append the pandas dataframe `error_df` (cast to strings) to the database error-details table over JDBC."""
  errors_as_text = spark.createDataFrame(error_df.astype(str))
  DataFrameWriter(errors_as_text).jdbc(
    url=jdbcUrl,
    table= "[dbo].[Log_DBX_DataValidations_ErrorDetails]",
    mode ="append",
    properties = connectionProperties,
  )

In [0]:
def validate_datatype(colindx,validation,d,sapdatatype,flname,colname):
  """Validate one column of the data frame against its configured validation list.

  colindx     -- current label of the column in `d`.
  validation  -- expression naming the validation list(s) to apply, e.g. the
                 `validationfn` value built in generate_schemafile_df; it is
                 evaluated with eval().
  d           -- pandas dataframe holding the data; the column `colindx` is
                 renamed to `colname` IN PLACE, so the caller's frame changes.
  sapdatatype -- SAP datatype of the column (not used in the validation itself).
  flname      -- file name recorded with each error.
  colname     -- field name the column is renamed to and validated under.

  Returns a pandas dataframe with columns error_desc, error_file, error_time
  and validationtype when validation errors were found; otherwise prints a
  message and returns None.
  """
  #set the parameter values
  col_index = colindx
  validation_type =validation
  datatype= sapdatatype
  col_name = colname
  # df_vald is the same object as `d`, not a copy.
  df_vald=d
  #rename the columnname of the dataframe
  df_vald.rename(columns = {col_index:col_name}, inplace = True) 
  #convert function name to object
  # NOTE(review): eval() executes whatever expression the configuration supplies; this is only
  # safe while the mapping table is trusted. A lookup in a dict of known validation lists
  # would avoid executing arbitrary text.
  function=eval(validation_type)
  #validate the column datatype
  schema=pandas_schema.column.Column(col_name,function)
  #capture errors from validation function
  errors = schema.validate(df_vald[col_name])
  #initialize the error dataframe
  error_df = pd.DataFrame({'col':errors})
  #capture the error rows
  # (computed but not used afterwards)
  errors_index_rows = [e.row+1 for e in errors]
  if error_df.empty == False :
    # Rebuild the frame in the error-log layout.
    error_df = pd.DataFrame({'error_desc':errors})
    error_df['error_file'] = flname
    dateTime = datetime.now()
    error_df['error_time'] = str(dateTime)
    error_df["validationtype"] = 'check_datatype'
    #display(error_df)
    return(error_df)
  else: print('No Records to print')  

In [0]:
def check_float_datatype(flt):
    """Return True when `flt` converts with float(); None and '' count as valid (treated as '0')."""
    try:
        candidate = '0' if (flt is None or flt == '') else flt
        float(candidate)
        return True
    except ValueError:
        return False

In [0]:
def precision_and_scale(col_idx,dtlen,dtprec,dtype,ddf,fl,colname):
  """Check every value of column `col_idx` in `ddf` against its permitted size.

  With dtprec > 0 the value is read as a Decimal and both its integer part
  length (limit `dtlen`) and its number of decimals (limit `dtprec`) are
  checked; values rejected by check_float_datatype() produce no result row.
  With dtprec == 0 only the string length is compared with `dtlen`.

  Returns a pandas dataframe with columns error_desc, error_file, error_time
  and validationtype holding one row per checked value; error_desc is
  'No Error' for values within limits.
  """
  results = []
  for row_idx, row in ddf.iterrows():
    value = row[col_idx]
    if dtprec > 0:
      if not check_float_datatype(value):
        continue
      # Missing values are checked as '0.00'.
      if value is None or len(value) == 0:
        value = '0.00'
      number = Decimal(value.replace(',',''))
      exponent = number.as_tuple().exponent
      magnitude_len = len(str(int(number)))
      scale_len = abs(exponent)
      if magnitude_len > dtlen or scale_len > dtprec:
        outcome = '{row: '+str(row_idx)+' column:" '+ colname +'"} Value'+ str(number) +' has exceeded the maximum length limit of Magnitude' + str(dtlen) +  ' and/or maximum scale limit of ' + str(dtprec) + ' respectively'
      else:
        outcome = 'No Error'
    elif dtprec == 0:
      # No scale to check: compare the plain string length with the limit.
      if value is None:
        value = ''
      if len(str(value)) > dtlen:
        print("entered this loop")
        outcome = '{row: '+str(row_idx)+' column:"'+ colname +'"} Value '+ str(value)+ '  has the exceeded maximum length limit of ' +str(dtlen)
      else:
        outcome = 'No Error'
    else:
      continue
    results.append((outcome, fl, datetime.now(), "checkdatatypelength"))
  return pd.DataFrame({
    'error_desc': [r[0] for r in results],
    'error_file': [r[1] for r in results],
    'error_time': [r[2] for r in results],
    'validationtype': [r[3] for r in results],
  })

In [0]:
def datatype_validation(sdf,ddf,flname) :
  """Run the configured datatype validation for every schema row and log the errors.

  sdf    -- schema dataframe with indexno, FieldName, validationfn and
            sapdatatype columns.
  ddf    -- data dataframe; validate_datatype() renames its columns in place
            to the schema field names as a side effect.
  flname -- file name recorded with each logged error.

  Errors returned by validate_datatype() are combined and appended to the
  error log via write_log(). Nothing is returned.

  Changes from the previous version: the `ddf` argument is validated (the
  notebook global `df_data_file` was used before), and results are combined
  with pd.concat because DataFrame.append no longer exists in pandas 2.
  """
  column_names = ['error_desc', 'error_file', 'error_time','validationtype']
  error_frames = []
  if len(sdf) == 0:
    print('No records avilable in the scheme file dataframe for validation')
  elif len(ddf) == 0:
    print('No records avilable in the data file dataframe for validation')
  else:
    for _, schema_row in sdf.iterrows():
      column_errors = validate_datatype(
        schema_row['indexno'],
        schema_row['validationfn'],
        ddf,
        schema_row['sapdatatype'],
        flname,
        schema_row['FieldName'],
      )
      # validate_datatype returns None when the column has no errors.
      if column_errors is not None:
        error_frames.append(column_errors)
  if error_frames:
    df_datatype_validation_errors = pd.concat(error_frames, ignore_index=True)
  else:
    df_datatype_validation_errors = pd.DataFrame(columns = column_names)
  print(df_datatype_validation_errors)
  if df_datatype_validation_errors.empty:
    print("No Error Records to write")
    return
  write_log(spark.createDataFrame(df_datatype_validation_errors.astype(str)))

In [0]:
def deflt_date_time_cols(sdf,ddf):
  """Replace empty/zero placeholders in date and time columns with default values.

  sdf -- schema dataframe with 'sapdatatype' and 'indexno' columns; 'indexno'
         is the label of the matching column in `ddf`.
  ddf -- data dataframe; it is updated in place and also returned.

  DATS columns: '00000000' and '' become '19000101'.
  TIMS columns: '000000' and '' become '000000'.

  The column is now addressed directly by its integer label; earlier versions
  built the lookup expression as text and ran it through eval().
  """
  date_time_rows = sdf[sdf['sapdatatype'].isin(['DATS', 'TIMS'])]
  for _, schema_row in date_time_rows.iterrows():
    col_id = int(schema_row['indexno'])
    current = ddf[col_id]
    if schema_row['sapdatatype'] == 'DATS':
      ddf.loc[(current == '00000000') | (current == ''), col_id] = "19000101"
    else:
      ddf.loc[(current == '000000') | (current == ''), col_id] = "000000"
  return ddf

In [0]:
def remove_commas_from_numericalcols(sdf,ddf):
  """Strip commas from the `ddf` columns selected by the schema filter.

  `ddf` is updated in place and also returned.
  """
  # NOTE(review): the second condition (sapdatatype != "000000") is true for practically every
  # type, so commas are removed from all columns, not only numeric ones — confirm the intent.
  selected_rows = sdf.query('sapdatatype == "INT" | sapdatatype != "000000"')
  for col_id in selected_rows['indexno']:
    ddf[col_id] = ddf[col_id].str.replace(',','')
  return ddf

In [0]:
def move_to_rejected(sfile,dpath):
  """Re-write the header-less CSV at `sfile` as a single-partition CSV under `dpath`, overwriting what is there."""
  source_frame = spark.read.format('csv').options(header='false',inferSchema='true').load(sfile)
  # coalesce(1) collapses the data to one partition before writing.
  single_part_writer = source_frame.coalesce(1).write.mode('overwrite').format('com.databricks.spark.csv')
  single_part_writer.option('header', 'false').save(dpath)

In [0]:
def datalength_validation(sdf,ddf,flname) :
  """Check every column of `ddf` against the length/decimals in the schema and log violations.

  sdf    -- schema dataframe with indexno, Length, decimals, sapdatatype and
            FieldName columns.
  ddf    -- data dataframe whose column labels match the schema 'indexno' values.
  flname -- file name recorded with each logged error.

  Each schema row is passed to precision_and_scale(); rows whose error_desc is
  not 'No Error' are appended to the error log via write_log(). Nothing is
  returned.

  Changes from the previous version: an empty schema or data frame no longer
  raises UnboundLocalError on the filtered result, the empty-data-file message
  names the data file, and results are combined with pd.concat because
  DataFrame.append no longer exists in pandas 2.
  """
  column_names = ['error_desc', 'error_file', 'error_time','validationtype']
  result_frames = []
  if len(sdf) == 0:
    print('No records avilable in the scheme file dataframe for validation')
  elif len(ddf) == 0:
    print('No records avilable in the data file dataframe for validation')
  else:
    for _, schema_row in sdf.iterrows():
      col_index = schema_row['indexno']
      # Hand precision_and_scale only the column being checked.
      single_column = pd.DataFrame(ddf, columns = [col_index])
      result_frames.append(precision_and_scale(
        col_index,
        int(schema_row['Length']),    # permissible length
        int(schema_row['decimals']),  # permissible number of decimals
        schema_row['sapdatatype'],
        single_column,
        flname,
        schema_row['FieldName'],
      ))
  if result_frames:
    all_results = pd.concat(result_frames, ignore_index=True)
  else:
    all_results = pd.DataFrame(columns = column_names)
  # Keep only real violations; precision_and_scale marks clean values with 'No Error'.
  violations = all_results[all_results['error_desc']!='No Error']
  if violations.empty:
    print("No Error Records to write")
    return
  write_log(spark.createDataFrame(violations.astype(str)))

In [0]:
def write_process_status(flname,sf,st,enty,ft,zfp) :
  """Append one audit row for a processed file to [dbo].[Log_DataFeedRepository] over JDBC.

  flname -- file name                sf  -- status flag (2-Qualifed, 3-Rejected)
  st     -- processing start time    enty -- SAP file entity name
  ft     -- file time                zfp -- destination path of the file

  End, load and update times are all the current time taken when the function
  is called.
  """
  finished_at = datetime.now()
  loader_name = "DB-Materials-SAP-RawtoStageDataValidation"
  # Audit columns in the order they are added to the row.
  audit_values = [
    ("File_Name", flname),
    ("Source", 'SAP'),
    ("Status_Flag", sf),
    ("Start_Time", st),
    ("End_Time", finished_at),
    ("Entity", enty),
    ("File_Time", ft),
    ("Loaded_By", loader_name),
    ("Loaded_Time", finished_at),
    ("Updated_By", loader_name),
    ("Updated_Time", finished_at),
    ("Zone_FilePath", zfp),
    ("DataDomain", 'Material'),
  ]
  spark = SparkSession.builder.getOrCreate()
  # Start from a one-row frame with two dummy columns, add the audit columns as literals,
  # then remove the dummy columns again.
  df_audit = spark.createDataFrame([(0,0)], ['id','id1'])
  for audit_col, audit_val in audit_values:
    df_audit = df_audit.withColumn(audit_col, lit(audit_val))
  df_audit = df_audit.drop("id").drop("id1")
  DataFrameWriter(df_audit).jdbc(url=jdbcUrl, table= "[dbo].[Log_DataFeedRepository]", mode ="append", properties = connectionProperties)

In [0]:
# Driver: list every file in the raw zone, validate each data file against its schema file
# ("S_" + data file name), then move both files to the Qualified or Rejected folder and
# record the outcome in the audit / error tables.
#list all the files from rawzone  
df_file_details=(dbutils.fs.ls("/mnt/rawzone"))
df_cntrl_info_pd=pd.DataFrame(df_file_details)

for index, row in df_cntrl_info_pd.iterrows():
  # derive filename
  flname=row['name'] 
  #filter the schema files and loop data files only
  if (flname.startswith("S_") != True ) :
    #fileprocessing starttime
    st = datetime.now()
    #derive schemaname
    sflname = 'S_'+flname
    #derive file date information
    # NOTE(review): the slices below assume a fixed-width date/time suffix at the end of the
    # file name (date starting with the year, 32 characters from the end) — confirm against
    # the actual naming convention; a different suffix makes strptime below fail.
    match_pattern = flname[-32:]
    derive_filedate = match_pattern[:10].replace('-','')
    #print(derive_filedate)
    year = derive_filedate[:4]
    month = derive_filedate[4:6]
    day = derive_filedate[6:8]
    extract_time = flname[-21:-13].replace('_',':')
    #print(extract_time)
    # feed name = file name without its last 33 characters
    feednm=flname[:-33]
    #print(feednm)
    filetimestamp_str = year+'-'+month+'-'+day+'T'+extract_time+'.000000'
    filetimestamp=datetime.strptime(filetimestamp_str, '%Y-%m-%dT%H:%M:%S.%f')
    #print(filetimestamp_str)
    #print(filetimestamp)
    #set data file path
    #datafilepath = raw_zone_mnt  + year + '/'+ month +'/'+ day +'/'+flname
    datafilepath = raw_zone_mnt  + flname
    #print(datafilepath)
    # set schema file path
    #schemafilepath = raw_zone_mnt  + year + '/'+ month +'/'+ day +'/'+sflname
    schemafilepath = raw_zone_mnt  +sflname
    #print(datafilepath)
    #print(schemafilepath)
    #set qualified folder path
    qualifiedfolderpath = qffilepath+feednm
    #rejected folder path
    rejectedfolderpath = rjfilepath + year + '/'+ month +'/'+ day 
    #print(qualifiedfolderpath)
    #print(rejectedfolderpath)
    # check if the schema file exists
    sfileexists = file_exists(schemafilepath)
    # check if dataframe is empty
    # NOTE(review): the data file is read here before its existence is checked below.
    dfcheckempty= df_isempty(datafilepath)
    # check if the data file exists
    dfileexists = file_exists(datafilepath)
    # process the files only when both schema file and data file exists
    if (sfileexists == True) and (dfileexists == True) :
      #check if dataframe is empty, this is possible when there are no incremental records coming in from the SAP Feeds.
      if (dfcheckempty == False) :
        # fn call to get a schema file data
        #print(schemafilepath)
        df_schema_file = generate_schemafile_df(schemafilepath)
        # fn call to get the data file data
        df_data_file_raw = generate_datafile_df(datafilepath)
        # default the empty / all-zero values in date and time columns
        df_data_file_dflt = deflt_date_time_cols(df_schema_file,df_data_file_raw)
        #remove commas from numerical columns
        df_data_file = remove_commas_from_numericalcols(df_schema_file,df_data_file_dflt)
        # The validation functions below return None; failures are written to the error log
        # table and counted further down.
        # fn call to execute the business key duplicate check
        duplicatecheckbusinesskey_validation_result=check_businesskeyduplicates(df_schema_file,df_data_file,flname)
        # fn call to execute column count validation (this also deletes 'indexno' from df_data_file)
        columncountcheck_validation_result = column_count_check(df_schema_file,df_data_file,flname)
        #fn call to execute datalength validation
        data_length_validation_result = datalength_validation(df_schema_file,df_data_file,flname)
        # fn call to execute datatype validation
        data_type_validation_result = datatype_validation(df_schema_file,df_data_file,flname)
        #df_datafile_final = merge_schemaanddatafiles(df_schema_file,df_data_file)
        df_datafile_final = df_data_file
        df_datafile_final['SeqNo']= df_datafile_final.index
        qtflname = "'"+flname+"'"
        #build the querystring to query the error tables
        # NOTE(review): the file name is spliced into the SQL text; a name containing a quote
        # would break these queries.
        query = 'select error_desc,error_file,error_time,validationtype from {} where error_file = {}'.format(errorlogtable,qtflname)
        querycnt = 'select count(*) as cnt from {} where error_file = {}'.format(errorlogtable,qtflname)
        df_error = sqlContext.sql(querycnt).toPandas()
        df_error_desc_result = sqlContext.sql(query)
        df_error_desc_result_sql= df_error_desc_result.registerTempTable("df_error_desc_result")
        #pick only top 20 records for each file,validationtype error combination 
        df_error_desc=sqlContext.sql("select error_desc,error_file,error_time,validationtype from (select *,row_number() over (partition by error_file,validationtype order by error_time desc) as rn from df_error_desc_result) a where a.rn <=20").toPandas()
        #df_errcnt = int(df_error.get_value(0,'cnt')
        # number of errors logged for this file
        df_errcnt = int(df_error.loc[0,'cnt'])
        #print(df_errcnt)
        #if(df_Material_filter.count() == 0)
        #if (df_error.count()!=0) :
        print(schemafilepath)
        print(datafilepath)
        print(rejectedfolderpath)
        print(qualifiedfolderpath)
        if (df_errcnt > 0 ):
          # At least one validation error: reject both files (status flag 3).
          print("entered this loop")
          # The two branches differ only in creating the rejected folder first.
          if (file_exists(rejectedfolderpath) == True) :
            dbutils.fs.mv(schemafilepath,rejectedfolderpath)
            dbutils.fs.mv(datafilepath,rejectedfolderpath) 
            sf=3
            write_process_status(flname,sf,st,feednm,filetimestamp,rejectedfolderpath)
            write_process_status(sflname,sf,st,feednm,filetimestamp,rejectedfolderpath)
            write_log_todb(df_error_desc)
          else:
            #create the directory.
            dbutils.fs.mkdirs(rejectedfolderpath)
            dbutils.fs.mv(schemafilepath,rejectedfolderpath)
            dbutils.fs.mv(datafilepath,rejectedfolderpath)
            sf=3
            write_process_status(flname,sf,st,feednm,filetimestamp,rejectedfolderpath)
            write_process_status(sflname,sf,st,feednm,filetimestamp,rejectedfolderpath)
            write_log_todb(df_error_desc)
        else :
          # No validation errors: qualify both files (status flag 2).
          # The two branches differ only in creating the qualified folder first.
          if (file_exists(qualifiedfolderpath) == True):
            print("writedata")
            #write_csv(df_datafile_final,qualifiedfolderpath,flname,filetimestamp)
            dbutils.fs.mv(schemafilepath,qualifiedfolderpath)
            dbutils.fs.mv(datafilepath,qualifiedfolderpath)
            sf=2
            write_process_status(flname,sf,st,feednm,filetimestamp,qualifiedfolderpath)
            write_process_status(sflname,sf,st,feednm,filetimestamp,qualifiedfolderpath)
          else :
            #create the directory.
            dbutils.fs.mkdirs(qualifiedfolderpath)
            #write_csv(df_datafile_final,qualifiedfolderpath,flname,filetimestamp)
            dbutils.fs.mv(schemafilepath,qualifiedfolderpath)
            dbutils.fs.mv(datafilepath,qualifiedfolderpath)
            sf=2
            write_process_status(flname,sf,st,feednm,filetimestamp,qualifiedfolderpath)
            write_process_status(sflname,sf,st,feednm,filetimestamp,qualifiedfolderpath)
      else:
        #the file is empty,therefore no data to process
        # NOTE(review): the files are logged as qualified with the qualified path, but they
        # are not moved out of the raw zone in this branch — confirm that is intended.
        print("nodata to write")
        sf=2
        write_process_status(flname,sf,st,feednm,filetimestamp,qualifiedfolderpath)
        write_process_status(sflname,sf,st,feednm,filetimestamp,qualifiedfolderpath)
    else : 
      print("Schema file or data file is missing")
      #log when the schema file or the data file is missing.
      error_desc='Schema file '+ sflname +' or ' +flname + ' is missing'
      # (error_file is built here but the log row below records flname instead)
      error_file = sflname + ' or ' + flname
      validationtype = 'Fileexistsvalidation'
      dateTime = datetime.now()
      # if not already created automatically, instantiate Sparkcontext
      spark = SparkSession.builder.getOrCreate()
      column_names = ['error_desc', 'error_file', 'error_time','validationtype']
      vals = [(error_desc,flname,str(dateTime),validationtype)]
      error_df = spark.createDataFrame(vals, column_names)
      # record the file as rejected (status flag 3) with no destination path
      sf=3
      st=dateTime 
      rejectedfolderpath ='NA'
      write_process_status(flname,sf,st,feednm,filetimestamp,rejectedfolderpath)
      # write_log_todb expects a pandas dataframe, hence the conversion
      write_log_todb(error_df.toPandas())
  else :
    # Reached for every schema file (name starting with "S_"), not only when the raw zone
    # is empty.
    print("No Files Found")