In [1]:
from pyspark.sql import SparkSession

In [2]:
import findspark
findspark.init()

In [3]:
spark=SparkSession.builder.appName('BigData Cricket Analysis').getOrCreate()

In [4]:
spark

In [7]:
fact_balls = spark.read.csv("C:\\Users\\Patron\\Downloads\\balls.csv", header = True)
dim_matches = spark.read.csv("C:\\Users\\Patron\\Downloads\\matches.csv", header = True)
dim_players = spark.read.csv("C:\\Users\\Patron\\Downloads\\players.csv", header = True)

In [9]:
import pyspark.sql.functions as F

In [10]:
# Calculate bat_details
bat_details = fact_balls.groupBy('match_id', 'innings', 'striker') \
    .agg(
        F.sum('runs_off_bat').alias('runs'),
        F.sum(F.when(F.col('extras_type') == 'WIDE', 0).otherwise(1)).alias('balls_faced'),
        (F.sum(F.when(F.col('runs_off_bat') == 0, 1).otherwise(0)) -
         F.sum(F.when(F.col('extras_type') == 'WIDE', 1).otherwise(0))).alias('0s'),
        F.sum(F.when(F.col('runs_off_bat') == 4, 1).otherwise(0)).alias('4s'),
        F.sum(F.when(F.col('runs_off_bat') == 6, 1).otherwise(0)).alias('6s')
    )

# Join with dim_players and dim_matches
players = dim_players.select("match_id", "player_id", "player_name", "team")
matches = dim_matches.select("match_id", "team", "event", "dates_played").withColumnRenamed("dates_played", "match_date")

# Calculate player_dismissed_flg
player_dismissed_flg = fact_balls.filter(F.col("player_dismissed").isNotNull()) \
    .groupBy("match_id", "innings") \
    .agg(F.collect_list("player_dismissed").alias("dismissed_players"))

# Join bat_details, players, matches, and player_dismissed_flg
score_data = matches.join(players, "match_id") \
    .join(bat_details, (matches.match_id == bat_details.match_id) & (players.player_name == bat_details.striker)) \
    .join(player_dismissed_flg, (matches.match_id == player_dismissed_flg.match_id) & (bat_details.innings == player_dismissed_flg.innings)) \
    .select(
        matches["match_id"], "match_date", "event", matches["team"],
        bat_details.striker.alias("batsman"), bat_details.innings.alias("innings_number"),
        "runs", "balls_faced", "0s", "4s", "6s",
        F.round((bat_details.runs / bat_details.balls_faced) * 100, 2).alias("strikerate")
    )

# Show the result
score_data.show()

+--------+----------+--------------------+------------+--------+--------------+----+-----------+---+---+---+----------+
|match_id|match_date|               event|        team| batsman|innings_number|runs|balls_faced| 0s| 4s| 6s|strikerate|
+--------+----------+--------------------+------------+--------+--------------+----+-----------+---+---+---+----------+
| 1000851|2016-11-04|South Africa in A...|South Africa|AC Voges|             2|27.0|         60| 49|  4|  0|      45.0|
| 1000851|2016-11-04|South Africa in A...|South Africa|AC Voges|             4| 1.0|         12| 11|  0|  0|      8.33|
| 1000851|2016-11-07|South Africa in A...|South Africa|AC Voges|             2|27.0|         60| 49|  4|  0|      45.0|
| 1000851|2016-11-07|South Africa in A...|South Africa|AC Voges|             4| 1.0|         12| 11|  0|  0|      8.33|
| 1000851|2016-11-04|South Africa in A...|   Australia|AC Voges|             2|27.0|         60| 49|  4|  0|      45.0|
| 1000851|2016-11-04|South Africa in A..