In [36]:
# Execute the shared environment notebook in this kernel; its log reports that it initializes the Spark session.
# Later cells use a session named `spark` — presumably defined by this notebook (confirm there).
# NOTE(review): absolute, machine-specific path — consider deriving it from a configurable project root.
%run "E:/DataEngineering/Ipl-Analytics/projects/ipynb/00_Set_Environmnet.ipynb"

[INFO] Initializing Spark session...
[INFO] Spark session initialized successfully.


In [37]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StructType, IntegerType, StringType, BooleanType, DateType, DecimalType)
from pyspark.sql import functions as F
from pyspark.sql.functions import trim, lower, initcap, regexp_replace, col, when, lit, to_date
import shutil
import os

In [38]:
def delete_folder_if_exists(path):
    """Recursively delete the directory tree at ``path`` if it exists.

    Deletion is best-effort: a failure is reported on stdout instead of being
    raised, so the calling loop can continue. The return value lets a caller
    that cares detect the failure.

    Args:
        path: Filesystem path of the directory to remove.

    Returns:
        True if nothing remains at ``path`` afterwards (it was deleted, or it
        never existed), False if the deletion attempt raised an error.
    """
    if not os.path.exists(path):
        # Nothing to clean up; the desired end state already holds.
        return True
    try:
        shutil.rmtree(path)
    except Exception as e:
        # Best-effort: log instead of aborting, but report the failure to the caller.
        print(f"[ERROR] Could not delete the folder {path}: {e}")
        return False
    print(f"[INFO] Deleted Existing Directory: {path}")
    return True

In [39]:
# ------------------------------------------------------------------------------------------------------------------------
# 📄 3. Schema Definitions
# ------------------------------------------------------------------------------------------------------------------------
# Every column in every input dataset is declared nullable. Each schema is written as an ordered list of
# (column name, type) pairs. Keep the order aligned with the CSV files: with an explicit schema, Spark's
# CSV reader applies the fields by position by default (option `enforceSchema`).

print("[INFO] Defining custom schemas for all input datasets...")

# The only two column types the schemas below need.
_INT = IntegerType()
_STR = StringType()


def _nullable_struct(columns):
    """Return a StructType whose fields are the given (name, type) pairs, all nullable."""
    return StructType([StructField(name, dtype, True) for name, dtype in columns])


# One row per delivery. Column names are kept exactly as declared originally (including "obstructingfeild").
ball_by_ball_schema = _nullable_struct([
    ("match_id", _INT),
    ("over_id", _INT),
    ("ball_id", _INT),
    ("innings_no", _INT),
    ("team_batting", _STR),
    ("team_bowling", _STR),
    ("striker_batting_position", _INT),
    ("extra_type", _STR),
    ("runs_scored", _INT),
    ("extra_runs", _INT),
    ("wides", _INT),
    ("legbyes", _INT),
    ("byes", _INT),
    ("noballs", _INT),
    ("penalty", _INT),
    ("bowler_extras", _INT),
    ("out_type", _STR),
    ("caught", _INT),
    ("bowled", _INT),
    ("run_out", _INT),
    ("lbw", _INT),
    ("retired_hurt", _INT),
    ("stumped", _INT),
    ("caught_and_bowled", _INT),
    ("hit_wicket", _INT),
    ("obstructingfeild", _INT),
    ("bowler_wicket", _INT),
    ("match_date", _STR),
    ("season", _INT),
    ("striker", _INT),
    ("non_striker", _INT),
    ("bowler", _INT),
    ("player_out", _INT),
    ("fielders", _INT),
    ("striker_match_sk", _INT),
    ("strikersk", _INT),
    ("nonstriker_match_sk", _INT),
    ("nonstriker_sk", _INT),
    ("fielder_match_sk", _INT),
    ("fielder_sk", _INT),
    ("bowler_match_sk", _INT),
    ("bowler_sk", _INT),
    ("playerout_match_sk", _INT),
    ("battingteam_sk", _INT),
    ("bowlingteam_sk", _INT),
    ("keeper_catch", _INT),
    ("player_out_sk", _INT),
    ("matchdatesk", _INT),
])

# Match-level facts. Dates are kept as raw strings; season_year is an integer year.
match_schema = _nullable_struct([
    ("match_sk", _INT),
    ("match_id", _INT),
    ("team1", _STR),
    ("team2", _STR),
    ("match_date", _STR),
    ("season_year", _INT),
    ("venue_name", _STR),
    ("city_name", _STR),
    ("country_name", _STR),
    ("toss_winner", _STR),
    ("match_winner", _STR),
    ("toss_name", _STR),
    ("win_type", _STR),
    ("outcome_type", _STR),
    ("manofmach", _STR),
    ("win_margin", _INT),
    ("country_id", _INT),
])

# Player master data; dob is kept as a raw string.
player_schema = _nullable_struct([
    ("player_sk", _INT),
    ("player_id", _INT),
    ("player_name", _STR),
    ("dob", _STR),
    ("batting_hand", _STR),
    ("bowling_skill", _STR),
    ("country_name", _STR),
])

# Player-per-match records; playermatch_key is read as a plain string.
player_match_schema = _nullable_struct([
    ("player_match_sk", _INT),
    ("playermatch_key", _STR),
    ("match_id", _INT),
    ("player_id", _INT),
    ("player_name", _STR),
    ("dob", _STR),
    ("batting_hand", _STR),
    ("bowling_skill", _STR),
    ("country_name", _STR),
    ("role_desc", _STR),
    ("player_team", _STR),
    ("opposit_team", _STR),
    ("season_year", _INT),
    ("is_manofthematch", _INT),
    ("age_as_on_match", _INT),
    ("isplayers_team_won", _INT),
    ("batting_status", _STR),
    ("bowling_status", _STR),
    ("player_captain", _STR),
    ("opposit_captain", _STR),
    ("player_keeper", _STR),
    ("opposit_keeper", _STR),
])

# Team lookup.
team_schema = _nullable_struct([
    ("team_sk", _INT),
    ("team_id", _INT),
    ("team_name", _STR),
])

print("[INFO] Schema definitions completed.")

[INFO] Defining custom schemas for all input datasets...
[INFO] Schema definitions completed.


In [40]:
# ------------------------------------------------------------------------------------------------------------------------
# 📥 4. Load DataFrames from GCS
# ------------------------------------------------------------------------------------------------------------------------
# Reads each table from `<basepath><table>.csv` with its explicit schema, using the Spark session `spark`
# (assumed to be created by the environment notebook run in the first cell).

basepath = 'gs://ipl-data-project/'
dataframes = {}

# Table name -> schema. The load order (`tables`) is derived from this mapping so the two cannot drift apart.
schemas = {
    "Ball_By_Ball": ball_by_ball_schema,
    "Match": match_schema,
    "Player": player_schema,
    "Player_match": player_match_schema,
    "Team": team_schema
}
tables = list(schemas)
failed_tables = []

print("[INFO] Data Engine Started:")

for table in tables:
    try:
        print(f"[INFO] Loading the Table: {table}")
        df = spark.read.schema(schemas[table]) \
            .option("header", "true") \
            .csv(basepath + f"{table}.csv")

        # count() forces a full scan of the file, so the row count doubles as a readability check.
        row_count = df.count()
        dataframes[table] = df
        print(f"[SUCCESS] Completed loading table: {table}, Rows: {row_count}")
    except Exception as e:
        print(f"[ERROR] Failed to load table {table}: {e}")
        failed_tables.append(table)

# Later cells index every table unconditionally, and the raw-data refresh deletes existing output
# before writing. Stop here with a clear message rather than failing later with a KeyError.
if failed_tables:
    raise RuntimeError(f"[ERROR] Failed to load tables: {', '.join(failed_tables)}")



[INFO] Data Engine Started:
[INFO] Loading the Table: Ball_By_Ball
[SUCCESS] Completed loading table: Ball_By_Ball, Rows: 150451
[INFO] Loading the Table: Match
[SUCCESS] Completed loading table: Match, Rows: 637
[INFO] Loading the Table: Player
[SUCCESS] Completed loading table: Player, Rows: 497
[INFO] Loading the Table: Player_match
[SUCCESS] Completed loading table: Player_match, Rows: 13993
[INFO] Loading the Table: Team
[SUCCESS] Completed loading table: Team, Rows: 13


In [41]:

# ------------------------------------------------------------------------------------------------------------------------
# 🧾 5. Schema Validation
# ------------------------------------------------------------------------------------------------------------------------
# Prints each loaded schema. It also checks that every DataFrame's column names and types match the
# schema declared for it in `schemas`, so the "verified" message below is actually backed by a check.

print("\n[INFO] Printing all schemas for verification...")

for table_name in ["Ball_By_Ball", "Match", "Player", "Player_match", "Team"]:
    loaded_df = dataframes[table_name]
    loaded_df.printSchema()
    # Compare (name, type) pairs only; nullability is not part of the check.
    actual_columns = [(field.name, field.dataType) for field in loaded_df.schema.fields]
    expected_columns = [(field.name, field.dataType) for field in schemas[table_name].fields]
    if actual_columns != expected_columns:
        raise ValueError(
            f"[ERROR] Schema mismatch for table {table_name}: "
            f"expected {expected_columns}, got {actual_columns}"
        )

print("\n[INFO] All datasets loaded from cloud and schemas enforced and verified.")




[INFO] Printing all schemas for verification...
root
 |-- match_id: integer (nullable = true)
 |-- over_id: integer (nullable = true)
 |-- ball_id: integer (nullable = true)
 |-- innings_no: integer (nullable = true)
 |-- team_batting: string (nullable = true)
 |-- team_bowling: string (nullable = true)
 |-- striker_batting_position: integer (nullable = true)
 |-- extra_type: string (nullable = true)
 |-- runs_scored: integer (nullable = true)
 |-- extra_runs: integer (nullable = true)
 |-- wides: integer (nullable = true)
 |-- legbyes: integer (nullable = true)
 |-- byes: integer (nullable = true)
 |-- noballs: integer (nullable = true)
 |-- penalty: integer (nullable = true)
 |-- bowler_extras: integer (nullable = true)
 |-- out_type: string (nullable = true)
 |-- caught: integer (nullable = true)
 |-- bowled: integer (nullable = true)
 |-- run_out: integer (nullable = true)
 |-- lbw: integer (nullable = true)
 |-- retired_hurt: integer (nullable = true)
 |-- stumped: integer (nulla

In [42]:
# Refresh the local raw-data copy: remove each table's existing output folder, then write the
# loaded DataFrame back out as CSV (with header) under `<rawpath><table>/`.
tables = ["Ball_By_Ball", "Match", "Player", "Player_match", "Team"]
rawpath = 'E:/DataEngineering/Ipl-Analytics/raw-data/'

# Existing folders are deleted before anything is written. Make sure every table is loaded first;
# otherwise the old raw data would be removed and the write loop would fail part-way through.
missing_tables = [name for name in tables if name not in dataframes]
if missing_tables:
    raise RuntimeError(
        f"[ERROR] Cannot refresh raw data, tables not loaded: {', '.join(missing_tables)}"
    )

print("\n[INFO] Deleting all Existing Tables")

for table in tables:
    fullpath = rawpath + table + "/"
    delete_folder_if_exists(fullpath)

print("\n[INFO] Reloading Full Raw Data")

for table in tables:
    fullpath = rawpath + table + "/"
    # coalesce(1) writes each table as a single partition, i.e. one part file inside its folder.
    dataframes[table].coalesce(1).write.option("header", "true").mode("overwrite").csv(fullpath)
    print(f"Loaded Table: {table}")

print("\n[INFO] Loaded Full Raw Data")


[INFO] Deleting all Existing Tables
[INFO] Deleted Existing Directory: E:/DataEngineering/Ipl-Analytics/raw-data/Ball_By_Ball/
[INFO] Deleted Existing Directory: E:/DataEngineering/Ipl-Analytics/raw-data/Match/
[INFO] Deleted Existing Directory: E:/DataEngineering/Ipl-Analytics/raw-data/Player/
[INFO] Deleted Existing Directory: E:/DataEngineering/Ipl-Analytics/raw-data/Player_match/
[INFO] Deleted Existing Directory: E:/DataEngineering/Ipl-Analytics/raw-data/Team/

[INFO] Realoading Full Raw Data
Loaded Table: Ball_By_Ball
Loaded Table: Match
Loaded Table: Player
Loaded Table: Player_match
Loaded Table: Team

[INFO] Loaded Full Raw Data
