In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import when,udf,lit,split,current_timestamp
import re

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Bootstrap the Spark / Glue runtime and initialise the Glue job with the resolved arguments.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


## @type: DataSource
## @args: [database = "aroa-dw-pedro", table_name = "sources", transformation_ctx = "sources"]
## @return: sources
## @inputs: []
sources = glueContext.create_dynamic_frame.from_catalog(
    database="aroa-dw-pedro",
    table_name="sources",
    transformation_ctx="sources",
)
sourcesDF = sources.toDF()

## @type: DataSource
## @args: [database = "aroa-dw-pedro", table_name = "rules", transformation_ctx = "rules"]
## @return: rules
## @inputs: []
rules = glueContext.create_dynamic_frame.from_catalog(
    database="aroa-dw-pedro",
    table_name="rules",
    transformation_ctx="rules",
)
rulesDF = rules.toDF()

## @type: DataSource
## @args: [database = "aroa-dw-pedro", table_name = "targets", transformation_ctx = "targets"]
## @return: targets
## @inputs: []
targets = glueContext.create_dynamic_frame.from_catalog(
    database="aroa-dw-pedro",
    table_name="targets",
    transformation_ctx="targets",
)
targetsDF = targets.toDF()

# Collect each configuration table to the driver and index its rows (as plain
# dicts) by their key column. From here on the names `sources`, `rules` and
# `targets` refer to these dicts, no longer to the DynamicFrames read above.
# When a key occurs more than once, the last collected row wins.
sources = {row['source_var']: row.asDict() for row in sourcesDF.collect()}
rules = {row['rule_name']: row.asDict() for row in rulesDF.collect()}
targets = {row['target_var']: row.asDict() for row in targetsDF.collect()}

def setNewValue(severity,df,oldColumn,newColumn,default):
    """Apply a rule's severity policy to ``df`` and substitute failing values.

    Args:
        severity: Rule severity. Compared case-insensitively against 'NONE'
            and 'CRITICAL'; the value as passed is written to the 'severity'
            column for rows that fail.
        df: DataFrame that contains both ``oldColumn`` and ``newColumn``.
        oldColumn: Column whose value is kept for rows that pass.
        newColumn: Column used as the pass/fail condition; it is overwritten
            with the kept-or-default value.
        default: Value written to ``newColumn`` for rows that fail.

    Returns:
        ``df`` itself when severity is 'NONE'; otherwise a DataFrame with a
        'severity' column ('Success' or ``severity``) and ``newColumn``
        rewritten.

    Exits:
        With severity 'CRITICAL', calls ``sys.exit()`` if any row has
        ``newColumn == False``.
    """
    level = severity.upper()

    # 'NONE' rules are not reported: hand the frame back untouched.
    if level == 'NONE':
        return df

    # 'CRITICAL' rules stop the whole script on the first failing row.
    if level == 'CRITICAL':
        failures = df.filter(df[newColumn]==False).count()
        if failures:
            sys.exit()

    passed = df[newColumn]
    tagged = df.withColumn('severity', when(passed,'Success').otherwise(severity))
    return tagged.withColumn(newColumn, when(passed,df[oldColumn]).otherwise(default))

def checkRegex(name,df,column,exp):
    """Add column ``name`` holding the result of ``df[column].rlike(exp)``.

    The value is staged in a scratch column called 'ptemp', which is dropped
    again before the frame is returned.
    """
    staged = df.withColumn('ptemp', df[column])
    matches = staged['ptemp'].rlike(exp)
    return staged.withColumn(name, matches).drop('ptemp')

def checkLink(name,df1,column1,df2,column2):
    """Flag each row of ``df1`` by whether its key exists in ``df2``.

    Args:
        name: Name of the boolean result column to add.
        df1: DataFrame being checked.
        column1: Key column in ``df1``.
        df2: DataFrame holding the referenced keys.
        column2: Key column in ``df2``.

    Returns:
        ``df1`` plus column ``name``: True when ``df1[column1]`` equals some
        value of ``df2[column2]``, False otherwise.
    """
    # Rename the column in the second dataframe to avoid ambiguity, and keep
    # each key only once: without distinct(), a key that occurs several times
    # in df2 would duplicate the matching df1 rows in the left join below.
    temp = df2.withColumnRenamed(column2,'ptemp').select('ptemp').distinct()

    # Execute the left join to check links
    case = df1.join(temp, df1[column1] == temp.ptemp, 'left')

    # A null on the right-hand side of the left join means no key was found
    result = case.withColumn(name, when(case.ptemp.isNull(),False).otherwise(True)).drop('ptemp')

    return result

# Define if the value is 4 numerical digits
def isYear(arg):
    """Return True when ``arg`` is a four-digit string, False otherwise.

    ``None`` is reported as False instead of raising a TypeError.
    """
    if arg is None:
        return False
    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    return re.match(r'\d\d\d\d$', arg) is not None

# Define if the value is a number in a 1-31 range
def isDay(arg):
    """Return True when ``arg`` is a 1-2 digit string whose value is in 1..31.

    ``None`` is reported as False instead of raising a TypeError.
    """
    if arg is None:
        return False
    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    if re.match(r'^\d{1,2}$', arg) is None:
        return False
    return 1 <= int(arg) <= 31

# Decide whether the value can be read as a month and return it as a two-digit number
def isMonth(arg):
    """Return the month denoted by ``arg`` as a zero-padded string, or False.

    Accepted forms:
      * a 1-2 digit number in the range 1..12 ('3' -> '03', '12' -> '12');
      * a case-insensitive prefix of an English month name
        ('Aug' -> '08', 'september' -> '09'); when a prefix fits several
        months the first one in calendar order is used.

    Anything else, including ``None``, the empty string and out-of-range
    numbers such as '0' or '13', yields False.
    """
    months = ['JANUARY','FEBRUARY','MARCH','APRIL','MAY','JUNE','JULY','AUGUST','SEPTEMBER','OCTOBER','NOVEMBER','DECEMBER']

    # None and '' are never months ('' would otherwise be a prefix of every name).
    if not arg:
        return False

    # Numeric form: must be between 1 and 12; padded like the named form.
    if re.match(r'^\d{1,2}$', arg):
        number = int(arg)
        if 1 <= number <= 12:
            return "{0:0=2d}".format(number)
        return False

    # Named form: plain prefix comparison, so the token is never interpreted
    # as a regular expression.
    prefix = arg.upper()
    for number, month in enumerate(months, 1):
        if month.startswith(prefix):
            return "{0:0=2d}".format(number)

    return False
    
def getDate(row):
    """Build a 'year-month-day' string from the first three tokens of ``row``.

    The tokens are tried, in this order, as year/month/day, day/month/year
    and month/day/year. The month part is whatever ``isMonth`` returns for
    the token; year and day tokens are used as given.

    Args:
        row: Sequence of string tokens, or None.

    Returns:
        The assembled string, or None when ``row`` is None, has fewer than
        three tokens, contains a None among the first three, or matches none
        of the three layouts.
    """
    # A null input column or a short token list cannot hold a full date.
    if row is None or len(row) < 3:
        return None

    first, second, third = row[0], row[1], row[2]
    if first is None or second is None or third is None:
        return None

    # The middle token is the month candidate in the first two layouts.
    middle_month = isMonth(second)

    if isYear(first) and middle_month and isDay(third):
        return first+'-'+str(middle_month)+'-'+third

    if isDay(first) and middle_month and isYear(third):
        return third+'-'+str(middle_month)+'-'+first

    leading_month = isMonth(first)
    if leading_month and isDay(second) and isYear(third):
        return third+'-'+str(leading_month)+'-'+second

    return None
getDateUDF = udf(getDate)

def transformDateFormat(name,df,column):
    """Add column ``name`` with the date that ``getDateUDF`` builds from ``column``.

    ``column`` is split on non-word characters and the resulting token array
    is passed to ``getDateUDF``.
    """
    # Raw string: '\W' in a plain literal is an invalid escape sequence.
    tokens = split(column, r'\W')

    return df.withColumn(name, getDateUDF(tokens))
    
# ============================================================================================================================
# ============================================================================================================================    
# ============================================================================================================================    

# Cache of loaded catalog tables, keyed by (database, table) so that tables
# with the same name in different databases are not mistaken for each other.
df_source = {}
# DataFrame for every variable, keyed by source_var (rule targets are added later).
df = {}

# Load Dataframes
for source in sources:
    table_source = sources[source]['table_source']
    database_source = sources[source]['database_source']
    source_key = (database_source, table_source)

    # Read each distinct catalog table only once.
    if source_key not in df_source:
        print(database_source,table_source)
        # NOTE(review): every table is read with the same transformation_ctx
        # ("load"); confirm this is acceptable if job bookmarks are enabled.
        load = glueContext.create_dynamic_frame.from_catalog(database = database_source, table_name = table_source, transformation_ctx = "load")
        df_source[source_key] = load.toDF()

    df[sources[source]['source_var']] = df_source[source_key]
    
# ============================================================================================================================
# ============================================================================================================================    
# ============================================================================================================================    
    
# Load rules: evaluate every configured rule against its source variable and
# store the resulting frame under the rule's target variable.
for rule in rules:

    rule_name = rules[rule]['rule_name']
    source_var = rules[rule]['source_var']
    function_name = rules[rule]['func_name']
    arg = rules[rule]['arg']
    default = rules[rule]['default_value']
    severity = rules[rule]['severity']
    target_var = rules[rule]['target_var']
    dq_dim = rules[rule]['dq_dim']
    column = sources[source_var]['column_source']
    dataFrame = df[source_var]

    if function_name == 'checkRegex':

        # arg is the regular expression to test the column against
        df[target_var] = checkRegex(rule_name, dataFrame, column, arg)

    elif function_name == 'checkLink':

        # arg names the variable whose column holds the referenced keys
        dataFrame2 = df[arg]
        column2 = sources[arg]['column_source']

        df[target_var] = checkLink(rule_name,dataFrame,column,dataFrame2,column2)

    elif function_name == 'transformDateFormat':

        # NOTE(review): setNewValue below uses the rule column as a `when`
        # condition, while this function writes a date string into it;
        # confirm date rules are meant to run with severity NONE.
        df[target_var] = transformDateFormat(rule_name,dataFrame,column)

    else:
        # Skip only the unsupported rule; stopping the loop here would
        # silently drop every rule that follows it.
        print('Skipping rule {0}: unsupported func_name {1!r}'.format(rule_name, function_name))
        continue

    if target_var not in sources:
        # Register the new variable with a COPY of the source metadata.
        # Sharing the dict would also overwrite the source's own
        # column_source and break later rules that read the same source.
        sources[target_var] = dict(sources[source_var], column_source=rule_name)

    df[target_var] = setNewValue(severity,df[target_var],column,rule_name,default)
    df[target_var] = df[target_var].withColumn('dq_dim', lit(dq_dim))
    df[target_var] = df[target_var].withColumn('rule', lit(rule_name))
    
# Write the result of every evaluated variable to the data-quality target table.
for x in df:
    # Only frames that carry a 'severity' column are written.
    if 'severity' not in df[x].columns:
        continue

    data_source = 'SALES'
    print(x)
    meta = sources[x]
    dataset = meta['table_source']
    key_nm = meta['df_id']
    field_nm = meta['column_source']

    # Constant audit columns first ...
    frame = (
        df[x]
        .withColumn('processing_dttm', current_timestamp())
        .withColumn('data_source', lit(data_source))
        .withColumn('dataset', lit(dataset))
        .withColumn('data_domain', lit(dataset))
        .withColumn('key_nm', lit(key_nm))
    )
    # ... then the columns copied from existing ones, each resolved against
    # the frame as it stands at that step.
    frame = frame.withColumn('key_val', frame[key_nm])
    frame = frame.withColumn('field_nm', lit(field_nm))
    frame = frame.withColumn('field_value', frame[field_nm])
    frame = frame.withColumn('dq_rule_result', frame['severity'])
    df[x] = frame

    target_frame = DynamicFrame.fromDF(df[x], glueContext, "target_frame")

    ## @type: ApplyMapping
    ## @args: [mappings = [("processing_dttm","string","processing_dttm","string"),("data_source","string","data_source","string"),("dataset","string","dataset","string"),("data_domain","string","data_domain","string"),("key_nm","string","key_nm","string"),("key_value","bigint","key_value","bigint"),("field_nm","string","field_nm","string"),("field_val","string","field_val","string"),("dq_dim","string","dq_dim","string"),("Dq_rule_result","string","Dq_rule_result","string"),("rule","string","rule","string")]
    ## @return: target_frame_mapped
    ## @inputs: [frame = <frame>]
    # key_value and field_val are mapped straight from the key column and the
    # checked column, whose names are only known at run time.
    target_frame_mapped = ApplyMapping.apply(
        frame = target_frame,
        mappings = [
            ("processing_dttm","string","processing_dttm","string"),
            ("data_source","string","data_source","string"),
            ("dataset","string","dataset","string"),
            ("data_domain","string","data_domain","string"),
            ("key_nm","string","key_nm","string"),
            (key_nm,"bigint","key_value","bigint"),
            ("field_nm","string","field_nm","string"),
            (field_nm,"string","field_val","string"),
            ("dq_dim","string","dq_dim","string"),
            ("Dq_rule_result","string","Dq_rule_result","string"),
            ("rule","string","rule","string"),
        ],
        transformation_ctx = "target_frame_mapped",
    )

    ## @type: DataSink
    ## @args: [database = "aroa-dw-pedro", table_name = "dq_target", transformation_ctx = "dataSink"]
    ## @return: dataSink
    ## @inputs: [frame = target_frame_mapped]
    dataSink = glueContext.write_dynamic_frame.from_catalog(
        frame = target_frame_mapped,
        database = "aroa-dw-pedro",
        table_name = "dq_target",
        transformation_ctx = "dataSink",
    )

job.commit()