In [2]:
import os
from pathlib import Path

# Restart the notebook's SparkSession from a clean slate. Safe to re-run, and
# meant to recover from a dead Py4J gateway: stop the old session, discard
# PySpark's cached globals so the builder cannot hand back a stale session,
# then build a fresh one.

# ---------- 1) Stop any existing SparkSession (if any) ----------
try:
    spark.stop()
    print("Stopped existing SparkSession.")
except Exception:
    # `spark` is undefined on a fresh kernel, or its gateway is already gone.
    print("No SparkSession to stop (ok).")

# ---------- 2) Clear PySpark global state (fixes dead Py4J gateway) ----------
try:
    from pyspark.context import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.context import SQLContext

    # Forget gateway coordinates so a brand-new gateway JVM is launched below.
    for k in ["PYSPARK_GATEWAY_PORT", "PYSPARK_GATEWAY_SECRET"]:
        if k in os.environ:
            os.environ.pop(k, None)
            print(f"Cleared env {k}")

    # SparkContext.stop() leaves the gateway JVM running so it can be reused.
    # Dropping the reference without closing it would orphan one JVM process
    # per re-run; the JVM that PySpark launches exits once its stdin pipe closes.
    old_gateway = SparkContext._gateway
    if old_gateway is not None:
        # `proc` is only set when PySpark launched the JVM itself.
        old_proc = getattr(old_gateway, "proc", None)
        try:
            if old_proc is not None and old_proc.stdin is not None:
                old_proc.stdin.close()
            old_gateway.close()
            print("Closed previous Py4J gateway")
        except Exception as gw_err:
            print("⚠️ Could not close previous gateway:", type(gw_err).__name__, gw_err)

    SparkContext._active_spark_context = None
    SparkContext._gateway = None
    SparkContext._jvm = None

    # Cached session/context singletons. If spark.stop() failed (dead gateway)
    # these may still point at the stale session, and getOrCreate() could
    # return it instead of building a new one.
    SparkSession._instantiatedSession = None
    SparkSession._activeSession = None
    SQLContext._instantiatedContext = None

    print("✅ PySpark internals reset")
except Exception as e:
    print("⚠️ Reset step warning:", type(e).__name__, e)

# ---------- 3) Start Spark (stable local mode) ----------
from pyspark.sql import SparkSession

OUTPUT_ROOT = Path.home() / "opendota_processed"
OUTPUT_ROOT.mkdir(parents=True, exist_ok=True)

# Scratch space for shuffle/spill files, kept next to the processed output.
SPARK_LOCAL_DIR = OUTPUT_ROOT / "spark_local"
SPARK_LOCAL_DIR.mkdir(parents=True, exist_ok=True)

spark = (
    SparkSession.builder
    .appName("OpenDota_Notebook")
    .master("local[*]")  # stable; we can change later
    .config("spark.local.dir", str(SPARK_LOCAL_DIR))
    .config("spark.sql.shuffle.partitions", "400")
    .config("spark.sql.adaptive.enabled", "true")
    # IMPORTANT: avoid surprise broadcasts that can destabilize the JVM
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

print("✅ Spark started:", spark.version)
print(" - Master:", spark.sparkContext.master)
print(" - Spark UI:", spark.sparkContext.uiWebUrl)
print("alive_check =", spark.range(1).count())

Stopped existing SparkSession.
✅ PySpark internals reset
✅ Spark started: 3.5.8
 - Master: local[*]
 - Spark UI: http://192.168.55.108:4040
alive_check = 1


In [4]:
import os
from pathlib import Path

# Hard restart of Spark with conservative settings (fewer threads, fewer
# shuffle partitions, AQE off) while debugging JVM stability.

# 1) Stop if possible
try:
    spark.stop()
    print("Stopped SparkSession.")
except Exception:
    # `spark` is undefined on a fresh kernel, or its gateway is already gone.
    print("No SparkSession to stop (ok).")

# 2) Hard reset PySpark globals (dead gateway fix)
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext

# Forget gateway coordinates so a brand-new gateway JVM is launched below.
for k in ["PYSPARK_GATEWAY_PORT", "PYSPARK_GATEWAY_SECRET"]:
    os.environ.pop(k, None)

# SparkContext.stop() leaves the gateway JVM running so it can be reused.
# Dropping the reference without closing it would orphan one JVM process per
# re-run; the JVM that PySpark launches exits once its stdin pipe closes.
old_gateway = SparkContext._gateway
if old_gateway is not None:
    # `proc` is only set when PySpark launched the JVM itself.
    old_proc = getattr(old_gateway, "proc", None)
    try:
        if old_proc is not None and old_proc.stdin is not None:
            old_proc.stdin.close()
        old_gateway.close()
    except Exception as gw_err:
        print("⚠️ Could not close previous gateway:", type(gw_err).__name__, gw_err)

SparkContext._active_spark_context = None
SparkContext._gateway = None
SparkContext._jvm = None

# Cached session/context singletons. If spark.stop() failed (dead gateway)
# these may still point at the stale session, and getOrCreate() could return
# it instead of building a new one.
SparkSession._instantiatedSession = None
SparkSession._activeSession = None
SQLContext._instantiatedContext = None

# 3) Start Spark (stable settings)
OUTPUT_ROOT = Path.home() / "opendota_processed"
OUTPUT_ROOT.mkdir(parents=True, exist_ok=True)
SPARK_LOCAL_DIR = OUTPUT_ROOT / "spark_local"
SPARK_LOCAL_DIR.mkdir(parents=True, exist_ok=True)

spark = (
    SparkSession.builder
    .appName("OpenDota_Preprocessing_Notebook_Stable")
    .master("local[4]")                 # <-- reduce threads for stability
    .config("spark.ui.enabled", "true")  # <-- UI kept on; set "false" to rule out UI port conflicts
    .config("spark.local.dir", str(SPARK_LOCAL_DIR))
    .config("spark.sql.shuffle.partitions", "32")
    .config("spark.sql.adaptive.enabled", "false")  # <-- disable AQE for now
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")  # no automatic broadcast joins
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")
print("✅ Spark started:", spark.version)
print(" - Master:", spark.sparkContext.master)
print(" - Spark UI:", spark.sparkContext.uiWebUrl)
print("alive_check =", spark.range(1).count())

Stopped SparkSession.
✅ Spark started: 3.5.8
 - Master: local[4]
 - Spark UI: http://192.168.55.108:4040
alive_check = 1


In [None]:
import os

# Glob of the raw CSV part files. Change the default here, or set the
# OPENDOTA_INPUT_PATH environment variable instead of editing this cell.
input_path = os.environ.get("OPENDOTA_INPUT_PATH", "data_output/part-*.csv")

In [10]:
from pyspark.sql.types import StructType, StructField, LongType, IntegerType, StringType

# Explicit schema for the raw CSV, so Spark does not have to infer types.
# CSV columns are matched to this schema by position, not by header name, so
# the fields must follow the file's column order. That is why match_seq_num
# is declared even though it is dropped right after loading.
raw_fields = [
    ("match_id", LongType()),
    ("match_seq_num", LongType()),
    ("radiant_win", StringType()),
    ("start_time", LongType()),
    ("duration", IntegerType()),
]
schema = StructType([StructField(name, dtype, True) for name, dtype in raw_fields])

df_raw = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv(input_path)
)

# Keep only the columns used downstream.
match_columns = ["match_id", "radiant_win", "start_time", "duration"]
df_matches = df_raw.select(*match_columns)

In [11]:
# Preview the first five parsed matches.
preview_cols = ["match_id", "radiant_win", "start_time", "duration"]
df_matches.select(*preview_cols).show(n=5)

+---------+-----------+----------+--------+
| match_id|radiant_win|start_time|duration|
+---------+-----------+----------+--------+
|473260719|          t|1390019262|    3234|
|473260722|          f|1390019274|    2360|
|473260723|          f|1390019260|    2546|
|473260724|          f|1390019278|    2647|
|473260725|          t|1390019250|    3030|
+---------+-----------+----------+--------+
only showing top 5 rows



In [12]:
from pyspark.sql import functions as F

# Justification: Matches < 5 mins (300s) are usually 'null' games (early abandons).
# Matches > 3 hours (10800s) are extreme outliers/server errors in Dota 2.
lower_bound = 300
upper_bound = 10800

# Raw spellings accepted for radiant_win (the sample rows use 't'/'f').
# Any other value becomes null instead of silently being counted as a loss.
RADIANT_WIN_TRUE = ["t", "true", "1"]
RADIANT_WIN_FALSE = ["f", "false", "0"]

# A) Row count before cleaning
before_count = df_matches.count()

# B) Normalise types, parse the win flag, drop rows without a key, de-duplicate
radiant_raw = F.lower(F.trim(F.col("radiant_win")))
matches_clean = (
    df_matches
    .withColumn("match_id", F.col("match_id").cast("long"))
    .withColumn("start_time", F.col("start_time").cast("long"))
    .withColumn("duration", F.col("duration").cast("int"))
    .withColumn(
        "radiant_win",
        F.when(radiant_raw.isin(RADIANT_WIN_TRUE), F.lit(True))
         .when(radiant_raw.isin(RADIANT_WIN_FALSE), F.lit(False))
         .otherwise(F.lit(None).cast("boolean")),
    )
    .dropna(subset=["match_id"])
    .dropDuplicates(["match_id"])
)

# C) Inclusive duration filter. Rows with a null duration are dropped too,
#    because a comparison with null is never true.
matches_clean = matches_clean.filter(F.col("duration").between(lower_bound, upper_bound))

# D) Row count after cleaning
after_count = matches_clean.count()

In [13]:
# null rate calc
null_rate_df = matches_clean.select([
    (F.count(F.when(F.col(c).isNull(), c)) / after_count * 100).alias(c) 
    for c in matches_clean.columns
])

print("--- TASK 4: EVIDENCE ---")
print(f"Before cleaning: {before_count} rows")
print(f"After cleaning:  {after_count} rows")
print(f"Rows removed:    {before_count - after_count}")

print("\n--- Null Rate Summary (%) ---")
null_rate_df.show()

--- TASK 4: EVIDENCE ---
Before cleaning: 500000 rows
After cleaning:  497320 rows
Rows removed:    2680

--- Null Rate Summary (%) ---
+--------+--------------------+----------+--------+
|match_id|         radiant_win|start_time|duration|
+--------+--------------------+----------+--------+
|     0.0|2.010777768840987...|       0.0|     0.0|
+--------+--------------------+----------+--------+



In [14]:
# Confirm the cleaned column types, then show a small sample of rows.
matches_clean.printSchema()
matches_clean_preview = matches_clean.limit(5)
matches_clean_preview.show()

root
 |-- match_id: long (nullable = true)
 |-- radiant_win: boolean (nullable = true)
 |-- start_time: long (nullable = true)
 |-- duration: integer (nullable = true)

+----------+-----------+----------+--------+
|  match_id|radiant_win|start_time|duration|
+----------+-----------+----------+--------+
| 473260800|      false|1390019708|    1843|
|1009842190|      false|1415362477|    2950|
|1808667856|      false|1442797852|    3096|
| 692810562|      false|1401589879|    2141|
|1980325600|       true|1449295127|    2140|
+----------+-----------+----------+--------+



In [15]:
from pyspark.sql import functions as F

# Calendar parts from the epoch-seconds start_time, in the Spark session time
# zone. timestamp_seconds converts directly. Formatting with from_unixtime and
# casting the string back can map times inside a DST "fall back" hour to the
# wrong instant.
matches_features = (
    matches_clean
    .withColumn("start_ts", F.timestamp_seconds("start_time"))
    .withColumn("year", F.year("start_ts"))
    .withColumn("month", F.month("start_ts"))
    .withColumn("day", F.dayofmonth("start_ts"))
    .withColumn("duration_min", F.col("duration") / 60)
)

# bins: <20, 20–30, 30-40, 40-50, 50-60, 60+ (minutes)
# Branches are evaluated in order, so each one only needs its upper edge.
# A null duration stays null instead of falling through to "60+".
duration_min = F.col("duration_min")
matches_features = matches_features.withColumn(
    "duration_bin",
    F.when(duration_min.isNull(), F.lit(None).cast("string"))
     .when(duration_min < 20, "<20")
     .when(duration_min < 30, "20-30")
     .when(duration_min < 40, "30-40")
     .when(duration_min < 50, "40-50")
     .when(duration_min < 60, "50-60")
     .otherwise("60+"),
)

# preview of newest features
matches_features.select("match_id", "start_ts", "year", "month", "day", "duration_min", "duration_bin").show(5)

+----------+-------------------+----+-----+---+------------------+------------+
|  match_id|           start_ts|year|month|day|      duration_min|duration_bin|
+----------+-------------------+----+-----+---+------------------+------------+
| 473260800|2014-01-18 12:35:08|2014|    1| 18|30.716666666666665|       30-40|
|1009842190|2014-11-07 20:14:37|2014|   11|  7|49.166666666666664|       40-50|
|1808667856|2015-09-21 09:10:52|2015|    9| 21|              51.6|       50-60|
| 692810562|2014-06-01 10:31:19|2014|    6|  1| 35.68333333333333|       30-40|
|1980325600|2015-12-05 13:58:47|2015|   12|  5|35.666666666666664|       30-40|
+----------+-------------------+----+-----+---+------------------+------------+
only showing top 5 rows



In [16]:
# define the output path
parquet_path = f"data/matches_features.parquet"

# write the data partitioned by year and month
matches_features.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet(parquet_path)

print(f"✅ Data successfully written to: {parquet_path}")

✅ Data successfully written to: data/matches_features.parquet


In [None]:
from pathlib import Path

# Show the first 20 entries of the partitioned Parquet output, using pathlib.
# This is portable (no `ls`/`head`), avoids interpolating a path into a shell
# command, and prints into the notebook. With os.system the cell would only
# display the exit code, and the listing may end up in the server's terminal.
parquet_root = Path(parquet_path)
written_entries = sorted(parquet_root.rglob("*"))
for entry in written_entries[:20]:
    print(entry.relative_to(parquet_root))
print(f"... {len(written_entries)} entries in total under {parquet_root}")

In [None]:
# Read the partitioned output back and filter on the partition columns. The
# plan printed by explain shows whether Spark prunes the scan to the
# year=2014/month=1 directory (look for PartitionFilters in the file scan).
df_p = spark.read.parquet(parquet_path)
is_jan_2014 = (F.col("year") == 2014) & (F.col("month") == 1)
filtered_query = df_p.filter(is_jan_2014)
filtered_query.explain(extended=True)