In [1]:
import sys
import os

dir_path = os.path.abspath("..")

if dir_path not in sys.path:
    sys.path.append(dir_path)

import pandas as pd
import warnings
from db.models import GameType, Player, SM5Game, Events, EventType, Team
from helpers import userhelper
from typing import List
from openskill.models.weng_lin.plackett_luce import PlackettLuceRating as Rating, PlackettLuce
from openskill.models.weng_lin.common import phi_major, phi_minor, v as v_, w as w_
import sanic
from tortoise import Tortoise
from config import config
import math
import copy

# CONSTANTS

MU = 25
SIGMA = 25 / 3
BETA = 25 / 6
KAPPA = 0.0001
TAU = 25 / 300

class LasertagModel(PlackettLuce):
    """
    Modified Plackett-Luce model for lasertag

    Uses event based ratings
    """
    def __init__(self) -> None:
        # model params
        self.mu = MU
        self.sigma = SIGMA
        self.beta = BETA
        self.kappa = KAPPA
        self.tau = TAU
    
    def rate_event(self, players: List[Rating], weight: float) -> List[Rating]:
        """
        Rates an event
        """

        new_players = copy.deepcopy(players)

        c = self._c(self._calculate_team_ratings([players]))
        omega = weight # weighting for the event
        # what is w and v?

        for player in new_players:
            v = phi_minor((self.mu - player.mu) / c) / phi_major((self.mu - player.mu) / c)
            w = v * (v + (self.mu - player.mu) / c)
            player.mu = player.mu + omega * v * ((player.sigma**2 - self.tau**2) / c)
            player.sigma = math.sqrt((player.sigma**2 + self.tau**2) * (1 - abs(omega) * w * ((player.sigma**2 + self.tau**2) / c**2)))

        return new_players

    def rate_vs_event(self, winners: List[Rating], losers: List[Rating], weight: float) -> List[Rating]:
        """
        Rates a vs event where there is a winner and a loser
        """

        new_winners = copy.deepcopy(winners)
        new_losers = copy.deepcopy(losers)

        c = self._c(self._calculate_team_ratings([winners, losers]))

        omega = weight
        winners_sum = sum(map(lambda x: x.mu, winners))
        losers_sum = sum(map(lambda x: x.mu, losers))

        delta_mu = winners_sum - losers_sum

        v = phi_minor(delta_mu / c) / phi_major(delta_mu / c)
        w = v * (v + delta_mu / c)

        for player in new_winners:
            player.mu = player.mu + omega * v * ((player.sigma**2 - self.tau**2) / c)
            player.sigma = math.sqrt((player.sigma**2 + self.tau**2) * (1 - abs(omega) * w * ((player.sigma**2 + self.tau**2) / c**2)))

        for player in new_losers:
            player.mu = player.mu - omega * v * ((player.sigma**2 - self.tau**2) / c)
            player.sigma = math.sqrt((player.sigma**2 + self.tau**2) * (1 - abs(omega) * w * ((player.sigma**2 + self.tau**2) / c**2)))

        return new_winners, new_losers

async def test():
    model = LasertagModel()

    team1 = [Rating(25, 25/3), Rating(25, 25/3), Rating(25, 25/3)]
    team2 = [Rating(25, 25/3), Rating(25, 25/3), Rating(25, 25/3)]

    model.rate_vs_event(team1, team2, 0.25)

    print(team1)
    print(team2)


await test()
raise Exception("test")

warnings.filterwarnings("ignore")

model = LasertagModel()

await Tortoise.init(
    db_url=f"mysql://{config['db_user']}:{config['db_password']}@{config['db_host']}:{config['db_port']}/laserforce",
    modules={"models": ["db.models"]}
)

data = {}
players_elo = {}

all_players = await Player.all()

for player in all_players:
    data[player.codename] = []
    players_elo[player.codename] = Rating(25, 25/3)

async def update_game_elo(game: SM5Game) -> bool:
    if not game.ranked:
        return False

    # go through all events for each game

    events: List[Events] = await game.events.filter(type__in=
        [EventType.DAMAGED_OPPONENT, EventType.DOWNED_OPPONENT, EventType.MISSILE_DAMAGE_OPPONENT,
        EventType.MISSILE_DOWN_OPPONENT, EventType.RESUPPLY_LIVES, EventType.RESUPPLY_AMMO]
    ).order_by("time").all() # only get the events that we need

    for event in events:
        if "@" in event.arguments[0] or "@" in event.arguments[2]:
            continue

        match event.type:
            case EventType.DAMAGED_OPPONENT | EventType.DOWNED_OPPONENT:
                shooter = await userhelper.player_from_token(game, event.arguments[0])
                shooter_player = await Player.filter(entity_id=shooter.entity_id).first()
                shooter_elo = players_elo[shooter_player.codename]
                target = await userhelper.player_from_token(game, event.arguments[2])
                target_player = await Player.filter(entity_id=target.entity_id).first()
                target_elo = players_elo[target_player.codename]

                out = model.rate([[shooter_elo], [target_elo]], ranks=[0, 1])

                players_elo[shooter_player.codename] = Rating(out[0][0].mu, shooter_player.sm5_sigma + ((out[0][0].sigma - shooter_player.sm5_sigma) * 0.1))
                players_elo[target_player.codename] = Rating(out[1][0].mu, target_player.sm5_sigma + ((out[1][0].sigma - shooter_player.sm5_sigma) * 0.1))

            case EventType.MISSILE_DAMAGE_OPPONENT | EventType.MISSILE_DOWN_OPPONENT:
                shooter = await userhelper.player_from_token(game, event.arguments[0])
                shooter_player = await Player.filter(entity_id=shooter.entity_id).first()
                shooter_elo = players_elo[shooter_player.codename]
                target = await userhelper.player_from_token(game, event.arguments[2])
                target_player = await Player.filter(entity_id=target.entity_id).first()
                target_elo = players_elo[target_player.codename]

                out = model.rate([[shooter_elo], [target_elo]], ranks=[0, 1])

                players_elo[shooter_player.codename] = Rating(out[0][0].mu, shooter_player.sm5_sigma + ((out[0][0].sigma - shooter_player.sm5_sigma) * 0.1))
                players_elo[target_player.codename] = Rating(out[1][0].mu, target_player.sm5_sigma + ((out[1][0].sigma - shooter_player.sm5_sigma) * 0.1))
    
    # rate game

    players_codenames = list(map(lambda x: x.name, await game.entity_starts.all()))

    team1 = []
    team2 = []

    for player in await game.entity_starts.filter(type="player"):
        if (await player.team).color_name == "Fire":
            team1.append(await Player.filter(entity_id=player.entity_id).first())
        else:
            team2.append(await Player.filter(entity_id=player.entity_id).first())

    team1_elo = list(map(lambda x: players_elo[x.codename], team1))
    team2_elo = list(map(lambda x: players_elo[x.codename], team2))

    if game.winner == Team.RED:
        team1_new, team2_new = model.rate([team1_elo, team2_elo], ranks=[0, 1])
    else:
        team1_new, team2_new = model.rate([team1_elo, team2_elo], ranks=[1, 0])

    for player, rating in zip(team1, team1_new):
        player.sm5_mu += (rating.mu - player.sm5_mu) * 5
        player.sm5_sigma += (rating.sigma - player.sm5_sigma) * 5

        players_elo[player.codename] = rating
    
    for player, rating in zip(team2, team2_new):
        player.sm5_mu += (rating.mu - player.sm5_mu) * 5
        player.sm5_sigma += (rating.sigma - player.sm5_sigma) * 5

        players_elo[player.codename] = rating

    for p_codename in data.keys():
        if p_codename in players_codenames:
            data[p_codename].append(players_elo[p_codename].mu - 3 * players_elo[p_codename].sigma)
        else:
            if len(data[p_codename]) == 0:
                data[p_codename].append(0)
            else:
                data[p_codename].append(data[p_codename][-1])

    return True

0.7978845608028654 0.6366197723675814
0.7978845608028654 0.6366197723675814
0.7978845608028654 0.6366197723675814
0.7978845608028654 0.6366197723675814
0.7978845608028654 0.6366197723675814
0.7978845608028654 0.6366197723675814
[PlackettLuceRating(mu=25.65192661894771, sigma=8.231079848573277), PlackettLuceRating(mu=25.65192661894771, sigma=8.231079848573277), PlackettLuceRating(mu=25.65192661894771, sigma=8.231079848573277)]
[PlackettLuceRating(mu=24.34807338105229, sigma=8.231079848573277), PlackettLuceRating(mu=24.34807338105229, sigma=8.231079848573277), PlackettLuceRating(mu=24.34807338105229, sigma=8.231079848573277)]


Exception: test

In [2]:
from IPython.display import clear_output
# update elo and format data
games = await SM5Game.filter(ranked=False)
i = 0

for game in games:
    print(f"Updating elo for game {i}/{len(games)}")
    clear_output(wait=True)
    await update_game_elo(game)
    i += 1

data = {key:val for key, val in data.items() if val.count(0) != len(val)}

print(data)

df = pd.DataFrame(data=data)

TypeError: Model.all() got an unexpected keyword argument 'ranked'