In [1]:
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings
import re

# NOTE(review): absolute, machine-specific output directory. write_plot() uses it as the
# default `path` and concatenates it directly with the file name, so the trailing
# slash is required.
image_path = 'C:/Users/tabm9/OneDrive - Caissa Analytica/Documents/DS_Portfolio/Projects/Images/RLCS/'

# Functions for figure formatting

In [2]:
def write_plot(fig: go.Figure, file_name: str, path: str = image_path, **kwargs):
    """Export the inline plotly script of ``fig`` to ``<path><file_name>.js``.

    The figure is rendered to HTML (plotly.js loaded from the CDN, target div id equal
    to ``file_name``) and the content of the last inline
    ``<script type="text/javascript">`` element is written to disk.

    Args:
        fig: Figure to export.
        file_name: Output file name without extension; also used as the div id.
        path: Directory prefix, concatenated as-is (it must end with a separator).
        **kwargs: Forwarded to ``fig.to_html``.

    Raises:
        IndexError: If the rendered HTML contains no matching script tags.
    """
    # Transform into html
    html = fig.to_html(include_plotlyjs='cdn', div_id=file_name, **kwargs)

    # Extract js code: from the end of the last opening tag to the start of the last closing tag
    js_start = [s.end() for s in re.finditer('<script type="text/javascript">', html)][-1]
    js_end = [s.start() for s in re.finditer('</script>', html)][-1]

    js = html[js_start: js_end]

    # Write files. The encoding is explicit so that non-ASCII characters in the figure
    # do not depend on the platform default (e.g. cp1252 on Windows).
    with open(path + file_name + '.js', 'w', encoding='utf-8') as f:
        f.write(js)


# Shared colour palette used by every figure
line_color, font_color = '#f1f5f9', '#f1f5f9'
background_color = '#032137'

# Base plotly layout: transparent canvas, light axis/grid lines, legend anchored at x=0, y=1
layout_dict = {
    'font_color': font_color,
    'paper_bgcolor': 'rgba(0,0,0,0)',
    'plot_bgcolor': 'rgba(0,0,0,0)',
    'xaxis': {'gridcolor': line_color, 'linecolor': line_color, 'zerolinecolor': line_color},
    'yaxis': {'gridcolor': line_color, 'linecolor': line_color, 'zerolinecolor': line_color},
    'legend': {'x': 0, 'y': 1, 'bgcolor': background_color},
    'margin': {'r': 0, 'l': 0, 'b': 0},
}

# Size presets; presumably forwarded to `fig.to_html` through write_plot(**kwargs) - confirm
tablet_args = {'default_width': '700px', 'default_height': '300px'}
mobile_args = {'default_width': '400px', 'default_height': '175px'}

# Bounded Random Walks

In [3]:
def bound_array(a, bounds=[0, 1]):
    """Fold the values of ``a`` into ``bounds`` by reflecting them at the edges.

    A value that leaves the interval is mirrored back at the bound it crossed, as many
    times as needed. The input is not modified; a folded copy is returned.

    Args:
        a: Array of values (anything exposing ``.copy()`` and numpy arithmetic).
        bounds: ``[lower, upper]`` limits of the interval.

    Returns:
        Copy of ``a`` with every value folded into ``bounds``.
    """
    lower, upper = bounds[0], bounds[1]
    folded = a.copy()

    # Signed count of whole interval widths between each value and the lower bound
    signed_crosses = np.floor((folded - lower) / (upper - lower)).astype(int)
    crosses = np.abs(signed_crosses)

    # An odd number of reflections flips the orientation of the value
    folded *= (-1) ** crosses

    # Shift the (possibly flipped) value back so the folded path stays continuous
    folded += (upper - lower) * signed_crosses * (-1) ** (crosses + 1)
    folded += (upper + lower) * (crosses % 2)

    return folded


def bounded_random_walk(initial=1/2, bounds=[0, 1], size=[1000, 10], drift=0, std=0.01, is_deterministic=False):
    """Generate Gaussian random walks that are reflected back into ``bounds``.

    Args:
        initial: Offset added to the cumulative sum of the steps.
        bounds: ``[lower, upper]`` interval handed to ``bound_array``.
        size: Shape of the result; steps are accumulated along axis 0.
        drift: Mean of each Gaussian step.
        std: Standard deviation of each Gaussian step.
        is_deterministic: When true, return a constant array equal to ``initial``
            (no random numbers are drawn and no bounding is applied).

    Returns:
        Array of shape ``size``.
    """
    if is_deterministic:
        return np.ones(size) * initial

    # Accumulate the Gaussian steps along axis 0 and offset them by the initial value
    steps = np.random.normal(loc=drift, scale=std, size=size)
    unbounded_walk = steps.cumsum(0) + initial

    # Reflect whatever escaped the interval back inside
    return bound_array(unbounded_walk, bounds)


# Nice example plot: bounded random walks generated with the function defaults
# (size=[1000, 10]), drawn as one semi-transparent line per walk.
fig = go.Figure(layout=layout_dict)

# Transpose so that iterating yields one walk (one column of the result) at a time
walks = bounded_random_walk().T

for walk in walks:
    fig.add_trace(go.Scatter(y=walk, opacity=0.5, showlegend=False))

# Fix the y-axis to the default bounds [0, 1] of the walk
fig.update_yaxes(range=[0, 1])
fig.show()

# Elo Theory

The basic idea is that a team $X$ has a rating $R_X$. These ratings can help us calculate the expected result that a given team, say $A$, will beat another team, say $B$:

$ \begin{align}
    \nonumber \bar{E}[A \text{ beats } B] &= \bar{E}_{A, B} \\
    \nonumber &= \frac{1}{1 + 10^{(R_B - R_A) / 400}}
\end{align} $

Now, this formula is actually an estimation of the expected value. We cannot know the actual value, and that is why we need the ratings in the first place. As such, we need to update the ratings (and their inherent probabilities) every time we actually observe a match between $A$ and $B$. Assume that we observed said match, and let us denote the score outcome from the viewpoint of $A$ as $S_A$. $S_A$ will be $1$ if $A$ won, $0$ if $A$ lost, and $1/2$ if they tied. Similarly, from the viewpoint of $B$, $S_B$ is $1$ if $B$ won, $0$ if $B$ lost, and $1/2$ if they tied. Let us observe what happens when we express the expectation as its original probabilistic definition:

$ \begin{align}
    \nonumber \bar{E}_{A, B} = 1 \cdot \bar{P}(A \text{ beats } B) + 1/2 \cdot \bar{P}(A \text{ ties with } B) + 0 \cdot \bar{P}(A \text{ loses to } B)
\end{align} $

We can't know each individual probability in this context. However, if our game has no ties, we can simplify this expression to:

$ \begin{align}
    \nonumber \bar{E}_{A, B} &= \bar{P}(A \text{ beats } B) \\
    \nonumber &= \bar{P}_{A, B}
\end{align} $

This means that, for games with only win/lose results, the Elo system actually approximates win probabilities. This interpretation will be useful in the future to test the performance of the system. 

Now, as we are talking about approximations, we need a way to update them with new observed matches. The original method uses a parameter $k$ for the update.

$ \begin{align}
    \nonumber R_{A, n + 1} = R_{A, n} + k \cdot (S_A - \bar{P}_{A, B})
\end{align} $

## Update problem

The issue with the vanilla implementation is that it has no way of recognizing when the ratings (and implicitly the underlying approximated probabilities) are too different from the real values. Nor does it have the ability to react to the size of this difference, as the update parameter $k$ is assumed constant. Let us run some tests on the number of games to convergence and the distribution of the difference after convergence.

First, we need to agree on what we mean by "convergence". Say we have the distribution for $D_{A, B}(n) = |P_{A, B}(n) - \bar{P}_{A, B}(n)|$. Integrating the match number $n$ into the equation allows both the actual probabilities and the approximations to change through time, which is a healthy assumption as teams can train and hone their abilities. We can then define the convergence $C_{A, B}$ as:

$ \begin{align}
    \nonumber C_{A, B}(N) = \sum_{n=0}^N w(n) \cdot F_{D_{A, B}(n)}^{-1}(1/2)
\end{align} $

It looks scary, but is quite simple. $F_{D_{A, B}(n)}^{-1}(1/2)$ denotes the median (or $50\%$ quantile) of $D_{A, B}(n)$. We do this because we want something stable on which to measure convergence. The next step is to measure the rate of change of this median as we step further into the games, thus we weight the median according to $n$. We want to tune $w(n)$ such that it gives more importance to bigger $n$'s, which we assume would have converged better than smaller ones.

Let us see how this works if $F_{D_{A, B}(n)}^{-1}(1/2) = e^{-n / 100} + 0.2$ and $w(n) = \frac{e^{-5 (1 - n / N)}}{\sum_{k=0}^N e^{-5 (1 - k / N)}}$, so that the weight grows with $n$.

In [22]:
N = 1000
n = np.arange(N)
median = np.e ** (-n / 100) + 0.2

# Start from the shared theme and *merge* the figure-specific settings into it.
# `layout_dict | dict(xaxis=...)` is a shallow merge: it would replace the whole themed
# `xaxis` entry and drop its grid/line colours.
fig = go.Figure(layout=layout_dict)
fig.update_layout(title='Convergence example', legend=dict(x=0.8, bgcolor=background_color), xaxis_title='n')

# Plot the median
fig.add_trace(go.Scatter(x=n, y=median, name='Median of the Difference'))

# Plot convergence value: the weights grow from e^-5 (first game) to e^0 (last game)
# and are normalised to sum to 1
weights = np.e ** -np.linspace(5, 0, N)
weights /= sum(weights)
convergence = sum(median * weights)
fig.add_trace(go.Scatter(x=n, y=[convergence] * N, name='Convergence Value'))


fig.show()

We can see that our method is a fairly good approximation, but it still needs to be tested against $N$ to see how it behaves. Fortunately, we know that our choice for the median converges to $0.2$, so we can observe how we reach that value for each increasing choice of $N$:

In [20]:
# Convergence value of the synthetic median e^(-n/100) + 0.2 for increasing horizons N
convergence = []
Ns = list(range(50, 10000, 50))
for N in Ns:
    # Setup median
    n = np.arange(N)
    median = np.e ** (-n / 100) + 0.2

    # Calculate convergence value (weights increase towards the last game, sum to 1)
    weights = np.e ** -np.linspace(5, 0, N)
    weights /= sum(weights)
    convergence.append(sum(median * weights))

# Plot results. The theme is applied first and the figure-specific settings are merged
# afterwards; `layout_dict | dict(xaxis=...)` would shallow-merge and discard the themed
# x-axis colours.
fig = go.Figure(layout=layout_dict)
fig.update_layout(title='Convergence Sensitivity to N', legend=dict(x=0.8, bgcolor=background_color), xaxis_title='N')

fig.add_trace(go.Scatter(x=Ns, y=convergence, name='Approximated Convergence Values'))
fig.add_trace(go.Scatter(x=Ns, y=[0.2] * len(Ns), name='Actual Convergence at Limit'))

fig.show()

## Convergence Tests

Let $P_{A, B}(n) = BRW_{0.25, 0, 0.01}(n)$, where $BRW_{\beta, \mu, \sigma}$ is a Bounded Random Walk with initial value $BRW_{\beta, \mu, \sigma}(0) = \beta$, drift $\mu$ and standard deviation $\sigma$. We first need a function to generate the ratings.

In [24]:
def calculate_elo_2_players(games_results: np.array, k=1):
    """Run the constant-k Elo update for player A over a sequence of games.

    Args:
        games_results: Outcomes from A's viewpoint; axis 0 indexes the games and any
            further axes are updated element-wise.
        k: Update factor applied to (outcome - expected outcome).

    Returns:
        Tuple ``(ratings, expected)`` with the shape of ``games_results``: A's rating
        before each game and the matching approximated win probability. Both start at
        rating 0 / probability 1/2.
    """
    shape = games_results.shape
    outcomes = games_results.copy()

    ratings = np.zeros(shape)
    expected = np.zeros(shape)
    expected[0] += 1/2

    for game in range(shape[0] - 1):
        following = game + 1

        # Move the rating by the surprise of the game that was just played
        ratings[following] = ratings[game] + k * (outcomes[game] - expected[game])

        # Win probability implied by the new rating (the rating gap is 2 * rating)
        expected[following] = 1 / (1 + 10 ** (-2 * ratings[following] / 400))

    return ratings, expected

In [17]:
# Example for k=1
# N = [number of games, number of simulated series]: the walk and the Elo update both
# advance along axis 0.
N = [10000, 3000]
# True win probability of A: a bounded random walk starting around 1/4
p = bounded_random_walk(initial = 1/4, size=N)
# A wins a game whenever a uniform draw falls at or below the true probability
games_results = (np.random.random(N) <= p).astype(int)

# k is left at the function default (k=1)
Ra, pa = calculate_elo_2_players(games_results)

# Absolute estimation error and its 10% / 50% / 90% quantiles across series, per game
diff = abs(p - pa)
quantiles = np.quantile(diff, [0.1, 0.5, 0.9], axis=1)

In [18]:
# Calculate convergence value
# Relies on `N` and `quantiles` left in the kernel by the previous simulation cell.
# Weights grow from e^-5 (first game) to e^0 (last game) and are normalised to sum to 1;
# quantiles[1] is the median row.
weights = np.e ** -np.linspace(5, 0, N[0])
weights /= sum(weights)
converge_value = sum(quantiles[1] * weights)

# Plot example
# x runs forwards and then backwards so that the 90% and (reversed) 10% quantile
# curves form one closed polygon for the filled band.
x_range = list(range(N[0])) + list(reversed(range(N[0])))

fig = go.Figure(layout=layout_dict | dict(title='Example of D(n) for k=1'))

fig.add_trace(go.Scatter(
        x = x_range, y=list(quantiles[2]) + list(quantiles[0][::-1]), fill='toself', mode='none',
        hoveron='points', fillcolor='lightblue', name=f'80% CI', opacity=0.5
    ))
fig.add_trace(go.Scatter(y=quantiles[1], line=dict(color='lightblue'), name='Median'))
fig.add_trace(go.Scatter(y=[converge_value] * N[0], line=dict(color='red'), name='Convergence Value'))

fig.show()

In [28]:
# Example for k=1
N = [10000, 3000]
p = bounded_random_walk(initial = 1/4, size=N)
games_results = (np.random.random(N) <= p).astype(int)

Ra, pa = calculate_elo_2_players(games_results, k=10)

diff = abs(p - pa)
quantiles = np.quantile(diff, [0.1, 0.5, 0.9], axis=1)

# Calculate convergence value
weights = np.e ** -np.linspace(5, 0, N[0])
weights /= sum(weights)
converge_value = sum(quantiles[1] * weights)

# Plot example
x_range = list(range(N[0])) + list(reversed(range(N[0])))

fig = go.Figure(layout=layout_dict | dict(title='Example of D(n) for k=1'))

fig.add_trace(go.Scatter(
        x = x_range, y=list(quantiles[2]) + list(quantiles[0][::-1]), fill='toself', mode='none',
        hoveron='points', fillcolor='lightblue', name=f'80% CI', opacity=0.5
    ))
fig.add_trace(go.Scatter(y=quantiles[1], line=dict(color='lightblue'), name='Median'))
fig.add_trace(go.Scatter(y=[converge_value] * N[0], line=dict(color='red'), name='Convergence Value'))

fig.show()

In [41]:
def calculate_convergence(N, initial, K):
    """Compute the convergence value of two-player Elo for several update factors.

    One set of games is simulated and then rated once per value in ``K``, so every
    ``k`` is evaluated on exactly the same outcomes.

    Args:
        N: ``[number of games, number of simulated series]``.
        initial: Starting value of the true win-probability random walk.
        K: Iterable of update factors to evaluate.

    Returns:
        List with one convergence value per element of ``K`` (weighted sum over games
        of the median absolute error across series).
    """
    # Run games
    p = bounded_random_walk(initial = initial, size=N)
    games_results = (np.random.random(N) <= p).astype(int)

    # The weights only depend on the number of games, so they are computed once
    # instead of once per k (they increase towards the last game and sum to 1).
    weights = np.e ** -np.linspace(5, 0, N[0])
    weights /= sum(weights)

    convergence_values = []
    for k in K:
        # Approximated probabilities
        _, pa = calculate_elo_2_players(games_results, k=k)

        # Calculate difference and median across series
        diff = abs(p - pa)
        median = np.quantile(diff, 0.5, axis=1)

        # Calculate convergence value
        convergence_values.append(sum(median * weights))

    return convergence_values

In [48]:
# Sweep the number of games per series while keeping 3000 simulated series
K = [1, 2, 5, 7, 10, 20, 50, 100]
convergence_values = []
N0s = [100, 500, 1000, 2000, 3000, 5000, 10000]
for N0 in N0s:
    N = [N0, 3000]
    convergence_values += [calculate_convergence(N, 1/4, K)]

# One row per k, one column per entry of N0s
convergence_values = np.array(convergence_values).T

# Title and axis labels make this figure distinguishable from the companion sweep
fig = go.Figure(layout=layout_dict)
fig.update_layout(
    title='Convergence value vs. number of games (3000 simulated series)',
    xaxis_title='Number of games',
    yaxis_title='Convergence value'
)

for i in range(len(K)):
    fig.add_trace(go.Scatter(x=N0s, y=convergence_values[i], name=f'k = {K[i]}'))

fig.show()

In [50]:
# Sweep the number of simulated series while keeping 3000 games per series
K = [1, 2, 5, 7, 10, 20, 50, 100]
convergence_values = []
N0s = [100, 500, 1000, 2000, 3000, 5000, 10000]
for N0 in N0s:
    N = [3000, N0]
    convergence_values += [calculate_convergence(N, 1/4, K)]

# One row per k, one column per entry of N0s
convergence_values = np.array(convergence_values).T

# Title and axis labels make this figure distinguishable from the companion sweep
fig = go.Figure(layout=layout_dict)
fig.update_layout(
    title='Convergence value vs. number of simulated series (3000 games each)',
    xaxis_title='Number of simulated series',
    yaxis_title='Convergence value'
)

for i in range(len(K)):
    fig.add_trace(go.Scatter(x=N0s, y=convergence_values[i], name=f'k = {K[i]}'))

fig.show()

The test will measure $C_{A, B}$ and the size of the $80\%$ Central Interval (CI) on $D_{A, B}(n)$ for different values of $k$. 

# Test with only 2 teams

## Naive Elo

In [5]:
def run_games(Pa=1/4, K=1, N=[10000, 1], thres=0.10, stop_burnout=False, is_deterministic=True):
    """Simulate two-player series rated with constant-K Elo and measure the accuracy.

    Args:
        Pa: Initial true probability that A wins (``initial`` of the random walk).
        K: Constant Elo update factor.
        N: ``[number of games, number of simulated series]``.
        thres: Relative tolerance; a series counts as converged at game ``i`` once
            ``|pa[i] - Ps[i]| <= Ps[i] * thres``.
        stop_burnout: Stop the simulation as soon as every series has converged
            (later rows of the estimates then stay at zero).
        is_deterministic: Forwarded to ``bounded_random_walk``; true keeps the true
            probability constant.

    Returns:
        Tuple ``(abs_diff, n_to_convergence)``: the absolute difference between true
        and approximated probability with shape ``(series, games)``, and for each
        series the first game index at which it converged. Series that are still
        unconverged hold the last ``i + 1`` visited (``N[0] + 1`` if the loop never ran).
    """
    # Run N games
    Ps = bounded_random_walk(initial=Pa, size=N, is_deterministic=is_deterministic)
    Sa = (np.random.random(N) < Ps).astype(int)

    # Calculate Ratings
    Ra = np.zeros(N)
    pa = np.zeros(N)
    pa[0] += 1/2
    finished_burnout = np.zeros(N[1], dtype=bool)
    n_to_convergence = np.ones(N[1]) * (N[0] + 1)
    for i in range(1, N[0]):
        # Update rating
        Ra[i] = Ra[i-1] + K * (Sa[i - 1] - pa[i - 1])

        # Update Approx Probabilities
        pa[i] = 1 / (1 + 10 ** (-2 * Ra[i] / 400))

        # Check if burnout is finished
        if not finished_burnout.all():
            condition = abs(pa[i] - Ps[i]) <= Ps[i] * thres
            newly_finished = condition & ~finished_burnout
            finished_burnout = finished_burnout | condition
            n_to_convergence = np.where(finished_burnout, n_to_convergence, i + 1)

            # Record the game of convergence explicitly. Relying on the value carried
            # over from the previous iteration left series that converged on the very
            # first check stuck at the initial "never converged" value N[0] + 1.
            n_to_convergence[newly_finished] = i

            if finished_burnout.all() and stop_burnout:
                break

    return abs(Ps - pa).T, n_to_convergence


## Improved Elo

In [6]:
def logbase(base, x):
    """Return the logarithm of ``x`` in the given ``base`` (change-of-base formula)."""
    log_of_x = np.log(x)
    log_of_base = np.log(base)
    return log_of_x / log_of_base

def K_generator(K_interval=[0,400], K_default=300):
    """Build a function mapping an error ``x`` to an update factor inside ``K_interval``.

    The returned function is a logistic curve in ``x ** b``, rescaled to span
    ``K_interval``. The exponent ``b`` is chosen so that ``x = 1/2`` maps to
    ``K_default``.

    Args:
        K_interval: ``[minimum, maximum]`` update factor.
        K_default: Update factor returned for ``x = 1/2``.

    Returns:
        ``K_function(x, b=b, K_interval=K_interval)``.
    """
    k_low, k_high = K_interval[0], K_interval[1]

    # K_default expressed as a fraction of the interval
    K_d = (K_default - k_low) / (k_high - k_low)

    # Solve the logistic curve for the exponent that sends x = 1/2 to K_d
    log_ratio = np.log((1 + np.e ** (-2 * np.e) - K_d) / (K_d + np.e ** (-2 * np.e)))
    b = logbase(1/2, 1/2 - log_ratio / (4 * np.e))

    def K_function(x, b=b, K_interval=K_interval):
        span = K_interval[1] - K_interval[0]
        logistic = (1 + 2 * np.e ** (-2 * np.e)) / (1 + np.e ** (-4 * np.e * (x ** b - 1/2)))
        return span * (logistic - np.e ** (-2 * np.e)) + K_interval[0]

    return K_function



def run_games_improved(Pa=1/4, K_interval=[0,200], K_default=180, N=[10000, 1], thres=0.10, thres_n=10, window=100, stop_burnout=False, is_deterministic=True):
    """Simulate two-player series rated with an adaptive-K Elo and measure the accuracy.

    The update factor of every game comes from ``K_generator`` and depends on the gap
    between the current Elo estimate and a windowed frequency of A's wins.

    Args:
        Pa: Initial true probability that A wins (``initial`` of the random walk).
        K_interval: ``[minimum, maximum]`` update factor.
        K_default: Update factor used when the gap equals 1/2.
        N: ``[number of games, number of simulated series]``.
        thres: Relative tolerance of the convergence check.
        thres_n: Number of consecutive previous games that must all be within tolerance.
        window: Half-width (in games) of the window used for the win frequency.
        stop_burnout: Stop the simulation as soon as every series has converged.
        is_deterministic: Forwarded to ``bounded_random_walk``.

    Returns:
        Tuple ``(abs_diff, n_to_convergence, fpa)``: absolute difference between true
        and approximated probability with shape ``(series, games)``, the first game of
        the first in-tolerance run of ``thres_n`` games for each series, and the
        windowed win frequencies with shape ``(series, games)``.
    """
    # Run N games
    Ps = bounded_random_walk(initial=Pa, size=N, is_deterministic=is_deterministic)
    Sa = (np.random.random(N) < Ps).astype(int)

    # Calculate Ratings
    Ra = np.zeros(N)
    pa = np.zeros(N)
    pa[0] += 1/2
    fpa_list = pa.copy()
    finished_burnout = np.zeros(N[1], dtype=bool)
    n_to_convergence = np.ones(N[1]) * (N[0] + 1)
    K = K_generator(K_interval=K_interval, K_default=K_default)

    for i in range(1, N[0]):
        # Update rating
        # NOTE(review): the window reaches up to `i + window`, i.e. it includes games
        # that have not been played yet at step i - confirm this look-ahead is intended.
        frequentist_pa = Sa[max(0, i - window):min(N[0], i + window)].mean(0)

        K_i = K(abs(pa[i - 1] - frequentist_pa))
        Ra[i] = Ra[i-1] + K_i * (Sa[i - 1] - pa[i - 1])

        fpa_list[i] = frequentist_pa

        # Update Approx Probabilities
        pa[i] = 1 / (1 + 10 ** (-2 * Ra[i] / 400))

        # Check if burnout is finished
        if not finished_burnout.all() and i >= thres_n:
            # NOTE(review): `.mean()` averages the true probability over the window AND
            # over all series, giving one scalar threshold - confirm a per-series
            # threshold was not intended.
            condition = (abs(pa[i - thres_n: i] - Ps[i - thres_n: i]) <= Ps[i - thres_n: i].mean() * thres).T.all(1)
            newly_finished = condition & ~finished_burnout
            finished_burnout = finished_burnout | condition
            n_to_convergence = np.where(finished_burnout, n_to_convergence, i + 1 - thres_n)

            # Record the start of the converged run explicitly. Relying on the value
            # carried over from the previous check left series that converged on the
            # very first check stuck at the initial "never converged" value N[0] + 1.
            n_to_convergence[newly_finished] = i - thres_n

            if finished_burnout.all() and stop_burnout:
                break

    return abs(Ps - pa).T, n_to_convergence, fpa_list.T

## Comparison with static probabilities

In [7]:
# Setup parameters
Pa = 1/4
# Each N is [number of games, number of simulated series] (see run_games)
N_pa = [1000, 100]
N_conv = [3000, 10000]
# NOTE(review): N_range_pa is not used anywhere else in this cell
N_range_pa = list(range(N_pa[0] + 1))

# Simulation
# The first value returned by both runners is |true - approximated| probability with shape
# (series, games); despite their names, `pa` and `pa_improved` hold these differences.
# is_deterministic is left at its default (True), so the true probability stays at Pa.
pa, _ = run_games(Pa=Pa, N=N_pa)
pa_improved, _, _ = run_games_improved(Pa=Pa, N=N_pa)

# Separate, larger runs that stop as soon as every series has converged
_, n_to_convergence = run_games(Pa=Pa, N=N_conv, stop_burnout=True)
_, n_to_convergence_improved, fpa_list = run_games_improved(Pa=Pa, N=N_conv, stop_burnout=True)


# Initialize plot
fig = make_subplots(
    rows = 2, 
    subplot_titles=('Actual vs. Approximated Probabilities Difference', 'Time to Convergence Distributions')
)

# Calculate quantiles
# Transposed back to (games, series) so the quantiles are taken across series, per game
naive_quantiles = np.quantile(pa.T, [0.1, 0.5, 0.9], axis=1)
improved_quantiles = np.quantile(pa_improved.T, [0.1, 0.5, 0.9], axis=1)

# Add Probability lines
# x runs forwards and then backwards so that the 90% and (reversed) 10% quantile curves
# close into one polygon for the filled band
x_range = list(range(N_pa[0])) + list(reversed(range(N_pa[0])))

fig.add_trace(go.Scatter(
        x = x_range, y=list(naive_quantiles[2]) + list(naive_quantiles[0][::-1]), fill='toself', mode='none',
        hoveron='points', fillcolor='blue', name=f'Naive Probabilities', legendgroup=0, opacity=0.5
    ), row=1, col=1)
fig.add_trace(go.Scatter(y=naive_quantiles[1], line=dict(color='blue'), showlegend=False, legendgroup=0, name='Naive (Median)'), row=1, col=1)


fig.add_trace(go.Scatter(
        x = x_range, y=list(improved_quantiles[2]) + list(improved_quantiles[0][::-1]), fill='toself', mode='none',
        hoveron='points', fillcolor='red', name=f'Improved Probabilities', legendgroup=1, opacity=0.5
    ), row=1, col=1)
fig.add_trace(go.Scatter(y=improved_quantiles[1], line=dict(color='red'), showlegend=False, legendgroup=1, name='Improved (Median)'), row=1, col=1)

# Prepare histogram parameters
bin_width = 50
bins = list(range(0, int(np.ceil(max(max(n_to_convergence), max(n_to_convergence_improved)))), bin_width))

# Annotation anchors: x is the mean time to convergence, y is the share of series that
# fall into the same bin as that mean
x_naive = np.mean(n_to_convergence)
y_naive = sum(np.digitize(n_to_convergence, bins) == np.digitize(x_naive, bins)) / N_conv[1]
x_improved = np.mean(n_to_convergence_improved)
y_improved = sum(np.digitize(n_to_convergence_improved, bins) == np.digitize(x_improved, bins)) / N_conv[1]

# Add histograms
fig.add_trace(go.Histogram(histnorm='probability', x=n_to_convergence, xbins=dict(size=bin_width), opacity=0.5, showlegend=False), row=2, col=1)
fig.add_trace(go.Histogram(histnorm='probability', x=n_to_convergence_improved, xbins=dict(size=bin_width), opacity=0.5, showlegend=False), row=2, col=1)

# 'x2' / 'y2' address the axes of the second subplot
fig.add_annotation(x=x_naive, y=y_naive, xref='x2', yref='y2', text='Naive Elo', ax=50)
fig.add_annotation(x=x_improved, y=y_improved, xref='x2', yref='y2', text='Improved Elo', ax=50)

# Update Layout
# NOTE(review): layout_dict only carries `xaxis` / `yaxis` entries, so the axes of the
# second subplot presumably keep the default colours - confirm.
fig.update_layout(barmode='overlay', title='Test with 2 Teams, Static Probabilities', **layout_dict)
fig.update_xaxes(title_text='Iteration', row=1, col=1)
fig.update_xaxes(title_text='Iteration', row=2, col=1)
fig.update_yaxes(title_text='Probability', range=[0, 1], row=1, col=1)
fig.update_yaxes(title_text='Probability', row=2, col=1)

# Show / save figure
# NOTE(review): no row/col is given here, so this presumably forces [0, 1] on the
# histogram's y-axis as well - confirm.
fig.update_yaxes(range=[0, 1])
fig.show()
# fig.write_html(image_path + '2_Teams_Static_Probability.html')

## Comparison with Bounded Random Walk

In [8]:
# Setup parameters
Pa = 1/4
# Each N is [number of games, number of simulated series] (see run_games)
N_pa = [3000, 100]
N_conv = [3000, 10000]
# NOTE(review): N_range_pa is not used anywhere else in this cell
N_range_pa = list(range(N_pa[0] + 1))

# Simulation
# Same comparison as the static case, but is_deterministic=False makes the true
# probability follow a bounded random walk. The first value returned by both runners is
# |true - approximated| probability with shape (series, games); despite their names,
# `pa` and `pa_improved` hold these differences.
pa, _ = run_games(Pa=Pa, N=N_pa, is_deterministic=False)
pa_improved, _, _ = run_games_improved(Pa=Pa, N=N_pa, is_deterministic=False)

# Separate, larger runs that stop as soon as every series has converged
_, n_to_convergence = run_games(Pa=Pa, N=N_conv, is_deterministic=False, stop_burnout=True)
_, n_to_convergence_improved, fpa_list = run_games_improved(Pa=Pa, N=N_conv, is_deterministic=False, stop_burnout=True)


# Initialize plot
fig = make_subplots(
    rows = 2, 
    subplot_titles=('Actual vs. Approximated Probabilities', 'Time to Convergence Distributions')
)

# Calculate quantiles
# Transposed back to (games, series) so the quantiles are taken across series, per game
naive_quantiles = np.quantile(pa.T, [0.1, 0.5, 0.9], axis=1)
improved_quantiles = np.quantile(pa_improved.T, [0.1, 0.5, 0.9], axis=1)

# Add Probability lines
# x runs forwards and then backwards so that the 90% and (reversed) 10% quantile curves
# close into one polygon for the filled band
x_range = list(range(N_pa[0])) + list(reversed(range(N_pa[0])))

fig.add_trace(go.Scatter(
        x = x_range, y=list(naive_quantiles[2]) + list(naive_quantiles[0][::-1]), fill='toself', mode='none',
        hoveron='points', fillcolor='blue', name=f'Naive Probabilities', legendgroup=0, opacity=0.5
    ), row=1, col=1)
fig.add_trace(go.Scatter(y=naive_quantiles[1], line=dict(color='blue'), showlegend=False, legendgroup=0, name='Naive (Median)'), row=1, col=1)


fig.add_trace(go.Scatter(
        x = x_range, y=list(improved_quantiles[2]) + list(improved_quantiles[0][::-1]), fill='toself', mode='none',
        hoveron='points', fillcolor='red', name=f'Improved Probabilities', legendgroup=1, opacity=0.5
    ), row=1, col=1)
fig.add_trace(go.Scatter(y=improved_quantiles[1], line=dict(color='red'), showlegend=False, legendgroup=1, name='Improved (Median)'), row=1, col=1)

# Prepare histogram parameters
bin_width = 50
bins = list(range(0, int(np.ceil(max(max(n_to_convergence), max(n_to_convergence_improved)))), bin_width))

# Annotation anchors: x is the mean time to convergence, y is the share of series that
# fall into the same bin as that mean
x_naive = np.mean(n_to_convergence)
y_naive = sum(np.digitize(n_to_convergence, bins) == np.digitize(x_naive, bins)) / N_conv[1]
x_improved = np.mean(n_to_convergence_improved)
y_improved = sum(np.digitize(n_to_convergence_improved, bins) == np.digitize(x_improved, bins)) / N_conv[1]

# Add histograms
fig.add_trace(go.Histogram(histnorm='probability', x=n_to_convergence, xbins=dict(size=bin_width), opacity=0.5, showlegend=False), row=2, col=1)
fig.add_trace(go.Histogram(histnorm='probability', x=n_to_convergence_improved, xbins=dict(size=bin_width), opacity=0.5, showlegend=False), row=2, col=1)

# 'x2' / 'y2' address the axes of the second subplot
fig.add_annotation(x=x_naive, y=y_naive, xref='x2', yref='y2', text='Naive Elo', ax=50)
fig.add_annotation(x=x_improved, y=y_improved, xref='x2', yref='y2', text='Improved Elo', ax=50)

# Update Layout
# NOTE(review): layout_dict only carries `xaxis` / `yaxis` entries, so the axes of the
# second subplot presumably keep the default colours - confirm.
fig.update_layout(barmode='overlay', title='Test with 2 Teams, Bounded Random Walks', **layout_dict)
fig.update_xaxes(title_text='Iteration', row=1, col=1)
fig.update_xaxes(title_text='Iteration', row=2, col=1)
fig.update_yaxes(title_text='Probability', range=[0, 1], row=1, col=1)
fig.update_yaxes(title_text='Probability', row=2, col=1)

# Show / save figure
# NOTE(review): no row/col is given here, so this presumably forces [0, 1] on the
# histogram's y-axis as well - confirm.
fig.update_yaxes(range=[0, 1])
fig.show()
# fig.write_html(image_path + '2_Teams_Bounded_Random_Walks.html')

# Test with 3 teams and stable process

$ P = \frac{1}{1 + a^{b \cdot D}} \rightarrow P_{AC} = \frac{1}{1 + \left[1 - \frac{1}{P_{AB}} \right] \cdot \left[1 - \frac{1}{P_{BC}} \right]} $

$ P = (1/2)^{y^{z \cdot D}} \rightarrow P_{AC} = P_{AB}^{log_{1/2}(P_{BC})} = P_{BC}^{log_{1/2}(P_{AB})} $

In [9]:
def calculate_implicit(p1, p2):
    """Combine two win probabilities through the exponential relation.

    Returns ``p1 ** log_{1/2}(p2)``, the implicit probability that the first team beats
    the third one given ``p1`` (first beats second) and ``p2`` (second beats third).
    """
    exponent = np.log(p2) / np.log(1/2)
    return p1 ** exponent

# Two independent bounded random walks act as the true pairwise win probabilities
P_1_beats_2 = bounded_random_walk(size=1000)
P_2_beats_3 = bounded_random_walk(size=1000)

# Implicit probability that 1 beats 3 under the sigmoid relation
sigmoid_term = (1 - 1 / P_1_beats_2) * (1 - 1 / P_2_beats_3)
P_1_beats_3 = 1 / (1 + sigmoid_term)

fig = go.Figure(layout=layout_dict)

# (series, legend name) for every curve, in drawing order
implicit_curves = [
    (P_1_beats_2, 'Probability 1 beats 2'),
    (P_2_beats_3, 'Probability 2 beats 3'),
    (P_1_beats_3, 'Implicit Probability 1 beats 3 (sigmoid)'),
    (calculate_implicit(P_1_beats_2, P_2_beats_3), 'Implicit Probability 1 beats 3 (exp)'),
    (calculate_implicit(P_2_beats_3, P_1_beats_2), 'Implicit Probability 1 beats 3 (exp inv)'),
]
for curve_values, curve_name in implicit_curves:
    fig.add_trace(go.Scatter(y=curve_values, name=curve_name))

fig.show()

In [10]:
def calculate_implicit_probability(P_A_beats_B, P_B_beats_C, weights_AB=1, weights_BC=1):
    """Estimate P(A beats C) from P(A beats B) and P(B beats C).

    Each probability is turned into the term ``1 / P - 1``, raised to its share of the
    total weight, and the product of both terms is mapped back to a probability.
    Numpy runtime warnings (e.g. division by zero) are silenced during the computation.

    Args:
        P_A_beats_B: Probability (or array of probabilities) that A beats B.
        P_B_beats_C: Probability (or array of probabilities) that B beats C.
        weights_AB: Weight of the A-B information.
        weights_BC: Weight of the B-C information.

    Returns:
        ``1 / (1 + term_AB * term_BC)``.
    """
    total_weight = weights_AB + weights_BC

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        term_AB = 1 / P_A_beats_B - 1
        term_BC = 1 / P_B_beats_C - 1
        weighted_P_AB = term_AB ** (weights_AB / total_weight)
        weighted_P_BC = term_BC ** (weights_BC / total_weight)

    return 1 / (1 + weighted_P_AB * weighted_P_BC)

def stable_three_player_generator(N=[10000, 1], initials=[1/2, 1/2]):
    """Simulate games between three players with mutually derived win probabilities.

    P(1 beats 2) and P(2 beats 3) follow independent bounded random walks; P(1 beats 3)
    is derived from them with ``calculate_implicit_probability`` (default weights).

    Args:
        N: Shape of the simulation, ``[number of games, number of simulated series]``.
        initials: Starting values of the P(1 beats 2) and P(2 beats 3) walks.

    Returns:
        Tuple ``(p, games_players, games_results)``: ``p`` stacks
        ``[P_1_beats_2, P_1_beats_3, P_2_beats_3]`` on a new leading axis;
        ``games_players`` holds, per game, the index (0, 1 or 2) into ``p`` of the
        pairing played; ``games_results`` is 1 where the uniform draw was at or below
        the probability of the pairing played, else 0.
    """
    initial_12, initial_23 = initials[0], initials[1]

    # Set probabilities
    P_1_beats_2 = bounded_random_walk(initial=initial_12, size=N)
    P_2_beats_3 = bounded_random_walk(initial=initial_23, size=N)
    P_1_beats_3 = calculate_implicit_probability(P_1_beats_2, P_2_beats_3)
    p = np.array([P_1_beats_2, P_1_beats_3, P_2_beats_3])

    # Generate order of games: one pairing index per game and series
    games_players = np.random.choice([0, 1, 2], size=N)

    # Generate results against the probability of the pairing that was actually played
    uniform_draws = np.random.random(N)
    probability_played = np.choose(games_players, p)
    games_results = (uniform_draws <= probability_played).astype(int)

    return p, games_players, games_results

def calculate_weighted_probability(P_A_beats_B, P_A_beats_C, P_B_beats_C, weights_AB, weights_AC, weights_BC):
    """Blend the direct estimate of P(A beats C) with the one implied through B.

    The direct estimate is weighted by ``weights_AC`` and the implicit estimate
    (``calculate_implicit_probability``) by ``weights_AB + weights_BC``. Wherever
    P(A beats B) or P(B beats C) is exactly 0 or 1, the direct estimate is returned
    unchanged. Numpy runtime warnings are silenced during the computation.

    Returns:
        Array with the blended probability.
    """
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        implied = calculate_implicit_probability(P_A_beats_B, P_B_beats_C, weights_AB, weights_BC)

        prob = P_A_beats_C * weights_AC
        prob += implied * (weights_AB + weights_BC)
        prob /= weights_AB + weights_AC + weights_BC

    # Degenerate inputs: fall back to the direct estimate
    degenerate_AB = (P_A_beats_B == 0) | (P_A_beats_B == 1)
    degenerate_BC = (P_B_beats_C == 0) | (P_B_beats_C == 1)
    return np.where(degenerate_AB | degenerate_BC, P_A_beats_C, prob)



def three_player_heuristic(games_players, games_results):
    """Estimate the three pairwise win probabilities from a window of games.

    For each pairing (0: player 0 vs 1, 1: player 0 vs 2, 2: player 1 vs 2) the observed
    win frequency and the number of games are computed along axis 0, and every pairing's
    direct frequency is blended with the estimate implied by the other two pairings
    (``calculate_weighted_probability``).

    Args:
        games_players: Pairing index (0, 1 or 2) of every game; axis 0 indexes games.
        games_results: Outcome of every game (1 if the first player of the pairing won).

    Returns:
        Array stacking the estimates ``[P(0 beats 1), P(0 beats 2), P(1 beats 2)]``.
    """
    averages = []
    counts = []
    # Always evaluate the three pairings. Iterating over np.unique(games_players) and
    # indexing with its values raised IndexError whenever a pairing was missing from
    # the window.
    for match_id in range(3):
        games_choice_results = np.where(games_players == match_id, games_results, np.nan)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)
            averages.append(np.nanmean(games_choice_results, axis=0))
            counts.append(np.count_nonzero(~np.isnan(games_choice_results), axis=0))

    averages = np.array(averages)
    counts = np.array(counts)

    # A pairing without games has a NaN average and zero weight; NaN * 0 is still NaN,
    # so use a neutral 0.5 there to keep it from poisoning the blended estimate.
    averages = np.where(counts == 0, 0.5, averages)

    return np.array([
        # P that 0 beats 1
        calculate_weighted_probability(averages[1], averages[0], 1 - averages[2], counts[1], counts[0], counts[2]),
        # P that 0 beats 2
        calculate_weighted_probability(averages[0], averages[1], averages[2], counts[0], counts[1], counts[2]),
        # P that 1 beats 2
        calculate_weighted_probability(1 - averages[0], averages[2], averages[1], counts[0], counts[2], counts[1])
    ])

def calculate_probability(R_A, R_B):
    """Elo expected score: probability that A (rating R_A) beats B (rating R_B)."""
    rating_gap = R_B - R_A
    return 1 / (1 + 10 ** (rating_gap / 400))

In [11]:
# One simulated series of 1000 games between three players
N = [1000, 1]
p, games_players, games_results = stable_three_player_generator(N=N)

# Half-width (in games) of the window handed to the heuristic
window = 50

# Heuristic estimate at every game.
# NOTE(review): the window reaches up to `i + window`, i.e. it includes games after
# game i - confirm this look-ahead is intended.
pa = []
for i in range(N[0]):
    games_players_i = games_players[max(0, i - window):min(N[0], i + window)]
    games_results_i = games_results[max(0, i - window):min(N[0], i + window)]
    pa.append(three_player_heuristic(games_players_i, games_results_i))
pa = np.array(pa)

fig = go.Figure(layout=layout_dict)

# For every pairing: true probability (solid) and heuristic estimate (same colour, half
# opacity, hidden from the legend). pa.T[0][k] selects the only series and pairing k.
fig.add_trace(go.Scatter(y=p[0].flatten(), line=dict(color='blue'), name='A beats B', legendgroup=0))
fig.add_trace(go.Scatter(y=pa.T[0][0].flatten(), line=dict(color='blue'), opacity=0.5, showlegend=False, legendgroup=0))

fig.add_trace(go.Scatter(y=p[1].flatten(), line=dict(color='red'), name='A beats C', legendgroup=1))
fig.add_trace(go.Scatter(y=pa.T[0][1].flatten(), line=dict(color='red'), opacity=0.5, showlegend=False, legendgroup=1))

fig.add_trace(go.Scatter(y=p[2].flatten(), line=dict(color='green'), name='B beats C', legendgroup=2))
fig.add_trace(go.Scatter(y=pa.T[0][2].flatten(), line=dict(color='green'), opacity=0.5, showlegend=False, legendgroup=2))

fig.show()

## Naive Elo

In [None]:
def calculate_probability(R_A, R_B):
    """Elo expected score: probability that A (rating R_A) beats B (rating R_B)."""
    rating_gap = R_B - R_A
    return 1 / (1 + 10 ** (rating_gap / 400))

def calculate_probability_combinations(ratings):
    """Return the win probability of every pairing ``(i, j)`` with ``i < j``.

    Pairings are listed in lexicographic order - (0, 1), (0, 2), ..., (1, 2), ... - and
    each entry is the probability that player ``i`` beats player ``j``.
    """
    n_players = len(ratings)
    return [
        calculate_probability(ratings[first], ratings[second])
        for first in range(n_players - 1)
        for second in range(first + 1, n_players)
    ]

def calculate_elo(games_results, games_players=None, K=1):
    """Run constant-K Elo updates for an arbitrary set of players.

    Args:
        games_results: Array of shape ``(games, series)``; 1 if the first-listed player
            of the game won, 0 otherwise.
        games_players: Array of shape ``(games, series, 2)`` with the two player ids of
            every game (ids are used as indices into the rating axis). Defaults to
            player 0 against player 1 in every game.
        K: Constant update factor.

    Returns:
        Tuple ``(Ra, pa)``: ratings per game/series/player and approximated win
        probabilities per game/series/pairing.
    """
    # Get shapes and initialize if missing.
    # `is None` instead of truthiness: `not array` raises ValueError for any array with
    # more than one element, so explicit pairings could never be passed in.
    N = games_results.shape
    if games_players is None:
        games_players = np.apply_along_axis(lambda x: np.array([[0, 1]] * len(x)), 1, games_results)
    players = np.unique(games_players)
    player_combinations = np.array(
        [[players[i], players[j]] for i in range(len(players) - 1) for j in range(i + 1, len(players))]
    )

    # Calculate Ratings
    # coordinate_base repeats every series index twice (one entry per player of a game)
    coordinate_base = np.array([range(N[1])] * 2).T.flatten()
    Ra = np.zeros([*N, len(players)])
    # NOTE(review): the last axis of `pa` is indexed by pairing, but it is sized by the
    # number of players; the two only coincide for three players - confirm.
    pa = np.zeros([*N, len(players)])
    pa[0] += 1/2
    for i in range(1, N[0]):
        # Set up players in game
        player_coordinates = np.apply_along_axis(lambda x: np.where((player_combinations == x).all(axis=1)), 1, games_players[i - 1]).flatten()
        Ra_coordinates = (coordinate_base, games_players[i - 1].flatten())
        pa_coordinates = ([0, 1] * N[1], coordinate_base, np.array([player_coordinates, player_coordinates]).T.flatten())

        # Update rating: interleave (result, 1 - result) and (p, 1 - p) per series so
        # both players of a game are updated at once
        Sa_i = np.array([games_results[i - 1], 1 - games_results[i - 1]]).T.flatten()
        pa_i = np.array([pa[i - 1], 1 - pa[i - 1]])[pa_coordinates]
        Ra[i] = Ra[i - 1].copy()
        Ra[i][Ra_coordinates] += K * (Sa_i - pa_i)

        # Update Approx Probabilities
        pa[i] = np.apply_along_axis(
                    lambda ratings: calculate_probability_combinations(ratings),
                    1, 
                    Ra[i]
                )

    return Ra, pa

In [12]:
def run_games_three_players(p, games_players, games_results, K=10, thres=0.10, stop_burnout=False):
    """Rate simulated three-player games with constant-K Elo and measure the accuracy.

    Args:
        p: True probabilities, shape ``(3, games, series)``, ordered as the pairings
            (0 vs 1), (0 vs 2), (1 vs 2).
        games_players: Pairing index (0, 1 or 2) of every game, shape ``(games, series)``.
        games_results: 1 if the first player of the pairing won, else 0; same shape.
        K: Constant Elo update factor.
        thres: Relative tolerance; a series has converged at game ``i`` once all three
            estimates satisfy ``|pa - p| <= p * thres``.
        stop_burnout: Stop as soon as every series has converged.

    Returns:
        Tuple ``(abs_diff, n_to_convergence)``: absolute difference between true and
        approximated probabilities with shape ``(games, series, 3)``, and for each series
        the first game index at which it converged. Series that are still unconverged
        hold the last ``i + 1`` visited (``games + 1`` if the loop never ran).
    """
    N = games_results.shape

    # Calculate Ratings
    # coordinate_base repeats every series index twice (one entry per player of a game)
    coordinate_base = np.array([range(N[1])] * 2).T.flatten()
    players = [0, 1, 2]
    pairings = np.array([[0,1], [0,2], [1,2]])
    Ra = np.zeros([*N, 3])
    pa = np.zeros([*N, 3])
    pa[0] += 1/2
    finished_burnout = np.zeros(N[1], dtype=bool)
    n_to_convergence = np.ones(N[1]) * (N[0] + 1)
    for i in range(1, N[0]):
        # Set up players in game. Index tuples are used instead of star-unpacking inside
        # the subscript, which needs Python 3.11+.
        players_i = pairings[games_players[i - 1]]
        Ra_coordinates = (coordinate_base, players_i.flatten())
        pa_coordinates = ([0, 1] * N[1], coordinate_base, np.array([games_players[i - 1], games_players[i - 1]]).T.flatten())

        # Update rating: interleave (result, 1 - result) and (p, 1 - p) per series so
        # both players of a game are updated at once
        Sa_i = np.array([games_results[i - 1], 1 - games_results[i - 1]]).T.flatten()
        pa_i = np.array([pa[i - 1], 1 - pa[i - 1]])[pa_coordinates]
        Ra[i] = Ra[i - 1].copy()
        Ra[i][Ra_coordinates] += K * (Sa_i - pa_i)

        # Update Approx Probabilities for all series at once (vectorised; replaces a
        # per-row Python callback through np.apply_along_axis)
        ratings_i = Ra[i]
        pa[i] = np.stack([
            calculate_probability(ratings_i[:, 0], ratings_i[:, 1]),
            calculate_probability(ratings_i[:, 0], ratings_i[:, 2]),
            calculate_probability(ratings_i[:, 1], ratings_i[:, 2])
        ], axis=1)

        # Check if burnout is finished
        if not finished_burnout.all():
            condition = abs(pa[i].T - p[players, [i]]) <= p[players, [i]] * thres
            condition = condition.T.all(axis=1)
            newly_finished = condition & ~finished_burnout
            finished_burnout = finished_burnout | condition
            n_to_convergence = np.where(finished_burnout, n_to_convergence, i + 1)

            # Record the game of convergence explicitly. Relying on the value carried
            # over from the previous iteration left series that converged on the very
            # first check stuck at the initial "never converged" value N[0] + 1.
            n_to_convergence[newly_finished] = i

            if finished_burnout.all() and stop_burnout: break

    return abs(np.transpose(p, [1, 2, 0]) - pa), n_to_convergence

In [13]:
# N = [number of games, number of simulated series]
N = [3000, 1000]
p, games_players, games_results = stable_three_player_generator(initials=[0.1, 0.7], N=N)

# Constant-K Elo; p_diff is |true - approximated| with shape (games, series, pairing)
p_diff, n_to_convergence = run_games_three_players(p, games_players, games_results, K=50)
# Move the pairing axis first so the loop below handles one pairing at a time
p_diff = np.transpose(p_diff, [2, 0, 1])

fig = go.Figure(layout=layout_dict)

group = 0
colors = ['lightblue', 'pink', 'lightgreen']
# x runs forwards and then backwards so the two quantile curves close into one polygon
x_range = list(range(N[0])) + list(reversed(range(N[0])))
for C2_players in p_diff:
    # 10% / 50% / 90% quantiles across series for every game -> shape (3, games)
    quantiles = np.apply_along_axis(lambda x: np.quantile(x, [0.1, 0.5, 0.9]), 1, C2_players).T
    fig.add_trace(go.Scatter(
        x = x_range, y=list(quantiles[2]) + list(quantiles[0][::-1]), fill='toself', mode='none',
        hoveron='points', fillcolor=colors[group], name=f'Match {group}', legendgroup=group, opacity=0.5
    ))
    fig.add_trace(go.Scatter(y=quantiles[1], line=dict(color=colors[group]), legendgroup=group, showlegend=False, name=f'Match {group} median'))
    group += 1

fig.update_yaxes(range=[0, 1])
fig.show()


## Improved

In [14]:

def run_games_three_players_improved(p, games_players, games_results, K_interval=[0,200], K_default=180, window=50, thres=0.10, stop_burnout=False):
    """Rate simulated three-player games with an adaptive-K Elo and measure the accuracy.

    The update factor of every game comes from ``K_generator`` and depends on the gap
    between the current Elo estimate of the pairing played and the windowed estimate of
    ``three_player_heuristic``.

    Args:
        p: True probabilities, shape ``(3, games, series)``, ordered as the pairings
            (0 vs 1), (0 vs 2), (1 vs 2).
        games_players: Pairing index (0, 1 or 2) of every game, shape ``(games, series)``.
        games_results: 1 if the first player of the pairing won, else 0; same shape.
        K_interval: ``[minimum, maximum]`` update factor.
        K_default: Update factor used when the gap equals 1/2.
        window: Half-width (in games) of the window handed to the heuristic.
        thres: Relative tolerance; a series has converged at game ``i`` once all three
            estimates satisfy ``|pa - p| <= p * thres``.
        stop_burnout: Stop as soon as every series has converged.

    Returns:
        Tuple ``(abs_diff, n_to_convergence)``: absolute difference between true and
        approximated probabilities with shape ``(games, series, 3)``, and for each series
        the first game index at which it converged. Series that are still unconverged
        hold the last ``i + 1`` visited (``games + 1`` if the loop never ran).
    """
    N = games_results.shape

    # Calculate Ratings
    # coordinate_base repeats every series index twice (one entry per player of a game)
    coordinate_base = np.array([range(N[1])] * 2).T.flatten()
    players = [0, 1, 2]
    pairings = np.array([[0,1], [0,2], [1,2]])
    Ra = np.zeros([*N, 3])
    pa = np.zeros([*N, 3])
    pa[0] += 1/2
    finished_burnout = np.zeros(N[1], dtype=bool)
    n_to_convergence = np.ones(N[1]) * (N[0] + 1)
    K = K_generator(K_interval=K_interval, K_default=K_default)
    for i in range(1, N[0]):
        # Calculate heuristic
        # NOTE(review): the window reaches up to `i + window`, i.e. it includes games
        # that have not been played yet at step i - confirm this look-ahead is intended.
        window_list = [max(0, i - window), min(N[0], i + window)]
        frequentist_pa = three_player_heuristic(games_players[window_list[0]: window_list[1]], games_results[window_list[0]: window_list[1]]).T
        # Per series, pick the update factor of the pairing that was actually played
        K_i = np.choose(games_players[i - 1], K(abs(pa[i - 1] - frequentist_pa)).T)

        # Set up players in game. Index tuples are used instead of star-unpacking inside
        # the subscript, which needs Python 3.11+.
        players_i = pairings[games_players[i - 1]]
        Ra_coordinates = (coordinate_base, players_i.flatten())
        pa_coordinates = ([0, 1] * N[1], coordinate_base, np.array([games_players[i - 1], games_players[i - 1]]).T.flatten())

        # Update rating: interleave (result, 1 - result) and (p, 1 - p) per series so
        # both players of a game are updated at once, scaled by that series' K
        Sa_i = np.array([games_results[i - 1], 1 - games_results[i - 1]]).T.flatten()
        pa_i = np.array([pa[i - 1], 1 - pa[i - 1]])[pa_coordinates]
        Ra[i] = Ra[i - 1].copy()
        Ra[i][Ra_coordinates] += (K_i * (Sa_i - pa_i).reshape([N[1],-1]).T).T.flatten()

        # Update Approx Probabilities for all series at once (vectorised; replaces a
        # per-row Python callback through np.apply_along_axis)
        ratings_i = Ra[i]
        pa[i] = np.stack([
            calculate_probability(ratings_i[:, 0], ratings_i[:, 1]),
            calculate_probability(ratings_i[:, 0], ratings_i[:, 2]),
            calculate_probability(ratings_i[:, 1], ratings_i[:, 2])
        ], axis=1)

        # Check if burnout is finished
        if not finished_burnout.all():
            condition = abs(pa[i].T - p[players, [i]]) <= p[players, [i]] * thres
            condition = condition.T.all(axis=1)
            newly_finished = condition & ~finished_burnout
            finished_burnout = finished_burnout | condition
            n_to_convergence = np.where(finished_burnout, n_to_convergence, i + 1)

            # Record the game of convergence explicitly. Relying on the value carried
            # over from the previous iteration left series that converged on the very
            # first check stuck at the initial "never converged" value N[0] + 1.
            n_to_convergence[newly_finished] = i

            if finished_burnout.all() and stop_burnout: break

    return abs(np.transpose(p, [1, 2, 0]) - pa), n_to_convergence

In [19]:
# N = [number of games, number of simulated series]
N = [3000, 1000]
p, games_players, games_results = stable_three_player_generator(initials=[0.1, 0.7], N=N)

# Adaptive-K Elo; p_diff is |true - approximated| with shape (games, series, pairing)
p_diff, n_to_convergence = run_games_three_players_improved(p, games_players, games_results, K_interval=[0,500], K_default=400)
# Move the pairing axis first so the loop below handles one pairing at a time
p_diff = np.transpose(p_diff, [2, 0, 1])



fig = go.Figure(layout=layout_dict)

group = 0
colors = ['blue', 'red', 'green']
# x runs forwards and then backwards so the two quantile curves close into one polygon
x_range = list(range(N[0])) + list(reversed(range(N[0])))
for C2_players in p_diff:
    # 10% / 50% / 90% quantiles across series for every game -> shape (3, games)
    quantiles = np.apply_along_axis(lambda x: np.quantile(x, [0.1, 0.5, 0.9]), 1, C2_players).T
    fig.add_trace(go.Scatter(
        x = x_range, y=list(quantiles[2]) + list(quantiles[0][::-1]), fill='toself', mode='none',
        hoveron='points', fillcolor=colors[group], name=f'Match {group}', legendgroup=group, opacity=0.5
    ))
    fig.add_trace(go.Scatter(y=quantiles[1], line=dict(color=colors[group]), legendgroup=group, showlegend=False, name=f'Match {group} median'))
    group += 1

fig.update_yaxes(range=[0, 1])
fig.show()