In [0]:
%sql
-- Set the session's current catalog and schema. Later cells read tables as
-- `staging.<table>` / `bronze.<table>` without naming a catalog, so those
-- references resolve inside the `football-analyze-v1` catalog set here.
USE CATALOG `football-analyze-v1`;
USE SCHEMA `football`;
-- Display the active catalog/schema to confirm the switch took effect.
SELECT current_catalog(), current_schema()

In [0]:
import json
from datetime import datetime

import requests
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, LongType, StringType, StructType

from src.api.api_handler import APIError, APIRequestHandler
from src.api.endpoints import LEAGUES_ENDPOINT
from src.schemas.fields import TableNames, CommonFields, LeagueFields
from src.schemas.league_schema import LeagueSchema
from src.schemas.schema_validation import SchemaValidation, ValidationResult
from src.utils.data_utils import DataUtils



def flatten_dataframe(df: DataFrame, separator: str = "_", max_iterations: int = 10) -> DataFrame:
    """
    Flatten a DataFrame with nested structs and arrays of structs.

    Each pass does two things.

    1. Every top-level ``array<struct>`` column is exploded with
       ``explode_outer``. This gives one output row per element; a null or
       empty array yields one row with nulls. The element's fields become
       top-level columns.
    2. The fields of every remaining struct column are promoted one level to
       top-level columns.

    Passes repeat until no struct / ``array<struct>`` column remains, or until
    ``max_iterations`` passes have run. A nested field ``b`` of column ``a``
    becomes column ``a{separator}b``.

    Notes:
        - Exploding several ``array<struct>`` columns of the same row produces
          the cross product of their elements.
        - Arrays of non-struct elements (e.g. ``array<string>``) are left as-is.
        - NOTE(review): this notebook calls ``DataUtils.flatten_dataframe``
          rather than this function; confirm which implementation should be kept.

    Args:
        df: Input DataFrame with nested structures.
        separator: String joining parent and child field names (default: "_").
        max_iterations: Safety cap on the number of flattening passes
            (default: 10). If nesting is deeper than this, the returned
            DataFrame may still contain nested columns.

    Returns:
        Flattened DataFrame.
    """

    def _quoted(name: str) -> str:
        # Backtick-quote a column name so dots (e.g. produced by separator=".")
        # or other special characters are not parsed as a nested field path.
        return "`" + name.replace("`", "``") + "`"

    def _is_struct_array(data_type) -> bool:
        # Only array<struct<...>> columns are exploded; other arrays pass through.
        return isinstance(data_type, ArrayType) and isinstance(data_type.elementType, StructType)

    def _explode_struct_array(frame: DataFrame, array_field) -> DataFrame:
        # Replace one array<struct> column with one row per element, promoting
        # the element's fields to `<array><separator><field>` columns.
        array_name = array_field.name
        exploded_name = f"{array_name}_exploded"
        other_cols = [col(_quoted(f.name)) for f in frame.schema.fields if f.name != array_name]
        # explode_outer (unlike explode) keeps rows whose array is null or empty.
        exploded = frame.select(*other_cols, explode_outer(col(_quoted(array_name))).alias(exploded_name))
        # The exploded column's type is the array's element struct; read it from
        # the field we already have instead of re-resolving the schema.
        element_cols = [
            col(f"{_quoted(exploded_name)}.{_quoted(nested.name)}").alias(f"{array_name}{separator}{nested.name}")
            for nested in array_field.dataType.elementType.fields
        ]
        return exploded.select(*other_cols, *element_cols)

    def _flatten_struct_level(frame: DataFrame) -> DataFrame:
        # Promote each struct column's fields one level up; other columns pass through unchanged.
        exprs = []
        for field in frame.schema.fields:
            if isinstance(field.dataType, StructType):
                exprs.extend(
                    col(f"{_quoted(field.name)}.{_quoted(nested.name)}").alias(f"{field.name}{separator}{nested.name}")
                    for nested in field.dataType.fields
                )
            else:
                exprs.append(col(_quoted(field.name)))
        return frame.select(*exprs)

    current_df = df
    for _ in range(max_iterations):
        fields = current_df.schema.fields
        if not any(isinstance(f.dataType, StructType) or _is_struct_array(f.dataType) for f in fields):
            break
        # 1) Explode every array<struct> column present at the start of this pass.
        for array_field in [f for f in fields if _is_struct_array(f.dataType)]:
            current_df = _explode_struct_array(current_df, array_field)
        # 2) Flatten one level of the remaining struct columns.
        current_df = _flatten_struct_level(current_df)

    return current_df



# Parse each staged league payload (a JSON string in `json_data`) with the bronze
# league schema, keeping the record id and ingestion timestamp next to it.
# NOTE(review): despite its name, this frame is built from the *staging* table;
# a later cell rebinds `bronze_league_df` from `bronze.leagues_raw_data`.
bronze_league_df = (
    spark.read.table(f'staging.{TableNames.STAGING_LEAGUES}')
    .select(
        "record_id",
        from_json(col("json_data"), schema=LeagueSchema.get_bronze_schema()).alias("parsed_data"),
        "ingestion_timestamp",
    )
)

# `parsed_data.*` promotes the parsed struct's top-level fields to columns,
# so they appear without the "parsed_data." prefix.
parsed_fields_df = bronze_league_df.select("record_id", "parsed_data.*", "ingestion_timestamp")

# Flatten remaining nested columns with the project helper ("_" separator).
# NOTE(review): this calls DataUtils.flatten_dataframe, not the flatten_dataframe
# defined earlier in this cell; confirm which implementation is intended.
flattened_df = DataUtils.flatten_dataframe(parsed_fields_df, separator="_")

flattened_df.printSchema()

In [0]:
import requests
import json
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from src.api.api_handler import APIError, APIRequestHandler
from src.api.endpoints import LEAGUES_ENDPOINT
from src.schemas.fields import TableNames, CommonFields, LeagueFields
from src.schemas.league_schema import LeagueSchema
from pyspark.sql.types import LongType
from src.schemas.schema_validation import SchemaValidation, ValidationResult


# Imported explicitly: StringType was previously only in scope because it leaks
# out of `from pyspark.sql.functions import *`.
from pyspark.sql.types import StringType

# Parse each raw bronze payload (a JSON string in `json_data`) with the bronze
# league schema, keeping the record id and ingestion timestamp next to it.
bronze_league_df = spark.read.table('bronze.leagues_raw_data').select(
    col("record_id"),
    from_json(col("json_data"), schema=LeagueSchema.get_bronze_schema()).alias("parsed_data"),
    col("ingestion_timestamp"),
)

# Keep only records where every league/country attribute projected below is present.
bronze_league_df = bronze_league_df.filter(
    col("parsed_data.league.id").isNotNull()
    & col("parsed_data.league.name").isNotNull()
    & col("parsed_data.league.type").isNotNull()
    & col("parsed_data.league.logo").isNotNull()
    & col("parsed_data.country.name").isNotNull()
    & col("parsed_data.country.flag").isNotNull()
)

# Project to the silver columns with explicit types.
# NOTE(review): `valid_schema` is set to True for every row before the validation
# below runs; confirm whether it should instead reflect `validation_results`.
silver_league_df = (
    bronze_league_df
    .select(
        col("parsed_data.league.id").cast(LongType()).alias(CommonFields.LEAGUE_ID),
        col("parsed_data.league.name").cast(StringType()).alias(LeagueFields.LEAGUE_NAME),
        col("parsed_data.league.type").cast(StringType()).alias(LeagueFields.TYPE),
        col("parsed_data.league.logo").cast(StringType()).alias(LeagueFields.LOGO),
        col("parsed_data.country.name").cast(StringType()).alias(LeagueFields.COUNTRY),
        col("parsed_data.country.flag").cast(StringType()).alias(LeagueFields.COUNTRY_FLAG),
    )
    # Re-check the non-nullable columns after casting: depending on the bronze
    # schema's source types, a cast may turn a non-null input into null.
    .filter(col(CommonFields.LEAGUE_ID).isNotNull())
    .filter(col(LeagueFields.LEAGUE_NAME).isNotNull())
    .filter(col(LeagueFields.TYPE).isNotNull())
    .withColumn("valid_schema", lit(True))
)

# Run the project's schema / data-quality validation against the silver schema
# and print its report.
validation_results: ValidationResult = SchemaValidation.validate_schema_and_data_quality(
    silver_league_df,
    LeagueSchema.get_silver_schema(),
    TableNames.SILVER_LEAGUES,
)
print(validation_results.to_str())
