# Imports

In [1]:
from scipy.stats import beta
from itertools import combinations, combinations_with_replacement
from collections import defaultdict

# Definitions

In [2]:
def get_beta_mean(obs_pos, obs_n):
    return beta.mean(obs_pos + 1, obs_n - obs_pos + 1)

# Recursive AB

## Notes

This notebook aims to quantify the value of exploration vs exploitation in the AB testing context:
 - It uses a recursive approach inspired by the one used to solve the mirror trader problem
 - The value of a test is in its ability to generate information that will change a decision
 
Initial assumptions:
 - There are two choices of coin to flip (A and B)
 - These coins have fixed inherent p(heads)

Minimal scenario specification:
 - The number of successes and failures of coin A
 - The number of successes and failures of coin B
 - The number of tosses remaining
 - i.e. this is a five-dimensional state space - could get tricky quickly...
 - can use symmetry to shrink the problem a bit maybe (combinations not permutations)
 
Objective = maximise expected return

Recursive strategy:
 - start from ((0,0), (0,0), 1)
 - then all possible states of 2-flip game, then 3-flip game etc
 
Some heuristics
 - If coins have identical histories, pick A
 - If coins have same expectation, pick one with fewer throws

## Constants

In [3]:
MAX_GAME_LENGTH = 40

## Enumerate possible states for a given length of history

In [4]:
%%time
max_flips = MAX_GAME_LENGTH - 1

def get_scores(N):
    scores = []
    for n in range(N+1):
        scores.append((n,N))
    return scores

scores_dict = {}
for flips_so_far in range(max_flips + 1):
    scores_dict[flips_so_far] = get_scores(flips_so_far)
    
shares = defaultdict(list)
for flips_so_far in range(max_flips + 1):
    for flips_A in range(flips_so_far // 2 + 1):
        shares[flips_so_far].append((flips_A, flips_so_far-flips_A))

shares_with_results = defaultdict(set)
for flips_so_far in range(max_flips + 1):
    for s in shares[flips_so_far]:
        shares_with_results[flips_so_far].update(set([
            tuple(sorted((a,b)))
            for a in scores_dict[s[0]]
            for b in scores_dict[s[1]]
        ]))
    shares_with_results[flips_so_far] = sorted(shares_with_results[flips_so_far],
                                               key=lambda x: (x[0][1], x[0][0], x[1][0]))

CPU times: user 115 ms, sys: 3.64 ms, total: 119 ms
Wall time: 140 ms


In [5]:
# shares_with_results[2]

## For a given length game, enumerate all possible game states

In [6]:
%%time
game_states = defaultdict(list)
max_game_length = MAX_GAME_LENGTH

for game_length in range(1, max_game_length + 1):
    for flips_so_far in range(game_length):
        game_states[game_length] += [(tup, game_length - flips_so_far)
                                     for tup in shares_with_results[flips_so_far]]

CPU times: user 86.2 ms, sys: 15.7 ms, total: 102 ms
Wall time: 113 ms


In [7]:
# game_states[3]

## Evaluate game states

In [8]:
def tuple_add(tup1, tup2):
    assert len(tup1) == len(tup2)
    return tuple([tup1[i] + tup2[i] for i in range(len(tup1))])

In [9]:
def update_state(game_state, coin, result):
    assert coin in ('A', 'B')
    assert result in (0, 1)
    (coin_A_history, coin_B_history), flips_left = game_state
    update = (result, 1)
    if coin == 'A':
        coin_A_history = tuple_add(coin_A_history, update)
    if coin == 'B':
        coin_B_history = tuple_add(coin_B_history, update)
    return (tuple(sorted((coin_A_history, coin_B_history))), flips_left - 1)

In [10]:
def evaluate(game_state, value_dict):
    (coin_A_history, coin_B_history), flips_left = game_state
    
    p_A = get_beta_mean(*coin_A_history)
    p_B = get_beta_mean(*coin_B_history)
    
    if flips_left == 1:
        value = max(p_A, p_B)
        choice = 'A' if p_A > p_B else 'B' if p_B > p_A else 'either'
    else:
        coin_A_value = (p_A
                        + p_A * value_dict[update_state(game_state, 'A', 1)][0]
                        + (1 - p_A) * value_dict[update_state(game_state, 'A', 0)][0])
        coin_B_value = (p_B
                        + p_B * value_dict[update_state(game_state, 'B', 1)][0]
                        + (1 - p_B) * value_dict[update_state(game_state, 'B', 0)][0])
        value = max(coin_A_value, coin_B_value)
        choice = 'A' if coin_A_value > coin_B_value else 'B' if coin_B_value > coin_A_value else 'either'
        
    return value, choice

In [11]:
%%time
value_dict = {}
for game_length in range(1, MAX_GAME_LENGTH + 1):
    for game_state in sorted(game_states[game_length], key=lambda x: x[-1]):
        value_dict[game_state] = evaluate(game_state, value_dict)

CPU times: user 2min 51s, sys: 495 ms, total: 2min 52s
Wall time: 2min 53s


In [12]:
[(k, len(game_states[k])) for k in game_states.keys()]

[(1, 1),
 (2, 3),
 (3, 9),
 (4, 19),
 (5, 38),
 (6, 66),
 (7, 110),
 (8, 170),
 (9, 255),
 (10, 365),
 (11, 511),
 (12, 693),
 (13, 924),
 (14, 1204),
 (15, 1548),
 (16, 1956),
 (17, 2445),
 (18, 3015),
 (19, 3685),
 (20, 4455),
 (21, 5346),
 (22, 6358),
 (23, 7514),
 (24, 8814),
 (25, 10283),
 (26, 11921),
 (27, 13755),
 (28, 15785),
 (29, 18040),
 (30, 20520),
 (31, 23256),
 (32, 26248),
 (33, 29529),
 (34, 33099),
 (35, 36993),
 (36, 41211),
 (37, 45790),
 (38, 50730),
 (39, 56070),
 (40, 61810)]

In [30]:
value_dict[(((3, 5), (15, 23)), 12)]

(7.770686872480059, 'A')

In [15]:
len(value_dict)

544544

In [31]:
def choose_lower_expectation(game_state):
    (coin_A_history, coin_B_history), flips_left = game_state
    
    p_A = get_beta_mean(*coin_A_history)
    p_B = get_beta_mean(*coin_B_history)
    
    lower_expectation_choice = 'A' if p_A < p_B else 'B'
    
    return lower_expectation_choice == value_dict[game_state][1]

In [55]:
choose_lower_expectation((((1, 2), (4, 7)), 12))

True

In [57]:
[game_state for game_state in game_states[16] if choose_lower_expectation(game_state)]

[(((0, 0), (2, 3)), 13),
 (((0, 0), (3, 5)), 11),
 (((0, 0), (4, 6)), 10),
 (((1, 1), (4, 5)), 10),
 (((0, 0), (4, 7)), 9),
 (((0, 1), (2, 6)), 9),
 (((0, 2), (1, 5)), 9),
 (((0, 0), (5, 8)), 8),
 (((0, 0), (5, 9)), 7),
 (((0, 0), (6, 9)), 7),
 (((0, 1), (3, 8)), 7),
 (((1, 1), (6, 8)), 7),
 (((1, 2), (4, 7)), 7),
 (((2, 2), (6, 7)), 7),
 (((2, 3), (4, 6)), 7),
 (((0, 0), (6, 10)), 6),
 (((0, 1), (3, 9)), 6),
 (((1, 1), (7, 9)), 6),
 (((0, 2), (2, 8)), 6),
 (((0, 3), (1, 7)), 6),
 (((0, 0), (6, 11)), 5),
 (((0, 0), (7, 11)), 5),
 (((0, 1), (4, 10)), 5),
 (((0, 2), (2, 9)), 5),
 (((1, 2), (5, 9)), 5),
 (((0, 0), (7, 12)), 4),
 (((0, 1), (4, 11)), 4),
 (((1, 1), (8, 11)), 4),
 (((3, 3), (8, 9)), 4),
 (((0, 0), (7, 13)), 3),
 (((0, 1), (4, 12)), 3),
 (((1, 1), (9, 12)), 3),
 (((1, 2), (6, 11)), 3),
 (((2, 2), (9, 11)), 3),
 (((1, 3), (4, 10)), 3),
 (((0, 4), (1, 9)), 3),
 (((2, 3), (7, 11)), 2)]

In [49]:
[(game_length, len(game_states[game_length]), len(
    [game_state for game_state in game_states[game_length] if choose_lower_expectation(game_state)]
)) for game_length in range(41)]

[(0, 0, 0),
 (1, 1, 0),
 (2, 3, 0),
 (3, 9, 0),
 (4, 19, 0),
 (5, 38, 0),
 (6, 66, 0),
 (7, 110, 0),
 (8, 170, 0),
 (9, 255, 1),
 (10, 365, 3),
 (11, 511, 6),
 (12, 693, 9),
 (13, 924, 17),
 (14, 1204, 19),
 (15, 1548, 29),
 (16, 1956, 37),
 (17, 2445, 50),
 (18, 3015, 65),
 (19, 3685, 81),
 (20, 4455, 106),
 (21, 5346, 131),
 (22, 6358, 154),
 (23, 7514, 189),
 (24, 8814, 221),
 (25, 10283, 262),
 (26, 11921, 305),
 (27, 13755, 363),
 (28, 15785, 418),
 (29, 18040, 477),
 (30, 20520, 546),
 (31, 23256, 624),
 (32, 26248, 714),
 (33, 29529, 807),
 (34, 33099, 911),
 (35, 36993, 1017),
 (36, 41211, 1140),
 (37, 45790, 1274),
 (38, 50730, 1400),
 (39, 56070, 1534),
 (40, 61810, 1704)]

In [43]:
value_dict[(((1, 1), (4, 5)), 9)]

(6.770926097711813, 'A')

In [14]:
break

SyntaxError: 'break' outside loop (<ipython-input-14-6aaf1f276005>, line 4)

# Mirror trader

In [None]:
days = 200

In [None]:
%%time
expectations = {(obs_pos, obs_n): get_beta_mean(obs_pos, obs_n)
                for obs_n in range(days)
                for obs_pos in range(int((obs_n + 1)/2), obs_n + 1)}

In [None]:
def calculate_pv(obs_pos, obs_n, n_to_go, pv_lookup, exp_p_lookup):
    if n_to_go == 1:
        return max(2 * exp_p_lookup.get((obs_pos, obs_n), 0) - 1, 0)
    elif obs_n > (2 * obs_pos):
        return pv_lookup[(0, 0, n_to_go)]
    else:
        p_success = exp_p_lookup.get((obs_pos, obs_n))
        return max(pv_lookup.get((0, 0, n_to_go), 0),
                   (p_success * (1 + pv_lookup[(obs_pos + 1, obs_n + 1, n_to_go - 1)])
                    + (1 - p_success) * (-1 + pv_lookup[(obs_pos, obs_n + 1, n_to_go - 1)])))

In [None]:
def bet_against_pv(obs_pos, obs_n, n_to_go, pv_lookup, exp_p_lookup):
    obs_pos = max(obs_pos, obs_n - obs_pos)
    if n_to_go == 1:
        return 2 * exp_p_lookup.get((obs_pos, obs_n), 0) - 1
    else:
        p_success = exp_p_lookup.get((obs_pos, obs_n))
        return max(pv_lookup.get((0, 0, n_to_go), 0),
                   (p_success * (1 + pv_lookup[(obs_pos + 1, obs_n + 1, n_to_go - 1)])
                    + (1 - p_success) * (-1 + pv_lookup[(obs_pos, obs_n + 1, n_to_go - 1)])))

In [None]:
expectations[(5,10)]

In [None]:
%%time
pv_lookup = {}
for N in range(1,days+1):
    for n_to_go in range(1, N+1):
        n_obs = N - n_to_go
        for n_pos in range(n_obs+1):
            pv_lookup[(n_pos, n_obs, n_to_go)] = bet_against_pv(n_pos, n_obs, n_to_go, pv_lookup, expectations)

In [None]:
for i in range(20,201,20):
    print(f'{i} days: Optimal return per day: {pv_lookup[(0,0,i)] / i:.3f}')

# Appendix

In [None]:
import pandas as pd
from scipy.stats import binom

In [None]:
def get_likelihood(obs_pos, obs_n, resolution=200):
    likelihood = pd.DataFrame()
    likelihood['prob'] = np.linspace(0, 1, (resolution * 2 + 1))[1:-1:2]
    likelihood['ldf'] = likelihood['prob'].apply(lambda p: binom.pmf(obs_pos, obs_n, p))
    likelihood['ldf'] /= likelihood['ldf'].mean()
    likelihood['lmf'] = likelihood['ldf'] / likelihood['ldf'].sum()
    likelihood['expectation'] = likelihood['prob'] * likelihood['lmf']
    return likelihood

In [None]:
def get_expectation(obs_pos, obs_n, resolution=200):
    return get_likelihood(obs_pos, obs_n, resolution)['expectation'].sum()

In [None]:
# TODO: enable big numbers
def prob_success(obs_pos, obs_n):
    top = bottom = 0
    pascal_coeffs = pascal(obs_n - obs_pos + 1, kind='lower', exact=True)[-1]
    pascal_coeffs = pascal_coeffs * [(-1)**n for n in range(len(pascal_coeffs))]
    integrated_exponent = obs_pos + 1
    for i, coeff in enumerate(pascal_coeffs):
        top += coeff / (integrated_exponent + 1)
        bottom += coeff / integrated_exponent
        exponent += 1
    return top / bottom

### Try only calculating beta_mean on demand (this was much slower)

In [None]:
# def get_beta_mean(obs_pos, obs_n, lookup=None):
#     beta_mean = beta.mean(obs_pos + 1, obs_n - obs_pos + 1)
#     if lookup:
#         lookup[(obs_pos, obs_n)] = beta_mean
#     return beta_mean

In [None]:
# def calculate_pv(obs_pos, obs_n, n_to_go, pv_lookup, exp_p_lookup):
#     if n_to_go == 1:
#         return max(2 * exp_p_lookup.get((obs_pos, obs_n), get_beta_mean(obs_pos, obs_n, exp_p_lookup)) - 1, 0)
#     elif obs_n > (2 * obs_pos):
#         return pv_lookup[(0, 0, n_to_go)]
#     else:
#         p_success = exp_p_lookup.get((obs_pos, obs_n), get_beta_mean(obs_pos, obs_n, exp_p_lookup))
#         return max(pv_lookup.get((0, 0, n_to_go), 0),
#                    (p_success * (1 + pv_lookup[(obs_pos + 1, obs_n + 1, n_to_go - 1)])
#                     + (1 - p_success) * (-1 + pv_lookup[(obs_pos, obs_n + 1, n_to_go - 1)])))