# Markov Chain

> Klaasen and Magnus [12] show that points in tennis are approximately independent and indenticallydistributed (iid).  This finding allows us to assume that for any point played during the match, thepoint outcome does not depend on any of the previous points. Let’s further assume that we know theprobability of each player winning a point on their serve. Namely, letpbe the probability that playerAwins a point on their serve, andqthe probability that playerBwins a point on their serve. Usingthe iid assumption and the point-winning probabilities, we can formulate a Markov chain describing theprobability of a player winning a game.

## Data Preprocessing

The paper by Bernett and Clark describes how to find the serve-winning probabilities for matches that have not been played using historical player statistics:

$$
\begin{align}
    f_i &= a_i b_i + (1 - a_i)c_i \\
    g_i &= a_{av} d_i + (1-a_{av}) e_i
\end{align}
$$

Where: 

$$
\begin{align}
    f_i &= \text{percentage of points won on serve for player }i \\
    g_i &= \text{percentage of points won on return for player }i \\
    a_i &= \text{first serve percentage of player }i \\
    a_{av} &= \text{average first serve percentage (across all players)} \\
    b_i &= \text{first serve win percentage of player }i \\
    c_i &= \text{second serve win percentage of player }i \\
    d_i &= \text{first service return points win percentage of player }i \\
    e_i &= \text{second service return points win percentage of player }i \\
\end{align}
$$

In [1]:
# Init
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when, lit, sum, avg, max

spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

def init_df(): 
    return spark.read \
        .csv("./dataset/all_matches.csv", header=True)

In [2]:
def average_first_serve_percentage():
    df = init_df()
    df = df.select(["player_id", "first_serve_made", "first_serve_attempted"]) \
    .dropna() \
    .groupBy(['player_id']) \
    .agg(sum('first_serve_made'),
        sum('first_serve_attempted'))
    df = df.withColumn('first_serve_percentage', df['sum(first_serve_made)'] / df['sum(first_serve_attempted)']) \
    .groupBy() \
    .agg(avg('first_serve_percentage').alias('average_first_serve_percentage'))
    
    return df.collect()[0]['average_first_serve_percentage']

def find_players_statistics(players):
    a_av = average_first_serve_percentage()
    
    df = init_df()
    df = df.select([
            "player_id", 
            "service_points_won", 
            "service_points_attempted", 
            "return_points_won", 
            "return_points_attempted",
            "first_serve_made", 
            "first_serve_attempted",
            "first_serve_points_made",
            "second_serve_points_made",
            "second_serve_points_attempted",
            "first_serve_return_points_made",
            "first_serve_return_points_attempted",
            "second_serve_return_points_made",
            "second_serve_return_points_attempted"
        ]) \
        .where(df['player_id'].isin(players)) \
        .dropna() \
        .groupBy(['player_id']) \
        .agg(
            sum("service_points_won"), 
            sum("service_points_attempted"), 
            sum("return_points_won"),
            sum("return_points_attempted"),
            sum('first_serve_made'),
            sum('first_serve_attempted'),
            sum('first_serve_points_made'),
            sum('second_serve_points_made'),
            sum('second_serve_points_attempted'),
            sum("first_serve_return_points_made"),
            sum("first_serve_return_points_attempted"),
            sum("second_serve_return_points_made"),
            sum("second_serve_return_points_attempted")
        )
    df = df \
        .withColumn('first_serve_percentage', df['sum(first_serve_made)'] / df['sum(first_serve_attempted)']) \
        .withColumn('first_serve_win_percentage', df['sum(first_serve_points_made)'] / df['sum(first_serve_made)']) \
        .withColumn('second_serve_win_percentage', df['sum(second_serve_points_made)'] / df['sum(second_serve_points_attempted)']) \
        .withColumn('first_service_return_points_win_percentage', df['sum(first_serve_return_points_made)'] / df['sum(first_serve_return_points_attempted)']) \
        .withColumn('second_service_return_points_win_percentage', df['sum(second_serve_return_points_made)'] / df['sum(second_serve_return_points_attempted)'])
    df = df.withColumn('f_i', 
            df['first_serve_percentage'] * df['first_serve_win_percentage'] + (1 - df['first_serve_percentage']) * df['second_serve_win_percentage']) \
        .withColumn('g_i',
            a_av * df['first_service_return_points_win_percentage'] + (1 - a_av) * df['second_serve_win_percentage']) \
        .select([
            'player_id',
            'f_i',
            'g_i',
            'first_serve_percentage',
            'first_serve_win_percentage',
            'second_serve_win_percentage',
            'first_service_return_points_win_percentage',
            'second_service_return_points_win_percentage'
        ])

    
    return df

#stats = find_players_statistics([
#    'roger-federer', 
#    'rafael-nadal',
#    'novak-djokovic',
#    'daniil-medvedev',
#    'dominic-thiem',
#    'stefanos-tsitsipas',
#    'alexander-zverev'
#]) 

Once the player statistics have been found, we can estimate the probabilities of player $A$ and $B$ winning a point on their serve as $f_{AB}$ and $f_{BA}$ respectively using the following equation:

$$
f_{AB} = f_t + (f_i - f_{av}) - (g_j - g_{av})
$$

Where: 

$$
\begin{align}
    f_t =& \text{ average percentage of points won on serve for tournament} \\
    f_{av} =& \text{ average percentage of points won on server (accross all players)} \\
    g_{av} =& \text{ average percentage of points won on return (across all players)} \\
\end{align}
$$

In [3]:
# f_t
def average_points_won_on_serve_percentage_for_tournament(players, tournaments):
    df = init_df()
    df = df.select(["player_id", "tournament", "service_points_won", "service_points_attempted"]) \
    .where(df['player_id'].isin(players)) \
    .where(df['tournament'].isin(tournaments)) \
    .dropna() \
    .groupBy(['player_id', 'tournament']) \
    .agg(sum('service_points_won'),
        sum('service_points_attempted'))
    return df.withColumn('f_t', df['sum(service_points_won)'] / df['sum(service_points_attempted)']) \
        .select(["player_id", "tournament", 'f_t'])

# f_av
def average_points_won_on_server_percentage():
    df = init_df()
    df = df.select(["player_id", "service_points_won", "service_points_attempted"]) \
    .dropna() \
    .groupBy(['player_id']) \
    .agg(sum('service_points_won'),
        sum('service_points_attempted'))
    df = df.withColumn('points_won_on_server_percentage', df['sum(service_points_won)'] / df['sum(service_points_attempted)']) \
    .groupBy() \
    .agg(avg('points_won_on_server_percentage').alias('average_points_won_on_server_percentage'))
    
    return df.collect()[0]['average_points_won_on_server_percentage']

# g_av
def average_points_won_on_return_percentage():
    df = init_df()
    df = df.select(["player_id", "return_points_won", "return_points_attempted"]) \
    .dropna() \
    .groupBy(['player_id']) \
    .agg(sum('return_points_won'),
        sum('return_points_attempted'))
    df = df.withColumn('points_won_on_server_percentage', df['sum(return_points_won)'] / df['sum(return_points_attempted)']) \
    .groupBy() \
    .agg(avg('points_won_on_server_percentage').alias('average_points_won_on_server_percentage'))
    
    return df.collect()[0]['average_points_won_on_server_percentage']

In [30]:
def find_serve_point_win_probability(players, tournaments):
    f_t = average_points_won_on_serve_percentage_for_tournament(players, tournaments)  
    f_av = average_points_won_on_server_percentage()
    g_av = average_points_won_on_return_percentage()
    stats = find_players_statistics(players)

    # Manupulate player statistics
    df = stats.select(['player_id', 'f_i', 'g_i']) \
        .where(stats['player_id'].isin(players)) \
        .join(f_t, on='player_id') \
        .select(['player_id', 'tournament', 'f_t', 'f_i', 'g_i'])
    
    # Create table with opponent information
    df2 = df.withColumnRenamed('player_id', 'opponent_id') \
        .withColumnRenamed('g_i', 'g_j') \
        .select(['opponent_id', 'tournament', 'g_j'])
    
    # Join player and opponent tables
    df3 = df.join(df2, on="tournament")
    df3 = df3.where(df3['player_id'] != df3['opponent_id']) \
        .withColumn('win_on_serve', df3['f_t'] + (df3['f_i'] - f_av) - (df3['g_j'] - g_av)) \
        .select(['player_id', 'opponent_id', 'tournament', 'win_on_serve'])
    
    return df3

players = [
    'roger-federer', 
    'rafael-nadal',
#    'novak-djokovic',
#    'daniil-medvedev',
#    'dominic-thiem',
#    'stefanos-tsitsipas',
#    'alexander-zverev'
]

serve_point_win_probability = find_serve_point_win_probability(players, ['us-open'])
serve_point_win_probability.show()

+-------------+-------------+----------+------------------+
|    player_id|  opponent_id|tournament|      win_on_serve|
+-------------+-------------+----------+------------------+
| rafael-nadal|roger-federer|   us-open|0.7546003064686689|
|roger-federer| rafael-nadal|   us-open|0.7743710177100784|
+-------------+-------------+----------+------------------+



## Markov Chain

The probability of player $A$ winning a game on their serve game $P$ using the following recursive definition:

$$
P(a,b) = p \cdot P(a + 1, b) + (1 - p) \cdot P(a, b + 1)
$$

With the following boundaries:

$$
P(a,b) =
\begin{cases}
    1 & \text{if } a = 4, b < 3 \\
    0 & \text{if } a < 3, b = 4\\
    \frac{p^2}{p^2 + (1 - p)^2} & \text{if } a = 3, b = 3   
\end{cases}
$$

In [5]:
'''
Assumes player A is serving.
'''
def P_game(a, b, p):
    if (a == 4 and b < 3): return 1
    if (b == 4 and a < 3): return 0
    if (a == 3 and b == 3): return (p ** 2)/(p ** 2 + (1 - p) ** 2)
    
    return p * P_game(a + 1, b, p) + (1 - p) * P_game(a, b + 1, p)

def P_tiebreaker(a, b, p_a, p_b):
    if (a == 7 and a - b >= 2): return 1
    if (b == 7 and b - a >= 2): return 0
    if (a == 6 and b == 6): return (p_a * (1 - p_b))/(p_a * (1 - p_b) + (1 - p_a) * p_b)
    
    p = p_a if ((a + b + 3) % 4 >= 2 and (a + b + 3) % 4 <= 3 ) else p_b
    return p * P_tiebreaker(a + 1, b, p_a, p_b) + (1 - p) * P_tiebreaker(a, b + 1, p_a, p_b) 

def P_tiebreaker_set(a, b, p_a, p_b, first_serving = "A"):
    # Type checking
    if (first_serving != "A" and first_serving != "B"):
        raise TypeError(f'The "first_serving" parameter cannot be set to "{first_serving}". It must be either "A" or "B".')
        
    if (a >= 6 and a - b >= 2): return 1
    if (b >= 6 and b - a >= 2): return 0
    if (a == 6 and b == 6): return P_tiebreaker(0, 0, p_a, p_b)
    
    # Find who is supposed to serve this set.
    serving = None
    if first_serving == "A":
        serving = "A" if (a + b) % 2 == 0 else "B"
    else:
        serving = "B" if (a + b) % 2 == 0 else "A"
    
    # Find the probability that player A wins this set, 
    # taking into account who is serving.
    p = None
    if serving == "A": 
        p = P_game(0, 0, p_a)
    else: 
        p = 1 - P_game(0, 0, p_b)
    
    return p * P_tiebreaker_set(a + 1, b, p_a, p_b, first_serving) + (1 - p) * P_tiebreaker_set(a, b + 1, p_a, p_b, first_serving)

def P_advantage_set(a, b, p_a, p_b, first_serving = "A"):
    # Type checking
    if (first_serving != "A" and first_serving != "B"):
        raise TypeError(f'The "first_serving" parameter cannot be set to "{first_serving}". It must be either "A" or "B".')
    
    # Boundaries
    if (a >= 6 and a - b >= 2 ): return 1
    if (b >= 6 and b - a >= 2): return 0
    if (a == 5 and b == 5):
        pp_a = P_game(0, 0, p_a)
        pp_b = P_game(0, 0, p_b)
        return (pp_a * (1 - pp_b)) / (pp_a * (1 - pp_b) + (1 - pp_a) * pp_b)
    
    # Find who is supposed to serve this set.
    serving = None
    if first_serving == 'A':
        serving = 'A' if (a + b) % 2 == 0 else 'B'
    else:
        serving = 'B' if (a + b) % 2 == 0 else 'A'
    
    # Find the probability that player A wins this set, 
    # taking into account who is serving.
    p = None
    if serving == 'A': 
        p = P_game(0, 0, p_a)
    else: 
        p = 1 - P_game(0, 0, p_b)
    
    return p * P_advantage_set(a + 1, b, p_a, p_b, first_serving) + (1 - p) * P_advantage_set(a, b + 1, p_a, p_b, first_serving)

'''
Assumes player A is serving
'''
def P_match(a, b, p, q, sets, isTiebreakerSet = False):
    # Type checking
    if (type(isTiebreakerSet) != bool):
        raise TypeError(f'"isTiebreakset" is type {type(isTiebreakerSet)}, but it should be type bool.')
        
    # Boundaries
    if (sets == 5): 
        if (a == 3 and b < 3): return 1
        if (b == 3 and a < 3): return 0
        if (a == 2 and b == 2): 
            if (isTiebreakerSet): return P_tiebreaker_set(a, b, p, q, 'A')
            else: return P_advantage_set(a, b, p, q, 'A')
    elif (sets == 3):
        if (a == 2 and b < 2): return 1
        if (b == 2 and a < 2): return 0
        if (a == 1 and b == 1):
            if (isTiebreakerSet): return P_tiebreaker_set(a, b, p, q, 'A')
            else: return P_advantage_set(a, b, p, q, 'A')
    else: 
        raise TypeError(f'"sets" has value {sets} but should have a value of 3 or 5.')
        
    first_serving = 'A' if ((a + b) % 2 == 0) else 'B'
    p_set = P_tiebreaker_set(0, 0, p, q, first_serving) if isTiebreakerSet else P_advantage_set(0,0, p, q, first_serving)
    return  p_set * P_match(a + 1, b, p, q, sets, isTiebreakerSet) + (1 - p_set) * P_match(a, b + 1, p, q, sets, isTiebreakerSet)

# print(P_match(0, 0, 0.7743710177100784, 0.7546003064686689, 5))

0.6539098606174539


In [28]:
def predict(player, opponent, tournament, **kwargs):
    sets = kwargs.get('sets', 3)
    tiebreaker = kwargs.get('tiebreaker', False)
    df = find_serve_point_win_probability([player, opponent], [tournament])
    df = df.where(df['player_id'].isin([player, opponent])) \
        .where(df['opponent_id'].isin([player, opponent]))
    
    player_p = df.where(df['player_id'] == player) \
        .select('win_on_serve') \
        .collect()[0][0]
    
    opponent_p = df.where(df['player_id'] == opponent) \
        .select('win_on_serve') \
        .collect()[0][0]
    
    print(f'serve advantage:\n  {player}: {player_p}\n  {opponent}: {opponent_p}')
    out = P_match(0, 0, player_p, opponent_p, sets, tiebreaker)
    print(f'{player} has {round(out*100, 2)}% chance to win a match against {opponent} during {tournament}.')
    
predict('roger-federer', 'rafael-nadal', 'us-open', sets=5)

serve advantage:
  roger-federer: 0.7743710177100784
  rafael-nadal: 0.7546003064686689
roger-federer has 65.39% chance to win a match against rafael-nadal during us-open.
