<a href="https://colab.research.google.com/github/phnascimento/amazon-personalize-samples/blob/master/AB_Testing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -q arviz

[K     |████████████████████████████████| 1.5MB 2.8MB/s 
[K     |████████████████████████████████| 4.3MB 16.0MB/s 
[K     |████████████████████████████████| 727kB 38.5MB/s 
[K     |████████████████████████████████| 296kB 49.8MB/s 
[?25h

In [2]:
!pip install -q pyspark

[K     |████████████████████████████████| 204.2MB 63kB/s 
[K     |████████████████████████████████| 204kB 47.6MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [3]:
!apt-get update

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Waiting for headers] [Wa                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Waiting for headers] [2 0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Waiting for headers] [Wa0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease 

In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [5]:
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode


In [6]:
!java -version

openjdk version "1.8.0_265"
OpenJDK Runtime Environment (build 1.8.0_265-8u265-b01-0ubuntu2~18.04-b01)
OpenJDK 64-Bit Server VM (build 25.265-b01, mixed mode)


In [7]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.getenv("JAVA_HOME")

'/usr/lib/jvm/java-8-openjdk-amd64'

In [8]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import arviz as az
import scipy.stats as stats
import pymc3 as pm
import tensorflow as tf
import tensorflow_probability as tfp
import ipywidgets as widgets
from ipywidgets import interact

tfd = tfp.distributions
tfb = tfp.bijectors

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


%matplotlib inline

In [9]:
sc = SparkSession.builder.appName('LRAC-6330').getOrCreate()

#**LRAC-6330 - Researching ways to parallelize A/B Testing code in Spark**<hr span=30%></hr> 

**Description**

Research the possibility of run the AB Testing code using pymc3 in parallel with Spark. The key to do this is to run multiple Markov chains in parallel to accelerate convergence and to build a more robust model. Other options are to use Tensorflow Probability, but it requrires more fine tuning and low level aspects of the lib and more deep knowledge of the math involved.<br><br>

This notebook contains the models descripted in the LRAC-6330 ticket and it's sub-tasks.<br><br><br>




##1. Input Data
---
This is the test data as described in the [repo]

In [10]:
dataset = pd.DataFrame(data={
    'trials':[3025,2895,3200,3523,2065,1645],
    'successes':[2235,2132,1789,3002,1245,1135],
    'days':[1,2,1,2,1,2]
},
index=['Control','Control','Variant A','Variant A','Variant B','Variant B']
)

dataset

Unnamed: 0,trials,successes,days
Control,3025,2235,1
Control,2895,2132,2
Variant A,3200,1789,1
Variant A,3523,3002,2
Variant B,2065,1245,1
Variant B,1645,1135,2


In [11]:
groups = dataset.groupby(dataset.index).agg({'trials':'sum','successes':'sum'})
groups

Unnamed: 0,trials,successes
Control,5920,4367
Variant A,6723,4791
Variant B,3710,2380


In [12]:
probs = groups.assign(rate = lambda df: df['successes']/df['trials'])['rate']
probs

Control      0.737669
Variant A    0.712628
Variant B    0.641509
Name: rate, dtype: float64

#**LRAC-6564** - Implement Analytical Model for Bayesian A/B Testing in Pyspark
---

Bayesian A/B Testing for binary count data can be written analytically with only Uniforms, Binomial and Beta distributions. The aim of this ticket is to implement this model to run in a pyspark job to serve as one of the baselines.

# A/B testing: binary outcomes<br>
For a binary-outcome test (e.g. a test of conversion rates), the probability that B will beat A in the long run is given by:

<a href="https://www.codecogs.com/eqnedit.php?latex=\bg_black&space;Pr(\boldsymbol{p_{B}>p_{A}})&space;=&space;\boldsymbol{\sum_{i=0}^{\alpha_{B-1}}}\frac{\boldsymbol{B}(\alpha_{\boldsymbol{A}}&space;&plus;&space;i,&space;\beta_{\boldsymbol{B}}&space;&plus;&space;\beta_{\boldsymbol{A}})}{(\beta_{\boldsymbol{B}}&space;&plus;&space;i)\boldsymbol{B}(1&space;&plus;&space;1,&space;\beta_{\boldsymbol{B}})\boldsymbol{B}(\alpha_{\boldsymbol{A}},&space;\beta_{\boldsymbol{A}})}" target="_blank"><img src="https://latex.codecogs.com/gif.latex?\bg_black&space;Pr(\boldsymbol{p_{B}>p_{A}})&space;=&space;\boldsymbol{\sum_{i=0}^{\alpha_{B-1}}}\frac{\boldsymbol{B}(\alpha_{\boldsymbol{A}}&space;&plus;&space;i,&space;\beta_{\boldsymbol{B}}&space;&plus;&space;\beta_{\boldsymbol{A}})}{(\beta_{\boldsymbol{B}}&space;&plus;&space;i)\boldsymbol{B}(1&space;&plus;&space;1,&space;\beta_{\boldsymbol{B}})\boldsymbol{B}(\alpha_{\boldsymbol{A}},&space;\beta_{\boldsymbol{A}})}" title="Pr(\boldsymbol{p_{B}>p_{A}}) = \boldsymbol{\sum_{i=0}^{\alpha_{B-1}}}\frac{\boldsymbol{B}(\alpha_{\boldsymbol{A}} + i, \beta_{\boldsymbol{B}} + \beta_{\boldsymbol{A}})}{(\beta_{\boldsymbol{B}} + i)\boldsymbol{B}(1 + 1, \beta_{\boldsymbol{B}})\boldsymbol{B}(\alpha_{\boldsymbol{A}}, \beta_{\boldsymbol{A}})}" /></a>

Where:

$\alpha_{𝐴}$ is one plus the number of successes for **A**<br>
$\beta_{A}$ is one plus the number of failures for **A**<br>
$\alpha_{B}$ is one plus the number of successes for **B**<br>
$\beta_{B}$ is one plus the number of failures for **B**<br>
**𝐵** is the beta function<br>

In [13]:
def run_analytical_model(groups, draws=1000):
  """
  Calculate beta posterior distribution given parameters

  This function calculates a beta posterior distribution given a tuple of 
  successes and trials for each variant (group) in the `groups` parameter 
  and returns a list of dicts, each one with the name of the variant, 
  the posterior mean, the median, and the posterior itself with size `draws`.

  Parameters
  ------------
  groups : pandas.DataFrame
        DataFrame of Variants, Trials and Successes. Index = 1D variant names
        of size 1 to number of unique variants, columns = 1D string array with
        values: ['Trials','Successes']
  draws:   int
        Size or number of data points of the beta posterior
        distribution. Default to 1000
  Returns
  --------
  list
    List of dicts with data about the posterior distribution of each variant
    in the `groups` parameter.
  """


  posterior_traces = []

  for name, group in groups.iterrows():
    dict_ = dict(name=name, **group)

    alpha = dict_.get('successes')
    beta = dict_.get('trials')
    posterior = stats.beta.rvs(a=alpha+1, 
                               b=beta-alpha+1, 
                               size=draws, 
                               random_state=42)
    
    posterior_traces.append(dict(
            name=name,
            expected_value=posterior.mean(),
            median=np.median(posterior),
            posterior=posterior
        ))
    
  return posterior_traces

#**LRAC-6566** Implement Hamiltonian Monte Carlo in Pyspark/Tensorflow
---

Implement the Hamiltonian Monte Carlo sampling algorithm in Tensorflow Probability and run it in a pyspark job

In [None]:
class HamiltonianMonteCarloModel:

  def __init__(self, control_group, variant_groups, *, number_of_steps=2000,
               burnin=200, leapfrog_steps=3):
    self._control_group=control_group
    self._variant_groups=variant_groups
    self._number_of_steps=number_of_steps
    self._burnin=burnin
    self._leapfrog_steps=leapfrog_steps
    self._list_obs=None
    self._list_probs=None
    self._observations=None

  def set_observed_values(self, list_probs=[.5, .5], list_draws=[1000, 1000]):

    observations = []
    for i, dist in enumerate(zip(list_probs, list_draws)):
      observations.append(
          {f'Variant {chr(65 + i)}':stats.bernoulli.rvs(p=dist[0], size=dist[1])}
      )
    
    self._observations=observations

    return

  def _joint_log_prob(self, observations):

    for _ in range(len(observations)):
      

  def run_hmc_model(self):
    initial_chain_state = []
    unconstraining_bijectors = []

    def joint_log_prob()

    for obs in self._observations:



# TODO



#**LRAC-6567** - Implement Bayesian Multi-Armed Bandits algorithms in Pyspark/Tensorflow

##**Description**

Bayesiam Multi-Armed Bandits have been used as an alternative for frequentist and bayesian A/B Testing. The aim of this ticket is to implement this algorithm using Tensorflow Probability to see if it'll be viable to research in the future.<br><br>


Unlike standard machine learning tools, bandit algorithms aren't simply black-box functions you can call to process the data you have lying around --- bandit algorithms have to actively select which data you should acquire and analyze that data in real-time. Indeed, bandit algorithms exemplify two types of learning that are not present in standard ML examples: *active learning*, which refers to algorithms that actively select which data they should receive; and *online learning*, which refers to algorithms that analyze data in real-time and provide results on the fly.<br><br>

#**Simulating the Arms of a Bandit Problem**

In order to reasonably simulate what might happen if we were to deploy an epsilon-Greedy algorithm in production, we need to set up some hypothetical arms. For this, we're going to focus on a very simple type of simulated arm that's easy to implement correctly. This hypothetical arm will let us simulate settings like:<br>

* *Optimizing click-throuhg rates for ads:* Every time we show someone an ad, we'll imagine that there's a fixed probability that they'll click on the ad. The bandit algorithm will then estimate this probability and try to decide on a strategy for showing ads that maximizes the click-through rate.
* *Conversion rates for new users:* Every time a new visitor comes to our site who isn't already a registered user, we'll imagine that there's a fixed probability that they'll register as a user after seeing the landing page. We'll then estimate this probability and try to decide on a strategy for maximizing our conversion rate.

In [66]:
class BernoulliArm:
  def __init__(self, p):
    self._p=p

  def draw(self):
    if np.random.random() > self._p:
      return 0.0
    else:
      return 1.0

In [73]:
means = [.1, .1, .5, .9]
n_arms = len(means)
np.random.shuffle(means)
arms = list(map(lambda mu: BernoulliArm(mu), means))

In [77]:
[arm.draw() for arm in arms]

[0.0, 1.0, 1.0, 0.0]

In [78]:
def test_bandits_algorithm(algo, arms, num_sims, horizon):
  
  chosen_arms = [0.0 for i in range(num_sims * horizon)]
  rewards = [0.0 for i in range(num_sims * horizon)]
  cumulative_rewards = [0.0 for i in range(num_sims * horizon)]
  sim_nums = [0.0 for i in range(num_sims * horizon)]
  times = [0.0 for i in range(num_sims * horizon)]

  for sim in range(num_sims):
    sim = sim + 1
    algo.initialize(len(arms))

    for t in range(horizon):
      t = t + 1
      index = (sim - 1) * horizon + t - 1
      sim_nums[index] = sim
      times[index] = t

      chosen_arm = algo.select_arm()
      chosen_arms[index] = chosen_arms
      reward = arms[chosen_arms[index]].draw()
      rewards[index] = reward

      if t == 1:
        cumulative_rewards[index] = reward 
      else:
        cumulative_rewards[index] = cumulative_rewards[index - 1] + reward
      algo.update(chosen_arm, reward)
    
    return [sim_nums, times, chosen_arms, rewards, cumulative_rewards]

In [None]:
class EpsilonGreedy:

  def __init__(self, )