In [1]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import avg, col, count, lit, to_date, split, when, coalesce, round, unix_timestamp
from pyspark.sql.types import MapType, StringType
from pyspark.sql.window import Window
import pandas as pd

In [2]:
# Just for easier visualization
def show(df):
    pandas_df = df.toPandas()
    return pandas_df

In [3]:
spark = SparkSession.builder \
                    .appName('tennis') \
                    .getOrCreate()

In [4]:
singles=spark.read.option("header","true").option("inferSchema","true") \
.csv('singles')

In [5]:
from pyspark.sql.functions import col, count, round

singles = singles.withColumn('winner_entry', when(singles['winner_entry'].isNull(), "Standard").otherwise(singles['winner_entry']))
singles = singles.withColumn('loser_entry', when(singles['loser_entry'].isNull(), "Standard").otherwise(singles['loser_entry']))

null_counts = singles.select([round(((count(when(col(c).isNull(), c)) / singles.count()) * 100), 1).alias(c) for c in singles.columns])
show(null_counts)

Unnamed: 0,tourney_id,tourney_name,surface,draw_size,tourney_level,tourney_date,match_num,winner_id,winner_seed,winner_entry,...,l_1stIn,l_1stWon,l_2ndWon,l_SvGms,l_bpSaved,l_bpFaced,winner_rank,winner_rank_points,loser_rank,loser_rank_points
0,0.0,0.0,1.6,0.4,0.0,0.0,0.0,0.0,63.0,0.0,...,50.5,50.5,50.5,50.5,50.5,50.5,18.6,43.2,23.0,44.1


In [6]:
singles.printSchema()

root
 |-- tourney_id: string (nullable = true)
 |-- tourney_name: string (nullable = true)
 |-- surface: string (nullable = true)
 |-- draw_size: integer (nullable = true)
 |-- tourney_level: string (nullable = true)
 |-- tourney_date: integer (nullable = true)
 |-- match_num: integer (nullable = true)
 |-- winner_id: integer (nullable = true)
 |-- winner_seed: integer (nullable = true)
 |-- winner_entry: string (nullable = true)
 |-- winner_name: string (nullable = true)
 |-- winner_hand: string (nullable = true)
 |-- winner_ht: integer (nullable = true)
 |-- winner_ioc: string (nullable = true)
 |-- winner_age: double (nullable = true)
 |-- loser_id: integer (nullable = true)
 |-- loser_seed: integer (nullable = true)
 |-- loser_entry: string (nullable = true)
 |-- loser_name: string (nullable = true)
 |-- loser_hand: string (nullable = true)
 |-- loser_ht: integer (nullable = true)
 |-- loser_ioc: string (nullable = true)
 |-- loser_age: double (nullable = true)
 |-- score: string (

In [7]:
# Selecting relevant columns for feature engineering
selected_columns = ['surface',
                    'tourney_level',
                    'tourney_date',
                    'tourney_name',
                    'draw_size',
                    'round',
                    'best_of',
                    'winner_name',
                    'winner_hand',
                    'winner_rank',
                    'winner_ht',
                    'winner_ioc',
                    'winner_age',
                    'winner_entry',
                    'loser_name',
                    'loser_hand',
                    'loser_rank',
                    'loser_ht',
                    'loser_ioc',
                    'loser_age',
                    'loser_entry',
                    'w_ace',
                    'w_df',
                    'w_svpt',
                    'w_1stIn',
                    'w_1stWon',
                    'w_2ndWon',
                    'w_SvGms',
                    'l_ace',
                    'l_df',
                    'l_svpt',
                    'l_1stIn',
                    'l_1stWon',
                    'l_2ndWon',
                    'l_SvGms'
                   ]

# Filtering relevant columns
clean_df = singles.select(selected_columns)

# Kepping only Bo3 and Bo5 matches
clean_df = clean_df.filter(clean_df['best_of'] != 1)

# Removing Nulls
clean_df = clean_df.dropna()

# Changing date format 
clean_df = clean_df.withColumn('tourney_date', to_date(clean_df['tourney_date'].cast('string'), 'yyyyMMdd'))

show(clean_df)

Unnamed: 0,surface,tourney_level,tourney_date,tourney_name,draw_size,round,best_of,winner_name,winner_hand,winner_rank,...,w_1stWon,w_2ndWon,w_SvGms,l_ace,l_df,l_svpt,l_1stIn,l_1stWon,l_2ndWon,l_SvGms
0,Hard,A,1994-01-03,Adelaide,32,R32,3,Thomas Muster,L,9,...,21,13,8,6,6,64,30,17,15,8
1,Hard,A,1994-01-03,Adelaide,32,R32,3,Brett Steven,R,43,...,29,8,9,0,4,55,34,21,6,8
2,Hard,A,1994-01-03,Adelaide,32,R32,3,Karel Novacek,R,17,...,31,10,9,1,5,63,35,24,12,9
3,Hard,A,1994-01-03,Adelaide,32,R32,3,Jamie Morgan,R,64,...,38,24,16,2,5,99,52,32,21,15
4,Hard,A,1994-01-03,Adelaide,32,R32,3,David Rikl,L,95,...,27,11,10,2,2,74,48,29,11,10
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
92799,Hard,A,2020-10-19,Antwerp,32,R32,3,Daniel Evans,R,35,...,36,14,13,4,1,77,43,31,14,13
92800,Hard,A,2020-10-19,Antwerp,32,R32,3,Zizou Bergs,R,528,...,33,13,12,1,3,66,45,27,10,12
92801,Hard,A,2020-10-19,Antwerp,32,R32,3,Taylor Fritz,R,28,...,39,6,10,19,3,58,37,30,5,9
92802,Hard,A,2020-10-19,Antwerp,32,R32,3,Lloyd Harris,R,90,...,28,11,10,0,2,62,45,29,5,10


In [8]:
print(f'Number of rows: {clean_df.count()}')
print(f'Number of columns: {len(clean_df.columns)}')

Number of rows: 92804
Number of columns: 35


In [9]:
# Creating Label column. 1 if the first player won, 0 if the second player won.
clean_df = clean_df.withColumn("Winner", lit(1))

In [10]:
## Randomizing player 1 and 2 so that the label isn't always 1

# Dividing into two df randomly
split_1, split_2 = clean_df.randomSplit([0.5, 0.5], seed=10)

# Changing column order for split 2
split_2_order = ['surface',
                'tourney_level',
                'tourney_date',
                'tourney_name',
                'draw_size',
                'round',
                'best_of',
                'loser_name',
                'loser_hand',
                'loser_rank',
                'loser_ht',
                'loser_ioc',
                'loser_age',
                'loser_entry',
                'winner_name',
                'winner_hand',
                'winner_rank',
                'winner_ht',
                'winner_ioc',
                'winner_age',
                'winner_entry',
                'l_ace',
                'l_df',
                'l_svpt',
                'l_1stIn',
                'l_1stWon',
                'l_2ndWon',
                'l_SvGms',
                'w_ace',
                'w_df',
                'w_svpt',
                'w_1stIn',
                'w_1stWon',
                'w_2ndWon',
                'w_SvGms'
               ]
split_2 = split_2.select(split_2_order)

# Reverting the name change (at this point the names are wrong the winner_name will be the name of the loser actually, etc.
old_names = ['loser_name', 'loser_hand', 'loser_rank', 'loser_ht', 'loser_ioc', 'loser_age', 'loser_entry',
             'winner_name', 'winner_hand', 'winner_rank', 'winner_ht', 'winner_ioc', 'winner_age', 'winner_entry',
             'l_ace', 'l_df', 'l_svpt', 'l_1stIn', 'l_1stWon', 'l_2ndWon', 'l_SvGms',
             'w_ace', 'w_df', 'w_svpt', 'w_1stIn', 'w_1stWon', 'w_2ndWon', 'w_SvGms']

new_names = ['winner_name', 'winner_hand', 'winner_rank', 'winner_ht', 'winner_ioc', 'winner_age', 'winner_entry',
             'loser_name', 'loser_hand', 'loser_rank', 'loser_ht', 'loser_ioc', 'loser_age', 'loser_entry',
             'w_ace', 'w_df', 'w_svpt', 'w_1stIn', 'w_1stWon', 'w_2ndWon', 'w_SvGms',
             'l_ace', 'l_df', 'l_svpt', 'l_1stIn', 'l_1stWon', 'l_2ndWon', 'l_SvGms']

# Adjust label
split_2 = split_2.withColumn("Winner", lit(0))

# Merge them back together
clean_df = split_1.union(split_2)

In [11]:
# Changing column names from winner/loser to player 1/player 2
old_names = ['winner_name', 'winner_hand', 'winner_rank', 'winner_ht', 'winner_ioc', 'winner_age', 'winner_entry',
             'loser_name', 'loser_hand', 'loser_rank', 'loser_ht', 'loser_ioc', 'loser_age', 'loser_entry',
             'w_ace', 'w_df', 'w_svpt', 'w_1stIn', 'w_1stWon', 'w_2ndWon', 'w_SvGms',
             'l_ace', 'l_df', 'l_svpt', 'l_1stIn', 'l_1stWon', 'l_2ndWon', 'l_SvGms']

new_names = ['p1_name', 'p1_hand', 'p1_rank', 'p1_ht', 'p1_ioc', 'p1_age', 'p1_entry',
             'p2_name', 'p2_hand', 'p2_rank', 'p2_ht', 'p2_ioc', 'p2_age', 'p2_entry',
             'p1_ace', 'p1_df', 'p1_svpt', 'p1_1stIn', 'p1_1stWon', 'p1_2ndWon', 'p1_SvGms',
             'p2_ace', 'p2_df', 'p2_svpt', 'p2_1stIn', 'p2_1stWon', 'p2_2ndWon', 'p2_SvGms']

for old_name, new_name in zip(old_names, new_names):
    clean_df = clean_df.withColumnRenamed(old_name, new_name)

In [12]:
# From here it's still not working (trying to calculate the average of a stat for the past three months)

In [13]:
# Convert tourney_date to Unix timestamp
clean_df = clean_df.withColumn("tourney_date_unix", unix_timestamp(col("tourney_date")))

# Define window specification
windowSpec = Window.partitionBy("p1_name").orderBy("tourney_date_unix").rangeBetween(-90 * 86400, 0)

# Now you can use the window function with the converted tourney_date column
clean_df = clean_df.withColumn("avg_p1_ace", avg("p1_ace").over(windowSpec))

In [14]:
show(clean_df[['p1_ace', 'avg_p1_ace']])

Unnamed: 0,p1_ace,avg_p1_ace
0,3,3.0
1,6,4.0
2,3,4.0
3,6,4.5
4,8,5.2
...,...,...
92799,5,5.0
92800,1,1.0
92801,10,5.5
92802,2,2.0


### Encode categorical features

In [16]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import year


# Columns to encode
to_encode = ['surface', 'tourney_level', 'tourney_name', 'round', 
                     'p1_name', 'p1_hand', 'p1_ioc', 'p1_entry', 
                     'p2_name', 'p2_hand', 'p2_ioc', 'p2_entry']


# Create Ids
stages = []


for col in to_encode:
    indexer = StringIndexer(inputCol=col, outputCol=f"{col}_index")
    
    stages.append(indexer)


pipeline = Pipeline(stages=stages)

# Extract year from tourney_date
clean_df = clean_df.withColumn("tourney_year", year("tourney_date").cast("integer"))

# Encode categorical features
encoded_df = pipeline.fit(clean_df).transform(clean_df)
selected_features = [
    #old columns
    'tourney_year', 'best_of', 'p1_rank', 'p1_ht', 'p1_age', 'p2_rank', 'p2_ht', 'p2_age',
    'p1_ace', 'p1_df', 'p1_svpt', 'p1_1stIn', 'p1_1stWon', 'p1_2ndWon', 'p1_SvGms',
    'p2_ace', 'p2_df', 'p2_svpt', 'p2_1stIn', 'p2_1stWon', 'p2_2ndWon', 'p2_SvGms',
    'avg_p1_ace', 
    # encoded columns
    'surface_index', 'tourney_level_index', 'tourney_name_index',
    'round_index', 'p1_name_index', 'p1_hand_index', 'p1_ioc_index', 'p1_entry_index',
    'p2_name_index', 'p2_hand_index', 'p2_ioc_index', 'p2_entry_index',
    #Label
    'Winner'
]

encoded_df = encoded_df.select(selected_features)
show(encoded_df)

Unnamed: 0,tourney_year,best_of,p1_rank,p1_ht,p1_age,p2_rank,p2_ht,p2_age,p1_ace,p1_df,...,round_index,p1_name_index,p1_hand_index,p1_ioc_index,p1_entry_index,p2_name_index,p2_hand_index,p2_ioc_index,p2_entry_index,Winner
0,2015,3,130,185,29.2,68,183,33.1,6,4,...,0.0,816.0,0.0,0.0,0.0,208.0,0.0,7.0,0.0,0
1,2015,3,116,185,29.7,115,190,26.4,5,5,...,0.0,816.0,0.0,0.0,0.0,582.0,0.0,29.0,0.0,1
2,2017,3,175,185,31.5,91,183,23.0,1,2,...,0.0,816.0,0.0,0.0,1.0,277.0,0.0,5.0,0.0,0
3,2017,3,140,185,31.7,129,183,19.4,7,2,...,0.0,816.0,0.0,0.0,0.0,753.0,0.0,1.0,0.0,1
4,2017,3,140,185,31.7,72,188,26.3,1,0,...,1.0,816.0,0.0,0.0,0.0,278.0,0.0,2.0,0.0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
92799,2023,3,903,193,18.5,102,185,27.3,10,3,...,0.0,1392.0,0.0,57.0,2.0,279.0,0.0,21.0,0.0,0
92800,2023,3,905,193,18.5,25,183,24.3,2,1,...,0.0,1392.0,0.0,57.0,2.0,370.0,0.0,0.0,0.0,0
92801,2018,3,221,185,21.7,39,175,36.2,2,3,...,0.0,1263.0,0.0,9.0,1.0,3.0,0.0,0.0,0.0,0
92802,2022,5,134,185,25.6,162,185,28.2,4,8,...,3.0,1263.0,0.0,9.0,1.0,254.0,0.0,2.0,2.0,1


In [24]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName('RandomForestTennisPrediction').getOrCreate()

# Assuming 'final_df' contains the processed DataFrame with features and labels
# Check and adjust the feature column names if needed
feature_columns = ['tourney_year', 'best_of', 'p1_rank', 'p1_ht', 'p1_age', 'p2_rank', 'p2_ht', 'p2_age',
                   'p1_ace', 'p1_df', 'p1_svpt', 'p1_1stIn', 'p1_1stWon', 'p1_2ndWon', 'p1_SvGms',
                   'p2_ace', 'p2_df', 'p2_svpt', 'p2_1stIn', 'p2_1stWon', 'p2_2ndWon', 'p2_SvGms',
                   'avg_p1_ace', 
                   'surface_index', 'tourney_level_index', 'tourney_name_index',
                   'round_index', 'p1_name_index', 'p1_hand_index', 'p1_ioc_index', 'p1_entry_index',
                   'p2_name_index', 'p2_hand_index', 'p2_ioc_index', 'p2_entry_index']


final_df = encoded_df
problematic_feature_index = 30
if problematic_feature_index < len(feature_columns):
    del feature_columns[problematic_feature_index]

problematic_feature_index = 27
if problematic_feature_index < len(feature_columns):
    del feature_columns[problematic_feature_index]

problematic_feature_index = 29
if problematic_feature_index < len(feature_columns):
    del feature_columns[problematic_feature_index]

problematic_feature_index = 25
if problematic_feature_index < len(feature_columns):
    del feature_columns[problematic_feature_index]

problematic_feature_index = 29
if problematic_feature_index < len(feature_columns):
    del feature_columns[problematic_feature_index]

problematic_feature_index = 27
if problematic_feature_index < len(feature_columns):
    del feature_columns[problematic_feature_index]

# Vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
final_df = assembler.transform(final_df)

# Split data
(train_data, test_data) = final_df.randomSplit([0.8, 0.2], seed=42)

#train the Random Forest
rf_classifier = RandomForestClassifier(featuresCol='features', labelCol='Winner', seed=42)
model = rf_classifier.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol='Winner')
accuracy = evaluator.evaluate(predictions)

print(f"Accuracy: {accuracy}")
feature_importances = model.featureImportances
print("Feature Importances:")
for i, feature in enumerate(feature_columns):
    print(f"{feature}: {feature_importances[i]}")

# Stop the Spark session
spark.stop()

Accuracy: 0.8364138142677787
Feature Importances:
tourney_year: 7.855378579952219e-05
best_of: 0.0005504185724632567
p1_rank: 0.10612200686409738
p1_ht: 0.0
p1_age: 5.254705723951052e-05
p2_rank: 0.16824468308319632
p2_ht: 0.0007852767078389356
p2_age: 0.0
p1_ace: 0.06936018648452405
p1_df: 0.015334233575148473
p1_svpt: 0.02778969771124724
p1_1stIn: 0.019196244737760142
p1_1stWon: 0.1898715168717707
p1_2ndWon: 0.04543980848510739
p1_SvGms: 0.005589882487498846
p2_ace: 0.05914206447811003
p2_df: 0.03274253960724904
p2_svpt: 0.015494975225656016
p2_1stIn: 0.011940111233843866
p2_1stWon: 0.15877351094992892
p2_2ndWon: 0.053590284268586454
p2_SvGms: 0.013397064632509264
avg_p1_ace: 0.00410474136043584
surface_index: 0.0
tourney_level_index: 0.0
round_index: 0.0
p1_hand_index: 5.694852796815054e-05
p2_hand_index: 0.0
p2_entry_index: 0.0023427032920206145
