# Analysis Practice on NBA Players with Spark
- [**Dataset**](https://www.kaggle.com/datasets/justinas/nba-players-data?resource=download)
- Useful Functions
    - **Dataframe**
        - select
        - filter
        - sort
        - agg
        - groupby
        - join
        - drop
        - distinct
        - dropna
        - fillna
    - **Column**
        - alias
        - cast
        - between
        - contains
        - isnull & isnotnull
        - isin
        - ilike
            - `Spark >= v3.1.0`
    - [**Functions**](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html)

In [56]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

import pprint

In [3]:
spark = (
    SparkSession.builder
    .master("local")
    .appName("analysis_nba_players")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/14 13:03:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark.version

'3.2.0'

In [None]:
df = spark.read.format("csv").option("header", "true").load("all_seasons.csv")

In [None]:
df = df.drop("_c0")

## Analysis

### (DataFrame) Descending Order on Country

In [None]:
df.groupby("country").count().sort(desc("count")).show()

### (DataFrame) Top Scorer Every Seasons
- gp = game-played
- pts = points

In [None]:
df.filter(df.player_name == "Dennis Rodman").count()

In [None]:
df_tmp = df.select("player_name", expr("gp * pts as total_score"), "season")

In [None]:
df_tmp.show()

In [None]:
df_tmp.select("player_name").distinct().count()

In [None]:
(df_tmp
     .groupby("season")
     .agg(max("total_score"), first("player_name"))
     .withColumnRenamed("max(total_score)", "top_score")
     .withColumnRenamed("first(player_name, false)", "player_name")
     .sort(desc("season"))
     .show())

In [None]:
df_tmp.select("season").distinct().sort(desc("season")).count()

### (DataFrame) Check Specific Player

In [None]:
df.filter((df.player_name.contains("Steph")) & (df.player_name.contains("Curry"))).show()

### (Column) Load Dataset with Fixed Schema
- All-Types to StringType is `Possible`
- StringType to All-Types is `Impossible`
    - by Manual Casting

In [None]:
# alias

# df.select(expr("player_name as player")).show()
# df.select(df.player_name.alias("player")).show()

In [None]:
# return null-contained row first or last

# df.select("*").orderBy(df.player_name.asc_nulls_first()).show()
# df.select("*").orderBy(df.player_name.asc_nulls_last()).show()

In [None]:
# df.columns

In [8]:
string_cols = [
    'player_name',
    'team_abbreviation',
    'college',
    'country',
    'draft_year',
    'season'
]

integer_cols = [
    'age',
    'draft_round',
    'draft_number',
    'gp',
    'pts'
]

double_cols = [
    'player_height',
    'player_weight',
    'reb',
    'ast',
    'net_rating',
    'oreb_pct',
    'dreb_pct',
    'usg_pct',
    'ts_pct',
    'ast_pct'
]

In [9]:
fixed_schema = []
for col in df.schema:
    if col.name in string_cols:
        data_type = StringType()
    elif col.name in integer_cols:
        data_type = IntegerType()
    elif col.name in double_cols:
        data_type = DoubleType()
    else:
        raise ValueError(f"[NOT_FOUND_DATA_TYPE]COLUMN::{col}")
    fixed_schema.append(StructField(name=col.name, dataType=data_type, nullable=True))

In [10]:
fixed_schema = StructType(fixed_schema)

In [11]:
fixed_schema

StructType(List(StructField(player_name,StringType,true),StructField(team_abbreviation,StringType,true),StructField(age,IntegerType,true),StructField(player_height,DoubleType,true),StructField(player_weight,DoubleType,true),StructField(college,StringType,true),StructField(country,StringType,true),StructField(draft_year,StringType,true),StructField(draft_round,IntegerType,true),StructField(draft_number,IntegerType,true),StructField(gp,IntegerType,true),StructField(pts,IntegerType,true),StructField(reb,DoubleType,true),StructField(ast,DoubleType,true),StructField(net_rating,DoubleType,true),StructField(oreb_pct,DoubleType,true),StructField(dreb_pct,DoubleType,true),StructField(usg_pct,DoubleType,true),StructField(ts_pct,DoubleType,true),StructField(ast_pct,DoubleType,true),StructField(season,StringType,true)))

In [12]:
# 1. load
# df = spark.read.format("csv").option("header", "true").load("all_seasons.csv")

In [13]:
# 2. drop unused columns
# drop_cols = [
#     "_c0"
# ]

# df = df.drop(*drop_cols)

In [14]:
# 3. apply fixed-schema
# df_tmp = spark.createDataFrame(df.rdd, fixed_schema)

In [15]:
# df_tmp.printSchema()

### (Optional) Manual Casting
- e.g. `String Value to IntegerType()`

In [16]:
# test

# schema = StructType([
#     StructField("age", StringType(), True),
#     StructField("region", StringType(), True)
# ])

# df = spark.createDataFrame([(2, "30"), (5, "40")], schema=schema)
# df = df.withColumn("region", df.region.cast("integer"))

In [90]:
df = spark.read.format("csv").option("header", "true").load("all_seasons.csv").drop("_c0")

In [91]:
for col in df.columns:
    if col in string_cols:
        data_type = "string"
    elif col in integer_cols:
        data_type = "integer"
    elif col in double_cols:
        data_type = "double"
    else:
        raise ValueError(f"[NOT_FOUND_DATA_TYPE]COLUMN::{col}")
    df = df.withColumn(col, df[col].cast(data_type))

In [92]:
pprint.pprint(df.take(1))

[Row(player_name='Dennis Rodman', team_abbreviation='CHI', age=36, player_height=198.12, player_weight=99.79024, college='Southeastern Oklahoma State', country='USA', draft_year='1986', draft_round=2, draft_number=27, gp=55, pts=5, reb=16.1, ast=3.1, net_rating=16.1, oreb_pct=0.18600000000000003, dreb_pct=0.32299999999999995, usg_pct=0.1, ts_pct=0.479, ast_pct=0.113, season='1996-97')]


In [93]:
# counts on age 20 - 30
df.filter(df.age.between(20, 30)).groupby("age").count().sort(desc("age")).show()

+---+-----+
|age|count|
+---+-----+
| 30|  711|
| 29|  785|
| 28|  868|
| 27|  972|
| 26| 1015|
| 25| 1141|
| 24| 1277|
| 23| 1164|
| 22|  771|
| 21|  500|
| 20|  286|
+---+-----+



### (Column) Add New Struct Column with Other Columns
- Add
- Casting Fields
    - `Spark >= v3.1.0`
- Drop
    - `Spark >= v3.1.0`

In [94]:
df = df.withColumn("player_info", struct(df.player_name, df.player_height, df.player_weight))

In [95]:
df.select("player_info.player_name").head(1)

[Row(player_name='Dennis Rodman')]

In [96]:
# (!) spark version issue
# df.withColumn("player_info", df.player_info.withField("player_height", df.player_info.player_height.cast("string"))).printSchema()

In [97]:
# (!) spark version issue
# df.withColumn("player_info", df.player_info.dropFields("player_height")).printSchema()

In [98]:
# startswith & endswith

# df.filter(df.player_info.player_name.endswith("Curry")).show()
# df.filter(df.player_info.player_name.startswith("Curry")).show()

In [99]:
# isin

# df.filter(df.player_name.isin(["Stephen Curry", "Dennis Rodman"])).show()

In [100]:
# like

# df.filter(df.player_name.like("%Stephen%")).show()

### (Column) Case When Then Else
- with `substr()`

In [101]:
(df
 .filter(df.player_name.substr(1, 10).like("%Stephen%"))
 .select(df.age, when(df.player_name == "Stephen Curry", "Jay Park").otherwise(df.player_name).alias("converted_player_name"))
 .show())

+---+---------------------+
|age|converted_player_name|
+---+---------------------+
| 26|       Stephen Howard|
| 27|       Stephen Howard|
| 23|      Stephen Jackson|
| 24|      Stephen Jackson|
| 25|      Stephen Jackson|
| 26|      Stephen Jackson|
| 27|      Stephen Jackson|
| 24|       Stephen Graham|
| 28|      Stephen Jackson|
| 29|      Stephen Jackson|
| 25|       Stephen Graham|
| 26|       Stephen Graham|
| 30|      Stephen Jackson|
| 31|      Stephen Jackson|
| 27|       Stephen Graham|
| 32|      Stephen Jackson|
| 28|       Stephen Graham|
| 22|             Jay Park|
| 33|      Stephen Jackson|
| 29|       Stephen Graham|
+---+---------------------+
only showing top 20 rows



### (Function) Add New Column with Literal
- List Support
    - `Spark >= v3.4.0`

In [102]:
df.select(lit(1).alias("lit_col"), "player_name").head(1)

[Row(lit_col=1, player_name='Dennis Rodman')]

In [103]:
# List Support

# df.select(lit([1, 2, 3]).alias("lit_col"), "player_name").head(1)

### (Function) Broadcast Join Test
- Check Column Name Duplication before Join

In [104]:
df_small = df.limit(100)

In [105]:
df_cols = set(df.columns)

In [106]:
df_small_cols = set(df_small.columns)

In [107]:
# intersection = function of python list

duplicated_cols = df_cols.intersection(df_small_cols)
duplicated_cols

{'age',
 'ast',
 'ast_pct',
 'college',
 'country',
 'draft_number',
 'draft_round',
 'draft_year',
 'dreb_pct',
 'gp',
 'net_rating',
 'oreb_pct',
 'player_height',
 'player_info',
 'player_name',
 'player_weight',
 'pts',
 'reb',
 'season',
 'team_abbreviation',
 'ts_pct',
 'usg_pct'}

In [108]:
# change duplicated to unique

# for duplicated_col in duplicated_cols:
#     df = df.withColumnRenamed(duplicated_col, f"org_{duplicated_col}")
#     df = df.withColumnRenamed(f"org_{duplicated_col}", duplicated_col)

In [109]:
# df_b = broadcast(df_small)
# df_join = df.join(df_b, on=["player_name"], how="inner").select("*")

### (Function) Check NaN & Null Values

In [110]:
df.select("player_name", "player_height", isnan("gp").alias("is_nan_gp"), isnull("season").alias("is_null_season")).show()

+-----------------+-------------+---------+--------------+
|      player_name|player_height|is_nan_gp|is_null_season|
+-----------------+-------------+---------+--------------+
|    Dennis Rodman|       198.12|    false|         false|
|Dwayne Schintzius|        215.9|    false|         false|
|     Earl Cureton|       205.74|    false|         false|
|      Ed O'Bannon|        203.2|    false|         false|
|      Ed Pinckney|       205.74|    false|         false|
|    Eddie Johnson|       200.66|    false|         false|
|      Eddie Jones|       198.12|    false|         false|
|   Elden Campbell|       213.36|    false|         false|
|Eldridge Recasner|       193.04|    false|         false|
|     Elliot Perry|       182.88|    false|         false|
|    Elmer Bennett|       182.88|    false|         false|
|   Elmore Spencer|       213.36|    false|         false|
|    Emanual Davis|       195.58|    false|         false|
|    Ennis Whatley|        190.5|    false|         fals

In [111]:
# check # of partition of row

# df.repartition(1).select(spark_partition_id().alias("pid")).show()

### (Function) Greatest Value among lots of Columns
- on Every Records

In [112]:
# the greatest numeric value among age, gp, pts columns

# df.select(greatest(df.age, df.gp, df.pts)).show()

### (Math) Test on Numeric Columns
- [Document](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#math-functions)

In [113]:
# numeric to binary

# df.select(bin(df.age).alias("binary_col")).show()

### (Date) Test on Date Columns
- [Document](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#datetime-functions)

In [114]:
# df.printSchema()

In [115]:
# add months

# df.select(add_months(current_date(), 2).alias("now")).show()
# df.select(add_months(current_timestamp(), 2).alias("now")).show()

In [116]:
# add days

# df.select(date_add(df.draft_year, 30)).show()

In [117]:
# date format

# df.select(date_format(df.draft_year, "yyyy-MM-dd")).show()

### Date Diff on Draft Year & Season

In [118]:
df = df.withColumn("experience", (datediff(df.season.substr(0, 4), df.draft_year) / 365))

In [119]:
df = df.withColumn("is_veteran", df.experience > 10)

In [120]:
# automatic cast integer (dayofmonth) to string

# df.select(when(df.draft_year != "Undrafted", dayofmonth(df.draft_year)).otherwise(df.draft_year).alias("day_of_month_draft_year")).show()

### (Window) Window Test on `pts` of Every Teams

In [121]:
df_tmp = df.groupby("team_abbreviation", "season").agg(avg("pts").alias("avg_pts"))

In [122]:
# rank based on avg_pts of every teams => pick 1st

window_spec = Window.partitionBy("team_abbreviation", "season").orderBy(df_tmp.avg_pts.desc())
df_tmp = df_tmp.withColumn("rank", rank().over(window_spec)).select("season", "team_abbreviation", "avg_pts", "rank")
df_tmp.filter(df_tmp.rank == 1).sort(desc("season"), desc("avg_pts")).show()

+-------+-----------------+------------------+----+
| season|team_abbreviation|           avg_pts|rank|
+-------+-----------------+------------------+----+
|2021-22|              BKN|10.117647058823529|   1|
|2021-22|              LAC|            9.8125|   1|
|2021-22|              GSW|             9.125|   1|
|2021-22|              CHA| 8.764705882352942|   1|
|2021-22|              IND|              8.55|   1|
|2021-22|              MIL| 8.526315789473685|   1|
|2021-22|              HOU|               8.5|   1|
|2021-22|              SAC|              8.35|   1|
|2021-22|              LAL| 8.333333333333334|   1|
|2021-22|              OKC|  8.26923076923077|   1|
|2021-22|              MIN| 8.176470588235293|   1|
|2021-22|              NOP| 8.166666666666666|   1|
|2021-22|              POR| 7.958333333333333|   1|
|2021-22|              ORL|7.9523809523809526|   1|
|2021-22|              MIA|7.9523809523809526|   1|
|2021-22|              DEN|              7.95|   1|
|2021-22|   

### (Collection) Test on Sample Code in Document
- filter with `UDF`
- zip_with with `UDF`

In [123]:
# filter

# df_tmp = spark.createDataFrame(
#     [(1, ["2018-09-20",  "2019-02-03", "2019-07-01", "2020-06-01"])],
#     ("key", "values")
# )

# # UDF
# def after_second_quarter(x):
#     return month(to_date(x)) > 6

# df_tmp.select(
#     filter("values", after_second_quarter).alias("after_second_quarter")
# ).show(truncate=False)

In [124]:
# zip_with
# df = spark.createDataFrame([(1, ["foo", "bar"], [1, 2, 3])], ("id", "xs", "ys"))

# concat_ws
# df.select(zip_with("xs", "ys", lambda x, y: concat_ws("_", x, y)).alias("xs_ys")).show()

### (Agg) Frequently Used Functions

In [129]:
# size => length of collection

df.select(size(collect_set(df.team_abbreviation))).show()

+------------------------------------+
|size(collect_set(team_abbreviation))|
+------------------------------------+
|                                  36|
+------------------------------------+



### Check Latest Draft on Every Teams
- e.g. NJN = New Jersey Nets `doesn't exist anymore`

In [131]:
df.groupby("team_abbreviation").agg(max("season")).sort(desc("team_abbreviation")).show()

+-----------------+-----------+
|team_abbreviation|max(season)|
+-----------------+-----------+
|              WAS|    2021-22|
|              VAN|    2000-01|
|              UTA|    2021-22|
|              TOR|    2021-22|
|              SEA|    2007-08|
|              SAS|    2021-22|
|              SAC|    2021-22|
|              POR|    2021-22|
|              PHX|    2021-22|
|              PHI|    2021-22|
|              ORL|    2021-22|
|              OKC|    2021-22|
|              NYK|    2021-22|
|              NOP|    2021-22|
|              NOK|    2006-07|
|              NOH|    2012-13|
|              NJN|    2011-12|
|              MIN|    2021-22|
|              MIL|    2021-22|
|              MIA|    2021-22|
+-----------------+-----------+
only showing top 20 rows



### Check Correlation on Height & Weight

In [133]:
# 0.82 (max 1)

df.select(corr(df.player_height, df.player_weight)).show()

+----------------------------------+
|corr(player_height, player_weight)|
+----------------------------------+
|                0.8254213536883795|
+----------------------------------+



In [135]:
df.select(count_distinct("team_abbreviation")).show()

+---------------------------------+
|count(DISTINCT team_abbreviation)|
+---------------------------------+
|                               36|
+---------------------------------+



### Check Statistics on Age
- generally, kurtosis 3 = Normal Distribution
- generally, skewness 0 = Symmetry Distribution
- kurtosis = 첨도 (데이터 분포의 뾰족한 정도)
- skewness = 왜도 (데이터 분포의 비대칭 정도)

In [141]:
df.select(kurtosis("age")).show()
df.select(skewness("age")).show()
df.select(stddev("age")).show()

+------------------+
|     kurtosis(age)|
+------------------+
|-0.239257737481414|
+------------------+

+------------------+
|     skewness(age)|
+------------------+
|0.5606653734849604|
+------------------+

+----------------+
|stddev_samp(age)|
+----------------+
|4.33586763083955|
+----------------+



In [139]:
df.agg(min("age"), mean("age")).show()

+--------+-----------------+
|min(age)|         avg(age)|
+--------+-----------------+
|      18|27.08451848841934|
+--------+-----------------+



### (Optional) Tracking Previous Value by Window

In [144]:
# df_tmp = spark.createDataFrame([("a", 1),
#                             ("a", 2),
#                             ("a", 3),
#                             ("b", 8),
#                             ("b", 2)], ["c1", "c2"])

# w = Window.partitionBy("c1").orderBy("c2")

# first result = 0
# df_tmp.withColumn("previos_value", lag("c2", 1, 0).over(w)).show()

+---+---+-------------+
| c1| c2|previos_value|
+---+---+-------------+
|  a|  1|            0|
|  a|  2|            1|
|  a|  3|            2|
|  b|  2|            0|
|  b|  8|            2|
+---+---+-------------+



### (Optioanl) Deduplication within Specific Column
- with `row_number()`
    - Select First Record Condition `row_number == 1`

In [None]:
# df = spark.createDataFrame([(1, "A", "2022-01-01"),
#                             (2, "B", "2022-01-02"),
#                             (3, "A", "2022-01-02"),
#                             (4, "C", "2022-01-03"),
#                             (5, "B", "2022-01-03")],
#                            ["id", "group", "timestamp"])

# df_sorted = df.orderBy(desc("timestamp"))
# window_spec = Window.partitionBy("group").orderBy("timestamp")
# df_with_row_number = df_sorted.withColumn("row_number", row_number().over(window_spec))
# df_first_distinct = df_with_row_number.filter(col("row_number") == 1)

# df_first_distinct.show()

### Show Numbers after `.` on String-Numeric

In [146]:
# df.select(format_number("pts", 4)).show()

In [149]:
# format string

df.select(format_string("%s는 %d살이고, 키는 $fcm입니다.", df.player_name, df.age, df.player_height).alias("format_string_col")).show()

+------------------------+
|       format_string_col|
+------------------------+
|   Dennis Rodman는 36...|
|    Dwayne Schintzius...|
|  Earl Cureton는 39살...|
| Ed O'Bannon는 24살이...|
| Ed Pinckney는 34살이...|
|   Eddie Johnson는 38...|
| Eddie Jones는 25살이...|
|   Elden Campbell는 2...|
|    Eldridge Recasner...|
|  Elliot Perry는 28살...|
|   Elmer Bennett는 27...|
|   Elmore Spencer는 2...|
|   Emanual Davis는 28...|
|   Ennis Whatley는 34...|
|  Eric Leckner는 31살...|
| Eric Mobley는 27살이...|
|   Eric Montross는 25...|
|  Eric Murdock는 29살...|
|   Eric Piatkowski는 ...|
|Eric Snow는 24살이고,...|
+------------------------+
only showing top 20 rows



In [160]:
# sentence split based on `.`

df.select(sentences(format_string("%s는 %d살이고, 키는 $fcm입니다. 정보가 도움이 됐나요?", df.player_name, df.age, df.player_height)).alias("format_string_col")).head(1)

[Row(format_string_col=[['Dennis', 'Rodman는', '36살이고', '키는', 'fcm입니다'], ['정보가', '도움이', '됐나요']])]

### Check Locate on String Column

In [152]:
# first index = 1

# df.select(locate("Rodman", df.player_name, 1)).show()

### Filter by RegExp on String Column

In [154]:
# col, pattern, idx

# df.filter(regexp_extract("player_name", "Curry", 0) != '').show()

### Translate Test on String Column
- `If this is shorter than matching string then those chars that don’t have replacement will be dropped.`

In [163]:
df.select(translate("player_name", "Rodman", "Park")).show()

+------------------------------------+
|translate(player_name, Rodman, Park)|
+------------------------------------+
|                           Deis Park|
|                      Dwye Schitzius|
|                          Erl Cureta|
|                             Er O'Ba|
|                           Er Pickey|
|                         Errie Jahsa|
|                          Errie Jaes|
|                        Elre Ckpbell|
|                     Elrrirge Pecser|
|                        Elliat Perry|
|                         Elker Beett|
|                       Elkare Specer|
|                           Ekul Dvis|
|                          Eis Whtley|
|                         Eric Lecker|
|                         Eric Mabley|
|                        Eric Matrass|
|                        Eric Murrack|
|                      Eric Pitkawski|
|                            Eric Saw|
+------------------------------------+
only showing top 20 rows



### (Optional) DayTimeIntervalType
- not Working in Spark v3.2.0

In [126]:
# schema = StructType([
#     StructField("job_id", IntegerType(), False),
#     StructField("start_time", DayTimeIntervalType(), False),
#     StructField("end_time", DayTimeIntervalType(), False)
# ])