# ESP Experiment

This notebook defines an agent to play the ESP card game. It runs on Python 3.8.

In [None]:
#@title Imports
from typing import Optional
from __future__ import absolute_import, division, print_function
import os, sys, pdb, pickle
from multiprocessing import Pool
import time, math
from copy import deepcopy

import abc
import numpy as np
import random
import scipy
import pandas as pd
import plotnine as gg
import matplotlib.pyplot as plt
import pylab

colors = {0: 'b',
          1: 'g',
          2: 'r',
          3: 'c',
          4: 'm',
          5: 'y',
          6: 'k'}

# Interfaces, utilities and experiment constants

In [None]:
#@title Agent interface
class Agent(metaclass=abc.ABCMeta):

  @abc.abstractmethod
  def reset(self, rng: np.random.Generator, num_action: int) -> None:
    # Reset the agent.
    raise NotImplementedError
 
  @abc.abstractmethod
  def update(self, action: int, obs: np.ndarray) -> None:
    # Update agent state.
    raise NotImplementedError

  @abc.abstractmethod
  def select_action(self) -> int:
    # Select an action.
    raise NotImplementedError

In [None]:
#@title Experiment constants
TYPES = 3
NUM_CARDS = 3
initial = (3, 3, 3)

In [None]:
#@title Utilities for running the experiments
def random_argmax(rng, scores: np.ndarray):
  probs = (scores==scores.max()).astype(np.float32)
  probs /= probs.sum()
  return rng.choice(np.arange(scores.size), p=probs)

def run_ESP_experiment(
    num_sims: int, 
    agent,
    types_of_cards: int = TYPES,
    num_of_each_type: int = NUM_CARDS,
    plot_results=False):
  
  num_action = types_of_cards
  sims = range(num_sims)
  num_timesteps = types_of_cards * num_of_each_type
  timesteps = range(num_timesteps)
  actions = list(range(num_action))
  rewards = []

  action_count = []

  for sim in sims:
    rng = np.random.default_rng(sim)
    agent.reset(rng)
    true = [i for i in range(TYPES) for j in range(NUM_CARDS)]
    rng.shuffle(true)
    # TODO: Change regret calculation method
    action_count += [[num_timesteps*[0] for a in actions]]
    rewards += [num_timesteps * [0]]
    for timestep in timesteps:
      a = agent.select_action()
      obs = (true[timestep] == a)
      agent.update(a, obs)

      action_count[sim][a][timestep] = 1
      rewards[sim][timestep] += obs

    action_count1 = [[float(action_count[0][a][t]) 
    for t in range(num_timesteps)] for a in actions]
    rewards1 = sum(rewards[0])
  action_count = [[sum([float(action_count[sim][a][t]) 
    for sim in sims]) / num_sims 
    for t in range(num_timesteps)] for a in actions]
  expected_reward = sum(sum(rewards,[])) / num_sims

  if plot_results:
    # plot action frequencies averaged over simulations
    for a in actions:
      plt.plot(timesteps,
              pd.Series(action_count[a]).rolling(10, min_periods=1).mean(),
              colors[a], label='$a =$' + str(a+1))
    plt.axis([0,num_timesteps+1,0.0,1.01])
    plt.xlabel(r'time $t$', fontsize=20)
    plt.ylabel('$\mathbb{P}(A_t = a|\mathcal{E})$', fontsize=20)
    pylab.legend(loc='best')
    plt.show()

    # plot action counts over single simulation
    for a in actions:
      plt.plot(timesteps, np.cumsum(action_count1[a]), colors[a], label='$a =$' + str(a+1))
    plt.axis([0,num_timesteps+1,0.0,num_timesteps+1])
    plt.xlabel(r'time $t$', fontsize=20)
    plt.ylabel('$N_{t,a}$', fontsize=20)
    pylab.legend(loc='best')
    plt.show()

    # plot action counts averaged over simulations
    for a in actions:
      plt.plot(timesteps, np.cumsum(action_count[a]), colors[a], label='$a =$' + str(a+1))
    plt.axis([0,num_timesteps+1,0.0,num_timesteps+1])
    plt.xlabel(r'time $t$', fontsize=20)
    plt.ylabel('$\mathrm{E}[N_{t,a}|\mathcal{E}]$', fontsize=20)
    pylab.legend(loc='best')
    plt.show()

  return action_count, expected_reward

# Agents

In [None]:
#@title An agent that learns from observation
class LearnFromObsAgent(Agent):
  def __init__(self, compute_action, initial: tuple):
    self._compute_action = compute_action
    self._initial = initial
    self._states = {self._initial: 1}
    self._num_type  = len(initial)
    self._type_prob = np.ones(self._num_type) / self._num_type
    self._rng = None

  def reset(self, rng: np.random.Generator):
    self._rng = rng
    self._states = {self._initial: 1}
    self._type_prob = np.ones(self._num_type) / self._num_type
    
  def update(self, action: int, correct: bool):
    next_states = {}
    for comp, prob in self._states.items():
      if correct and comp[action]:
        posterior = (comp[action] / sum(comp)) * prob / self._type_prob[action]
        cl = list(comp)
        cl[action] -= 1
        cl = tuple(cl)
        if cl not in next_states: next_states[cl] = 0
        next_states[cl] += posterior
      if not correct and sum(comp) - comp[action]:
        posterior_0 = (1 - (comp[action] / sum(comp))) * prob / (1 - self._type_prob[action])
        for other in range(self._num_type):
          if other == action or not comp[other]: continue
          posterior = posterior_0 * comp[other] / (sum(comp) - comp[action])
          cl = list(comp)
          cl[other] -= 1
          cl = tuple(cl)
          if cl not in next_states: next_states[cl] = 0
          next_states[cl] += posterior 
    self._states = next_states
    # update type probs: there may be a faster incremental way
    self._type_prob = np.zeros(self._num_type)
    for comp, prob in self._states.items():
      ca = np.array(comp)
      if ca.sum() != 0:
        self._type_prob += ca / ca.sum() * prob

  def select_action(self):
    return self._compute_action(self._rng, self._type_prob)

In [None]:
#@title Greedy Action
def greedy_action(rng, type_prob):
  return random_argmax(rng, type_prob)

In [None]:
#@title TS Action
def ts_action(rng, type_prob):
  num_type = len(type_prob)
  return rng.choice(range(num_type), p=type_prob)

In [None]:
#@title IDS Action
def ids_action(rng, type_prob):
  num_type = len(type_prob)
  mutual_infos = [scipy.stats.entropy([1-p, p]) for p in type_prob]
  shortfall = 1 - type_prob
  minimum = float('inf')
  min_actions = [-1, -1]
  min_alpha = -1
  for a in range(num_type):
    func = (shortfall[a])**2 / (mutual_infos[a])
    if func < minimum:
      minimum, min_actions, min_alpha = func, [a, 0], 1
    for b in range(a, num_type):
      func = lambda x: (x * shortfall[a] + (1 - x) * shortfall[b])**2 / (x * mutual_infos[a] + (1 - x) * mutual_infos[b])
      res = scipy.optimize.minimize(func, 0, bounds=[(0, 1)])
      if res.fun < minimum:
        minimum, min_actions, min_alpha = res.fun, [a, b], res.x[0]
  if min_alpha == -1:
    return random_argmax(rng, type_prob)
  return rng.choice(min_actions, p=[min_alpha, 1 - min_alpha])


In [None]:
#@title Test evaluation
num_sims = 1000 # number of simulations over which to average

In [None]:
greedy_agent = LearnFromObsAgent(greedy_action, initial)

action_count_greedy, reward_greedy = run_ESP_experiment(
    num_sims,
    greedy_agent)

reward_greedy

4.221

In [None]:
ts_agent = LearnFromObsAgent(ts_action, initial)

action_count_ts, reward_ts = run_ESP_experiment(
    num_sims,
    ts_agent)
reward_ts

3.426

In [None]:
ids_agent = LearnFromObsAgent(ids_action, initial)

action_count_ids, reward_ids = run_ESP_experiment(
    num_sims,
    ids_agent)
reward_ids



4.273

In [None]:
#@title Expectation calculation
