In [1]:
import os

import pandas as pd
from pyspark.ml.feature import Imputer, StandardScaler, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType, IntegerType

In [2]:
label_name = 'Hráč vyhrál'

In [3]:
spark = SparkSession.builder.appName("elo_app").getOrCreate()

21/12/22 18:23:36 WARN Utils: Your hostname, macusers-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.131 instead (on interface en0)
21/12/22 18:23:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/22 18:23:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/12/22 18:23:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Load main data

In [4]:
df = spark.read.csv('data/2_raw_unioned.csv.gz', header=True)
df.limit(5).toPandas()

Unnamed: 0,Hráč,Oddíl,Družstvo,Soupeř,Oddíl soupeř,Družstvo soupeř,Výsledek,Datum zápasu,Soutěž,Elo hráče,...,Elo (max),Elo soupeře,Elo (min) soupeře,Elo (max) soupeře,Rok narození,Max elo,Elo nejlepšího poraženého soupeře,ID Hráč,ID Soupeř,Rok narození soupeř
0,Tomáš Tregler,"HB Ostrov , z.s.",HB Ostrov H.Brod,Tomáš Martinko,Tělovýchovná jednota Ostrava,TJ Ostrava KST,0:3,26.11.2021,Extraliga mužů,2454-> 2444,...,2482-> 2470,2306-> 2318,2289-> 2301,2327-> 2338,1990,2458,2446,/st/hrac/tregler_tomas_1990,martinko_tomas_2002,2002
1,Tomáš Tregler,"HB Ostrov , z.s.",HB Ostrov H.Brod,Patrik Klos,Tělovýchovná jednota Ostrava,TJ Ostrava KST,3:1,26.11.2021,Extraliga mužů,2452-> 2454,...,2480-> 2482,2317-> 2315,2300-> 2299,2343-> 2341,1990,2458,2446,/st/hrac/tregler_tomas_1990,klos_patrik_1998,1998
2,Tomáš Tregler,"HB Ostrov , z.s.",HB Ostrov H.Brod,David Reitšpies,SKST Cheb,SKST Cheb,1:3,11.11.2021,Extraliga mužů,2458-> 2452,...,2485-> 2479,2409-> 2419,2356-> 2380,2476-> 2484,1990,2458,2446,/st/hrac/tregler_tomas_1990,reitspies_david_1996,1996
3,Tomáš Tregler,"HB Ostrov , z.s.",HB Ostrov H.Brod,Stanislav Kučera,SKST Cheb,SKST Cheb,3:0,11.11.2021,Extraliga mužů,2454-> 2458,...,2482-> 2485,2380-> 2377,2339-> 2337,2407-> 2403,1990,2458,2446,/st/hrac/tregler_tomas_1990,kucera_stanislav_1997,1997
4,Tomáš Tregler,"HB Ostrov , z.s.",HB Ostrov H.Brod,Martin Koblížek,Klub stolního tenisu KT Praha,KT Praha,3:0,5.11.2021,Extraliga mužů,2453-> 2454,...,2482-> 2482,2241-> 2241,2215-> 2215,2264-> 2263,1990,2458,2446,/st/hrac/tregler_tomas_1990,koblizek_martin_1996,1996


# Get players information only

In [5]:
players = (df
    .select(["ID hráč", "Max elo", "Elo nejlepšího poraženého soupeře"])
    .withColumn("ID hráč", F.substring(F.col("ID Hráč"), 10, 70))
    .distinct()
    .rdd.toDF(["ID hráč_soupeř", "Max elo_soupeř", "Elo nejlepšího poraženého soupeře_soupeř"])
)
players.limit(5).toPandas()

                                                                                

Unnamed: 0,ID hráč_soupeř,Max elo_soupeř,Elo nejlepšího poraženého soupeře_soupeř
0,zaboj_matej_2002,1893,2034
1,spacek_zbynek_1959,1917,2065
2,mokrejs_jan_2002,2110,2262
3,sikora_filip_1997,2022,2174
4,splichal_jan_1979,1899,1973


# Add players (opponents) information to the original games dataset

In [6]:
df_joined = df.join(players, on=(df['ID Soupeř'] == players['ID Hráč_soupeř']), how='left')
df_joined.limit(5).toPandas()

Unnamed: 0,Hráč,Oddíl,Družstvo,Soupeř,Oddíl soupeř,Družstvo soupeř,Výsledek,Datum zápasu,Soutěž,Elo hráče,...,Elo (max) soupeře,Rok narození,Max elo,Elo nejlepšího poraženého soupeře,ID Hráč,ID Soupeř,Rok narození soupeř,ID hráč_soupeř,Max elo_soupeř,Elo nejlepšího poraženého soupeře_soupeř
0,Tomáš Tregler,"HB Ostrov , z.s.",HB Ostrov H.Brod,Stanislav Kučera,SKST Cheb,SKST Cheb,3:0,11.11.2021,Extraliga mužů,2454-> 2458,...,2407-> 2403,1990,2458,2446,/st/hrac/tregler_tomas_1990,kucera_stanislav_1997,1997,kucera_stanislav_1997,2404,2425
1,Tomáš Tregler,"HB Ostrov , z.s.",HB Ostrov H.Brod,Martin Koblížek,Klub stolního tenisu KT Praha,KT Praha,3:0,5.11.2021,Extraliga mužů,2453-> 2454,...,2264-> 2263,1990,2458,2446,/st/hrac/tregler_tomas_1990,koblizek_martin_1996,1996,koblizek_martin_1996,2264,2339
2,Tomáš Tregler,"HB Ostrov , z.s.",HB Ostrov H.Brod,David Reitšpies,SKST Cheb,SKST Cheb,1:3,11.11.2021,Extraliga mužů,2458-> 2452,...,2476-> 2484,1990,2458,2446,/st/hrac/tregler_tomas_1990,reitspies_david_1996,1996,reitspies_david_1996,2419,2458
3,Tomáš Tregler,"HB Ostrov , z.s.",HB Ostrov H.Brod,Patrik Klos,Tělovýchovná jednota Ostrava,TJ Ostrava KST,3:1,26.11.2021,Extraliga mužů,2452-> 2454,...,2343-> 2341,1990,2458,2446,/st/hrac/tregler_tomas_1990,klos_patrik_1998,1998,klos_patrik_1998,2324,2419
4,Tomáš Tregler,"HB Ostrov , z.s.",HB Ostrov H.Brod,Tomáš Martinko,Tělovýchovná jednota Ostrava,TJ Ostrava KST,0:3,26.11.2021,Extraliga mužů,2454-> 2444,...,2327-> 2338,1990,2458,2446,/st/hrac/tregler_tomas_1990,martinko_tomas_2002,2002,martinko_tomas_2002,2327,2454


# Create new features

In [7]:
def get_season(dt):
    if dt >= 3 and dt <= 5:
        return 1
    if dt >= 6 and dt <= 8:
        return 2
    if dt >= 9 and dt <= 11:
        return 3
    return 4

# create pyspark UDF
udf_get_season = F.udf(get_season, IntegerType())

In [8]:
# apply transformations
result_df = (df_joined
    .withColumn("Datum zápasu", F.to_date(F.col("Datum zápasu"), 'd.m.yyyy'))   # convert to date
    .withColumn("obdobi", udf_get_season(F.month("Datum zápasu")))              # get year season
    .withColumn("Den v týdnu", F.dayofweek(F.col("Datum zápasu")) - 1)
    .withColumn("Víkend", F.col("Den v týdnu") > 6)
    .withColumn("Hráč sety", F.substring("Výsledek", 0, 1))                     # get players sets
    .withColumn("Soupeř sety", F.substring("Výsledek", -1, 1))                  # get opponents sets
    .withColumn("Hráč vyhrál", F.col("Hráč sety") > F.col("Soupeř sety"))
    .withColumn("Elo hráče před", F.split(F.col("Elo hráče"), '->').getItem(0))
    .withColumn("Elo soupeře před", F.split(F.col("Elo soupeře"), '->').getItem(1))
    .withColumn("Hráč je žena", F.col("Hráč").rlike(".*ová$"))
    .withColumn("Soupeř je žena", F.col("Soupeř").rlike(".*ová$"))
    .select(['Rok narození', 'Elo hráče před', 'Hráč je žena', 'Max elo', 'Elo nejlepšího poraženého soupeře',
        'Rok narození soupeř', 'Elo soupeře před', 'Soupeř je žena', 'Max elo_soupeř', 'Elo nejlepšího poraženého soupeře_soupeř',
        'Den v týdnu', 'Víkend', 'obdobi', 'Hráč vyhrál'])
)
result_df = result_df.select(*(F.col(c).cast("double").alias(c) for c in result_df.columns))
result_df.limit(5).toPandas()

                                                                                

Unnamed: 0,Rok narození,Elo hráče před,Hráč je žena,Max elo,Elo nejlepšího poraženého soupeře,Rok narození soupeř,Elo soupeře před,Soupeř je žena,Max elo_soupeř,Elo nejlepšího poraženého soupeře_soupeř,Den v týdnu,Víkend,obdobi,Hráč vyhrál
0,1990.0,2454.0,0.0,2458.0,2446.0,1997.0,2377.0,0.0,2404.0,2425.0,1.0,0.0,4.0,1.0
1,1990.0,2453.0,0.0,2458.0,2446.0,1996.0,2241.0,0.0,2264.0,2339.0,2.0,0.0,4.0,1.0
2,1990.0,2458.0,0.0,2458.0,2446.0,1996.0,2419.0,0.0,2419.0,2458.0,1.0,0.0,4.0,0.0
3,1990.0,2452.0,0.0,2458.0,2446.0,1998.0,2315.0,0.0,2324.0,2419.0,2.0,0.0,4.0,1.0
4,1990.0,2454.0,0.0,2458.0,2446.0,2002.0,2318.0,0.0,2327.0,2454.0,2.0,0.0,4.0,0.0


In [9]:
result_df.select('Hráč vyhrál').groupby('Hráč vyhrál').count().toPandas()

                                                                                

Unnamed: 0,Hráč vyhrál,count
0,0.0,2827534
1,1.0,3103891


Our label is pretty well balanced

In [10]:
result_df.describe().toPandas()

                                                                                

Unnamed: 0,summary,Rok narození,Elo hráče před,Hráč je žena,Max elo,Elo nejlepšího poraženého soupeře,Rok narození soupeř,Elo soupeře před,Soupeř je žena,Max elo_soupeř,Elo nejlepšího poraženého soupeře_soupeř,Den v týdnu,Víkend,obdobi,Hráč vyhrál
0,count,5931425.0,5931425.0,5931425.0,5928143.0,5928219.0,5931425.0,5931425.0,5931425.0,4248382.0,4248470.0,5931425.0,5931425.0,5931425.0,5931425.0
1,mean,1974.988180917739,1358.7082235719072,0.042905372654969,1461.2486998036316,1610.121636869353,1975.256277201516,1343.436152189398,0.0471085784613309,1472.660538529727,1619.898580900889,3.0331862916584127,0.0,4.0,0.5232960039113703
2,stddev,18.809642545545348,262.46993198730803,0.2026437972786701,251.3446583544276,227.68702035001095,18.717733731702715,282.033296310156,0.2118711114445313,251.55849379730608,228.2147897233473,1.9645722023332164,0.0,0.0,0.4994570434568486
3,min,1073.0,1.0,0.0,101.0,144.0,1073.0,1.0,0.0,101.0,144.0,0.0,0.0,4.0,0.0
4,max,2015.0,2458.0,1.0,2458.0,2458.0,2015.0,2458.0,1.0,2458.0,2458.0,6.0,0.0,4.0,1.0


Some values are missing.. Not a small number - lets impute them

# Impute

In [11]:
imp = Imputer(
    strategy='median', 
    inputCols=["Max elo", "Elo nejlepšího poraženého soupeře", "Max elo_soupeř", "Elo nejlepšího poraženého soupeře_soupeř"],
    outputCols=["Max elo", "Elo nejlepšího poraženého soupeře", "Max elo_soupeř", "Elo nejlepšího poraženého soupeře_soupeř"]
)
imp_model = imp.fit(result_df)
df_imputed = imp_model.transform(result_df)
df_imputed.limit(5).toPandas()

                                                                                

Unnamed: 0,Rok narození,Elo hráče před,Hráč je žena,Max elo,Elo nejlepšího poraženého soupeře,Rok narození soupeř,Elo soupeře před,Soupeř je žena,Max elo_soupeř,Elo nejlepšího poraženého soupeře_soupeř,Den v týdnu,Víkend,obdobi,Hráč vyhrál
0,1990.0,2454.0,0.0,2458.0,2446.0,1997.0,2377.0,0.0,2404.0,2425.0,1.0,0.0,4.0,1.0
1,1990.0,2453.0,0.0,2458.0,2446.0,1996.0,2241.0,0.0,2264.0,2339.0,2.0,0.0,4.0,1.0
2,1990.0,2458.0,0.0,2458.0,2446.0,1996.0,2419.0,0.0,2419.0,2458.0,1.0,0.0,4.0,0.0
3,1990.0,2452.0,0.0,2458.0,2446.0,1998.0,2315.0,0.0,2324.0,2419.0,2.0,0.0,4.0,1.0
4,1990.0,2454.0,0.0,2458.0,2446.0,2002.0,2318.0,0.0,2327.0,2454.0,2.0,0.0,4.0,0.0


In [12]:
# make sure we imputed missing values correctly
df_imputed.filter(F.col("Max elo").isNull()).limit(5).toPandas()



Unnamed: 0,Rok narození,Elo hráče před,Hráč je žena,Max elo,Elo nejlepšího poraženého soupeře,Rok narození soupeř,Elo soupeře před,Soupeř je žena,Max elo_soupeř,Elo nejlepšího poraženého soupeře_soupeř,Den v týdnu,Víkend,obdobi,Hráč vyhrál


# Scale

In [13]:
feature_cols = df_imputed.drop('Hráč vyhrál').columns

In [14]:
# create vector
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol='features'
)
df_vector = assembler.transform(df_imputed)

# scale
ss = StandardScaler(inputCol='features', outputCol='features_scaled')
scaler_model = ss.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)

df_scaled.limit(5).toPandas()

21/12/22 18:25:37 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Traceback (most recent call last):                                              
  File "/Users/romanzdk/Repos/-VSE-Applied_ML/venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/Users/romanzdk/Repos/-VSE-Applied_ML/venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/Users/romanzdk/Repos/-VSE-Applied_ML/venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/Users/romanzdk/Repos/-VSE-Applied_ML/venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


Unnamed: 0,Rok narození,Elo hráče před,Hráč je žena,Max elo,Elo nejlepšího poraženého soupeře,Rok narození soupeř,Elo soupeře před,Soupeř je žena,Max elo_soupeř,Elo nejlepšího poraženého soupeře_soupeř,Den v týdnu,Víkend,obdobi,Hráč vyhrál,features,features_scaled
0,2002.0,1775.0,0.0,2327.0,2454.0,1996.0,1689.0,0.0,1739.0,1920.0,4.0,0.0,4.0,1.0,"[2002.0, 1775.0, 0.0, 2327.0, 2454.0, 1996.0, ...","[106.43477116337492, 6.762679391732501, 0.0, 9..."
1,2002.0,1729.0,0.0,2327.0,2454.0,1985.0,1690.0,0.0,1760.0,1927.0,4.0,0.0,4.0,1.0,"[2002.0, 1729.0, 0.0, 2327.0, 2454.0, 1985.0, ...","[106.43477116337492, 6.58742122158056, 0.0, 9...."
2,1998.0,1917.0,0.0,2324.0,2419.0,1987.0,1955.0,0.0,1981.0,2083.0,0.0,0.0,4.0,0.0,"[1998.0, 1917.0, 0.0, 2324.0, 2419.0, 1987.0, ...","[106.22211427793361, 7.303693743071101, 0.0, 9..."
3,2002.0,2088.0,0.0,2263.0,2334.0,1989.0,1963.0,0.0,1477.0,1622.0,3.0,0.0,4.0,1.0,"[2002.0, 2088.0, 0.0, 2263.0, 2334.0, 1989.0, ...","[106.43477116337492, 7.955196940809838, 0.0, 9..."
4,2002.0,1680.0,0.0,2263.0,2334.0,1981.0,1653.0,0.0,1477.0,1622.0,3.0,0.0,4.0,1.0,"[2002.0, 1680.0, 0.0, 2263.0, 2334.0, 1981.0, ...","[106.43477116337492, 6.400733170766536, 0.0, 9..."


# Output

In [15]:
df_final = (df_scaled
    .select(['features_scaled', 'Hráč vyhrál'])
    .withColumnRenamed('Hráč vyhrál', 'label')
    .withColumnRenamed('features_scaled', 'features')
)
df_final.write.mode('overwrite').parquet('data/4_processed')
with open('data/feature_list.txt', 'w') as f:
    for fe in feature_cols:
        f.write(f"{fe}\n")

df_final.limit(5).toPandas()

21/12/22 18:26:47 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

Unnamed: 0,features,label
0,"[106.43477116337492, 6.762679391732501, 0.0, 9...",1.0
1,"[106.43477116337492, 6.58742122158056, 0.0, 9....",1.0
2,"[106.22211427793361, 7.303693743071101, 0.0, 9...",0.0
3,"[106.43477116337492, 7.955196940809838, 0.0, 9...",1.0
4,"[106.43477116337492, 6.400733170766536, 0.0, 9...",1.0


Traceback (most recent call last):
  File "/Users/romanzdk/Repos/-VSE-Applied_ML/venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/Users/romanzdk/Repos/-VSE-Applied_ML/venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/Users/romanzdk/Repos/-VSE-Applied_ML/venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/Users/romanzdk/Repos/-VSE-Applied_ML/venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
