In [0]:
%run ./common_functions

In [0]:
from pyspark.sql.types import TimestampType

In [0]:
src_container = "ipl-bronze"
storage_acc = "ipldataadlsg2"
dynamic_dfs = []

# Load every folder listed in the bronze container as a parquet DataFrame and
# publish it as a notebook-level variable named "<folder>_df"
# (e.g. match_performance_bronze_df). The cleaning cells below refer to these
# variables directly; dynamic_dfs keeps the list of generated names.
for blob_path in list_files_in_blob(src_container, storage_acc):
    last_segment = blob_path.strip("/").split("/")[-1]
    df_name = last_segment + "_df"
    globals()[df_name] = spark.read.parquet(blob_path)
    dynamic_dfs.append(df_name)

dynamic_dfs

['match_performance_bronze_df',
 'match_stadium_bronze_df',
 'player_performance_bronze_df',
 'player_team_bronze_df',
 'stadium_bronze_df',
 'team_bronze_df']

In [0]:
# checking each bronze DataFrame for null values, column by column
for df_name in dynamic_dfs:
    print(f"{df_name} :-")
    bronze_df = globals()[df_name]
    null_counts(bronze_df).show()

# Observed when this cell was run: match_performance_bronze_df,
# player_team_bronze_df, stadium_bronze_df and team_bronze_df contain null
# values; they are handled in the cleaning cells below.

match_performance_bronze_df :-
+--------+-------+----------------+-----------+-------------+------------+--------------+-----------+
|match_id|team_id|opponent_team_id|runs_scored|wickets_taken|match_result|ingestion_date|source_file|
+--------+-------+----------------+-----------+-------------+------------+--------------+-----------+
|       0|      0|               0|          1|            0|           0|             0|          0|
+--------+-------+----------------+-----------+-------------+------------+--------------+-----------+

match_stadium_bronze_df :-
+--------+----------+--------------+-----------+
|match_id|stadium_id|ingestion_date|source_file|
+--------+----------+--------------+-----------+
|       0|         0|             0|          0|
+--------+----------+--------------+-----------+

player_performance_bronze_df :-
+--------+---------+-----------+-------------+----------+--------------+-----------+
|match_id|player_id|runs_scored|wickets_taken|ball_taken|ingestion_d

In [0]:
# cleaning match_performance_bronze_df
# Drop rows missing the critical numeric columns (runs_scored, wickets_taken)
# and keep only rows whose match_result is a known outcome; any other value
# (including null) is removed by the isin() filter.
# col is imported here so this cell does not depend on an import made in a
# later cell or leaked from common_functions.
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType

VALID_MATCH_RESULTS = ["Win", "Loss", "Tie"]

match_performance_cleaned = (
    match_performance_bronze_df
    .dropna(subset=["runs_scored", "wickets_taken"])
    .filter(col("match_result").isin(VALID_MATCH_RESULTS))
    # NOTE(review): ingestion_date is cast here but dropped by the select below
    .withColumn("ingestion_date", col("ingestion_date").cast(TimestampType()))
    .select("match_id", "team_id", "opponent_team_id", "runs_scored", "wickets_taken", "match_result")
)
display(match_performance_cleaned)

match_id,team_id,opponent_team_id,runs_scored,wickets_taken,match_result
201,4,1,213.0,7,Tie
202,1,3,202.0,10,Loss
203,2,3,238.0,8,Win
205,4,3,123.0,8,Tie
206,1,2,215.0,6,Loss
207,5,3,211.0,5,Win
209,4,1,201.0,5,Tie
210,4,3,233.0,7,Win
211,5,1,144.0,10,Loss
212,1,5,227.0,6,Win


In [0]:
# cleaning player_team_bronze_df
# Drop rows without a player_name, label a missing player_role as "Unknown",
# cast ingestion_date to TimestampType, and keep only the player/team columns.
from pyspark.sql.functions import col, when
from pyspark.sql.types import TimestampType

role_or_unknown = when(col("player_role").isNull(), "Unknown").otherwise(col("player_role"))
ingestion_ts = col("ingestion_date").cast(TimestampType())

named_players = player_team_bronze_df.dropna(subset=["player_name"])
player_team_cleaned_df = (
    named_players
    .withColumn("player_role", role_or_unknown)
    # NOTE(review): ingestion_date is cast here but dropped by the select below
    .withColumn("ingestion_date", ingestion_ts)
    .select("player_id", "player_name", "team_id", "player_role")
)

display(player_team_cleaned_df)

player_id,player_name,team_id,player_role
101,Thomas Mendez,1,All-rounder
102,Laurie Brooks,5,Batsman
103,Warren Morgan,5,All-rounder
104,Amber Bell,2,Bowler
105,Shawn Bowers,1,Batsman
107,Autumn Chang,3,All-rounder
108,Jared Knox,2,Bowler
109,Kevin Conrad,2,Bowler
110,Alan Padilla,4,All-rounder
111,Tim Powers,4,Unknown


In [0]:
# cleaning stadium_bronze_df
# Replace null capacity values with the (truncated) mean of the non-null
# capacities, replace null city values with "Unknown", cast ingestion_date to
# TimestampType, and keep only the stadium columns.
from pyspark.sql.functions import avg, col, when
from pyspark.sql.types import TimestampType

# avg() ignores nulls; it yields None when every capacity is null or the frame
# is empty, in which case int(None) would raise - so imputation is skipped.
avg_capacity = stadium_bronze_df.select(avg(col("capacity"))).first()[0]

stadium_filled_df = stadium_bronze_df
if avg_capacity is not None:
    # int() truncates toward zero, matching the original imputation
    stadium_filled_df = stadium_filled_df.withColumn(
        "capacity",
        when(col("capacity").isNull(), int(avg_capacity)).otherwise(col("capacity")),
    )

stadium_cleaned_df = (
    stadium_filled_df
    .withColumn("city", when(col("city").isNull(), "Unknown").otherwise(col("city")))
    # NOTE(review): ingestion_date is cast here but dropped by the select below
    .withColumn("ingestion_date", col("ingestion_date").cast(TimestampType()))
    .select("stadium_id", "stadium_name", "city", "capacity")
)
display(stadium_cleaned_df)


stadium_id,stadium_name,city,capacity
301,Wankhede Stadium,Mumbai,33108
302,M. A. Chidambaram Stadium,Chennai,48982
303,Arun Jaitley Stadium,Delhi,41820
304,Eden Gardens,Kolkata,66000
305,Rajiv Gandhi Intl. Cricket Stadium,Unknown,55000


In [0]:
    # cleaning team_bronze_df
    # Fill missing home_ground / captain with explicit placeholder labels,
    # cast ingestion_date to TimestampType, then keep only the team columns.
    # NOTE(review): this cell's code is indented; it appears to run only because
    # the notebook front-end strips common leading whitespace - dedent it.
    home_ground_filled = when(col("home_ground").isNull(), "Unknown Homeground").otherwise(col("home_ground"))
    captain_filled = when(col("captain").isNull(), "Unknown Captain").otherwise(col("captain"))
    team_cleaned_df = (
        team_bronze_df
        .withColumn("home_ground", home_ground_filled)
        .withColumn("captain", captain_filled)
        .withColumn("ingestion_date", col("ingestion_date").cast(TimestampType()))
        .select("team_id", "team_name", "home_ground", "captain")
    )
    display(team_cleaned_df)

team_id,team_name,home_ground,captain
1,Mumbai Indians,Wankhede Stadium,Rohit Sharma
2,Chennai Super Kings,M. A. Chidambaram Stadium,MS Dhoni
3,Delhi Capitals,Unknown Homeground,Rishabh Pant
4,Kolkata Knight Riders,Eden Gardens,Shreyas Iyer
5,Sunrisers Hyderabad,Rajiv Gandhi Intl. Cricket Stadium,Unknown Captain


In [0]:
# cleaning match_stadium_bronze_df and player_performance_bronze_df:
# no null handling is applied here; ingestion_date is cast to TimestampType
# and only the business columns are kept.
match_stadium_cleaned_df = (
    match_stadium_bronze_df
    .withColumn("ingestion_date", col("ingestion_date").cast(TimestampType()))
    .select("match_id", "stadium_id")
)
player_performance_cleaned_df = (
    player_performance_bronze_df
    .withColumn("ingestion_date", col("ingestion_date").cast(TimestampType()))
    .select("match_id", "player_id", "runs_scored", "wickets_taken", "ball_taken")
)

In [0]:
# viewing each cleaned DataFrame, in pipeline order, to confirm the cleaning steps
for cleaned in (
    match_performance_cleaned,
    player_team_cleaned_df,
    stadium_cleaned_df,
    team_cleaned_df,
    match_stadium_cleaned_df,
    player_performance_cleaned_df,
):
    display(cleaned)

In [0]:
# Point the storage target at the silver container of the same account.
# NOTE(review): presumably read by write_file_to_silver via globals - confirm in common_functions.
src_container, storage_acc = "ipl-silver", "ipldataadlsg2"

In [0]:
# Write each cleaned DataFrame to the silver container under the paired file
# name, using write_file_to_silver from common_functions.
silver_outputs = [
    (match_performance_cleaned, "match_performance.csv"),
    (player_team_cleaned_df, "player_team.csv"),
    (stadium_cleaned_df, "stadium.csv"),
    (team_cleaned_df, "team.csv"),
    (match_stadium_cleaned_df, "match_stadium.csv"),
    (player_performance_cleaned_df, "player_performance.csv"),
]
for cleaned_df, file_name in silver_outputs:
    write_file_to_silver(cleaned_df, file_name)

In [0]:
# writing the cleaned DataFrames to the Azure SQL database over JDBC
import os

jdbc_url = "jdbc:sqlserver://ipl-server.database.windows.net:1433;databaseName=ipl-db;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

# Credentials come from the environment instead of being hardcoded in the
# notebook. The password previously committed here must be treated as leaked
# and rotated. TODO: on Databricks, consider a secret scope instead.
user = os.environ.get("IPL_SQL_USER", "ipl-admin-server@ipl-server")
password = os.environ.get("IPL_SQL_PASSWORD")
if not password:
    raise RuntimeError("Set the IPL_SQL_PASSWORD environment variable before writing to SQL.")
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

# Single source of truth for the JDBC credentials/driver used by write_df_to_sql.
connection_properties = {
    "user": user,
    "password": password,
    "driver": driver,
}


def write_df_to_sql(df, table_name, schema="silver_db", mode="overwrite"):
    """Write a Spark DataFrame to ``<schema>.<table_name>`` via the SQL Server JDBC driver.

    Args:
        df: Spark DataFrame to write.
        table_name: target table name, without the schema prefix.
        schema: target schema; defaults to "silver_db" (the previous fixed value).
        mode: Spark save mode; defaults to "overwrite" (the previous fixed value).
    """
    (
        df.write
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", f"{schema}.{table_name}")
        .options(**connection_properties)
        .mode(mode)
        .save()
    )


silver_tables = [
    (match_performance_cleaned, "match_performance_silver"),
    (player_team_cleaned_df, "player_team_silver"),
    (stadium_cleaned_df, "stadium_silver"),
    (team_cleaned_df, "team_silver"),
    (match_stadium_cleaned_df, "match_stadium_silver"),
    (player_performance_cleaned_df, "player_performance_silver"),
]
for cleaned_df, table in silver_tables:
    write_df_to_sql(cleaned_df, table)
