In [None]:
#111111111222222222233333333334444444444555555555566666666667777777777888888888899999999990000000000

In [1]:
# create a spark session
import pyspark
spark_context = pyspark.SparkContext()
spark_session = pyspark.sql.SparkSession(spark_context)

In [2]:
# load csv file into dataframe
df = spark_session.read.csv("ab_data.csv", header=True, sep=",")
df.show()

+-------+--------------------+---------+------------+---------+
|user_id|           timestamp|    group|landing_page|converted|
+-------+--------------------+---------+------------+---------+
| 851104|2017-01-21 22:11:...|  control|    old_page|        0|
| 804228|2017-01-12 08:01:...|  control|    old_page|        0|
| 661590|2017-01-11 16:55:...|treatment|    new_page|        0|
| 853541|2017-01-08 18:28:...|treatment|    new_page|        0|
| 864975|2017-01-21 01:52:...|  control|    old_page|        1|
| 936923|2017-01-10 15:20:...|  control|    old_page|        0|
| 679687|2017-01-19 03:26:...|treatment|    new_page|        1|
| 719014|2017-01-17 01:48:...|  control|    old_page|        0|
| 817355|2017-01-04 17:58:...|treatment|    new_page|        1|
| 839785|2017-01-15 18:11:...|treatment|    new_page|        1|
| 929503|2017-01-18 05:37:...|treatment|    new_page|        0|
| 834487|2017-01-21 22:37:...|treatment|    new_page|        0|
| 803683|2017-01-09 06:05:...|treatment|

In [3]:
# calculate the conversion rate for the control group
num_control_users = df.filter( df.group == "control").count()
num_control_converted = \
    df.filter( ( df.group == "control") & ( df.converted == 1) ).count()
print("control group conversion rate = ", \
    num_control_converted / num_control_users )

control group conversion rate =  0.12039917935897611


In [4]:
# calculate the conversion rate for the treatment group
num_treatment_users = df.filter( df.group == "treatment").count()
num_treatment_converted = \
    df.filter( ( df.group == "treatment") & ( df.converted == 1) ).count()
print("treatment group conversion rate = ", \
    num_treatment_converted / num_treatment_users )

treatment group conversion rate =  0.11891957956489856


In [5]:
# calculate the p-value
import statsmodels.api as sm
_, p_value = sm.stats.proportions_ztest( \
    [num_control_converted, num_treatment_converted], \
    [num_control_users, num_treatment_users], \
    alternative='smaller')
print("A/B test p-value =", p_value)     

A/B test p-value = 0.8919419336512124


In [6]:
# detect any null values
num_rows_with_null_vals = df.filter( df.user_id.isNull() | \
                                     df.timestamp.isNull() | \
                                     df.group.isNull() | \
                                     df.landing_page.isNull() | \
                                     df.converted.isNull() ).count()
print("Number of rows with null values = ", num_rows_with_null_vals)

Number of rows with null values =  0


In [7]:
#detect non-unique users
total_rows = df.count()
unique_users = df.select("user_id").distinct().count()
print("total rows =", total_rows," unique users =", unique_users)

#drop duplicates
ndf = df.dropDuplicates(["user_id"])
print("total rows =", ndf.count())

total rows = 294478  unique users = 290584
total rows = 290584


In [8]:
# detect any group/landing_page mismatch
df = df.filter( ( ( df.group == "control") &
                  ( df.landing_page == "old_page") ) |
                ( ( df.group == "treatment") &
                  ( df.landing_page == "new_page") ) )
print("new dataframe size =", df.count())

new dataframe size = 290585


In [9]:
# recalculate the conversion rate for the control group
num_control_users = df.filter( df.group == "control").count()
num_control_converted = \
    df.filter( ( df.group == "control") & ( df.converted == 1) ).count()
print("control group conversion rate = ", \
    num_control_converted / num_control_users )

control group conversion rate =  0.1203863045004612


In [10]:
# recalculate the conversion rate for the treatment group
num_treatment_users = df.filter( df.group == "treatment").count()
num_treatment_converted = \
    df.filter( ( df.group == "treatment") & ( df.converted == 1) ).count()
print("treatment group conversion rate = ", \
    num_treatment_converted / num_treatment_users )

treatment group conversion rate =  0.11880724790277405


In [11]:
# recalculate the p-value
import statsmodels.api as sm
_, p_value = sm.stats.proportions_ztest( \
    [num_control_converted, num_treatment_converted], \
    [num_control_users, num_treatment_users], \
    alternative='smaller')
print("A/B test p-value =", p_value)  

A/B test p-value = 0.905173705140591


In [12]:
# lets now pretend all those in the treatment group converted...
from pyspark.sql.functions import *
df = df \
    .withColumn('converted_new',\
    when(df.group == "control", df.converted).otherwise(1)) \
    .drop(df.converted) \
    .select(col('user_id'), \
            col('timestamp'), \
            col('group'), \
            col('landing_page'), \
            col('converted_new').alias('converted'))
df.show()

+-------+--------------------+---------+------------+---------+
|user_id|           timestamp|    group|landing_page|converted|
+-------+--------------------+---------+------------+---------+
| 851104|2017-01-21 22:11:...|  control|    old_page|        0|
| 804228|2017-01-12 08:01:...|  control|    old_page|        0|
| 661590|2017-01-11 16:55:...|treatment|    new_page|        1|
| 853541|2017-01-08 18:28:...|treatment|    new_page|        1|
| 864975|2017-01-21 01:52:...|  control|    old_page|        1|
| 936923|2017-01-10 15:20:...|  control|    old_page|        0|
| 679687|2017-01-19 03:26:...|treatment|    new_page|        1|
| 719014|2017-01-17 01:48:...|  control|    old_page|        0|
| 817355|2017-01-04 17:58:...|treatment|    new_page|        1|
| 839785|2017-01-15 18:11:...|treatment|    new_page|        1|
| 929503|2017-01-18 05:37:...|treatment|    new_page|        1|
| 834487|2017-01-21 22:37:...|treatment|    new_page|        1|
| 803683|2017-01-09 06:05:...|treatment|

In [13]:
# recalculate the conversion rate for the treatment group
num_treatment_users = df.filter( df.group == "treatment").count()
num_treatment_converted = \
    df.filter( ( df.group == "treatment") & ( df.converted == 1) ).count()
print("treatment group conversion rate = ", \
    num_treatment_converted / num_treatment_users )

treatment group conversion rate =  1.0


In [14]:
# recalculate the p-value
import statsmodels.api as sm
_, p_value = sm.stats.proportions_ztest( \
    [num_control_converted, num_treatment_converted], \
    [num_control_users, num_treatment_users], \
    alternative='smaller')
print("A/B test p-value =", p_value)  

A/B test p-value = 0.0
