In [2]:
# Import necessary modules
from pyspark.sql import SparkSession
import pandas as pd

In [3]:
# Start Spark session
spark = SparkSession.builder.appName("DraftAnalysis").getOrCreate()

In [19]:
# Load files into Spark DataFrames
draft_data = spark.read.csv("2024_draft_leagues_1283_recap_combined.csv", header=True, inferSchema=True)
draft_data.show(20)

+-------+---------+------------+-----+----+---------+--------+----------+---------------+
|     id|league_id|user_team_id|round|pick|player_id|position|autopicked|           time|
+-------+---------+------------+-----+----+---------+--------+----------+---------------+
|1705893|     1283|         714|    1|   1|      695|     MID|         0|8/03/2024 13:18|
|1705894|     1283|         719|    1|   2|      138|     DEF|         0|8/03/2024 13:18|
|1705901|     1283|         718|    1|   3|      706|     RUC|         0|8/03/2024 13:18|
|1705907|     1283|         717|    1|   4|      459|     MID|         0|8/03/2024 13:18|
|1705908|     1283|         713|    1|   5|      443|     RUC|         0|8/03/2024 13:19|
|1705909|     1283|         716|    1|   6|      197|     MID|         0|8/03/2024 13:19|
|1705910|     1283|         715|    1|   7|      458|     MID|         0|8/03/2024 13:19|
|1705913|     1283|         720|    1|   8|      629|     RUC|         0|8/03/2024 13:19|
|1705915| 

In [20]:
player_data = spark.read.csv("2024_SC_Player_list.csv", header=True, inferSchema=True)
player_data.show(20)

+---+---+----------+----------+-------+--------------+----------------+--------------+-------+------+------------------------+-----------------------------+------+--------------------+------+----+--------------------+----+--------------------+-----+--------+---------+
|_c0| id|first_name| last_name|team_id|previous_games|previous_average|previous_total|feed_id|hs_url|injury_suspension_status|injury_suspension_status_text|locked|       played_status|active|team|        player_stats|odds|           positions|notes|position|long_team|
+---+---+----------+----------+-------+--------------+----------------+--------------+-------+------+------------------------+-----------------------------+------+--------------------+------+----+--------------------+----+--------------------+-----+--------+---------+
|  0|  1|       Sam|     Berry|      1|             4|           50.75|           203|1012807|  NULL|                    NULL|                         NULL| false|{'status': 'pre',...|  true| A

In [12]:
# Import functions from PySpark
from pyspark.sql import functions as F

In [14]:
# Calculate median score
median_data = player_data.groupBy("player_id").agg(
    F.expr('percentile_approx(points, 0.5)').alias('median_score')
).orderBy(F.desc('median_score'))

# Replace NA values with 0
median_data = median_data.fillna({'median_score': 0})

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `player_id` cannot be resolved. Did you mean one of the following? [`feed_id`, `player_stats`, `team_id`, `active`, `hs_url`].;
'Aggregate ['player_id], ['player_id, 'percentile_approx('points, 0.5) AS median_score#371]
+- Relation [_c0#305,id#306,first_name#307,last_name#308,team_id#309,previous_games#310,previous_average#311,previous_total#312,feed_id#313,hs_url#314,injury_suspension_status#315,injury_suspension_status_text#316,locked#317,played_status#318,active#319,team#320,player_stats#321,odds#322,positions#323,notes#324,position#325,long_team#326] csv


In [None]:
# Join dataframes
heat_map_data = draft_data.join(
    median_data, draft_data["player_id"] == median_data["player_id"], "left"
)

In [None]:
# Normalize scores by position
positions = ['DEF', 'MID', 'RUC', 'FWD']
pos_count = [8*5, 8*7, 8*1, 8*5]

for i, pos in enumerate(positions):
    pos_data = heat_map_data.filter(
        (heat_map_data["pos_1"] == pos) | (heat_map_data["pos_2"] == pos)
    ).orderBy(F.desc('median_score'))

    mean_score = pos_data.select(F.mean("median_score")).collect()[0][0]
    stdev_score = pos_data.select(F.stddev("median_score")).collect()[0][0]

    -- Calculate normalized scores
    pos_data = pos_data.withColumn(
        f"{pos}_norm_score",
        F.round((F.col("median_score") - mean_score) / stdev_score, 3)
    )

    -- Add normalized scores to main data
    heat_map_data = heat_map_data.join(
        pos_data.select("player_id", f"{pos}_norm_score"),
        on="player_id",
        how="left"
    )

In [None]:
# Calculate final score
heat_map_data = heat_map_data.withColumn(
    "final_score",
    F.round(
        F.greatest(
            F.col("DEF_norm_score"),
            F.col("MID_norm_score"),
            F.col("RUC_norm_score"),
            F.col("FWD_norm_score")
        ), 4
    )
)

In [None]:
# Save the final dataframe as a CSV
heat_map_data.write.csv("/mnt/data/2024_draft_heat_map.csv", header=True, mode="overwrite")