In [None]:
from pyspark.sql import SparkSession

# Obtain the notebook's Spark session: getOrCreate() hands back an already
# active session when there is one, otherwise it starts a new one with this name.
spark = (
    SparkSession.builder
    .appName("IPL Data Analysis")
    .getOrCreate()
)

In [None]:
# Echo the session object as the cell's last expression so the notebook renders it.
spark

In [None]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, BooleanType, DateType, DecimalType
from pyspark.sql.functions import col, when, sum, avg, row_number
from pyspark.sql.window import Window


In [None]:
# Explicit schema for the ball-by-ball CSV. Every column is declared nullable.
# The pair order below is the declared column order and must not be changed.
# NOTE(review): the recorded preview of this frame shows `caught` as null on
# every row, which suggests the BooleanType columns may not parse from the raw
# values in the file — confirm against the source data.
ball_by_ball_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("match_id", IntegerType()),
        ("over_id", IntegerType()),
        ("ball_id", IntegerType()),
        ("innings_no", IntegerType()),
        ("team_batting", StringType()),
        ("team_bowling", StringType()),
        ("striker_batting_position", IntegerType()),
        ("extra_type", StringType()),
        ("runs_scored", IntegerType()),
        ("extra_runs", IntegerType()),
        ("wides", IntegerType()),
        ("legbyes", IntegerType()),
        ("byes", IntegerType()),
        ("noballs", IntegerType()),
        ("penalty", IntegerType()),
        ("bowler_extras", IntegerType()),
        ("out_type", StringType()),
        ("caught", BooleanType()),
        ("bowled", BooleanType()),
        ("run_out", BooleanType()),
        ("lbw", BooleanType()),
        ("retired_hurt", BooleanType()),
        ("stumped", BooleanType()),
        ("caught_and_bowled", BooleanType()),
        ("hit_wicket", BooleanType()),
        ("obstructingfeild", BooleanType()),
        ("bowler_wicket", IntegerType()),
        ("match_date", DateType()),
        ("season", IntegerType()),
        ("striker", IntegerType()),
        ("non_striker", IntegerType()),
        ("bowler", IntegerType()),
        ("player_out", IntegerType()),
        ("fielders", IntegerType()),
        ("striker_match_sk", IntegerType()),
        ("strikersk", IntegerType()),
        ("nonstriker_match_sk", IntegerType()),
        ("nonstriker_sk", IntegerType()),
        ("fielder_match_sk", IntegerType()),
        ("fielder_sk", IntegerType()),
        ("bowler_match_sk", IntegerType()),
        ("bowler_sk", IntegerType()),
        ("playerout_match_sk", IntegerType()),
        ("battingteam_sk", IntegerType()),
        ("bowlingteam_sk", IntegerType()),
        ("keeper_catch", BooleanType()),
        ("player_out_sk", IntegerType()),
        ("matchdatesk", DateType()),
    )
])

# Read the CSV (first line is a header) using the schema above instead of inference.
ball_by_ball_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(ball_by_ball_schema)
    .load("s3://samiksha-khare-projects/ipl-dataset-analysis/Ball_By_Ball.csv")
)


+--------+-------+-------------+------+
|match_id|over_id|bowler_wicket|caught|
+--------+-------+-------------+------+
|  335987|     11|            0|  null|
|  335987|      1|            0|  null|
|  335987|     16|            0|  null|
|  335987|      1|            0|  null|
|  335987|     16|            1|  null|
|  335987|      1|            0|  null|
|  335987|     15|            0|  null|
|  335987|      1|            0|  null|
|  335987|     15|            0|  null|
|  335987|      1|            0|  null|
|  335987|     15|            0|  null|
|  335987|      1|            0|  null|
|  335987|     15|            0|  null|
|  335987|      1|            0|  null|
|  335987|     15|            0|  null|
|  335987|      2|            0|  null|
|  335987|     15|            0|  null|
|  335987|      2|            0|  null|
|  335987|     15|            0|  null|
|  335987|      2|            0|  null|
|  335987|     14|            1|  null|
|  335987|      2|            0|  null|


In [None]:
# Explicit schema for the match-level CSV; every column is declared nullable.
match_schema = StructType([
    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", DateType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmach", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True)
])

# Fix: this reader previously loaded Ball_By_Ball.csv, so the match schema was
# applied to ball-by-ball rows and match_df held mislabelled delivery data.
# NOTE(review): the object key "Match.csv" is assumed from the dataset's usual
# file naming — confirm it against the bucket listing.
match_df = (
    spark.read
    .schema(match_schema)
    .format("csv")
    .option("header", "true")
    .load("s3://samiksha-khare-projects/ipl-dataset-analysis/Match.csv")
)

In [None]:
# Explicit schema for the player dimension CSV; every column is declared nullable.
player_schema = StructType([
    StructField("player_sk", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True)
])

# Fix: this reader previously loaded Ball_By_Ball.csv, so the player schema was
# applied to ball-by-ball rows and player_df held mislabelled delivery data.
# NOTE(review): the object key "Player.csv" is assumed from the dataset's usual
# file naming — confirm it against the bucket listing.
player_df = (
    spark.read
    .schema(player_schema)
    .format("csv")
    .option("header", "true")
    .load("s3://samiksha-khare-projects/ipl-dataset-analysis/Player.csv")
)

In [None]:
# Explicit schema for the player-per-match CSV; every column is declared nullable.
player_match_schema = StructType([
    StructField("player_match_sk", IntegerType(), True),
    StructField("playermatch_key", DecimalType(10, 2), True),  # Adjust precision and scale as necessary
    StructField("match_id", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("role_desc", StringType(), True),
    StructField("player_team", StringType(), True),
    StructField("opposit_team", StringType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("is_manofthematch", BooleanType(), True),
    StructField("age_as_on_match", IntegerType(), True),
    StructField("isplayers_team_won", BooleanType(), True),
    StructField("batting_status", StringType(), True),
    StructField("bowling_status", StringType(), True),
    StructField("player_captain", StringType(), True),
    StructField("opposit_captain", StringType(), True),
    StructField("player_keeper", StringType(), True),
    StructField("opposit_keeper", StringType(), True)
])

# Fix: this reader previously loaded Ball_By_Ball.csv, so the player-match
# schema was applied to ball-by-ball rows and player_match_df held mislabelled
# delivery data.
# NOTE(review): the object key "Player_match.csv" is assumed from the dataset's
# usual file naming — confirm it (including letter case) against the bucket listing.
player_match_df = (
    spark.read
    .schema(player_match_schema)
    .format("csv")
    .option("header", "true")
    .load("s3://samiksha-khare-projects/ipl-dataset-analysis/Player_match.csv")
)

In [None]:
# Explicit schema for the team dimension CSV; every column is declared nullable.
team_schema = StructType([
    StructField("team_sk", IntegerType(), True),
    StructField("team_id", IntegerType(), True),
    StructField("team_name", StringType(), True)
])

# Fix: this reader previously loaded Ball_By_Ball.csv, so the team schema was
# applied to ball-by-ball rows and team_df held mislabelled delivery data.
# NOTE(review): the object key "Team.csv" is assumed from the dataset's usual
# file naming — confirm it against the bucket listing.
team_df = (
    spark.read
    .schema(team_schema)
    .format("csv")
    .option("header", "true")
    .load("s3://samiksha-khare-projects/ipl-dataset-analysis/Team.csv")
)

In [None]:
# Print a five-row preview of the ball-by-ball frame to sanity-check the load.
ball_by_ball_df.show(n=5)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+----+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+
|match_id|over_id|ball_id|innings_no|team_batting|team_bowling|striker_batting_position|extra_type|runs_scored|extra_runs|wides|legbyes|byes|noballs|penalty|bowler_extras|      out_type|caught|bowled|run_out| lbw|retired_hurt|stumped|caught_and_bowled|hit_wicket|obstructingfeild|bowler_wicket|match_date|season|striker|non_striker|bowler|player_out|fielders|striker_match_sk|strikersk|nonstriker_match_sk|nonstriker_sk|fielder_match_sk|

In [None]:

# Transformation 1:
# Keep only legal deliveries, i.e. rows whose wides and noballs counters are
# both zero (wide balls and no-balls are excluded from the analysis below).
ball_by_ball_df = ball_by_ball_df.where(
    (col("wides") == 0) & (col("noballs") == 0)
)

# Transformation 2:
# Aggregate per (match, innings): total runs scored and mean runs per remaining
# delivery. `sum` and `avg` here are the pyspark.sql.functions column
# aggregates imported at the top of the notebook, not the Python builtins.
total_and_avg_runs = (
    ball_by_ball_df
    .groupBy("match_id", "innings_no")
    .agg(
        sum("runs_scored").alias("total_runs"),
        avg("runs_scored").alias("average_runs"),
    )
)

In [None]:
# Show the first ten per-innings aggregates, ordered by match and then innings.
total_and_avg_runs.sort("match_id", "innings_no").show(n=10)


+--------+----------+----------+------------------+
|match_id|innings_no|total_runs|      average_runs|
+--------+----------+----------+------------------+
|  335987|         1|       205|1.7083333333333333|
|  335987|         2|        63|0.6923076923076923|
|  335988|         1|       230|1.9166666666666667|
|  335988|         2|       196|1.6333333333333333|
|  335989|         1|       118|0.9833333333333333|
|  335989|         2|       122|1.3406593406593406|
|  335990|         1|       154|1.2833333333333334|
|  335990|         2|       161|1.3644067796610169|
|  335991|         1|       100|0.8928571428571429|
|  335991|         2|        84|0.7368421052631579|
+--------+----------+----------+------------------+
only showing top 10 rows



In [None]:
# Transformation 3:
# Window function: running total of runs within each (match, innings), ordered
# by over. The frame is written out explicitly; it is the same frame Spark
# applies by default once an ordering is defined. Because it is a RANGE frame
# over over_id, every delivery of an over carries the cumulative total up to
# and including that over.
windowFunction = (
    Window
    .partitionBy("match_id", "innings_no")
    .orderBy("over_id")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

# The column name is kept exactly as spelled because a later cell selects it by name.
running_total_runs = ball_by_ball_df.withColumn(
    "running_toal_runs_each_over",
    sum("runs_scored").over(windowFunction),
)

In [None]:

# Project the running-total columns, ordered by match and innings, then print
# up to 240 rows for match 335987.
runs_per_match_per_inns_per_over = (
    running_total_runs
    .select("match_id", "over_id", "innings_no", "running_toal_runs_each_over")
    .orderBy("match_id", "innings_no")
)

(
    runs_per_match_per_inns_per_over
    .where(col("match_id") == 335987)
    .show(n=240)
)


+--------+-------+----------+---------------------------+
|match_id|over_id|innings_no|running_toal_runs_each_over|
+--------+-------+----------+---------------------------+
|  335987|      1|         1|                          0|
|  335987|      1|         1|                          0|
|  335987|      1|         1|                          0|
|  335987|      1|         1|                          0|
|  335987|      1|         1|                          0|
|  335987|      1|         1|                          0|
|  335987|      2|         1|                         18|
|  335987|      2|         1|                         18|
|  335987|      2|         1|                         18|
|  335987|      2|         1|                         18|
|  335987|      2|         1|                         18|
|  335987|      2|         1|                         18|
|  335987|      3|         1|                         23|
|  335987|      3|         1|                         23|
|  335987|    

In [None]:
# Flag "high impact" deliveries: runs_scored + extra_runs greater than 6, or a
# delivery where bowler_wicket equals 1. The when/otherwise form makes the flag
# False (not null) whenever the condition does not evaluate to true.
ball_by_ball_df = ball_by_ball_df.withColumn(
    "high_impact",
    when(
        ((col("runs_scored") + col("extra_runs")) > 6) | (col("bowler_wicket") == 1),
        True,
    ).otherwise(False),
)

# Print up to 300 flagged deliveries, ordered by match and over.
(
    ball_by_ball_df
    .select("match_id", "over_id", "runs_scored", "extra_runs", "bowler_wicket", "high_impact")
    .where(col("high_impact"))
    .orderBy("match_id", "over_id")
    .show(n=300)
)

+--------+-------+-----------+----------+-------------+-----------+
|match_id|over_id|runs_scored|extra_runs|bowler_wicket|high_impact|
+--------+-------+-----------+----------+-------------+-----------+
|  335987|      2|          0|         0|            1|       true|
|  335987|      3|          0|         0|            1|       true|
|  335987|      5|          0|         0|            1|       true|
|  335987|      6|          0|         0|            1|       true|
|  335987|      6|          0|         0|            1|       true|
|  335987|      8|          0|         0|            1|       true|
|  335987|      9|          0|         0|            1|       true|
|  335987|      9|          0|         0|            1|       true|
|  335987|     13|          0|         0|            1|       true|
|  335987|     14|          0|         0|            1|       true|
|  335987|     16|          0|         0|            1|       true|
|  335987|     18|          0|         0|       