In [1]:
%%writefile ../scripts/pyspark_data_validation_framework.py

from pyspark.sql import Window
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from datetime import datetime
import os
import sys

class data_frame_data_validation:

    def __init__(self, df1, df2, primary_keys, list_of_columns=None, num_records_to_validate=5):
        if(isinstance(df1, DataFrame) and isinstance(df1, DataFrame)):
            self.df1 = df1
            self.df2 = df2
            self.intersecting_columns = list(set(df1.columns).intersection(df2.columns))
            if(len(self.intersecting_columns) == 0):
                print("Provided Data Frame doesn't have same schema. Exiting..")
                sys.exit(1)
            self.num_records_to_validate = list(range(1,num_records_to_validate+1))
            if(isinstance(primary_keys, str)):
                self.primary_keys = list(map(lambda x: x.strip(), primary_keys.split(",")))
            else:
                self.primary_keys = primary_keys
            if(list_of_columns != None):
                if(isinstance(list_of_columns, str)):
                    self.list_of_columns = list(map(lambda x: x.strip(), list_of_columns.split(",")))
                else:
                    self.list_of_columns = list_of_columns
            else:
                self.list_of_columns = df1.columns
        else:
            print("Parameter type violation. Exiting")
            sys.exit(1)

    def create_window(self):
        window = Window.orderBy(self.primary_keys)
        return window

    def data_preparation(self, df, no_of_rows_to_validate):
        w = self.create_window()
        output = df.withColumn("row_num", row_number().over(w)).filter(col("row_num").isin(no_of_rows_to_validate)).orderBy(col("row_num")).collect()
        return list(map(lambda x: x.asDict(), output))

    def do_validation(self, output_file_path=None):
        source_list_dict = self.data_preparation(self.df1, self.num_records_to_validate)
        dest_list_dict = self.data_preparation(self.df2, self.num_records_to_validate)
        a = []
        b = []
        r = []
        for i in range(len(self.num_records_to_validate)):
            A = []
            B = []
            R = []
            for column in self.list_of_columns:
                df1_value = str(source_list_dict[i][column])
                df2_value = str(dest_list_dict[i][column])
                A.append(df1_value)
                B.append(df2_value)
                R.append(str(df1_value == df2_value))
            a.append("~".join(A))
            b.append("~".join(B))
            r.append("~".join(R))
        c = "~".join(self.list_of_columns)
        a = "\n".join(a)
        b = "\n".join(b)
        r = "\n".join(r)

        print("SOURCE DATA\n")
        print(c)
        print(a)
        print("\nDESTINATION DATA\n")
        print(c)
        print(b)
        print("\nRESULT\n")
        print(c)
        print(r)
        
        if(output_file_path != None):
            current_timestamp = datetime.now().strftime("%Y-%m-%d_%H_%M_%S")
            output_file = os.path.join(output_file_path, "data_validation_output_{}.csv".format(current_timestamp))
            with open(output_file, 'w') as write_output:
                write_output.write("SOURCE DATA\n")
                write_output.writelines(c+"\n")
                write_output.writelines(a+"\n")
                write_output.write("\nDESTINATION DATA\n")
                write_output.writelines(c+"\n")
                write_output.writelines(b+"\n")
                write_output.write("\nRESULT\n")
                write_output.writelines(c+"\n")
                write_output.writelines(r+"\n")

Overwriting ../scripts/pyspark_data_validation_framework.py


In [2]:
# Creating Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark - Data Validation").getOrCreate()

In [3]:
import os
source_file_path = os.path.join("../inputs", "employee_source.csv")
destination_file_path = os.path.join("../inputs", "employee_destination.csv")
header = "true"
infer_schema = "true"

# Creating Two DataFrames which has same schema

source_df = spark.read.option("infer", infer_schema).option("header", "true").csv(source_file_path)
destination_df = spark.read.option("infer", infer_schema).option("header", "true").csv(destination_file_path)

In [4]:
# Scehma of DataFrame 1
source_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Manager: string (nullable = true)
 |-- Salary: string (nullable = true)



In [5]:
# Scehma of DataFrame 2
destination_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Manager: string (nullable = true)
 |-- Salary: string (nullable = true)



In [6]:
# Sample Data in DataFrame 1
source_df.show(truncate=False)

+-------------+----------+-----------+------+
|Name         |Department|Manager    |Salary|
+-------------+----------+-----------+------+
|Robin Hood   |Bar       |null       |200   |
|Arsene Wenger|Bar       |Friar Tuck |50    |
|Friar Tuck   |Foo       |Robin Hood |100   |
|Little John  |Foo       |Robin Hood |100   |
|Sam Allardyce|Bar       |null       |250   |
|Dimi Berbatov|Foo       |Little John|50    |
+-------------+----------+-----------+------+



In [7]:
# Scehma of DataFrame 2
destination_df.show(truncate=False)

+-------------+----------+-----------+------+
|Name         |Department|Manager    |Salary|
+-------------+----------+-----------+------+
|Robin Hood   |null      |null       |200   |
|Arsene Wenger|Bar       |Friar Tuck |50    |
|Friar Tuck   |Foo       |Robin Hood |100   |
|Little John  |Foo       |Robin Hood |100   |
|Sam Allardyce|null      |null       |250   |
|Dimi Berbatov|Foo       |Little John|50    |
+-------------+----------+-----------+------+



In [8]:
# importing data validation module

import sys

sys.path.append("../scripts")

from pyspark_data_validation_framework import data_frame_data_validation

In [9]:
# Creating DataFrame Data Validation Object
# For the class the first and second argument are source and destination DataFrames respectively.
# Third Argument is the primary_keys which are mandatory inorder to select records based on primary keys.
# Fourth Argument is the num_records_to_validate
data_validation = data_frame_data_validation(source_df, destination_df, primary_keys=["Name"], num_records_to_validate=6)

In [10]:
# Calling Data Validation Method. This will print the output in the console
# If output_file_path argument is not provided the output will be printed only in console and it will not create output file
data_validation.do_validation("../outputs")

SOURCE DATA

Name~Department~Manager~Salary
Arsene Wenger~Bar~Friar Tuck~50
Dimi Berbatov~Foo~Little John~50
Friar Tuck~Foo~Robin Hood~100
Little John~Foo~Robin Hood~100
Robin Hood~Bar~None~200
Sam Allardyce~Bar~None~250

DESTINATION DATA

Name~Department~Manager~Salary
Arsene Wenger~Bar~Friar Tuck~50
Dimi Berbatov~Foo~Little John~50
Friar Tuck~Foo~Robin Hood~100
Little John~Foo~Robin Hood~100
Robin Hood~None~None~200
Sam Allardyce~None~None~250

RESULT

Name~Department~Manager~Salary
True~True~True~True
True~True~True~True
True~True~True~True
True~True~True~True
True~False~True~True
True~False~True~True


In [11]:
# Creating DataFrame with different schema to test for the failure case
column_name = "timestamp,userid"
values = "2018-01-01T11:00:00Z u1, 2018-01-01T12:00:00Z u1, 2018-01-01T11:00:00Z u2, 2018-01-02T11:00:00Z u2, 2018-01-01T12:15:00Z u1,  2018-01-01T12:30:00Z u1,  2018-01-01T12:45:00Z u1,  2018-01-01T13:14:00Z u1,  2018-01-01T13:35:00Z u1, 2018-01-01T13:55:00Z u1, 2018-01-01T14:20:00Z u1, 2018-01-01T14:45:00Z u1, 2018-01-01T15:05:00Z u1"
list_row = list(map(lambda x: tuple(x.strip().split(" ")),values.split(",")))
df = spark.sparkContext.parallelize(list_row).toDF(column_name.split(","))

In [12]:
# Comparing Two Data Frames of different schema will fails the applications
data_validation1 = data_frame_data_validation(df, destination_df, primary_keys="Name")

Provided Data Frame doesn't have same schema. Exiting..


SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [None]:
# Failure case for empty primary_keys
data_validation1 = data_frame_data_validation(source_df, destination_df)