### Data Cleaning and Transformation

In [0]:
%sql
-- Make sure the schema that holds this notebook's silver.* target tables exists.
CREATE SCHEMA IF NOT EXISTS silver

In [0]:
from pyspark.sql.functions import *

In [0]:
%run /Workspace/Users/tesfamit03@gmail.com/code/utils/logging_utils

In [0]:
%sql
-- Run-statistics table for this notebook: it is the table_name passed to
-- log_pipeline_stats after each table is processed (on success and on failure).
-- NOTE(review): the `pipeline` schema is not created in this notebook; presumably it
-- is created elsewhere (e.g. by the logging utilities) - TODO confirm.
CREATE TABLE IF NOT EXISTS pipeline.silver_stats (
    stage STRING,                        -- stage label; this notebook passes "transformation"
    source_table STRING,                 -- table the data was read from (bronze.*)
    target_table STRING,                 -- table the cleaned data was written to (silver.*)
    start_time TIMESTAMP,                -- when processing of the table started
    end_time TIMESTAMP,                  -- when processing finished or failed
    processing_duration_seconds DOUBLE,  -- end_time - start_time, in seconds
    num_records_cleaned LONG,            -- row count of the cleaned DataFrame (success only)
    error_message STRING,                -- text of the exception (failure only)
    timestamp TIMESTAMP                  -- not set by this notebook; presumably filled by log_pipeline_stats - TODO confirm
) USING DELTA;

**Configuration for data cleaning and transformation**

In [0]:
def _silver_entry(table, **cleaning_rules):
    """
    Build one configuration entry for a bronze -> silver table pair.

    :param table: Unqualified table name shared by the bronze and silver schemas.
    :param cleaning_rules: Cleaning rules for the table, keyed by rule name
        (drop_columns, fill_missing_values, cast_columns, convert_to_date,
        drop_duplicates, drop_na). Keyword order is preserved in the result.
    :return: Dict with "source_table", "target_table" and "cleaning_rules" keys.
    """
    return {
        "source_table": f"bronze.{table}",
        "target_table": f"silver.{table}",
        "cleaning_rules": cleaning_rules,
    }


# One entry per table; each entry is read by the transformation loop, which
# passes its "cleaning_rules" to clean_data.
silver_cleaning_config = [
    _silver_entry(
        "appearance",
        fill_missing_values={
            "yellow_cards": 0,
            "red_cards": 0,
            "goals": 0,
            "assists": 0,
            "minutes_played": 0,
        },
        convert_to_date=["date"],
        drop_duplicates=True,
    ),
    _silver_entry(
        "club_games",
        fill_missing_values={
            "own_manager_name": "Unknown",
            "opponent_manager_name": "Unknown",
            "own_goals": 0,
            "opponent_goals": 0,
            "own_position": -1,
            "opponent_position": -1,
            "club_id": -1,
            "opponent_id": -1,
        },
        cast_columns={
            "club_id": "int",
            "game_id": "int",
            "opponent_id": "int",
            "own_goals": "int",
            "opponent_goals": "int",
            "is_win": "boolean",
        },
        drop_duplicates=True,
    ),
    _silver_entry(
        "clubs",
        drop_columns=["total_market_value", "coach_name"],
        cast_columns={
            "club_id": "int",
            "domestic_competition_id": "int",
            "squad_size": "int",
            "national_team_players": "int",
            "stadium_seats": "int",
            "last_season": "int",
        },
        fill_missing_values={
            "club_code": "Unknown",
            "name": "Unknown",
            "domestic_competition_id": 0,
            "foreigners_number": 0,
            "foreigners_percentage": 0.0,
            "stadium_name": "Unknown",
            "stadium_seats": 0,
            "net_transfer_record": "Unknown",
            "filename": "Unknown",
            "url": "Unknown",
        },
        drop_duplicates=True,
    ),
    _silver_entry(
        "competitions",
        drop_columns=["url"],
        drop_duplicates=True,
    ),
    _silver_entry(
        "game_events",
        drop_columns=["player_in_id", "player_assist_id"],
        cast_columns={
            "game_id": "int",
            "minute": "int",
            "club_id": "int",
            "player_id": "int",
        },
        fill_missing_values={"description": "Unknown"},
        drop_duplicates=True,
    ),
    _silver_entry(
        "game_lineups",
        drop_columns=["game_lineups_id", "team_captain"],
        cast_columns={
            "game_id": "int",
            "player_id": "int",
            "club_id": "int",
            "number": "int",
        },
        drop_duplicates=True,
    ),
    _silver_entry(
        "games",
        drop_columns=["aggregate", "url"],
        fill_missing_values={
            "home_club_formation": "Unknown",
            "away_club_formation": "Unknown",
            "stadium": "Unknown",
            "referee": "Unknown",
            "home_club_name": "Unknown",
            "away_club_name": "Unknown",
        },
        cast_columns={
            "game_id": "int",
            "season": "int",
            "home_club_id": "int",
            "away_club_id": "int",
            "home_club_goals": "int",
            "away_club_goals": "int",
            "home_club_position": "float",
            "away_club_position": "float",
            "attendance": "int",
        },
        convert_to_date=["date"],
        drop_duplicates=True,
        drop_na=True,
    ),
    _silver_entry(
        "player_valuations",
        cast_columns={"market_value_in_eur": "float"},
        convert_to_date=["date"],
        drop_duplicates=True,
    ),
    _silver_entry(
        "players",
        drop_columns=["image_url", "url", "name"],
        fill_missing_values={
            "country_of_birth": "Unknown",
            "city_of_birth": "Unknown",
            "agent_name": "Unknown",
            "foot": "Unknown",
            "position": "Unknown",
            "sub_position": "Unknown",
            "market_value_in_eur": 0.0,
            "highest_market_value_in_eur": 0.0,
        },
        cast_columns={
            "date_of_birth": "date",
            "contract_expiration_date": "date",
            "player_id": "int",
            "current_club_id": "int",
            "market_value_in_eur": "float",
            "highest_market_value_in_eur": "float",
        },
        drop_duplicates=True,
        drop_na=True,
    ),
    _silver_entry(
        "transfers",
        drop_columns=["transfer_fee"],
        convert_to_date=["transfer_date"],
        fill_missing_values={"market_value_in_eur": 0.0},
        drop_duplicates=True,
        drop_na=True,
    ),
]

**Data Cleaning**

In [0]:
def clean_data(df, cleaning_rules):
    """
    Apply a declarative set of cleaning rules to a DataFrame.

    Rules are applied in a fixed order, independent of their order in the
    dictionary: drop_columns, fill_missing_values, cast_columns,
    convert_to_date, replace_values, drop_duplicates, drop_na. Rules that
    are not present are skipped.

    :param df: The input DataFrame.
    :param cleaning_rules: A dictionary of cleaning rules keyed by rule name.
    :return: The cleaned DataFrame (the input itself when no rule applies).
    """
    cleaned = df

    # Remove unwanted columns before any other rule touches them.
    if "drop_columns" in cleaning_rules:
        unwanted = cleaning_rules["drop_columns"]
        cleaned = cleaned.drop(*unwanted)

    # Replace nulls with the per-column defaults.
    if "fill_missing_values" in cleaning_rules:
        defaults = cleaning_rules["fill_missing_values"]
        cleaned = cleaned.fillna(defaults)

    # Cast columns; the "date" type goes through to_date rather than cast().
    for name, dtype in cleaning_rules.get("cast_columns", {}).items():
        converted = to_date(col(name)) if dtype == "date" else col(name).cast(dtype)
        cleaned = cleaned.withColumn(name, converted)

    # Convert the listed columns to DATE.
    for name in cleaning_rules.get("convert_to_date", []):
        cleaned = cleaned.withColumn(name, to_date(col(name)))

    # Conditional replacements: swap each old value for its new value,
    # leaving every other value in the column untouched.
    for name, mapping in cleaning_rules.get("replace_values", {}).items():
        for old_value, new_value in mapping.items():
            swapped = when(col(name) == old_value, new_value).otherwise(col(name))
            cleaned = cleaned.withColumn(name, swapped)

    # Remove exact duplicate rows when the flag is set and truthy.
    if cleaning_rules.get("drop_duplicates"):
        cleaned = cleaned.dropDuplicates()

    # Remove rows containing nulls when the flag is set and truthy.
    if cleaning_rules.get("drop_na"):
        cleaned = cleaned.dropna()

    return cleaned

**Transform and save as Delta Tables**

In [0]:
from pyspark.sql import SparkSession
import datetime
import logging

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(module)s - %(message)s",
    handlers=[logging.StreamHandler()]
)

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Process each table in the configuration, one at a time. For every table the
# bronze source is read, cleaned with clean_data, written to the silver target
# as a Delta table, and a stats row is recorded via log_pipeline_stats. The
# first failure is recorded and then aborts the run with a RuntimeError.
for config in silver_cleaning_config:
    start_time = datetime.datetime.now()
    source_table = None  # Initialized so the error handler can always reference them
    target_table = None
    try:
        # Extract configuration details
        source_table = config.get("source_table")
        target_table = config.get("target_table")
        cleaning_rules = config.get("cleaning_rules")

        # Validate inputs; the handler below logs the message, so it is not
        # logged here as well.
        if not all([source_table, target_table, cleaning_rules]):
            raise ValueError(f"Missing required fields in configuration: {config}")

        logging.info("Starting transformation process for table: %s -> %s", source_table, target_table)

        # Read the Bronze table
        logging.info("Reading Bronze table: %s", source_table)
        df_bronze = spark.read.table(source_table)

        # Clean the data
        logging.info("Cleaning data using rules: %s", cleaning_rules)
        df_silver = clean_data(df_bronze, cleaning_rules)

        # Write the cleaned data to the Silver table
        # NOTE(review): with mode("overwrite"), mergeSchema only merges in new
        # columns; if a rule change alters a column type or removes a column
        # from an existing table, overwriteSchema is probably needed - confirm.
        logging.info("Writing cleaned data to Silver table: %s", target_table)
        df_silver.write.format("delta") \
                 .mode("overwrite") \
                 .option("mergeSchema", "true") \
                 .saveAsTable(target_table)

        end_time = datetime.datetime.now()
        processing_duration = (end_time - start_time).total_seconds()

        # df_silver is lazy: calling df_silver.count() here would re-read the
        # bronze table and re-run every cleaning step (including the
        # de-duplication) a second time. The target was just overwritten with
        # exactly that data, so count the written table instead.
        num_records_cleaned = spark.read.table(target_table).count()

        logging.info(
            "Successfully processed %s -> %s. Duration: %s seconds.",
            source_table, target_table, processing_duration,
        )
        print(f"Successfully processed {source_table} -> {target_table}.")

        # Log metadata
        log_pipeline_stats(
            stage="transformation",
            stats={
                "source_table": source_table,
                "target_table": target_table,
                "start_time": start_time,
                "end_time": end_time,
                "processing_duration_seconds": processing_duration,
                "num_records_cleaned": num_records_cleaned
            },
            table_name="pipeline.silver_stats"
        )

    except Exception as e:
        # Log errors if processing fails
        end_time = datetime.datetime.now()
        error_message = f"Failed to process table: {source_table} -> {target_table}. Error: {str(e)}"
        logging.error(error_message)

        # Log metadata for the failure. Timestamps are passed as datetime
        # objects and the duration is included, matching the success path
        # (the stats table declares these columns as TIMESTAMP / DOUBLE).
        # Recording the failure is best-effort: if it raises, the problem is
        # logged and the original error is still the one that propagates.
        try:
            log_pipeline_stats(
                stage="transformation",
                stats={
                    "source_table": source_table,
                    "target_table": target_table,
                    "start_time": start_time,
                    "end_time": end_time,
                    "processing_duration_seconds": (end_time - start_time).total_seconds(),
                    "error_message": str(e)
                },
                table_name="pipeline.silver_stats"
            )
        except Exception as stats_error:
            logging.error(
                "Could not record failure stats for %s -> %s: %s",
                source_table, target_table, stats_error,
            )

        raise RuntimeError(error_message) from e

Successfully processed bronze.appearance -> silver.appearance.
Successfully processed bronze.club_games -> silver.club_games.
Successfully processed bronze.clubs -> silver.clubs.
Successfully processed bronze.competitions -> silver.competitions.
Successfully processed bronze.game_events -> silver.game_events.
Successfully processed bronze.game_lineups -> silver.game_lineups.
Successfully processed bronze.games -> silver.games.
Successfully processed bronze.player_valuations -> silver.player_valuations.
Successfully processed bronze.players -> silver.players.
Successfully processed bronze.transfers -> silver.transfers.
