In [76]:
import numpy as np
from scipy.stats import distributions as dist
import matplotlib.pyplot as plt
import random

## 17.3 HW Problems 13-16

### Problem 13

In [48]:
"""
Write a program that performs A/B testing  Have each arm tested m times to 
estimate each (theta_1, ..., theta_n) with the MLE estimator.  Then choose
the largest theta_i and use the remaining N - nm pulls (where N is the total 
number of pulls) to try to maximize the average payoff.  Compare average payout 
with Thompson sampling in Algorithm 17.1
"""

def ab_test(thetas, rewards, N, m):
    n = len(thetas)

    # Calculate estimated thetas using samples
    est_thetas = np.array([np.sum(np.random.binomial(1, theta, m)) / m for theta in thetas])

    # Calculate expected values
    expected_vals = est_thetas * rewards
    max_index = np.argmax(expected_vals)
    if type(max_index) is np.ndarray:
        max_index = max_index[0]

    # Find and return best payout
    payout = np.sum(np.random.binomial(1, est_thetas[max_index], N - n*m)) * rewards[max_index] / N
    return payout


def thompson_sampling(theta, rewards, N):
    """
    Thompson sample to choose the arm, then simulate a pull, then update.  Repeat
    N times.

    theta : array of true probabilities for the arms
    N     : total number of pulls to make

    Return percentage of successes up to each time step
    """

    # Initialize
    n = len(theta)      # Number of arms
    a = np.ones(n)      # Initial 'a' hyperparameters
    b = np.ones(n)      # Initial 'b' hyperparameters
    X = np.random.random(N) # Draw from [0,1] to simulate pulls
    traj = np.zeros(N)      # Initial trajectory

    for k in range(N):
        draw = dist.beta.rvs(a,b)   # Thompson sample for all arms
        index = np.argmax(draw * rewards)     # Identify arm to pull
        if X[k] <= theta[index]:    # If pull is a success
            a[index] += 1   # Update posterior with success
            traj[k] = traj[k-1] + 1 # Update trajectory
        else:
            b[index] += 1           # Update posterior with failure
            traj[k] = traj[k-1]         # Update trajectory
    return traj / np.arange(1, N+1)     # Percentage successes



In [68]:
thetas = np.array([0.3, 0.45, 0.23, 0.4])
rewards = np.array([1, 1, 1, 1])
N = 100
m = 12

print(f'A/B Testing average payout over 500 runs: {np.mean(np.array([ab_test(thetas, rewards, N, m) for _ in range(500)]))}')
print(f'Thompson Sampling average payout over 500 runs: {np.mean(np.array([thompson_sampling(thetas, rewards, N)[-1] for _ in range(500)]))}')

A/B Testing average payout over 500 runs: 0.26539999999999997
Thompson Sampling average payout over 500 runs: 0.38567999999999997


### Problem 14

In [94]:
"""
As an alternative to A/B testing, try randomly choosing arms (with replacement) 
m times and give MLE estimates for each (theta_1,..., theta_n). Then choose
the largest theta_i and use the remaining N - m pulls to try to maximize the
average payoff.  Compare average payoff with A/B testing and Thompson sampling.
"""

def random_test(thetas, rewards, N, m):
    n = len(thetas)

    # Compute random m values to sample for each theta
    m_vals = [1] * n
    for _ in range(m - n):
        index = random.randint(0, n-1)
        m_vals[index] += 1

    # Calculate estimated thetas using samples
    for m_val in m_vals:
        est_thetas = np.array([np.sum(np.random.binomial(1, theta, m_val)) / m_val for theta in thetas])

    # Calculate expected values
    expected_vals = est_thetas * rewards
    max_index = np.argmax(expected_vals)
    if type(max_index) is np.ndarray:
        max_index = max_index[0]

    # Find and return best payout
    payout = np.sum(np.random.binomial(1, est_thetas[max_index], N - n*m)) * rewards[max_index] / N
    return payout

In [112]:
thetas = np.array([0.3, 0.45, 0.23, 0.4])
rewards = np.array([1, 1, 1, 1])
N = 100
m = 12

print(f'Random Testing average payout over 500 runs: {np.mean(np.array([random_test(thetas, rewards, N, m) for _ in range(500)]))}')
print(f'A/B Testing average payout over 500 runs: {np.mean(np.array([ab_test(thetas, rewards, N, m) for _ in range(500)]))}')
print(f'Thompson Sampling average payout over 500 runs: {np.mean(np.array([thompson_sampling(thetas, rewards, N)[-1] for _ in range(500)]))}')

Random Testing average payout over 500 runs: 0.35094
A/B Testing average payout over 500 runs: 0.27244000000000007
Thompson Sampling average payout over 500 runs: 0.37874


### Problem 15

In [125]:
"""
Instead of evaluating algorithms based on average payout over time, reapply
A/B testing and Thompson sampling but assess the quality with the discounted
utility function sum(B^i * u_i), where u_i is the payoff.  By doing multple
runs, compute the expected utility and use that as the basis of comparison
for deciding whether Thompson sampling is better than A/B testing
"""

def ab_test_discounted(thetas, rewards, N, m, betas):
    n = len(thetas)

    # Calculate estimated thetas using samples
    est_thetas = np.array([np.sum(np.random.binomial(1, theta, m)) / m for theta in thetas])

    # Calculate expected values
    expected_vals = est_thetas * rewards
    max_index = np.argmax(expected_vals)
    if type(max_index) is np.ndarray:
        max_index = max_index[0]

    # Find and return best payout
    discounted_utility = np.sum(np.random.binomial(1, est_thetas[max_index], N - n*m) * rewards[max_index] * betas)
    return discounted_utility

def thompson_sampling_discounted(theta, rewards, N, betas):
    """
    Thompson sample to choose the arm, then simulate a pull, then update.  Repeat
    N times.

    theta : array of true probabilities for the arms
    N     : total number of pulls to make

    Return percentage of successes up to each time step
    """

    # Initialize
    n = len(theta)      # Number of arms
    a = np.ones(n)      # Initial 'a' hyperparameters
    b = np.ones(n)      # Initial 'b' hyperparameters
    X = np.random.random(N) # Draw from [0,1] to simulate pulls
    traj = np.zeros(N)      # Initial trajectory

    for k in range(N):
        draw = dist.beta.rvs(a,b)   # Thompson sample for all arms
        index = np.argmax(draw * rewards)     # Identify arm to pull
        if X[k] <= theta[index]:    # If pull is a success
            a[index] += 1   # Update posterior with success
            traj[k] = traj[k-1] + 1 # Update trajectory
        else:
            b[index] += 1           # Update posterior with failure
            traj[k] = traj[k-1]         # Update trajectory
    return np.sum(traj / np.arange(1, N+1) * betas)     # Percentage successes

In [127]:
thetas = np.array([0.3, 0.45, 0.23, 0.4])
rewards = np.array([1, 1, 1, 1])
N = 100
m = 12
n = len(thetas)
domain = np.linspace(0, 1, N - m*n)
domain_thompson = np.linspace(0, 1, N)
betas = np.array([-domain + 1])
betas_thompson = np.array([-domain_thompson + 1])

print(f'A/B Discounted Utility: {ab_test_discounted(thetas, rewards, N, m, betas)}')
print(f'Thompson Sampling Discounted Utility: {thompson_sampling_discounted(thetas, rewards, N, betas_thompson)}')

A/B Discounted Utility: 12.196078431372548
Thompson Sampling Discounted Utility: 15.879353952603227


### Problem 16

In [132]:
"""
Generalize Algorithm 17.1 by coding up a simulation of a Bernoulli bandit 
process/solution where Thompson sampling can also accommodate an array of
payouts (J_1,..., J_n) for each arm
"""

def thompson_sampling_rewards(theta, rewards, N):
    """
    Thompson sample to choose the arm, then simulate a pull, then update.  Repeat
    N times.

    theta : array of true probabilities for the arms
    N     : total number of pulls to make

    Return percentage of successes up to each time step
    """

    # Initialize
    n = len(theta)      # Number of arms
    a = np.ones(n)      # Initial 'a' hyperparameters
    b = np.ones(n)      # Initial 'b' hyperparameters
    X = np.random.random(N) # Draw from [0,1] to simulate pulls
    traj = np.zeros(N)      # Initial trajectory

    for k in range(N):
        draw = dist.beta.rvs(a,b)   # Thompson sample for all arms
        index = np.argmax(draw * rewards)     # Identify arm to pull accounting for rewards
        if X[k] <= theta[index]:    # If pull is a success
            a[index] += 1   # Update posterior with success
            traj[k] = traj[k-1] + 1 # Update trajectory
        else:
            b[index] += 1           # Update posterior with failure
            traj[k] = traj[k-1]         # Update trajectory
    return (traj / np.arange(1, N+1))[-1]     # Percentage successes

In [136]:
thetas = np.array([0.3, 0.45, 0.23, 0.4])
rewards = np.array([1, 2, 3, 4])
N = 100
m = 12
n = len(thetas)

print(f'Thompson Sampling Accounting for Rewards: {thompson_sampling_rewards(thetas, rewards, N)}')

Thompson Sampling Accounting for Rewards: 0.45
