This notebook shows how to use multi-armed bandits to build the agent to beat many (partially)deterministic agents.

For every public strategy (except random), it is easy to construct an algorithm that beats it. But you don't know in advance what method is used by the opponent.

The multi-armed bandit approach is a Reinforcement learning technic that allows us to select the most promising counter-strategy and use it to beat the opponent.

Multi-armed bandit is a widely used RL-algorithm because it is very balanced in terms of exploitation/exploration.

Algorithm logic:
- Create N agents with different logic
- At each step for each agent, generate a random number from B(a+1, b+1). B - beta-distribution, a - number of historical wins of this agent, b - number of this agent's historical losses.
- Select the agent with the largest generated number and use it to generate the next step.
- Repeat

In this notebook, I also show how to create the following types of agents:
- Mirror + shift. This class of agents takes the opponent's last step and return (step + shift) % 3. E.g., with shift=0 - it is a mirroring agent, with shift=1 agent that beats the opponent's previous hand.
- Self + shift. These agents add some shifts to their previous steps. With shift=0, it is a constant agent. With shift = 1 or 2, it just repeats all 3 possible steps in direct or reversed order.
- Popular beater - returns the step that beats the most popular action of the opponent so far.
- Anti-popular beater - return the step that beats the step that wins the most popular step )
- Transition matrix - computes the historical probability to go from one state to another (both for opponent and yourself) and returns the step that beats the most probable step of the opponent.
- Transition tensor - the same thing but considers both player's and opponent's previous steps. The last two agents have both deterministic and random versions and a decay option - higher weight for the more recent rounds.
- Pattern mathcing - is a development of the transition matrix idea that takes into account n last steps. It is inspired by https://www.kaggle.com/yegorbiryukov/rock-paper-scissors-with-memory-patterns.

In [1]:
%%writefile submission.py

import pandas as pd
import numpy as np
import json


# base class for all agents, random agent
class agent():
    def initial_step(self):
        return np.random.randint(3)
    
    def history_step(self, history):
        return np.random.randint(3)
    
    def step(self, history):
        if len(history) == 0:
            return int(self.initial_step())
        else:
            return int(self.history_step(history))
    
# agent that returns (previousCompetitorStep + shift) % 3
class mirror_shift(agent):
    def __init__(self, shift=0):
        self.shift = shift
    
    def history_step(self, history):
        return (history[-1]['competitorStep'] + self.shift) % 3
    
    
# agent that returns (previousPlayerStep + shift) % 3
class self_shift(agent):
    def __init__(self, shift=0):
        self.shift = shift
    
    def history_step(self, history):
        return (history[-1]['step'] + self.shift) % 3    


# agent that beats the most popular step of competitor
class popular_beater(agent):
    def history_step(self, history):
        counts = np.bincount([x['competitorStep'] for x in history])
        return (int(np.argmax(counts)) + 1) % 3

    
# agent that beats the agent that beats the most popular step of competitor
class anti_popular_beater(agent):
    def history_step(self, history):
        counts = np.bincount([x['step'] for x in history])
        return (int(np.argmax(counts)) + 2) % 3
    
    
# simple transition matrix: previous step -> next step
class transition_matrix(agent):
    def __init__(self, deterministic = False, counter_strategy = False, init_value = 0.1, decay = 1):
        self.deterministic = deterministic
        self.counter_strategy = counter_strategy
        if counter_strategy:
            self.step_type = 'step' 
        else:
            self.step_type = 'competitorStep'
        self.init_value = init_value
        self.decay = decay
        
    def history_step(self, history):
        matrix = np.zeros((3,3)) + self.init_value
        for i in range(len(history) - 1):
            matrix = (matrix - self.init_value) / self.decay + self.init_value
            matrix[int(history[i][self.step_type]), int(history[i+1][self.step_type])] += 1

        if  self.deterministic:
            step = np.argmax(matrix[int(history[-1][self.step_type])])
        else:
            step = np.random.choice([0,1,2], p = matrix[int(history[-1][self.step_type])]/matrix[int(history[-1][self.step_type])].sum())
        
        if self.counter_strategy:
            # we predict our step using transition matrix (as competitor can do) and beat probable competitor step
            return (step + 2) % 3 
        else:
            # we just predict competitors step and beat it
            return (step + 1) % 3
    

# similar to the transition matrix but rely on both previous steps
class transition_tensor(agent):
    
    def __init__(self, deterministic = False, counter_strategy = False, init_value = 0.1, decay = 1):
        self.deterministic = deterministic
        self.counter_strategy = counter_strategy
        if counter_strategy:
            self.step_type1 = 'step' 
            self.step_type2 = 'competitorStep'
        else:
            self.step_type2 = 'step' 
            self.step_type1 = 'competitorStep'
        self.init_value = init_value
        self.decay = decay
        
    def history_step(self, history):
        matrix = np.zeros((3,3, 3)) + 0.1
        for i in range(len(history) - 1):
            matrix = (matrix - self.init_value) / self.decay + self.init_value
            matrix[int(history[i][self.step_type1]), int(history[i][self.step_type2]), int(history[i+1][self.step_type1])] += 1

        if  self.deterministic:
            step = np.argmax(matrix[int(history[-1][self.step_type1]), int(history[-1][self.step_type2])])
        else:
            step = np.random.choice([0,1,2], p = matrix[int(history[-1][self.step_type1]), int(history[-1][self.step_type2])]/matrix[int(history[-1][self.step_type1]), int(history[-1][self.step_type2])].sum())
        
        if self.counter_strategy:
            # we predict our step using transition matrix (as competitor can do) and beat probable competitor step
            return (step + 2) % 3 
        else:
            # we just predict competitors step and beat it
            return (step + 1) % 3

        
# looks for the same pattern in history and returns the best answer to the most possible counter strategy
class pattern_matching(agent):
    def __init__(self, steps = 3, deterministic = False, counter_strategy = False, init_value = 0.1, decay = 1):
        self.deterministic = deterministic
        self.counter_strategy = counter_strategy
        if counter_strategy:
            self.step_type = 'step' 
        else:
            self.step_type = 'competitorStep'
        self.init_value = init_value
        self.decay = decay
        self.steps = steps
        
    def history_step(self, history):
        if len(history) < self.steps + 1:
            return self.initial_step()
        
        next_step_count = np.zeros(3) + self.init_value
        pattern = [history[i][self.step_type] for i in range(- self.steps, 0)]
        
        for i in range(len(history) - self.steps):
            next_step_count = (next_step_count - self.init_value)/self.decay + self.init_value
            current_pattern = [history[j][self.step_type] for j in range(i, i + self.steps)]
            if np.sum([pattern[j] == current_pattern[j] for j in range(self.steps)]) == self.steps:
                next_step_count[history[i + self.steps][self.step_type]] += 1
        
        if next_step_count.max() == self.init_value:
            return self.initial_step()
        
        if  self.deterministic:
            step = np.argmax(next_step_count)
        else:
            step = np.random.choice([0,1,2], p = next_step_count/next_step_count.sum())
        
        if self.counter_strategy:
            # we predict our step using transition matrix (as competitor can do) and beat probable competitor step
            return (step + 2) % 3 
        else:
            # we just predict competitors step and beat it
            return (step + 1) % 3
        
# if we add all agents the algorithm will spend more that 1 second on turn and will be invalidated
# right now the agens are non optimal and the same computeations are repeated a lot of times
# the approach can be optimised to run much faster
agents = {
    'mirror_0': mirror_shift(0),
    'mirror_1': mirror_shift(1),  
    'mirror_2': mirror_shift(2),
    'self_0': self_shift(0),
    'self_1': self_shift(1),  
    'self_2': self_shift(2),
    'popular_beater': popular_beater(),
    'anti_popular_beater': anti_popular_beater(),
    'random_transitison_matrix': transition_matrix(False, False),
    'determenistic_transitison_matrix': transition_matrix(True, False),
    'random_self_trans_matrix': transition_matrix(False, True),
    'determenistic_self_trans_matrix': transition_matrix(True, True),
    'random_transitison_tensor': transition_tensor(False, False),
    'determenistic_transitison_tensor': transition_tensor(True, False),
    'random_self_trans_tensor': transition_tensor(False, True),
    'determenistic_self_trans_tensor': transition_tensor(True, True),
    
    'random_transitison_matrix_decay': transition_matrix(False, False, decay = 1.05),
    'random_self_trans_matrix_decay': transition_matrix(False, True, decay = 1.05),
    'random_transitison_tensor_decay': transition_tensor(False, False, decay = 1.05),
    'random_self_trans_tensor_decay': transition_tensor(False, True, decay = 1.05),
    
    'determenistic_transitison_matrix_decay': transition_matrix(True, False, decay = 1.05),
    'determenistic_self_trans_matrix_decay': transition_matrix(True, True, decay = 1.05),
    'determenistic_transitison_tensor_decay': transition_tensor(True, False, decay = 1.05),
    'determenistic_self_trans_tensor_decay': transition_tensor(True, True, decay = 1.05),
    
#     'random_transitison_matrix_decay2': transition_matrix(False, False, decay = 1.001),
#     'random_self_trans_matrix_decay2': transition_matrix(False, True, decay = 1.001),
#     'random_transitison_tensor_decay2': transition_tensor(False, False, decay = 1.001),
#     'random_self_trans_tensor_decay2': transition_tensor(False, True, decay = 1.001),
    
#     'determenistic_transitison_matrix_decay2': transition_matrix(True, False, decay = 1.001),
#     'determenistic_self_trans_matrix_decay2': transition_matrix(True, True, decay = 1.001),
#     'determenistic_transitison_tensor_decay2': transition_tensor(True, False, decay = 1.001),
#     'determenistic_self_trans_tensor_decay2': transition_tensor(True, True, decay = 1.001),
    
#     'random_pattern_matching_decay_1': pattern_matching(1, False, False, decay = 1.001),
#     'random_self_pattern_matching_decay_1': pattern_matching(1, False, True, decay = 1.001),
#     'determenistic_pattern_matching_decay_1': pattern_matching(1, True, False, decay = 1.001),
#     'determenistic_self_pattern_matching_decay_1': pattern_matching(1, True, True, decay = 1.001),
    
#     'random_pattern_matching_decay_2': pattern_matching(2, False, False, decay = 1.001),
#     'random_self_pattern_matching_decay_2': pattern_matching(2, False, True, decay = 1.001),
#     'determenistic_pattern_matching_decay_2': pattern_matching(2, True, False, decay = 1.001),
#     'determenistic_self_pattern_matching_decay_2': pattern_matching(2, True, True, decay = 1.001),
    
#     'random_pattern_matching_decay_3': pattern_matching(3, False, False, decay = 1.001),
#     'random_self_pattern_matching_decay_3': pattern_matching(3, False, True, decay = 1.001),
    'determenistic_pattern_matching_decay_3': pattern_matching(3, True, False, decay = 1.001),
    'determenistic_self_pattern_matching_decay_3': pattern_matching(3, True, True, decay = 1.001),
    
#     'random_pattern_matching_decay_4': pattern_matching(4, False, False, decay = 1.001),
#     'random_self_pattern_matching_decay_4': pattern_matching(4, False, True, decay = 1.001),
#     'determenistic_pattern_matching_decay_4': pattern_matching(4, True, False, decay = 1.001),
#     'determenistic_self_pattern_matching_decay_4': pattern_matching(4, True, True, decay = 1.001),
    
#     'random_pattern_matching_decay_5': pattern_matching(5, False, False, decay = 1.001),
#     'random_self_pattern_matching_decay_5': pattern_matching(5, False, True, decay = 1.001),
#     'determenistic_pattern_matching_decay_5': pattern_matching(5, True, False, decay = 1.001),
#     'determenistic_self_pattern_matching_decay_5': pattern_matching(5, True, True, decay = 1.001),
    
#     'random_pattern_matching_decay_6': pattern_matching(6, False, False, decay = 1.001),
#     'random_self_pattern_matching_decay_6': pattern_matching(6, False, True, decay = 1.001),
#     'determenistic_pattern_matching_decay_6': pattern_matching(6, True, False, decay = 1.001),
#     'determenistic_self_pattern_matching_decay_6': pattern_matching(6, True, True, decay = 1.001),
}

history = []
bandit_state = {k:[1,1] for k in agents.keys()}
    
def multi_armed_bandit_agent (observation, configuration):
    
    # bandits' params
    step_size = 3 # how much we increase a and b 
    decay_rate = 1.05 # how much do we decay old historical data
    
    global history, bandit_state
    
    def log_step(step = None, history = None, agent = None, competitorStep = None, file = 'history.csv'):
        if step is None:
            step = np.random.randint(3)
        if history is None:
            history = []
        history.append({'step': step, 'competitorStep': competitorStep, 'agent': agent})
        if file is not None:
            pd.DataFrame(history).to_csv(file, index = False)
        return step
    
    def update_competitor_step(history, competitorStep):
        history[-1]['competitorStep'] = int(competitorStep)
        return history
    
    # load history
    if observation.step == 0:
        pass
    else:
        history = update_competitor_step(history, observation.lastOpponentAction)
        
        # updating bandit_state using the result of the previous step
        # we can update all states even those that were not used
        for name, agent in agents.items():
            agent_step = agent.step(history[:-1])
            bandit_state[name][1] = (bandit_state[name][1] - 1) / decay_rate + 1
            bandit_state[name][0] = (bandit_state[name][0] - 1) / decay_rate + 1
            
            if (history[-1]['competitorStep'] - agent_step) % 3 == 1:
                bandit_state[name][1] += step_size
            elif (history[-1]['competitorStep'] - agent_step) % 3 == 2:
                bandit_state[name][0] += step_size
            else:
                bandit_state[name][0] += step_size/2
                bandit_state[name][1] += step_size/2
            
    # we can use it for analysis later
    with open('bandit.json', 'w') as outfile:
        json.dump(bandit_state, outfile)
    
    
    # generate random number from Beta distribution for each agent and select the most lucky one
    best_proba = -1
    best_agent = None
    for k in bandit_state.keys():
        proba = np.random.beta(bandit_state[k][0],bandit_state[k][1])
        if proba > best_proba:
            best_proba = proba
            best_agent = k
        
    step = agents[best_agent].step(history)
    
    return log_step(step, history, best_agent)

Writing submission.py


Let's beat the mirroring agent using our solutioin:

In [2]:
%%writefile copy_opponent.py
    
def copy_opponent_agent(observation, configuration):
    if observation.step > 0:
        return observation.lastOpponentAction
    else:
        return 0

Writing copy_opponent.py


In [3]:
from kaggle_environments import evaluate, make, utils
env = make("rps", debug=True)

Failed: football: No module named 'gfootball'


In [4]:
env.reset()
env.run(["submission.py", "copy_opponent.py"])
env.render(mode="ipython", width=800, height=700)

Let's look at the final state of the bandit.

In [5]:
import json

In [6]:
with open('bandit.json') as json_file:
    bandit_state = json.load(json_file)
bandit_state

{'mirror_0': [1.2744565555572842, 63.7255434431246],
 'mirror_1': [32.49557328062912, 32.504426718052784],
 'mirror_2': [63.72997016177055, 1.2700298369113396],
 'self_0': [32.49999999930799, 32.4999999993739],
 'self_1': [63.99999999861599, 1.000000000065903],
 'self_2': [1.0000000000329512, 63.99999999864894],
 'popular_beater': [32.523356952652385, 32.4766430460295],
 'anti_popular_beater': [31.564347218355238, 33.43565278032666],
 'random_transitison_matrix': [63.61624415073981, 1.3837558479420757],
 'determenistic_transitison_matrix': [63.729970161509165, 1.27002983717273],
 'random_self_trans_matrix': [32.078195145703184, 32.921804852978724],
 'determenistic_self_trans_matrix': [32.4999999994335, 32.499999999248395],
 'random_transitison_tensor': [63.99999521224971, 1.0000047864321873],
 'determenistic_transitison_tensor': [63.822931253057426, 1.1770687456244762],
 'random_self_trans_tensor': [33.2840585157656, 31.715941482916303],
 'determenistic_self_trans_tensor': [32.40703890

We can see that self_1 was applied much often than other strategies, and it is a clear winner. It is a counter-strategy for the simple mirroring strategy. That is why a multi-armed bandit algorithm used it much more often than any other agent.

Let's now create an agent acting as the self_1 agent (counter strategy for the mirroring algorithm https://www.kaggle.com/ilialar/beating-mirror-agent) and compare it to our bandit. 

In [7]:
%%writefile anti_mirror.py

import pandas as pd

history = []
    
def anti_copy_opponent_agent (observation, configuration):
    
    global history
    
    if observation.step == 0:
        history.append({'step': 1, 'competitorStep': None})
        return 1
    else:
        history[-1]['competitorStep'] = observation.lastOpponentAction
        step = (history[-1]['step'] + 1) % 3
        history.append({'step': step, 'competitorStep': None})
        return step

Writing anti_mirror.py


In [8]:
env.reset()
env.run(["submission.py", "anti_mirror.py"])
env.render(mode="ipython", width=800, height=700)

In [9]:
with open('bandit.json') as json_file:
    bandit_state = json.load(json_file)
bandit_state

{'mirror_0': [1.0, 63.99999999847413],
 'mirror_1': [32.49999999919892, 32.499999999275204],
 'mirror_2': [63.99999999843599, 1.0000000000381453],
 'self_0': [32.49999999927742, 32.49999999919673],
 'self_1': [63.99999999826097, 1.0000000002131806],
 'self_2': [1.0000000002109728, 63.99999999826317],
 'popular_beater': [21.48374306060468, 43.516256937869464],
 'anti_popular_beater': [31.475812846130527, 33.5241871523436],
 'random_transitison_matrix': [63.98230054341219, 1.0176994550619507],
 'determenistic_transitison_matrix': [63.99999999827358, 1.0000000002005538],
 'random_self_trans_matrix': [31.778472287817117, 33.221527710657],
 'determenistic_self_trans_matrix': [32.49999999904449, 32.49999999942967],
 'random_transitison_tensor': [63.99999903016218, 1.0000009683119626],
 'determenistic_transitison_tensor': [63.99999999816788, 1.000000000306259],
 'random_self_trans_tensor': [32.47824173673718, 32.521758261736935],
 'determenistic_self_trans_tensor': [32.499999999185256, 32.499

Our bandit is also the winner, but it mostly used the mirror_2 strategy that is the optimal counter-strategy to the self_1 agent.

And finally let's try to beat a strong public baseline (https://www.kaggle.com/yegorbiryukov/rock-paper-scissors-with-memory-patterns)

In [10]:
%%writefile pattern_matching.py

import random


# current memory of the agent
current_memory = []
# maximum steps in the pattern
steps_max = 6
# minimum steps in the pattern
steps_min = 3
# memory length of patterns in first group
# steps_max is multiplied by 2 to consider both my_agent's and opponent's actions
group_memory_length = steps_max * 2
# list of groups of memory patterns
groups_of_memory_patterns = []
for i in range(steps_max, steps_min - 1, -1):
    groups_of_memory_patterns.append({
        # how many steps in a row are in the pattern
        "memory_length": group_memory_length,
        # list of memory patterns
        "memory_patterns": []
    })
    group_memory_length -= 2

    
def find_pattern(memory_patterns, memory, memory_length):
    """ find appropriate pattern in memory """
    for pattern in memory_patterns:
        actions_matched = 0
        for i in range(memory_length):
            if pattern["actions"][i] == memory[i]:
                actions_matched += 1
            else:
                break
        # if memory fits this pattern
        if actions_matched == memory_length:
            return pattern
    # appropriate pattern not found
    return None

def my_agent(obs, conf):
    """ your ad here """
    # action of my_agent
    my_action = None
    # if it's not first step, add opponent's last action to agent's current memory
    if obs["step"] > 0:
        current_memory.append(obs["lastOpponentAction"])
    for group in groups_of_memory_patterns:
        # if length of current memory is bigger than necessary for a new memory pattern
        if len(current_memory) > group["memory_length"]:
            # get momory of the previous step
            previous_step_memory = current_memory[:group["memory_length"]]
            previous_pattern = find_pattern(group["memory_patterns"], previous_step_memory, group["memory_length"])
            if previous_pattern == None:
                previous_pattern = {
                    "actions": previous_step_memory.copy(),
                    "opp_next_actions": [
                        {"action": 0, "amount": 0, "response": 1},
                        {"action": 1, "amount": 0, "response": 2},
                        {"action": 2, "amount": 0, "response": 0}
                    ]
                }
                group["memory_patterns"].append(previous_pattern)
            for action in previous_pattern["opp_next_actions"]:
                if action["action"] == obs["lastOpponentAction"]:
                    action["amount"] += 1
            # delete first two elements in current memory (actions of the oldest step in current memory)
            del current_memory[:2]
            # if action was not yet found
            if my_action == None:
                pattern = find_pattern(group["memory_patterns"], current_memory, group["memory_length"])
                # if appropriate pattern is found
                if pattern != None:
                    my_action_amount = 0
                    for action in pattern["opp_next_actions"]:
                        # if this opponent's action occurred more times than currently chosen action
                        # or, if it occured the same amount of times and this one is choosen randomly among them
                        if (action["amount"] > my_action_amount or
                                (action["amount"] == my_action_amount and random.random() > 0.5)):
                            my_action_amount = action["amount"]
                            my_action = action["response"]
    # if no action is found
    if my_action == None:
        my_action = random.randint(0, 2)
    current_memory.append(my_action)
    return my_action

Writing pattern_matching.py


In [11]:
env.reset()
env.run(["submission.py", "pattern_matching.py"])
env.render(mode="ipython", width=800, height=700)

In [12]:
with open('bandit.json') as json_file:
    bandit_state = json.load(json_file)
bandit_state

{'mirror_0': [29.18232838907542, 35.81767161092453],
 'mirror_1': [32.573811201921146, 32.4261887980788],
 'mirror_2': [35.74386040900333, 29.256139590996604],
 'self_0': [27.988855381283162, 37.011144618716784],
 'self_1': [32.83406828434533, 32.165931715654644],
 'self_2': [36.67707633437145, 28.322923665628498],
 'popular_beater': [29.541605260278786, 35.45839473972115],
 'anti_popular_beater': [29.67623034042136, 35.323769659578566],
 'random_transitison_matrix': [39.671488945719666, 25.32851105428026],
 'determenistic_transitison_matrix': [32.52895856419417, 32.47104143580579],
 'random_self_trans_matrix': [35.5798555349535, 29.420144465046437],
 'determenistic_self_trans_matrix': [31.468416741642525, 33.531583258357415],
 'random_transitison_tensor': [26.100482560320785, 38.899517439679165],
 'determenistic_transitison_tensor': [32.79788023653725, 32.2021197634627],
 'random_self_trans_tensor': [40.36423630606385, 24.63576369393609],
 'determenistic_self_trans_tensor': [28.150536

It is much harder but our agent wins.

Validation run:

In [13]:
env.reset()
env.run(["submission.py", "submission.py"])
env.render(mode="ipython", width=800, height=700)

You can add your algorithms to the `agents` dict. If one of your agents works well against the current opponent, a multi-armed bandit will find it out and use it to win.