In [1]:
import wmfdata as wmf

The KaiOS app sends non-identifying user data to the `inukapageview` event stream. During the experiment, participating installations mark their events using the `event.tests` field. Since we usually store only the last 90 days of the `inukapageview` stream, I have stored all the data from the experiment in a separate table under an [exception to the data retention guidelines](https://meta.wikimedia.org/w/index.php?title=Data_retention_guidelines&diff=21837886&oldid=21410159) so we can do a full analysis before eventually deleting the stored data.
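As a rough illustration of how experiment events are picked out, the sketch below applies the same conditions as the storage query later in this notebook (a non-empty `tests` array and `is_production` set) to plain Python dictionaries. The payloads, the group labels, and the helper name `experiment_group` are hypothetical and only stand in for the real event structure.

```python
from typing import Optional


def experiment_group(event: dict) -> Optional[str]:
    """Return the group of the first listed test, or None if the event has no tests."""
    tests = event.get("tests") or []
    if len(tests) == 0:
        # Corresponds to the SIZE(event.tests) > 0 condition failing
        return None
    # Corresponds to event.tests[0].`group` in the query
    return tests[0]["group"]


# Hypothetical payloads; real events carry many more fields.
events = [
    {"is_production": True, "tests": [{"group": "control"}]},
    {"is_production": True, "tests": []},
    {"is_production": False, "tests": [{"group": "treatment"}]},
]

# Keep only production events that belong to an experiment group
kept = [
    experiment_group(e)
    for e in events
    if e["is_production"] and experiment_group(e) is not None
]
print(kept)
```

Only the first payload passes both conditions: the second has no tests and the third is not a production event.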

Storage log:
* 2021-07-29: stored data for \[2021-05-06, 2021-07-27\] in `neilpquinn.kaios_experiment_event`
* 2021-09-07: stored data for \[2021-07-28, 2021-09-05\] in `neilpquinn.kaios_experiment_event`
* 2021-10-21: stored data for \[2021-07-23, 2021-10-20\] in `nshahquinn.kaios_experiment_event` (with extra columns `dt` and `wiki`)
* 2021-11-05: stored data for \[2021-10-21, 2021-11-04\] in `nshahquinn.kaios_experiment_event`
* 2021-11-11: combined all existing data in `nshahquinn.kaios_experiment_event_combined` (see work log)
* 2022-01-07: stored data for \[2021-11-05, 2022-01-06\] in `nshahquinn.kaios_experiment_event_combined`
* 2022-01-08: stored data for \[2022-01-07, 2022-01-07\] in `nshahquinn.kaios_experiment_event_combined`

In [2]:
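# Spark SQL expression that builds a zero-padded YYYY-MM-DD string from the
# year, month, and day partition columns, so it can be compared with date strings
SPARK_PARTITION_DATE = (
    "CONCAT("
        "CAST(year AS STRING), '-', "
        "LPAD(CAST(month AS STRING), 2, '0'), '-', "
        "LPAD(CAST(day AS STRING), 2, '0')"
    ")"
)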

# Both start and end dates are included
START_DATE = "2022-01-07"
END_DATE = "2022-01-07"
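The partition-date expression above zero-pads the month and day so that comparing the resulting strings with `BETWEEN` gives the same answer as comparing the dates themselves. A minimal pure-Python sketch of that idea follows; the names `partition_date` and `in_range` are hypothetical and only mirror the SQL for illustration.

```python
def partition_date(year: int, month: int, day: int) -> str:
    """Mirror the SQL CONCAT/LPAD expression: zero-pad month and day to two digits."""
    return f"{year}-{str(month).zfill(2)}-{str(day).zfill(2)}"


def in_range(year: int, month: int, day: int, start: str, end: str) -> bool:
    """Inclusive string comparison, like SQL BETWEEN applied to the padded string."""
    return start <= partition_date(year, month, day) <= end


# A single-day range includes exactly that day
print(partition_date(2022, 1, 7))
print(in_range(2022, 1, 7, "2022-01-07", "2022-01-07"))
print(in_range(2022, 1, 8, "2022-01-07", "2022-01-07"))

# Without padding, string order and date order disagree:
# 7 January would sort after 31 January
print("2022-1-7" > "2022-01-31")
```

Because both bounds are inclusive, setting the start and end dates to the same value selects exactly one day of partitions.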

In [3]:
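# Append the experiment events (production events with a non-empty `tests` array)
# for the chosen date range to the combined table. INSERT INTO matches columns
# by position, so the SELECT order must follow the table's column order.
wmf.spark.run(f"""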
INSERT INTO TABLE nshahquinn.kaios_experiment_event_combined
SELECT
    TO_TIMESTAMP(client_dt) AS client_dt,
    TO_TIMESTAMP(meta.dt) AS server_dt,
    event.pageview_token AS pageview_id,
    event.session_id AS session_id,
    event.user_id AS user_id,
    event.load_dt AS load_dt,
    event.page_open_time AS page_open_time,
    event.page_visible_time AS page_visible_time,
    event.section_count AS section_count,
    event.opened_section_count AS opened_section_count,
    event.is_main_page AS is_main_page,
    event.page_namespace AS page_namespace,
    wiki,
    event.tests[0].`group` AS experiment_group,
    geocoded_data["country_code"] AS country,
    geocoded_data["subdivision"] AS country_subdivision,
    event.app_version AS app_version
FROM event.inukapageview
WHERE
    {SPARK_PARTITION_DATE} BETWEEN '{START_DATE}' AND '{END_DATE}'
    AND SIZE(event.tests) > 0
    AND event.is_production
""")

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


PYSPARK_PYTHON=/usr/lib/anaconda-wmf/bin/python3


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark2/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/08 23:47:01 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
22/01/08 23:47:01 WARN Utils: Service 'sparkDriver' could not bind on port 12000. Attempting port 12001.
22/01/08 23:47:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/01/08 23:47:11 WARN U