# Dynamic A/B Testing Demo
This notebook uses the sample data files generated by the Data Generator notebook to demonstrate dynamic A/B testing: it continuously monitors the metrics of two strategies and chooses the better strategy for the next timestep.

In [0]:
# Create the port folder on DBFS (the folder the stream below watches) and
# seed timestep 0 by copying strategy_0's timestep_0.csv into it.
dbutils.fs.mkdirs('dbfs:/FileStore/port')
dbutils.fs.cp('dbfs:/FileStore/simulated_data/strategy_0/timestep_0.csv', 'dbfs:/FileStore/port/timestep_0.csv')

In [0]:
from pyspark.sql.types import *

# Explicit schema for the simulated CSV files read by the stream.
# Columns: banner (int), product (string), time (timestamp), target (float);
# every field is declared nullable.
dataschema = StructType(
    [
        StructField("banner", IntegerType(), nullable=True),
        StructField("product", StringType(), nullable=True),
        StructField("time", TimestampType(), nullable=True),
        StructField("target", FloatType(), nullable=True),
    ]
)

# define the save_test function, whose callback is run on every incoming micro-batch
from scipy.stats import ttest_ind_from_stats
def _append_winner_row(timestep, winner):
    """Append one ``(timestep, winner)`` row to the ``ab_test.winner_data`` Hive table."""
    winner_df = spark.createDataFrame([(timestep, winner)],
                                      schema='timestep: integer, winner: integer')
    winner_df.write.mode("append").saveAsTable("ab_test.winner_data")


def _pick_winner(stats, current_winner, alpha, count_multiplier):
    """Return the banner that should be adopted next, given per-banner statistics.

    ``stats`` is a pandas DataFrame with one row per banner and the columns
    ``banner``, ``target_mean``, ``target_std`` and ``target_count``.
    A two-sample t-test (``scipy.stats.ttest_ind_from_stats``) compares the two
    banners. If its p-value is <= ``alpha``, the banner with the larger mean is
    returned. Otherwise (including a NaN p-value) ``current_winner`` is kept.
    If the batch does not contain exactly two banners the test cannot be run,
    so ``current_winner`` is returned unchanged instead of raising.
    """
    if len(stats) != 2:
        return current_winner
    first, second = stats.iloc[0], stats.iloc[1]
    # NOTE(review): observed counts are scaled by `count_multiplier` (5 by
    # default, as in the original notebook). This inflates the effective
    # sample size and makes significance easier to reach. Presumably this is
    # a demo choice; confirm intent.
    pvalue = ttest_ind_from_stats(
        first['target_mean'], first['target_std'],
        first['target_count'] * count_multiplier,
        second['target_mean'], second['target_std'],
        second['target_count'] * count_multiplier,
    ).pvalue
    if pvalue <= alpha:
        # Select by index label from idxmax, so the lookup does not depend on
        # the DataFrame having a default RangeIndex.
        return int(stats.loc[stats['target_mean'].idxmax(), 'banner'])
    return current_winner


def save_test(max_timestep, alpha=0.05, count_multiplier=5):
    """Build the ``foreachBatch`` callback that drives the dynamic A/B test.

    For each micro-batch the returned callback:

    1. appends the batch, tagged with its timestep, to ``ab_test.raw_data``;
    2. runs a t-test between the two banners and updates the global ``winner``;
    3. appends ``(timestep, winner)`` to ``ab_test.winner_data``;
    4. copies the winning strategy's next timestep file into the port folder,
       which triggers the next micro-batch, until ``max_timestep`` is reached.

    Args:
        max_timestep: last timestep whose file is copied into the port folder.
        alpha: p-value threshold at or below which a new winner is announced.
        count_multiplier: factor applied to per-banner counts before the test.

    Returns:
        A ``func(df, timestep)`` callable suitable for ``foreachBatch``; the
        micro-batch id is used as the timestep.
    """
    def func(df, timestep):
        # `winner` is a module-level global so it persists across micro-batches.
        global winner
        # pyspark.sql.functions is not imported as `F` anywhere else in the
        # notebook; import it here so the callback does not raise NameError.
        from pyspark.sql import functions as F

        # On the first time step, start with banner/strategy 0 and record that
        # initial choice under the sentinel timestep -1.
        if timestep == 0:
            winner = 0
            _append_winner_row(-1, winner)

        # Save the input data to Hive, tagged with its timestep.
        df = df.withColumn('timestep', F.lit(timestep))
        df.write.mode("append").saveAsTable("ab_test.raw_data")

        # Collect per-banner statistics locally and decide the winner.
        stats = df.groupby('banner').agg(
            F.mean('target').alias('target_mean'),
            F.stddev('target').alias('target_std'),
            F.count('target').alias('target_count'),
        ).toPandas()
        winner = _pick_winner(stats, winner, alpha, count_multiplier)

        _append_winner_row(timestep, winner)

        # If max_timestep has not been reached, copy the next file into the
        # port folder to trigger the next streaming batch. Copying from
        # strategy_<winner> means that strategy is adopted for the next timestep.
        if timestep + 1 <= max_timestep:
            dbutils.fs.cp('dbfs:/FileStore/simulated_data/strategy_{}/timestep_{}.csv'.format(winner, timestep+1),
                          'dbfs:/FileStore/port/timestep_{}.csv'.format(timestep+1))

    return func

# Prepare Hive storage: ensure the ab_test database exists and drop any tables
# left from an earlier run, so save_test's appends start from scratch.
spark.sql('create database if not exists ab_test')
spark.sql('drop table if exists ab_test.raw_data')
spark.sql('drop table if exists ab_test.winner_data')

# Streaming source over the port folder: CSV files with a header row, parsed
# with the explicit schema, at most one new file per micro-batch.
streaming = (
    spark.readStream
    .format('csv')
    .schema(dataschema)
    .option("header", True)
    .option("maxFilesPerTrigger", 1)
    .load(r"dbfs:/FileStore/port")
)

# Start the query: the save_test callback is invoked for every micro-batch,
# i.e. every time a new file lands in the port folder.
query = (
    streaming.writeStream
    .foreachBatch(save_test(max_timestep=120))
    .start()
)

In [0]:
# Once every timestep has been processed, shut down the streaming query.
query.stop()
# The resulting Hive tables (ab_test.raw_data and ab_test.winner_data) are
# saved in the repository as raw_data.csv and winner_data.csv.