In [1]:
!apt-get -qq update
!apt-get -y -qq install openjdk-17-jdk-headless
!pip -q install pyspark==3.5.1


W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
(Reading database ... 121713 files and directories currently installed.)
Preparing to unpack .../openjdk-17-jdk-headless_17.0.17+10-1~22.04_amd64.deb ...
Unpacking openjdk-17-jdk-headless:amd64 (17.0.17+10-1~22.04) over (17.0.16+8~us1-0ubuntu1~22.04.1) ...
Preparing to unpack .../openjdk-17-jre-headless_17.0.17+10-1~22.04_amd64.deb ...
Unpacking openjdk-17-jre-headless:amd64 (17.0.17+10-1~22.04) over (17.0.16+8~us1-0ubuntu1~22.04.1) ...
Setting up openjdk-17-jre-headless:amd64 (17.0.17+10-1~22.04) ...
Installing new version of config file /etc/java-17-openjdk/security/default.policy ...
Installing new version of config file /etc/java-17-openjdk/security/java.security ...
Setting up openjdk-17-jdk-headless:amd64 (17.0.17+10-1~22.04) ...


In [2]:
import os, sys
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["PYSPARK_PYTHON"] = sys.executable

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Sprint6_Step3_4")
    .getOrCreate()
)

spark


In [3]:
raw_df = spark.read.csv("/content/clean_play_by_play.csv", header=True, inferSchema=True)


In [4]:
print("=== RAW SCHEMA ===")
raw_df.printSchema()
raw_df.show(10, truncate=False)


=== RAW SCHEMA ===
root
 |-- game_id: integer (nullable = true)
 |-- period: integer (nullable = true)
 |-- pctimestring: timestamp (nullable = true)
 |-- eventmsgtype: integer (nullable = true)
 |-- eventmsgactiontype: integer (nullable = true)
 |-- actor_id: integer (nullable = true)
 |-- actor_name: string (nullable = true)
 |-- actor_team_id: integer (nullable = true)
 |-- actor_team_abbr: string (nullable = true)
 |-- homedescription: string (nullable = true)
 |-- visitordescription: string (nullable = true)

+--------+------+-------------------+------------+------------------+--------+---------------+-------------+---------------+--------------------------------------------+------------------------------+
|game_id |period|pctimestring       |eventmsgtype|eventmsgactiontype|actor_id|actor_name     |actor_team_id|actor_team_abbr|homedescription                             |visitordescription            |
+--------+------+-------------------+------------+------------------+--------+

In [10]:
from pyspark.sql.functions import col, coalesce, lit

final_df = (
    raw_df
    # ensure numeric type is consistent
    .withColumn("actor_team_id", col("actor_team_id").cast("long"))
    # create unified event text
    .withColumn(
        "event_text",
        coalesce(col("homedescription"), col("visitordescription"))
    )
    # add season flag for downstream grouping
    .withColumn("season", lit("2014-2015"))
    # optional: keep or drop columns depending on what your team wants
    # .drop("homedescription", "visitordescription")   # uncomment if you only want event_text
)


In [11]:
print("Before cleaning:", raw_df.count())
print("After cleaning:", final_df.count())

print("=== FINAL OUTPUT SCHEMA ===")
final_df.printSchema()
final_df.show(10, truncate=False)


Before cleaning: 925
After cleaning: 925
=== FINAL OUTPUT SCHEMA ===
root
 |-- game_id: integer (nullable = true)
 |-- period: integer (nullable = true)
 |-- pctimestring: timestamp (nullable = true)
 |-- eventmsgtype: integer (nullable = true)
 |-- eventmsgactiontype: integer (nullable = true)
 |-- actor_id: integer (nullable = true)
 |-- actor_name: string (nullable = true)
 |-- actor_team_id: long (nullable = true)
 |-- actor_team_abbr: string (nullable = true)
 |-- homedescription: string (nullable = true)
 |-- visitordescription: string (nullable = true)
 |-- event_text: string (nullable = true)
 |-- season: string (nullable = false)

+--------+------+-------------------+------------+------------------+--------+---------------+-------------+---------------+--------------------------------------------+------------------------------+--------------------------------------------+---------+
|game_id |period|pctimestring       |eventmsgtype|eventmsgactiontype|actor_id|actor_name     |ac

In [12]:
import time


In [13]:
print("=== TEST A: repartition(1) ===")

start_a = time.time()

(final_df
 .repartition(1)
 .write
 .mode("overwrite")
 .parquet("/content/test_output_a/")
)

end_a = time.time()
runtime_a = end_a - start_a
print(f"Runtime A: {runtime_a:.2f} seconds")


=== TEST A: repartition(1) ===
Runtime A: 2.31 seconds


In [14]:
print("=== TEST B: partitionBy('actor_team_id') ===")

start_b = time.time()

(final_df
 .write
 .mode("overwrite")
 .partitionBy("actor_team_id")
 .parquet("/content/test_output_b/")
)

end_b = time.time()
runtime_b = end_b - start_b
print(f"Runtime B: {runtime_b:.2f} seconds")


=== TEST B: partitionBy('actor_team_id') ===
Runtime B: 1.72 seconds


In [15]:
import os

print("Files in test_output_a:", len(os.listdir("/content/test_output_a/")))
print("Files in test_output_b:", len(os.listdir("/content/test_output_b/")))


Files in test_output_a: 4
Files in test_output_b: 32


In [16]:
spark.sparkContext.uiWebUrl


'http://d3ac2f14c4b4:4040'