In [None]:
from pathlib import Path
import pandas as pd
import os
import matplotlib.pyplot as plt
import numpy as np
import math
import statistics
from random import choice, shuffle
import scipy.stats as st
import seaborn as sns
import csv
import random


In [None]:
# Import the GVGAI win-rate results from the external csv files.
# If the levels param is not None then only results from the specified levels will be imported.
def gvgai_dataset_import(levels=None):
    """Load the GVGAI win-rate results from ``datasets/gvgai_raw/``.

    The directory layout read here is
    ``<game>/<level>/<agent>/alpha_rank_data.csv``.  Only the first column of
    each csv is used, and the results of an agent are pooled across levels.

    Args:
        levels: Optional list of level directory names.  When ``None`` every
            level directory found under each game is imported.

    Returns:
        dict: ``{game: {agent: [win_rate, ...]}}``.
    """
    original_results = {}
    game_path = "datasets/gvgai_raw/"
    for game in os.listdir(game_path):
        game_results = original_results[game] = {}
        game_dir = os.path.join(game_path, game)
        # Identity check: ``levels == None`` is evaluated element-wise for
        # array-like arguments and is not a reliable "was it given?" test.
        level_names = os.listdir(game_dir) if levels is None else levels
        for level in level_names:
            level_dir = os.path.join(game_dir, level)
            for agent in os.listdir(level_dir):
                csv_file = os.path.join(level_dir, agent, "alpha_rank_data.csv")
                rows = pd.read_csv(csv_file, sep=',', header=0).to_numpy()
                # Win-rate cannot be negative; a negative value indicates an
                # incomplete trial and is dropped.
                game_results.setdefault(agent, []).extend(
                    row[0] for row in rows if row[0] >= 0
                )
    return original_results

# Import the Ludii win-rate results from the external csv files.
def ludii_dataset_import():
    """Load the Ludii win-rate results from ``datasets/ludii_raw/``.

    Every ``<game>/<agent>/alpha_rank_data.csv`` file contributes the values
    of its first column.  The agent directory name is cut at the first
    occurrence of "UCT", so directories sharing that prefix are pooled under
    a single agent key.

    Returns:
        dict: ``{game: {agent_prefix: [win_rate, ...]}}``.
    """
    root = "datasets/ludii_raw/"
    imported = {}
    for game_name in os.listdir(root):
        per_agent = {}
        imported[game_name] = per_agent
        for agent_dir in os.listdir(root + game_name + "/"):
            table = pd.read_csv(
                root + game_name + "/" + agent_dir + "/alpha_rank_data.csv",
                sep=',',
                header=0,
            )
            agent_key = agent_dir.split("UCT")[0]
            bucket = per_agent.setdefault(agent_key, [])
            bucket.extend(record[0] for record in table.to_numpy())
    return imported

# Provides an empty version of the full results structure to be filled by sampled results
def get_sample_results_template(results):
    """Build an empty copy of the ``{game: {agent: [...]}}`` structure.

    The returned dict has the same game and agent keys as ``results``, each
    agent mapped to its own fresh empty list ready to receive sampled results.
    """
    return {
        game: {agent: [] for agent in agents.keys()}
        for game, agents in results.items()
    }


In [None]:
# Updates the avg_win_rates from the provided results. 
# If last_game is not None, then only the results for this game will be updated.
def average_results(results, last_game, avg_win_rates):
    """Update ``avg_win_rates`` in place with the mean result of each agent.

    Args:
        results: ``{game: {agent: [result, ...]}}``.
        last_game: Game whose averages should be refreshed, or ``None`` to
            refresh every game.
        avg_win_rates: ``{game: {agent: mean}}`` from a previous call (or an
            empty dict).  It is mutated and returned.

    Returns:
        The ``avg_win_rates`` dict that was passed in.

    Raises:
        ZeroDivisionError: If a refreshed agent has no results.
    """
    for game, game_results in results.items():
        # Only the requested game is recomputed.  Other games keep their
        # previous averages untouched, so a game that has never been averaged
        # no longer triggers a KeyError on its missing entry.
        if last_game is not None and last_game != game:
            continue
        avg_win_rates[game] = {
            agent: sum(agent_results) / len(agent_results)
            for agent, agent_results in game_results.items()
        }
    return avg_win_rates

# Returns the best agent for a given game based on the provided results.
def get_best_agent(game_results):
    """Return the key of ``game_results`` holding the largest value.

    Ties go to the agent that appears first.  ``None`` is returned when the
    mapping is empty or when no value is greater than negative infinity.
    """
    leader = None
    leader_value = float('-inf')
    for agent, value in game_results.items():
        if value > leader_value:
            leader, leader_value = agent, value
    return leader

# Calculates the regret (difference) between the sample best agent and the true best agent.
def regret(sample_results, true_results):
    """Mean regret of the sample-based agent choice over all games.

    For every game in ``true_results`` the agent that looks best in
    ``sample_results`` is compared with the truly best agent; the per-game
    regret is the gap between their values in ``true_results``.

    Raises:
        ZeroDivisionError: If ``true_results`` is empty.
    """
    total_gap = 0.0
    games_seen = 0.0
    for game, true_rates in true_results.items():
        picked = get_best_agent(sample_results[game])
        optimum = get_best_agent(true_rates)
        total_gap += true_rates[optimum] - true_rates[picked]
        games_seen += 1.0
    return total_gap / games_seen

# Calculates the APE between the sample best agent and the true best agent.
def average_probability_error(sample_results, true_results):
    """Fraction of games where the sample-based best agent is sub-optimal.

    A game counts as an error (1.0) when the agent that looks best in
    ``sample_results`` has a strictly lower value in ``true_results`` than the
    truly best agent; otherwise the (non-positive) gap itself is added.

    Raises:
        ZeroDivisionError: If ``true_results`` is empty.
    """
    error_total = 0.0
    games_seen = 0.0
    for game, true_rates in true_results.items():
        picked = get_best_agent(sample_results[game])
        optimum = get_best_agent(true_rates)
        gap = true_rates[optimum] - true_rates[picked]
        error_total += 1.0 if gap > 0.0 else gap
        games_seen += 1.0
    return error_total / games_seen

# Calculates the best and second best agents, based on their average win-rates, for a given game.
def calculateBestWinRates(game, avg_win_rate):
    """Find the best and second-best agents of ``game`` by average win-rate.

    Args:
        game: Key into ``avg_win_rate``.
        avg_win_rate: ``{game: {agent: average_win_rate}}``.

    Returns:
        tuple: ``(best_win_rate, second_best_win_rate, best_agent,
        second_best_agent)``.  Missing entries keep their initial values
        (``-1`` for a rate, ``None`` for an agent), e.g. the second-best
        fields when the game has a single agent.
    """
    best_win_rate = -1
    second_best_win_rate = -1
    best_agent = None
    second_best_agent = None
    for agent, rate in avg_win_rate[game].items():
        if rate > best_win_rate:
            # The previous leader is demoted: carry over both its rate and
            # its name, otherwise second_best_agent goes stale.
            second_best_win_rate, second_best_agent = best_win_rate, best_agent
            best_win_rate, best_agent = rate, agent
        elif rate > second_best_win_rate:
            second_best_win_rate, second_best_agent = rate, agent
    return best_win_rate, second_best_win_rate, best_agent, second_best_agent

# Finds the game-agent pair with the highest score.
# If single_game is not None, then only the scores from this game will be compared
def findMaxScore(scores, single_game=None):
    """Find the game/agent pair with the highest score.

    Args:
        scores: ``{game: {agent: score}}``.
        single_game: When not ``None``, only the scores of this game are
            compared.

    Returns:
        tuple: ``(game, agent)`` of the first pair holding the maximum score,
        or ``(None, None)`` when there is nothing to compare (empty input,
        unknown ``single_game``, or no score above negative infinity).
    """
    max_score = float('-inf')
    max_game = None
    max_agent = None
    if single_game is None:
        candidate_games = scores
    elif single_game in scores:
        # Look the requested game up directly instead of scanning all games.
        candidate_games = (single_game,)
    else:
        candidate_games = ()
    for game in candidate_games:
        for agent, score in scores[game].items():
            if score > max_score:
                max_score = score
                max_game = game
                max_agent = agent
    return max_game, max_agent


In [None]:
# Returns the upper and lower confidence bounds for the Wilson Score
# p = win_rate, n = num_arm_samples
def calculateWilsonScoreBounds(p, n, alpha=0.05, continuity_correction=False):
    """Return the lower and upper Wilson score confidence bounds.

    Args:
        p: Observed win-rate (proportion).
        n: Number of samples the proportion is based on.
        alpha: Significance level; the interval uses the two-sided normal
            quantile at ``1 - alpha / 2``.
        continuity_correction: When truthy, the lower bound is taken from the
            interval around ``max(p - 1/(2n), 0)`` and the upper bound from
            the interval around ``min(p + 1/(2n), 1)``.

    Returns:
        tuple: ``(lowerBound, upperBound)``.

    Raises:
        ZeroDivisionError: If ``n`` is 0.
    """
    if continuity_correction:
        # Wallis, Sean A. (2021). Statistics in Corpus Linguistics - a new approach. New York: Routledge. ISBN 9781138589384.
        half_step = 1 / (2 * n)
        lowerBound, _ = calculateWilsonScoreBounds(max(p - half_step, 0), n, alpha, False)
        _, upperBound = calculateWilsonScoreBounds(min(p + half_step, 1), n, alpha, False)
        return lowerBound, upperBound

    # The quantile is only needed for the uncorrected interval.
    z = st.norm.ppf(1 - (alpha / 2))
    z_squared = z * z
    denominator = 1 + z_squared / n
    midPoint = (p + z_squared / (2 * n)) / denominator
    bound = (z / denominator) * math.sqrt((p * (1 - p)) / n + z_squared / (4 * (n * n)))
    return midPoint - bound, midPoint + bound

# Returns the upper and lower confidence bounds for UCB
# p = win_rate, n = num_arm_samples, t = total_samples
# c = exploration constant, with a value of 2 being equivalent to UCB1
def calculateUCBBounds(p, n, t, c=2.0):
    """Return ``(p - r, p + r)`` with ``r = sqrt(c * ln(t) / n)``.

    Args:
        p: Win-rate of the arm.
        n: Number of samples taken from the arm.
        t: Total number of samples taken so far.
        c: Exploration constant scaling the confidence radius.
    """
    radius = math.sqrt(c * math.log(t) / n)
    return p - radius, p + radius

# Returns the total number of arms pulled so far
def calculate_arm_pulls(results):
    """Count every stored result across all games and agents."""
    return sum(
        len(agent_results)
        for game_results in results.values()
        for agent_results in game_results.values()
    )


In [None]:
def calculateBMK(test_results, last_game, scores, win_rate, alpha):
    """Refresh the gap-based score of every arm: ``-gap + sqrt(alpha / pulls)``.

    The gap of the best agent of a game is its distance to the second-best
    win-rate; for every other agent it is the distance to the best win-rate.

    Args:
        test_results: ``{game: {agent: [result, ...]}}`` sampled so far.
        last_game: Game whose scores must be refreshed, or ``None`` for all.
        scores: ``{game: {agent: score}}`` from the previous call; mutated.
        win_rate: Average win-rates from the previous call, passed on to
            ``average_results``.
        alpha: Exploration parameter.

    Returns:
        tuple: ``(scores, avg_win_rate)``.
    """
    avg_win_rate = average_results(test_results, last_game, win_rate)
    for game in avg_win_rate:
        if last_game is not None and last_game != game:
            continue
        game_scores = scores[game] = {}
        best_win_rate, second_best_win_rate, best_agent, _ = calculateBestWinRates(game, avg_win_rate)
        for agent, samples in test_results[game].items():
            if agent == best_agent:
                difference = best_win_rate - second_best_win_rate
            else:
                difference = best_win_rate - avg_win_rate[game][agent]
            game_scores[agent] = -(difference) + math.sqrt(alpha / len(samples))
    return scores, avg_win_rate

def calculateBMKV(test_results, last_game, scores, win_rate, alpha):
    """Refresh the variance-aware gap score of every arm.

    The score is
    ``-gap + sqrt(2 * alpha * variance / pulls) + 7 * alpha / (3 * (pulls - 1))``
    where ``variance`` is ``np.var`` of the arm's sampled results.  An arm
    with exactly one sample scores ``math.inf`` so that it is pulled again
    before its variance is used.

    Args:
        test_results: ``{game: {agent: [result, ...]}}`` sampled so far.
        last_game: Game whose scores must be refreshed, or ``None`` for all.
        scores: ``{game: {agent: score}}`` from the previous call; mutated.
        win_rate: Average win-rates from the previous call, passed on to
            ``average_results``.
        alpha: Exploration parameter.

    Returns:
        tuple: ``(scores, avg_win_rate)``.
    """
    avg_win_rate = average_results(test_results, last_game, win_rate)
    for game in avg_win_rate:
        if last_game is not None and last_game != game:
            continue
        game_scores = scores[game] = {}
        best_win_rate, second_best_win_rate, best_agent, _ = calculateBestWinRates(game, avg_win_rate)
        for agent, samples in test_results[game].items():
            num_samples = len(samples)
            if num_samples == 1:
                game_scores[agent] = math.inf
                continue
            variance = np.var(samples)
            if agent == best_agent:
                difference = best_win_rate - second_best_win_rate
            else:
                difference = best_win_rate - avg_win_rate[game][agent]
            game_scores[agent] = (
                -(difference)
                + math.sqrt((2 * alpha * variance) / num_samples)
                + ((7 * alpha) / (3 * (num_samples - 1)))
            )
    return scores, avg_win_rate

# --------------------------------------------

def calculatePossibleRegretChangeWilsonUpperBounds(test_results, last_game, last_agent, scores, win_rate, alpha, exploration_constant, total_num_arms):
    """Score every arm by how far its Wilson bound could move the outcome.

    The best agent of a game scores ``mean - lower_bound``; every other agent
    scores ``upper_bound - best_mean``.

    Args:
        test_results: ``{game: {agent: [result, ...]}}`` sampled so far.
        last_game: Game whose scores must be refreshed, or ``None`` for all.
            Ignored (treated as ``None``) when ``alpha`` is ``None``.
        last_agent: Unused; kept so existing callers keep working.
        scores: ``{game: {agent: score}}`` from the previous call; mutated.
        win_rate: Average win-rates from the previous call, passed on to
            ``average_results``.
        alpha: Significance level of the Wilson interval.  ``None`` selects a
            dynamic value ``total_num_arms / (total_samples * exploration_constant)``.
        exploration_constant: Divisor of the dynamic alpha; a higher value
            means more exploration.
        total_num_arms: Number of game/agent pairs.

    Returns:
        tuple: ``(scores, avg_win_rate)``.
    """
    if alpha is None:
        # The dynamic alpha changes after every pull, so every arm of every
        # game has to be re-scored regardless of which game was pulled last.
        last_game = None
        total_samples = calculate_arm_pulls(test_results)
        alpha = total_num_arms / (total_samples * exploration_constant)
    avg_win_rate = average_results(test_results, last_game, win_rate)
    for game in avg_win_rate:
        if last_game is not None and last_game != game:
            continue
        game_scores = scores[game] = {}
        best_win_rate, _, best_agent, _ = calculateBestWinRates(game, avg_win_rate)
        for agent, samples in test_results[game].items():
            # Separate local name: the win_rate parameter is the dict of averages.
            agent_win_rate = avg_win_rate[game][agent]
            lowerBound, upperBound = calculateWilsonScoreBounds(agent_win_rate, len(samples), alpha)
            if agent == best_agent:
                game_scores[agent] = agent_win_rate - lowerBound
            else:
                game_scores[agent] = upperBound - best_win_rate
    return scores, avg_win_rate

def calculateUCBE(test_results, last_game, scores, win_rate, alpha):
    """Score every arm with its upper confidence bound.

    Jean-Yves Audibert, Sébastien Bubeck. Best Arm Identification in
    Multi-Armed Bandits. COLT-2010.

    Args:
        test_results: ``{game: {agent: [result, ...]}}`` sampled so far.
        last_game: Passed on to ``average_results``.  Scores are recomputed
            for every game because the bound depends on the total number of
            pulls.
        scores: ``{game: {agent: score}}``; every game entry is replaced.
        win_rate: Average win-rates from the previous call, passed on to
            ``average_results``.
        alpha: Exploration constant handed to ``calculateUCBBounds``.

    Returns:
        tuple: ``(scores, avg_win_rate)``.
    """
    total_samples = calculate_arm_pulls(test_results)
    avg_win_rate = average_results(test_results, last_game, win_rate)
    for game, agents in test_results.items():
        game_scores = scores[game] = {}
        for agent, samples in agents.items():
            agent_win_rate = avg_win_rate[game][agent]
            _, upper_bound = calculateUCBBounds(agent_win_rate, len(samples), total_samples, alpha)
            # The upper bound already contains the mean; adding the mean again
            # would score 2 * mean + bonus instead of mean + bonus.
            game_scores[agent] = upper_bound
    return scores, avg_win_rate


In [None]:
def singleSample(original_results, true_win_rate):
    """Seed a sampling run with one result per game/agent pair.

    The first stored result of every agent is used as its initial sample.

    Returns:
        tuple: ``(sample_results, win_rate_regret, win_rate_ape)`` where the
        two lists each hold the metric measured on this initial sample.
    """
    sample_results = {
        game: {agent: [outcomes[0]] for agent, outcomes in agents.items()}
        for game, agents in original_results.items()
    }
    initial_avg = average_results(sample_results, None, {})
    win_rate_regret = [regret(initial_avg, true_win_rate)]
    win_rate_ape = [average_probability_error(initial_avg, true_win_rate)]
    return sample_results, win_rate_regret, win_rate_ape

def get_game_agent_pairs(original_results):
    """List every ``[game, agent]`` combination in iteration order."""
    return [
        [game, agent]
        for game, agents in original_results.items()
        for agent in agents
    ]

In [None]:
# Returns the number of times that each game has been selected
def numberTimesGameSelected(sample_results):
    """Summarise how often each game has been sampled.

    Returns:
        list: One ``"<game>,<number of agents>,<number of trials>"`` string
        per game, in iteration order.
    """
    summary = []
    for game, agents in sample_results.items():
        trial_count = sum(len(trials) for trials in agents.values())
        summary.append(",".join([game, str(len(agents)), str(trial_count)]))
    return summary

In [None]:
# Sample from all game/agent pairs with replacement.
def randomSample(original_results, true_win_rate, n, batch_size):
    """Draw ``n`` samples, each from a random game and a random agent of it.

    The same pair may be chosen repeatedly.  After every ``batch_size`` draws
    the regret and APE of the current sample averages are recorded.

    Returns:
        tuple: ``(win_rate_regret, win_rate_ape)`` histories, starting with
        the values measured on the initial single sample per arm.
    """
    sample_results, win_rate_regret, win_rate_ape = singleSample(original_results, true_win_rate)
    all_games = list(original_results.keys())
    drawn_in_batch = 0
    for _ in range(n):
        game = choice(all_games)
        agent = choice(list(original_results[game].keys()))
        sample_results[game][agent].append(choice(original_results[game][agent]))
        drawn_in_batch += 1
        if drawn_in_batch != batch_size:
            continue
        current_avg = average_results(sample_results, None, {})
        win_rate_regret.append(regret(current_avg, true_win_rate))
        win_rate_ape.append(average_probability_error(current_avg, true_win_rate))
        drawn_in_batch = 0
    return win_rate_regret, win_rate_ape


# Sample from all game/agent pairs without replacement.
def uniformSample(original_results, true_win_rate, n, batch_size):
    """Draw ``n`` samples, visiting the game/agent pairs in random sweeps.

    Each pair is sampled once before any pair is sampled again; when every
    pair has been visited the pool is refilled.  After every ``batch_size``
    draws the regret and APE of the current sample averages are recorded.

    Returns:
        tuple: ``(win_rate_regret, win_rate_ape)`` histories, starting with
        the values measured on the initial single sample per arm.
    """
    sample_results, win_rate_regret, win_rate_ape = singleSample(original_results, true_win_rate)
    unvisited_pairs = []
    drawn_in_batch = 0
    for _ in range(n):
        if not unvisited_pairs:
            unvisited_pairs = get_game_agent_pairs(original_results)
        picked_pair = choice(unvisited_pairs)
        game, agent = picked_pair[0], picked_pair[1]
        unvisited_pairs.remove(picked_pair)
        sample_results[game][agent].append(choice(original_results[game][agent]))
        drawn_in_batch += 1
        if drawn_in_batch != batch_size:
            continue
        current_avg = average_results(sample_results, None, {})
        win_rate_regret.append(regret(current_avg, true_win_rate))
        win_rate_ape.append(average_probability_error(current_avg, true_win_rate))
        drawn_in_batch = 0
    return win_rate_regret, win_rate_ape


# Taken from Multi-Bandit Best Arm Identification
def gapESample(original_results, true_win_rate, n, batch_size, alpha=1.0):
    """Draw ``n`` samples, always pulling the arm with the highest BMK score.

    Scores come from ``calculateBMK``; after the first iteration only the
    game pulled last is re-scored.  After every ``batch_size`` draws the
    regret and APE are recorded from the averages returned by the latest
    score update.

    Returns:
        tuple: ``(win_rate_regret, win_rate_ape)`` histories.
    """
    sample_results, win_rate_regret, win_rate_ape = singleSample(original_results, true_win_rate)
    pulled_game = None
    scores, win_rate = {}, {}
    drawn_in_batch = 0
    for _ in range(n):
        scores, win_rate = calculateBMK(sample_results, pulled_game, scores, win_rate, alpha)
        pulled_game, pulled_agent = findMaxScore(scores)
        outcome = choice(original_results[pulled_game][pulled_agent])
        sample_results[pulled_game][pulled_agent].append(outcome)
        drawn_in_batch += 1
        if drawn_in_batch != batch_size:
            continue
        win_rate_regret.append(regret(win_rate, true_win_rate))
        win_rate_ape.append(average_probability_error(win_rate, true_win_rate))
        drawn_in_batch = 0
    return win_rate_regret, win_rate_ape


# Taken from Multi-Bandit Best Arm Identification. 
def gapEVSample(original_results, true_win_rate, n, batch_size, alpha=1.0):
    """Draw ``n`` samples, always pulling the arm with the highest BMKV score.

    Scores come from ``calculateBMKV``; after the first iteration only the
    game pulled last is re-scored.  After every ``batch_size`` draws the
    regret and APE are recorded from the averages returned by the latest
    score update.

    Returns:
        tuple: ``(win_rate_regret, win_rate_ape)`` histories.
    """
    sample_results, win_rate_regret, win_rate_ape = singleSample(original_results, true_win_rate)
    pulled_game = None
    scores, win_rate = {}, {}
    drawn_in_batch = 0
    for _ in range(n):
        scores, win_rate = calculateBMKV(sample_results, pulled_game, scores, win_rate, alpha)
        pulled_game, pulled_agent = findMaxScore(scores)
        outcome = choice(original_results[pulled_game][pulled_agent])
        sample_results[pulled_game][pulled_agent].append(outcome)
        drawn_in_batch += 1
        if drawn_in_batch != batch_size:
            continue
        win_rate_regret.append(regret(win_rate, true_win_rate))
        win_rate_ape.append(average_probability_error(win_rate, true_win_rate))
        drawn_in_batch = 0
    return win_rate_regret, win_rate_ape


# Proposed approach of selecting the arm that would have the highest potential effect on regret for the optimal result
def optimisticSampleWilson(original_results, true_win_rate, n, batch_size, exploration_constant=1.0, alpha=None):
    """Draw ``n`` samples, pulling the arm with the highest Wilson-based score.

    Scores come from ``calculatePossibleRegretChangeWilsonUpperBounds``.
    After every ``batch_size`` draws the regret and APE are recorded from the
    averages returned by the latest score update.

    Args:
        original_results: ``{game: {agent: [result, ...]}}`` to sample from.
        true_win_rate: ``{game: {agent: win_rate}}`` used as ground truth.
        n: Number of samples to draw.
        batch_size: Number of draws between two recorded measurements.
        exploration_constant: Used for the dynamic alpha when ``alpha`` is
            ``None``.
        alpha: Fixed significance level, or ``None`` for the dynamic value.

    Returns:
        tuple: ``(win_rate_regret, win_rate_ape)`` histories.
    """
    sample_results, win_rate_regret, win_rate_ape = singleSample(original_results, true_win_rate)
    game = None
    agent = None
    scores = {}
    win_rate = {}
    counter = 0

    # Count the arms without reusing ``game``/``agent``: they must still be
    # None for the first score update so that every game gets scored.
    total_num_arms = sum(len(agents) for agents in sample_results.values())

    for _ in range(n):
        scores, win_rate = calculatePossibleRegretChangeWilsonUpperBounds(sample_results, game, agent, scores, win_rate, alpha, exploration_constant, total_num_arms)
        game, agent = findMaxScore(scores)
        sample_results[game][agent].append(choice(original_results[game][agent]))
        counter += 1
        if counter == batch_size:
            win_rate_regret.append(regret(win_rate, true_win_rate))
            win_rate_ape.append(average_probability_error(win_rate, true_win_rate))
            counter = 0
    return win_rate_regret, win_rate_ape

def UniformUCBESample(original_results, true_win_rate, n, batch_size, alpha=2.0):
    """Draw ``n`` samples: games in random sweeps, agents by UCB-E score.

    Every game is visited once before any game is visited again.  Within the
    visited game the agent with the highest ``calculateUCBE`` score is
    pulled.  After every ``batch_size`` draws the regret and APE are recorded
    from the averages returned by the latest score update.

    Returns:
        tuple: ``(win_rate_regret, win_rate_ape)`` histories.
    """
    sample_results, win_rate_regret, win_rate_ape = singleSample(original_results, true_win_rate)
    pulled_game = None
    scores, win_rate = {}, {}
    drawn_in_batch = 0
    unvisited_games = []
    for _ in range(n):
        if not unvisited_games:
            unvisited_games = list(sample_results.keys())
        target_game = choice(unvisited_games)
        unvisited_games.remove(target_game)
        scores, win_rate = calculateUCBE(sample_results, pulled_game, scores, win_rate, alpha)
        pulled_game, pulled_agent = findMaxScore(scores, target_game)
        outcome = choice(original_results[pulled_game][pulled_agent])
        sample_results[pulled_game][pulled_agent].append(outcome)
        drawn_in_batch += 1
        if drawn_in_batch != batch_size:
            continue
        win_rate_regret.append(regret(win_rate, true_win_rate))
        win_rate_ape.append(average_probability_error(win_rate, true_win_rate))
        drawn_in_batch = 0
    return win_rate_regret, win_rate_ape


def successiveRejects(original_results, true_win_rate, n, batch_size):
    """Run Successive Rejects on every game with an equal share of ``n``.

    Each game gets ``floor(n / number_of_games)`` pulls.  For a game with
    ``K`` agents the cumulative pulls per surviving agent after round ``k``
    are ``ceil((1 / log_line) * (budget - K) / (K + 1 - k))`` with
    ``log_line = 1/2 + sum_{i=2..K} 1/i``.  After each round the agent with
    the lowest sample average is dropped from every game that still has more
    than one agent.  Within a round, pulls are interleaved at random across
    all games and agents.

    Args:
        original_results: ``{game: {agent: [result, ...]}}`` to sample from.
        true_win_rate: ``{game: {agent: win_rate}}`` used as ground truth.
        n: Total sampling budget.
        batch_size: Number of draws between two recorded measurements.

    Returns:
        tuple: ``(win_rate_regret, win_rate_ape)`` histories.  Both are padded
        with their last value up to ``n / batch_size + 1`` entries.
    """
    sample_results, win_rate_regret, win_rate_ape = singleSample(original_results, true_win_rate)
    round_pulls = {}
    remaining_agents = {}
    counter = 0

    max_num_rounds = 0
    num_pulls_per_game = math.floor(n / len(sample_results))
    for game in sample_results:
        remaining_agents[game] = list(sample_results[game])
        num_agents = len(remaining_agents[game])
        log_line = 0.5
        for i in range(2, num_agents + 1):
            log_line += 1.0 / i
        # nk[k] is the cumulative number of pulls per surviving agent after round k.
        nk = [0]
        for k in range(1, num_agents):
            nk.append(math.ceil((1 / log_line) * ((num_pulls_per_game - num_agents) / (num_agents + 1 - k))))
        # Pulls actually made in each round are the increments of nk.
        pulls = [nk[i] - nk[i - 1] for i in range(1, len(nk))]
        round_pulls[game] = pulls
        max_num_rounds = max(max_num_rounds, len(pulls))

    for r in range(max_num_rounds):
        # Reset pulls across games for all remaining agents.
        agent_pulls = {}
        for game in sample_results:
            # A game that has already finished its rounds gets no more pulls.
            pulls_this_round = round_pulls[game][r] if len(round_pulls[game]) > r else 0
            agent_pulls[game] = {agent: pulls_this_round for agent in remaining_agents[game]}

        # Repeatedly select a random arm that still has pulls remaining.
        while True:
            possible_choices = [
                [candidate_game, candidate_agent]
                for candidate_game in sample_results
                for candidate_agent in remaining_agents[candidate_game]
                if agent_pulls[candidate_game][candidate_agent] > 0
            ]
            if not possible_choices:
                break
            game, agent = random.choice(possible_choices)
            agent_pulls[game][agent] -= 1
            sample_results[game][agent].append(choice(original_results[game][agent]))
            counter += 1
            if counter == batch_size:
                avg_win_rate = average_results(sample_results, None, {})
                win_rate_regret.append(regret(avg_win_rate, true_win_rate))
                win_rate_ape.append(average_probability_error(avg_win_rate, true_win_rate))
                counter = 0

        # Remove the worst agent of every game.  The list is shuffled first so
        # that agents are removed at random in the event of a tie.
        for game in sample_results:
            shuffle(remaining_agents[game])
            if len(remaining_agents[game]) > 1:
                worst_agent = None
                worst_agent_score = math.inf
                for agent in remaining_agents[game]:
                    avg_score = 0
                    for score in sample_results[game][agent]:
                        avg_score += score
                    avg_score = avg_score / len(sample_results[game][agent])
                    if avg_score <= worst_agent_score:
                        worst_agent_score = avg_score
                        worst_agent = agent
                remaining_agents[game].remove(worst_agent)

    # Successive Rejects can finish before the budget is used up, so assume
    # that neither metric changes after this point.  Both histories are
    # padded so they stay the same length.
    while len(win_rate_regret) < n / batch_size + 1:
        win_rate_regret.append(win_rate_regret[-1])
        win_rate_ape.append(win_rate_ape[-1])

    return win_rate_regret, win_rate_ape


def sequentialHalving(original_results, true_win_rate, n, batch_size):
    """Run Sequential Halving on every game with an equal share of ``n``.

    Each game gets ``floor(n / number_of_games)`` pulls.  A game with ``K``
    agents runs ``ceil(log2(K))`` rounds; in a round with ``s`` surviving
    agents each of them is pulled ``floor(budget / (s * rounds))`` times, and
    afterwards the worse half (by sample average) is dropped.  Within a
    round, pulls are interleaved at random across all games and agents.

    Args:
        original_results: ``{game: {agent: [result, ...]}}`` to sample from.
        true_win_rate: ``{game: {agent: win_rate}}`` used as ground truth.
        n: Total sampling budget.
        batch_size: Number of draws between two recorded measurements.

    Returns:
        tuple: ``(win_rate_regret, win_rate_ape)`` histories.  Both are padded
        with their last value up to ``n / batch_size + 1`` entries.
    """
    sample_results, win_rate_regret, win_rate_ape = singleSample(original_results, true_win_rate)
    round_pulls = {}
    remaining_agents = {}
    counter = 0

    max_num_rounds = 0
    num_pulls_per_game = math.floor(n / len(sample_results))

    for game in sample_results:
        remaining_agents[game] = list(sample_results[game])
        num_agents = len(remaining_agents[game])

        pulls = []
        sk = num_agents
        # log2 avoids the rounding error log(x, 2) can show on exact powers of two.
        b = math.ceil(math.log2(num_agents))
        for _ in range(b):
            pulls.append(math.floor(num_pulls_per_game / (sk * b)))
            sk = math.ceil(sk / 2)
        round_pulls[game] = pulls
        max_num_rounds = max(max_num_rounds, len(pulls))

    for r in range(max_num_rounds):
        # Reset pulls across games for all remaining agents.
        agent_pulls = {}
        for game in sample_results:
            # A game that has already finished its rounds gets no more pulls.
            pulls_this_round = round_pulls[game][r] if len(round_pulls[game]) > r else 0
            agent_pulls[game] = {agent: pulls_this_round for agent in remaining_agents[game]}

        # Repeatedly select a random arm that still has pulls remaining.
        while True:
            possible_choices = [
                [candidate_game, candidate_agent]
                for candidate_game in sample_results
                for candidate_agent in remaining_agents[candidate_game]
                if agent_pulls[candidate_game][candidate_agent] > 0
            ]
            if not possible_choices:
                break
            game, agent = random.choice(possible_choices)
            agent_pulls[game][agent] -= 1
            sample_results[game][agent].append(choice(original_results[game][agent]))
            counter += 1
            if counter == batch_size:
                avg_win_rate = average_results(sample_results, None, {})
                win_rate_regret.append(regret(avg_win_rate, true_win_rate))
                win_rate_ape.append(average_probability_error(avg_win_rate, true_win_rate))
                counter = 0

        # Remove the worse half of every game.  The list is shuffled first so
        # that agents are removed at random in the event of a tie.
        for game in sample_results:
            num_agents_to_keep = math.ceil(len(remaining_agents[game]) / 2)
            shuffle(remaining_agents[game])
            while len(remaining_agents[game]) > num_agents_to_keep:
                worst_agent = None
                worst_agent_score = math.inf
                for agent in remaining_agents[game]:
                    avg_score = 0
                    for score in sample_results[game][agent]:
                        avg_score += score
                    avg_score = avg_score / len(sample_results[game][agent])
                    if avg_score <= worst_agent_score:
                        worst_agent_score = avg_score
                        worst_agent = agent
                remaining_agents[game].remove(worst_agent)

    # Sequential Halving can finish before the budget is used up, so assume
    # that neither metric changes after this point.  Both histories are
    # padded so they stay the same length.
    while len(win_rate_regret) < n / batch_size + 1:
        win_rate_regret.append(win_rate_regret[-1])
        win_rate_ape.append(win_rate_ape[-1])

    return win_rate_regret, win_rate_ape


def anytimeSequentialHalving(original_results, true_win_rate, n, batch_size):
    """Repeat halving passes over every game until ``n`` pulls are made.

    Each pass starts again with all agents of every game.  In round ``r`` of
    a pass every surviving agent is pulled ``2 ** r`` times, for
    ``ceil(log2(K)) + 1`` rounds in a game with ``K`` agents, and after each
    round the worse half (by the average of all samples collected so far) is
    dropped.  Within a round, pulls are interleaved at random across all
    games and agents.

    Args:
        original_results: ``{game: {agent: [result, ...]}}`` to sample from.
        true_win_rate: ``{game: {agent: win_rate}}`` used as ground truth.
        n: Total sampling budget; sampling stops as soon as it is reached.
        batch_size: Number of draws between two recorded measurements.

    Returns:
        tuple: ``(win_rate_regret, win_rate_ape)`` histories.
    """
    sample_results, win_rate_regret, win_rate_ape = singleSample(original_results, true_win_rate)
    counter = 0
    total_pulls = 0

    while total_pulls < n:
        max_num_rounds = 0
        round_pulls = {}
        remaining_agents = {}

        for game in sample_results:
            remaining_agents[game] = list(sample_results[game])
            num_agents = len(remaining_agents[game])

            # Pulls per surviving agent double every round: 1, 2, 4, ...
            # log2 avoids the rounding error log(x, 2) can show on exact powers of two.
            b = math.ceil(math.log2(num_agents))
            pulls = [1]
            for k in range(b):
                pulls.append(pulls[k] * 2)
            round_pulls[game] = pulls
            max_num_rounds = max(max_num_rounds, len(pulls))

        for r in range(max_num_rounds):
            # Reset pulls across games for all remaining agents.
            agent_pulls = {}
            for game in sample_results:
                # A game that has already finished its rounds gets no more pulls.
                pulls_this_round = round_pulls[game][r] if len(round_pulls[game]) > r else 0
                agent_pulls[game] = {agent: pulls_this_round for agent in remaining_agents[game]}

            # Repeatedly select a random arm that still has pulls remaining.
            while True:
                possible_choices = [
                    [candidate_game, candidate_agent]
                    for candidate_game in sample_results
                    for candidate_agent in remaining_agents[candidate_game]
                    if agent_pulls[candidate_game][candidate_agent] > 0
                ]
                if not possible_choices:
                    break
                game, agent = random.choice(possible_choices)
                agent_pulls[game][agent] -= 1
                sample_results[game][agent].append(choice(original_results[game][agent]))
                counter += 1
                total_pulls += 1
                if counter == batch_size:
                    avg_win_rate = average_results(sample_results, None, {})
                    win_rate_regret.append(regret(avg_win_rate, true_win_rate))
                    win_rate_ape.append(average_probability_error(avg_win_rate, true_win_rate))
                    counter = 0

                # Stop exactly at the budget.  Checking only at batch
                # boundaries would overshoot n whenever n is not a multiple
                # of batch_size.
                if total_pulls >= n:
                    return win_rate_regret, win_rate_ape

            # Remove the worse half of every game.  The list is shuffled first
            # so that agents are removed at random in the event of a tie.
            for game in sample_results:
                num_agents_to_keep = math.ceil(len(remaining_agents[game]) / 2)
                shuffle(remaining_agents[game])
                while len(remaining_agents[game]) > num_agents_to_keep:
                    worst_agent = None
                    worst_agent_score = math.inf
                    for agent in remaining_agents[game]:
                        avg_score = 0
                        for score in sample_results[game][agent]:
                            avg_score += score
                        avg_score = avg_score / len(sample_results[game][agent])
                        if avg_score <= worst_agent_score:
                            worst_agent_score = avg_score
                            worst_agent = agent
                    remaining_agents[game].remove(worst_agent)

    return win_rate_regret, win_rate_ape


In [None]:
def _report_metric(per_run_results, csv_path, title):
    """Save one metric's per-run curves to CSV, then plot and print their mean.

    Args:
        per_run_results: list with one sequence of metric values per repeat.
        csv_path: destination file; one CSV row is written per repeat.
        title: title of the plot showing the mean curve.
    """
    with open(csv_path, "w", newline='') as my_csv:
        csv.writer(my_csv, delimiter=',').writerows(per_run_results)

    # Average across repeats at each position. zip() stops at the shortest
    # run and yields nothing for an empty result list, so this never indexes
    # out of range.
    mean_curve = [statistics.mean(values) for values in zip(*per_run_results)]

    plt.title(title)
    plt.plot(np.arange(len(mean_curve)), mean_curve, color="red")
    plt.show()
    for value in mean_curve:
        print(value)


def run_experiment(original_results, true_win_rate, methodName, repeats=10, n=50000, batch_size=1000):
    """Run one sampling method several times and save, plot and print its results.

    Args:
        original_results: dataset passed unchanged to the sampling function.
        true_win_rate: reference win-rates passed unchanged to the sampling function.
        methodName: list whose first element is the sampling function's name.
            For gapESample, gapEVSample, UniformUCBESample and
            optimisticSampleWilson the second element is forwarded as that
            function's extra argument.
        repeats: number of independent runs of the sampling function.
        n: forwarded to the sampling function (presumably the total pull
            budget - confirm against the samplers).
        batch_size: forwarded to the sampling function (presumably the number
            of pulls between recorded measurements - confirm against the samplers).

    Returns:
        None. Results are written to
        "results/<str(methodName)>_results_regret.csv" and
        "results/<str(methodName)>_results_ape.csv", one row per repeat, and
        the mean curves are plotted and printed. If the method name is not
        recognised, "Invalid method name" is printed and nothing is run or
        written.
    """
    print("------------------------------------------------------")
    print(methodName)

    # The lambdas defer both the function-name lookup and the methodName[1]
    # access until the selected sampler is actually invoked.
    common_args = (original_results, true_win_rate, n, batch_size)
    samplers = {
        "randomSample": lambda: randomSample(*common_args),
        "uniformSample": lambda: uniformSample(*common_args),
        "gapESample": lambda: gapESample(*common_args, methodName[1]),
        "gapEVSample": lambda: gapEVSample(*common_args, methodName[1]),
        "UniformUCBESample": lambda: UniformUCBESample(*common_args, methodName[1]),
        "successiveRejects": lambda: successiveRejects(*common_args),
        "sequentialHalving": lambda: sequentialHalving(*common_args),
        "anytimeSequentialHalving": lambda: anytimeSequentialHalving(*common_args),
        "anytimeSuccessiveRejects": lambda: anytimeSuccessiveRejects(*common_args),
        "optimisticSampleWilson": lambda: optimisticSampleWilson(*common_args, methodName[1]),
    }

    # Reject unknown methods up front, before any run or file output.
    run_once = samplers.get(methodName[0])
    if run_once is None:
        print("Invalid method name")
        return

    # Generate results
    win_rate_regret_results = []
    win_rate_ape_results = []
    for i in range(repeats):
        print(i)
        win_rate_regret, win_rate_ape = run_once()
        win_rate_regret_results.append(win_rate_regret)
        win_rate_ape_results.append(win_rate_ape)

    # Make sure the output directory exists so a finished experiment is not
    # lost to a FileNotFoundError when saving.
    os.makedirs("results", exist_ok=True)

    # Save and display results (regret)
    _report_metric(win_rate_regret_results,
                   "results/"+str(methodName)+"_results_regret.csv",
                   "Win-Rate Regret")

    # Save and display results (ape)
    _report_metric(win_rate_ape_results,
                   "results/"+str(methodName)+"_results_ape.csv",
                   "Win-Rate APE")


In [None]:
def load_dataset(dataset="gvgai"):
    """Import a raw results dataset and compute its reference win-rates.

    Args:
        dataset: which dataset to import. "gvgai" imports the GVGAI results
            for all levels, "gvgai_0" imports only level "0" of the GVGAI
            results, and "ludii" imports the Ludii results.

    Returns:
        A tuple (original_results, true_win_rate): the imported results and
        the output of average_results() on them.

    Raises:
        ValueError: if dataset is not one of the supported names.
    """
    # Import original results set
    if dataset == "gvgai_0":
        original_results = gvgai_dataset_import(levels=["0"])
    elif dataset == "gvgai":
        original_results = gvgai_dataset_import()
    elif dataset == "ludii":
        original_results = ludii_dataset_import()
    else:
        # Fail here with a clear message instead of hitting an unbound
        # variable on the next statement.
        raise ValueError(f"ERROR! Invalid dataset name: {dataset!r}")

    # Determine the "true" win-rate of all agent-game pairs for future regret calculations
    true_win_rate = average_results(original_results, None, {})

    return original_results, true_win_rate
    

def main():
    """Run every configured sampling experiment on the default dataset.

    Experiments run in a fixed order; each one is described by the list that
    run_experiment() expects (method name, optionally followed by a parameter).
    """
    parameter_values = [1, 2, 4, 8, 16]

    experiments = [["randomSample"], ["uniformSample"]]
    experiments.extend(["gapESample", value] for value in parameter_values)
    experiments.extend(["gapEVSample", value] for value in parameter_values)
    # UniformUCBESample is only run for parameter values 2 and above.
    experiments.extend(["UniformUCBESample", value] for value in parameter_values[1:])
    experiments.extend([
        ["successiveRejects"],
        ["sequentialHalving"],
        ["anytimeSequentialHalving"],
    ])
    experiments.extend(["optimisticSampleWilson", value] for value in parameter_values)

    for method in experiments:
        # The dataset is freshly loaded before every experiment.
        dataset_results, reference_win_rate = load_dataset()
        run_experiment(dataset_results, reference_win_rate, method)

In [None]:
# Entry point of the notebook: run the experiments defined in main().
main()