# Import Rules Notebook

In [1]:
%run ./Rules.ipynb

# Import Dependencies 

In [2]:
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql.functions import *
from pyspark.sql.functions import col, explode,coalesce,udf,pandas_udf
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType,StructType,StringType,ArrayType
spark = SparkSession.builder.config('spark.jars','D:\\Programs\\Spark\\spark-2.4.3-bin-hadoop2.7\\jars\\spark-avro_2.11-2.4.5.jar').getOrCreate()
import datetime
from datetime import timedelta 
from pyspark.sql.functions import split
from pyspark.sql import functions
from pyspark.sql.window import *
import pandas as pd
import json

In [3]:
pd.set_option('display.max_colwidth',-1)

# Initialise Paths for you Input/Output Directories,Log and DQ Bible

In [4]:
DQDatasetPath = "datasets"
DQSrc = f"{DQDatasetPath}/Source"
DQSucc = f"{DQDatasetPath}/Success"
DQRej = f"{DQDatasetPath}/Rejects"
DQBible=  f"{DQDatasetPath}/Bible"
AuditLogs = f"{DQDatasetPath}/AuditLogs"
DQLogs = f"{DQDatasetPath}/DQLogs"

# Record Start Time for the Job

In [5]:
start_time = str(datetime.datetime.now())
print(start_time)

2021-05-07 23:12:35.843471


# Load Employee Data and Display


In [6]:
src_data =  spark.read.option("header",True).csv(f"{DQSrc}/employee100.csv")
src_data.toPandas()

Unnamed: 0,id,first_name,last_name,email,gender,ip_address,emp_join_date,emp_country
0,1,Fayth,Paddock,fpaddock0@nytimes.com,Female,95.152.83.222,2020/09/15,China
1,2,Ogden,Ander,oander1@w3.org,Female,79.153.96.224,2020/10/15,United States
2,3,Ludovico,Defew,ldefew2@github.io,Male,44.72.101.250,2020/05/23,Peru
3,4,Ambrosius,Menlow,amenlow3@weather.com,Male,66.138.160.23,2021/04/11,China
4,5,Inglebert,Barrasse,ibarrasse4@soup.io,Male,180.85.153.228,2020/08/03,Indonesia
...,...,...,...,...,...,...,...,...
96,97,Huntington,Sherrocks,hsherrocks2o@jalbum.net,ale,94.186.119.255,2019/12/16,Bangladesh
97,98,Randell,Brister,rbrister2p@dagondesign,Male,166.154.213.220,2020/10/15,Philippines
98,99,Hamnet,Rowbotham,hrowbotham2q@timesonline.co.uk,Female,121.121.36,2021/01/19,Ethiopia
99,100,Derrick,Lunge,dlunge2r@ovh.net,e,204.134.255,2021/01/23,Indonesia


# Read and Display DQ Bible 

In [7]:
dq_rule_bible = spark.read.option("header",True).csv(f"{DQBible}/bible101.csv")
bible_pandas = dq_rule_bible.toPandas()
display(bible_pandas)

Unnamed: 0,RULE_ID,ACTIVE,PRIORITY,DESCRIPTION,TYPE,OUTPUT_COL
0,R001,Y,1,Trims Employee IP address and Soft Fail Record in case of invalid IP else it’s a PASS.The trimmed input column is returned as modified column value in case of PASS else return Original Value,SOFT,ip_address
1,R002,Y,2,Trims Emp Join Date and Reject/Hard Fail in case date is not in YYYY/MM/DD format else it’s a pass.The trimmed input column is returned as modified column value in case of PASS else return original value,HARD,emp_join_date
2,R003,Y,3,Trims Employee Gender and Soft Fail record in case of Invalid Gender else it’s a PASS if values are (m/f/male/female).The trimmed input column is returned as modified column value in case of PASS as Male/Female else return Original Value.,SOFT,gender
3,R004,Y,4,Trims Employee Mail and Rejects/Hard Fail record in case of Invalid email else it’s a PASS.The trimmed Input Column is returned as Modifid column value in case of PASS else return Original Value,HARD,email
4,R005,Y,5,Concatenate Emp_ with Employee id,TRANS,emp_sor_id


# Filter Active DQ Rules

In [8]:
dq_rule_bible_actv = dq_rule_bible.filter(col("Active") == "Y")
dq_rule_bible_actv.toPandas()

Unnamed: 0,RULE_ID,ACTIVE,PRIORITY,DESCRIPTION,TYPE,OUTPUT_COL
0,R001,Y,1,Trims Employee IP address and Soft Fail Record in case of invalid IP else it’s a PASS.The trimmed input column is returned as modified column value in case of PASS else return Original Value,SOFT,ip_address
1,R002,Y,2,Trims Emp Join Date and Reject/Hard Fail in case date is not in YYYY/MM/DD format else it’s a pass.The trimmed input column is returned as modified column value in case of PASS else return original value,HARD,emp_join_date
2,R003,Y,3,Trims Employee Gender and Soft Fail record in case of Invalid Gender else it’s a PASS if values are (m/f/male/female).The trimmed input column is returned as modified column value in case of PASS as Male/Female else return Original Value.,SOFT,gender
3,R004,Y,4,Trims Employee Mail and Rejects/Hard Fail record in case of Invalid email else it’s a PASS.The trimmed Input Column is returned as Modifid column value in case of PASS else return Original Value,HARD,email
4,R005,Y,5,Concatenate Emp_ with Employee id,TRANS,emp_sor_id


# Collect Active Rules as List of Tuples ad sort on Priority

In [9]:
actv_rule_to_rdd = dq_rule_bible_actv.repartition(1).orderBy(asc("PRIORITY")).toJSON().collect()
actv_rule_to_list = [json.loads(jsonString) for jsonString in actv_rule_to_rdd ]
print(actv_rule_to_rdd)

['{"RULE_ID":"R001","ACTIVE":"Y","PRIORITY":"1","DESCRIPTION":"Trims Employee IP address and Soft Fail Record in case of invalid IP else it’s a PASS.The trimmed input column is returned as modified column value in case of PASS else return Original Value","TYPE":"SOFT","OUTPUT_COL":"ip_address"}', '{"RULE_ID":"R002","ACTIVE":"Y","PRIORITY":"2","DESCRIPTION":"Trims Emp Join Date and Reject/Hard Fail in case date is not in YYYY/MM/DD format else it’s a pass.The trimmed input column is returned as modified column value in case of PASS else return original value","TYPE":"HARD","OUTPUT_COL":"emp_join_date"}', '{"RULE_ID":"R003","ACTIVE":"Y","PRIORITY":"3","DESCRIPTION":"Trims Employee Gender and Soft Fail record in case of Invalid Gender else it’s a PASS if values are (m/f/male/female).The trimmed input column is returned as modified column value in case of PASS as Male/Female else return Original Value.","TYPE":"SOFT","OUTPUT_COL":"gender"}', '{"RULE_ID":"R004","ACTIVE":"Y","PRIORITY":"4","

# Create Target Dataframe after applying Data Quality Rules & display Data Quality Rules and Display Resultant DF

In [10]:
resultantDF = src_data
# Iterate list of Dictionaries to invoke all rules 1 by 1
for ruleRec in actv_rule_to_list:
    # Store RuleID,OutputColumnName and RuleType
    ruleID,output_col_name,rule_categ = (ruleRec['RULE_ID'],ruleRec['OUTPUT_COL'],ruleRec['TYPE'])
    if rule_categ == "TRANS":
        # If Rule is a transformation,invoke the rule by passing only RuleID,Category and retrieve Output Column Name
        resultantDF =  resultantDF.withColumn(output_col_name,RuleEngine.udf_caller(ruleID,rule_categ))
    else:
        # Else if rule is SOFT/FAIL rule invoke the UDF by only passing RuleID,Category and retrieving array column which has output field name and rule execution status
        resultantDF = resultantDF.withColumn("RuleExecStatus",RuleEngine.udf_caller(ruleID,rule_categ))
        # From array column retrieve output column value and status of the executed rule
        resultantDF = resultantDF.withColumn(ruleID+"_RFLAG",resultantDF['RuleExecStatus'].getItem(1)).withColumn(output_col_name,resultantDF['RuleExecStatus'].getItem(0))
        

In [11]:
resultantDF.toPandas()

Unnamed: 0,id,first_name,last_name,email,gender,ip_address,emp_join_date,emp_country,RuleExecStatus,R001_RFLAG,R002_RFLAG,R003_RFLAG,R004_RFLAG,emp_sor_id
0,1,Fayth,Paddock,fpaddock0@nytimes.com,Female,95.152.83.222,2020/09/15,China,"[fpaddock0@nytimes.com, ]",,,,,EMP_1
1,2,Ogden,Ander,oander1@w3.org,Female,79.153.96.224,2020/10/15,United States,"[oander1@w3.org, ]",,,,,EMP_2
2,3,Ludovico,Defew,ldefew2@github.io,Male,44.72.101.250,2020/05/23,Peru,"[ldefew2@github.io, ]",,,,,EMP_3
3,4,Ambrosius,Menlow,amenlow3@weather.com,Male,66.138.160.23,2021/04/11,China,"[amenlow3@weather.com, ]",,,,,EMP_4
4,5,Inglebert,Barrasse,ibarrasse4@soup.io,Male,180.85.153.228,2020/08/03,Indonesia,"[ibarrasse4@soup.io, ]",,,,,EMP_5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
96,97,Huntington,Sherrocks,hsherrocks2o@jalbum.net,ale,94.186.119.255,2019/12/16,Bangladesh,"[hsherrocks2o@jalbum.net, ]",,,SOFT_R003,,EMP_97
97,98,Randell,Brister,rbrister2p@dagondesign,Male,166.154.213.220,2020/10/15,Philippines,"[rbrister2p@dagondesign, HARD_R004]",,,,HARD_R004,EMP_98
98,99,Hamnet,Rowbotham,hrowbotham2q@timesonline.co.uk,Female,121.121.36,2021/01/19,Ethiopia,"[hrowbotham2q@timesonline.co.uk, ]",,,,,EMP_99
99,100,Derrick,Lunge,dlunge2r@ovh.net,e,204.134.255,2021/01/23,Indonesia,"[dlunge2r@ovh.net, ]",,,SOFT_R003,,EMP_100


# Find all Flag Columns from the list containing dictionary of active rules and create a list from the same

In [12]:
srccols = src_data.columns
RFLAG_ARR = list(
    map(lambda rulerec : json.loads(rulerec)['RULE_ID']+"_RFLAG",
        list(filter(lambda rulerec : (json.loads(rulerec)['TYPE'] != "TRANS"),actv_rule_to_rdd)))
)
    
print(RFLAG_ARR)
# Concat All Flag Columns & check if it contains the string HARD to find all reject records
rulesinglecol = resultantDF.withColumn("ruleflag",concat_ws(",",*RFLAG_ARR))

['R001_RFLAG', 'R002_RFLAG', 'R003_RFLAG', 'R004_RFLAG']


# Find Error Rejects

In [13]:
error_reject_records = rulesinglecol.filter(col("ruleflag").contains("HARD")).drop("ruleflag","RuleExecStatus")
error_reject_records.toPandas()

Unnamed: 0,id,first_name,last_name,email,gender,ip_address,emp_join_date,emp_country,R001_RFLAG,R002_RFLAG,R003_RFLAG,R004_RFLAG,emp_sor_id
0,51,Gilburt,Pinnick,gpinnick1e.google.co.uk,Kill,253.186.232.9000,2020/51/08,Honduras,SOFT_R001,HARD_R002,SOFT_R003,HARD_R004,EMP_51
1,52,Jobyna,Langmaid,jlangmaid1f,End,91.5000.172.208,2020/02/51,Ethiopia,SOFT_R001,HARD_R002,SOFT_R003,HARD_R004,EMP_52
2,53,Earle,Ulyet,eulyet1g@slideshare.net,Fem,9999.255.188.107,2021/12/33,Japan,SOFT_R001,HARD_R002,SOFT_R003,,EMP_53
3,54,Sunny,Lindsey,slindsey1h.com.cn,HU,9000.34.248.154,2020/15/29,Portugal,SOFT_R001,HARD_R002,SOFT_R003,HARD_R004,EMP_54
4,55,Bobbi,Olliver,bolliver1i@java.com,le,98.150.137.280,2021/33/28,Russia,SOFT_R001,HARD_R002,SOFT_R003,,EMP_55
5,56,Petronille,Idell,pidell1jshareasale.com,ale,88.252.165.159,2021/01/16,China,,,SOFT_R003,HARD_R004,EMP_56
6,57,Domeniga,Broose,dbroose1k@people.com.cn,el,94.190.11.225,2021/33/22,China,,HARD_R002,SOFT_R003,,EMP_57
7,59,Elke,Schollar,ugedomains.com,Female,99999.183.233.178,2020/06/16,Russia,SOFT_R001,,,HARD_R004,EMP_59
8,60,Briano,Copper,bcopper1nbravesites.com,Male,45.213.51.102,2020/87/05,China,,HARD_R002,,HARD_R004,EMP_60
9,61,Lesly,Iltchev,liltchev1o@linkedin.com,H,137.212.213.165,2021/71/28,Swaziland,,HARD_R002,SOFT_R003,,EMP_61


In [14]:
error_reject_records.write.option("header",True).format("csv").mode("overwrite").save(DQRej)

# Find Cleansed/Target Records

In [15]:
# Find all cleansed/target records by concatenating all FLAG columns and check if it doesnt contain String HARD
passed_records = rulesinglecol.filter(col("ruleflag").contains("HARD") ==  False).drop("ruleflag","RuleExecStatus")
passed_records.toPandas()

Unnamed: 0,id,first_name,last_name,email,gender,ip_address,emp_join_date,emp_country,R001_RFLAG,R002_RFLAG,R003_RFLAG,R004_RFLAG,emp_sor_id
0,1,Fayth,Paddock,fpaddock0@nytimes.com,Female,95.152.83.222,2020/09/15,China,,,,,EMP_1
1,2,Ogden,Ander,oander1@w3.org,Female,79.153.96.224,2020/10/15,United States,,,,,EMP_2
2,3,Ludovico,Defew,ldefew2@github.io,Male,44.72.101.250,2020/05/23,Peru,,,,,EMP_3
3,4,Ambrosius,Menlow,amenlow3@weather.com,Male,66.138.160.23,2021/04/11,China,,,,,EMP_4
4,5,Inglebert,Barrasse,ibarrasse4@soup.io,Male,180.85.153.228,2020/08/03,Indonesia,,,,,EMP_5
...,...,...,...,...,...,...,...,...,...,...,...,...,...
61,89,Catina,Butte,cbutte2g@goo.gl,Mle,191.67.160.27,2020/05/20,Portugal,,,SOFT_R003,,EMP_89
62,97,Huntington,Sherrocks,hsherrocks2o@jalbum.net,ale,94.186.119.255,2019/12/16,Bangladesh,,,SOFT_R003,,EMP_97
63,99,Hamnet,Rowbotham,hrowbotham2q@timesonline.co.uk,Female,121.121.36,2021/01/19,Ethiopia,,,,,EMP_99
64,100,Derrick,Lunge,dlunge2r@ovh.net,e,204.134.255,2021/01/23,Indonesia,,,SOFT_R003,,EMP_100


In [16]:
passed_records.write.option("header",True).format("csv").mode("overwrite").save(DQSucc)

# Compute Audit Logs

In [17]:
# Src Counts
src_count = src_data.count()
# Target Record Counts
target_count = passed_records.count()
# Reject Records Count(Records that have failed atleast one Hard Rule and 0 or more soft rules)
reject_fail_count = error_reject_records.count()
# Soft Fail Record Counts(Records that have failed atleast one Soft Rule but no Hard Rule)
soft_fail_only_count = rulesinglecol.filter((col("ruleflag").contains("HARD") == False) & (col("ruleflag").contains("SOFT") == True)).count()
# Hard Fail Record Counts(Records that have failed atleast one Hard Rule but no Soft Rule)
hard_fail_only_count = rulesinglecol.filter((col("ruleflag").contains("HARD") == True) & (col("ruleflag").contains("SOFT") == False)).count()
# Total Fail Record Counts
total_fail_count = reject_fail_count + soft_fail_only_count
# End Time for Job
end_time = str(datetime.datetime.now())
# Create DQ Dictionary for DQ Log metadata and convert it to Dataframe
audit_log_dic = {"SourceRecCount" : src_count,"TargetRecCount" : target_count,"RejectFailRecCount" : reject_fail_count,"SoftFailOnlyRecCount" : soft_fail_only_count,"HardFailOnlyRecCount" : hard_fail_only_count,"TotalFailRecCount" : total_fail_count,"StartTime" : start_time,"EndTime" :  end_time}


In [18]:
audit_log_df = spark.createDataFrame([audit_log_dic]).select("SourceRecCount","TargetRecCount","RejectFailRecCount","SoftFailOnlyRecCount","HardFailOnlyRecCount","TotalFailRecCount","StartTime","EndTime")



In [19]:
audit_log_df.write.format("avro").mode("append").save(AuditLogs)

# Compute DQ Logs


In [20]:
dqlog = rulesinglecol.select("id",explode(split("ruleflag",",")).alias("CATEGORY_RULEID")).filter(trim(col("CATEGORY_RULEID")) != "").withColumn("RULEONLY",split("CATEGORY_RULEID","_").getItem(1))
dqlog_with_det = dqlog.join(dq_rule_bible,dqlog.RULEONLY == dq_rule_bible.RULE_ID).select("id","CATEGORY_RULEID","PRIORITY","DESCRIPTION","OUTPUT_COL")
dqlog_pandas = dqlog_with_det.toPandas()
dqlog_pandas

Unnamed: 0,id,CATEGORY_RULEID,PRIORITY,DESCRIPTION,OUTPUT_COL
0,51,SOFT_R001,1,Trims Employee IP address and Soft Fail Record in case of invalid IP else it’s a PASS.The trimmed input column is returned as modified column value in case of PASS else return Original Value,ip_address
1,51,HARD_R002,2,Trims Emp Join Date and Reject/Hard Fail in case date is not in YYYY/MM/DD format else it’s a pass.The trimmed input column is returned as modified column value in case of PASS else return original value,emp_join_date
2,51,SOFT_R003,3,Trims Employee Gender and Soft Fail record in case of Invalid Gender else it’s a PASS if values are (m/f/male/female).The trimmed input column is returned as modified column value in case of PASS as Male/Female else return Original Value.,gender
3,51,HARD_R004,4,Trims Employee Mail and Rejects/Hard Fail record in case of Invalid email else it’s a PASS.The trimmed Input Column is returned as Modifid column value in case of PASS else return Original Value,email
4,52,SOFT_R001,1,Trims Employee IP address and Soft Fail Record in case of invalid IP else it’s a PASS.The trimmed input column is returned as modified column value in case of PASS else return Original Value,ip_address
...,...,...,...,...,...
79,96,HARD_R002,2,Trims Emp Join Date and Reject/Hard Fail in case date is not in YYYY/MM/DD format else it’s a pass.The trimmed input column is returned as modified column value in case of PASS else return original value,emp_join_date
80,96,HARD_R004,4,Trims Employee Mail and Rejects/Hard Fail record in case of Invalid email else it’s a PASS.The trimmed Input Column is returned as Modifid column value in case of PASS else return Original Value,email
81,97,SOFT_R003,3,Trims Employee Gender and Soft Fail record in case of Invalid Gender else it’s a PASS if values are (m/f/male/female).The trimmed input column is returned as modified column value in case of PASS as Male/Female else return Original Value.,gender
82,98,HARD_R004,4,Trims Employee Mail and Rejects/Hard Fail record in case of Invalid email else it’s a PASS.The trimmed Input Column is returned as Modifid column value in case of PASS else return Original Value,email


In [21]:
dqlog_with_det.write.option("header",True).format("csv").mode("overwrite").save(DQLogs)