In [1]:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import broadcast, substring, col, avg, monotonically_increasing_id, lit, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [2]:
sc = SparkContext(appName="Simple App")
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/08 11:47:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df_schema=StructType([
    StructField('tourney_id', StringType(), True),
    StructField('tourney_name', StringType(), True),
    StructField('surface', StringType(), True),
    StructField('draw_size', IntegerType(), True),
    StructField('tourney_level', StringType(), True),
    StructField('tourney_date', IntegerType(), True),
    StructField('match_num', IntegerType(), True),
    StructField('winner_id', IntegerType(), True),
    StructField('winner_seed', IntegerType(), True),
    StructField('winner_entry', IntegerType(), True),
    StructField('winner_name', StringType(), True),
    StructField('winner_hand', StringType(), True),
    StructField('winner_ht', IntegerType(), True),
    StructField('winner_ioc', StringType(), True),
    StructField('winner_age', FloatType(), True),
    StructField('loser_id', IntegerType(), True),
    StructField('loser_seed', IntegerType(), True),
    StructField('loser_entry', IntegerType(), True),
    StructField('loser_name', StringType(), True),
    StructField('loser_hand', StringType(), True),
    StructField('loser_ht', IntegerType(), True),
    StructField('loser_ioc', StringType(), True),
    StructField('loser_age', FloatType(), True),
    StructField('score', StringType(), True),
    StructField('best_of', IntegerType(), True),
    StructField('round', StringType(), True),
    StructField('minutes', IntegerType(), True),
    StructField('w_ace', IntegerType(), True),
    StructField('w_df', IntegerType(), True),
    StructField('w_svpt', IntegerType(), True),
    StructField('w_1stIn', IntegerType(), True),
    StructField('w_1stWon', IntegerType(), True),
    StructField('w_2ndWon', IntegerType(), True),
    StructField('w_SvGms', IntegerType(), True),
    StructField('w_bpSaved', IntegerType(), True),
    StructField('w_bpFaced', IntegerType(), True),
    StructField('l_ace', IntegerType(), True),
    StructField('l_df', IntegerType(), True),
    StructField('l_svpt', IntegerType(), True),
    StructField('l_1stIn', IntegerType(), True),
    StructField('l_1stWon', IntegerType(), True),
    StructField('l_2ndWon', IntegerType(), True),
    StructField('l_SvGms', IntegerType(), True),
    StructField('l_bpSaved', IntegerType(), True),
    StructField('l_bpFaced', IntegerType(), True),
    StructField('winner_rank', IntegerType(), True),
    StructField('winner_rank_points', IntegerType(), True),
    StructField('loser_rank', IntegerType(), True),
    StructField('loser_rank_points', IntegerType(), True)
])

df = spark.read.schema(df_schema).csv('../files/wta/wta_matches_*', header=True)

                                                                                

In [4]:
# Iga Swiatek ID 216347

swiatek_df = df.select(
    col('tourney_name'), 
    col('surface'),
    col('draw_size'),
    col('tourney_level'), 
    col('tourney_date'),
    col('winner_id'),
    col('winner_seed'),
    col('winner_name'),
    col('winner_hand'),
    col('winner_ioc'),
    col('winner_age'),
    col('loser_id'),
    col('loser_seed'),
    col('loser_name'),
    col('loser_hand'),
    col('loser_ioc'),
    col('loser_age'),
    col('score'),
    col('best_of'),
    col('round'),
    col('minutes'),
    col('w_ace'),
    col('w_df'),
    col('w_svpt'),
    col('w_1stIn'),
    col('w_1stWon'),
    col('w_2ndWon'),
    col('w_SvGms'),
    col('w_bpSaved'),
    col('w_bpFaced'),
    col('l_ace'),
    col('l_df'),
    col('l_svpt'),
    col('l_1stIn'),
    col('l_1stWon'),
    col('l_2ndWon'),
    col('l_SvGms'),
    col('l_bpSaved'),
    col('l_bpFaced'),
    col('winner_rank'),
    col('winner_rank_points'),
    col('loser_rank'),
    col('loser_rank_points')
    ).where((col('winner_id') == 216347) | (col('loser_id') == 216347)).withColumn('match_id', monotonically_increasing_id()).cache()

24/07/08 11:51:20 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [5]:
matches_over_the_years = swiatek_df.groupby(col('tourney_date')[0:4].alias('Year')).count().orderBy(col('Year')).withColumnRenamed('count', 'Matches')
wins_over_the_years = swiatek_df.where(col('winner_id') == 216347).groupby(col('tourney_date')[0:4].alias('Year')).count().orderBy(col('Year')).withColumnRenamed('count', 'Wins')
losses_over_the_years = swiatek_df.where(col('loser_id') == 216347).groupby(col('tourney_date')[0:4].alias('Year')).count().orderBy(col('Year')).withColumnRenamed('count', 'Losses')

matches_wins_losses_over_the_years = matches_over_the_years.join(wins_over_the_years,['Year'])
matches_wins_losses_over_the_years = matches_wins_losses_over_the_years.join(losses_over_the_years,['Year']).orderBy(col('Year'))

matches_wins_losses_percentage_over_the_years = matches_wins_losses_over_the_years.withColumn('%', (col('Wins')/ ((col('Wins')+col('Losses'))) * 100).cast('int') )
matches_wins_losses_percentage_over_the_years.show()

                                                                                

+----+-------+----+------+---+
|Year|Matches|Wins|Losses|  %|
+----+-------+----+------+---+
|2019|     28|  15|    13| 53|
|2020|     21|  16|     5| 76|
|2021|     51|  36|    15| 70|
|2022|     76|  67|     9| 88|
|2023|     82|  70|    12| 85|
|2024|     43|  39|     4| 90|
+----+-------+----+------+---+



In [7]:
matches_each_surface = swiatek_df.groupby(col('Surface')).count().withColumnRenamed('count', 'Matches')
wins_each_surface = swiatek_df.where(col('winner_id') == 216347).groupby(col('Surface')).count().withColumnRenamed('count', 'Wins')
losses_each_surface = swiatek_df.where(col('loser_id') == 216347).groupby(col('Surface')).count().withColumnRenamed('count', 'Losses')

matches_wins_losses_each_surface = matches_each_surface.join(wins_each_surface,['Surface'])
matches_wins_losses_each_surface = matches_wins_losses_each_surface.join(losses_each_surface,['Surface'])

matches_wins_losses_percentage_each_surface = matches_wins_losses_each_surface.withColumn('%', (col('Wins')/ ((col('Wins')+col('Losses'))) * 100).cast('int') )

matches_wins_losses_percentage_each_surface.show()

+-------+-------+----+------+---+
|Surface|Matches|Wins|Losses|  %|
+-------+-------+----+------+---+
|   Clay|     87|  77|    10| 88|
|   Hard|    193| 152|    41| 78|
|  Grass|     20|  13|     7| 65|
+-------+-------+----+------+---+



In [8]:
titles_by_level = swiatek_df.where((col('winner_id') == 216347) & (col('round') == 'F')).groupBy(col('tourney_level').alias('Tourney Level')).count()
titles_by_level = titles_by_level.select(when(col('Tourney Level') == 'M', 'Masters').when(col('Tourney Level') == 'A', 'ATP').when(col('Tourney Level') == 'G', 'Grand Slams').alias('Level'),col('count').alias('Titles'))
titles_by_level.show()

+-----------+------+
|      Level|Titles|
+-----------+------+
|       NULL|     5|
|       NULL|     1|
|Grand Slams|     4|
|       NULL|     2|
|       NULL|    10|
+-----------+------+



In [11]:
titles_by_name_level_year = swiatek_df.select(col('tourney_name').alias('Tourney Name'),when(col('tourney_level') == 'M', 'Masters').when(col('tourney_level') == 'A', 'ATP').when(col('tourney_level') == 'G', 'Grand Slams').alias('Level'), col('tourney_date')[0:4].alias('Year')).where((col('winner_id') == 216347) & (col('round') == 'F')).orderBy(col('Year'), col('Level'))
titles_by_name_level_year.show()

+-------------+-----------+----+
| Tourney Name|      Level|Year|
+-------------+-----------+----+
|Roland Garros|Grand Slams|2020|
|     Adelaide|       NULL|2021|
|         Rome|       NULL|2021|
|         Doha|       NULL|2022|
| Indian Wells|       NULL|2022|
|        Miami|       NULL|2022|
|    Stuttgart|       NULL|2022|
|         Rome|       NULL|2022|
|    San Diego|       NULL|2022|
|Roland Garros|Grand Slams|2022|
|      Us Open|Grand Slams|2022|
|         Doha|       NULL|2023|
|    Stuttgart|       NULL|2023|
|       Warsaw|       NULL|2023|
|      Beijing|       NULL|2023|
|Cancun Finals|       NULL|2023|
|Roland Garros|Grand Slams|2023|
|   United Cup|       NULL|2024|
|         Doha|       NULL|2024|
| Indian Wells|       NULL|2024|
+-------------+-----------+----+
only showing top 20 rows



In [16]:
rank_progression = swiatek_df.select(col('tourney_date')[0:6].alias('Date'), col('winner_rank').alias('Rank')).distinct().where(col('winner_id') == 216347).orderBy(col('Date'))
rank_progression.show(50)

+------+----+
|  Date|Rank|
+------+----+
|201802| 725|
|201901| 177|
|201902| 140|
|201902| 141|
|201904| 115|
|201905| 104|
|201907|  66|
|201908|  49|
|201908|  55|
|201908|  65|
|202001|  56|
|202002|  48|
|202002|  50|
|202008|  53|
|202009|  54|
|202101|  17|
|202102|  18|
|202102|  17|
|202103|  15|
|202104|  17|
|202105|   9|
|202105|  15|
|202106|   9|
|202107|   8|
|202108|   8|
|202109|   6|
|202110|   4|
|202111|   9|
|202201|   9|
|202202|   9|
|202202|   8|
|202203|   2|
|202203|   4|
|202204|   1|
|202205|   1|
|202206|   1|
|202207|   1|
|202208|   1|
|202210|   1|
|202301|   1|
|202302|   1|
|202303|   1|
|202304|   1|
|202305|   1|
|202306|   1|
|202307|   1|
|202308|   1|
|202309|   2|
|202310|   2|
|202401|   1|
+------+----+
only showing top 50 rows

