In [1]:
gold_df_master = spark.sql("SELECT * FROM fabriclab01.silver_master_table")
gold_df_summary = spark.sql("SELECT * FROM fabriclab01.silver_summary_table")


StatementMeta(, ea94c8da-16e5-4ad3-b86d-756971247bc4, 3, Finished, Available, Finished)

In [2]:
from pyspark.sql.functions import monotonically_increasing_id, col, concat, lpad

batter_df = gold_df_master.select("batter").distinct() # Get the 'batter' unique values
bowler_df = gold_df_master.select("bowler").distinct() # Get the 'bowler' unique values
non_striker_df = gold_df_master.select("non_striker").distinct() # Get the 'non_striker' unique values

# Union all players into one dataframe
players_df = batter_df.union(bowler_df).union(non_striker_df).distinct()

# Assign a unique 4-digit ID to each player
players_df_with_id = players_df.withColumn("player_id", lpad(monotonically_increasing_id().cast("string"), 4, '0'))

# Show the list of unique players with their IDs
players_df_with_id.show(truncate=False)

StatementMeta(, ea94c8da-16e5-4ad3-b86d-756971247bc4, 4, Finished, Available, Finished)

+--------------+---------+
|batter        |player_id|
+--------------+---------+
|TM Dilshan    |0000     |
|Kuldeep Yadav |0001     |
|S Anirudha    |0002     |
|LA Carseldine |0003     |
|KA Pollard    |0004     |
|M Muralitharan|0005     |
|J Botha       |0006     |
|M Theekshana  |0007     |
|SS Cottrell   |0008     |
|CA Ingram     |0009     |
|R Sanjay Yadav|0010     |
|DR Smith      |0011     |
|Jaskaran Singh|0012     |
|M Manhas      |0013     |
|A Flintoff    |0014     |
|GR Napier     |0015     |
|AR Patel      |0016     |
|SA Yadav      |0017     |
|B Lee         |0018     |
|BMAJ Mendis   |0019     |
+--------------+---------+
only showing top 20 rows



In [3]:

from delta.tables import DeltaTable

# Write the transformed data into the Silver layer
players_df_with_id.write.option("overwriteSchema", "true").format("delta").mode("overwrite").save("Tables/gold_player_table")

StatementMeta(, ea94c8da-16e5-4ad3-b86d-756971247bc4, 5, Finished, Available, Finished)

In [5]:
team_1_df = gold_df_master.select("team_1").distinct() # Get the 'team_1' unique values
team_2_df = gold_df_master.select("team_2").distinct() # Get the 'team_2' unique values

# Union the teams and get distinct entries
teams_df = team_1_df.union(team_2_df).distinct()

# Assign a unique 3-digit ID to each team
teams_df_with_id = teams_df.withColumn("team_id", lpad(monotonically_increasing_id().cast("string"), 3, '0'))

# Show the list of unique teams with their IDs
teams_df_with_id.show(truncate=False)

StatementMeta(, ea94c8da-16e5-4ad3-b86d-756971247bc4, 7, Finished, Available, Finished)

+---------------------------+-------+
|team_1                     |team_id|
+---------------------------+-------+
|Sunrisers Hyderabad        |000    |
|Lucknow Super Giants       |001    |
|Chennai Super Kings        |002    |
|Gujarat Titans             |003    |
|Royal Challengers Bengaluru|004    |
|Rising Pune Supergiant     |005    |
|Unknown                    |006    |
|Deccan Chargers            |007    |
|Kochi Tuskers Kerala       |008    |
|Rajasthan Royals           |009    |
|Gujarat Lions              |010    |
|Royal Challengers Bangalore|011    |
|Kolkata Knight Riders      |012    |
|Rising Pune Supergiants    |013    |
|Punjab Kings               |014    |
|Pune Warriors              |015    |
|Delhi Capitals             |016    |
|Mumbai Indians             |017    |
+---------------------------+-------+



In [6]:
teams_df_with_id.write.option("overwriteSchema", "true").format("delta").mode("overwrite").save("Tables/gold_teams_table")

StatementMeta(, ea94c8da-16e5-4ad3-b86d-756971247bc4, 8, Finished, Available, Finished)

In [7]:

from pyspark.sql.functions import col

# Create an alias for the main DataFrame
silver_df = gold_df_master.alias("silver")

# First join for batter_id
players_df_with_batter_alias = players_df_with_id.alias("batter_players")
ball_by_ball_df = silver_df.join(players_df_with_batter_alias,
                                 col("silver.batter") == col("batter_players.batter"), "left") \
    .withColumnRenamed("player_id", "batter_id")

# Second join for bowler_id
players_df_with_bowler_alias = players_df_with_id.alias("bowler_players")
ball_by_ball_df = ball_by_ball_df.join(players_df_with_bowler_alias,
                                       col("silver.bowler") == col("bowler_players.batter"), "left") \
    .withColumnRenamed("player_id", "bowler_id")

# Third join for non_striker_id
players_df_with_non_striker_alias = players_df_with_id.alias("non_striker_players")
ball_by_ball_df = ball_by_ball_df.join(players_df_with_non_striker_alias,
                                       col("silver.non_striker") == col("non_striker_players.batter"), "left") \
    .withColumnRenamed("player_id", "non_striker_id")

# Create an alias for teams DataFrame
teams_df_with_team1_alias = teams_df_with_id.alias("team1_teams")
ball_by_ball_df = ball_by_ball_df.join(teams_df_with_team1_alias,
                                       col("silver.team_1") == col("team1_teams.team_1"), "left") \
    .withColumnRenamed("team_id", "team_1_id")

teams_df_with_team2_alias = teams_df_with_id.alias("team2_teams")
ball_by_ball_df = ball_by_ball_df.join(teams_df_with_team2_alias,
                                       col("silver.team_2") == col("team2_teams.team_1"), "left") \
    .withColumnRenamed("team_id", "team_2_id")

# Select necessary columns including updated player and team IDs
ball_by_ball_df_final = ball_by_ball_df.select(
    "batter_id", "bowler_id", "non_striker_id", "team_1_id", "team_2_id", 
    "season", "city", "`runs.batter`", "`runs.total`", "`wicket.kind`", "`wicket.player_out`", "venue"
)

# Show the updated ball-by-ball data with unique player and team IDs
ball_by_ball_df_final.show(truncate=False)

StatementMeta(, ea94c8da-16e5-4ad3-b86d-756971247bc4, 9, Finished, Available, Finished)

+---------+---------+--------------+---------+---------+------+--------------+-----------+----------+-----------+-----------------+---------------------------------------+
|batter_id|bowler_id|non_striker_id|team_1_id|team_2_id|season|city          |runs.batter|runs.total|wicket.kind|wicket.player_out|venue                                  |
+---------+---------+--------------+---------+---------+------+--------------+-----------+----------+-----------+-----------------+---------------------------------------+
|0454     |0397     |0146          |016      |007      |2008  |Delhi         |0          |1         |stumped    |G Gambhir        |FerozShahKotla                         |
|0343     |0317     |0649          |017      |011      |2009  |Port Elizabeth|0          |1         |stumped    |RE van der Merwe |StGeorge'sPark                         |
|0289     |0317     |0152          |006      |006      |2010  |Unknown       |0          |1         |stumped    |KP Pietersen     |Unknown  

In [8]:
from delta.tables import DeltaTable

# Write the transformed data into the Silver layer
ball_by_ball_df_final.write.option("overwriteSchema", "true").format("delta").mode("overwrite").save("Tables/gold_ball_by_ball_table")

StatementMeta(, ea94c8da-16e5-4ad3-b86d-756971247bc4, 10, Finished, Available, Finished)