## Cleansing

## Cleansing rules to be followed:

##### Drops rows with any null values.
##### Ensures columns are of the correct data type based on the schema.
##### Removes duplicate rows.
##### Trims whitespace and removes special characters from text columns.
##### Handles outliers by filtering out values beyond a fixed threshold (the current code does not use the IQR method).
##### Ensures data consistency and checks for non-null and non-NaN values.

<h4> Here we create dynamic functions for cleansing all of the customer, branch, and transaction data </h4>

## CLEANSING

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    LongType,
    DoubleType,
    TimestampType,
)
from pyspark.sql.functions import (
    col,
    trim,
    regexp_replace,
    to_timestamp,
    isnan,
    date_format,
)


# Schema for transaction records; every field is declared non-nullable.
transaction_schema = StructType(
    [
        StructField(field_name, field_type, False)
        for field_name, field_type in (
            ("transaction_id", StringType()),
            ("customer_id", StringType()),
            ("branch_id", StringType()),
            ("channel", StringType()),
            ("transaction_type", StringType()),
            ("amount", DoubleType()),
            ("currency", StringType()),
            ("timestamp", TimestampType()),
            ("status", StringType()),
        )
    ]
)

# Schema for customer records; only customer_id and name are non-nullable.
customer_schema = StructType(
    [
        StructField(field_name, field_type, is_nullable)
        for field_name, field_type, is_nullable in (
            ("customer_id", StringType(), False),
            ("name", StringType(), False),
            ("email", StringType(), True),
            ("phone", StringType(), True),
            ("address", StringType(), True),
            ("credit_score", LongType(), True),
            ("join_date", TimestampType(), True),
            ("last_update", TimestampType(), True),
        )
    ]
)

# Schema for branch records; branch_id and name are non-nullable.
branch_schema = StructType(
    [
        StructField(field_name, field_type, is_nullable)
        for field_name, field_type, is_nullable in (
            ("branch_id", StringType(), False),
            # The column is called "name" (it was previously "branch_name").
            ("name", StringType(), False),
            ("location", StringType(), True),
            ("timezone", StringType(), True),
        )
    ]
)

# cleanse functions
def handle_missing_values(df):
    """Return ``df`` without any row that contains a null in at least one column.

    Note that this looks at every column of ``df``, not only a selected subset.
    """
    complete_rows = df.na.drop(how="any")
    return complete_rows


def convert_data_types(df, schema):
    """Convert every column listed in ``schema`` to the type the schema declares.

    Timestamp fields are parsed with ``to_timestamp``; every other field is cast
    directly to its declared data type. Casting by the declared type (instead of
    a fixed list of supported types) means fields such as ``DoubleType`` are
    enforced too, rather than being silently left in whatever type they arrived.

    Args:
        df: DataFrame containing a column for every field in ``schema``.
        schema: Iterable of ``StructField`` objects (e.g. a ``StructType``)
            giving the target column names and data types.

    Returns:
        The DataFrame with each schema column replaced by its converted version.
        Columns of ``df`` that are not mentioned in ``schema`` are left untouched.
    """
    for field in schema:
        source_column = col(field.name)
        if isinstance(field.dataType, TimestampType):
            # Keep to_timestamp for timestamps so parsing behaviour is unchanged.
            converted = to_timestamp(source_column)
        else:
            converted = source_column.cast(field.dataType)
        df = df.withColumn(field.name, converted)
    return df


def format_timestamps(df, timestamp_columns):
    """Re-render the given columns using the supplied date/time patterns.

    Args:
        df: DataFrame to transform.
        timestamp_columns: Mapping of column name -> format pattern
            (for example ``{"timestamp": "yyyy-MM-dd HH:mm:ss"}``). Names that
            are not columns of ``df`` are ignored.

    Returns:
        The DataFrame with each matching column passed through ``date_format``.

    NOTE(review): per the PySpark documentation ``date_format`` produces a string
    column, so formatted columns are presumably no longer timestamp-typed
    afterwards - confirm this is what downstream consumers expect.
    """
    for column_name, pattern in timestamp_columns.items():
        if column_name not in df.columns:
            continue
        formatted = date_format(col(column_name), pattern)
        df = df.withColumn(column_name, formatted)
    return df


def remove_duplicates(df):
    """Return ``df`` with duplicate rows removed, comparing all columns."""
    deduplicated = df.dropDuplicates()
    return deduplicated


def clean_text_data(df, columns, exclude_columns=None):
    """Strip special characters and surrounding whitespace from text columns.

    For each requested column every character that is not an ASCII letter, a
    digit or whitespace is removed, and the result is then trimmed. Trimming
    happens *after* the character removal so that whitespace exposed by a
    removed leading/trailing symbol (e.g. ``"# abc"``) does not survive.

    Args:
        df: DataFrame to clean.
        columns: Names of the text columns to clean. Names that are not
            columns of ``df`` are ignored.
        exclude_columns: Optional iterable of column names to leave untouched
            even if they appear in ``columns``. Defaults to excluding nothing.

    Returns:
        The DataFrame with the selected columns cleaned.
    """
    # None instead of a mutable [] default; treat it as "exclude nothing".
    excluded = set(exclude_columns) if exclude_columns else set()
    for column in columns:
        if column not in df.columns or column in excluded:
            continue
        # Raw string so that \s reaches the regex engine without relying on
        # Python passing through an invalid escape sequence.
        without_specials = regexp_replace(col(column), r"[^a-zA-Z0-9\s]", "")
        df = df.withColumn(column, trim(without_specials))
    return df


def handle_outliers(df, columns, threshold=1000000):
    """Drop rows whose value in any listed column lies outside ``[-threshold, threshold]``.

    This is a static-threshold filter, not an IQR-based one.
    NOTE(review): a quantile-based (IQR) approach would need quantiles computed
    over the data, which is presumably not possible for the streaming
    transactions input - confirm before replacing this with IQR logic.

    Args:
        df: DataFrame to filter.
        columns: Names of the numeric columns to check. Names that are not
            columns of ``df`` are ignored.
        threshold: Largest absolute value that is kept (inclusive). Defaults to
            1000000, the value that was previously hard-coded.

    Returns:
        The DataFrame restricted to rows whose checked values are within range.
        Rows where a checked value is null do not satisfy the comparison and
        are therefore removed as well.
    """
    for column in columns:
        if column in df.columns:
            # between() is inclusive on both ends: lower <= value <= upper.
            df = df.filter(col(column).between(-threshold, threshold))
    return df


def data_quality_checks(df, columns):
    """Keep only rows whose listed columns are neither null nor NaN.

    Args:
        df: DataFrame to validate.
        columns: Names of the columns to check. Names that are not columns of
            ``df`` are ignored.

    Returns:
        The DataFrame after applying a not-null filter followed by a not-NaN
        filter for each checked column.

    NOTE(review): ``isnan`` is also applied to the string id columns passed by
    the callers; verify that this behaves as intended for non-numeric columns.
    """
    for column_name in columns:
        if column_name not in df.columns:
            continue
        value = col(column_name)
        df = df.filter(value.isNotNull()).filter(~isnan(value))
    return df


# Applying to transactions
def cleanse_transaction(df):
    """Apply the complete cleansing pipeline to a transactions DataFrame.

    Steps, in order: drop rows with nulls, convert columns according to
    ``transaction_schema``, format the ``timestamp`` column, remove duplicate
    rows, clean the categorical text columns, filter ``amount`` outliers and
    finally run the null/NaN quality checks on ``transaction_id`` and ``amount``.

    Args:
        df: Raw transactions DataFrame.

    Returns:
        The cleansed transactions DataFrame.
    """
    text_columns = ["channel", "transaction_type", "currency", "status"]
    timestamp_formats = {"timestamp": "yyyy-MM-dd HH:mm:ss"}

    non_null = handle_missing_values(df)
    typed = convert_data_types(non_null, transaction_schema)
    formatted = format_timestamps(typed, timestamp_formats)
    unique_rows = remove_duplicates(formatted)
    cleaned_text = clean_text_data(unique_rows, text_columns)
    within_range = handle_outliers(cleaned_text, ["amount"])
    return data_quality_checks(within_range, ["transaction_id", "amount"])


# Applying to customers
def cleanse_customer(df):
    """Apply the complete cleansing pipeline to a customers DataFrame.

    Steps, in order: drop rows with nulls, convert columns according to
    ``customer_schema``, format ``join_date`` and ``last_update``, remove
    duplicate rows, clean the text columns (``email`` is explicitly excluded
    from cleaning) and run the null/NaN quality checks on ``customer_id`` and
    ``credit_score``.

    Args:
        df: Raw customers DataFrame.

    Returns:
        The cleansed customers DataFrame.
    """
    timestamp_formats = {
        "join_date": "yyyy-MM-dd",
        "last_update": "yyyy-MM-dd HH:mm:ss",
    }
    text_columns = ["name", "email", "phone", "address"]

    non_null = handle_missing_values(df)
    typed = convert_data_types(non_null, customer_schema)
    formatted = format_timestamps(typed, timestamp_formats)
    unique_rows = remove_duplicates(formatted)
    cleaned_text = clean_text_data(
        unique_rows, text_columns, exclude_columns=["email"]
    )
    return data_quality_checks(cleaned_text, ["customer_id", "credit_score"])


# Applying to branch
def cleanse_branch(df):
    """Apply the cleansing pipeline to a branches DataFrame.

    Steps, in order: drop rows with nulls, convert columns according to
    ``branch_schema``, remove duplicate rows and clean the ``name`` and
    ``location`` text columns.

    Args:
        df: Raw branches DataFrame.

    Returns:
        The cleansed branches DataFrame.
    """
    non_null = handle_missing_values(df)
    typed = convert_data_types(non_null, branch_schema)
    unique_rows = remove_duplicates(typed)
    # The branch name column is "name" (formerly "branch_name").
    return clean_text_data(unique_rows, ["name", "location"])


# function for cleaning all data
def process_and_cleanse_data():
    """Cleanse the bronze tables and write the results to the silver layer.

    Transactions are read as a stream from ``Bronze_layer.transactions``,
    cleansed and appended to ``Silver_layer.transactions``. Customers and
    branches are read as batch tables, cleansed and written with overwrite mode
    to ``Silver_layer.customers`` and ``Silver_layer.branches``.

    Relies on a ``spark`` session object being available in the enclosing scope.

    Returns:
        The object returned by ``writeStream...toTable`` for the transactions
        stream, which the caller can wait on.
    """
    # Transactions: streaming read -> cleanse -> streaming append.
    bronze_transactions = spark.readStream.table("Bronze_layer.transactions")
    silver_transactions = cleanse_transaction(bronze_transactions)
    stream_writer = (
        silver_transactions.writeStream.format("delta")
        .option("checkpointLocation", "Silver_layer.checkpoint")
        .outputMode("append")
    )
    transaction_query = stream_writer.toTable("Silver_layer.transactions")
    print("Streaming transactions started...")

    # Customers: batch read -> cleanse -> overwrite.
    bronze_customers = spark.read.table("Bronze_layer.customers")
    silver_customers = cleanse_customer(bronze_customers)
    customer_writer = silver_customers.write.format("delta").mode("overwrite")
    customer_writer.saveAsTable("Silver_layer.customers")
    print("batch customers completed...")

    # Branches: batch read -> cleanse -> overwrite.
    bronze_branches = spark.read.table("Bronze_layer.branches")
    silver_branches = cleanse_branch(bronze_branches)
    branch_writer = silver_branches.write.format("delta").mode("overwrite")
    branch_writer.saveAsTable("Silver_layer.branches")
    print("batch branches completed...")

    return transaction_query


# Run the whole cleansing job, then block on the transactions stream until it
# terminates or the user interrupts it.
transaction_streaming = process_and_cleanse_data()

if not transaction_streaming:
    print("Streaming not started.")
else:
    try:
        transaction_streaming.awaitTermination()
    except KeyboardInterrupt:
        print("Streaming interrupted.")

Streaming transactions started...
batch customers completed...
batch branches completed...
