In [51]:
%pip install ujson



In [52]:
import requests
import gzip
import shutil
import time
import os
import logging
import sys
import os.path
import requests
import multiprocessing as mp
import ujson as json
from collections import deque
from io import BytesIO
from enum import Enum, auto
from tqdm.notebook import tqdm
from os import listdir

In [53]:
import pandas as pd
import numpy as np
from sklearn.mixture import BayesianGaussianMixture, GaussianMixture
from sklearn.datasets import make_blobs
from sklearn.decomposition import PCA
import numpy as np
import matplotlib.pyplot as plt

In [54]:
S3_BUCKET_URL = "https://vcthackathon-data.s3.us-west-2.amazonaws.com"

# (game-changers, vct-international, vct-challengers)
LEAGUE = "vct-international"

# (2022, 2023, 2024)
YEAR = 2022

In [55]:
def download_gzip_and_write_to_json(file_name):
    if os.path.isfile(f"{file_name}.json"):
        return False

    remote_file = f"{S3_BUCKET_URL}/{file_name}.json.gz"
    response = requests.get(remote_file, stream=True)

    if response.status_code == 200:
        gzip_bytes = BytesIO(response.content)
        with gzip.GzipFile(fileobj=gzip_bytes, mode="rb") as gzipped_file:
            with open(f"{file_name}.json", 'wb') as output_file:
                shutil.copyfileobj(gzipped_file, output_file)
        return True
    elif response.status_code == 404:
        # Ignore
        return False
    else:
        print(response)
        print(f"Failed to download {file_name}")
        return False


def download_esports_files():
    print(f"Downloading esports files for {LEAGUE}...")
    directory = f"{LEAGUE}/esports-data"

    if not os.path.exists(directory):
        os.makedirs(directory)

    esports_data_files = ["leagues", "tournaments",
                          "players", "teams", "mapping_data"]
    for file_name in tqdm(esports_data_files):
        download_gzip_and_write_to_json(f"{directory}/{file_name}")

    print("Done downloading esports files")


def download_games():
    print(f"Downloading game files for {LEAGUE}...")

    local_mapping_file = f"{LEAGUE}/esports-data/mapping_data.json"
    with open(local_mapping_file, "r") as json_file:
        mappings_data = json.load(json_file)

    local_directory = f"{LEAGUE}/games/{YEAR}"
    if not os.path.exists(local_directory):
        os.makedirs(local_directory)

    game_counter = 0

    for esports_game in tqdm(mappings_data):
        s3_game_file = f"{LEAGUE}/games/{YEAR}/{esports_game['platformGameId']}"

        response = download_gzip_and_write_to_json(s3_game_file)

        if (response == True):
            game_counter += 1

    print("Done downloading game files")


In [56]:
download_esports_files()
download_games()

Downloading esports files for vct-international...


  0%|          | 0/5 [00:00<?, ?it/s]

Done downloading esports files
Downloading game files for vct-international...


  0%|          | 0/1742 [00:00<?, ?it/s]

Done downloading game files


In [57]:
with open(f'/content/{LEAGUE}/esports-data/players.json', 'r') as f:
    player_df = pd.DataFrame(json.load(f))

with open(f'/content/{LEAGUE}/esports-data/mapping_data.json', 'r') as f:
    mapping_df = pd.DataFrame(json.load(f))

with open(f'/content/{LEAGUE}/esports-data/teams.json', 'r') as f:
    team_df = pd.DataFrame(json.load(f))

with open(f'/content/{LEAGUE}/esports-data/tournaments.json', 'r') as f:
    tournament_df = pd.DataFrame(json.load(f))

with open(f'/content/{LEAGUE}/esports-data/leagues.json', 'r') as f:
    league_df = pd.DataFrame(json.load(f))

In [58]:
logging.basicConfig(
    format='{asctime} [{levelname}] {message}',
    style="{",
    datefmt="%H:%M",
    level=logging.WARNING,
    force=True
)

bucket = "actvaldata"
prefix = f'{LEAGUE}/esports-data'

In [64]:
def add_item_to_dynamodb(table_name, item):
    # dynamodb = boto3.resource('dynamodb')
    # table = dynamodb.Table(table_name)

    # response = table.put_item(Item=item)
    # return response
    print(item.values())

def read_json_from_s3(bucket_name, file):
    # s3 = boto3.client('s3')

    # response = s3.get_object(Bucket=bucket_name, Key=file_key)
    # content = response['Body'].read().decode('utf-8')

    with open(f'/content/{file}', 'r') as f:
        content = f.read()

    return json.loads(content)

def list_s3_files(bucket_name, prefix):
    # s3 = boto3.client('s3')
    # response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

    # files = []
    # if 'Contents' in response:
    #     for obj in response['Contents']:
    #         files.append(obj['Key'])
    files = listdir(f"/content/{prefix}")

    return files

In [60]:
maps = {
    "Infinity": 'ABYSS',
    "Ascent": 'ASCENT',
    "Duality": 'BIND',
    "Foxtrot": 'BREEZE',
    "Canyon": 'FRACTURE',
    "Triad": 'HAVEN',
    "Port": 'ICEBOX',
    "Jam": 'LOTUS',
    "Pitt": 'PEARL',
    "Bonsai": 'SPLIT',
    "Juliett": 'SUNSET',
}

vec_fields = [
    'ROUND_NUMBER',
    'OUTCOME',
    'SIDE',
    'KILLS',
    'DEATHS',
    'ASSISTS',
    'COMBAT_SCORE',
    'KILLS_STINGER',
    'KILLS_BUCKY',
    'KILLS_JUDGE',
    'KILLS_SPECTRE',
    'KILLS_BULLDOG',
    'KILLS_GUARDIAN',
    'KILLS_PHANTOM',
    'KILLS_VANDAL',
    'KILLS_MARSHAL',
    'KILLS_OUTLAW',
    'KILLS_OPERATOR',
    'KILLS_ARES',
    'KILLS_ODIN',
    'KILLS_CLASSIC',
    'KILLS_SHORTY',
    'KILLS_FRENZY',
    'KILLS_GHOST',
    'KILLS_SHERIFF',
    'KILLS_MELEE',
    'TIME_ALIVE',
    'DEAD',
    'DAMAGE_TAKEN',
    'DAMAGE_DONE',
    'SPIKE_CARRY_PERCENT',
    'SPIKE_PLANT',
    '''
    'AFFINITY_ABYSS',
    'AFFINITY_ASCENT',
    'AFFINITY_BIND',
    'AFFINITY_BREEZE',
    'AFFINITY_FRACTURE',
    'AFFINITY_HAVEN',
    'AFFINITY_ICEBOX',
    'AFFINITY_LOTUS',
    'AFFINITY_PEARL',
    'AFFINITY_SPLIT',
    'AFFINITY_SUNSET',
    '''
    'ASTRA_PICK_RATE',
    'BREACH_PICK_RATE',
    'BRIMSTONE_PICK_RATE',
    'CHAMBER_PICK_RATE',
    'CYPHER_PICK_RATE',
    'DEADLOCK_PICK_RATE',
    'FADE_PICK_RATE',
    'GEKKO_PICK_RATE',
    'HARBOR_PICK_RATE',
    'JETT_PICK_RATE',
    'KAYO_PICK_RATE',
    'KILLJOY_PICK_RATE',
    'NEON_PICK_RATE',
    'OMEN_PICK_RATE',
    'PHOENIX_PICK_RATE',
    'RAZE_PICK_RATE',
    'REYNA_PICK_RATE',
    'SAGE_PICK_RATE',
    'SKYE_PICK_RATE',
    'SOVA_PICK_RATE',
    'VIPER_PICK_RATE',
    'YORU_PICK_RATE',
    'ISO_PICK_RATE',
    'CLOVE_PICK_RATE',
    'VYSE_PICK_RATE',
    'DUELIST_PICK_RATE',
    'INITIATOR_PICK_RATE',
    'SENTINEL_PICK_RATE',
    'CONTROLLER_PICK_RATE',
    # TODO: map score
    # TODO: win type
]

In [61]:
class PlayerRound:
    def __init__(self, game_id, player_id, map):
        # TODO: add abilities, player killed data, more damage data
        self.metadata = {
            'game_id': game_id,
            'map': map,
        }
        self.vec = {'id': player_id}
        for v in vec_fields:
            self.vec[v] = 0

    def update_vec(self, idx, val):
        self.vec[idx] = val

    def add_vec(self, idx, i):
        self.vec[idx] += i

    def get_vec(self, idx):
        return self.vec[idx]

    def upload(self):
        self.vec['metadata'] = self.metadata
        add_item_to_dynamodb('RoundData', self.vec)

class Game:
    def _process_event(self, event):
        if 'snapshot' in event:
            return

        # agent_name, agent_class, side, round number
        if 'roundStarted' in event:
            e = event['roundStarted']
            logging.debug(f'Round started {e}')

            self._processing_round = True
            self._curr_round_start_time = float(event['metadata']['eventTime']['omittingPauses'][:-1])

            attacking_team = str(e['spikeMode']['attackingTeam']['value'])
            # agent_name and agent_class
            for i, p in enumerate(self.player_loc.values()):
                pi = str(i+1)
                self.players[pi]['player_round'] = PlayerRound(self.game_id, p, self.map)

                agent = self.players[pi]['agent_name'] + '_PICK_RATE'
                agent_class = self.players[pi]['agent_role'] + '_PICK_RATE'

                self.players[pi]['player_round'].update_vec(agent, 1)
                self.players[pi]['player_round'].update_vec(agent_class, 1)

                # Set current round number
                self.players[pi]['player_round'].update_vec('ROUND_NUMBER', e['roundNumber'])

                # Set side
                if int(pi) in self.teams[attacking_team]['players']:
                    self.players[pi]['player_round'].update_vec('SIDE', 1)
                else:
                    self.players[pi]['player_round'].update_vec('SIDE', -1)

            return

        # Skip processing if not inside of a round
        if not self._processing_round:
            return

        cur_time = float(event['metadata']['eventTime']['omittingPauses'][:-1])

        # damage receive, damage dealt
        if 'damageEvent' in event:
            e = event['damageEvent']
            logging.debug(f'Damage Event {e}')

            # Set damage dealt
            if 'causerId' in e:
                causer = str(e['causerId']['value'])
                self.players[causer]['player_round'].add_vec('DAMAGE_DONE', e['damageAmount'])

            # Set damage received
            victim = str(e['victimId']['value'])
            self.players[victim]['player_round'].add_vec('DAMAGE_TAKEN', e['damageAmount'])

            return

        # death flag, weapon kill, time alive, kills, deaths, asissts,
        if 'playerDied' in event:
            e = event['playerDied']
            time_stamp = float(event['metadata']['eventTime']['omittingPauses'][:-1])
            logging.debug(f'Player Died {e}')

            # Set death flag and death counter
            dead_player = str(e['deceasedId']['value'])
            self.players[dead_player]['player_round'].update_vec('DEAD', 1)
            self.players[dead_player]['player_round'].add_vec('DEATHS', 1)

            # Set time alive
            time_alive = time_stamp - self._curr_round_start_time
            self.players[dead_player]['player_round'].update_vec('TIME_ALIVE', time_alive)

            # Update weapon kill tracker and kill counter
            killer = str(e['killerId']['value'])
            self.players[killer]['player_round'].add_vec('KILLS', 1)
            if 'weapon' in e:
                weapon_guid = e['weapon']['fallback']['guid']
                if weapon_guid == "":
                    self.players[killer]['player_round'].add_vec('KILLS_MELEE', 1)
                else:
                    g = requests.get(f'https://valorant-api.com/v1/weapons/{weapon_guid}')
                    wkey = 'KILLS_' + g.json()['data']['displayName'].upper()
                    self.players[killer]['player_round'].add_vec(wkey, 1)

            # Update assist counter
            if 'assistants' in e:
                for a in e['assistants']:
                    assister = str(a['assistantId']['value'])
                    self.players[assister]['player_round'].add_vec('ASSISTS', 1)
            return

        # spike plant, spike carry time, spike defuse
        if 'spikeStatus' in event:
            e = event['spikeStatus']
            logging.debug(f'Spike Status {e}')

            # Set spike plant flag and update spike carry time and spike defuse flag
            if e['status'] == "IN_HANDS" and 'carrier' in e:
                if not 'carrier' in e:
                    logging.warning("SPIKE IN_HANDS event with no carrier found")
                else:
                    self._curr_spike_carrier = str(e['carrier']['value'])
                self._curr_spike_pickup_stamp = float(event['metadata']['eventTime']['omittingPauses'][:-1])
            elif e['status'] == "PLANTED":
                self.players[self._curr_spike_carrier]['player_round'].update_vec('SPIKE_PLANT', 1)
                self.players[self._curr_spike_carrier]['player_round'].add_vec('SPIKE_CARRY_PERCENT', cur_time - self._curr_spike_pickup_stamp)
            elif e['status'] == "ON_GROUND":
                self.players[self._curr_spike_carrier]['player_round'].add_vec('SPIKE_CARRY_PERCENT', cur_time - self._curr_spike_pickup_stamp)

            return

        # combat score, outcome, time alive, noramlize spike carry time
        if 'roundDecided' in event:
            e = event['roundDecided']
            logging.debug(f'Round Decided {e}')

            round_length = cur_time - self._curr_round_start_time
            winning_team = str(e['result']['winningTeam']['value'])
            for p in self.players:
                # Set outcome
                if int(p) in self.teams[winning_team]['players']:
                    self.players[p]['player_round'].update_vec('OUTCOME', 1)
                else:
                    self.players[p]['player_round'].update_vec('OUTCOME', -1)

                # Set time alive
                if self.players[p]['player_round'].get_vec('DEAD') == 0:
                    self.players[p]['player_round'].add_vec('TIME_ALIVE', round_length)

                # Normalize spike time
                spike_time = self.players[p]['player_round'].get_vec('SPIKE_CARRY_PERCENT')
                self.players[p]['player_round'].update_vec('SPIKE_CARRY_PERCENT', spike_time / round_length)


            round_end_stamp = float(event['metadata']['eventTime']['omittingPauses'][:-1])
            while 'snapshot' not in event:
                event = self.event_feed.popleft()

            e = event['snapshot']

            # Set combat score
            for p in e['players']:
                player = str(p['playerId']['value'])
                self.players[player]['player_round'].update_vec('COMBAT_SCORE', p['scores']['combatScore']['roundScore'])

            self._processing_round = False
            self._curr_round_start_time = None
            self._curr_spike_carrier = None
            self._curr_spike_pickup_stamp = None
            self._curr_round_start_time = None

            for p in self.players.values():
                p['player_round'].upload()

            return


    def __init__(self, file):
        self.name = file.split('/')[-1]
        self.players = dict()
        self.teams = dict()
        self._curr_round_start_time = None
        self._curr_spike_carrier = None
        self._curr_spike_pickup_stamp = None
        self._curr_round_start_time = None
        self._processing_round = False

        logging.warning(f"Ingesting {self.name}")
        j = read_json_from_s3(bucket, file)

        self.event_feed = deque(j)

        first_event = self.event_feed.popleft()

        self.game_id = first_event['platformGameId']

        self.player_loc = mapping_df.loc[mapping_df['platformGameId'] == self.game_id, 'participantMapping'].values[0]

        second_event = self.event_feed.popleft()

        for i, p in enumerate(self.player_loc.values()):
            self.players[str(i+1)] = {
                'player_round': None,
                'agent_name': "",
                'agent_role': "",
            }

        self.map = maps[second_event['configuration']['selectedMap']['fallback']['displayName']]

        self.player_agents = dict()
        for i, p in enumerate(second_event['configuration']['players']):
            agent_guid = p['selectedAgent']['fallback']['guid']
            agent_data = requests.get(f'https://valorant-api.com/v1/agents/{agent_guid}')
            self.players[str(p['playerId']['value'])]['agent_name'] = agent_data.json()['data']['displayName'].upper()
            self.players[str(p['playerId']['value'])]['agent_role'] = agent_data.json()['data']['role']['displayName'].upper()

        teamid = str(second_event['configuration']['teams'][0]['teamId']['value'])
        self.teams[teamid] = dict()
        self.teams[teamid]['players'] = [p['value'] for p in second_event['configuration']['teams'][0]['playersInTeam']]
        self.teams[teamid]['name'] = team_df.iloc[second_event['configuration']['teams'][0]['teamId']['value']]['slug']

        teamid = str(second_event['configuration']['teams'][1]['teamId']['value'])
        self.teams[teamid] = dict()
        self.teams[teamid]['players'] = [p['value'] for p in second_event['configuration']['teams'][1]['playersInTeam']]
        self.teams[teamid]['name'] = team_df.iloc[second_event['configuration']['teams'][1]['teamId']['value']]['slug']

        # ingest events
        logging.info(f"Ingesting events for {self.name}")
        while len(self.event_feed) != 0:
            current_event = self.event_feed.popleft()
            self._process_event(current_event)
        logging.info(f"Done ingesting events for {self.name}")

In [62]:
game_files = list_s3_files(bucket, f'{LEAGUE}/games/{YEAR}')

In [65]:
Game('vct-international/games/2022/val:273de7c3-0541-438e-b663-c8c23767b3e4.json')



dict_values([1, -1, -1, 0, 1, 1, 78, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 567.1070000000001, 1, 100.0, 98.1476669, 0.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, {'game_id': 'val:273de7c3-0541-438e-b663-c8c23767b3e4', 'player_id': '106525414942310346', 'map': 'HAVEN'}])
dict_values([1, -1, -1, 1, 1, 0, 172, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 571.9530000000001, 1, 100.0, 22, 0.0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, {'game_id': 'val:273de7c3-0541-438e-b663-c8c23767b3e4', 'player_id': '106525416849604559', 'map': 'HAVEN'}])
dict_values([1, -1, -1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 565.354, 1, 100, 0, 0.0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, {'game_id': 'val:273de7c3-0541-438e-b663-c8c23767b3e4', 'player_id': '106525415831916494', 'map': 'HAVEN'}])
dict_values(

<__main__.Game at 0x7b5ef42e47f0>

In [None]:
import time
start = time.time()
for g in game_files:
    Game(f'{LEAGUE}/games/{YEAR}/{g}')
delta = time.time() - start
print(f'linear ingestion took {delta:0.2f} seconds')



linear ingestion took 111.55 seconds


In [63]:
def consume_game(g):
    Game(f'{LEAGUE}/games/{YEAR}/{g}')

start = time.time()

num_processes = mp.cpu_count() * 2
with mp.Pool(processes=num_processes) as pool:
    pool.map(consume_game, game_files)

delta = time.time() - start
print(f'parallel ingestion took {delta:0.2f} seconds')



parallel ingestion took 1009.11 seconds
