In [0]:
from pyspark.sql.functions import mean, stddev, col, when
from pyspark.sql import DataFrame
from pyspark.sql import DataFrameWriter
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import col, mean, stddev, when, abs

In [0]:
# Read the pipeline metadata CSV from the landing container. The first line of
# the file is treated as the header; no schema or inferSchema option is set,
# so Spark reads every column as a string.
metadata = (
    spark.read
    .option("header", "true")
    .format("csv")
    .load("wasbs://dlbikestorelanding@adlsbikestoreinterns.blob.core.windows.net/Mostafa_Landing/metadata/metadata_mostafa.csv")
)

In [0]:
def get_unique_tables(metadata):
    """Return the distinct values of the metadata's ``TableName`` column.

    The distinct rows are collected to the driver and unpacked into a plain
    Python list. The list order is whatever order ``collect()`` yields; no
    sorting is applied.
    """
    distinct_rows = metadata.select('TableName').distinct().collect()

    table_names = []
    for record in distinct_rows:
        table_names.append(record['TableName'])
    return table_names

In [0]:
##load data
def load_source_data(
    spark,
    metadata_df,
    base_path="wasbs://dlbikestorelanding@adlsbikestoreinterns.blob.core.windows.net/Mostafa_Landing",
):
    """Load one parquet DataFrame per distinct ``destination_filename`` in the metadata.

    Args:
        spark: Active SparkSession used to read the parquet folders.
        metadata_df: Metadata DataFrame with a ``destination_filename`` column.
            Each distinct, non-null value names a sub-folder of ``base_path``.
        base_path: Root location that contains one sub-folder per dataset.
            Defaults to the landing container this notebook was written for;
            a trailing slash is tolerated.

    Returns:
        dict mapping each folder name to the DataFrame read from
        ``{base_path}/{folder_name}/``.

    Side effects:
        After every folder has been read, each DataFrame is also stored in
        this module's globals under the name ``<folder_name>_df`` so that
        later notebook cells can refer to it directly.
    """
    root = base_path.rstrip("/")

    # Collect the distinct folder names on the driver. Null entries are
    # skipped: they would otherwise be formatted into a ".../None/" path.
    name_rows = metadata_df.select("destination_filename").distinct().collect()
    folder_names = [row[0] for row in name_rows if row[0] is not None]

    dataframes = {}
    for folder_name in folder_names:
        file_path = f"{root}/{folder_name}/"
        # Parquet files carry their own schema, so no inferSchema option is needed.
        dataframes[folder_name] = spark.read.parquet(file_path)

    # Publish the frames only once all reads have succeeded.
    for source_filename, df in dataframes.items():
        globals()[f"{source_filename}_df"] = df

    return dataframes


In [0]:
##Handling Outliers using Z-Score and IQR methods 
def handle_outliers(df, column, method, z_thresh=3.0) -> DataFrame:
    """Replace outlier values in numeric column(s) with null.

    Args:
        df: Input Spark DataFrame.
        column: Column to clean. May be a single column name, an iterable of
            column names, or ``None`` to process every numeric column of ``df``.
        method: ``"zscore"`` or ``"iqr"``. Matching ignores case and any
            non-alphanumeric characters, so ``"Z-score"``, ``"z score"`` and
            ``"IQR"`` are all accepted.
        z_thresh: Absolute z-score above which a value counts as an outlier
            (z-score method only).

    Returns:
        A DataFrame in which outliers of the selected column(s) are null.
        Nothing is dropped; the nulls can be filled or removed later.

    Raises:
        ValueError: If ``method`` is not recognised, or if an explicitly
            requested column is missing from ``df`` or is not numeric.
    """
    # Normalise spelling variants. A chained `method == "a" or "b"` test is
    # always truthy and would make every call take the z-score path.
    method_key = "".join(ch for ch in str(method).lower() if ch.isalnum())
    if method_key not in ("zscore", "iqr"):
        raise ValueError("Unsupported method! Use 'zscore' or 'iqr'.")

    # Spark simple type names for numeric columns. Exact matches are used
    # (not prefixes) so that "interval ..." types are not mistaken for "int".
    numeric_type_names = {"tinyint", "smallint", "int", "bigint", "float", "double"}
    numeric_columns = {
        name
        for name, dtype in df.dtypes
        if dtype in numeric_type_names or dtype.startswith("decimal")
    }

    if column is None:
        targets = [name for name in df.columns if name in numeric_columns]
    else:
        requested = [column] if isinstance(column, str) else list(column)
        invalid = [name for name in requested if name not in numeric_columns]
        if invalid:
            raise ValueError(f"Columns are missing or not numeric: {invalid}")
        targets = requested

    for name in targets:
        value = F.col(name)

        if method_key == "zscore":
            # One aggregation pass yields both statistics.
            stats = df.agg(
                F.mean(value).alias("mean"),
                F.stddev(value).alias("stddev"),
            ).first()
            # A null mean/stddev (no usable rows) or a zero stddev (constant
            # column) means no value can be an outlier; also avoids a
            # division by zero below.
            if stats["mean"] is None or not stats["stddev"]:
                continue
            center = float(stats["mean"])
            spread = float(stats["stddev"])
            is_outlier = F.abs((value - center) / spread) > z_thresh
        else:
            quartiles = df.approxQuantile(name, [0.25, 0.75], 0.01)
            # Guard against a result with no quantiles (e.g. no non-null rows).
            if len(quartiles) < 2:
                continue
            q1, q3 = quartiles
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            is_outlier = (value < lower_bound) | (value > upper_bound)

        # Mark outliers as null and keep every other value unchanged.
        df = df.withColumn(name, F.when(is_outlier, None).otherwise(value))

    return df


In [0]:
def standardize_name_case(df, column_name):
    """Convert ``column_name`` to whichever letter case is already dominant.

    Two counts are taken: rows whose value already equals its lower-cased
    form, and rows whose value already equals its upper-cased form. If the
    lower-case count is strictly greater, the whole column is lower-cased;
    otherwise (including a tie) it is upper-cased.

    Returns a new DataFrame; the input is not modified.
    """
    target = F.col(column_name)

    already_lower = df.filter(F.lower(target) == target).count()
    already_upper = df.filter(F.upper(target) == target).count()

    # Ties fall through to upper case.
    convert = F.lower if already_lower > already_upper else F.upper
    return df.withColumn(column_name, convert(F.col(column_name)))


In [0]:
def handle_nulls(df, metadata, table_name):
    """Apply the metadata's null-handling rules to ``df``.

    Every metadata row whose ``table_name`` equals ``table_name`` describes
    one column (named in ``table_column``) and its ``can_be_nulled`` flag:

    * ``'Y'``: nulls in that column are replaced with the string ``'unknown'``.
    * ``'N'``: rows where that column is null are dropped.
    * anything else: the column is left untouched.

    Metadata entries naming a column that ``df`` does not have are skipped.

    Args:
        df: DataFrame to clean.
        metadata: Metadata DataFrame with ``table_name``, ``table_column`` and
            ``can_be_nulled`` columns.
        table_name: Value of ``table_name`` selecting the rules for ``df``.

    Returns:
        The cleaned DataFrame.
    """
    rules = metadata.filter(metadata['table_name'] == table_name).collect()
    available = set(df.columns)

    for rule in rules:
        # The column to act on is in 'table_column'. 'table_name' is the value
        # that was just filtered on, so it is the same for every row.
        column_name = rule['table_column']
        if column_name not in available:
            continue

        nullable = rule['can_be_nulled']
        if nullable == 'Y':
            # NOTE(review): the replacement is a string, so it presumably only
            # takes effect on string-typed columns; confirm for numeric ones.
            df = df.na.fill({column_name: 'unknown'})
        elif nullable == 'N':
            # Nulls are not allowed here: drop the offending rows.
            df = df.na.drop(subset=[column_name])

    return df

In [0]:
## Handling Duplicates in a DataFrame by metadata 
def handle_duplicates(df, metadata, table_name):
    """Drop duplicate rows for every column the metadata marks as unique.

    Every metadata row whose ``source_filename`` equals ``table_name``
    describes one column (named in ``table_column``). When its ``unique?``
    flag is ``'Y'``, rows of ``df`` are de-duplicated on that column.
    Metadata entries naming a column that ``df`` does not have are skipped.

    Args:
        df: DataFrame to de-duplicate.
        metadata: Metadata DataFrame with ``source_filename``,
            ``table_column`` and ``unique?`` columns.
        table_name: Value of ``source_filename`` selecting the rules for ``df``.

    Returns:
        The de-duplicated DataFrame.
    """
    rules = metadata.filter(metadata['source_filename'] == table_name).collect()
    available = set(df.columns)

    for rule in rules:
        # The column to act on is in 'table_column', not 'table_name'.
        column_name = rule['table_column']
        if rule['unique?'] == 'Y' and column_name in available:
            # One row per distinct value is kept. NOTE(review): Spark does not
            # promise which of the duplicates survives; add an explicit
            # ordering/window if a specific row must be retained.
            df = df.drop_duplicates(subset=[column_name])

    return df


In [0]:
# Validate data types and change them if necessary
def change_data_type(df: DataFrame, column: str, new_type: str) -> DataFrame:
    """Return ``df`` with ``column`` cast to ``new_type``.

    Args:
        df: Source DataFrame.
        column: Name of the column to convert; it is replaced in place
            (same name) in the returned frame.
        new_type: Target type, passed straight to ``Column.cast``.
    """
    converted = df[column].cast(new_type)
    return df.withColumn(column, converted)

In [0]:
def check_column_data_type(df, table_name, metadata_df):
    """Cast the columns of ``df`` to the types declared in the metadata.

    The metadata rows whose ``destination_filename`` equals ``table_name``
    are collected into a ``table_column -> data_type`` mapping. Every mapped
    column that exists in ``df`` is cast according to its declared type
    (``int``, ``float``, ``string``, ``datetime`` or ``boolean``); mapped
    columns that ``df`` does not have are ignored.

    Returns:
        The DataFrame with the casts applied.

    Raises:
        ValueError: If a column present in ``df`` has a declared type other
            than the five listed above.
    """
    # Declared type name -> argument handed to Column.cast.
    cast_targets = {
        'int': IntegerType(),
        'float': FloatType(),
        'string': StringType(),
        'datetime': "timestamp",
        'boolean': BooleanType(),
    }

    type_rows = (
        metadata_df
        .filter(F.col('destination_filename') == table_name)
        .select('table_column', 'data_type')
        .collect()
    )
    # Each collected row is a (table_column, data_type) pair.
    declared_types = dict(type_rows)

    for column, expected_type in declared_types.items():
        if column not in df.columns:
            continue
        if expected_type not in cast_targets:
            raise ValueError(f"Unsupported data type: {expected_type} for column: {column}")
        df = df.withColumn(column, F.col(column).cast(cast_targets[expected_type]))

    return df
