# Computing Experiment Datasets #3: Computing Metric Observations

This notebook is part of a multi-part series focused on computing useful experiment datasets. In this notebook, we'll use [PySpark](https://spark.apache.org/docs/latest/api/python/index.html) to compute a set of numerical measurements for each _unit_ represented in our experiment data.

**Metric observations** map each **experiment unit** to a specific numerical outcome observed during an experiment.  For example, in order to measure the purchase conversion rate associated with each variation in an experiment, we can map each visitor to a 0 or a 1, depending on whether or not they made at least one purchase during the attribution window in our experiment.

The transformations executed in this notebook are part of a larger [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph) that can be used to perform sequential hypothesis testing with event-level experiment data:

![Experiment Analysis DAG](img/transformations.png)

This notebook is _experiment agnostic_ and can be used to compute metric observations with input data collected in any digital experiment.

However, this notebook computes a specific set of metrics and expects certain conversion event types to be included in the input dataset.  Therefore, this notebook is probably most useful as an illustration and may also serve as a useful starting point if you want to compute metric observations on your own experiment data.

## Global parameters

The following global parameters are used to control the execution in this notebook.  These parameters may be overridden by setting environment variables prior to launching the notebook, e.g.:

```
export OPTIMIZELY_DATA_DIR=~/my_analysis_dir
```

In [59]:
import os

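# Determines whether writing output data back to disk should be skipped.
# Defaults to False; setting this to True may be useful when running this notebook
# as part of a larger workflow.  Environment variables are always strings, so the
# value is parsed explicitly (a bare "False" string would otherwise be truthy).
SKIP_WRITING_OUTPUT_DATA_TO_DISK = os.environ.get(
    "SKIP_WRITING_OUTPUT_DATA_TO_DISK", "false"
).strip().lower() in ("1", "true", "yes")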

# Default path for reading and writing analysis data
OPTIMIZELY_DATA_DIR = os.environ.get("OPTIMIZELY_DATA_DIR", "./covid_test_data")

## Load Experiment Unit and Event Data into Spark Dataframes

We'll use [PySpark](https://spark.apache.org/docs/latest/api/python/index.html) to transform data in this notebook. We'll start by creating a new local Spark session.

In [60]:
from pyspark.sql import SparkSession

num_cores = 1
driver_ip = "127.0.0.1"
driver_memory_gb = 1
executor_memory_gb = 2

# Create a local Spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .master(f"local[{num_cores}]") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .config("spark.sql.repl.eagerEval.truncate", 120) \
    .config("spark.driver.bindAddress", driver_ip) \
    .config("spark.driver.host", driver_ip) \
    .config("spark.driver.memory", f"{driver_memory_gb}g") \
    .config("spark.executor.memory", f"{executor_memory_gb}g") \
    .getOrCreate()

Next we'll load our experiment units data into a Spark dataframe:

In [61]:
import os
from lib import util

units_dir = os.path.join(OPTIMIZELY_DATA_DIR, "type=experiment_units")

# Load experiment unit data from disk into a new Spark dataframe
experiment_units = util.read_parquet_data_from_disk(
    spark_session=spark,
    data_path=units_dir,
    view_name="experiment_units"
)

# View a sample of the loaded experiment unit records
spark.sql("SELECT * FROM experiment_units LIMIT 3")

experiment_name,variation_name,reference_variation_id,uuid,timestamp,process_timestamp,visitor_id,session_id,account_id,campaign_id,experiment_id,variation_id,attributes,user_ip,user_agent,referer,is_holdback,revision,client_engine,client_version,date,experiment
covid_messaging_experiment,control,18802093142,3e1d31a3-41c2-4f22-9b77-6a67f44a269a,2020-09-14 11:21:40.177,2020-09-14 11:22:05.571,user_0,-535397001,596780373,18811053836,18786493712,18802093142,"[[$opt_bot_filtering, $opt_bot_filtering, custom, false], [$opt_enrich_decisions, $opt_enrich_decisions, custom, true...",162.227.140.251,python-requests/2.24.0,,False,99,python-sdk,3.5.2,2020-09-14,18786493712
covid_messaging_experiment,control,18802093142,e37700df-856c-46c9-9c6d-b771ff244c2c,2020-09-14 11:21:40.279,2020-09-14 11:22:12.287,user_1,-1393366513,596780373,18811053836,18786493712,18802093142,"[[$opt_bot_filtering, $opt_bot_filtering, custom, false], [$opt_enrich_decisions, $opt_enrich_decisions, custom, true...",162.227.140.251,python-requests/2.24.0,,False,99,python-sdk,3.5.2,2020-09-14,18786493712
covid_messaging_experiment,control,18802093142,c011276d-7ae0-46bd-acfb-28cdcc268545,2020-09-14 11:21:40.381,2020-09-14 11:21:59.163,user_2,340330538,596780373,18811053836,18786493712,18802093142,"[[$opt_bot_filtering, $opt_bot_filtering, custom, false], [$opt_enrich_decisions, $opt_enrich_decisions, custom, true...",162.227.140.251,python-requests/2.24.0,,False,99,python-sdk,3.5.2,2020-09-14,18786493712


Next we'll load experiment event data:

In [62]:
# Experiment event data is read from the type=experiment_events subdirectory
experiment_events_dir = os.path.join(OPTIMIZELY_DATA_DIR, "type=experiment_events")

# Load experiment event data from disk into a new Spark dataframe
experiment_events = util.read_parquet_data_from_disk(
    spark_session=spark,
    data_path=experiment_events_dir,
    view_name="experiment_events"
)

# View a sample of the loaded experiment event records
spark.sql("SELECT * FROM experiment_events LIMIT 3")

experiment_id,experiment_name,variation_id,variation_name,uuid,timestamp,process_timestamp,visitor_id,session_id,account_id,experiments,entity_id,attributes,user_ip,user_agent,referer,event_type,event_name,revenue,value,quantity,tags,revision,client_engine,client_version,date,event
18786493712,covid_messaging_experiment,18802093142,control,315063de-5f7d-4105-ba65-88a7768d80ca,2020-09-14 11:23:50.677,2020-09-14 11:24:32.015,user_1283,1441429334,596780373,"[[18803622799, 18805683213, 18809464474, false], [18811053836, 18786493712, 18802093142, false]]",18822540003,"[[$opt_bot_filtering, $opt_bot_filtering, custom, false], [$opt_enrich_decisions, $opt_enrich_decisions, custom, true...",162.227.140.251,python-requests/2.24.0,,,homepage_view,0,,0,[],99,python-sdk,3.5.2,2020-09-14,homepage_view
18786493712,covid_messaging_experiment,18818611832,message_1,4506ef78-951d-411c-9eb7-6658bf82199c,2020-09-14 11:23:50.883,2020-09-14 11:24:17.604,user_1285,-152925476,596780373,"[[18803622799, 18805683213, 18774763028, false], [18811053836, 18786493712, 18818611832, false]]",18822540003,"[[$opt_bot_filtering, $opt_bot_filtering, custom, false], [$opt_enrich_decisions, $opt_enrich_decisions, custom, true...",162.227.140.251,python-requests/2.24.0,,,homepage_view,0,,0,[],99,python-sdk,3.5.2,2020-09-14,homepage_view
18786493712,covid_messaging_experiment,18817551468,message_2,2a97d911-093d-4b7f-8d4b-3f92f4b4c936,2020-09-14 11:24:03.368,2020-09-14 11:24:10.597,user_1408,-582840818,596780373,"[[18803622799, 18805683213, 18809464474, false], [18811053836, 18786493712, 18817551468, false]]",18822540003,"[[$opt_bot_filtering, $opt_bot_filtering, custom, false], [$opt_enrich_decisions, $opt_enrich_decisions, custom, true...",162.227.140.251,python-requests/2.24.0,,,homepage_view,0,,0,[],99,python-sdk,3.5.2,2020-09-14,homepage_view


## Compute metric observations

**Metric observations** map each **experiment unit** to a specific numerical outcome observed during an experiment.  For example, in order to measure the purchase conversion rate associated with each variation in an experiment, we can map each visitor to a 0 or a 1, depending on whether or not they made at least one purchase during the attribution window in our experiment.

Unlike **experiment units** and **experiment events**, which can be computed using simple transformations,  **metric observations** are metric-dependent and can be arbitrarily complex, depending on the outcome you're trying to measure.

<table>
    <tr>
        <td>
            <img src="img/transformations_3.png" alt="Experiment Units" style="width:100%; padding-left:0px">
        </td>
        <td>
            <img src="img/tables_3.png" alt="Experiment Units" style="width:100%; padding-left:0px">
        </td>
    </tr>
</table>
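
To make the definition concrete without Spark, here is a minimal plain-Python sketch of a binary conversion metric. The `units` and `events` records and the `binary_observations` helper are hypothetical stand-ins for illustration, not part of this notebook's pipeline, and the sketch ignores the attribution window.

```python
# Hypothetical toy data: one record per experiment unit and one per event.
units = [
    {"visitor_id": "user_0", "variation_name": "control"},
    {"visitor_id": "user_1", "variation_name": "message_1"},
    {"visitor_id": "user_2", "variation_name": "message_2"},
]
events = [
    {"visitor_id": "user_1", "event_name": "purchase"},
    {"visitor_id": "user_1", "event_name": "purchase"},
    {"visitor_id": "user_2", "event_name": "homepage_view"},
]


def binary_observations(units, events, event_name):
    """Map every unit to 1 if it triggered event_name at least once, else 0."""
    converted = {e["visitor_id"] for e in events if e["event_name"] == event_name}
    return {u["visitor_id"]: int(u["visitor_id"] in converted) for u in units}


purchase_obs = binary_observations(units, events, "purchase")
print(purchase_obs)
```

Every unit receives an observation, including visitors who never triggered the event. Repeated purchases by the same visitor still yield a single `1`.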

We'll start by defining `add_observations`, a helper function that joins computed observations with experiment units and appends the resulting records to a global `observations` view.

In [63]:
from pyspark.sql import functions as F

observations = None
metric_names = []

def add_observations(metric_name, observations_df, units_df=None):
    """Add a new set of observations to the observations view
    
    Parameters: 
        metric_name         - A string that uniquely identifies the metric for which observations are being computed,
                              for example: "purchase_conversion_rate"
        observations_df     - A spark dataframe containing the set of observations to be added.  This dataframe should
                              contain two columns:
                                  visitor_id
                                  observation - numerical outcome observed for this metric
        units_df (optional) - A spark dataframe containing the experiment units for which this metric should be
                              computed.  This is useful if you wish to filter the units in order to analyze
                              experiment results for a specific subsample, e.g. "subscribed users".  If this
                              parameter is omitted, the full experiment_units dataframe will be used.
    """
  
    global metric_names
    if metric_name in metric_names:
        raise Exception(f"Metric '{metric_name}' has already been computed.  Please choose a different name.")
        
    metric_names.append(metric_name)

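    # Compare against None explicitly rather than relying on DataFrame truthiness
    units_df = experiment_units if units_df is None else units_df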
    merged_df = units_df.join(observations_df, on=['visitor_id'], how='left') \
                        .withColumn("_observation", F.coalesce('observation', F.lit(0))) \
                        .drop("observation") \
                        .withColumnRenamed("_observation", "observation") \
                        .withColumn("metric_name", F.lit(metric_name))

    global observations
    if observations is None:
        observations = merged_df
    else:
        observations = observations.union(merged_df)
    observations.createOrReplaceTempView("observations")
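
The core of `add_observations` is a left join followed by filling in missing values with 0. The plain-Python sketch below mirrors that logic on hypothetical toy data. The `merge_observations` helper is an illustration only and is not used elsewhere in this notebook.

```python
def merge_observations(metric_name, units, observations):
    """Left-join observations onto units, filling missing observations with 0."""
    rows = []
    for unit in units:
        value = observations.get(unit["visitor_id"])
        rows.append({
            **unit,
            "observation": 0 if value is None else value,
            "metric_name": metric_name,
        })
    return rows


# Hypothetical inputs: two units, only one of which has a computed observation.
toy_units = [
    {"visitor_id": "user_0", "variation_name": "control"},
    {"visitor_id": "user_1", "variation_name": "message_1"},
]
toy_observations = {"user_1": 3}

merged_rows = merge_observations("Items added to cart", toy_units, toy_observations)
for row in merged_rows:
    print(row)
```

Keeping units that have no matching observation matters: those visitors belong in the denominator of any per-unit statistic computed later.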

Now we'll define a set of observations by executing simple queries on our experiment events.  Each query computes a single _observation_ for each subject.

### Metric: Purchase conversion rate

In this query we measure for each visitor whether they made _at least one_ purchase. The resulting observation should be `1` if the visitor triggered the event in question during the _attribution window_ and `0` otherwise.  

Since _any_ visitor who triggered an appropriate experiment event should be counted, we can simply select a `1`. 

In [64]:
# Unique conversions on the "purchase" event.
metric_obs = spark.sql("""
    SELECT
        visitor_id,
        1 as observation
    FROM
        experiment_events
    WHERE
        event_name = 'purchase'
    GROUP BY
        visitor_id
""")
metric_obs.toPandas().head(5)

Unnamed: 0,visitor_id,observation
0,user_5967,1
1,user_1434,1
2,user_3058,1
3,user_926,1
4,user_9069,1


We'll use our `add_observations` function to perform a left outer join between `experiment_units` and our newly computed `purchase` conversions.

In [65]:
add_observations(
    "Purchase conversion rate",
    metric_obs,
    experiment_units
)

Let's take a look at our observations view:

In [66]:
spark.sql("""
    SELECT 
        metric_name,
        visitor_id, 
        experiment_name, 
        variation_name, 
        observation 
    FROM 
        observations 
    LIMIT 10
""")

metric_name,visitor_id,experiment_name,variation_name,observation
Purchase conversion rate,user_1048,covid_messaging_experiment,message_2,0
Purchase conversion rate,user_1368,covid_messaging_experiment,message_2,0
Purchase conversion rate,user_1425,covid_messaging_experiment,message_1,0
Purchase conversion rate,user_1786,covid_messaging_experiment,message_2,0
Purchase conversion rate,user_2262,covid_messaging_experiment,control,0
Purchase conversion rate,user_2349,covid_messaging_experiment,control,0
Purchase conversion rate,user_242,covid_messaging_experiment,message_1,0
Purchase conversion rate,user_248,covid_messaging_experiment,message_1,0
Purchase conversion rate,user_2573,covid_messaging_experiment,message_1,0
Purchase conversion rate,user_2937,covid_messaging_experiment,message_1,0


Metric observations can be used to compute a variety of useful statistics.  Let's compute the value of our `purchase` conversion rate metric for all of the visitors in our experiment:

In [67]:
spark.sql("""
    SELECT
        metric_name,
        experiment_name,
        count(1) as unit_count,
        sum(observation),
        sum(observation) / (1.0 * count(1)) as metric_value
    FROM
        observations
    WHERE
        metric_name = "Purchase conversion rate"
    GROUP BY
        metric_name,
        experiment_name
""")

metric_name,experiment_name,unit_count,sum(observation),metric_value
Purchase conversion rate,covid_messaging_experiment,10000,555,0.0555


Now let's compute the `purchase` conversion rate broken down by experiment variation:

In [68]:
spark.sql("""
    SELECT
        metric_name,
        experiment_name,
        variation_name,
        count(1) as unit_count,
        sum(observation),
        sum(observation) / (1.0 * count(1)) as metric_value
    FROM
        observations
    WHERE
        metric_name = "Purchase conversion rate"
    GROUP BY
        metric_name,
        experiment_name,
        variation_name
""")

metric_name,experiment_name,variation_name,unit_count,sum(observation),metric_value
Purchase conversion rate,covid_messaging_experiment,message_1,3367,169,0.0501930501930501
Purchase conversion rate,covid_messaging_experiment,control,3304,163,0.0493341404358353
Purchase conversion rate,covid_messaging_experiment,message_2,3329,223,0.0669870832081706
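
As a quick sanity check, the per-variation figures can be recomputed outside Spark. The sketch below copies the unit and conversion counts from the output above into a plain dictionary (the `counts` name is just for illustration) and confirms that they add up to the experiment-wide totals reported earlier.

```python
# (unit_count, conversions) per variation, copied from the output above.
counts = {
    "message_1": (3367, 169),
    "control": (3304, 163),
    "message_2": (3329, 223),
}

# Conversion rate per variation: conversions divided by units.
rates = {name: conv / n_units for name, (n_units, conv) in counts.items()}

# Experiment-wide totals should match the ungrouped query.
total_units = sum(n_units for n_units, _ in counts.values())
total_conversions = sum(conv for _, conv in counts.values())
overall_rate = total_conversions / total_units

for name, rate in rates.items():
    print(f"{name}: {rate:.4f}")
print(f"overall: {overall_rate:.4f}")
```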


### Metric: Items added to cart per visitor

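In this query we compute the total number of `add_to_cart` events triggered by each visitor.  Unlike the conversion rate metric above, a visitor who adds several items to their cart contributes a count greater than 1.

In [69]:
# Total conversions on the "add_to_cart" event.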
add_observations(
    "Items added to cart",
    spark.sql("""
        SELECT
            visitor_id,
            count(1) as observation
        FROM
            experiment_events
        WHERE
            event_name = "add_to_cart"
        GROUP BY
            visitor_id
    """)
)
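
The difference between a total-conversions metric like this one and a unique-conversions metric can be sketched in plain Python. The `toy_events` records below are hypothetical and only serve to contrast the two aggregations.

```python
from collections import Counter

# Hypothetical events: user_1 adds two items to the cart, user_2 adds one.
toy_events = [
    {"visitor_id": "user_1", "event_name": "add_to_cart"},
    {"visitor_id": "user_1", "event_name": "add_to_cart"},
    {"visitor_id": "user_2", "event_name": "add_to_cart"},
    {"visitor_id": "user_2", "event_name": "purchase"},
]

# Total conversions: count every matching event, like count(1) ... GROUP BY.
cart_totals = Counter(
    e["visitor_id"] for e in toy_events if e["event_name"] == "add_to_cart"
)

# Unique conversions: at most 1 per visitor, like SELECT 1 ... GROUP BY.
cart_uniques = {visitor_id: 1 for visitor_id in cart_totals}

print(dict(cart_totals))
print(cart_uniques)
```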

We can inspect a sample of the observations we've computed for this metric:

In [76]:
spark.sql("""
    SELECT 
        metric_name, 
        experiment_name, 
        variation_name,
        visitor_id, 
        observation 
    FROM 
        observations
    WHERE
        metric_name = "Items added to cart"
    LIMIT 10
""")

metric_name,experiment_name,variation_name,visitor_id,observation
Items added to cart,covid_messaging_experiment,message_2,user_1048,0
Items added to cart,covid_messaging_experiment,message_2,user_1368,0
Items added to cart,covid_messaging_experiment,message_1,user_1425,0
Items added to cart,covid_messaging_experiment,message_2,user_1786,0
Items added to cart,covid_messaging_experiment,control,user_2262,0
Items added to cart,covid_messaging_experiment,control,user_2349,0
Items added to cart,covid_messaging_experiment,message_1,user_242,0
Items added to cart,covid_messaging_experiment,message_1,user_248,0
Items added to cart,covid_messaging_experiment,message_1,user_2573,0
Items added to cart,covid_messaging_experiment,message_1,user_2937,0


### Metric: Revenue from electronics purchases

In this query we compute the total revenue associated with electronics purchases made by our experiment subjects.

In [71]:
add_observations(
    "Electronics revenue per visitor",
    spark.sql("""
        SELECT
            visitor_id,
            sum(revenue) as observation
        FROM 
            experiment_events
            LATERAL VIEW explode(tags) t
        WHERE
            t.key = "category" AND 
            t.value = "electronics" AND
            event_name = "purchase"
        GROUP BY
            visitor_id
    """)
)
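
The filter-then-sum logic of this query can be sketched in plain Python. The records below are hypothetical, with `tags` modeled as a dictionary and made-up revenue values. Only `purchase` events tagged with the `electronics` category contribute to a visitor's total.

```python
from collections import defaultdict

# Hypothetical events with made-up revenue values and tags.
toy_purchases = [
    {"visitor_id": "user_1", "event_name": "purchase", "revenue": 1000,
     "tags": {"category": "electronics"}},
    {"visitor_id": "user_1", "event_name": "purchase", "revenue": 250,
     "tags": {"category": "books"}},
    {"visitor_id": "user_2", "event_name": "purchase", "revenue": 400,
     "tags": {"category": "electronics"}},
    {"visitor_id": "user_2", "event_name": "purchase", "revenue": 600,
     "tags": {"category": "electronics"}},
    {"visitor_id": "user_3", "event_name": "add_to_cart", "revenue": 0,
     "tags": {"category": "electronics"}},
]

# Sum revenue per visitor over electronics purchases only.
electronics_revenue = defaultdict(int)
for event in toy_purchases:
    is_purchase = event["event_name"] == "purchase"
    is_electronics = event["tags"].get("category") == "electronics"
    if is_purchase and is_electronics:
        electronics_revenue[event["visitor_id"]] += event["revenue"]

print(dict(electronics_revenue))
```

Visitors with no qualifying events (here `user_3`) are absent from the result, which is why `add_observations` fills in 0 for them after the join.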

Again, let's examine our observations:

In [72]:
spark.sql("""
    SELECT 
        metric_name, 
        experiment_name, 
        variation_id, 
        visitor_id, 
        observation 
    FROM 
        observations
    WHERE
        metric_name = "Electronics revenue per visitor"
    LIMIT 20
""")

metric_name,experiment_name,variation_id,visitor_id,observation
Electronics revenue per visitor,covid_messaging_experiment,18817551468,user_1048,0
Electronics revenue per visitor,covid_messaging_experiment,18817551468,user_1368,0
Electronics revenue per visitor,covid_messaging_experiment,18818611832,user_1425,0
Electronics revenue per visitor,covid_messaging_experiment,18817551468,user_1786,0
Electronics revenue per visitor,covid_messaging_experiment,18802093142,user_2262,0
Electronics revenue per visitor,covid_messaging_experiment,18802093142,user_2349,0
Electronics revenue per visitor,covid_messaging_experiment,18818611832,user_242,0
Electronics revenue per visitor,covid_messaging_experiment,18818611832,user_248,0
Electronics revenue per visitor,covid_messaging_experiment,18818611832,user_2573,0
Electronics revenue per visitor,covid_messaging_experiment,18818611832,user_2937,0


## Writing our datasets to disk

We'll write our metric observations dataset to disk so that it may be used for other analysis tasks.  The data is partitioned by `experiment_id` and `metric_name`.

In [74]:
if not SKIP_WRITING_OUTPUT_DATA_TO_DISK: 
    
    observations_output_dir = os.path.join(OPTIMIZELY_DATA_DIR, "type=metric_observations")
    
    spark.sql("SELECT * FROM observations") \
        .coalesce(1) \
        .write.mode('overwrite') \
        .partitionBy("experiment_id", "metric_name") \
        .parquet(observations_output_dir)    
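
Spark's `partitionBy` writes one subdirectory per partition value using the `column=value` naming convention. As a rough sketch of how downstream tooling might recover those values from a file path, the hypothetical `parse_partition_path` helper below splits a path into its `key=value` segments. The example file name is made up, and the sketch assumes partition values contain no characters that Spark would escape.

```python
from pathlib import PurePosixPath


def parse_partition_path(path):
    """Extract Hive-style key=value partition segments from a file path."""
    partitions = {}
    for segment in PurePosixPath(path).parts:
        key, sep, value = segment.partition("=")
        if sep:  # only segments that actually contain "=" are partitions
            partitions[key] = value
    return partitions


# Hypothetical output path; the part file name is illustrative only.
example_path = (
    "covid_test_data/type=metric_observations/"
    "experiment_id=18786493712/metric_name=Purchase conversion rate/"
    "part-00000.parquet"
)
parsed = parse_partition_path(example_path)
print(parsed)
```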