In [None]:
from pathlib import Path

from pyspark.sql import SparkSession


# Create (or reuse) a local-mode Spark session named "local_testing".
# The driver host is set explicitly to the loopback address 127.0.0.1.
_session_builder = SparkSession.builder.master("local[*]").appName("local_testing")
_session_builder = _session_builder.config("spark.driver.host", "127.0.0.1")
spark = _session_builder.getOrCreate()

# Last expression of the cell, so the notebook renders the session summary.
spark

In [None]:
# STEP 1: Load the Advanced Stats CSV and keep only NBA rows (dropping historical ABA data).
adv_stats_file = 'NBA_data/Advanced.csv'

# The file has a header row, marks missing values as "NA", and column types are inferred.
read_options = {"header": True, "nullValue": "NA", "inferSchema": True}

# Row filter: league column must be 'NBA'.
nba_only = "lg = 'NBA'"
df_adv_stats = spark.read.csv(adv_stats_file, **read_options).where(nba_only)

# Show the inferred schema and how many NBA rows were loaded.
df_adv_stats.printSchema()
print(df_adv_stats.count())

In [None]:
# STEP 2: Reduce the data to exactly one record per player, per season.
# A player who appeared for several teams in one season has one row per team plus an
# overall summary row (where tm = 'TOT'). For those players only the summary row is kept.

# Season-summary rows of players who played for more than one team that season.
df_multi_team = df_adv_stats.where("tm = 'TOT'")

# Rows for player-seasons that have no 'TOT' row, i.e. the player stayed on one team.
player_season_key = ["player_id", "season"]
df_single_team = df_adv_stats.join(df_multi_team, how="left_anti", on=player_season_key)

# Stack both sets and register the result for use from Spark SQL.
df_all = df_multi_team.unionByName(df_single_team)
df_all.createOrReplaceTempView("tmp_advanced_stats_deduped")
print(df_all.count())

# Sanity check: every (player_id, player, season) combination must occur exactly once.
dupe_check_sql = """
    SELECT
        player_id,
        player,
        season,
        COUNT(*) AS record_count
    FROM tmp_advanced_stats_deduped
    GROUP BY ALL
    HAVING COUNT(*) > 1
    """
dupe_count = spark.sql(dupe_check_sql).count()

assert dupe_count == 0, "Duplicate records found"

In [None]:
# STEP 3: Keep only player-seasons that satisfy all of the following:
##   * the player has at least 2 seasons with 41+ games played,
##   * the earliest of those 41+ game seasons is 1975 or later,
##   * the season itself is 1980 or later (considered the "modern era") with 41+ games played.
# NOTE(review): MIN(season) is evaluated after the `g >= 41` filter, so it is the player's
# first *qualifying* season rather than their debut season -- confirm this is intended.
#
# Derived columns added to every kept row:
##   * mpg    - minutes per game (mp / g; g >= 41 here, so the divisor is never 0)
##   * ows_48 - offensive win shares per 48 minutes
##   * dws_48 - defensive win shares per 48 minutes
# The per-48 rates are computed directly from minutes played (48 * x / mp) rather than as
# `x / ws * ws_48`. The old form divides by total win shares, which is 0 whenever a player's
# offensive and defensive win shares cancel out; that produced NULL rates (or a
# DIVIDE_BY_ZERO error when Spark's ANSI mode is enabled). It also amplified the rounding
# of the pre-computed ws_48 column. This assumes ws_48 is defined as 48 * ws / mp.
# NULLIF guards the (unexpected) case of mp = 0 so the query can never fail on division.
df_season_filtered = spark.sql("""
    WITH cte_season_count_filter AS (
        SELECT
            player_id,
            player,
            COUNT(DISTINCT season) AS total_seasons
        FROM
            tmp_advanced_stats_deduped
        WHERE
            g >= 41
        GROUP BY ALL
        HAVING COUNT(DISTINCT season) >= 2
        AND MIN(season) >= 1975
    )

    SELECT
        tas.*,
        tas.mp / tas.g AS mpg,
        48 * tas.ows / NULLIF(tas.mp, 0) AS ows_48,
        48 * tas.dws / NULLIF(tas.mp, 0) AS dws_48
    FROM
        tmp_advanced_stats_deduped tas
    LEFT SEMI JOIN
        cte_season_count_filter cscf
        USING (player_id)
    WHERE
        season >= 1980
        AND g >= 41
    """)

# Register for the following SQL steps and report how many player-seasons remain.
df_season_filtered.createOrReplaceTempView('tmp_season_filtered')
print(df_season_filtered.count())

In [None]:
# STEP 4: Get the per-season league-wide mean and standard deviation for each advanced stat of interest.
# These are used to standardize the performance of each player with regard to their peers
# *for a single season*.
#
# Every row of tmp_season_filtered is included. That view already holds exactly one row per
# player-season, and players who changed teams are represented only by their tm = 'TOT'
# summary row, so filtering out 'TOT' here would silently drop all traded players from the
# baseline while they are still standardized against it.
#
# The statistics are stored at full precision: they are used as offsets and divisors when
# computing z-scores, so rounding them first would distort the standardized values.
# Rounding to 3 decimals is applied only to the table displayed below.
df_season_avg = spark.sql("""
    SELECT
        season,
        AVG(ts_percent) AS avg_ts_percent,
        AVG(per) AS avg_per,
        AVG(mpg) AS avg_mpg,
        AVG(x3p_ar) AS avg_x3p_ar,
        AVG(orb_percent) AS avg_orb_percent,
        AVG(drb_percent) AS avg_drb_percent,
        AVG(ast_percent) AS avg_ast_percent,
        AVG(stl_percent) AS avg_stl_percent,
        AVG(blk_percent) AS avg_blk_percent,
        AVG(tov_percent) AS avg_tov_percent,
        AVG(usg_percent) AS avg_usg_percent,
        AVG(ows_48) AS avg_ows_48,
        AVG(dws_48) AS avg_dws_48,
        AVG(obpm) AS avg_obpm,
        AVG(dbpm) AS avg_dbpm,
        AVG(bpm) AS avg_bpm,
        AVG(vorp) AS avg_vorp,

        STD(ts_percent) AS std_ts_percent,
        STD(per) AS std_per,
        STD(mpg) AS std_mpg,
        STD(x3p_ar) AS std_x3p_ar,
        STD(orb_percent) AS std_orb_percent,
        STD(drb_percent) AS std_drb_percent,
        STD(ast_percent) AS std_ast_percent,
        STD(stl_percent) AS std_stl_percent,
        STD(blk_percent) AS std_blk_percent,
        STD(tov_percent) AS std_tov_percent,
        STD(usg_percent) AS std_usg_percent,
        STD(ows_48) AS std_ows_48,
        STD(dws_48) AS std_dws_48,
        STD(obpm) AS std_obpm,
        STD(dbpm) AS std_dbpm,
        STD(bpm) AS std_bpm,
        STD(vorp) AS std_vorp
    FROM
        tmp_season_filtered
    GROUP BY season
    ORDER BY season
    """)


df_season_avg.createOrReplaceTempView("tmp_season_avg")
# Round only the copy shown to the reader; the temp view keeps full precision.
display(df_season_avg.toPandas().round(3))

In [None]:
# Turn every stat into a per-season z-score: subtract that season's mean and divide by that
# season's standard deviation (both taken from tmp_season_avg, matched on season).
# Identifying columns (player, player_id, season, g) are carried through unchanged.
standardize_sql = """
    SELECT
        player,
        player_id,
        season,
        g,
        ROUND((ts_percent - avg_ts_percent)/std_ts_percent, 3) AS ts_percent,
        ROUND((per - avg_per)/std_per, 3) AS per,
        ROUND((mpg - avg_mpg)/std_mpg, 3) AS mpg,
        ROUND((x3p_ar - avg_x3p_ar)/std_x3p_ar, 3) AS x3p_ar,
        ROUND((orb_percent - avg_orb_percent)/std_orb_percent, 3) AS orb_percent,
        ROUND((drb_percent - avg_drb_percent)/std_drb_percent, 3) AS drb_percent,
        ROUND((ast_percent - avg_ast_percent)/std_ast_percent, 3) AS ast_percent,
        ROUND((stl_percent - avg_stl_percent)/std_stl_percent, 3) AS stl_percent,
        ROUND((blk_percent - avg_blk_percent)/std_blk_percent, 3) AS blk_percent,
        ROUND((tov_percent - avg_tov_percent)/std_tov_percent, 3) AS tov_percent,
        ROUND((usg_percent - avg_usg_percent)/std_usg_percent, 3) AS usg_percent,
        ROUND((ows_48 - avg_ows_48)/std_ows_48, 3) AS ows_48,
        ROUND((dws_48 - avg_dws_48)/std_dws_48, 3) AS dws_48,
        ROUND((obpm - avg_obpm)/std_obpm, 3) AS obpm,
        ROUND((dbpm - avg_dbpm)/std_dbpm, 3) AS dbpm,
        ROUND((bpm - avg_bpm)/std_bpm, 3) AS bpm,
        ROUND((vorp - avg_vorp)/std_vorp, 3) AS vorp
    FROM
        tmp_season_filtered tsf
    INNER JOIN
        tmp_season_avg tsa
        USING (season)
    """
df_standardized = spark.sql(standardize_sql)

# Make the z-scores available to the next SQL step.
df_standardized.createOrReplaceTempView("tmp_standardized")

# Pull the result to pandas for rich display in the notebook.
pdf_standardized = df_standardized.toPandas()
display(pdf_standardized)

In [None]:
# Collapse the standardized per-season rows into one row per player (player_id, player):
# total games, number of distinct seasons, and the mean and maximum of each standardized stat.
final_features_sql = """
SELECT
    player_id,
    player,
    SUM(g) AS games_played,
    COUNT(DISTINCT season) AS seasons_played,
    ROUND(AVG(ts_percent), 3) AS avg_ts_percent,
    ROUND(AVG(per), 3) AS avg_per,
    ROUND(AVG(mpg), 3) AS avg_mpg,
    ROUND(AVG(x3p_ar), 3) AS avg_x3p_ar,
    ROUND(AVG(orb_percent), 3) AS avg_orb_percent,
    ROUND(AVG(drb_percent), 3) AS avg_drb_percent,
    ROUND(AVG(ast_percent), 3) AS avg_ast_percent,
    ROUND(AVG(stl_percent), 3) AS avg_stl_percent,
    ROUND(AVG(blk_percent), 3) AS avg_blk_percent,
    ROUND(AVG(tov_percent), 3) AS avg_tov_percent,
    ROUND(AVG(usg_percent), 3) AS avg_usg_percent,
    ROUND(AVG(ows_48), 3) AS avg_ows_48,
    ROUND(AVG(dws_48), 3) AS avg_dws_48,
    ROUND(AVG(obpm), 3) AS avg_obpm,
    ROUND(AVG(dbpm), 3) AS avg_dbpm,
    ROUND(AVG(bpm), 3) AS avg_bpm,
    ROUND(AVG(vorp), 3) AS avg_vorp,
    ROUND(MAX(ts_percent), 3) AS max_ts_percent,
    ROUND(MAX(per), 3) AS max_per,
    ROUND(MAX(mpg), 3) AS max_mpg,
    ROUND(MAX(x3p_ar), 3) AS max_x3p_ar,
    ROUND(MAX(orb_percent), 3) AS max_orb_percent,
    ROUND(MAX(drb_percent), 3) AS max_drb_percent,
    ROUND(MAX(ast_percent), 3) AS max_ast_percent,
    ROUND(MAX(stl_percent), 3) AS max_stl_percent,
    ROUND(MAX(blk_percent), 3) AS max_blk_percent,
    ROUND(MAX(tov_percent), 3) AS max_tov_percent,
    ROUND(MAX(usg_percent), 3) AS max_usg_percent,
    ROUND(MAX(ows_48), 3) AS max_ows_48,
    ROUND(MAX(dws_48), 3) AS max_dws_48,
    ROUND(MAX(obpm), 3) AS max_obpm,
    ROUND(MAX(dbpm), 3) AS max_dbpm,
    ROUND(MAX(bpm), 3) AS max_bpm,
    ROUND(MAX(vorp), 3) AS max_vorp
FROM
    tmp_standardized
GROUP BY ALL
"""
df_final = spark.sql(final_features_sql)

# Report the number of players, then bring the table into pandas and show it.
print(df_final.count())
pdf_final = df_final.toPandas()
display(pdf_final)

In [None]:
# Write the per-player feature table to CSV.
output_path = Path('cleaned_data/nba_cleaned_data.csv')

# pandas will not create missing parent directories and raises an OSError instead,
# so make sure the target folder exists (a no-op when it is already there).
output_path.parent.mkdir(parents=True, exist_ok=True)

# The DataFrame index is still written as the first (unnamed) column, exactly as before,
# so the file layout stays unchanged for anything that already reads it.
pdf_final.to_csv(output_path)