#Imports

In [0]:
# imports
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number,lit, date_format,DataFrame, current_timestamp
from functools import reduce

#Mount point

In [0]:
# Unmounting if mount point exists
dbutils.fs.unmount('/mnt/dataquaility-data')

/mnt/dataquaility-data has been unmounted.
Out[135]: True

In [0]:
# Checking mount points available
dbutils.fs.mounts()

Out[136]: [MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType='sse-s3'),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType='sse-s3'),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType='sse-s3'),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType='sse-s3'),
 MountInfo(mountPoint='/mnt/global', source='wasbs://global@blob0411new.blob.core.windows.net', encryptionType=''),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType='sse-s3')]

In [0]:
# Creating a mount point to a blob storage container in Azure Storage account 
dbutils.fs.mount(
  source = "wasbs://dataquaility-data@dbprojectblob0809.blob.core.windows.net",
  mount_point = "/mnt/dataquaility-data",
  extra_configs = {"fs.azure.account.key.dbprojectblob0809.blob.core.windows.net":"/VG83esTrKDB6HUns0kZuHDfPkZnoRVT42DZz2ajcpI3MSWIrnq20zOSm48Xvat9K/zQjDeDwf+t+ASte9cNRQ=="}) # Blob storage Accesskey

Out[137]: True

#Notebook Paramaters

In [0]:
# Creating Notebook Parameters to automate and easily process on a daily basis
dbutils.widgets.text("config_filepath","")
dbutils.widgets.text("processed_date","")

In [0]:
# Storing the notebook parameters received in variables
config_filepath = dbutils.widgets.get("config_filepath")
processed_date = dbutils.widgets.get("processed_date")

In [0]:
print(config_filepath)
print(processed_date)

/mnt/dataquaility-data/india/config/orders.json
2022/07/04


# Config JSON document

In [0]:
# Extracting the table name from config_filepath string variable
tablename = config_filepath.split("/")[-1].split(".")[-2]
print(tablename)

orders


In [0]:
# Reading the Json file
# converting the json file to a dictionary.
df = spark.read.format("json").option("multiLine", True).load(config_filepath)
parameters = df.rdd.map(lambda x: x.asDict()).first()
print(parameters)

{'auditfile': '/mnt/dataquaility-data/india/auditdata/orders/', 'cols_datatype': [Row(orderdate='date', orderprice='int')], 'dateformatchecks': ['orderdate'], 'duplicate_check': 'yes', 'no_negative_value': ['orderprice'], 'null_check': ['ordername', 'email'], 'pendingfile': '/mnt/dataquaility-data/india/rejected/orders/', 'required_cols': ['ordername', 'email', 'orderaddress', 'orderdate', 'orderprice'], 'sourcefile': '/mnt/dataquaility-data/india/bronze/orders/', 'targetfile': '/mnt/dataquaility-data/india/silver/orders/'}


In [0]:
# Creating variables for each parameter in the dictionary
source_filelocation = parameters['sourcefile']
target_filelocation = parameters['targetfile']
pending_filelocation = parameters['pendingfile']
audit_filelocation = parameters['auditfile']
duplicatedcheck = parameters['duplicate_check']
selectcols = parameters['required_cols']
nullcheck = parameters['null_check']
castcols = parameters['cols_datatype'][0].asDict()
dateformatcheck = parameters['dateformatchecks']
negativecheck = parameters['no_negative_value']

In [0]:
# A function which returns the file location path with processed_date
def append_date(url, pdate):
    return url+pdate

In [0]:
# Complete path
source_path = append_date(source_filelocation,processed_date)
target_path = append_date(target_filelocation,processed_date)
pending_path = append_date(pending_filelocation,processed_date)
audit_path = append_date(audit_filelocation,processed_date)

In [0]:
source_path

Out[146]: '/mnt/dataquaility-data/india/bronze/orders/2022/07/04'

# Loading data from source

In [0]:
# Reading orders.csv source file
# Storing the number of records present in a variable
df_csv = spark.read.format("csv").option("header", True).load(source_path)
df_csv.show()
sourcecount = df_csv.count()

+-------+---------+-----------+----------+---------------+------------+
|orderid|ordername|  orderdate|orderprice|          email|orderaddress|
+-------+---------+-----------+----------+---------------+------------+
|      1|   Laptop| 2020-10-10|        10|  abc@gmail.com|   hyderabad|
|      2|     null| 2020-10-11|        20|  xyz@gmail.com|   bangalore|
|      2|     Book| 2022-10-10|        30|  xxx@gmail.com|        pune|
|      3|       TV| 2021-09-10|       -40|  yyy@gmail.com|       delhi|
|      5|    watch| 2020-08-10|        50|  zzz@gmail.com|      mumbai|
|      1|   Laptop| 2020-10-10|        10|  abc@gmail.com|   hyderabad|
|      6|   mobile|02-01-20233|        40|abbbb@gmail.com|       noida|
+-------+---------+-----------+----------+---------------+------------+



# Duplicate check

In [0]:
# duplicate records check
cols = df_csv.columns
windowspec = Window.partitionBy(*cols).orderBy(cols[0])
df_csv1 = df_csv.select('*', row_number().over(windowspec).alias("row_number"))
df_csv1.show()

+-------+---------+-----------+----------+---------------+------------+----------+
|orderid|ordername|  orderdate|orderprice|          email|orderaddress|row_number|
+-------+---------+-----------+----------+---------------+------------+----------+
|      1|   Laptop| 2020-10-10|        10|  abc@gmail.com|   hyderabad|         1|
|      1|   Laptop| 2020-10-10|        10|  abc@gmail.com|   hyderabad|         2|
|      2|     null| 2020-10-11|        20|  xyz@gmail.com|   bangalore|         1|
|      2|     Book| 2022-10-10|        30|  xxx@gmail.com|        pune|         1|
|      3|       TV| 2021-09-10|       -40|  yyy@gmail.com|       delhi|         1|
|      5|    watch| 2020-08-10|        50|  zzz@gmail.com|      mumbai|         1|
|      6|   mobile|02-01-20233|        40|abbbb@gmail.com|       noida|         1|
+-------+---------+-----------+----------+---------------+------------+----------+



In [0]:
# Empty list to store the rejected records
pending_records_list = []

In [0]:
# filtering for duplicate reacords
df_duplicates = df_csv1.filter(col("row_number") > 1).drop("row_number").withColumn("reject_reason", lit("Duplicate record"))
df_duplicates.show()
pending_records_list.append(df_duplicates)

+-------+---------+----------+----------+-------------+------------+----------------+
|orderid|ordername| orderdate|orderprice|        email|orderaddress|   reject_reason|
+-------+---------+----------+----------+-------------+------------+----------------+
|      1|   Laptop|2020-10-10|        10|abc@gmail.com|   hyderabad|Duplicate record|
+-------+---------+----------+----------+-------------+------------+----------------+



In [0]:
# Filter for only unique records
df_unique = df_csv1.filter(col("row_number") == 1).drop("row_number")
df_unique.count()

Out[151]: 6

# NULL check

In [0]:
# null check on columns
if len(nullcheck)!= 0:
    for column in nullcheck:
        df_null = df_unique.filter(col(column).isNull()).withColumn("reject_reason",lit("Null Record"))
        pending_records_list.append(df_null)

# Negative Check

In [0]:
# negative check on columns
if len(negativecheck)!= 0:
    for column in negativecheck:
        df_negative = df_unique.filter(col(column) < 0).withColumn("reject_reason", lit("Negative Value"))
        pending_records_list.append(df_negative)

#Date Format check

In [0]:
# date format check
if len(dateformatcheck) != 0:
    for column in dateformatcheck:
        df_datefmt = df_unique.select("*",date_format(column, 'yyyy-MM-dd').alias("orderdate1"))\
            .filter(col("orderdate1").isNull()).drop('orderdate1')\
            .withColumn('reject_reason',lit("Incorrect Dateformat"))
        pending_records_list.append(df_datefmt)

# Rejected records

In [0]:
# converting pending_records_list to a Dataframe
df_reject = reduce(DataFrame.union,pending_records_list)
reject_count = df_reject.count()
display(df_reject)

orderid,ordername,orderdate,orderprice,email,orderaddress,reject_reason
1,Laptop,2020-10-10,10,abc@gmail.com,hyderabad,Duplicate record
2,,2020-10-11,20,xyz@gmail.com,bangalore,Null Record
3,TV,2021-09-10,-40,yyy@gmail.com,delhi,Negative Value
6,mobile,02-01-20233,40,abbbb@gmail.com,noida,Incorrect Dateformat


# Good Records

In [0]:
# Good records
df_final = df_csv.exceptAll(df_reject.drop("reject_reason"))
df_final.show()

+-------+---------+----------+----------+-------------+------------+
|orderid|ordername| orderdate|orderprice|        email|orderaddress|
+-------+---------+----------+----------+-------------+------------+
|      1|   Laptop|2020-10-10|        10|abc@gmail.com|   hyderabad|
|      5|    watch|2020-08-10|        50|zzz@gmail.com|      mumbai|
|      2|     Book|2022-10-10|        30|xxx@gmail.com|        pune|
+-------+---------+----------+----------+-------------+------------+



#Datatype Check

In [0]:
castcols.items()

Out[157]: dict_items([('orderdate', 'date'), ('orderprice', 'int')])

In [0]:
# Datatype conversion
if len(castcols) > 0:
    for key, value in castcols.items():
        df_final = df_final.withColumn(key,col(key).cast(v))

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-1607880844680047>:4[0m
[1;32m      2[0m [38;5;28;01mif[39;00m [38;5;28mlen[39m(castcols) [38;5;241m>[39m [38;5;241m0[39m:
[1;32m      3[0m     [38;5;28;01mfor[39;00m key, value [38;5;129;01min[39;00m castcols[38;5;241m.[39mitems():
[0;32m----> 4[0m         df_final [38;5;241m=[39m df_final[38;5;241m.[39mwithColumn([43mk[49m,col(k)[38;5;241m.[39mcast(v))

[0;31mNameError[0m: name 'k' is not defined

# Final Dataframe

In [0]:
df_final.show()

+-------+---------+----------+----------+-------------+------------+
|orderid|ordername| orderdate|orderprice|        email|orderaddress|
+-------+---------+----------+----------+-------------+------------+
|      1|   Laptop|2020-10-10|        10|abc@gmail.com|   hyderabad|
|      5|    watch|2020-08-10|        50|zzz@gmail.com|      mumbai|
|      2|     Book|2022-10-10|        30|xxx@gmail.com|        pune|
+-------+---------+----------+----------+-------------+------------+



In [0]:
writtencount = df_final.count()

#Column Names check

In [0]:
# Selecting only columns required
df_final = df_final.select(selectcols)

#Data integrity Check

In [0]:
# making sure to not miss any data
if sourcecount == (writtencount + reject_count):
    print("Valid")
else:
    print("Data missing")

Valid


In [0]:
target_path

Out[114]: '/mnt/dataquaility-data/india/silver/orders/2022/07/04'

#Writing good records to silver layer folder

In [0]:
# Writing good records
df_final.write.mode("overwrite").format("parquet").save(target_path)

In [0]:
pending_path

Out[116]: '/mnt/dataquaility-data/india/rejected/orders/2022/07/04'

#Writing rejected records to rejected folder

In [0]:
# Writing rejected records
df_reject.write.mode("overwrite").format("csv").option("header", True).save(pending_path)

# Audit table creation

In [0]:
# manually creating an audit table
df_audit = spark.createDataFrame([(tablename,sourcecount,reject_count,writtencount)],["tablename", "sourcecount", "rejectcount", "writtencount"])\
    .withColumn("loadtimestamp",current_timestamp())

df_audit.show()

+---------+-----------+-----------+------------+--------------------+
|tablename|sourcecount|rejectcount|writtencount|       loadtimestamp|
+---------+-----------+-----------+------------+--------------------+
|   orders|          7|          4|           3|2023-08-09 20:30:...|
+---------+-----------+-----------+------------+--------------------+



In [0]:
audit_path

Out[132]: '/mnt/dataquaility-data/india/auditdata/orders/2022/07/04'

#Writing to audit folder

In [0]:
# writting the audit DF to the audit_path
df_audit.write.mode("overwrite").format("csv").option("header",True).save(audit_path)