In [25]:
from pyspark.sql import SparkSession

# Local Spark session using all available cores, configured with the Delta Lake
# SQL extension and catalog.
# NOTE(review): assumes the Delta Lake jars are already on the classpath — confirm.
spark = (
    SparkSession.builder
    .appName("Test Bronze Ingest")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)


In [26]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, expr, count
from pyspark.sql.functions import col, when, sum as spark_sum

In [27]:
# Load the raw scores file; the first line of the CSV supplies the column names.
scores_df = spark.read.csv("../data/scores.csv", header=True)

In [28]:
# Print column names and types. The reader is given no schema, so the columns
# come back as strings.
scores_df.printSchema()

root
 |-- game_id: string (nullable = true)
 |-- home_goals: string (nullable = true)
 |-- away_goals: string (nullable = true)
 |-- ingestion_time: string (nullable = true)



In [29]:
# Number of rows loaded from scores.csv, before any cleaning.
scores_df.count()

386

In [30]:
# Preview the data (show() prints the first 20 rows and truncates long strings by default).
scores_df.show()

+--------------------+----------+----------+--------------------+
|             game_id|home_goals|away_goals|      ingestion_time|
+--------------------+----------+----------+--------------------+
|9a7624be518dffb4c...|       2.0|       1.0|2025-08-22T17:47:56Z|
|55970f64637d0f56d...|       2.0|       3.0|2025-08-22T22:29:28Z|
|750ba4567999ebd97...|       1.0|       1.0|2025-08-23T20:27:10Z|
|15feb050f27b2c46e...|       4.0|       0.0|2025-08-23T20:37:18Z|
|f2bc560c91217f7e3...|       2.0|       2.0|2025-08-23T21:03:21Z|
|de44a47a4dfb2803d...|       1.0|       1.0|2025-08-23T22:30:55Z|
|fcc94bed9cfb4412d...|       3.0|       0.0|2025-08-23T22:31:50Z|
|73a4567666ebd9fcc...|       1.0|       2.0|2025-08-24T17:29:04Z|
|73a4567666ebd9fcc...|       1.0|       2.0|2025-08-24T17:29:04Z|
|f21b2961c297b2c60...|       5.0|       1.0|2025-08-24T21:14:52Z|
|7b231677734908a6a...|       0.0|       5.0|2025-08-24T22:51:04Z|
|07b3a6ff878548296...|       4.0|       0.0|2025-08-25T20:48:08Z|
|5274552fd

We have 386 rows, which is 6 more than the 380 games we expect.

Check how many different teams we have

Since there are 20 teams and 38 rounds, each round has 10 games, so we expect 20 × 38 / 2 = 380 games.

# Check if game_id has unique values

In [31]:
def column_unique(df: DataFrame, column: str) -> tuple[bool, str]:
    """Report whether every row of ``df`` holds a different value in ``column``.

    The check compares the total row count with the number of distinct values
    found in the column.

    Args:
        df: DataFrame to inspect.
        column: Name of the column that is expected to be unique.

    Returns:
        ``(True, message)`` when both counts are equal, otherwise
        ``(False, message)`` where the message states by how much the row
        count exceeds the distinct count.
    """
    row_count = df.count()
    unique_count = df.select(column).distinct().count()
    surplus = row_count - unique_count

    if surplus == 0:
        return True, f"Column '{column}' is unique"
    return False, f"Column '{column}' has {surplus} duplicate values"

In [32]:
# Check whether any game_id appears on more than one row.
column_unique(scores_df, "game_id")

(False, "Column 'game_id' has 5 duplicate values")

We need to identify the duplicates in game_id and check whether the duplicated rows are identical

In [33]:
# Step 1: find every game_id that occurs on more than one row
duplicate_ids = (
    scores_df
    .groupBy("game_id")
    .agg(count("*").alias("count"))
    .where(col("count") > 1)
    .select("game_id")
)

# Step 2: keep only the rows of the original DataFrame whose game_id is duplicated
dupe_rows = scores_df.join(duplicate_ids, on="game_id", how="inner")

# Step 3: print those rows in full, without truncating long values
dupe_rows.show(truncate=False)

+--------------------------------+----------+----------+--------------------+
|game_id                         |home_goals|away_goals|ingestion_time      |
+--------------------------------+----------+----------+--------------------+
|73a4567666ebd9fcc18424d19ef4d4b7|1.0       |2.0       |2025-08-24T17:29:04Z|
|73a4567666ebd9fcc18424d19ef4d4b7|1.0       |2.0       |2025-08-24T17:29:04Z|
|2a656651aa83c4f9d7c9a2ef926e3d4d|3.0       |2.0       |2025-11-10T20:50:55Z|
|2a656651aa83c4f9d7c9a2ef926e3d4d|NULL      |NULL      |2025-11-10T20:50:55Z|
|c0bd9b33b58e37d501d3a19c5aa6a86f|2.0       |1.0       |2025-11-16T17:44:33Z|
|c0bd9b33b58e37d501d3a19c5aa6a86f|2.0       |1.0       |2025-11-16T17:44:33Z|
|9c0a9f78fc51bbc8b88dd7d3422c65a7|2.0       |0.0       |2026-02-14T21:06:21Z|
|9c0a9f78fc51bbc8b88dd7d3422c65a7|2.0       |0.0       |2026-02-14T21:06:21Z|
|18b68ec5b5ca681ead406efc54d73ff4|0.0       |2.0       |2026-04-11T20:42:57Z|
|18b68ec5b5ca681ead406efc54d73ff4|0.0       |2.0       |2026-04-

Remove duplicate rows from the DataFrame

In [34]:
# Remove rows that are identical in every column. Rows that share a game_id but
# differ in another column (e.g. one copy has NULL goals) are NOT removed here.
scores_df = scores_df.dropDuplicates()

In [35]:
# Row count after removing exact duplicate rows.
scores_df.count()

382

For now, just drop rows with null values (there should be logic to repair nulls so that we do not lose information, but that is outside the scope of this notebook)

In [36]:
# Drop every row that has a null in any column (dropna defaults to how="any").
scores_df = scores_df.dropna()

In [37]:
# Row count after dropping rows that contain nulls.
scores_df.count()

381

Now we have 381 rows, one more than the expected 380

# Check that home_goals and away_goals contain only numeric values and are not negative (≥ 0)

In [17]:
from pyspark.sql.utils import AnalysisException

In [22]:
from typing import Optional

def column_values_between(
    df: DataFrame,
    column: str,
    min_val: Optional[float] = None,
    max_val: Optional[float] = None
) -> tuple[bool, str]:
    """
    Validate that values in the column can be cast to numeric
    and are within min/max if provided (both bounds are inclusive).

    A value counts as invalid when it is null, when casting it to DOUBLE
    yields null, or when it falls outside the requested bounds.

    Args:
        df: DataFrame to validate.
        column: Name of the column to check.
        min_val: Smallest accepted value, or None for no lower bound.
        max_val: Largest accepted value, or None for no upper bound.

    Returns:
        ``(True, message)`` when no invalid value is found. Otherwise
        ``(False, message)`` with the number of invalid rows and a small
        sample of the offending raw values. Any exception raised while
        validating is reported as ``(False, message)`` instead of propagating.
    """
    # Upper limit on how many offending values are quoted in the message.
    sample_size = 5

    try:
        # Cast through the Column API rather than string-built SQL, so column
        # names containing spaces or special characters are handled as well.
        numeric_value = col(column).cast("double")

        # Null after the cast: the value was missing or is not numeric.
        condition = numeric_value.isNull()

        if min_val is not None and max_val is not None:
            condition |= (numeric_value < min_val) | (numeric_value > max_val)
            range_msg = f"outside range [{min_val}, {max_val}]"
        elif min_val is not None:
            condition |= numeric_value < min_val
            range_msg = f"less than {min_val}"
        elif max_val is not None:
            condition |= numeric_value > max_val
            range_msg = f"greater than {max_val}"
        else:
            range_msg = "not numeric"

        invalid_rows = df.filter(condition)
        invalid_count = invalid_rows.count()

        if invalid_count > 0:
            # DataFrame.show() prints and returns None, so it cannot be
            # interpolated into the message; collect a bounded sample of the
            # raw (un-cast) values instead.
            sample = [
                row[0]
                for row in invalid_rows.select(col(column)).limit(sample_size).collect()
            ]
            return False, (
                f"Column '{column}' has {invalid_count} invalid values ({range_msg}); "
                f"sample of offending values: {sample}"
            )
        return True, f"Column '{column}' passed numeric and range validation"

    except Exception as e:
        # Deliberate best-effort: a failing check is reported, not raised.
        return False, f"Validation error on column '{column}': {str(e)}"


In [38]:
# Goals cannot be negative: require numeric values with a lower bound of 0.
column_values_between(scores_df, "home_goals", min_val=0)

(True, "Column 'home_goals' passed numeric and range validation")

In [39]:
# Same lower-bound check for the away side.
column_values_between(scores_df, "away_goals", min_val=0)

(True, "Column 'away_goals' passed numeric and range validation")

In [21]:
# NOTE(review): this repeats the away_goals check from the previous cell. Its
# execution count (In [21]) is older and its saved output contradicts the
# In [39] result — presumably a stale cell; confirm and remove.
column_values_between(scores_df,"away_goals", 0 )

(False, "Column 'away_goals' has 1 invalid values (less than 0)")

Check that ingestion_time falls within the same time range as game_start_time

In [40]:
from datetime import datetime
from pyspark.sql.functions import to_timestamp, col

In [41]:
def column_datetime_in_range(
    df: DataFrame,
    column: str,
    start_date: datetime,
    end_date: datetime
) -> tuple[bool, str]:
    """
    Validates that a datetime column contains:
    1. Only valid parsable timestamps
    2. All values between start_date and end_date (inclusive)

    Values are parsed with ``to_timestamp``; a value that is null or cannot
    be parsed is reported as unparseable. The bounds are compared as full
    datetimes, so an ``end_date`` at midnight excludes later times on that day.

    Args:
        df: DataFrame to validate.
        column: Name of the column holding the timestamps.
        start_date: Earliest accepted timestamp.
        end_date: Latest accepted timestamp.

    Returns:
        ``(True, message)`` when every value parses and lies within the range.
        Otherwise ``(False, message)`` describing the first failed check.
        Exceptions raised while validating are reported as ``(False, message)``.
    """
    # Upper limit on how many offending values are quoted in the message.
    sample_size = 5

    try:
        # Project onto two fresh columns instead of adding "parsed_ts" to df,
        # so an existing column with that name cannot be overwritten.
        parsed = df.select(
            col(column).alias("raw_value"),
            to_timestamp(col(column)).alias("parsed_ts"),
        )

        # Check unparseable rows
        invalid_ts_count = parsed.filter(col("parsed_ts").isNull()).count()
        if invalid_ts_count > 0:
            return False, f"Column '{column}' has {invalid_ts_count} unparseable datetime values"

        # Now check date range (inclusive)
        outside = parsed.filter(
            (col("parsed_ts") < start_date) | (col("parsed_ts") > end_date)
        )
        out_of_range = outside.count()

        if out_of_range > 0:
            # DataFrame.show() prints and returns None, so it cannot be
            # interpolated into the message; collect a bounded sample instead.
            sample = [
                row["raw_value"]
                for row in outside.select("raw_value").limit(sample_size).collect()
            ]
            return False, (
                f"Column '{column}' has {out_of_range} values outside range "
                f"[{start_date.isoformat()} to {end_date.isoformat()}]. "
                f"Sample of offending values: {sample}"
            )

        return True, f"All values in column '{column}' are valid datetimes within range."

    except AnalysisException as e:
        return False, f"Failed to analyze column '{column}': {str(e)}"
    except Exception as e:
        # Deliberate best-effort: a failing check is reported, not raised.
        return False, f"Unexpected error validating column '{column}': {str(e)}"


In [43]:
# Both bounds are midnight datetimes, so the upper bound admits nothing later
# than 2026-05-18 00:00:00.
column_datetime_in_range(
    scores_df,
    "ingestion_time",
    start_date=datetime(2025, 8, 22),
    end_date=datetime(2026, 5, 18),
)

(True,
 "All values in column 'ingestion_time' are valid datetimes within range.")

All of the basic checks are done. Now we need to implement a business-logic check to find out why we have this extra value in scores.
1. We need to check whether scores.csv contains the same game_id values as schedule.csv (I believe there should be one extra value in scores)

In [44]:
# Load the schedule file; the first line of the CSV supplies the column names.
schedule_df = spark.read.csv("../data/schedule.csv", header=True)

In [64]:
def game_ids_match(
    df1: DataFrame,
    df2: DataFrame,
    id_column: str = "game_id"
) -> tuple[bool, str]:
    """
    Check whether the given column (default: 'game_id') has exactly the same
    set of values in both DataFrames.

    Args:
        df1: First DataFrame.
        df2: Second DataFrame.
        id_column: Name of the column to compare; must exist in both frames.

    Returns:
        ``(True, message)`` when both frames hold the same set of values.
        Otherwise ``(False, message)`` where the message lists, one per line,
        the values missing from df1 (present only in df2) and the values
        missing from df2 (present only in df1).
    """
    ids1 = df1.select(id_column).distinct()
    ids2 = df2.select(id_column).distinct()

    # Collect each difference once and derive the count from the list, instead
    # of running count() and then collect() over the same subtraction.
    # Sorting (by string form, so None does not break it) keeps the message
    # stable from run to run.
    only_in_2 = sorted(
        (row[id_column] for row in ids2.subtract(ids1).collect()), key=str
    )
    only_in_1 = sorted(
        (row[id_column] for row in ids1.subtract(ids2).collect()), key=str
    )

    if not only_in_1 and not only_in_2:
        return True, f"✅ Both DataFrames contain the same set of '{id_column}' values."

    # Join with newlines: the three parts were previously concatenated with no
    # separator at all, which made the message hard to read.
    return False, "\n".join([
        f"❌ Mismatch in '{id_column}':",
        f"- {len(only_in_2)} missing in df1: {only_in_2}",
        f"- {len(only_in_1)} missing in df2: {only_in_1}",
    ])

In [65]:
# df1 is the schedule and df2 the scores, so ids reported as "missing in df1"
# exist only in the scores data.
game_ids_match(schedule_df, scores_df, id_column="game_id")

(False,
 "❌ Mismatch in 'game_id': \\n             - 1 missing in df1: ['750ba4567999ebd97nv18424d81bf412lz'] \\n            - 0 missing in df2: [] \\n")

In [52]:
# Display the score row for one specific game_id.
scores_df.where(col("game_id") == "750ba4567999ebd97nv18424d81bf412lz").show()

+--------------------+----------+----------+--------------------+
|             game_id|home_goals|away_goals|      ingestion_time|
+--------------------+----------+----------+--------------------+
|750ba4567999ebd97...|       1.0|       1.0|2025-08-23T20:27:10Z|
+--------------------+----------+----------+--------------------+



This game_id does not appear in schedule.csv, so we can safely drop this row.