## IPL Data Analysis

Dataset from https://data.world/raghu543/ipl-data-till-2017

This data set has the ball by ball data of all the Indian Premier League (IPL) matches till 2017 season.


**Note!!!**

This notebook can be executed without any modifications in Databricks or Azure Databricks compute clusters. To access ADLS Gen2 from a local Spark cluster, you need to configure Spark with the necessary libraries and credentials.

***`Setting PYSPARK_SUBMIT_ARGS - Per Session`***

**Install the Hadoop Azure Libraries:**

You can use pip to install the Hadoop Azure libraries directly from a Jupyter notebook.

*pip install hadoop-azure*
      
*pip install azure-datalake-store*

**Download the Required JAR Files:**

Hadoop Azure JAR files are required for Spark to communicate with ADLS Gen2. Download the following JAR files and place them in a directory accessible to Spark (e.g., /path/to/jars):

*hadoop-azure-3.2.1.jar*
    
*hadoop-azure-datalake-3.2.1.jar*
    
*azure-storage-7.0.0.jar*
    
*azure-data-lake-store-sdk-2.3.6.jar*

You can download these JAR files from Maven repositories or directly from the Apache Hadoop website.

**Configure PySpark to Use These JARs:**

When starting PySpark, specify the location of these JAR files. This can be done by setting the PYSPARK_SUBMIT_ARGS environment variable before starting the Jupyter notebook.

*export PYSPARK_SUBMIT_ARGS="--jars /path/to/jars/hadoop-azure-3.2.1.jar,/path/to/jars/hadoop-azure-datalake-3.2.1.jar,/path/to/jars/azure-storage-7.0.0.jar,/path/to/jars/azure-data-lake-store-sdk-2.3.6.jar pyspark-shell"*

**Start Jupyter Notebook with PySpark:**

Ensure that you start Jupyter Notebook in an environment where the above environment variable is set.

**Configure Spark Session in Jupyter Notebook:**

In your Jupyter notebook, configure the Spark session with the necessary Hadoop configurations for ADLS Gen2.

***`Setting PYSPARK_SUBMIT_ARGS - Globally`***

**Edit Shell Profile:**

Add the PYSPARK_SUBMIT_ARGS export statement to your shell profile configuration file. This can be ~/.bashrc, ~/.bash_profile, ~/.zshrc, or another appropriate configuration file depending on the shell you are using.

For example, if you are using bash: nano ~/.bashrc and for zsh: nano ~/.zshrc

**Add the Environment Variable:**

*export PYSPARK_SUBMIT_ARGS="--jars /path/to/jars/hadoop-azure-3.2.1.jar,/path/to/jars/hadoop-azure-datalake-3.2.1.jar,/path/to/jars/azure-storage-7.0.0.jar,/path/to/jars/azure-data-lake-store-sdk-2.3.6.jar pyspark-shell"*

**Apply the Changes:**

*source ~/.bashrc* or *source ~/.zshrc*

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark_context = SparkSession.builder.appName('IPL Data Analysis').getOrCreate()

In [0]:
spark_context

**Directly using the endpoint to access the data files**

In [0]:
application_id = 'c3bc59ef-a4ba-4a46-b547-66862ee4730d'
directory_id = '97443b3e-100a-4ba4-947f-5e78ae387174'

client_secret_id = '823b5971-5050-4585-a0b4-c48f4e84203a'
client_secret = 'Bgo8Q~cd4gs467JXlJqCPfoC8S5S4xlTm.fYCc5B'

oauth2_client_endpoint = "https://login.microsoftonline.com/{}/oauth2/token".format(directory_id)

spark.conf.set("fs.azure.account.auth.type.adslgen2fortrainings.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.adslgen2fortrainings.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.adslgen2fortrainings.dfs.core.windows.net", application_id)
spark.conf.set("fs.azure.account.oauth2.client.secret.adslgen2fortrainings.dfs.core.windows.net", client_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.adslgen2fortrainings.dfs.core.windows.net", oauth2_client_endpoint)


In [0]:
#Azure Storage Configuration
storage_account_name = 'adslgen2fortrainings'
storage_container_name = 'ipl-data-analysis'

input_data_files_path = "abfss://{}@{}.dfs.core.windows.net/input/{}"
output_data_files_path = "abfss://{}@{}.dfs.core.windows.net/output/{}"

In [0]:
# Create Spark Dataframe for 'ball_by_ball' dataset
path_ball_by_ball = input_data_files_path.format(storage_container_name, storage_account_name, 'Ball_By_Ball.csv')

schema_ball_by_ball = StructType([
    StructField("match_id", IntegerType(), False),
    StructField("over_id", IntegerType(), False),
    StructField("ball_id", IntegerType(), False),
    StructField("innings_no", IntegerType(), False),
    StructField("team_batting", StringType(), False),
    StructField("team_bowling", StringType(), False),
    StructField("striker_batting_position", IntegerType(), True),
    StructField("extra_type", StringType(), True),
    StructField("runs_scored", IntegerType(), True),
    StructField("extra_runs", IntegerType(), True),
    StructField("wides", IntegerType(), True),
    StructField("legbyes", IntegerType(), True),
    StructField("byes", IntegerType(), True),
    StructField("noballs", IntegerType(), True),
    StructField("penalty", IntegerType(), True),
    StructField("bowler_extras", IntegerType(), True),
    StructField("out_type", StringType(), True),
    StructField("caught", StringType(), True),
    StructField("bowled", StringType(), True),
    StructField("run_out", StringType(), True),
    StructField("lbw", StringType(), True),
    StructField("retired_hurt", StringType(), True),
    StructField("stumped", StringType(), True),
    StructField("caught_and_bowled", StringType(), True),
    StructField("hit_wicket", StringType(), True),
    StructField("obstructingfeild", StringType(), True),
    StructField("bowler_wicket", StringType(), False),
    StructField("match_date", DateType(), True),
    StructField("season", IntegerType(), True),
    StructField("striker", IntegerType(), True),
    StructField("non_striker", IntegerType(), True),
    StructField("bowler", IntegerType(), True),
    StructField("player_out", IntegerType(), True),
    StructField("fielders", IntegerType(), True),
    StructField("striker_match_sk", IntegerType(), True),
    StructField("strikersk", IntegerType(), True),
    StructField("nonstriker_match_sk", IntegerType(), True),
    StructField("nonstriker_sk", IntegerType(), True),
    StructField("fielder_match_sk", IntegerType(), True),
    StructField("fielder_sk", IntegerType(), True),
    StructField("bowler_match_sk", IntegerType(), True),
    StructField("bowler_sk", IntegerType(), True),
    StructField("playerout_match_sk", IntegerType(), True),
    StructField("battingteam_sk", IntegerType(), True),
    StructField("bowlingteam_sk", IntegerType(), True),
    StructField("keeper_catch", StringType(), True),
    StructField("player_out_sk", IntegerType(), True),
    StructField("matchdatesk", StringType(), True)
])

input_file_date_format = "M/d/yyyy"
input_file_options = {
    'delimiter': ',',
    'header': 'True',
    'inferSchema': 'False',
    'dateFormat': input_file_date_format
}
df_ball_by_ball = spark_context.read \
    .format('csv') \
    .options(**input_file_options) \
    .schema(schema_ball_by_ball) \
    .load(path_ball_by_ball)

df_ball_by_ball = \
    df_ball_by_ball.withColumns({
        'caught': df_ball_by_ball.caught.cast("boolean"),
        'bowled': df_ball_by_ball.bowled.cast("boolean"),
        'run_out': df_ball_by_ball.run_out.cast("boolean"),
        'lbw': df_ball_by_ball.lbw.cast("boolean"),
        'retired_hurt': df_ball_by_ball.retired_hurt.cast("boolean"),
        'stumped': df_ball_by_ball.stumped.cast("boolean"),
        'caught_and_bowled': df_ball_by_ball.caught_and_bowled.cast("boolean"),
        'hit_wicket': df_ball_by_ball.hit_wicket.cast("boolean"),
        'obstructingfeild': df_ball_by_ball.obstructingfeild.cast("boolean"),
        'bowler_wicket': df_ball_by_ball.bowler_wicket.cast("boolean"),
        'keeper_catch': df_ball_by_ball.keeper_catch.cast("boolean")
    })
df_ball_by_ball.show(5)
display(df_ball_by_ball)

In [0]:
# Create Spark Dataframe for 'match' dataset
path_match = input_data_files_path.format(storage_container_name, storage_account_name, 'Match.csv')

schema_match = StructType([
    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), False),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", StringType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmach", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True)
])

input_file_options = {
    'delimiter': ',',
    'header': 'True',
    'inferSchema': 'False',
    'dateFormat': input_file_date_format
}
df_match = spark_context.read \
    .format('csv') \
    .options(**input_file_options) \
    .schema(schema_match) \
    .load(path_match)

df_match.show(5)

In [0]:
# Create Spark Dataframe for 'player' dataset
path_player = input_data_files_path.format(storage_container_name, storage_account_name, 'Player.csv')

schema_player = StructType([
    StructField("player_sk", IntegerType(), True),
    StructField("player_id", IntegerType(), False),
    StructField("player_name", StringType(), True),
    StructField("dob", StringType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True)
])

input_file_date_format = 'M/d/yyyy'
input_file_options = {
    'delimiter': ',',
    'header': 'True',
    'inferSchema': 'False',
    'dateFormat': input_file_date_format
}
df_player = spark_context.read \
    .format('csv') \
    .options(**input_file_options) \
    .schema(schema_player) \
    .load(path_player)

df_player.show(5)

In [0]:
# Create Spark Dataframe for 'player_match' dataset
path_player_match = input_data_files_path.format(storage_container_name, storage_account_name, 'Player_match.csv')

schema_player_match = StructType([
    StructField("player_match_sk", IntegerType(), True),
    StructField("playermatch_key", DecimalType(), True),
    StructField("match_id", IntegerType(), False),
    StructField("player_id", IntegerType(), False),
    StructField("player_name", StringType(), True),
    StructField("dob", StringType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("role_desc", StringType(), True),
    StructField("player_team", StringType(), True),
    StructField("opposit_team", StringType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("is_manofthematch", StringType(), True),
    StructField("age_as_on_match", IntegerType(), True),
    StructField("isplayers_team_won", StringType(), True),
    StructField("batting_status", StringType(), True),
    StructField("bowling_status", StringType(), True),
    StructField("player_captain", StringType(), True),
    StructField("opposit_captain", StringType(), True),
    StructField("player_keeper", StringType(), True),
    StructField("opposit_keeper", StringType(), True)
])

input_file_date_format = 'M/d/yyyy'
input_file_options = {
    'delimiter': ',',
    'header': 'True',
    'inferSchema': 'False',
    'dateFormat': input_file_date_format
}
df_player_match = spark_context.read \
    .format('csv') \
    .options(**input_file_options) \
    .schema(schema_player_match) \
    .load(path_player_match)

df_player_match = df_player_match.withColumns({
    'is_manofthematch': df_player_match.is_manofthematch.cast('boolean'),
    'isplayers_team_won': df_player_match.isplayers_team_won.cast('boolean')
})

df_player_match.show(5)

In [0]:
# Create Spark Dataframe for 'team' dataset
path_team = input_data_files_path.format(storage_container_name, storage_account_name, 'Player_match.csv')

schema_team = StructType([
    StructField("team_sk", IntegerType(), True),
    StructField("team_id", IntegerType(), False),
    StructField("team_name", StringType(), True)
])

df_team = spark_context.read \
    .format('csv') \
    .option('header', 'true') \
    .schema(schema_team) \
    .load(path_team)
df_team.show(5)

In [0]:
# Filter to include only valid deliveries (excluding wides and no balls for specific analysis)
df_ball_by_ball.filter((df_ball_by_ball.wides == 0) & (df_ball_by_ball.noballs == 0)).show(5)

In [0]:
# Calculate the total runs scored in each match and inning : Aggregation
df_ball_by_ball = \
    df_ball_by_ball.withColumn(
        'ball_total_runs',
        (df_ball_by_ball.runs_scored + df_ball_by_ball.extra_runs)
    )

df_innings_runs_scored = \
    df_ball_by_ball.groupBy(df_ball_by_ball.match_id, df_ball_by_ball.innings_no).agg(
        F.sum(df_ball_by_ball.ball_total_runs).alias('innings_total_runs')
    ).orderBy(df_ball_by_ball.match_id, df_ball_by_ball.innings_no)
df_innings_runs_scored.show(5)

In [0]:
# Calculate the average runs scored in each inning : Aggregation
df_innings_runs_scored.groupBy(df_innings_runs_scored.innings_no).agg(
    F.round(F.avg(df_innings_runs_scored.innings_total_runs)).alias('innings_avg_runs')
).orderBy(df_innings_runs_scored.innings_no).show(5)

In [0]:
# Find the maximum runs scored in a super over
super_over_innings_values = [3, 4]

df_innings_runs_scored \
    .filter(df_innings_runs_scored.innings_no.isin(super_over_innings_values)) \
    .select(F.max(df_innings_runs_scored.innings_total_runs).alias('super_over_max_runs')) \
    .show()

In [0]:
# Conditional Column: Flag for wicket (either a bowler_wicket or run_out or obstructing_field)
df_ball_by_ball = df_ball_by_ball.withColumn(
    "wicket",
    F.when(
        ((df_ball_by_ball.bowler_wicket == True) | (df_ball_by_ball.run_out == True) | (df_ball_by_ball.obstructingfeild == True)),
        True
    ).otherwise(False)
)
df_ball_by_ball.filter(df_ball_by_ball.wicket == True).show(5)

# Conditional Column: Flag for high impact balls (either a wicket or more than 6 runs including extras)
df_ball_by_ball = df_ball_by_ball.withColumn(
    "high_impact",
    F.when(
        (df_ball_by_ball.runs_scored + df_ball_by_ball.extra_runs > 6) | (df_ball_by_ball.wicket == True),
        True
    ).otherwise(False)
)
df_ball_by_ball.filter(df_ball_by_ball.high_impact == False).show(5)

In [0]:
# Calculate running total of runs in each match for each ball : Window Functions
windowSpec = Window.partitionBy(df_ball_by_ball.match_id, df_ball_by_ball.innings_no).orderBy(df_ball_by_ball.over_id, df_ball_by_ball.ball_id);

col_innings_runs = F.sum(df_ball_by_ball.runs_scored + df_ball_by_ball.extra_runs).over(windowSpec).alias('innings_runs')
col_innings_wickets = F.sum(F.when(df_ball_by_ball.wicket, 1).otherwise(0)).over(windowSpec).alias('innings_wickets')

df_running_innings_run = \
    df_ball_by_ball.select(
        df_ball_by_ball.match_id,
        df_ball_by_ball.innings_no,
        df_ball_by_ball.over_id,
        df_ball_by_ball.ball_id,
        df_ball_by_ball.runs_scored,
        df_ball_by_ball.extra_runs,
        col_innings_runs,
        col_innings_wickets
    )
df_running_innings_run.filter((df_running_innings_run.match_id == 335987) & (df_running_innings_run.innings_no == 1)).show(120)

In [0]:
# Extracting year, month, and day from the match date for more detailed time-based analysis
df_match = df_match.withColumns({
    'match_year': F.year(df_match.match_date),
    'match_month': F.month(df_match.match_date),
    'match_day': F.dayofmonth(df_match.match_date)
})
df_match.show(5)

In [0]:
# Win margin category: Categorizing win margins into 'high', 'medium', and 'low'
df_match = df_match.withColumn(
    'win_margin_category',
    F.when(
        ((df_match.win_type == 'runs') & (df_match.win_margin >= 100)) |
        ((df_match.win_type == 'wickets') & (df_match.win_margin >= 5)),
        'High'
    ).when(
        ((df_match.win_type == 'runs') & ((df_match.win_margin < 100) &  (df_match.win_margin >= 50))) |
        ((df_match.win_type == 'wickets') & ((df_match.win_margin < 5) &  (df_match.win_margin >= 7))),
        'Medium'
    ).otherwise(
        'Low'
    )
)
df_match.show(5)

In [0]:
# Analyze the impact of the toss: who wins the toss and the match
count_total_matches = df_match.count()
print('Total number of matches: ' + str(count_total_matches))

count_teams_winning_toss_and_match = df_match.filter(df_match.toss_winner == df_match.match_winner).count()
print('Total number of matches where team won both the toss and match: ' + str(count_teams_winning_toss_and_match))

impact_toss = (count_teams_winning_toss_and_match / count_total_matches) * 100
print('Teams winning the toss won the match ' + str(impact_toss) + ' out of 100 times.')

In [0]:
# Normalize and clean player names

# Regular expression to match characters other than alphabets and numbers
# \w matches word characters (alphanumeric and underscore)
pattern = r"[^a-zA-Z0-9 ]"

# Players with names having matching the pattern
df_player.filter(df_player.player_name.rlike(pattern)).show()

# Replacing characters that are not in the pattern
df_player = df_player.withColumn('player_name', F.lower(F.regexp_replace(df_player.player_name, '[^a-zA-Z0-9 ]', '')))

# Players with names having matching the pattern
df_player.filter(df_player.player_name.rlike(pattern)).show()

# Handle missing values in 'batting_hand' and 'bowling_skill' with a default 'unknown'
df_player_match.filter(df_player_match.batting_status.isNull() | df_player_match.bowling_status.isNull()).show(5)

df_player_match = \
    df_player_match.fillna(
        'unknown',
        subset=['batting_status', 'bowling_status']
    )

df_player_match.filter(df_player_match.batting_status.isNull() | df_player_match.bowling_status.isNull()).show(5)
df_player_match.filter(df_player_match.batting_status == 'unknown').show(5)

In [0]:
# Add a 'veteran' column based on player age
df_player_match = df_player_match.withColumn(
    'veteran',
    F.when(df_player_match.age_as_on_match >= 35, True).otherwise(False)
)
df_player_match.show(5)

In [0]:
# Create views for the Dataframes
df_ball_by_ball.createOrReplaceGlobalTempView('view_ball_by_ball')
df_match.createOrReplaceGlobalTempView('view_match')
df_player.createOrReplaceGlobalTempView('view_player')
df_player_match.createOrReplaceGlobalTempView('view_player_match')
df_team.createOrReplaceGlobalTempView('view_team')

In [0]:
# Total runs scored by batsman per season

df_season_batsman_runs = \
    df_ball_by_ball.join(
        df_match,
        df_ball_by_ball.match_id == df_match.match_id,
        'inner'
    ).join(
        df_player_match,
        (df_match.match_id == df_player_match.match_id) & (df_ball_by_ball.striker == df_player_match.player_id),
        'inner'
    ).join(
        df_player,
        df_player_match.player_id == df_player.player_id,
        'inner'
    ).groupBy(df_match.season_year, df_player.player_id).agg(
        F.sum(df_ball_by_ball.runs_scored).alias('season_runs')
    )

df_season_batsman_runs = \
    df_season_batsman_runs.alias('batsman').join(
        df_player.alias('player'),
        F.col('batsman.player_id') == F.col('player.player_id'),
        'inner'
    ).select(
        F.col('batsman.season_year'),
        F.col('player.player_id'),
        F.col('player.player_name'),
        F.col('batsman.season_runs')
    ).orderBy(
        F.col('batsman.season_year'), F.col('batsman.season_runs').desc()
    )

df_season_batsman_runs.show(10)

# Save total runs scored by batsman per season
path_season_batsman_runs = output_data_files_path.format(storage_container_name, storage_account_name, 'season_batsman_runs')
output_file_options = {
    'delimiter': ',',
    'header': 'True'
}
df_season_batsman_runs.write.format('csv').mode('overwrite').options(**output_file_options).save(path_season_batsman_runs)

In [0]:
# Orange cap holder per season (Top scoring batsman)
windowSpec = Window.partitionBy(df_season_batsman_runs.season_year).orderBy(df_season_batsman_runs.season_runs.desc())

df_season_batsman_runs.select(
    df_season_batsman_runs.season_year,
    df_season_batsman_runs.player_id,
    df_season_batsman_runs.player_name,
    df_season_batsman_runs.season_runs,
    F.rank().over(windowSpec).alias('rank')
).filter(F.col('rank') == 1).show()