## Notebook Purpose:

1. The primary goal of this notebook is to validate the results produced by the platform.
2. We are picking one experiment = `exp_big_exp_run` and we are going to validate following:
    - CUPED Results
3. This validation will ensure if the platform is trustworthy or not.



In [19]:
spark.stop()

In [None]:

from pyspark.sql import SparkSession
spark = (
        SparkSession.builder
        .appName("Laptop_ROG_CUPED_validation")
        .master("spark://10.0.0.80:7077")

        .config("spark.executor.instances", "2")
        .config("spark.executor.cores", "10")
        .config("spark.executor.memory", "18g")
        .config("spark.executor.memoryOverhead", "4g")

        .config("spark.driver.memory", "10g")
        .config("spark.driver.maxResultSize", "2g")
        .config("spark.driver.host", "10.0.0.80")
        .config("spark.driver.bindAddress", "0.0.0.0")

        # AQE + shuffle
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.shuffle.partitions", "288")
        .config("spark.sql.files.maxPartitionBytes", "256m")

        # Don’t set spark.local.dir here; use SPARK_LOCAL_DIRS on the worker
        # .config("spark.local.dir", "...")  <-- remove
            # -------- MinIO / S3A (must match Iceberg) --------
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
        .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
        .config("spark.hadoop.fs.s3a.endpoint", "http://10.0.0.80:9100")  # <-- changed
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
        .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

        # -------- Iceberg REST catalog --------
        .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.iceberg.type", "hadoop")
        # .config("spark.sql.catalog.iceberg.uri", "http://10.0.0.59:8181")
        .config("spark.sql.catalog.iceberg.warehouse", "s3a://iceberg-warehouse/warehouse/")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.defaultCatalog", "iceberg")
        .config("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")

        # Iceberg's own S3 settings (again, pointing to MinIO)
        .config("spark.sql.catalog.iceberg.s3.endpoint", "http://10.0.0.80:9100")
        .config("spark.sql.catalog.iceberg.s3.path-style-access", "true")
        .config("spark.sql.catalog.iceberg.s3.access-key-id", "minioadmin")
        .config("spark.sql.catalog.iceberg.s3.secret-access-key", "minioadmin")
        .config(
            "spark.jars",
            "/opt/spark/jars/iceberg-spark-runtime-3.4_2.12-1.6.0.jar,"
            "/opt/spark/jars/iceberg-aws-bundle-1.6.0.jar")
        .getOrCreate()
    )

26/02/19 02:56:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## CUPED Validation:

Please note that we will using pre-experiment revenue metric i.e `pre_revenue` to reduce the variance in the `revenue` metric. We are not using conversions here as CUPED works best for continuous metrics and not binary metrics.



In [2]:
# We need to JOIN exposures and outcomes to measure the results

v_sql = """ create or replace temporary view ab_results as
with t1 as
(select exposures.experiment_id, exposures.user_id, outcomes.metric_name, outcomes.value, outcomes.ts, exposures.variant

from iceberg.exp.exposures as exposures
inner join iceberg.exp.outcomes as outcomes
on exposures.experiment_id = outcomes.experiment_id
and exposures.user_id = outcomes.user_id
and exposures.experiment_id = 'exp_big_exp_run'
and outcomes.metric_name in ('revenue','pre_revenue'))

select t1.experiment_id, t1.user_id, t1.variant,
max(case when metric_name = 'revenue' then value else 0 end) as target,
max(case when metric_name = 'pre_revenue' then value else 0 end) as covariate
from t1
group by  t1.experiment_id, t1.user_id, t1.variant


"""

spark.sql(v_sql).show(truncate=False)
spark.sql("select * from ab_results limit 10").show(truncate=False)

#  So the query returns data. But we need to check if there are any user_id, metric_name duplicates. Refer to next cell for details:

26/02/19 03:06:33 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


++
||
++
++



26/02/19 03:06:37 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression
[Stage 0:>                                                          (0 + 1) / 1]

+---------------+---------+---------+------+------------------+
|experiment_id  |user_id  |variant  |target|covariate         |
+---------------+---------+---------+------+------------------+
|exp_big_exp_run|u_0000001|treatment|0.0   |16.311533793776313|
|exp_big_exp_run|u_0000002|treatment|0.0   |11.32790644644474 |
|exp_big_exp_run|u_0000003|control  |0.0   |14.174728490227805|
|exp_big_exp_run|u_0000004|treatment|0.0   |10.847600353402715|
|exp_big_exp_run|u_0000005|control  |0.0   |8.275909435378757 |
|exp_big_exp_run|u_0000006|control  |0.0   |12.443259526513518|
|exp_big_exp_run|u_0000007|treatment|0.0   |15.835280615716401|
|exp_big_exp_run|u_0000008|treatment|0.0   |7.090048635555965 |
|exp_big_exp_run|u_0000010|control  |0.0   |9.99508481757065  |
|exp_big_exp_run|u_0000011|treatment|0.0   |16.328538904577922|
+---------------+---------+---------+------+------------------+



                                                                                

In [None]:
v_sql = """
select avg(covariate) as mean_covariate,
var_samp(covariate) as var_covariate,
covar_samp( target,covariate) as covar
from ab_results

"""

cuped_vals = spark.sql(v_sql).toPandas()

26/02/19 03:10:05 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression
                                                                                

In [9]:

mean_covariate = cuped_vals['mean_covariate'].values[0]
var_covariate = cuped_vals['var_covariate'].values[0]
covar = cuped_vals['covar'].values[0]
theta = covar/var_covariate

print(mean_covariate)
print(var_covariate)
print(covar)
print(theta)

12.5017229475785
8.980809420322341
0.007240231252164588
0.0008061891655089503


In [None]:
#  Please note that for cuped need adjusted target y-value based on reduced variance:
# We are calculating this at individual user level instead of group level
# Adjusted Y = Y - theta*( pre_experiment_value - mean of pre_experiment_value)
# We will re-run earlier join and inject values from above there:

In [11]:
# We need to JOIN exposures and outcomes to measure the results

v_sql = f""" create or replace temporary view ab_results_2 as
with t1 as
(select exposures.experiment_id, exposures.user_id, outcomes.metric_name, outcomes.value, outcomes.ts, exposures.variant

from iceberg.exp.exposures as exposures
inner join iceberg.exp.outcomes as outcomes
on exposures.experiment_id = outcomes.experiment_id
and exposures.user_id = outcomes.user_id
and exposures.experiment_id = 'exp_big_exp_run'
and outcomes.metric_name in ('revenue','pre_revenue')),

t2 as
(
select t1.experiment_id, t1.user_id, t1.variant,
max(case when metric_name = 'revenue' then value else 0 end) as target,
max(case when metric_name = 'pre_revenue' then value else 0 end) as covariate
from t1
group by  t1.experiment_id, t1.user_id, t1.variant)

select t2.*, (target - {theta}*(covariate - {mean_covariate})) as adjusted_target

from t2

"""

spark.sql(v_sql).show(truncate=False)
spark.sql("select * from ab_results_2 limit 10").show(truncate=False)

#  So the query returns data. But we need to check if there are any user_id, metric_name duplicates. Refer to next cell for details:

++
||
++
++



26/02/19 03:23:40 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression
[Stage 2:>                                                          (0 + 1) / 1]

+---------------+---------+---------+------+------------------+----------------------+
|experiment_id  |user_id  |variant  |target|covariate         |adjusted_target       |
+---------------+---------+---------+------+------------------+----------------------+
|exp_big_exp_run|u_0000001|treatment|0.0   |16.311533793776313|-0.0030714282268431625|
|exp_big_exp_run|u_0000002|treatment|0.0   |11.32790644644474 |9.463181455096624E-4  |
|exp_big_exp_run|u_0000003|control  |0.0   |14.174728490227805|-0.0013487589423202917|
|exp_big_exp_run|u_0000004|treatment|0.0   |10.847600353402715|0.0013335357138480762 |
|exp_big_exp_run|u_0000005|control  |0.0   |8.275909435378757 |0.003406805068996757  |
|exp_big_exp_run|u_0000006|control  |0.0   |12.443259526513518|4.713257664117599E-5  |
|exp_big_exp_run|u_0000007|treatment|0.0   |15.835280615716401|-0.002687478074652057 |
|exp_big_exp_run|u_0000008|treatment|0.0   |7.090048635555965 |0.00436283319761567   |
|exp_big_exp_run|u_0000010|control  |0.0   

                                                                                

In [15]:
# After we have calculated adjusted target values we run the normal AB testing formulas:

v_sql = """ 
select variant, 
avg(adjusted_target) as mean_value
from ab_results_2 

group by variant
"""



spark.sql(v_sql).show(truncate=False)

ab_summary = spark.sql(v_sql).toPandas()

ab_summary.sort_values(by = 'variant', inplace=True, ascending=False)

26/02/19 03:26:22 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression
26/02/19 03:26:24 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression


+---------+------------------+
|variant  |mean_value        |
+---------+------------------+
|treatment|1.8771168949176746|
|control  |1.252521525212478 |
+---------+------------------+



                                                                                

In [16]:
v_sql = """
select variant, adjusted_target as value
from ab_results_2

"""
# spark.sql(v_sql).show(truncate=False)

reve_data = spark.sql(v_sql).toPandas()
reve_data

26/02/19 03:27:30 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression
                                                                                

Unnamed: 0,variant,value
0,treatment,-0.003071
1,treatment,0.000946
2,control,-0.001349
3,treatment,0.001334
4,control,0.003407
...,...,...
849706,treatment,-0.002597
849707,control,-0.000160
849708,treatment,0.001407
849709,control,-0.001092


In [18]:
from scipy import stats
import numpy as np

control_values = np.array(reve_data[reve_data['variant']=='control']['value'])
treatment_values = np.array(reve_data[reve_data['variant']=='treatment']['value'])


r  = stats.ttest_ind(treatment_values, control_values, equal_var=False)
ci = r.confidence_interval(confidence_level=0.95)

print(f"Theta: {theta}")
print(f"x-Mean: {mean_covariate}")
print(f"Control Mean: {np.mean(control_values)}")
print(f"Treatment Mean: {np.mean(treatment_values)}")
print(f"Delta:   {np.mean(treatment_values) - np.mean(control_values)}")
print(f"Relative Lift:  {(np.mean(treatment_values) - np.mean(control_values))/np.mean(control_values)}")
print(f"Calculated T-statistic: {r.statistic:.3f}")
print(f"P-value: {r.pvalue:.3f}")
print(f"95% Confidence Interval for the difference in means: ({ci.low:.6f}, {ci.high:.6f})")

Theta: 0.0008061891655089503
x-Mean: 12.5017229475785
Control Mean: 1.252521525212505
Treatment Mean: 1.8771168949176709
Delta:   0.6245953697051658
Relative Lift:  0.4986703678399426
Calculated T-statistic: 67.547
P-value: 0.000
95% Confidence Interval for the difference in means: (0.606472, 0.642719)


## Comparison between Manual vs Platform values:

**Conclusion:** Manually calculated values are same as platform and hence CUPED set up in platform works as intended.

**Manual Values:**

```
Theta: 0.0008061891655089503
x-Mean: 12.5017229475785
Control Mean: 1.252521525212505
Treatment Mean: 1.8771168949176709
Delta:   0.6245953697051658
Relative Lift:  0.4986703678399426
Calculated T-statistic: 67.547
P-value: 0.000
95% Confidence Interval for the difference in means: (0.606472, 0.642719)
```

**Platform Values:**

```
Target metric: `revenue`
Covariate: `pre_revenue`

- theta: `0.0008061891655`
- x_mean: `12.5017`

### Raw (no CUPED)
- control mean: `1.25252`
- treatment mean: `1.87712`
- delta: `0.624599`
- p-value: `0.0`
- ci95: `[0.6064755764100439, 0.6427230889505252]`

### CUPED-adjusted
- control mean: `1.25252`
- treatment mean: `1.87712`
- delta: `0.624595`
- p-value: `0.0`  (not stored in Iceberg schema)
- ci95: `[0.6064716157553894, 0.6427191236550062]

```
