In [None]:
import os

# The below line was causing Java gateway Process exited before sending its port number error
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars $PKG_HOME/snowflake-jdbc-3.6.10.jar,$PKG_HOME/spark-snowflake_2.11-2.4.4-spark_2.2.jar,$PKG_HOME/hadoop-aws-2.7.3.jar,$PKG_HOME/aws-java-sdk-1.7.4.jar'
os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home'

In [None]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.types as sql_types
import pyspark.sql.functions as sql_functions

import pandas as pd
from scipy import stats

spark_conf = SparkConf().setMaster('local').setAppName('Stat Sig T-Test S3 Parquet')
sc = pyspark.SparkContext.getOrCreate()
spark = SparkSession(sc)

In [None]:
#
# Some constants
#
aws_profile = "prasad_dev"
aws_region = "us-east-1"
s3_bucket = "ou-dev-workspace"

# 
# Reading environment variables from aws credential file 
#
import configparser

config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))

access_id = config.get(aws_profile, "aws_access_key_id") 
access_key = config.get(aws_profile, "aws_secret_access_key") 


# You might need to set these
spark._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", access_id)
spark._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", access_key)


In [None]:
path1 = s3_bucket + "/prasad/welch_t_test/input_data/*"
df3=spark.read.parquet("s3a://" + path1)
df3.show(1)


In [16]:
def welch_t_test(mean_c, var_c, n_c, mean_t, var_t, n_t):
    '''
        returns t-statistic, two_tail p-value and degrees_of_freedom
    '''

    # t = float((mean_c - mean_t) / ((((float(n_c - 1) * var_c + float(n_t - 1) * var_t) / float(n_c + n_t - 2)) ** (1/2)) * float(1 / n_c + 1 / n_t) ** (1/2)))
    t = (mean_c - mean_t) / ((var_c/float(n_c) + var_t/float(n_t)) ** (1/2))

    #Degrees of freedom
    df = float(n_c + n_t - 2)

    #p-value after comparison with the t 
    p = float(stats.t.cdf(t,df=df))
    
    return sql_types.Row('T_STATISTIC', 'TWO_TAIL_P_VALUE', 'DEGREES_OF_FREEDOM')(round(t, 6), p, df)


In [28]:


welch_t_test_schema = sql_types.StructType([
    sql_types.StructField("T_STATISTIC", sql_types.DoubleType(), True),
    sql_types.StructField("TWO_TAIL_P_VALUE", sql_types.DoubleType(), True),
    sql_types.StructField("DEGREES_OF_FREEDOM", sql_types.DoubleType(), True)])

welch_t_test_udf = sql_functions.udf(welch_t_test, welch_t_test_schema)

welch_1k_t_test_schema = sql_types.StructType([
    sql_types.StructField("T_STATISTIC_1K", sql_types.DoubleType(), True),
    sql_types.StructField("TWO_TAIL_P_VALUE_1K", sql_types.DoubleType(), True),
    sql_types.StructField("DEGREES_OF_FREEDOM_1K", sql_types.DoubleType(), True)])

welch_1k_t_test_udf = sql_functions.udf(welch_t_test, welch_1k_t_test_schema)

welch_values_df = df3.withColumn("WELCH_T_VALUES", 
            sql_functions.explode(
                sql_functions.array(
                    welch_t_test_udf(
                         df3["MEAN_METRIC_VALUE_IN_CONTROL"],
                         df3["VARIANCE_OF_METRIC_VALUE_IN_CONTROL"],
                         df3["USERS_BUCKETED_IN_CONTROL"],
                         df3["MEAN_METRIC_VALUE_IN_TREATMENT"],
                         df3["VARIANCE_OF_METRIC_VALUE_IN_TREATMENT"],
                         df3["USERS_BUCKETED_IN_TREATMENT"]
                    )))).withColumn("WELCH_T_VALUES_1K", 
            sql_functions.explode(
                sql_functions.array(
                    welch_1k_t_test_udf(
                         df3["MEAN_METRIC_VALUE_IN_CONTROL"]/1000,
                         df3["VARIANCE_OF_METRIC_VALUE_IN_CONTROL"],
                         df3["USERS_BUCKETED_IN_CONTROL"],
                         df3["MEAN_METRIC_VALUE_IN_TREATMENT"]/1000,
                         df3["VARIANCE_OF_METRIC_VALUE_IN_TREATMENT"],
                         df3["USERS_BUCKETED_IN_TREATMENT"]
                    )))).withColumn("MEAN_METRIC_VALUE_IN_CONTROL_1K",
                        df3["MEAN_METRIC_VALUE_IN_CONTROL"]/1000).withColumn("MEAN_METRIC_VALUE_IN_TREATMENT_1K",
                                                                             df3["MEAN_METRIC_VALUE_IN_TREATMENT"]/1000)


all_columns_df_1k = welch_values_df.select(
                "CAL_DATE",
                 "EXPERIMENT_ID",
                 "EXPERIMENT_NAME",
                 "CONTROL_VARIANT_ID",
                 "CONTROL_VARIANT_NAME",
                 "TREATMENT_VARIANT_ID",
                 "TREATMENT_VARIANT_NAME",
                 "METRIC_NAME",
                 "USERS_BUCKETED_IN_CONTROL",
                 "VARIANCE_OF_METRIC_VALUE_IN_CONTROL",
                 "MEAN_METRIC_VALUE_IN_CONTROL",
                 "USERS_BUCKETED_IN_TREATMENT",
                 "VARIANCE_OF_METRIC_VALUE_IN_TREATMENT",
                 "MEAN_METRIC_VALUE_IN_TREATMENT",
                 "WELCH_T_VALUES.*",
                 "WELCH_T_VALUES_1K.T_STATISTIC_1K",
                 "WELCH_T_VALUES_1K.TWO_TAIL_P_VALUE_1K",
                 "MEAN_METRIC_VALUE_IN_CONTROL_1K",
                 "MEAN_METRIC_VALUE_IN_TREATMENT_1K"
            )



In [29]:
all_columns_df_1k.describe()

DataFrame[summary: string, EXPERIMENT_ID: string, EXPERIMENT_NAME: string, CONTROL_VARIANT_ID: string, CONTROL_VARIANT_NAME: string, TREATMENT_VARIANT_ID: string, TREATMENT_VARIANT_NAME: string, METRIC_NAME: string, USERS_BUCKETED_IN_CONTROL: string, VARIANCE_OF_METRIC_VALUE_IN_CONTROL: string, MEAN_METRIC_VALUE_IN_CONTROL: string, USERS_BUCKETED_IN_TREATMENT: string, VARIANCE_OF_METRIC_VALUE_IN_TREATMENT: string, MEAN_METRIC_VALUE_IN_TREATMENT: string, T_STATISTIC: string, TWO_TAIL_P_VALUE: string, DEGREES_OF_FREEDOM: string, T_STATISTIC_1K: string, TWO_TAIL_P_VALUE_1K: string, MEAN_METRIC_VALUE_IN_CONTROL_1K: string, MEAN_METRIC_VALUE_IN_TREATMENT_1K: string]

In [30]:
output_path = s3_bucket + "/prasad/welch_t_test/output_data_1k/"
all_columns_df_1k.write.parquet("s3a://" + output_path,mode="overwrite")

output_path2 = s3_bucket + "/prasad/welch_t_test/output_data_1k_csv/"
all_columns_df_1k.write.csv("s3a://" + output_path2,mode="overwrite")

In [None]:
sc.stop()