## Transform all data

### Matches

In [0]:
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, LongType
)

from pyspark.sql.functions import (
    col, from_json, to_timestamp, to_date, date_format, current_timestamp, when, lit
)

# Schema of the JSON document stored in payload_json of bronze.raw_matches.
# Built with StructType.add(); like a bare StructField, every field added this
# way is nullable.
match_schema = (
    StructType()
    .add("area", StructType()
        .add("id", IntegerType())
        .add("name", StringType())
        .add("code", StringType()))
    .add("competition", StructType()
        .add("id", IntegerType())
        .add("name", StringType())
        .add("code", StringType()))
    .add("season", StructType()
        .add("id", IntegerType())
        .add("startDate", StringType())
        .add("endDate", StringType()))
    .add("id", LongType())
    .add("utcDate", StringType())
    .add("status", StringType())
    .add("attendance", IntegerType())
    .add("venue", StringType())
    .add("matchday", IntegerType())
    .add("stage", StringType())
    .add("homeTeam", StructType()
        .add("id", IntegerType())
        .add("name", StringType()))
    .add("awayTeam", StructType()
        .add("id", IntegerType())
        .add("name", StringType()))
    .add("score", StructType()
        .add("fullTime", StructType()
            .add("home", IntegerType())
            .add("away", IntegerType()))
        .add("halfTime", StructType()
            .add("home", IntegerType())
            .add("away", IntegerType())))
    .add("lastUpdated", StringType())
)

# Three-level (catalog.schema.table) locations of the source and target tables.
CATALOG_NAME = "football_data_org"
BRONZE_SCHEMA = "bronze"
SILVER_SCHEMA = "silver"

# Read the raw matches and decode each payload_json string into a "data" struct.
df_bronze_matches = spark.read.table(
    ".".join([CATALOG_NAME, BRONZE_SCHEMA, "raw_matches"])
)
df_matches_parsed = df_bronze_matches.withColumn(
    "data", from_json(col("payload_json"), match_schema)
)

# Flatten the parsed payload into silver columns, convert the raw date/time
# strings into DATE/TIMESTAMP columns (dropping the raw strings), stamp the
# load time, and fix the output column order with a final select().
df_matches_silver_base = (
    df_matches_parsed
        .select(
            col("matchId").alias("match_id"),
            col("data.area.code").alias("area_code"),
            col("data.area.name").alias("area_name"),
            col("data.competition.code").alias("competition_code"),
            col("data.status").alias("status"),
            col("data.season.id").alias("season_id"),
            col("data.stage").alias("stage"),
            col("data.venue").alias("venue"),
            col("data.attendance").alias("attendance"),
            col("data.season.startDate").alias("season_start_date_raw"),
            col("data.season.endDate").alias("season_end_date_raw"),
            col("data.utcDate").alias("utc_datetime_raw"),
            col("data.matchday").alias("matchday"),
            col("data.homeTeam.id").alias("home_team_id"),
            col("data.homeTeam.name").alias("home_team_name"),
            col("data.awayTeam.id").alias("away_team_id"),
            col("data.awayTeam.name").alias("away_team_name"),
            col("data.score.fullTime.home").alias("full_time_home_goals"),
            col("data.score.fullTime.away").alias("full_time_away_goals"),
            col("data.score.halfTime.home").alias("half_time_home_goals"),
            col("data.score.halfTime.away").alias("half_time_away_goals"),
            col("ingest_ts"),
            col("source_api_url")
        )
        # Parse the kickoff string once; kickoff_time is derived from it below.
        .withColumn("kickoff_utc", to_timestamp("utc_datetime_raw"))
        # "HH:mm" string of the kickoff. This replaces an implicit
        # timestamp->string cast followed by substr(12, 5). The output is the same.
        # NOTE(review): timestamps are rendered in spark.sql.session.timeZone, so
        # this is the UTC kickoff time only if the session runs in UTC. Confirm.
        .withColumn("kickoff_time", date_format(col("kickoff_utc"), "HH:mm"))
        .withColumn("match_date", to_date("utc_datetime_raw"))
        .withColumn("season_start_date", to_date("season_start_date_raw"))
        .withColumn("season_end_date", to_date("season_end_date_raw"))
        .drop("utc_datetime_raw", "season_start_date_raw", "season_end_date_raw")
        .withColumn("silver_loaded_ts", current_timestamp())
        # Final output column order of the silver matches table.
        .select(
            "match_id", "status", "season_id", "stage", "area_code", "area_name", "competition_code", "venue", "attendance",
            "matchday", "match_date",
            "kickoff_time", "home_team_id", "home_team_name", "away_team_id", "away_team_name",
            "full_time_home_goals", "full_time_away_goals", "half_time_home_goals", "half_time_away_goals",
            "kickoff_utc", "season_start_date", "season_end_date",
            "ingest_ts", "source_api_url", "silver_loaded_ts"
        )
)


# Data-quality rules. data_quality_issue holds the label of the FIRST rule a
# row violates (the when() chain stops at the first match), or NULL when the
# row passes every rule.
df_matches_flagged = (
    df_matches_silver_base
        .withColumn(
            "data_quality_issue",
            when(col("match_id").isNull(), lit("MISSING_MATCH_ID"))
            .when(col("competition_code").isNull(), lit("MISSING_COMPETITION"))
            .when(col("home_team_id").isNull() | col("away_team_id").isNull(), lit("MISSING_TEAM"))
            .when(col("kickoff_utc").isNull(), lit("MISSING_KICKOFF"))
            # A NULL score makes the comparison NULL, which when() treats as
            # "not matched". Rows without a score are therefore not flagged here.
            .when(
                (col("full_time_home_goals") < 0) | (col("full_time_away_goals") < 0),
                lit("NEGATIVE_GOALS")
            )
        )
        .withColumn("is_valid_record", col("data_quality_issue").isNull())
)

# is_valid_record comes from isNull(), so it is never NULL. Filtering on the
# column and on its negation splits the rows into two disjoint, complete sets.
df_matches_valid = df_matches_flagged.filter(col("is_valid_record"))
df_matches_invalid = df_matches_flagged.filter(~col("is_valid_record"))

# Both silver tables are fully overwritten on each run.
# NOTE: each write is a separate Spark action that re-evaluates the lineage
# from the bronze table, so silver_loaded_ts may differ slightly between them.
df_matches_valid.write.format("delta").mode("overwrite").saveAsTable(
    f"{CATALOG_NAME}.{SILVER_SCHEMA}.matches_all_data"
)

df_matches_invalid.write.format("delta").mode("overwrite").saveAsTable(
    f"{CATALOG_NAME}.{SILVER_SCHEMA}.matches_all_data_quarantine"
)

### Teams

In [0]:
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, LongType
)

from pyspark.sql.functions import col, from_json, to_timestamp, to_date, current_timestamp, when, lit


# Schema of the JSON document stored in payload_json of bronze.raw_teams.
# Built with StructType.add(); every field is nullable.
team_schema = (
    StructType()
    .add("area", StructType()
        .add("id", IntegerType())
        .add("name", StringType())
        .add("code", StringType()))
    .add("id", IntegerType())
    .add("name", StringType())
    .add("shortName", StringType())
    .add("tla", StringType())
    .add("crest", StringType())
    .add("address", StringType())
    .add("website", StringType())
    .add("founded", IntegerType())
    .add("clubColors", StringType())
    .add("venue", StringType())
    .add("lastUpdated", StringType())
)

# Catalog/schema names for the source and target tables. They are defined in
# this cell as well, so it does not rely on an earlier cell having set them.
CATALOG_NAME = "football_data_org"
BRONZE_SCHEMA = "bronze"
SILVER_SCHEMA = "silver"

# Parse payload_json into a "data" struct and flatten it into silver columns.
df_bronze_teams = spark.table(f"{CATALOG_NAME}.{BRONZE_SCHEMA}.raw_teams")

df_teams_parsed = df_bronze_teams.withColumn(
    "data", from_json("payload_json", team_schema)
)

df_teams_silver_base = (
    df_teams_parsed
    .select(
        col("teamId").alias("team_id"),
        col("data.name").alias("team_name"),
        col("data.shortName").alias("short_name"),
        col("data.tla").alias("tla"),
        col("data.area.id").alias("area_id"),
        col("data.area.name").alias("area_name"),
        col("data.area.code").alias("area_code"),
        col("data.crest").alias("crest_url"),
        col("data.founded").alias("founded_year"),
        col("data.clubColors").alias("club_colors"),
        col("data.venue").alias("venue"),
        col("data.address").alias("address"),
        col("data.website").alias("website"),
        col("data.lastUpdated").alias("last_updated_raw"),
        col("ingest_ts"),
        col("source_api_url")
    )
    # Replace the raw lastUpdated string with a TIMESTAMP column.
    .withColumn("last_updated", to_timestamp("last_updated_raw"))
    .drop("last_updated_raw")
    .withColumn("silver_loaded_ts", current_timestamp())
)

# Quality rules. data_quality_issue names the first rule a row fails, or is
# NULL when the row passes all of them.
df_teams_flagged = (
    df_teams_silver_base
    .withColumn(
        "data_quality_issue",
        when(col("team_id").isNull(), lit("MISSING_TEAM_ID"))
        .when(col("team_name").isNull(), lit("MISSING_TEAM_NAME"))
    )
    .withColumn("is_valid_record", col("data_quality_issue").isNull())
)

# is_valid_record comes from isNull(), so it is never NULL. Filtering on the
# column and on its negation splits the rows into two disjoint, complete sets.
df_teams_valid = df_teams_flagged.filter(col("is_valid_record"))
df_teams_invalid = df_teams_flagged.filter(~col("is_valid_record"))

# Both silver tables are fully overwritten on each run.
df_teams_valid.write.format("delta").mode("overwrite").saveAsTable(
    f"{CATALOG_NAME}.{SILVER_SCHEMA}.teams_all_data"
)

df_teams_invalid.write.format("delta").mode("overwrite").saveAsTable(
    f"{CATALOG_NAME}.{SILVER_SCHEMA}.teams_all_data_quarantine"
)

### Players

In [0]:

from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, LongType,  BooleanType
)

from pyspark.sql.functions import col, from_json, to_timestamp, to_date, current_timestamp, when, lit, coalesce


# Schema of the JSON document stored in payload_json of bronze.raw_players,
# including the nested currentTeam struct. Every field is nullable.
person_schema = (
    StructType()
    .add("id", IntegerType())
    .add("name", StringType())
    .add("firstName", StringType())
    .add("lastName", StringType())
    .add("dateOfBirth", StringType())
    .add("nationality", StringType())
    .add("section", StringType())
    .add("position", StringType())
    .add("shirtNumber", IntegerType())
    .add("lastUpdated", StringType())
    .add("currentTeam", StructType()
        .add("id", IntegerType())
        .add("name", StringType())
        .add("shortName", StringType())
        .add("tla", StringType()))
)


# Catalog/schema names for the source and target tables (repeated so this
# cell can run on its own).
CATALOG_NAME = "football_data_org"
BRONZE_SCHEMA = "bronze"
SILVER_SCHEMA = "silver"

# Read raw player payloads and decode payload_json into a "data" struct.
df_bronze_persons = spark.read.table(
    ".".join([CATALOG_NAME, BRONZE_SCHEMA, "raw_players"])
)

df_persons_parsed = df_bronze_persons.withColumn(
    "data", from_json(col("payload_json"), person_schema)
)

# Flatten the person fields (including nested currentTeam) and carry the bronze
# lineage columns through under their own names. Then swap the raw date strings
# for typed DATE/TIMESTAMP columns and stamp the load time.
df_persons_silver_base = (
    df_persons_parsed
    .select(*[
        col(source).alias(target)
        for source, target in (
            ("personId", "person_id"),
            ("data.name", "full_name"),
            ("data.firstName", "first_name"),
            ("data.lastName", "last_name"),
            ("data.dateOfBirth", "date_of_birth_raw"),
            ("data.nationality", "nationality"),
            ("data.section", "section"),
            ("data.position", "position"),
            ("data.shirtNumber", "shirt_number"),
            ("data.lastUpdated", "last_updated_raw"),
            ("data.currentTeam.id", "current_team_id"),
            ("data.currentTeam.name", "current_team_name"),
            ("data.currentTeam.tla", "current_team_tla"),
            ("competition_code", "competition_code"),
            ("source_system", "source_system"),
            ("ingest_ts", "ingest_ts"),
            ("source_api_url", "source_api_url"),
        )
    ])
    .withColumn("date_of_birth", to_date(col("date_of_birth_raw")))
    .withColumn("last_updated", to_timestamp(col("last_updated_raw")))
    .drop("date_of_birth_raw", "last_updated_raw")
    .withColumn("silver_loaded_ts", current_timestamp())
)

# Quality rules. data_quality_issue names the first rule a row fails, or is
# NULL when the row passes all of them.
df_persons_flagged = (
    df_persons_silver_base
    .withColumn(
        "data_quality_issue",
        when(col("person_id").isNull(), lit("MISSING_PERSON_ID"))
        .when(col("full_name").isNull(), lit("MISSING_NAME"))
    )
    .withColumn("is_valid_record", col("data_quality_issue").isNull())
)

# is_valid_record comes from isNull(), so it is never NULL. Filtering on the
# column and on its negation splits the rows into two disjoint, complete sets.
df_persons_valid = df_persons_flagged.filter(col("is_valid_record"))
df_persons_invalid = df_persons_flagged.filter(~col("is_valid_record"))

# Both silver tables are fully overwritten on each run.
df_persons_valid.write.format("delta").mode("overwrite").saveAsTable(
    f"{CATALOG_NAME}.{SILVER_SCHEMA}.players_all_data"
)

df_persons_invalid.write.format("delta").mode("overwrite").saveAsTable(
    f"{CATALOG_NAME}.{SILVER_SCHEMA}.players_all_data_quarantine"
)