#  IPL BIGDATA ANALYSIS :

### Step 1 : Create a Spark Session :

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(" Ipl Data Analysis ").getOrCreate()

In [0]:
spark

###  Step 2 : Data Ingesting :

**Dataset-1 [ Ball_by_Ball ]**

In [0]:
ball_by_ball_df=spark.read.format("csv")\
    .option("header",True)\
    .option("inferSchema",True)\
    .load("s3://ipl-bigdata-analysis/Ball_By_Ball.csv")

In [0]:
from pyspark.sql.types import *

ball_by_ball_schema = StructType([
    StructField("match_id",                IntegerType(), True),
    StructField("over_id",                 IntegerType(), True),
    StructField("ball_id",                 IntegerType(), True),
    StructField("innings_no",              IntegerType(), True),
    StructField("team_batting",            StringType(),  True),
    StructField("team_bowling",            StringType(),  True),
    StructField("striker_batting_position",IntegerType(), True),
    StructField("extra_type",              StringType(),  True),
    StructField("runs_scored",             IntegerType(), True),
    StructField("extra_runs",              IntegerType(), True),
    StructField("wides",                   IntegerType(), True),
    StructField("legbyes",                 IntegerType(), True),
    StructField("byes",                    IntegerType(), True),
    StructField("noballs",                 IntegerType(), True),
    StructField("penalty",                 IntegerType(), True),
    StructField("bowler_extras",           IntegerType(), True),
    StructField("out_type",                StringType(),  True),
    StructField("caught",                  BooleanType(), True),
    StructField("bowled",                  BooleanType(), True),
    StructField("run_out",                 BooleanType(), True),
    StructField("lbw",                     BooleanType(), True),
    StructField("retired_hurt",            BooleanType(), True),
    StructField("stumped",                 BooleanType(), True),
    StructField("caught_and_bowled",       BooleanType(), True),
    StructField("hit_wicket",              BooleanType(), True),
    StructField("obstructingfeild",        BooleanType(), True),
    StructField("bowler_wicket",           BooleanType(), True),
    StructField("match_date",              DateType(),    True),
    StructField("season",                  IntegerType(), True),
    StructField("striker",                 IntegerType(), True),
    StructField("non_striker",             IntegerType(), True),
    StructField("bowler",                  IntegerType(), True),
    StructField("player_out",              IntegerType(), True),
    StructField("fielders",                IntegerType(), True),
    StructField("striker_match_sk",        IntegerType(), True),
    StructField("strikersk",               IntegerType(), True),
    StructField("nonstriker_match_sk",     IntegerType(), True),
    StructField("nonstriker_sk",           IntegerType(), True),
    StructField("fielder_match_sk",        IntegerType(), True),
    StructField("fielder_sk",              IntegerType(), True),
    StructField("bowler_match_sk",         IntegerType(), True),
    StructField("bowler_sk",               IntegerType(), True),
    StructField("playerout_match_sk",      IntegerType(), True),
    StructField("battingteam_sk",          IntegerType(), True),
    StructField("bowlingteam_sk",          IntegerType(), True),
    StructField("keeper_catch",            BooleanType(), True),
    StructField("player_out_sk",           IntegerType(), True),
    StructField("matchdatesk",             DateType(),    True)
])


In [0]:
ball_by_ball_df=spark.read.schema(ball_by_ball_schema).format("csv")\
    .option("header",True)\
    .load("s3://ipl-bigdata-analysis/Ball_By_Ball.csv")

In [0]:
ball_by_ball_df.display()

**Dataset-2 [ Match ]**

In [0]:
match_df=spark.read.format("csv")\
    .option("header",True)\
    .option("inferSchema",True)\
    .load("s3://ipl-bigdata-analysis/Match.csv")

In [0]:
match_schema = StructType([
    StructField("match_sk",        IntegerType(), True),
    StructField("match_id",        IntegerType(), True),
    StructField("team1",           StringType(),  True),
    StructField("team2",           StringType(),  True),
    StructField("match_date",      DateType(),    True),
    StructField("season_year",     IntegerType(), True),   
    StructField("venue_name",      StringType(),  True),
    StructField("city_name",       StringType(),  True),
    StructField("country_name",    StringType(),  True),
    StructField("toss_winner",     StringType(),  True),
    StructField("match_winner",    StringType(),  True),
    StructField("toss_name",       StringType(),  True),
    StructField("win_type",        StringType(),  True),
    StructField("outcome_type",    StringType(),  True),
    StructField("manofmach",       StringType(),  True),
    StructField("win_margin",      IntegerType(), True),
    StructField("country_id",      IntegerType(), True)
])


In [0]:
match_df=spark.read.schema(match_schema).format("csv")\
    .option("header",True)\
    .load("s3://ipl-bigdata-analysis/Match.csv")

In [0]:
match_df.display()

**Dataset-3 [ player ]**

In [0]:
player_df=spark.read.format("csv")\
    .option("header",True)\
    .option("inferSchema",True)\
    .load("s3://ipl-bigdata-analysis/Player.csv")

In [0]:
player_schema = StructType([
    StructField("player_sk",      IntegerType(), True),
    StructField("player_id",      IntegerType(), True),
    StructField("player_name",    StringType(),  True),
    StructField("dob",            DateType(),    True),
    StructField("batting_hand",   StringType(),  True),
    StructField("bowling_skill",  StringType(),  True),
    StructField("country_name",   StringType(),  True)
])


In [0]:
player_df=spark.read.schema(player_schema).format("csv")\
    .option("header",True)\
    .load("s3://ipl-bigdata-analysis/Player.csv")

In [0]:
player_df.display()

**Dataset-4 [ Player_match ]**

In [0]:
player_match_df=spark.read.format("csv")\
    .option("header",True)\
    .option("inferSchema",True)\
    .load("s3://ipl-bigdata-analysis/Player_match.csv")

In [0]:
player_match_schema = StructType([
    StructField("player_match_sk",     IntegerType(), True),
    StructField("playermatch_key",     DecimalType(38,10), True),  
    StructField("match_id",            IntegerType(), True),
    StructField("player_id",           IntegerType(), True),
    StructField("player_name",         StringType(),  True),
    StructField("dob",                 DateType(),    True),
    StructField("batting_hand",        StringType(),  True),
    StructField("bowling_skill",       StringType(),  True),
    StructField("country_name",        StringType(),  True),
    StructField("role_desc",           StringType(),  True),
    StructField("player_team",         StringType(),  True),
    StructField("opposit_team",        StringType(),  True),
    StructField("season_year",         IntegerType(), True),  
    StructField("is_manofthematch",    BooleanType(), True),
    StructField("age_as_on_match",     IntegerType(), True),
    StructField("isplayers_team_won",  BooleanType(), True),
    StructField("batting_status",      StringType(),  True),
    StructField("bowling_status",      StringType(),  True),
    StructField("player_captain",      StringType(),  True),
    StructField("opposit_captain",     StringType(),  True),
    StructField("player_keeper",       StringType(),  True),
    StructField("opposit_keeper",      StringType(),  True)
])


In [0]:
player_match_df=spark.read.schema(player_match_schema).format("csv")\
    .option("header",True)\
    .load("s3://ipl-bigdata-analysis/Player_match.csv")

In [0]:
player_match_df.display()

**Dataset-5 [ Team ]**

In [0]:
team_df=spark.read.format("csv")\
    .option("header",True)\
    .option("inferSchema",True)\
    .load("s3://ipl-bigdata-analysis/Team.csv")

In [0]:
team_schema = StructType([
    StructField("team_sk",    IntegerType(), True),
    StructField("team_id",    IntegerType(), True),
    StructField("team_name",  StringType(),  True)
])


In [0]:
team_df=spark.read.schema(team_schema).format("csv")\
    .option("header",True)\
    .load("s3://ipl-bigdata-analysis/Team.csv")

In [0]:
team_df.display()

### Preprocessing on Ball_by_ball Dataset :

In [0]:
ball_by_ball_df.display()

In [0]:
from pyspark.sql.functions import *

ball_by_ball_df = ball_by_ball_df.withColumn(
    "standard_extra_type",
    when(col("Wides") == 1, "wide_ball")
    .when(col("Legbyes") == 1, "legbye_ball")
    .when(col("Byes") == 1, "bye_ball")
    .when(col("Noballs") == 1, "noball_ball")
    .when(col("Penalty") == 1, "penalty_ball")
    .when(col("Bowler_Extras") == 1, "bowler_extra_ball")
    .otherwise(None)
)


In [0]:
ball_by_ball_df=ball_by_ball_df.drop("Wides", "Legbyes", "Byes", "Noballs", "Penalty", "Bowler_Extras","Caught", "Bowled", "Run_out", "LBW", "Retired_hurt", "Stumped", "caught_and_bowled", "hit_wicket", "ObstructingFeild", "Player_Out","fielders")


In [0]:
total_and_avg_runs=ball_by_ball_df.groupBy("match_id","innings_no").agg(
    sum("runs_scored").alias("Total_Runs"),
    avg("runs_scored").alias("Avg_Runs")
)
total_and_avg_runs.display()

In [0]:

total_runs_per_season = (
    ball_by_ball_df
    .withColumn(
        "total_run",
        col("runs_scored").cast(IntegerType()) + col("extra_runs").cast(IntegerType())
    )
    .groupBy("season")
    .agg(sum("total_run").alias("Total_Runs"))
    .orderBy("season")
)

display(total_runs_per_season)

In [0]:
from pyspark.sql.window import *
windowSpec = Window.partitionBy("match_id","innings_no").orderBy("over_id")
ball_by_ball_df=ball_by_ball_df.withColumn(
    "Running_total_runs",
    sum("runs_scored").over(windowSpec)
)


In [0]:
ball_by_ball_df.display()

### Preprocessing on Match Dataset :

In [0]:
match_df.display()

In [0]:
# Extracting year, month, and day from the match date for more detailed time-based analysis
match_df = match_df.withColumn("year", year("match_date"))
match_df = match_df.withColumn("month", month("match_date"))
match_df = match_df.withColumn("day", dayofmonth("match_date"))

In [0]:
# Analyze the impact of the toss: who wins the toss and the match
match_df = match_df.withColumn(
    "toss_match_winner",
    when(col("toss_winner") == col("match_winner"), "Yes").otherwise("No")
)

In [0]:
# High margin win: categorizing win margins into 'High', 'Medium', and 'Low'
match_df = match_df.withColumn(
    "win_margin_category",
    when(col("win_margin") >= 100, "High")
    .when((col("win_margin") >= 50) & (col("win_margin") < 100), "Medium")
    .otherwise("Low")
)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Count wins per team per year
team_wins_per_year = match_df.groupBy("Season_Year", "match_winner") \
    .agg(F.count("*").alias("wins"))

team_wins_per_year.display()

In [0]:
# Get the team with the most wins per year
windowSpec = Window.partitionBy("Season_Year").orderBy(F.desc("wins"))

most_win_team_per_year = team_wins_per_year.withColumn("rank", F.rank().over(windowSpec)) \
    .filter(F.col("rank") == 1) \
    .select("Season_Year", "match_winner", "wins")

most_win_team_per_year.show()

In [0]:
match_df.display()

### Preprocessing of Player Dataset :

In [0]:
player_df.display()

In [0]:
# Normalize and clean player names
player_df = player_df.withColumn(
    "player_name",
    lower(regexp_replace("player_name", "[^a-zA-Z0-9 ]", ""))
)

In [0]:
# Handle missing values in 'batting_hand' and 'bowling_skill' with a default 'unknown'
player_df = player_df.na.fill({"batting_hand": "unknown", "bowling_skill": "unknown"})

In [0]:
# Categorizing players based on batting hand
player_df = player_df.withColumn(
    "batting_style",
    when(col("batting_hand").contains("left"), "Left-Handed").otherwise("Right-Handed")
)


In [0]:
player_df.display()

**Preprocessing of Player_match dataset :**

In [0]:
player_match_df.display()

In [0]:
# Add a 'veteran_status' column based on player age
player_match_df = player_match_df.withColumn(
    "veteran_status",
    when(col("age_as_on_match") >= 35, "Veteran").otherwise("Non-Veteran")
)

In [0]:
# Dynamic column to calculate years since debut
player_match_df = player_match_df.withColumn(
    "years_since_debut",
    (year(current_date()) - col("season_year"))
)

In [0]:
#Player Win Contribution
player_match_df = player_match_df.withColumn(
    "impact_on_win",
    when((col("isplayers_team_won") == True) & (col("is_manofthematch") == True), "High")
    .when(col("isplayers_team_won") == True, "Medium")
    .otherwise("Low")
)


In [0]:
#Convert DOB to Age
player_match_df = player_match_df.withColumn(
    "current_age",
    year(current_date()) - year(col("dob"))
)#

In [0]:
#Home or Away Match
player_match_df = player_match_df.withColumn(
    "match_location_type",
    when(col("country_name") == col("player_team"), "Home").otherwise("Away")
)

In [0]:
#Total Matches Played by Each Player
from pyspark.sql.functions import count

matches_per_player = player_match_df.groupBy("player_id", "player_name") \
    .agg(count("*").alias("matches_played"))
matches_per_player.display()

In [0]:
#Most Wins by Team
wins_by_team = player_match_df.groupBy("player_team") \
    .agg(sum(col("isplayers_team_won").cast("int")).alias("total_wins"))
wins_by_team.display()

In [0]:
#Man of the Match Count by Player
mom_count = player_match_df.groupBy("player_id", "player_name") \
    .agg(sum(col("is_manofthematch").cast("int")).alias("mom_wins"))
mom_count.display()

In [0]:
#Win Percentage by Team
win_percentage = player_match_df.groupBy("player_team") \
    .agg(
        count("*").alias("total_matches"),
        sum(col("isplayers_team_won").cast("int")).alias("wins")
    ) \
    .withColumn(
        "win_percentage",
        (col("wins") / col("total_matches")) * 100
    )
win_percentage.display()

In [0]:
#Matches Played vs Won (Team Performance Summary)
team_summary = player_match_df.groupBy("player_team") \
    .agg(
        count("match_id").alias("matches_played"),
        sum(col("isplayers_team_won").cast("int")).alias("wins"),
        (sum(col("isplayers_team_won").cast("int")) / count("match_id") * 100)
        .alias("win_rate")
    )
team_summary.display()

In [0]:
player_match_df.display()

**Preprocessing of Team Dataset :**

In [0]:
team_df.display()

### Convert datasets into SQL table for further analysis :

In [0]:
ball_by_ball_df.createOrReplaceTempView("ball_by_ball")
match_df.createOrReplaceTempView("match")
player_match_df.createOrReplaceTempView("player_match")
player_df.createOrReplaceTempView("player")
team_df.createOrReplaceTempView("team")


In [0]:
top_scoring_batsman_2008=spark.sql("""
SELECT
p.player_name,
m.season_year,
SUM(CAST(b.runs_scored AS INT)) AS Total_Runs
FROM ball_by_ball AS b
JOIN match AS m ON m.match_id=b.match_id
JOIN player_match AS pm ON m.match_id=pm.match_id AND b.striker=pm.player_id
JOIN player AS p ON p.player_id=pm.player_id
GROUP BY p.player_name,m.season_year
ORDER BY m.season_year,Total_Runs DESC
""")

In [0]:
top_scoring_batsman_2008.show(5)

In [0]:
top_scoring_batsman_per_season = spark.sql("""
WITH season_runs AS (
    SELECT
        p.player_name,
        m.season_year,
        SUM(CAST(b.runs_scored AS INT)) AS total_runs
    FROM ball_by_ball b
    JOIN match m ON m.match_id = b.match_id
    JOIN player_match pm ON m.match_id = pm.match_id AND b.striker = pm.player_id
    JOIN player p ON p.player_id = pm.player_id
    GROUP BY p.player_name, m.season_year
)
SELECT player_name, season_year, total_runs
FROM (
    SELECT 
        *,
        ROW_NUMBER() OVER (
            PARTITION BY season_year
            ORDER BY total_runs DESC
        ) AS rn
    FROM season_runs
)
WHERE rn = 1
ORDER BY season_year;
""")


In [0]:
top_scoring_batsman_per_season.show()

In [0]:
economical_bowlers_powerplay = spark.sql("""
SELECT
    p.player_name,
    AVG(b.runs_scored) AS avg_runs_per_ball,
    COUNT(b.bowler_wicket) AS total_wickets
FROM ball_by_ball b
JOIN player_match pm ON b.match_id = pm.match_id AND b.bowler = pm.player_id
JOIN player p ON pm.player_id = p.player_id
WHERE b.over_id <= 6
GROUP BY p.player_name
HAVING COUNT(*) > 120
ORDER BY avg_runs_per_ball, total_wickets DESC
""")

economical_bowlers_powerplay.show()


In [0]:
toss_impact_individual_matches = spark.sql("""  
SELECT m.match_id, m.toss_winner, m.toss_name, m.match_winner,  
CASE WHEN m.toss_winner = m.match_winner THEN 'Won' ELSE 'Lost' END AS match_outcome  
FROM match m  
WHERE m.toss_name IS NOT NULL  
ORDER BY m.match_id  
""")  
toss_impact_individual_matches.show()  

In [0]:
average_runs_in_wins = spark.sql("""
SELECT 
    p.player_name, 
    AVG(CAST(b.runs_scored AS INT)) AS avg_runs_in_wins, 
    COUNT(*) AS innings_played
FROM ball_by_ball b
JOIN player_match pm ON b.match_id = pm.match_id AND b.striker = pm.player_id
JOIN player p ON pm.player_id = p.player_id
JOIN match m ON pm.match_id = m.match_id
WHERE m.match_winner = pm.player_team   
GROUP BY p.player_name
ORDER BY avg_runs_in_wins DESC
""")

average_runs_in_wins.show()


In [0]:
import matplotlib.pyplot as plt
# Convert Spark DF to Pandas DF
economical_bowlers_pd = economical_bowlers_powerplay.toPandas()

# Visualizing using Matplotlib
plt.figure(figsize=(10, 5))

# Top 10 most economical bowlers â†’ lowest avg_runs_per_ball
top_economical_bowlers = economical_bowlers_pd.nsmallest(10, 'avg_runs_per_ball')

# Bar plot
plt.bar(top_economical_bowlers['player_name'],
        top_economical_bowlers['avg_runs_per_ball'])

plt.xlabel('Bowler Name')
plt.ylabel('Average Runs per Ball')
plt.title('Most Economical Bowlers in Powerplay Overs (Top 10)')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [0]:
import seaborn as sns
toss_impact_pd = toss_impact_individual_matches.toPandas()

# Creating a countplot to show win/loss after winning toss
plt.figure(figsize=(10, 6))

sns.countplot(x='toss_winner',
              hue='match_outcome',
              data=toss_impact_pd)

plt.title('Impact of Winning Toss on Match Outcomes')
plt.xlabel('Toss Winner')
plt.ylabel('Number of Matches')
plt.legend(title='Match Outcome')
plt.xticks(rotation=90)
plt.tight_layout()
plt.show()


In [0]:
average_runs_pd = average_runs_in_wins.toPandas()

# Using seaborn to plot average runs in winning matches
plt.figure(figsize=(12, 8))

top_scorers = average_runs_pd.nlargest(10, 'avg_runs_in_wins')

sns.barplot(x='player_name',
            y='avg_runs_in_wins',
            data=top_scorers)

plt.title('Average Runs Scored by Batsmen in Winning Matches (Top 10 Scorers)')
plt.xlabel('Player Name')
plt.ylabel('Average Runs in Wins')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [0]:
scores_by_venue = spark.sql("""
SELECT 
    venue_name, 
    AVG(total_runs) AS average_score, 
    MAX(total_runs) AS highest_score
FROM (
    SELECT 
        b.match_id, 
        m.venue_name, 
        SUM(b.runs_scored) AS total_runs
    FROM ball_by_ball b
    JOIN match m 
        ON b.match_id = m.match_id
    GROUP BY b.match_id, m.venue_name
)
GROUP BY venue_name
ORDER BY average_score DESC
""")


In [0]:
# Convert to Pandas DataFrame  
scores_by_venue_pd = scores_by_venue.toPandas()  
# Plot  
plt.figure(figsize=(14, 8))  
sns.barplot(x='average_score', y='venue_name', data=scores_by_venue_pd)  
plt.title('Distribution of Scores by Venue')  
plt.xlabel('Average Score')  
plt.ylabel('Venue')  
plt.show()  

In [0]:
# Execute SQL Query  
dismissal_types = spark.sql("""  
SELECT out_type, COUNT(*) AS frequency  
FROM ball_by_ball  
WHERE out_type IS NOT NULL  
GROUP BY out_type  
ORDER BY frequency DESC  
""")  
 

In [0]:
# Convert to Pandas DataFrame
dismissal_types_pd = dismissal_types.toPandas()

# Remove "Not Applicable"
dismissal_filtered = dismissal_types_pd[dismissal_types_pd['out_type'] != "Not Applicable"]

# Pie Chart
plt.figure(figsize=(5, 8))
plt.pie(
    dismissal_filtered['frequency'],
    labels=dismissal_filtered['out_type'],
    autopct='%1.1f%%',
    startangle=140
)
plt.title('Distribution of Dismissal Types (Excluding Not Applicable)')
plt.tight_layout()
plt.show()


In [0]:
# SQL Query
team_toss_win_performance = spark.sql("""
SELECT 
    team1,
    COUNT(*) AS matches_played,
    SUM(CASE WHEN toss_winner = match_winner THEN 1 ELSE 0 END) AS wins_after_toss
FROM match
WHERE toss_winner = team1
GROUP BY team1
ORDER BY wins_after_toss DESC
""")

team_toss_win_performance.show()


In [0]:
team_toss_win_pd = team_toss_win_performance.toPandas()

plt.figure(figsize=(12, 8))
sns.barplot(x='wins_after_toss', y='team1', data=team_toss_win_pd, palette='viridis')

plt.title('Team Performance After Winning Toss')
plt.xlabel('Wins After Winning Toss')
plt.ylabel('Team')
plt.tight_layout()
plt.show()



###---------------------------------------------------- THANK YOU --------------------------------------------