In [0]:
from pyspark.sql import SparkSession

In [0]:
from pyspark.sql.types import StructField, StructType, IntegerType, BooleanType, DateType, DoubleType, StringType, DecimalType
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.functions import col, when

In [0]:
# Create (or reuse) the Spark session that every later cell reads from.
spark = (
    SparkSession.builder
    .appName("IPL Data Analysis")
    .getOrCreate()
)

In [0]:
# Explicit schema used to read Ball_By_Ball.csv, written as ordered
# (column name, Spark type) pairs. Every column is declared nullable.
_ball_by_ball_columns = [
    ("match_id", IntegerType()),
    ("over_id", IntegerType()),
    ("ball_id", IntegerType()),
    ("innings_no", IntegerType()),
    ("team_batting", StringType()),
    ("team_bowling", StringType()),
    ("striker_batting_position", IntegerType()),
    ("extra_type", StringType()),
    ("runs_scored", IntegerType()),
    ("extra_runs", IntegerType()),
    ("wides", IntegerType()),
    ("legbyes", IntegerType()),
    ("byes", IntegerType()),
    ("noballs", IntegerType()),
    ("penalty", IntegerType()),
    ("bowler_extras", IntegerType()),
    ("out_type", StringType()),
    ("caught", BooleanType()),
    ("bowled", BooleanType()),
    ("run_out", BooleanType()),
    ("lbw", BooleanType()),
    ("retired_hurt", BooleanType()),
    ("stumped", BooleanType()),
    ("caught_and_bowled", BooleanType()),
    ("hit_wicket", BooleanType()),
    ("obstructingfeild", BooleanType()),
    ("bowler_wicket", BooleanType()),
    ("match_date", DateType()),
    ("season", IntegerType()),
    ("striker", IntegerType()),
    ("non_striker", IntegerType()),
    ("bowler", IntegerType()),
    ("player_out", IntegerType()),
    ("fielders", IntegerType()),
    ("striker_match_sk", IntegerType()),
    ("strikersk", IntegerType()),
    ("nonstriker_match_sk", IntegerType()),
    ("nonstriker_sk", IntegerType()),
    ("fielder_match_sk", IntegerType()),
    ("fielder_sk", IntegerType()),
    ("bowler_match_sk", IntegerType()),
    ("bowler_sk", IntegerType()),
    ("playerout_match_sk", IntegerType()),
    ("battingteam_sk", IntegerType()),
    ("bowlingteam_sk", IntegerType()),
    ("keeper_catch", BooleanType()),
    ("player_out_sk", IntegerType()),
    ("matchdatesk", DateType()),
]

ball_by_ball_schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in _ball_by_ball_columns
])


In [0]:
# Load the ball-by-ball CSV from S3 with the explicit schema; the first row is a header.
ball_by_ball_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(ball_by_ball_schema)
    .load("s3://ipl-data-analysis/Ball_By_Ball.csv")
)

In [0]:
# Explicit schema used to read Match.csv, written as ordered
# (column name, Spark type) pairs. Every column is declared nullable.
_match_columns = [
    ("match_sk", IntegerType()),
    ("match_id", IntegerType()),
    ("team1", StringType()),
    ("team2", StringType()),
    ("match_date", DateType()),
    ("season_year", IntegerType()),
    ("venue_name", StringType()),
    ("city_name", StringType()),
    ("country_name", StringType()),
    ("toss_winner", StringType()),
    ("match_winner", StringType()),
    ("toss_name", StringType()),
    ("win_type", StringType()),
    ("outcome_type", StringType()),
    ("manofmach", StringType()),
    ("win_margin", IntegerType()),
    ("country_id", IntegerType()),
]

match_schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in _match_columns
])


In [0]:
# Load the match-level CSV from S3 with the explicit schema; the first row is a header.
match_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(match_schema)
    .load("s3://ipl-data-analysis/Match.csv")
)

In [0]:
# Explicit schema used to read Player.csv, written as ordered
# (column name, Spark type) pairs. Every column is declared nullable.
_player_columns = [
    ("player_sk", IntegerType()),
    ("player_id", IntegerType()),
    ("player_name", StringType()),
    ("dob", DateType()),
    ("batting_hand", StringType()),
    ("bowling_skill", StringType()),
    ("country_name", StringType()),
]

player_schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in _player_columns
])


In [0]:
# Load the player master CSV from S3 with the explicit schema; the first row is a header.
players_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(player_schema)
    .load("s3://ipl-data-analysis/Player.csv")
)

In [0]:
# Explicit schema used to read Player_match.csv, written as ordered
# (column name, Spark type) pairs. Every column is declared nullable.
_player_match_columns = [
    ("player_match_sk", IntegerType()),
    ("playermatch_key", DecimalType(10, 0)),
    ("match_id", IntegerType()),
    ("player_id", IntegerType()),
    ("player_name", StringType()),
    ("dob", DateType()),
    ("batting_hand", StringType()),
    ("bowling_skill", StringType()),
    ("country_name", StringType()),
    ("role_desc", StringType()),
    ("player_team", StringType()),
    ("opposit_team", StringType()),
    ("season_year", IntegerType()),
    ("is_manofthematch", BooleanType()),
    ("age_as_on_match", IntegerType()),
    ("isplayers_team_won", BooleanType()),
    ("batting_status", StringType()),
    ("bowling_status", StringType()),
    ("player_captain", StringType()),
    ("opposit_captain", StringType()),
    ("player_keeper", StringType()),
    ("opposit_keeper", StringType()),
]

player_match_schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in _player_match_columns
])


In [0]:
# Load the player-per-match CSV from S3 with the explicit schema; the first row is a header.
player_match_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(player_match_schema)
    .load("s3://ipl-data-analysis/Player_match.csv")
)

In [0]:
# Explicit schema used to read Team.csv, written as ordered
# (column name, Spark type) pairs. Every column is declared nullable.
_team_columns = [
    ("team_sk", IntegerType()),
    ("team_id", IntegerType()),
    ("team_name", StringType()),
]

team_schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in _team_columns
])


In [0]:
# Load the team lookup CSV from S3 with the explicit schema; the first row is a header.
team_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(team_schema)
    .load("s3://ipl-data-analysis/Team.csv")
)

In [0]:
# Keep only legal deliveries: rows where both the wides and no-balls counts are 0.
is_legal_delivery = (col("wides") == 0) & (col("noballs") == 0)
ball_by_ball_df = ball_by_ball_df.where(is_legal_delivery)

# Total and average runs scored per match and innings.
# `sum` / `avg` are the star-imported Spark column functions, not the Python builtins.
runs_total = sum("runs_scored").alias("Total_runs")
runs_mean = avg("runs_scored").alias("Avg_runs")
total_and_avg_runs_scored = (
    ball_by_ball_df
    .groupBy("match_id", "innings_no")
    .agg(runs_total, runs_mean)
)

In [0]:
# Window function: cumulative runs within each match innings, ordered by over.
# The frame is written out explicitly; it is the frame Spark applies by default
# when a window has an ordering, so every delivery of an over carries the
# innings total up to and including that over.
window_spec = (
    Window
    .partitionBy("match_id", "innings_no")
    .orderBy("over_id")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

cumulative_runs = sum("runs_scored").over(window_spec)
ball_by_ball_df = ball_by_ball_df.withColumn("running_total_runs", cumulative_runs)

In [0]:
# Flag "high impact" deliveries: more than 6 runs in total (bat runs + extras)
# or a wicket credited to the bowler. Anything else, including rows where the
# condition evaluates to NULL, is flagged False.
scored_over_six = (col("runs_scored") + col("extra_runs")) > 6
bowler_took_wicket = col("bowler_wicket") == True
ball_by_ball_df = ball_by_ball_df.withColumn(
    "high_impact",
    when(scored_over_six | bowler_took_wicket, True).otherwise(False),
)


In [0]:
# Split the match date into year, month and day columns for time-based analysis.
match_df = (
    match_df
    .withColumn("year", year("match_date"))
    .withColumn("month", month("match_date"))
    .withColumn("day", dayofmonth("match_date"))
)

# Bucket the win margin: >= 100 is "High", 50 to 99 is "Medium", and everything
# else (including a missing margin) is "Less".
margin = col("win_margin")
margin_bucket = (
    when(margin >= 100, "High")
    .when((margin >= 50) & (margin < 100), "Medium")
    .otherwise("Less")
)
match_df = match_df.withColumn("win_margin_category", margin_bucket)

# Toss impact: "Yes" when the toss winner also won the match, otherwise "No".
toss_winner_won_match = col("toss_winner") == col("match_winner")
match_df = match_df.withColumn(
    "toss_match_winner",
    when(toss_winner_won_match, "Yes").otherwise("No"),
)


In [0]:
# Normalize player names: drop every character that is not an ASCII letter or a
# space, then lowercase the result.
# NOTE(review): this runs before the "N/A" replacement below, so a literal "N/A"
# name would become "na" rather than null — confirm that is intended.
players_df = players_df.withColumn(
    "player_name",
    lower(regexp_replace("player_name", "[^a-zA-Z ]", ""))
)

# Treat the literal string "N/A" as a missing value (no column subset is given).
players_df = players_df.na.replace("N/A", None)

# Replace missing batting hand / bowling skill with an explicit "Unknown" marker.
players_df = players_df.na.fill({
    "batting_hand" : "Unknown",
    "bowling_skill" : "Unknown"
})

# Categorize players by batting hand.
# Rows whose batting hand was just filled with "Unknown" get their own category;
# previously they fell through to the default branch and were reported as
# right-handed. All other values keep the original Left / default-Right split.
players_df = players_df.withColumn(
    "batting_style",
    when(col("batting_hand").contains("Left"), "Left-Handed Batsman")
    .when(col("batting_hand") == "Unknown", "Unknown")
    .otherwise("Right-Handed Batsman")
)



In [0]:
# Veteran flag: "Yes" when the player's age on match day is 35 or more,
# otherwise "No" (a missing age also yields "No").
player_match_df = player_match_df.withColumn(
    "veteran_status",
    when(col("age_as_on_match") >= 35, "Yes").otherwise("No")
)

# Years since debut: current calendar year minus the player's earliest season in
# this dataset. The earlier version subtracted each row's own season_year, which
# gave the same player a different value for every match instead of measuring
# from the debut season.
# `min` is the star-imported Spark aggregate, not the Python builtin.
debut_season = min("season_year").over(Window.partitionBy("player_id"))
player_match_df = player_match_df.withColumn(
    "years_since_Debut",
    (year(current_date()) - debut_season)
)



In [0]:
# Register each DataFrame as a temporary view so the SQL cells can query it by name.
for view_name, frame in (
    ("ball_by_ball", ball_by_ball_df),
    ("match", match_df),
    ("player", players_df),
    ("player_match", player_match_df),
    ("team", team_df),
):
    frame.createOrReplaceTempView(view_name)

In [0]:
# Total runs per batsman per season: deliveries are joined to their match (for the
# season) and to the striker's player record (for the name), then summed and
# ordered by season with the highest totals first.
top_scoring_batsmen_query = """
SELECT 
p.player_name,
m.season_year,
SUM(b.runs_scored) AS total_runs 
FROM ball_by_ball b
JOIN match m ON b.match_id = m.match_id   
JOIN player_match pm ON m.match_id = pm.match_id AND b.striker = pm.player_id     
JOIN player p ON p.player_id = pm.player_id
GROUP BY p.player_name, m.season_year
ORDER BY m.season_year, total_runs DESC
"""
top_scoring_batsmen_per_season = spark.sql(top_scoring_batsmen_query)

In [0]:
# Bowling summary for deliveries with over_id <= 6, per bowler: average bat runs
# conceded per ball and wickets credited to the bowler, most economical first.
# Wickets are summed only over deliveries where bowler_wicket is true.
# COUNT(b.bowler_wicket) counted every non-null flag (false included), which is
# a ball count rather than a wicket count.
# NOTE(review): if bowler_wicket is NULL on every row (e.g. the CSV flag does not
# parse as a boolean under the declared schema), this total stays 0 for every
# bowler — verify the loaded column.
# HAVING COUNT(*) >= 1 is kept as written; every group has at least one row, so
# it filters nothing.
economical_bowlers_powerplay = spark.sql("""
SELECT 
p.player_name, 
AVG(b.runs_scored) AS avg_runs_per_ball, 
SUM(CASE WHEN b.bowler_wicket THEN 1 ELSE 0 END) AS total_wickets
FROM ball_by_ball b
JOIN player_match pm ON b.match_id = pm.match_id AND b.bowler = pm.player_id
JOIN player p ON pm.player_id = p.player_id
WHERE b.over_id <= 6
GROUP BY p.player_name
HAVING COUNT(*) >= 1
ORDER BY avg_runs_per_ball, total_wickets DESC
""")
economical_bowlers_powerplay.show()

+---------------+------------------+-------------+
|    player_name| avg_runs_per_ball|total_wickets|
+---------------+------------------+-------------+
|     sm harwood|0.3333333333333333|            0|
|     ankit soni|               0.5|            0|
|      gr napier|               0.5|            0|
|       aj finch|               0.5|            0|
|        a zampa|               0.5|            0|
|     avesh khan|               0.5|            0|
|       nb singh|0.5833333333333334|            0|
|     ag murtaza|0.6538461538461539|            0|
|      sb bangar|0.6666666666666666|            0|
|     d du preez|0.6666666666666666|            0|
|        s gopal|0.6666666666666666|            0|
|     fh edwards|0.6923076923076923|            0|
|       a kumble|0.7685185185185185|            0|
|j syed mohammad|0.7777777777777778|            0|
|   kp pietersen|0.7777777777777778|            0|
|       umar gul|0.7777777777777778|            0|
|  la carseldine|0.833333333333

In [0]:
# Per-match toss impact: for every match with a recorded toss decision, report
# whether the toss winner went on to win ('Won') or not ('Lost'), ordered by match id.
toss_impact_query = """
SELECT m.match_id, m.toss_winner, m.toss_name, m.match_winner,
       CASE WHEN m.toss_winner = m.match_winner THEN 'Won' ELSE 'Lost' END AS match_outcome
FROM match m
WHERE m.toss_name IS NOT NULL
ORDER BY m.match_id
"""
toss_impact_individual_matches = spark.sql(toss_impact_query)

In [0]:
# Print the leading rows of the toss-impact result as a text table
# (show() is called with its default row limit and column truncation).
toss_impact_individual_matches.show()

+--------+--------------------+---------+--------------------+-------------+
|match_id|         toss_winner|toss_name|        match_winner|match_outcome|
+--------+--------------------+---------+--------------------+-------------+
|  335987|Royal Challengers...|    field|Kolkata Knight Ri...|         Lost|
|  335988| Chennai Super Kings|      bat| Chennai Super Kings|          Won|
|  335989|    Rajasthan Royals|      bat|    Delhi Daredevils|         Lost|
|  335990|      Mumbai Indians|      bat|Royal Challengers...|         Lost|
|  335991|     Deccan Chargers|      bat|Kolkata Knight Ri...|         Lost|
|  335992|     Kings XI Punjab|      bat|    Rajasthan Royals|         Lost|
|  335993|     Deccan Chargers|      bat|    Delhi Daredevils|         Lost|
|  335994|      Mumbai Indians|    field| Chennai Super Kings|         Lost|
|  335995|    Rajasthan Royals|    field|    Rajasthan Royals|          Won|
|  335996|      Mumbai Indians|    field|     Kings XI Punjab|         Lost|