<a href="https://www.kaggle.com/code/huikang/kore-2022-match-analysis?scriptVersionId=98093614" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# Introduction

This notebook covers:
- Plotting the match statistics
- Visualizing gameplay (see the last cell)

Refer to my [discussion post](https://www.kaggle.com/competitions/kore-2022/discussion/320987) for plans and suggestions.

In [None]:
%%capture

%reset -sf
!pip install --user kaggle-environments > /dev/null
!rm *.py *.pickle
!echo $KAGGLE_KERNEL_RUN_TYPE

from IPython.core.magic import register_cell_magic

@register_cell_magic
def writefile_and_run(line, cell):
    """Cell magic: write the cell body to a file, then execute it.

    Usage: ``%%writefile_and_run [-a] path``. The last whitespace-separated
    token of the magic line is the target path. The file is appended to only
    when the line is exactly ``-a path``; otherwise it is overwritten.
    """
    tokens = line.split()
    target_path = tokens[-1]
    append = len(tokens) == 2 and tokens[0] == '-a'
    with open(target_path, 'a' if append else 'w') as out:
        out.write(cell)
    # Execute the same source in the running kernel so its names are defined.
    get_ipython().run_cell(cell)

In [None]:
%%writefile_and_run kore_analysis.py

import os, re, json, enum, glob, shutil, collections, requests, pickle

import numpy as np
import pandas as pd
import matplotlib
import matplotlib.animation
import matplotlib.patheffects
import matplotlib.pyplot as plt
import IPython.display

import kaggle_environments


# Notebook-wide matplotlib defaults.
plt.rcParams["interactive"] = False
plt.rcParams["animation.html"] = "jshtml"
plt.rcParams["animation.embed_limit"] = 70.0   # default 20.0 stopped at step 200
plt.rcParams["figure.figsize"] = [8,8]
plt.rcParams["figure.dpi"] = 100
plt.rcParams["savefig.facecolor"] = "white"


def load_from_simulated_game(home_agent_path, away_agent_path):
    """Run a new kore_fleets match between two agents and return the environment.

    The home agent is passed first and the away agent second.
    """
    simulated_env = kaggle_environments.make("kore_fleets", debug=True)
    agents = [home_agent_path, away_agent_path]
    simulated_env.run(agents)
    return simulated_env

def fix_overage_time(match):
    """Clamp negative ``remainingOverageTime`` values to zero, in place.

    Visits the observation of players 0 and 1 in every entry of
    ``match["steps"]`` and returns the same ``match`` object. The loaders in
    this file apply it to replay data before rebuilding an environment.
    """
    for step in match["steps"]:
        for player_id in (0, 1):
            observation = step[player_id]["observation"]
            remaining = observation["remainingOverageTime"]
            observation["remainingOverageTime"] = max(0, remaining)
    return match

def load_from_replay_json(path_to_json):
    """Rebuild a kore_fleets environment from a replay JSON file on disk.

    The replay's ``steps`` and ``configuration`` entries are handed to
    ``kaggle_environments.make`` after negative overage times are clamped
    by ``fix_overage_time``.
    """
    with open(path_to_json, 'r') as f:
        match = json.load(f)
    match = fix_overage_time(match)
    return kaggle_environments.make("kore_fleets", steps=match['steps'],
                                    configuration=match['configuration'])

def load_from_episode_id(episode_id):
    """Download an episode replay from Kaggle and rebuild its environment.

    Posts ``episode_id`` (coerced to int) to the EpisodeService
    ``GetEpisodeReplay`` endpoint, decodes the JSON string stored under the
    response's ``"replay"`` key, clamps negative overage times and returns
    the resulting kore_fleets environment.

    Raises ``requests.HTTPError`` when the service answers with an error
    status, instead of failing later while parsing the error body.
    """
    # kaggle.com/code/robga/kore-episode-scraper-match-downloader/
    base_url = "https://www.kaggle.com/api/i/competitions.EpisodeService/"
    get_url = base_url + "GetEpisodeReplay"
    response = requests.post(get_url, json={"episodeId": int(episode_id)})
    response.raise_for_status()
    match = json.loads(response.json()["replay"])
    match = fix_overage_time(match)
    env = kaggle_environments.make("kore_fleets", steps=match['steps'],
                                   configuration=match['configuration'])
    return env

# Run match or load

In [None]:
# various ways to load your agent
# NOTE: every active loader below assigns `env`, so the last one executed wins;
# as written, the episode download replaces the replay loaded from disk.

# simulate a match between two agents
home_agent_path = "../input/kore-beta-1st-place-solution/main.py"
away_agent_path = "../input/kore-starter-6th-in-beta-rule-based-agent/main.py"
# env = load_from_simulated_game(home_agent_path, away_agent_path)

# from match replay file saved
# (file names containing "_" are skipped; the second-to-last path in sorted order is used)
file_pattern = "../input/kore-episode-scraper-match-downloader/*.json"
path_to_json = sorted(fn for fn in glob.glob(file_pattern) if "_" not in fn)[-2]
env = load_from_replay_json(path_to_json)

# from episode_id
episode_id = 37215729  # 26646197
env = load_from_episode_id(episode_id)

In [None]:
# save env object for animation later
# (written to env.pickle in the working directory; only unpickle files you trust)
with open('env.pickle', 'wb') as handle:
    pickle.dump(env, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
# object extracted
type(env.steps), len(env.steps)

In [None]:
env.render(mode="ipython", width=720, height=680)

# Information Extraction

Here we compile each player's actions, shipyards, fleets, and kore storage.

In [None]:
%%writefile_and_run -a kore_analysis.py

class FlightPlanClass(enum.IntEnum):
    """Integer codes describing the shape of a fleet's route.

    Being an IntEnum, members compare directly with plain integers;
    quantize_launch_action relies on this (``flight_plan_class <= 1``) to
    treat ``invalid`` and ``unknown`` alike.
    """
    invalid = 0    # not returned by classify_nesw_arr
    unknown = 1    # route matched none of the recognised patterns
    acyclic = 2    # one straight run, or two consecutive runs that are not a plain out-and-back
    boomerang = 3  # out-and-back along one axis, or the "crowbar" four-run pattern
    rectangle = 4  # four runs: out, sideways, back, sideways back
    construct = 5  # not returned by classify_nesw_arr
    attack = 6     # not returned by classify_nesw_arr

def kore_mining_rate(kore_amount, fleetsize):
    """Estimate the kore mined from one cell this turn by `fleetsize` ships.

    `kore_amount` is the amount currently observed on the cell. The estimate
    works backwards from it:

    * the amount is divided by 1.02 and that value is used whenever it is
      below 500 (otherwise the observed amount is kept);
    * the mining fraction is ``ln(max(1, fleetsize)) / 20``, so zero or one
      ship mines nothing;
    * dividing by ``1 - fraction`` recovers the amount before mining, and
      the returned value is that amount times the fraction.
    """
    pre_regen_amount = kore_amount / 1.02
    remaining = pre_regen_amount if pre_regen_amount < 500 else kore_amount
    mining_fraction = np.log(max(1, fleetsize)) / 20
    amount_before_mining = remaining / (1 - mining_fraction)
    return amount_before_mining * mining_fraction

def calculate_mining_rates(kore_amount_matrices, agent_fleetsize_matrices):
    """Return one total mining estimate per turn.

    Both arguments hold one flat per-cell sequence per turn. For every turn
    the per-cell ``kore_mining_rate`` values are summed over the board.
    """
    rates_per_turn = []
    for cell_kore, cell_fleetsizes in zip(kore_amount_matrices, agent_fleetsize_matrices):
        turn_total = sum(map(kore_mining_rate, cell_kore, cell_fleetsizes))
        rates_per_turn.append(turn_total)
    return rates_per_turn

In [None]:
%%writefile_and_run -a kore_analysis.py

class KoreMatch():
    """Per-turn statistics extracted from the steps of a two-player Kore match.

    Every attribute built in ``__init__`` is a sequence with one entry per
    turn (``home_*`` for player 0, ``away_*`` for player 1), except the
    ``*_combat_diffs`` lists, which describe transitions between consecutive
    turns and are therefore one element shorter.
    """

    def __init__(self, match_info, home_agent="home", away_agent="away", save_animation=False):
        """Precompute all per-turn series.

        match_info: sequence of steps (e.g. ``env.steps``); each step is
            indexable by player, and player 0's observation is read for both
            players' data.
        home_agent, away_agent: display names, stored as given.
        save_animation: stored as given; not otherwise used in __init__.
        """
        self.match_info = match_info
        self.home_agent = home_agent
        self.away_agent = away_agent
        self.save_animation = save_animation

        res = match_info
        self.num_turns = len(res)
        # observation["players"][p] is read as [stored kore, shipyards dict, fleets dict]
        self.home_kore_stored = [info[0]["observation"]["players"][0][0] for info in res]
        self.away_kore_stored = [info[0]["observation"]["players"][1][0] for info in res]

        # shipyard id -> 3-element record; element 0 is a board index, element 1 the ship count
        self.home_shipyards = [info[0]["observation"]["players"][0][1] for info in res]
        self.away_shipyards = [info[0]["observation"]["players"][1][1] for info in res]
        # shipyard positions as Points, converted from board indices with a board size of 21
        self.home_shipyards_locations = [
            set(kaggle_environments.helpers.Point.from_index(int(loc_idx), 21) for loc_idx, _, _ in home_shipyards.values())
            for home_shipyards in self.home_shipyards]
        self.away_shipyards_locations = [
            set(kaggle_environments.helpers.Point.from_index(int(loc_idx), 21) for loc_idx, _, _ in away_shipyards.values())
            for away_shipyards in self.away_shipyards]
        self.all_shipyards_locations = [
            home_shipyards_location | away_shipyards_location for home_shipyards_location, away_shipyards_location in
            zip(self.home_shipyards_locations, self.away_shipyards_locations)]

        # keep only actions whose shipyard id is present in the same turn's observation
        self.home_actions = [{shipyard:action for shipyard,action in home_info["action"].items()
                              if shipyard in self.home_shipyards[turn_idx]}
                             for turn_idx,(home_info,_) in enumerate(res)]
        self.away_actions = [{shipyard:action for shipyard,action in away_info["action"].items()
                              if shipyard in self.away_shipyards[turn_idx]}
                             for turn_idx,(_,away_info) in enumerate(res)]

        # fleet id -> record; element 0 is a board index, element 1 kore carried, element 2 ship count
        self.home_fleets = [info[0]["observation"]["players"][0][2] for info in res]
        self.away_fleets = [info[0]["observation"]["players"][1][2] for info in res]
        
        self.home_kore_carried = [sum(x[1] for x in fleet_info.values()) for fleet_info in self.home_fleets]
        self.away_kore_carried = [sum(x[1] for x in fleet_info.values()) for fleet_info in self.away_fleets]

        # ships waiting in shipyards vs. ships travelling in fleets
        self.home_ship_standby = [sum(shipyard[1] for shipyard in shipyards.values()) for shipyards in self.home_shipyards]
        self.away_ship_standby = [sum(shipyard[1] for shipyard in shipyards.values()) for shipyards in self.away_shipyards]
        self.home_ship_launched = [sum(fleet[2] for fleet in fleets.values()) for fleets in self.home_fleets]
        self.away_ship_launched = [sum(fleet[2] for fleet in fleets.values()) for fleets in self.away_fleets]

        # per turn: a flat list (same length as the kore list) of fleet ships on each cell
        self.home_fleetsize_matrices = [[0 for _ in info[0]["observation"]["kore"]] for info in res]
        self.away_fleetsize_matrices = [[0 for _ in info[0]["observation"]["kore"]] for info in res]

        for turn, (home_fleets_info, away_fleets_info) in enumerate(zip(self.home_fleets, self.away_fleets)):
            for home_fleet_info in home_fleets_info.values():
                location, fleetsize = home_fleet_info[0], home_fleet_info[2]
                self.home_fleetsize_matrices[turn][location] += fleetsize
            for away_fleet_info in away_fleets_info.values():
                location, fleetsize = away_fleet_info[0], away_fleet_info[2]
                self.away_fleetsize_matrices[turn][location] += fleetsize

        self.kore_amount_matrices = [info[0]["observation"]["kore"] for info in res]

        # estimated kore mined per turn, summed over all cells
        self.home_mining_rates = calculate_mining_rates(self.kore_amount_matrices, self.home_fleetsize_matrices)
        self.away_mining_rates = calculate_mining_rates(self.kore_amount_matrices, self.away_fleetsize_matrices)

        # SPAWN actions look like "SPAWN_<n>"; costs are stored as negative numbers (-10 per ship)
        self.home_spawing_costs = [-10 * sum(int(action.split("_")[1]) for action in actions.values() 
                                        if action.startswith("SPAWN")) for actions in self.home_actions]
        self.away_spawing_costs = [-10 * sum(int(action.split("_")[1]) for action in actions.values() 
                                        if action.startswith("SPAWN")) for actions in self.away_actions]

        # LAUNCH actions look like "LAUNCH_<n>_<plan>": keep the ship counts and the plans separately
        self.home_launch_counts = [[int(action.split("_")[1]) for action in actions.values() if action.startswith("LAUNCH")]
                                   for actions in self.home_actions]
        self.away_launch_counts = [[int(action.split("_")[1]) for action in actions.values() if action.startswith("LAUNCH")] 
                                   for actions in self.away_actions]
        self.home_launch_plans = [[(action.split("_")[2]) for action in actions.values() if action.startswith("LAUNCH")]
                                  for actions in self.home_actions]
        self.away_launch_plans = [[(action.split("_")[2]) for action in actions.values() if action.startswith("LAUNCH")] 
                                  for actions in self.away_actions]

        # change in (carried + stored) kore from turn t to t+1, minus the mining estimate and
        # the (negative) spawning cost of turn t+1; what remains is attributed to combat
        self.home_combat_diffs = [(a2+b2-a1-b1)-x-y for x,y,a1,b1,a2,b2 in 
                             zip(self.home_mining_rates[1:], self.home_spawing_costs[1:], self.home_kore_carried, self.home_kore_stored, 
                                 self.home_kore_carried[1:], self.home_kore_stored[1:])]
        self.away_combat_diffs = [(a2+b2-a1-b1)-x-y for x,y,a1,b1,a2,b2 in 
                             zip(self.away_mining_rates[1:], self.away_spawing_costs[1:], self.away_kore_carried, self.away_kore_stored, 
                                 self.away_kore_carried[1:], self.away_kore_stored[1:])]

        # asset value in kore: 500 per shipyard + 10 per ship (standby or launched) + all kore held
        self.home_kore_asset_sums = 500*np.array(list(map(len,self.home_shipyards))) \
                                   + 10*np.array(self.home_ship_standby) + 10*np.array(self.home_ship_launched) \
                                      + np.array(self.home_kore_stored) + np.array(self.home_kore_carried)
        self.away_kore_asset_sums = 500*np.array(list(map(len,self.away_shipyards))) \
                                   + 10*np.array(self.away_ship_standby) + 10*np.array(self.away_ship_launched) \
                                      + np.array(self.away_kore_stored) + np.array(self.away_kore_carried)

In [None]:
kore_match = KoreMatch(env.steps)

In [None]:
kore_match.home_actions[178]  # name-player: instruction_shipcount(_flightplan)

In [None]:
kore_match.home_kore_stored[178]

In [None]:
kore_match.away_shipyards[178]  # name-player: location, ship count, turn existence

In [None]:
kore_match.home_fleets[178]  # name-player: location, kore carried, ship count, direction?, remaining flight plan

# Statistical Visualizations

In [None]:
%%writefile_and_run -a kore_analysis.py

def plot_statistics_kore(self):
    """Plot stored (dotted) and carried (solid) kore per turn for both players."""
    plt.figure(figsize=(15,5))
    series = (
        (self.home_kore_stored, self.home_agent + " (stored)", {"color": "blue", "linestyle": "dotted"}),
        (self.away_kore_stored, self.away_agent + " (stored)", {"color": "red", "linestyle": "dotted"}),
        (self.home_kore_carried, self.home_agent + " (carried)", {"color": "blue"}),
        (self.away_kore_carried, self.away_agent + " (carried)", {"color": "red"}),
    )
    # Drawn in this order so the legend lists stored before carried.
    for values, label, style in series:
        plt.plot(values, label=label, **style)
    plt.title("Kore carried and stored over time")
    plt.xlim(-20, 420)
    plt.legend()
    plt.show()

KoreMatch.plot_statistics_kore = plot_statistics_kore

In [None]:
kore_match.plot_statistics_kore()

In [None]:
%%writefile_and_run -a kore_analysis.py

def plot_statistics_shipyards(self):
    """Step plot of how many shipyards each player owns on every turn."""
    home_counts = [len(shipyards) for shipyards in self.home_shipyards]
    away_counts = [len(shipyards) for shipyards in self.away_shipyards]

    plt.figure(figsize=(15,4))
    plt.stairs(home_counts, label=self.home_agent, lw=1.5, baseline=None, color="blue")
    plt.stairs(away_counts, label=self.away_agent, lw=1.5, baseline=None, color="red")
    plt.title("Number of shipyards over time")
    plt.xlim(-20, 420)
    plt.legend()
    plt.show()

KoreMatch.plot_statistics_shipyards = plot_statistics_shipyards

In [None]:
kore_match.plot_statistics_shipyards()

In [None]:
%%writefile_and_run -a kore_analysis.py

def plot_statistics_ships(self):
    """Step plot of ships on standby (solid) and launched (dotted) per player."""
    plt.figure(figsize=(15,5))
    series = (
        (self.home_ship_standby, self.home_agent + " (standby)", {"color": "blue"}),
        (self.away_ship_standby, self.away_agent + " (standby)", {"color": "red"}),
        (self.home_ship_launched, self.home_agent + " (launched)", {"color": "blue", "linestyle": "dotted"}),
        (self.away_ship_launched, self.away_agent + " (launched)", {"color": "red", "linestyle": "dotted"}),
    )
    for values, label, style in series:
        plt.stairs(values, label=label, baseline=None, **style)
    plt.title("Ships standby and launched over time")
    plt.xlim(-20, 420)
    plt.legend()
    plt.show()

KoreMatch.plot_statistics_ships = plot_statistics_ships

In [None]:
kore_match.plot_statistics_ships()

In [None]:
%%writefile_and_run -a kore_analysis.py

def plot_statistics_kore_rates(self):
    """Plot estimated mining income (lines) and spawning costs (steps) per turn."""
    home_name, away_name = self.home_agent, self.away_agent

    plt.figure(figsize=(15,5))
    # Mining estimates are drawn as lines ...
    plt.plot(self.home_mining_rates, color="blue", label=home_name + " (mining)")
    plt.plot(self.away_mining_rates, color="red", label=away_name + " (mining)")
    # ... and spawning costs (stored as negative values) as steps.
    plt.stairs(self.home_spawing_costs, color="blue", baseline=None, label=home_name + " (spawning)")
    plt.stairs(self.away_spawing_costs, color="red", baseline=None, label=away_name + " (spawning)")
    plt.title("Kore change rates over time")
    plt.xlim(-20, 420)
    plt.legend()
    plt.show()


KoreMatch.plot_statistics_kore_rates = plot_statistics_kore_rates

In [None]:
kore_match.plot_statistics_kore_rates()

In [None]:
%%writefile_and_run -a kore_analysis.py

def plot_statistics_combat_diffs(self):
    """Step plot of the per-turn kore change not explained by mining or spawning."""
    plt.figure(figsize=(15,5))
    for diffs, agent_name, colour in (
        (self.home_combat_diffs, self.home_agent, "blue"),
        (self.away_combat_diffs, self.away_agent, "red"),
    ):
        plt.stairs(diffs, label=agent_name + "(combat)", baseline=None, color=colour)
    plt.title("Kore combat diffs over time")
    plt.xlim(-20, 420)
    plt.legend()
    plt.show()

KoreMatch.plot_statistics_combat_diffs = plot_statistics_combat_diffs

In [None]:
kore_match.plot_statistics_combat_diffs()

In [None]:
%%writefile_and_run -a kore_analysis.py

def plot_statistics_asset_sums(self):
    """Step plot of each player's total asset value, expressed in kore, per turn."""
    plt.figure(figsize=(15,3))
    for asset_sums, agent_name, colour in (
        (self.home_kore_asset_sums, self.home_agent, "blue"),
        (self.away_kore_asset_sums, self.away_agent, "red"),
    ):
        plt.stairs(asset_sums, label=agent_name, baseline=None, color=colour)
    plt.title("Value of assets in terms of Kore over time")
    plt.xlim(-20, 420)
    plt.legend()
    plt.show()

KoreMatch.plot_statistics_asset_sums = plot_statistics_asset_sums

In [None]:
kore_match.plot_statistics_asset_sums()

# Strategic Visualizations

In [None]:
%%writefile_and_run -a kore_analysis.py

def split_into_number_and_char(srr):
    """Tokenise a string into integers and single characters.

    Each maximal run of digits becomes one ``int``; every other character
    becomes its own one-character string. For example ``"N3E12C"`` yields
    ``["N", 3, "E", 12, "C"]`` and the empty string yields ``[]``.
    """
    # https://stackoverflow.com/q/430079/5894029
    tokens = []
    # Raw string: in a plain literal "\d" is an invalid escape sequence,
    # which newer Python versions warn about at compile time.
    for chunk in re.split(r'(\d+)', srr):
        try:
            tokens.append(int(chunk))
        except ValueError:
            # Not a digit run: emit its characters one by one ('' adds nothing).
            tokens.extend(chunk)
    return tokens

def reflect_points(xy_points):
    """Return every point shifted by -2..2 board widths (21) along each axis.

    Each input point therefore contributes a 5 x 5 lattice of copies, the
    original included. The result is a set of ``(x, y)`` tuples.
    """
    shifts = [21 * k for k in (-2, -1, 0, 1, 2)]
    return {
        (px + shift_x, py + shift_y)
        for px, py in xy_points
        for shift_x in shifts
        for shift_y in shifts
    }


def simulate_flight_plan(x, y, dir_idx, plan, self_endpoints=set(), other_endpoints=set()):
    """Trace the route a fleet follows from (x, y) under a flight plan string.

    x, y: starting cell.
    dir_idx: initial heading as an index into N, E, S, W (0..3).
    plan: flight plan string of direction letters, digit runs and "C".
    self_endpoints / other_endpoints: positions of own / opponent shipyards.
        Both are expanded with reflect_points because traced coordinates are
        never wrapped back onto the board.

    Returns (xy_path, nesw_arr, construct, is_cyclic, target):
        xy_path   - visited (x, y) cells, starting cell included
        nesw_arr  - one direction letter per step taken
        construct - cells at which a "C" token was read
        is_cyclic - whether the route was found back on the starting cell
        target    - opponent shipyard cell the route reached, else None
    """
    # returns xy_path, nesw_arr, construct, is_cyclic, attack_xy
    
    dir_to_dxdy = [(0,1), (1,0), (0,-1), (-1,0)]  # NESW
    dcode_to_dxdy = {"N":(0,1), "E":(1,0), "S":(0,-1), "W":(-1,0)}
    dxdy_to_dcode = {v:k for k,v in dcode_to_dxdy.items()}
    dx,dy = dir_to_dxdy[dir_idx]

    self_endpoints = reflect_points(self_endpoints)
    other_endpoints = reflect_points(other_endpoints)

    # tokens: ints for digit runs, single characters for everything else
    plan = collections.deque(split_into_number_and_char(plan))

    cx,cy = x, y
    xy_path = [(cx,cy)]
    nesw_arr = []
    construct = []
    target = None
    first_move_complete = False

    # Phase 1: consume the plan token by token.
    while plan:
        # The own-shipyard check is skipped on the first pass, where the fleet
        # may still be sitting on the shipyard it was launched from.
        if first_move_complete and (cx, cy) in self_endpoints:
            return xy_path, nesw_arr, construct, (cx, cy) == (x,y), target
        if (cx, cy) in other_endpoints:  # lands in opponent shipyard
            target = (cx, cy)
            return xy_path, nesw_arr, construct, False, target
        first_move_complete = True
        word = plan.popleft()
        if type(word) == int:
            if word == 0:
                continue
            # a trailing number is dropped; phase 2 below extends the straight line instead
            if not plan:
                continue
            # take one step in the current direction and push back the remaining count
            cx += dx
            cy += dy
            xy_path.append((cx,cy))
            nesw_arr.append(dxdy_to_dcode[dx,dy])
            word -= 1
            if word > 0:
                plan.appendleft(word)
            continue
        if word == "C":
            construct.append((cx,cy))
            continue
        # direction letter: turn, then take one step in the new direction
        dx,dy = dcode_to_dxdy[word]
        cx += dx
        cy += dy
        xy_path.append((cx,cy))
        nesw_arr.append(dxdy_to_dcode[dx,dy])

    # Phase 2: plan exhausted; keep the last heading for at most 21 more steps.
    is_cyclic = False
    visited = set()
    for _ in range(21):
        if cx == x and cy == y:
            is_cyclic = True
        # stop at an own shipyard, once a construct was recorded, at an opponent
        # shipyard (recorded as target), or when a cell repeats
        if (cx, cy) in self_endpoints:
            break
        if construct:
            break
        if (cx, cy) in other_endpoints:
            target = (cx, cy)
            break
        if (cx, cy) in visited:
            break
        visited.add((cx,cy))
        cx += dx
        cy += dy
        xy_path.append((cx,cy))
        nesw_arr.append(dxdy_to_dcode[dx,dy])

    return xy_path, nesw_arr, construct, is_cyclic, target

In [None]:
%%writefile_and_run -a kore_analysis.py

def classify_nesw_arr(nesw_arr):
    """Classify a sequence of N/E/S/W steps by the shape of the route.

    nesw_arr: iterable of direction letters (a string or a list); any other
        character fails the assertion below.

    Returns ``(FlightPlanClass, target_x, target_y)``. The target is the
    offset from the start recorded the last time ``|x|`` or ``|y|`` exceeded
    the extents stored at the previous record. It is ``(0, 0)`` for an empty
    sequence, which is classified as ``unknown``.

    Patterns are tested in this order, first match wins:
      1. a single straight run                        -> acyclic
      2. out and straight back along one axis         -> boomerang
      3. any two consecutive runs                     -> acyclic
      4. four runs: out, sideways, back, sideways back -> rectangle
      5. four runs: sideways, out, back, sideways back -> boomerang
    Patterns 2, 4 and 5 additionally require equal N/S and equal E/W counts.
    """
    # returns FlightPlanClass, target_x, target_y

    assert not set(list(nesw_arr)) - set(list("NESW"))

    dcode_to_dxdy = {"N":(0,1), "E":(1,0), "S":(0,-1), "W":(-1,0)}

    # Start the target at the origin: without this an empty sequence reached
    # the final return with target_x / target_y unbound.
    target_x, target_y = 0, 0
    x_max_extent, y_max_extent, x, y = 0, 0, 0, 0
    for dcode in nesw_arr:
        dx, dy = dcode_to_dxdy[dcode]
        x += dx
        y += dy
        if abs(x) > x_max_extent or abs(y) > y_max_extent:
            x_max_extent = abs(x)
            y_max_extent = abs(y)
            target_x, target_y = x, y

    nesw_str = "".join(nesw_arr)

    def runs(*directions):
        # True when nesw_str is exactly one or more of each direction, in order.
        pattern = "^" + "".join(f"[{d}]+" for d in directions) + "$"
        return re.match(pattern, nesw_str)

    # the route ends on the cell it started from
    balanced = (nesw_str.count("N") == nesw_str.count("S")
                and nesw_str.count("E") == nesw_str.count("W"))

    # orbit
    for d1 in "NSEW":
        if runs(d1):
            return (FlightPlanClass.acyclic, target_x, target_y)

    # sneak peek, yo-yo
    if balanced:
        for d1,d2 in zip("NSEW", "SNWE"):
            if runs(d1, d2):
                return (FlightPlanClass.boomerang, target_x, target_y)

    # acyclic
    for d1 in "NSEW":
        for d2 in "NSEW":
            if runs(d1, d2):
                return (FlightPlanClass.acyclic, target_x, target_y)

    # flat rectangle, rectangle
    if balanced:
        for d1,d2 in zip("NS", "SN"):
            for d3,d4 in zip("EW", "WE"):
                if runs(d1, d3, d2, d4):
                    return (FlightPlanClass.rectangle, target_x, target_y)
        for d1,d2 in zip("EW", "WE"):
            for d3,d4 in zip("NS", "SN"):
                if runs(d1, d3, d2, d4):
                    return (FlightPlanClass.rectangle, target_x, target_y)

    # crowbar, boomerang
    if balanced:
        for d1,d2 in zip("NS", "SN"):
            for d3,d4 in zip("EW", "WE"):
                if runs(d3, d1, d2, d4):
                    return (FlightPlanClass.boomerang, target_x, target_y)
        for d1,d2 in zip("EW", "WE"):
            for d3,d4 in zip("NS", "SN"):
                if runs(d3, d1, d2, d4):
                    return (FlightPlanClass.boomerang, target_x, target_y)

    return (FlightPlanClass.unknown, target_x, target_y)

In [None]:
# Quick check of the classifier on hand-written step strings:
# "N" and "NNWW" -> acyclic, "NNNSSS" and "NEWS" -> boomerang,
# "NWSE" -> rectangle, "NEW" -> unknown.
nesw_arrs = ["N", "NNWW", "NNNSSS", "NEWS", "NWSE", "NEW"]
for nesw_arr in nesw_arrs:
    print(nesw_arr, "\t", classify_nesw_arr(nesw_arr))

In [None]:
%%writefile_and_run -a kore_analysis.py

def quantize_launch_action(shipyard_x, shipyard_y, fleet_size, flight_plan, shipyard_count,
                           xy_path, nesw_arr, construct, is_cyclic, attack_xy, 
                           flight_plan_class, target_x, target_y):
    """Reduce one LAUNCH action to a flat tuple of features.

    shipyard_x, shipyard_y: launching shipyard position.
    fleet_size, flight_plan: parsed from the action string.
    shipyard_count: passed through unchanged as the last tuple element.
    xy_path, nesw_arr: accepted for signature symmetry; not used here.
    construct: cells where the plan constructs (first one becomes the target).
    is_cyclic: whether the simulated route returned to its start.
    attack_xy: opponent shipyard reached, or None.
    flight_plan_class, target_x, target_y: result of classify_nesw_arr; the
        target is relative to the shipyard.

    Returns ``(shipyard_x, shipyard_y, target_x, target_y, diff_x, diff_y,
    action_class, fleet_size, "LAUNCH_" + flight_plan, shipyard_count)`` with
    an absolute target and ``action_class`` one of:
        -2  flight plan class is invalid/unknown (<= 1)
         0  construct
         3  attack, or any route that is not cyclic
         1  cyclic route whose plan starts with N or S
         2  cyclic route starting otherwise
    """
    # startswith is simply False for an empty plan, where flight_plan[0]
    # would raise IndexError.
    polarity = flight_plan.startswith(("N", "S"))

    # Fleets of fewer than 50 ships are never treated as constructing or attacking.
    if fleet_size < 50:
        construct = []
        attack_xy = None

    if attack_xy is not None:
        target_x, target_y = attack_xy
    elif construct:
        target_x, target_y = construct[0]
    else:
        # classifier targets are offsets from the shipyard; make them absolute
        target_x += shipyard_x
        target_y += shipyard_y

    diff_x = target_x - shipyard_x
    diff_y = target_y - shipyard_y

    if flight_plan_class <= 1:
        action_class = -2
    elif construct:
        action_class = 0
    elif attack_xy is not None or not is_cyclic:
        action_class = 3
    elif polarity:
        action_class = 1
    else:
        action_class = 2

    return (shipyard_x,shipyard_y,target_x,target_y,diff_x,diff_y,action_class,fleet_size,"LAUNCH_"+flight_plan,shipyard_count)

In [None]:
%%writefile_and_run -a kore_analysis.py

def extract_player_actions_for_turn(self, turn_idx, acting_player_id, debug=False):
    """Describe what each of a player's shipyards did on one turn.

    turn_idx: index into the per-turn series of this KoreMatch.
    acting_player_id: 0 for the home player, anything else for the away player.
    debug: when true, print the shipyards, raw actions and resulting rows.

    Returns a list with one dict per shipyard present on that turn, with keys
    ``turn_idx, shipyard_x, shipyard_y, target_x, target_y, diff_x, diff_y,
    action_class, ship_amount, action, shipyard_count``. Shipyards without an
    action and SPAWN actions get ``action_class`` -1 and target themselves;
    LAUNCH actions are simulated, classified and quantized.
    ``shipyard_count`` is the ship count recorded for the shipyard on the
    previous turn, or -1 when it is not present there (always -1 on turn 0).

    Raises AssertionError for an action that is neither SPAWN nor LAUNCH.
    """
    if acting_player_id == 0:
        actions_by_turn, shipyards_by_turn = self.home_actions, self.home_shipyards
        own_locations_by_turn = self.home_shipyards_locations
        opp_locations_by_turn = self.away_shipyards_locations
    else:
        actions_by_turn, shipyards_by_turn = self.away_actions, self.away_shipyards
        own_locations_by_turn = self.away_shipyards_locations
        opp_locations_by_turn = self.home_shipyards_locations

    actions = actions_by_turn[turn_idx]
    shipyard_infos = shipyards_by_turn[turn_idx]
    # Turn 0 has no predecessor; indexing [turn_idx-1] there would silently
    # wrap around to the final turn of the match.
    shipyard_infos_prev = shipyards_by_turn[turn_idx-1] if turn_idx != 0 else {}
    self_shipyard_locations = own_locations_by_turn[turn_idx]
    other_shipyard_locations = opp_locations_by_turn[turn_idx]

    quantized_actions = []
    for shipyard_id, (loc_idx,_,_) in shipyard_infos.items():
        shipyard_x, shipyard_y = kaggle_environments.helpers.Point.from_index(int(loc_idx), 21)
        shipyard_count = -1
        if shipyard_id in shipyard_infos_prev:
            shipyard_count = shipyard_infos_prev[shipyard_id][1]

        if shipyard_id not in actions:
            # no action
            quantized_actions.append(
                (shipyard_x, shipyard_y, shipyard_x, shipyard_y, 0, 0, -1, 0, "", shipyard_count))
            continue

        action = actions[shipyard_id]
        if action.startswith("SPAWN"):
            _, spawn_amount = action.split("_")
            quantized_actions.append(
                (shipyard_x, shipyard_y, shipyard_x, shipyard_y, 0, 0, -1, int(spawn_amount), action, shipyard_count))
            continue

        if action.startswith("LAUNCH"):
            _, fleet_size, flight_plan = action.split("_")
            fleet_size = int(fleet_size)

            # the initial heading (0) is irrelevant when the plan starts with a direction letter
            xy_path, nesw_arr, construct, is_cyclic, attack_xy = simulate_flight_plan(
                shipyard_x, shipyard_y, 0, flight_plan,
                self_endpoints=self_shipyard_locations, other_endpoints=other_shipyard_locations,
            )
            flight_plan_class, target_x, target_y = classify_nesw_arr(nesw_arr)

            quantized_actions.append(quantize_launch_action(
                shipyard_x, shipyard_y, fleet_size, flight_plan, shipyard_count,
                xy_path, nesw_arr, construct, is_cyclic, attack_xy,
                flight_plan_class, target_x, target_y,
            ))
            continue

        # Explicit raise rather than `assert False`, which vanishes under -O.
        raise AssertionError(f"unrecognised shipyard action: {action!r}")

    # field order mirrors the tuples built above
    field_names = ("shipyard_x", "shipyard_y", "target_x", "target_y", "diff_x", "diff_y",
                   "action_class", "ship_amount", "action", "shipyard_count")
    actions_dict = [
        {"turn_idx": turn_idx, **dict(zip(field_names, quantized_action))}
        for quantized_action in quantized_actions
    ]

    if debug:
        print("shipyard_infos\n", shipyard_infos, "\n")
        print("actions\n", actions, "\n")
        print("actions_dict", "\n" + "\n".join(str(action_dict) for action_dict in actions_dict))

    return actions_dict

KoreMatch.extract_player_actions_for_turn = extract_player_actions_for_turn

In [None]:
acting_player_id = 1
turn_idx = 170
_ = kore_match.extract_player_actions_for_turn(turn_idx, acting_player_id, debug=True)

In [None]:
%%writefile_and_run -a kore_analysis.py

def extract_player_actions(self, acting_player_id):
    """Collect one player's per-shipyard actions for turns 1..num_turns-1.

    Returns a DataFrame with the rows produced by
    ``extract_player_actions_for_turn`` plus a constant ``player_id`` column.
    For rows with ``action_class`` 3, ``diff_x`` and ``diff_y`` are clipped
    to the range [-10, 10].
    """
    rows = []
    for turn_idx in range(1, self.num_turns):
        rows += self.extract_player_actions_for_turn(turn_idx, acting_player_id)

    actions_df = pd.DataFrame.from_dict(rows)
    actions_df["player_id"] = acting_player_id

    is_class_3 = actions_df["action_class"] == 3
    for column in ("diff_x", "diff_y"):
        clipped = actions_df.loc[is_class_3][column].clip(-10, 10)
        actions_df.loc[is_class_3, column] = clipped
    return actions_df

KoreMatch.extract_player_actions = extract_player_actions

In [None]:
kore_match.extract_player_actions(acting_player_id)

In [None]:
kore_match.extract_player_actions(acting_player_id)["action_class"].value_counts()

In [None]:
%%writefile_and_run -a kore_analysis.py

import plotly
import plotly.express

def plot_3d_matrix(input_matrix, **kwargs):
    """Scatter-plot the non-zero entries of a 3-D array.

    input_matrix: object with a three-element ``.shape`` that supports
        ``input_matrix[x, y, z]`` indexing.
    **kwargs: forwarded unchanged to ``plot_dataframe_of_3d_points``.

    Each non-zero entry becomes one record with keys ``x``, ``y``, ``z`` and
    ``value``; records are generated with x varying slowest and z fastest.
    """
    nx, ny, nz = input_matrix.shape
    records = []
    for x in range(nx):
        for y in range(ny):
            for z in range(nz):
                val = input_matrix[x,y,z]
                if val != 0:
                    records.append({'x': x, 'y': y, 'z': z, 'value': val})
    input_dataframe = pd.DataFrame.from_records(records)
    plot_dataframe_of_3d_points(input_dataframe, **kwargs)


def plot_dataframe_of_3d_points(input_dataframe, x_colname='x', y_colname='y', z_colname='z', 
                                range_color_upper=None, range_y=[0,20], range_z=[0,20], 
                                scene_camera_eye=dict(x=0, y=-1, z=-1), scene={"aspectmode": "data"},
                                color_colname='value', symbol_colname=None, size_colname=None, dragmode='pan'):
    """Show a DataFrame as an interactive plotly 3-D scatter plot.

    input_dataframe: one row per point; all columns are shown on hover.
    x_colname, y_colname, z_colname: columns used for the three axes. The x
        range runs from 0 to the largest x value; y and z use ``range_y`` and
        ``range_z``.
    range_color_upper: top of the colour scale (the bottom is 0). When None
        it is the maximum of the colour column, clamped into [0, 1].
    scene_camera_eye, scene, dragmode: passed to the figure layout.
    color_colname, symbol_colname, size_colname: columns driving marker
        colour, symbol and size.

    The mutable default arguments are only read here, never modified.
    """
    plotly.offline.init_notebook_mode()

    # Identity test: `== None` would invoke the argument's own __eq__.
    if range_color_upper is None:
        range_color_upper = max(0, min(1, max(input_dataframe[color_colname])))

    fig = plotly.express.scatter_3d(
        input_dataframe, x=x_colname, y=y_colname, z=z_colname,
        range_x=[0,max(input_dataframe[x_colname])], range_y=range_y, range_z=range_z,
        symbol=symbol_colname, size=size_colname,
        color=color_colname, range_color=[0,range_color_upper], color_continuous_scale='bluered',
        opacity=0.7, hover_data=input_dataframe.columns)

    fig.update_layout(margin=dict(l=0, r=0, b=0, t=0), scene=scene, 
                      scene_camera=dict(eye=scene_camera_eye), dragmode=dragmode)
    fig.show()

In [None]:
%%writefile_and_run -a kore_analysis.py

def plot_shipyard_on_location_over_turns(self):
    """3-D scatter of both players' launches: turn against shipyard position.

    Rows with ``action_class`` -1 (spawns and idle shipyards) are left out.
    """
    per_player = [self.extract_player_actions(player_id) for player_id in (0, 1)]
    combined = pd.concat(per_player)
    launches_only = combined[combined["action_class"] != -1]  # remove spawn from viz
    plot_dataframe_of_3d_points(
        launches_only,
        x_colname='turn_idx', y_colname='shipyard_x', z_colname='shipyard_y',
        color_colname='player_id', symbol_colname='action_class', size_colname='ship_amount',
    )
    
KoreMatch.plot_shipyard_on_location_over_turns = plot_shipyard_on_location_over_turns

In [None]:
kore_match.plot_shipyard_on_location_over_turns()

In [None]:
%%writefile_and_run -a kore_analysis.py

def plot_shipyard_on_diff_over_turns(self, acting_player_id):
    """3-D scatter of one player's launches: turn against target offset.

    Rows with ``action_class`` -1 (spawns and idle shipyards) are left out;
    the offset axes are fixed to [-10, 10].
    """
    all_actions = self.extract_player_actions(acting_player_id)
    keep = all_actions["action_class"] != -1  # remove spawn from viz
    launches_only = all_actions[keep].copy()
    plot_dataframe_of_3d_points(
        launches_only,
        x_colname='turn_idx', y_colname='diff_x', z_colname='diff_y',
        range_color_upper=1, range_y=[-10,10], range_z=[-10,10],
        color_colname='player_id', symbol_colname='action_class', size_colname='ship_amount',
    )
    
KoreMatch.plot_shipyard_on_diff_over_turns = plot_shipyard_on_diff_over_turns

In [None]:
kore_match.plot_shipyard_on_diff_over_turns(0)

In [None]:
kore_match.plot_shipyard_on_diff_over_turns(1)

# Animation Generation

In [None]:
%%writefile_and_run -a kore_analysis.py

def draw_fleet(x,y,dir_idx,ships_size,kore_amount,color):
    """Draw one fleet on the current axes.

    A translucent triangle centred on the cell (x, y) is rotated by
    ``270 * dir_idx`` degrees; the ship count is written in purple in the
    upper-left of the cell and, when positive, the truncated kore amount in
    grey below it.
    """
    half_height = 0.4
    half_width = half_height / 1.5
    corners = [(0, half_height), (half_width, -half_height), (-half_width, -half_height)]

    triangle = plt.Polygon(corners, color=color, alpha=0.3)
    # rotate around the origin first, then move to the cell centre
    placement = matplotlib.transforms.Affine2D().rotate_deg(270*dir_idx).translate(x+0.5, y+0.5)
    triangle.set_transform(placement + plt.gca().transData)
    plt.gca().add_patch(triangle)

    ships_label = plt.text(x+0.1, y+0.75, ships_size, color="purple",
                           horizontalalignment='left', verticalalignment='center')
    ships_label.set_path_effects(
        [matplotlib.patheffects.withStroke(linewidth=2, foreground='w', alpha=0.8)])

    whole_kore = int(kore_amount)
    if whole_kore <= 0:
        return
    kore_label = plt.text(x+0.1, y+0.25, whole_kore, color="grey",
                          horizontalalignment='left', verticalalignment='center')
    kore_label.set_path_effects(
        [matplotlib.patheffects.withStroke(linewidth=3, foreground='w', alpha=0.8)])

def draw_flight_plan(x,y,dir_idx,plan,fleetsize,color,self_endpoints=set(),other_endpoints=set()):
    """Draw the simulated route of a flight plan on the current axes.

    The route is traced from the cell obtained by ``int(x+0.5), int(y+0.5)``
    and drawn nine times, shifted by -21/0/+21 along each axis. Line width
    grows with the fleet size, and every construct cell is marked with an "x".
    """
    route, _, construct_cells, _, _ = simulate_flight_plan(
        int(x+0.5), int(y+0.5), dir_idx, plan,
        self_endpoints=self_endpoints, other_endpoints=other_endpoints,
    )

    route_xs = np.array([point[0] for point in route])
    route_ys = np.array([point[1] for point in route])
    line_width = np.log(fleetsize+1)**2/1.5
    for shift_x in (-21, 0, 21):
        for shift_y in (-21, 0, 21):
            plt.plot(route_xs+shift_x, route_ys+shift_y, color=color, lw=line_width,
                     alpha=0.3, solid_capstyle='round')
    for build_x, build_y in construct_cells:
        plt.scatter(build_x, build_y, s=100, marker="x", color=color)

def existence_to_production_capacity(existence):
    """Return the production capacity (1 to 10) for a given existence value.

    The capacity is the tier of the largest threshold that ``existence``
    reaches; values below the lowest threshold (2) yield 1.
    """
    # (minimum existence, capacity), highest tier first so the first match wins.
    tiers = (
        (294, 10),
        (212, 9),
        (147, 8),
        (97, 7),
        (60, 6),
        (34, 5),
        (17, 4),
        (7, 3),
        (2, 2),
    )
    return next(
        (capacity for minimum, capacity in tiers if existence >= minimum),
        1,
    )
            
def draw_shipyard(x,y,ships_size,existence,color):
    """Draw a shipyard in the cell whose lower-left corner is (x, y).

    A translucent "⊕" glyph marks the yard. The number of docked ships is
    written in the upper-left of the cell when it is positive, and the
    production capacity derived from ``existence`` is always written in the
    lower-right.
    """
    def white_halo():
        # White outline that keeps the numbers readable over other artists.
        return [matplotlib.patheffects.withStroke(linewidth=2, foreground='w', alpha=0.8)]

    plt.text(x+0.5, y+0.5, "⊕", fontsize=23, color=color, alpha=0.5,
             horizontalalignment='center', verticalalignment='center')

    if ships_size > 0:
        docked_label = plt.text(x+0.1, y+0.75, ships_size, color="purple",
                                horizontalalignment='left', verticalalignment='center')
        docked_label.set_path_effects(white_halo())

    capacity = existence_to_production_capacity(existence)
    capacity_label = plt.text(x+0.9, y+0.25, capacity, color="black",
                              horizontalalignment='right', verticalalignment='center')
    capacity_label.set_path_effects(white_halo())

def draw_kore_amounts(kore_amounts, excluded_xys=None):
    """Write the kore amount of every non-empty, non-excluded cell.

    Args:
        kore_amounts: Flat sequence of per-cell kore values; the position in
            the sequence is converted to board coordinates with
            ``Point.from_index(loc_idx, 21)``.
        excluded_xys: Container of (x, y) cells to leave blank (for example
            cells already occupied by a fleet or shipyard). Nothing is
            excluded when omitted.

    Amounts are truncated to integers for display and shaded darker as they
    grow (thresholds at 20, 100 and 500).
    """
    # Immutable empty default instead of a shared mutable literal.
    if excluded_xys is None:
        excluded_xys = frozenset()

    for loc_idx,kore_amount in enumerate(kore_amounts):
        # Empty cells are never labelled, so skip them before doing any work.
        if not kore_amount > 0:
            continue

        x,y = kaggle_environments.helpers.Point.from_index(loc_idx, 21)
        if (x,y) in excluded_xys:
            continue

        if kore_amount >= 500:
            color = "black"
        elif kore_amount >= 100:
            color = "gray"
        elif kore_amount >= 20:
            color = "silver"
        else:
            color = "gainsboro"

        text = plt.text(x, y, int(kore_amount), color=color, fontsize=7,
                        horizontalalignment='center', verticalalignment='center')
        text.set_path_effects([matplotlib.patheffects.withStroke(linewidth=3, foreground='w', alpha=0.8)])

def draw_statistics(turn_num, home_stored_kore, away_stored_kore):
    """Write the turn number and both players' stored kore along the top edge.

    Home kore is drawn in blue at the left, away kore in red at the right and
    the turn number in black in the middle; all values are shown without
    decimals.
    """
    top_edge = 21-0.5
    # (x position, label, colour, horizontal alignment), drawn in this order.
    captions = (
        (0-0.5, f"Kore: {home_stored_kore:.0f}", "blue", 'left'),
        (21-0.5, f"Kore: {away_stored_kore:.0f}", "red", 'right'),
        (21/2-0.5, f"Turn: {turn_num:.0f}", "black", 'center'),
    )
    for x_pos, caption, colour, alignment in captions:
        plt.text(x_pos, top_edge, caption, color=colour,
                 horizontalalignment=alignment, verticalalignment='bottom')

In [None]:
%%writefile_and_run -a kore_analysis.py

def render_turn(self, turn):
    """Redraw the current figure so that it shows the board at ``turn``.

    Reads player 0's observation for the turn from ``self.match_info``, draws
    both players' shipyards, fleets and flight plans (player 0 in blue, then
    player 1 in red), labels the kore on every unoccupied cell, adds the
    turn/kore captions, and saves the frame to ``frames/`` when
    ``self.save_animation`` is set.
    """
    observation = self.match_info[turn][0]["observation"]

    # Start from an empty figure on every call.
    plt.gca().cla()
    plt.gcf().clf()
    plt.gcf().subplots_adjust(left=0.05, bottom=0.05, right=0.95, top=0.95, wspace=None, hspace=None)

    # Cells holding a shipyard or fleet; their kore labels are suppressed.
    occupied_cells = set()

    # (player index, colour, attribute with own shipyards, attribute with enemy shipyards)
    sides = (
        (0, "blue", "home_shipyards_locations", "away_shipyards_locations"),
        (1, "red", "away_shipyards_locations", "home_shipyards_locations"),
    )

    # Every position handed to the draw_* helpers is shifted by -0.5 as a
    # stopgap, so that cell centres land on integer axis coordinates.
    for side_idx, side_color, own_attr, enemy_attr in sides:
        player_state = observation["players"][side_idx]

        for loc_idx, ships_count, existence in player_state[1].values():
            cell_x, cell_y = kaggle_environments.helpers.Point.from_index(loc_idx, 21)
            draw_shipyard(cell_x-0.5, cell_y-0.5, ships_count, existence, side_color)
            occupied_cells.add((cell_x, cell_y))

        for loc_idx, kore_amount, ships_size, dir_idx, flight_plan in player_state[2].values():
            cell_x, cell_y = kaggle_environments.helpers.Point.from_index(loc_idx, 21)
            draw_fleet(cell_x-0.5, cell_y-0.5, dir_idx, ships_size, kore_amount, side_color)
            draw_flight_plan(cell_x-0.5, cell_y-0.5, dir_idx, flight_plan, ships_size, side_color,
                             self_endpoints=getattr(self, own_attr)[turn],
                             other_endpoints=getattr(self, enemy_attr)[turn])
            occupied_cells.add((cell_x, cell_y))

    draw_kore_amounts(observation["kore"], excluded_xys=occupied_cells)
    draw_statistics(turn, self.home_kore_stored[turn], self.away_kore_stored[turn])

    # Fix the view to the 21x21 board with square cells and integer ticks.
    board_axes = plt.gca()
    board_axes.set_xlim(0-0.5,21-0.5)
    board_axes.set_ylim(0-0.5,21-0.5)
    board_axes.set_aspect('equal')
    board_axes.xaxis.set_major_locator(matplotlib.ticker.MaxNLocator(integer=True))
    board_axes.yaxis.set_major_locator(matplotlib.ticker.MaxNLocator(integer=True))

    if self.save_animation:
        plt.savefig(f"frames/{turn:03}.png")

# Attach the module-level render_turn to KoreMatch as a method; the lambda
# simply forwards its arguments. Technique from:
# https://stackoverflow.com/a/24865663/5894029
KoreMatch.render_turn = lambda self, turn: render_turn(self, turn)

In [None]:
# Only when the Kaggle kernel reports an interactive session: render one
# turn (185) as a quick preview, show it, and then raise on purpose so the
# cells after this one are not executed.
if os.environ.get("KAGGLE_KERNEL_RUN_TYPE") == "Interactive":
    kore_match.render_turn(185)
    plt.show()
    assert False  # I want to stop here if running interactively

In [None]:
%%writefile_and_run -a kore_analysis.py

def animate(self):
    """Render every turn of the match into an HTML animation.

    Builds a ``FuncAnimation`` over the current figure that calls
    ``self.render_turn`` once per entry of ``self.match_info``, stores it in
    ``self.anim``, and stores its JavaScript/HTML rendering in
    ``self.html_animation``. The figure is closed afterwards.

    When ``self.save_animation`` is set, the per-turn PNG frames (written by
    ``render_turn`` into ``frames/``) are combined into ``gameplay.gif`` with
    the external ``convert`` command, and the PNG frames are then deleted.
    """
    if self.save_animation:
        # Portable replacement for shelling out to `mkdir -p`.
        os.makedirs("frames", exist_ok=True)

    self.anim = matplotlib.animation.FuncAnimation(plt.gcf(), self.render_turn, frames=len(self.match_info))
    # to_jshtml() is what actually drives render_turn over all frames.
    self.html_animation = IPython.display.HTML(self.anim.to_jshtml())
    plt.close()

    if self.save_animation:
        status = os.system("convert -resize 75% -loop 0 frames/*.png gameplay.gif")
        if status != 0:
            # Best effort: report the failure instead of ignoring it silently.
            print(f"convert exited with status {status}; gameplay.gif may be missing")
        # Remove the intermediate frames without relying on a shell `rm`.
        for frame_path in glob.glob("frames/*.png"):
            os.remove(frame_path)

# Attach the module-level animate to KoreMatch as a method; the lambda simply
# forwards to it.
KoreMatch.animate = lambda self: animate(self)

# Function Export

In [None]:
%reset -sf

# The animation can be generated anywhere, given only the pickled env object.
# NOTE(review): pickle.load can execute arbitrary code while unpickling, so
# only load an env.pickle that you produced yourself.
import pickle
with open('env.pickle', 'rb') as handle:
    env = pickle.load(handle)

In [None]:
# Just import the KoreMatch class.
# kore_analysis.py may live in a different directory; copying it into the working directory is recommended.
from kore_analysis import KoreMatch

kore_match = KoreMatch(env.steps, save_animation=True)
kore_match.animate()

# Strategy Visualizations

In [None]:
kore_match.html_animation

In [None]:
!rm -rf __pycache__/