Defining an Explicit Schema

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *

# ---------------------------------------------
# 1. Explicit schema for the NBA CSV (declaring types up front skips inferSchema,
#    which is preferable in production)
# ---------------------------------------------
# (column name, Spark type) pairs. Order should match the CSV's column order,
# because the CSV reader applies a user-supplied schema positionally.
# draft_* columns are kept as strings rather than numbers.
_NBA_COLUMNS = [
    ("id", IntegerType()),
    ("player_name", StringType()),
    ("team_abbreviation", StringType()),
    ("age", DoubleType()),
    ("player_height", DoubleType()),
    ("player_weight", DoubleType()),
    ("college", StringType()),
    ("country", StringType()),
    ("draft_year", StringType()),
    ("draft_round", StringType()),
    ("draft_number", StringType()),
    ("gp", IntegerType()),
    ("pts", DoubleType()),
    ("reb", DoubleType()),
    ("ast", DoubleType()),
    ("net_rating", DoubleType()),
    ("oreb_pct", DoubleType()),
    ("dreb_pct", DoubleType()),
    ("usg_pct", DoubleType()),
    ("ts_pct", DoubleType()),
    ("ast_pct", DoubleType()),
    ("season", StringType()),
]

# Every field is nullable.
nba_schema = StructType(
    [StructField(column_name, spark_type, True) for column_name, spark_type in _NBA_COLUMNS]
)

Loading CSV Files with Auto Loader

In [0]:
# Source directory watched by Auto Loader — replace with your volume path.
input_path = "dbfs:/Volumes/workspace/default/nba/"

# Incrementally ingest the NBA CSV files as a streaming DataFrame.
# The schema is supplied explicitly via .schema(nba_schema), so Auto Loader has
# nothing to infer and `cloudFiles.schemaLocation` is not required. The previous
# version pointed schemaLocation at `input_path` itself, which makes Auto Loader
# write its schema-tracking metadata into the very directory it is reading.
# If you switch to schema inference later, give schemaLocation its own directory
# OUTSIDE `input_path`.
nba_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .schema(nba_schema)
    .load(input_path)
)

Clean and Transform Data

In [0]:
# Unit-conversion divisors.
# NOTE(review): these assume player_height is in centimetres and player_weight is
# in pounds — verify against the source data. If the CSV already stores kilograms,
# the weight conversion below should be removed.
CM_PER_M = 100
LB_PER_KG = 2.205

# Derive metric height/weight columns and drop the `id` column.
# F.round / F.col are used explicitly rather than relying on the star import
# shadowing Python's builtin round(), which would fail on a Column.
nba_clean = (nba_df
    .withColumn("age", F.col("age").cast("int"))  # schema reads age as double; cast drops any fraction
    .withColumn("player_height_m", F.round(F.col("player_height") / CM_PER_M, 2))
    .withColumn("player_weight_kg", F.round(F.col("player_weight") / LB_PER_KG, 1))
    .drop("id")   # junk column (presumably a row index — confirm)
)

Write to Delta Lake

In [0]:
# Persist the cleaned stream as a Delta table and keep a handle to the query.
# NOTE(review): these dbfs:/mnt locations differ from the Unity Catalog volume used
# for input — confirm they are writable in this workspace.
nba_query = (nba_clean.writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/nba_checkpoint/")
    .option("path", "dbfs:/mnt/processed/nba_delta/")
    .outputMode("append")
    # Process every file available now, then stop. Without a trigger the query runs
    # indefinitely, so "Run All" never finishes and the batch analytics below could
    # read a partially written table.
    .trigger(availableNow=True)
    .start()
)

# Block until the available backlog has been written to Delta.
nba_query.awaitTermination()


Analytics Queries (batch, over the Delta output)

In [0]:
# Run the analytics against a *batch* read of the Delta table written above.
# The streaming `nba_clean` cannot be used here: ORDER BY (outside complete-mode
# aggregations), LIMIT and COUNT(DISTINCT ...) are unsupported on streaming
# DataFrames. The path must match the write cell's "path" option.
nba_batch = spark.read.format("delta").load("dbfs:/mnt/processed/nba_delta/")
nba_batch.createOrReplaceTempView("nba")

# How many top scorers to keep for each season.
TOP_N_PER_SEASON = 10

# Top scorers per season. A plain `ORDER BY season ... LIMIT n` would return only
# the earliest season's leaders, so rank within each season instead.
top_scorers = spark.sql(f"""
SELECT season, player_name, team_abbreviation, points
FROM (
    SELECT season, player_name, team_abbreviation,
           ROUND(pts, 2) AS points,
           ROW_NUMBER() OVER (PARTITION BY season ORDER BY pts DESC) AS season_rank
    FROM nba
) ranked
WHERE season_rank <= {TOP_N_PER_SEASON}
ORDER BY season, points DESC
""")

# Team average stats
team_summary = spark.sql("""
SELECT season, team_abbreviation,
       ROUND(AVG(pts),2) as avg_pts,
       ROUND(AVG(reb),2) as avg_reb,
       ROUND(AVG(ast),2) as avg_ast
FROM nba
GROUP BY season, team_abbreviation
ORDER BY season, avg_pts DESC
""")

# Country-wise player distribution (distinct players across all seasons)
country_dist = spark.sql("""
SELECT country, COUNT(DISTINCT player_name) as players
FROM nba
GROUP BY country
ORDER BY players DESC
""")

display(top_scorers)
display(team_summary)
display(country_dist)


Efficiency Breakdown

In [0]:
# Batch read of the Delta output. The streaming `nba_clean` / streaming view cannot
# run COUNT(DISTINCT ...), ORDER BY or LIMIT. The path must match the write cell's
# "path" option. Re-registering the `nba` view makes this cell independent of the
# previous one.
nba_batch = spark.read.format("delta").load("dbfs:/mnt/processed/nba_delta/")
nba_batch.createOrReplaceTempView("nba")

# Per player-season efficiency metrics
efficiency = nba_batch.select("player_name", "season", "usg_pct", "ts_pct", "ast_pct")
display(efficiency)

# College influence: average points over player-season rows, by college.
# Colleges with very few players can dominate this ranking; raise
# MIN_COLLEGE_PLAYERS to filter them out (1 keeps every college).
# The filter also excludes the literal string 'None', presumably the dataset's
# marker for "no college".
MIN_COLLEGE_PLAYERS = 1
college_perf = spark.sql(f"""
SELECT college, ROUND(AVG(pts),2) as avg_points, COUNT(DISTINCT player_name) as players
FROM nba
WHERE college IS NOT NULL AND college <> 'None'
GROUP BY college
HAVING COUNT(DISTINCT player_name) >= {MIN_COLLEGE_PLAYERS}
ORDER BY avg_points DESC
LIMIT 20
""")
display(college_perf)