In [8]:
import requests
import zipfile
import os
import shutil

# Step 1: Source archive and working locations.
# The archive is downloaded and unpacked under /tmp, then the JSON files are
# copied into the Lakehouse Files area through its local mount path.
url = "https://cricsheet.org/downloads/ipl_male_json.zip"
zip_local_path = "/tmp/ipl_male_json.zip"
extract_path = "/tmp/ipl_male_json"
lakehouse_path = "/lakehouse/default/Files/IPL_json"  # Folder in Lakehouse Files

# Step 2: Download ZIP with streaming
print("Downloading IPL data...")
# The timeout keeps a stalled connection from hanging the session, and the
# context manager releases the connection even if writing the file fails.
with requests.get(url, stream=True, timeout=60) as response:
    # Fail here on an HTTP error instead of saving an error page to disk and
    # hitting a confusing BadZipFile in the extraction step.
    response.raise_for_status()
    with open(zip_local_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:  # skip empty keep-alive chunks
                f.write(chunk)
print("Downloaded zip file to /tmp")

# Step 3: Extract the ZIP
print("Extracting IPL files...")
# Start from an empty folder so files left over from a previous run are not
# copied again in Step 4.
shutil.rmtree(extract_path, ignore_errors=True)
with zipfile.ZipFile(zip_local_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)
print("Extracted files to /tmp/ipl_male_json")

# Step 4: Copy JSON files into Lakehouse Files
print("Copying IPL files to Lakehouse...")
os.makedirs(lakehouse_path, exist_ok=True)

file_count = 0
# Only the top level of the extracted folder is scanned; non-JSON entries are
# skipped.
for filename in os.listdir(extract_path):
    if filename.endswith(".json"):
        src = os.path.join(extract_path, filename)
        dest = os.path.join(lakehouse_path, filename)
        shutil.copyfile(src, dest)
        file_count += 1

print(f"Copied {file_count} IPL JSON files to Lakehouse path: {lakehouse_path}")


StatementMeta(, a0801ae7-b033-48d5-a89e-71cf5633490e, 10, Finished, Available, Finished)

📥 Downloading IPL data...
✅ Downloaded zip file to /tmp
📦 Extracting IPL files...
✅ Extracted files to /tmp/ipl_male_json
🚚 Copying IPL files to Lakehouse...
✅ Copied 1139 IPL JSON files to Lakehouse path: /lakehouse/default/Files/IPL_json


In [1]:
from notebookutils import mssparkutils
# List the contents of the Lakehouse folder holding the IPL JSON files; the
# bare name on the last line makes the notebook render the listing.
ipl_json_listing = mssparkutils.fs.ls("Files/IPL_json/")
ipl_json_listing


StatementMeta(, 98b67f78-9ad5-443d-a246-79d64056e1f3, 3, Finished, Available, Finished)

[FileInfo(path=abfss://0ba040c6-11df-4da7-9fb4-89b3952d3a24@onelake.dfs.fabric.microsoft.com/794fc58f-594c-4735-8e4e-cf9cb51dea99/Files/IPL_json/1082591.json, name=1082591.json, size=78351),
 FileInfo(path=abfss://0ba040c6-11df-4da7-9fb4-89b3952d3a24@onelake.dfs.fabric.microsoft.com/794fc58f-594c-4735-8e4e-cf9cb51dea99/Files/IPL_json/1082592.json, name=1082592.json, size=76503),
 FileInfo(path=abfss://0ba040c6-11df-4da7-9fb4-89b3952d3a24@onelake.dfs.fabric.microsoft.com/794fc58f-594c-4735-8e4e-cf9cb51dea99/Files/IPL_json/1082593.json, name=1082593.json, size=66392),
 FileInfo(path=abfss://0ba040c6-11df-4da7-9fb4-89b3952d3a24@onelake.dfs.fabric.microsoft.com/794fc58f-594c-4735-8e4e-cf9cb51dea99/Files/IPL_json/1082594.json, name=1082594.json, size=76353),
 FileInfo(path=abfss://0ba040c6-11df-4da7-9fb4-89b3952d3a24@onelake.dfs.fabric.microsoft.com/794fc58f-594c-4735-8e4e-cf9cb51dea99/Files/IPL_json/1082595.json, name=1082595.json, size=77551),
 FileInfo(path=abfss://0ba040c6-11df-4da7-9fb

In [2]:
# Read every JSON file in the folder with the multiline option enabled, then
# preview the first five rows.
multiline_reader = spark.read.option("multiline", "true")
df_raw = multiline_reader.json("Files/IPL_json/*.json")
display(df_raw.limit(5))


StatementMeta(, 98b67f78-9ad5-443d-a246-79d64056e1f3, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 15d23863-e124-4a4a-b69a-d5d75296d238)

In [6]:
from pyspark.sql.functions import (
    col, lit, explode, concat_ws, monotonically_increasing_id,
    row_number, when
)
from pyspark.sql.window import Window

# posexplode is imported here so this cell stays runnable on its own.
from pyspark.sql.functions import posexplode

# Step 1: Load IPL JSON files (read with the multiline option enabled)
df_raw = spark.read.option("multiline", "true").json("Files/IPL_json/*.json")

# Step 2: Explode innings with match metadata
# Join_Column is "<first date> - <first match referee>"; concat_ws skips a
# null part instead of producing a null key.
# NOTE(review): presumably this is the key used to join against the match
# summary table; date + referee may not be unique per match - confirm.
df_innings = df_raw.select(
    explode("innings").alias("inning"),
    col("info.dates")[0].alias("Match_date"),
    col("info.officials.match_referees")[0].alias("Match_Referee")
).withColumn("Join_Column", concat_ws(" - ", col("Match_date"), col("Match_Referee")))

# Step 3: Explode overs (one row per over, carrying the batting team along)
df_overs = df_innings.select(
    col("inning.team").alias("team"),
    explode("inning.overs").alias("over"),
    col("Match_date"),
    col("Join_Column")
)

# Step 4: Explode deliveries (keep as struct) together with their position
# posexplode returns each array element with its 0-based index, so the
# delivery order comes from the data itself rather than from physical row order.
df_deliveries_struct = df_overs.select(
    concat_ws(" ", lit("Innings"), col("team")).alias("Innings"),
    col("over.over").alias("Over"),
    posexplode("over.deliveries").alias("delivery_pos", "delivery_struct"),
    col("Match_date"),
    col("Join_Column")
)

# Step 5: Add delivery number per over (1-based)
# The previous row_number() over (Innings, Over, Match_date) ordered by
# monotonically_increasing_id() had two problems: the ordering relied on row
# order that Spark does not promise to match array order, and any two overs
# sharing the same Innings/Over/Match_date values (e.g. a second innings
# entry for the same team on one date) were numbered as one sequence.
df_deliveries = df_deliveries_struct.withColumn("Delivery", col("delivery_pos") + 1)

# Step 6: Select and extract extra_type
df_final = df_deliveries.select(
    col("Innings"),
    col("Over"),
    col("Delivery"),
    col("delivery_struct.batter").alias("Batter"),
    col("delivery_struct.non_striker").alias("Nonstriker"),
    col("delivery_struct.bowler").alias("Bowler"),
    col("delivery_struct.runs.batter").alias("runs_batter"),
    col("delivery_struct.runs.extras").alias("runs_extras"),
    col("delivery_struct.runs.total").alias("runs_total"),
    # Only the first wicket entry (and its first fielder) is kept; these are
    # null when the delivery has no wicket.
    col("delivery_struct.wickets")[0]["player_out"].alias("Batsman_Out"),
    col("delivery_struct.wickets")[0]["kind"].alias("Wicket_Type"),
    col("delivery_struct.wickets")[0]["fielders"][0]["name"].alias("Fielder_Name"),

    # Extra type: the first non-null extras field wins, in the order listed;
    # the column is null when none of them is set.
    when(col("delivery_struct.extras.byes").isNotNull(), "byes")
    .when(col("delivery_struct.extras.legbyes").isNotNull(), "legbyes")
    .when(col("delivery_struct.extras.noballs").isNotNull(), "noballs")
    .when(col("delivery_struct.extras.penalty").isNotNull(), "penalty")
    .when(col("delivery_struct.extras.wides").isNotNull(), "wides")
    .alias("extra_type"),

    col("Match_date"),
    col("Join_Column")
)

# Step 7: Overwrite table with new schema
# overwriteSchema lets the write replace a table whose existing schema differs.
df_final.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("ipl_ball_by_ball_clean")

print("✅ Table 'ipl_ball_by_ball_clean' created with 'extra_type' column.")


StatementMeta(, 435c1448-96b8-414b-89ac-10d2186daa42, 8, Finished, Available, Finished)

✅ Table 'ipl_ball_by_ball_clean' created with 'extra_type' column.


In [9]:
from pyspark.sql.functions import col, concat_ws

# Load the raw match files again so this cell does not depend on an earlier
# cell's df_raw.
df_raw = (
    spark.read
    .option("multiline", "true")
    .json("Files/IPL_json/*.json")
)

# Expressions reused below, both as plain columns and inside the join key.
first_match_date = col("info.dates")[0]
first_match_referee = col("info.officials.match_referees")[0]
umpires = col("info.officials.umpires")
teams = col("info.teams")

# One output column per match-level field, in the order the table stores them.
summary_columns = [
    first_match_date.alias("Match_date"),
    col("info.event.name").alias("event_name"),
    col("info.gender").alias("gender"),
    col("info.match_type").alias("match_type"),
    first_match_referee.alias("Match_Referee"),
    col("info.officials.tv_umpires")[0].alias("TV_Umpire"),
    umpires[0].alias("Umpire_1"),
    umpires[1].alias("Umpire_2"),
    col("info.outcome.by.wickets").alias("outcome_by_wickets"),
    col("info.outcome.by.runs").alias("outcome_by_runs"),
    col("info.outcome.winner").alias("outcome_winner"),
    col("info.overs").alias("overs"),
    col("info.player_of_match")[0].alias("Player_of_Match"),
    col("info.season").alias("season"),
    col("info.team_type").alias("team_type"),
    teams[0].alias("Team_1"),
    teams[1].alias("Team_2"),
    col("info.toss.decision").alias("toss_decision"),
    col("info.toss.winner").alias("toss_winner"),
    col("info.venue").alias("venue"),
    # "<first date> - <first match referee>", built the same way as the
    # Join_Column of the ball-by-ball table.
    concat_ws(" - ", first_match_date, first_match_referee).alias("Join_Column"),
]
df_summary = df_raw.select(*summary_columns)

# Replace any existing contents of the summary table.
summary_writer = df_summary.write.mode("overwrite")
summary_writer.saveAsTable("ipl_match_summary_clean")

print("Created 'ipl_match_summary_clean' table")


StatementMeta(, 98b67f78-9ad5-443d-a246-79d64056e1f3, 11, Finished, Available, Finished)

✅ Created 'ipl_match_summary_clean' table
