In [0]:

player_mapping_path = "dbfs:/FileStore/tables/player_mapping.csv"
player_mapping_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(player_mapping_path)
player_mapping = {row["Player"]: int(row["player_id"]) for row in player_mapping_df.collect()}

player_pass_path = "dbfs:/FileStore/tables/processed_players_pass_data_2025_01_23.csv"
player_pass_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(player_pass_path)

player_goals_path = "dbfs:/FileStore/tables/processed_players_goals_data_2025_01_23.csv"
player_goals_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(player_goals_path)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def map_player_id(player_name):
    return player_mapping.get(player_name, None)

map_player_id_udf = udf(map_player_id, IntegerType())
player_pass_df = player_pass_df.withColumn("player_id", map_player_id_udf(player_pass_df["Player"]))
player_goals_df = player_goals_df.withColumn("player_id", map_player_id_udf(player_goals_df["Player"]))

match_data_path = "dbfs:/FileStore/tables/processed_match_data_2025_01_23-3.csv"
match_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(match_data_path)

salary_data_path = "dbfs:/FileStore/tables/processed_salary_data_2025_01_23-3.csv"
salary_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(salary_data_path)
salary_df = salary_df.withColumn("player_id", map_player_id_udf(salary_df["Player"]))

match_df.show()
salary_df.show()
player_pass_df.show()
player_goals_df.show()

In [0]:

match_data_delta_path = "/delta/match_data"
salary_data_delta_path = "/delta/salary_data"
player_pass_delta_path = "/delta/player_pass_data"
player_goal_delta_path = "/delta/player_goals_data"

match_df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(match_data_delta_path)
salary_df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(salary_data_delta_path)
player_pass_df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(player_pass_delta_path)
player_goals_df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(player_goal_delta_path)  

spark.sql(f"""
CREATE TABLE IF NOT EXISTS match_data
USING DELTA
LOCATION '{match_data_delta_path}'
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS salary_data
USING DELTA
LOCATION '{salary_data_delta_path}'
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS player_pass_data
USING DELTA
LOCATION '{player_pass_delta_path}'
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS player_goals_data
USING DELTA
LOCATION '{player_goal_delta_path}'
""")    

spark.sql("SELECT * FROM match_data LIMIT 10").show()
spark.sql("SELECT * FROM salary_data LIMIT 10").show()
spark.sql("SELECT * FROM player_pass_data LIMIT 10").show()
spark.sql("SELECT * FROM player_goals_data LIMIT 10").show()

In [0]:
%sql
CREATE TABLE IF NOT EXISTS team_salaries_2024 AS
SELECT 
    team,
    SUM(base_salary) AS total_base_salary,
    SUM(Guaranteed_Compensation) AS total_guaranteed_salary
FROM 
    salary_data
WHERE 
    season = 2024
GROUP BY 
    team;

SELECT * FROM team_salaries_2024 LIMIT 10;

In [0]:
%sql
CREATE TABLE match_data_summary AS
SELECT 
    Date AS match_date,
    Home AS home_team,
    Away AS away_team,
    HxGt AS home_team_expected_goals,
    AxGt AS away_team_expected_goals,
    hg AS home_team_goals,
    ag AS away_team_goals
FROM 
    match_data;

SELECT * FROM match_data_summary LIMIT 10;

In [0]:
%sql
SELECT
  player_id,
  player,
  team,
  season,
  goals_added
FROM
  player_goals_data
ORDER BY
  goals_added DESC
LIMIT 10;