In [0]:
spark.conf.set("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.storage-account.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.storage-account.dfs.core.windows.net", "application-id")
spark.conf.set("fs.azure.account.oauth2.client.secret.storage-account.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.storage-account.dfs.core.windows.net", "https://login.microsoftonline.com/directory-id/oauth2/token")

###importing nesessary modules

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

###creating spark session

In [0]:
spark = SparkSession.builder.appName('IPL Data Analysis').getOrCreate()


##Ball by Ball DF

####creating Schema for ball_by_ball file (Bronze)

In [0]:
ball_by_ball_schema = StructType([
    StructField("match_id", IntegerType(), True),
    StructField("over_id", IntegerType(), True),
    StructField("ball_id", IntegerType(), True),
    StructField("innings_no", IntegerType(), True),
    StructField("team_batting", IntegerType(), True),
    StructField("team_bowling", IntegerType(), True),
    StructField("striker_batting_position", IntegerType(), True),
    StructField("extra_type", StringType(), True),
    StructField("runs_scored", IntegerType(), True),
    StructField("extra_runs", IntegerType(), True),
    StructField("wides", IntegerType(), True),
    StructField("legbyes", IntegerType(), True),
    StructField("byes", IntegerType(), True),
    StructField("noballs", IntegerType(), True),
    StructField("penalty", IntegerType(), True),
    StructField("bowler_extras", IntegerType(), True),
    StructField("out_type", StringType(), True),
    StructField("caught", IntegerType(), True),
    StructField("bowled", IntegerType(), True),
    StructField("run_out", IntegerType(), True),
    StructField("lbw", IntegerType(), True),
    StructField("retired_hurt", IntegerType(), True),
    StructField("stumped", IntegerType(), True),
    StructField("caught_and_bowled", IntegerType(), True),
    StructField("hit_wicket", IntegerType(), True),
    StructField("obstructingfeild", IntegerType(), True),
    StructField("bowler_wicket", IntegerType(), True),
    StructField("match_date", StringType(), True),   # STRING intentionally (Bronze)
    StructField("season", IntegerType(), True),
    StructField("striker", IntegerType(), True),
    StructField("non_striker", IntegerType(), True),
    StructField("bowler", IntegerType(), True),
    StructField("player_out", IntegerType(), True),
    StructField("fielders", IntegerType(), True),
    StructField("striker_match_sk", IntegerType(), True),
    StructField("strikersk", IntegerType(), True),
    StructField("nonstriker_match_sk", IntegerType(), True),
    StructField("nonstriker_sk", IntegerType(), True),
    StructField("fielder_match_sk", IntegerType(), True),
    StructField("fielder_sk", IntegerType(), True),
    StructField("bowler_match_sk", IntegerType(), True),
    StructField("bowler_sk", IntegerType(), True),
    StructField("playerout_match_sk", IntegerType(), True),
    StructField("battingteam_sk", IntegerType(), True),
    StructField("bowlingteam_sk", IntegerType(), True),
    StructField("keeper_catch", IntegerType(), True),
    StructField("player_out_sk", IntegerType(), True),
    StructField("matchdatesk", StringType(), True)   # STRING intentionally (Bronze)

])


####Reading ball_by_ball.csv

In [0]:
ball_by_ball_df = (spark.read
                    .schema(ball_by_ball_schema)
                    .format('csv')
                    .option("header",True)
                    .load("abfss://data@iplanalysis.dfs.core.windows.net/IPL_Ball_By_Ball")
)

####Type casting some fileds in ball_by_ball_df to match actual data types (Silver)

In [0]:
ball_by_ball_df = ( ball_by_ball_df
    # --- DATE CASTING ---
    .withColumn("match_date", to_date(col("match_date"), "M/d/yyyy") )
    
    # --- DATE SURROGATE KEY ---
    .withColumn( "matchdatesk", date_format(col("match_date"), "yyyyMMdd").cast("int") )

    # --- BOOLEAN CASTING ---
    .withColumn("caught", col("caught").cast("boolean"))
    .withColumn("bowled", col("bowled").cast("boolean"))
    .withColumn("run_out", col("run_out").cast("boolean"))
    .withColumn("lbw", col("lbw").cast("boolean"))
    .withColumn("retired_hurt", col("retired_hurt").cast("boolean"))
    .withColumn("stumped", col("stumped").cast("boolean"))
    .withColumn("caught_and_bowled", col("caught_and_bowled").cast("boolean"))
    .withColumn("hit_wicket", col("hit_wicket").cast("boolean"))
    .withColumn("obstructingfeild", col("obstructingfeild").cast("boolean"))
    .withColumn("bowler_wicket", col("bowler_wicket").cast("boolean"))
    .withColumn("keeper_catch", col("keeper_catch").cast("boolean"))
)


In [0]:
display(ball_by_ball_df.limit(5))

##Match DF

####creating Schema for match file (Bronze)

In [0]:
match_schema = StructType([

    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", StringType(), True),   # STRING intentionally (Bronze)
    StructField("season_year", IntegerType(), True),
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmach", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True)

])


####Reading match.csv

In [0]:
match_df = (spark.read
            .schema(match_schema)
            .format('csv')
            .option("header",True)
            .load("abfss://data@iplanalysis.dfs.core.windows.net/IPL_Match")
)


####Type casting some fileds in match_df to match actual data types (Silver)

In [0]:
match_df = (
    match_df
        .withColumn( "match_date", to_date(col("match_date"), "M/d/yyyy") )
)


In [0]:
display(match_df.limit(5))

##PLayer DF

####creating Schema for player file (Bronze)

In [0]:
player_schema = StructType([

    StructField("player_sk", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", StringType(), True),          # STRING intentionally (Bronze)
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True)

])


####Reading player.csv

In [0]:
player_df = (spark.read
            .schema(player_schema)
            .format('csv')
            .option("header",True)
            .load("abfss://data@iplanalysis.dfs.core.windows.net/IPL_Player")
)

####Type casting some fileds in player_df to match actual data types (Silver)

In [0]:
player_df = ( player_df
                .withColumn("dob", to_date(col("dob"), "M/d/yyyy") )
)


In [0]:
display(player_df.limit(5))

##Player Match DF

####creating Schema for player match file (Bronze)

In [0]:
player_match_schema = StructType([

    StructField("player_match_sk", IntegerType(), True),
    StructField("playermatch_key", LongType(), True),   # big composite key in the sample
    StructField("match_id", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", StringType(), True),             # keep STRING in Bronze
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("role_desc", StringType(), True),
    StructField("player_team", StringType(), True),
    StructField("opposit_team", StringType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("is_manofThematch", IntegerType(), True),   # 0/1 in Bronze
    StructField("age_as_on_match", IntegerType(), True),
    StructField("isplayers_team_won", IntegerType(), True), # 0/1 in Bronze
    StructField("batting_status", StringType(), True),
    StructField("bowling_status", StringType(), True),
    StructField("player_captain", StringType(), True),
    StructField("opposit_captain", StringType(), True),
    StructField("player_keeper", StringType(), True),
    StructField("opposit_keeper", StringType(), True)

])


####Reading player_match.csv

In [0]:
player_match_df = (spark.read
            .schema(player_match_schema)
            .format('csv')
            .option("header",True)
            .load("abfss://data@iplanalysis.dfs.core.windows.net/IPL_Player_Match")
)

####Type casting some fileds in player_match_df to match actual data types (Silver)

In [0]:
player_match_df = ( player_match_df
                    # cast DOB safely
                    .withColumn("dob", to_date(col("dob"), "M/d/yyyy"))

                    # boolean casts for 0/1 flags
                    .withColumn("is_manofThematch", col("is_manofThematch").cast("boolean"))
                    .withColumn("isplayers_team_won", col("isplayers_team_won").cast("boolean"))
                
                    # Drop dummy row
                    .filter(col("player_match_sk") != -1)
)


In [0]:
display(player_match_df.limit(5))

####Type casting some fileds in player_match_df to match actual data types (Silver)

##Team DF

####creating schema for team file 

In [0]:
team_schema = StructType([
    StructField("team_sk", IntegerType(), True),
    StructField("team_id", IntegerType(), True),
    StructField("team_name", StringType(), True)
])

####Reading team.csv

In [0]:
team_df = (spark.read
            .schema(team_schema)
            .format('csv')
            .option("header",True)
            .load("abfss://data@iplanalysis.dfs.core.windows.net/IPL_Team")
)

#TRANSFORMATIONS

In [0]:
# Filter to include only valid deliveries (excluding extras like wides and no balls for specific analyses)
ball_by_ball_df = ball_by_ball_df.filter((col("wides") == 0) & (col("noballs")==0))

# Aggregation: Calculate the total and average runs scored in each match and inning
total_and_avg_runs = ball_by_ball_df.groupBy("match_id", "innings_no").agg(
                                                                            sum("runs_scored").alias("total_runs"), 
                                                                            round(avg("runs_scored"),3).alias("avg_runs")
                                                                            )

In [0]:
# Window Function: Calculate running total of runs in each match for each over
windowSpec = Window.partitionBy("match_id","Innings_no").orderBy("Over_id","ball_id")

running_total = ball_by_ball_df.withColumn(
        "running_total",
        sum("runs_scored").over(windowSpec)
    )

In [0]:
ball_by_ball_df = ball_by_ball_df.withColumn(
    "high_impact",
    when((col("runs_scored") + col("extra_runs") > 6) | (col("bowler_wicket") == True), True).otherwise(False)
)

In [0]:
display(ball_by_ball_df.limit(5))

In [0]:
# Extracting year, month, and day from the match date for more detailed time-based analysis
match_df = match_df.withColumn("year", year("match_date"))
match_df = match_df.withColumn("month", month("match_date"))
match_df = match_df.withColumn("day", dayofmonth("match_date"))

# High margin win: categorizing win margins into 'high', 'medium', and 'low'
match_df = match_df.withColumn(
    "win_margin_category",
    when(col("win_margin") >= 100, "High")
    .when((col("win_margin") >= 50) & (col("win_margin") < 100), "Medium")
    .otherwise("Low")
)

# Analyze the impact of the toss: who wins the toss and the match
match_df = match_df.withColumn(
    "toss_match_winner",
    when(col("toss_winner") == col("match_winner"), "Yes").otherwise("No")
)

#match_date surrogate key
match_df = match_df.withColumn(
    "matchdatesk",
    date_format(col("match_date"), "yyyyMMdd").cast("int")
)

display(match_df.limit(5))


In [0]:
# Normalize and clean player names, batting_hand, bowling_skill
player_df = player_df.withColumn("player_name", lower(regexp_replace("player_name", r"[^a-zA-Z0-9]", "")))\
                      .withColumn("batting_hand",regexp_replace("batting_hand", r"[^a-zA-Z0-9\- ]", ""))\
                      .withColumn("bowling_skill",regexp_replace("bowling_skill", r"[^a-zA-Z0-9\- ]", ""))

In [0]:
display(player_df)

In [0]:
#replacing null values from batting_hand or bowling_skill with Unknown
player_df = (player_df
        .withColumn(
            "batting_hand",
            when(
                col("batting_hand").isNull() | (trim(col("batting_hand")) == "") | (col("batting_hand") == "N/A") | (col("batting_hand") == "NA") | (col("batting_hand") == "NULL"),
                "Unknown"
            ).otherwise(col("batting_hand"))
        )
        .withColumn(
            "bowling_skill",
            when(
                col("bowling_skill").isNull() | (trim(col("bowling_skill")) == "") | (col("bowling_skill") == "N/A") | (col("bowling_skill") == "NA") | (col("bowling_skill") == "NULL"),
                "Unknown"
            ).otherwise(col("bowling_skill"))
        )
    )
display(player_df)


In [0]:
#Adding new column depending on bating style of the player
player_df = player_df.withColumn(
    "batting_style",
    when(
        lower(col("batting_hand")).contains("left"),
        "Left-Handed")
    .when(
        lower(col("batting_hand")).contains("unknown"),
        "Unknown")
    .otherwise("Right-Handed")
)
display(player_df)

In [0]:
# Add a 'veteran_status' column based on player age
player_match_df = player_match_df.withColumn(
    "veteran_status",
    when(col("age_as_on_match") >= 35, "Veteran").otherwise("Non-Veteran")
)

# Dynamic column to calculate years since debut
player_match_df = player_match_df.withColumn(
    "years_since_debut",
    (year(current_date()) - col("season_year"))
)

#dob surrogate key
player_df = player_df.withColumn(
    "dob_sk",
    date_format(col("dob"), "yyyyMMdd").cast("int")
)


# Show the enriched DataFrame
display(player_match_df.limit(5))

#Writing back to storage conatiner marking close of Silver layer

In [0]:
ball_by_ball_df.write\
    .format("Parquet") \
    .mode("overwrite") \
    .option('path',"abfss://silver@iplanalysis.dfs.core.windows.net/IPL_Ball_By_Ball")\
    .save()


In [0]:
match_df.write\
    .format("Parquet") \
    .mode("overwrite") \
    .option('path',"abfss://silver@iplanalysis.dfs.core.windows.net/IPL_Match")\
    .save()

In [0]:
player_df.write\
    .format("Parquet") \
    .mode("overwrite") \
    .option('path',"abfss://silver@iplanalysis.dfs.core.windows.net/IPL_Player")\
    .save()

In [0]:
player_match_df.write\
    .format("Parquet") \
    .mode("overwrite") \
    .option('path',"abfss://silver@iplanalysis.dfs.core.windows.net/IPL_Player_Match")\
    .save()

In [0]:
team_df.write\
    .format("Parquet") \
    .mode("overwrite") \
    .option('path',"abfss://silver@iplanalysis.dfs.core.windows.net/IPL_Team")\
    .save()