# Adoption of Apple's Relay Service

After further discussion of our findings and needs, [T292106](https://phabricator.wikimedia.org/T292106) was updated with the following question:

* "How widespread is the adoption of Relay among Safari users? How has it been changing over time?"

Martin Urbanec wrote [code to measure this on a per-wiki, per-day basis](https://github.com/nettrom/2021-icloud-private-relay-usage). We used that code as a starting point for our earlier investigation into edits and blocks. In this case, we want to know at what scale we can aggregate pageview data while maintaining a reasonable execution time. Can we do this across all projects on a daily basis? It turns out we can, and that's what this notebook does.

In [24]:
import os
import ipaddress
import time

from collections import defaultdict

import datetime as dt

import numpy as np
import pandas as pd

import findspark
from wmfdata import spark

In [33]:
SPARK_HOME = os.environ.get("SPARK_HOME", "/usr/lib/spark2")
findspark.init(SPARK_HOME)
from pyspark.sql import functions as F, types as T, Window

In [50]:
## Using standard wmfdata.spark session settings, but increasing memory overhead because we've seen
## tasks erroring out based on that. 
spark_session = spark.get_session(
    app_name = 'nettrom-apple-relay-aggregator',
    extra_settings = {'spark.executor.memoryOverhead' : '12G'}
)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


## Load in the Relay Range Dataset

In [6]:
# load iCloud's private relay egress ranges
# data comes from https://mask-api.icloud.com/egress-ip-ranges.csv
relay_ranges = pd.read_csv('datasets/egress-ip-ranges.csv',
                           sep=',', names=['range', 'country', 'region', 'city', 'empty']).drop(columns=['empty'])


In [8]:
## This data structure is based on using https://stackoverflow.com/a/1004527
## to determine if an IP falls within a given network, which relies on the following
## assumptions:
## 1: IPv4 and IPv6 networks are disjoint, so we can split on IP version. There used to be
##    compatibility between the networks, but that was deprecated according to
##    https://networkengineering.stackexchange.com/questions/57903/are-the-ipv6-address-space-and-ipv4-address-space-completely-disjoint
## 2: The netmask is binary-AND'ed onto the binary IP address, hence
##    the second layer is keyed by netmask, of which we expect there to be a limited number.
## 3: Each netmask then maps to a limited set of possible network addresses, which are all
##    integers, so we store those in a Python set, which gives us fast lookup.

dict_nets = {
    '4' : defaultdict(set),
    '6' : defaultdict(set)    
}

In [9]:
for net_raw in relay_ranges.range:
    net = ipaddress.ip_network(net_raw)
    
    net_v = str(net.version)
    
    dict_nets[net_v][net.netmask].add(int(net.network_address))

In [47]:
def is_ip_private_relay(ip_raw):
    '''Return True if `ip_raw` falls within one of the Relay egress ranges.'''
    try:
        ip = ipaddress.ip_address(ip_raw)
    except ValueError: # not a valid IP address
        return False

    bin_ip = int(ip)
    # Mask the address with each known netmask and look up the resulting network address
    for netmask, range_set in dict_nets[str(ip.version)].items():
        if (bin_ip & int(netmask)) in range_set:
            return True

    return False
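
The lookup can be tried out without the real egress list. The following is a minimal sketch that rebuilds the same version → netmask → network-address structure from a few made-up ranges. The ranges come from the IPv4 and IPv6 documentation blocks and are not actual Relay ranges. The names `sample_ranges`, `sample_nets`, and `in_sample_ranges` are hypothetical and used only for this illustration.

```python
import ipaddress
from collections import defaultdict

# Made-up ranges from the documentation address blocks, standing in for
# the real egress list. These are NOT actual Relay ranges.
sample_ranges = ["192.0.2.0/24", "198.51.100.128/25", "2001:db8::/32"]

# IP version -> netmask (as int) -> set of network addresses (as int)
sample_nets = {4: defaultdict(set), 6: defaultdict(set)}
for raw in sample_ranges:
    net = ipaddress.ip_network(raw)
    sample_nets[net.version][int(net.netmask)].add(int(net.network_address))


def in_sample_ranges(ip_raw):
    """Return True if ip_raw falls inside any of the sample networks."""
    try:
        ip = ipaddress.ip_address(ip_raw)
    except ValueError:  # not a valid IP address
        return False
    ip_int = int(ip)
    # AND the address with every known netmask; a hit means the masked
    # value is the network address of one of the stored ranges.
    return any(
        (ip_int & mask) in networks
        for mask, networks in sample_nets[ip.version].items()
    )


for candidate in ["192.0.2.17", "198.51.100.5", "2001:db8::1", "not-an-ip"]:
    print(candidate, in_sample_ranges(candidate))
```

Because the number of distinct netmasks is small, each lookup costs a handful of bitwise ANDs and set membership checks, regardless of how many ranges the list contains.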

## Aggregate Viewership Data

In [45]:
def aggreate_daily(spark_session, this_day, db_table):
    '''
    Use `spark_session` to aggregate daily pageviews for `this_day` from
    iOS 15 split by project and whether they're using an IP within the relay,
    and store that data in `db_table`
    '''
   
    viewers_data = (
        spark_session.read.table("wmf.pageview_actor")

        .where(F.col("year") == this_day.year)
        .where(F.col("month") == this_day.month)
        .where(F.col("day") == this_day.day)

        # only user pageview traffic
        .where(F.col("agent_type") == 'user')
        .where(F.col("is_pageview") == True)

        # exclude mobile app -- private relay does not affect it
        .where(F.col("access_method") != 'mobile app')

        # only iOS 15 devices
        .where(F.col("user_agent_map.os_family") == 'iOS')
        .where(F.col("user_agent_map.os_major") == '15')
        
    )
    viewers_data_is_relay = spark_session.createDataFrame(
        viewers_data.rdd.map(lambda r: T.Row(
            log_date = dt.date(r.year, r.month, r.day),
            project = "{}.{}".format(r.normalized_host.project, r.normalized_host.project_family),
            is_relay = is_ip_private_relay(r.ip)
        ))
    )

    agg_viewer_data_by_relay = (
        viewers_data_is_relay
        .groupBy("project", "log_date", "is_relay")
        .agg(F.count("*").alias("num_views"))
    )
    agg_viewer_data_by_relay.write.insertInto(db_table)
    
    return None
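
Conceptually, the function above reduces each pageview to a `(project, log_date, is_relay)` triple and counts how often each triple occurs. The sketch below mirrors that logic in plain Python on made-up data, without Spark. `is_relay_stub` is a hypothetical stand-in for `is_ip_private_relay`, and `sample_views` and its contents are invented for illustration.

```python
import datetime as dt
from collections import Counter


def is_relay_stub(ip):
    # Hypothetical stand-in for is_ip_private_relay():
    # pretends that 192.0.2.0/24 is the only relay range.
    return ip.startswith("192.0.2.")


# Made-up pageviews as (project, project_family, ip)
sample_views = [
    ("en", "wikipedia", "192.0.2.10"),
    ("en", "wikipedia", "192.0.2.99"),
    ("en", "wikipedia", "203.0.113.5"),
    ("de", "wikipedia", "203.0.113.7"),
]
log_date = dt.date(2021, 11, 1)

# Equivalent of groupBy("project", "log_date", "is_relay") followed by count(*)
counts = Counter(
    ("{}.{}".format(project, family), log_date, is_relay_stub(ip))
    for project, family, ip in sample_views
)

for (project, day, is_relay), num_views in sorted(counts.items()):
    print(project, day, is_relay, num_views)
```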

In [25]:
db_name = 'nettrom_apple_relay'
table_name = 'daily_usage_per_project'

create_table_statement = f'''
CREATE TABLE {db_name}.{table_name} (
    project STRING COMMENT "project name (e.g. en.wikipedia)",
    log_date DATE COMMENT "the date for which pageview data is aggregated",
    is_relay BOOLEAN COMMENT "did the pageviews use a Relay Service IP address?",
    num_views INT COMMENT "the number of pageviews"
)
'''

# print(create_table_statement)





In [None]:
## This is expected to run daily as a cron job, at which point we're aggregating
## data from the previous day.

today = dt.date.today()
yesterday = today - dt.timedelta(days = 1)

aggreate_daily(spark_session, yesterday, f'{db_name}.{table_name}')
print(f'completed daily aggregation for {yesterday}')
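
Once the table holds data, the adoption question from the introduction comes down to the share of views per project and day that have `is_relay` set. The following is a minimal sketch of that calculation on made-up rows shaped like the table's columns. The function name `relay_share` and all numbers are hypothetical and do not reflect measured results.

```python
from collections import defaultdict

# Made-up rows in the shape (project, log_date, is_relay, num_views)
sample_rows = [
    ("en.wikipedia", "2021-11-01", True, 50),
    ("en.wikipedia", "2021-11-01", False, 950),
    ("de.wikipedia", "2021-11-01", False, 400),
]


def relay_share(rows):
    """Return {(project, log_date): fraction of views that came via the relay}."""
    totals = defaultdict(lambda: [0, 0])  # key -> [relay_views, all_views]
    for project, log_date, is_relay, num_views in rows:
        entry = totals[(project, log_date)]
        if is_relay:
            entry[0] += num_views
        entry[1] += num_views
    # Skip keys without any views to avoid dividing by zero
    return {
        key: relay / total
        for key, (relay, total) in totals.items()
        if total > 0
    }


shares = relay_share(sample_rows)
for key, share in sorted(shares.items()):
    print(key, f"{share:.1%}")
```

A project with no relay rows for a given day, like the second project here, simply gets a share of zero.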