In [6]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("app1").getOrCreate()
spark

23/08/04 19:46:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/04 19:46:42 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [20]:
from pyspark.sql.functions import col 
def validate_nulls(df, columns_to_validate):
    for col_name in columns_to_validate:
        df = df.withColumn(col_name+"_is_valid", col(col_name).isNotNull())
    return df

def count_match_validation(raw_df, refined_df):
    raw_count = raw_df.count()
    refined_count = refined_df.count()
    count_match = raw_count == refined_count
    validation_result = "Count match" if count_match else "Counts do not match"
    return validation_result

In [8]:
def get_df():    
    data = [("Alice", 30),
            ("Bob", None),
            ("Charlie", 25)]
    columns = ["name", "age"]
    df = spark.createDataFrame(data, columns)
    return df 

df = get_df()
df.show()

                                                                                

+-------+----+
|   name| age|
+-------+----+
|  Alice|  30|
|    Bob|null|
|Charlie|  25|
+-------+----+



In [9]:
columns_to_validate = ["name","age"]
columns_to_validate

['name', 'age']

In [12]:
### Apply validation functions 
validated_df = validate_nulls(df, columns_to_validate)
validated_df.show()

+-------+----+-------------+------------+
|   name| age|name_is_valid|age_is_valid|
+-------+----+-------------+------------+
|  Alice|  30|         true|        true|
|    Bob|null|         true|       false|
|Charlie|  25|         true|        true|
+-------+----+-------------+------------+



In [13]:
Define validation rules for each column. These rules can be functions that check for specific conditions,
 such as 
data type checks, 
range checks,
null checks, 
pattern checks, etc.

SyntaxError: invalid syntax (826937797.py, line 1)

In [15]:
raw_data = [("Alice", 30), ("Bob", None), ("Charlie", 25)]
refined_data = [("Alice", 30), ("Charlie", 25)]

columns = ["name", "age"]
raw_df = spark.createDataFrame(raw_data, columns)
refined_df = spark.createDataFrame(refined_data, columns)

raw_df.show()
refined_df.show()

+-------+----+
|   name| age|
+-------+----+
|  Alice|  30|
|    Bob|null|
|Charlie|  25|
+-------+----+

+-------+---+
|   name|age|
+-------+---+
|  Alice| 30|
|Charlie| 25|
+-------+---+



In [21]:
columns_to_validate = ["name", "age"]

validated_raw_df = validate_nulls(raw_df, columns_to_validate)
validated_raw_df.show()

validated_refined_df = validate_nulls(refined_df, columns_to_validate)
validated_refined_df.show()


+-------+----+-------------+------------+
|   name| age|name_is_valid|age_is_valid|
+-------+----+-------------+------------+
|  Alice|  30|         true|        true|
|    Bob|null|         true|       false|
|Charlie|  25|         true|        true|
+-------+----+-------------+------------+

+-------+---+-------------+------------+
|   name|age|name_is_valid|age_is_valid|
+-------+---+-------------+------------+
|  Alice| 30|         true|        true|
|Charlie| 25|         true|        true|
+-------+---+-------------+------------+



In [23]:
validation_result = count_match_validation(validated_raw_df, validated_refined_df)
validation_result

'Counts do not match'

In [24]:
def compare_layers(raw_df, refined_df, key_column, columns_to_compare):
    # Join the DataFrames based on the key column
    joined_df = raw_df.join(refined_df, on=key_column)

    # Compare the columns and create a new column indicating the comparison result
    comparison_exprs = [when(col(f"{raw_df[col]}") == col(f"{refined_df[col]}"), True).otherwise(False).alias(col)
                        for col in columns_to_compare]


    compared_df = joined_df.select("*", *comparison_exprs)

    return compared_df


In [56]:
raw_data = [(1, "Alice", 30), (2, "Bob", None), (3, "Charlie", 25)]
refined_data = [(1, "Alice", 30), (2, "Bob", 28), (3, "Charlie", 25)]

raw_columns = ["id", "a_name", "a_age"]
refined_columns = ["id", "b_name", "b_age"]

In [57]:
raw_df = spark.createDataFrame(raw_data, raw_columns)
refined_df = spark.createDataFrame(refined_data, refined_columns)

 # Define the key column and columns to compare
key_column = "id"
columns_to_compare = ["name", "age"]

raw_df.show()
refined_df.show()

+---+-------+-----+
| id| a_name|a_age|
+---+-------+-----+
|  1|  Alice|   30|
|  2|    Bob| null|
|  3|Charlie|   25|
+---+-------+-----+

+---+-------+-----+
| id| b_name|b_age|
+---+-------+-----+
|  1|  Alice|   30|
|  2|    Bob|   28|
|  3|Charlie|   25|
+---+-------+-----+



In [58]:
joined_df = raw_df.join(refined_df, on=key_column)
joined_df.show()

+---+-------+-----+-------+-----+
| id| a_name|a_age| b_name|b_age|
+---+-------+-----+-------+-----+
|  1|  Alice|   30|  Alice|   30|
|  3|Charlie|   25|Charlie|   25|
|  2|    Bob| null|    Bob|   28|
+---+-------+-----+-------+-----+



In [61]:
from pyspark.sql.functions import when
comparison_exprs = [
        when(col(f"a_{col_name}") == col(f"b_{col_name}"), True).otherwise(False).alias(col_name+"_is_match")
        for col_name in columns_to_compare
]
comparison_exprs



[Column<b'CASE WHEN (a_name = b_name) THEN true ELSE false END AS `name_is_match`'>,
 Column<b'CASE WHEN (a_age = b_age) THEN true ELSE false END AS `age_is_match`'>]

In [62]:
#    compared_df = joined_df.select("*", *comparison_exprs)
joined_df.select("*", *comparison_exprs).show()

+---+-------+-----+-------+-----+-------------+------------+
| id| a_name|a_age| b_name|b_age|name_is_match|age_is_match|
+---+-------+-----+-------+-----+-------------+------------+
|  1|  Alice|   30|  Alice|   30|         true|        true|
|  3|Charlie|   25|Charlie|   25|         true|        true|
|  2|    Bob| null|    Bob|   28|         true|       false|
+---+-------+-----+-------+-----+-------------+------------+

