In [1]:
import os
# Setting spark.pyspark.python on the session builder isn't being picked
# up for whatever reason, so point PySpark at python3 via the environment.
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'

import findspark
findspark.init('/usr/lib/spark2')

from pyspark.sql import SparkSession, DataFrame, functions as F, types as T

spark = SparkSession.builder.master('yarn').getOrCreate()

# Small hack that makes chaining custom transformations cleaner:
# df.transform(fn) is simply fn(df). Spark 3.0+ ships a native
# DataFrame.transform accepting the same single-function form, so only
# patch it in when it is missing instead of clobbering the built-in.
if not hasattr(DataFrame, 'transform'):
    DataFrame.transform = lambda self, fn: fn(self)

In [2]:
def collect_groups_as_list(name, group_cols, *extra_aggs):
    """Collect df into a row per group.

    Returns a function for use with ``DataFrame.transform`` that groups by
    ``group_cols`` and packs every remaining column of each row into a
    struct, gathering those structs into an array column named ``name``.

    Args:
        name: name of the output array-of-struct column.
        group_cols: columns to group by; they are kept as-is in the output.
        *extra_aggs: additional aggregate Column expressions (for example
            ``F.count('*').alias('n')``) computed per group alongside the
            collected list. Previously accepted but ignored.
    """
    def transform(df):
        # Sort so the struct field order is deterministic.
        item_cols = sorted(set(df.columns) - set(group_cols))
        return (
            df
            .groupBy(*group_cols)
            .agg(F.collect_list(F.struct(item_cols)).alias(name), *extra_aggs)
        )
    return transform


def explode_and_flatten_names(col_name):
    """Inverse of collect_groups_as_list.

    Returns a function for use with ``DataFrame.transform`` that explodes
    the array-of-struct column ``col_name`` into one row per element and
    lifts each struct field up to a top-level column. All other columns are
    kept, in their original order, ahead of the struct fields.

    Rows whose array is null or empty produce no output (``F.explode``
    semantics).
    """
    def transform(df):
        # 'exploded' is used as a temporary alias below; it must not collide.
        assert 'exploded' not in df.columns
        element_type = df.schema[col_name].dataType.elementType
        # Keep df's column order: a set difference would make the output
        # column order vary from one python process to the next.
        base_cols = [c for c in df.columns if c != col_name]
        element_cols = ['exploded.' + field for field in element_type.fieldNames()]

        return (
            df
            .select(F.explode(col_name).alias('exploded'), *base_cols)
            .select(*(base_cols + element_cols))
        )
    return transform

In [18]:
def clean_duplicates(events):
    """Lazily yield events in order, dropping any whose uniqueId was already emitted."""
    emitted_ids = set()
    for evt in events:
        uid = evt['uniqueId']
        if uid not in emitted_ids:
            emitted_ids.add(uid)
            yield evt

        
def _first_suggesting_search(events):
    """Map each suggested query to the searchToken of the event that suggested it first.

    Events without a suggestion are ignored. When several events carry the
    same suggestion, the one with the smallest ``dt`` wins (on equal dt the
    earlier event in iteration order is kept). First suggestion wins: not
    amazing, but maybe good enough.
    """
    first_seen = {}
    for event in events:
        sugg = event['suggestion']
        if not sugg:
            continue
        if sugg not in first_seen or first_seen[sugg][0] > event['dt']:
            first_seen[sugg] = (event['dt'], event['searchToken'])
    return {sugg: token for sugg, (_, token) in first_seen.items()}


def _earliest_dt_by_search(events):
    """Map each searchToken to the smallest non-null ``dt`` among its events.

    A token whose events all have a null ``dt`` maps to None.
    """
    earliest = {}
    for event in events:
        token, dt = event['searchToken'], event['dt']
        if token not in earliest:
            earliest[token] = dt
        elif dt is not None and (earliest[token] is None or dt < earliest[token]):
            earliest[token] = dt
    return earliest


def as_dym_events(events):
    """Aggregate session into dym events.

    Takes the full set of events that occurred within a single search
    session and transforms them into an event per search with various
    boolean properties related to dym.

    This could plausibly be done directly in spark, but it seemed
    much more complex and required multiple shuffles to aggregate
    per-search and per-session data that is needed.

    Args:
        events: iterable of mappings (e.g. spark Rows) supporting lookups of
            'dt', 'searchToken', 'suggestion', 'action', 'didYouMeanVisible',
            'inputLocation', 'query' and 'hitsReturned'. Consumed once.

    Returns:
        A list with one tuple per distinct searchToken:
        ``(dt, is_autorewrite, is_dym, dym_shown, dym_clicked, has_results,
        hit_interact)``. ``dt`` is the earliest non-null dt of that search,
        so the chosen value does not depend on the (unordered) input order.
    """
    events = list(events)
    # Map from suggested query to search token that suggested it
    suggested = _first_suggesting_search(events)

    # set of searches that showed the result set of a dym query
    dym_searches = set()
    # set of searches that were autorewritten
    dym_autorewrite = set()
    # set of searches that showed a dym suggestion
    dym_shown = set()
    # set of searches whose dym suggestion was clicked
    dym_clicked = set()
    # set of searches that had user interaction (click, etc)
    hit_interact = set()
    # set of searches that displayed 1 or more results
    has_results = set()

    for event in events:
        is_serp = event['action'] == 'searchResultPage'
        is_autorewrite = is_serp and event['didYouMeanVisible'] == 'autorewrite'
        is_dym_visible = is_serp and event['didYouMeanVisible'] in ('autorewrite', 'yes')
        is_dym_clickthrough = is_serp and event['inputLocation'] in (
            'dym-suggest', 'dym-rewritten')
        is_dym_cancel_autorewrite = is_serp and event['inputLocation'] == 'dym-original'
        is_dym = is_autorewrite or is_dym_clickthrough
        is_hit_interact = event['action'] in ('click', 'visitPage')

        if is_dym_visible:
            dym_shown.add(event['searchToken'])
        if is_dym:
            dym_searches.add(event['searchToken'])
        if is_autorewrite:
            dym_autorewrite.add(event['searchToken'])
        # TODO: dym_cancel_autorewrite is clicking the dym area, but it's
        # to cancel and get the original query results. Should it count?
        if is_dym_clickthrough or is_dym_cancel_autorewrite:
            # The clicked SERP's query is the suggestion; credit the search
            # that offered it, when we know which one that was.
            if event['query'] in suggested:
                dym_clicked.add(suggested[event['query']])
        if is_serp and event['hitsReturned'] is not None and event['hitsReturned'] > 0:
            has_results.add(event['searchToken'])
        if is_hit_interact:
            hit_interact.add(event['searchToken'])

    return [
        (
            dt,
            search in dym_autorewrite,
            search in dym_searches,
            search in dym_shown,
            search in dym_clicked,
            search in has_results,
            search in hit_interact,
        )
        for search, dt in _earliest_dt_by_search(events).items()
    ]
        
        
@F.udf(returnType=T.ArrayType(T.StructType([
    T.StructField('dt', T.StringType(), False),
    T.StructField('is_autorewrite', T.BooleanType(), False),
    T.StructField('is_dym', T.BooleanType(), False),
    T.StructField('dym_shown', T.BooleanType(), False),
    T.StructField('dym_clicked', T.BooleanType(), False),
    # Was misspelled 'has_resuults', leaking the typo into the output column.
    T.StructField('has_results', T.BooleanType(), False),
    T.StructField('hit_interact', T.BooleanType(), False)
])))
def transform_session(events):
    """Spark UDF: convert one session's source events into per-search DYM events.

    Duplicate events (same uniqueId) are dropped before aggregation. The
    struct fields above correspond positionally to the tuples produced by
    ``as_dym_events``.
    """
    return as_dym_events(clean_duplicates(events))

In [19]:
# Partition filter for the analysis window, 2019-07-07 through 2019-07-13
# (day 14 is excluded). Applied to both source tables below.
PARTITION_COND = (
    (F.col('year') == 2019)
    & (F.col('month') == 7)
    & (F.col('day') >= 7)
    & (F.col('day') < 14)
)

# When a user clicks a DYM suggestion the resulting SERP event records
# that this is a suggested query, but not which search suggested it. Pull
# in the suggested queries so we can at least guess.
df_cirrus_sugg = (
    spark.read.table('event.mediawiki_cirrussearch_request')
    .where(PARTITION_COND)
    # Guess what the provided suggestion was: the first non-null suggestion
    # in elasticsearch request order. F.first() after a groupBy shuffle is
    # non-deterministic, so keep each suggestion's array position and take
    # the minimum (pos, suggestion) pair; ties across multiple request rows
    # for one search fall back to the smallest suggestion string.
    # TODO: Cirrus should log final suggestion at top level
    .select(F.col('search_id').alias('searchToken'),
            F.posexplode(F.col('elasticsearch_requests.suggestion'))
            .alias('pos', 'suggestion'))
    .where(F.col('suggestion').isNotNull())
    .groupBy('searchToken')
    .agg(F.min(F.struct('pos', 'suggestion')).getField('suggestion')
         .alias('suggestion'))
)

# Per-search DYM flags: gather each session's fulltext events (joined with
# the guessed suggestion for their search), derive per-search DYM booleans
# in python via transform_session, then flatten back to one row per search
# token within each (wiki, searchSessionId).
df_dym = (
    spark.read.table('event.testsearchsatisfaction2')
    .where(PARTITION_COND)
    # Fulltext searches only; checkin events don't contain any useful data
    # for dym.
    .where((F.col('event.source') == "fulltext")
           & (F.col('event.action') != 'checkin'))
    .select('wiki', 'dt', *('event.' + field for field in (
        'action', 'didYouMeanVisible', 'hitsReturned', 'inputLocation',
        'query', 'searchSessionId', 'searchToken', 'uniqueId')))
    .join(df_cirrus_sugg, how='left', on='searchToken')
    .transform(collect_groups_as_list('events', ['wiki', 'searchSessionId']))
    # Persist the grouped sessions before applying the python UDF.
    .cache()
    .select('wiki', 'searchSessionId',
            transform_session(F.col('events')).alias('dym_events'))
    .transform(explode_and_flatten_names('dym_events'))
)

In [None]:
# Pull the per-search DYM rows to the driver as a pandas DataFrame for the
# statistics and plots below; toPandas is a Spark action, so this runs the job.
df_dym_local = df_dym.toPandas()

In [None]:
import bokeh.io
import bokeh.models
import bokeh.plotting
from collections import defaultdict

# np is used by plot_dist and _ci below; import it here rather than relying
# on a later cell having run first.
import numpy as np
# Public location; scipy.stats.kde is a deprecated private module path.
from scipy.stats import gaussian_kde


# Render bokeh figures inline in this notebook.
bokeh.io.output_notebook()


def ridge(bucket, data, scale):
    """Pair each scaled density value with its bucket label.

    The resulting (bucket, value) pairs are used as y coordinates on the
    categorical bucket axis in plot_dist.
    """
    labels = [bucket] * len(data)
    scaled = scale * data
    return [pair for pair in zip(labels, scaled)]

def _kde_or_zeros(raw, x):
    """Evaluate a gaussian KDE fitted to ``raw`` at the points ``x``.

    gaussian_kde cannot be fitted to fewer than two samples or to samples
    with no spread (singular covariance), e.g. when every bootstrap round
    produced the same rate. In that case return an all-zero density so the
    figure still renders; the CI whisker still marks the value.
    """
    raw = np.asarray(raw)
    # `not (ptp > 0)` also routes NaN spreads to the fallback.
    if raw.size < 2 or not (np.ptp(raw) > 0):
        return np.zeros(len(x))
    return gaussian_kde(raw)(x)


def plot_dist(title, colors, data, x_range):
    """Ridge plot of one sample distribution per bucket.

    Args:
        title: figure title.
        colors: bucket name -> patch color.
        data: bucket name -> ``(bounds, n, raw)`` (the per-bucket output of
            ``ci``). ``bounds[0]``/``bounds[-1]`` are drawn as a whisker,
            ``n`` is shown as an ``n=`` label and the density of ``raw`` is
            drawn as the ridge.
        x_range: x range for the figure; passing another figure's
            ``x_range`` makes the figures share an x axis.

    Returns:
        The bokeh figure (not shown).
    """
    raws = {bucket: raw for bucket, (_, _, raw) in data.items()}
    min_x = min(np.min(raw) for raw in raws.values())
    max_x = max(np.max(raw) for raw in raws.values())

    grid = np.linspace(min_x, max_x, 500)
    # For the patch to draw a closed polygon the curve must start and end at
    # y=0, so the first and last x values are repeated; the matching y values
    # are zeroed per bucket below.
    x = np.concatenate(([grid[0]], grid, [grid[-1]]))
    source = bokeh.models.ColumnDataSource(data=dict(x=x))
    p = bokeh.plotting.figure(
        y_range=sorted(data.keys(), reverse=True), title=title,
        plot_height=75 * len(data), plot_width=700,
        x_range=x_range,
        toolbar_location=None)

    ys = {bucket: _kde_or_zeros(raw, x) for bucket, raw in raws.items()}
    max_y = max(np.max(y) for y in ys.values())
    # Common scale so the tallest ridge peaks at an offset of 0.8 within its
    # category; all-zero densities (see _kde_or_zeros) leave nothing to scale.
    scale = 0.8 / max_y if max_y > 0 else 0.0

    bounds_data = defaultdict(list)
    label_data = defaultdict(list)
    for bucket in sorted(data, reverse=True):
        bounds, n, _ = data[bucket]
        y = ys[bucket]
        # Apply polygon minimum edges on the repeated end points.
        y[0] = 0
        y[-1] = 0
        source.add(ridge(bucket, y, scale=scale), bucket)
        p.patch(
            'x', bucket, color=colors[bucket], line_color="black",
            alpha=0.6, source=source)

        label_data['bucket'].append(bucket)
        label_data['label'].append('n={:,}'.format(n))

        if bounds:
            bounds_data['bucket'].append(bucket)
            bounds_data['upper'].append(bounds[-1])
            bounds_data['lower'].append(bounds[0])

    source_label = bokeh.models.ColumnDataSource(label_data)
    p.add_layout(bokeh.models.LabelSet(
        x=0, y='bucket', text='label', text_font_size="8pt",
        x_offset=-50, y_offset=-23,
        render_mode='css', source=source_label))

    if bounds_data:
        source_error = bokeh.models.ColumnDataSource(bounds_data)
        p.add_layout(bokeh.models.Whisker(
            dimension="width", line_color="black",
            source=source_error, base="bucket", upper="upper", lower="lower"))

    p.y_range.range_padding = 1.0
    return p

def _ci(values, rounds=1000, alpha=0.05, n=None, agg=lambda x: x.mean(axis=1)):
    if n is None:
        n = len(values)
    samples = np.random.choice(values, size=n * rounds, replace=True).reshape(rounds, -1)
    scores = np.sort(agg(samples))
    low = int(rounds * (alpha/2))
    mid = int(rounds / 2)
    high = int(rounds * (1 - alpha/2))
    return (scores[low], scores[mid], scores[high]), n, scores

def ci(df, extract, **kwargs):
    """Bootstrap a confidence interval separately for every bucket in df.

    Args:
        df: frame with a 'bucket' column.
        extract: function mapping the rows of one bucket to the values to
            resample (e.g. one boolean column).
        **kwargs: forwarded to ``_ci``.

    Returns:
        dict of bucket -> ``_ci`` result, with buckets inserted in sorted order.
    """
    return {
        bucket: _ci(extract(df[df['bucket'] == bucket]), **kwargs)
        for bucket in sorted(df['bucket'].unique())
    }

In [None]:
import pandas as pd
import numpy as np

# English Wikipedia only, with a single 'control' bucket, plus flags that
# separate non-autorewrite DYM from autorewrites.
df = df_dym_local.loc[df_dym_local['wiki'] == 'enwiki'].assign(
    bucket='control',
    non_autorewrite_is_dym=lambda frame: frame['is_dym'] & ~frame['is_autorewrite'],
    non_autorewrite_dym_shown=lambda frame: frame['dym_shown'] & ~frame['is_autorewrite'],
)

In [None]:
from collections import OrderedDict, namedtuple

# A rate to bootstrap: the share of rows whose `stat_col` is true, optionally
# restricted to rows where `filter_col` is true.
StatDef = namedtuple('StatDef', ('name', 'filter_col', 'stat_col'))
stat_defs = [
    StatDef('shown a dym suggestion', None, 'dym_shown'),
    StatDef('shown an autorewrite dym', None, 'is_autorewrite'),
    StatDef('shown a non-autorewrite dym suggestion', None, 'non_autorewrite_dym_shown'),
    StatDef('shown the results of a dym suggestion', None, 'is_dym'),
    StatDef('shown the results of a non-autorewrite dym suggestion',
            None, 'non_autorewrite_is_dym'),
    StatDef('shown non-autorewrite dym suggestion that clicked through to dym results',
            'non_autorewrite_dym_shown', 'dym_clicked'),
    StatDef('shown autorewrite dym results that clicked a result',
            'is_autorewrite', 'hit_interact'),
    StatDef('shown non-autorewrite dym results that clicked a result',
            'non_autorewrite_is_dym', 'hit_interact'),
    StatDef('clicking some result', None, 'hit_interact'),
    StatDef('shown a non-autorewrite dym suggestion and clicked non-dym result',
            'non_autorewrite_dym_shown', 'hit_interact')
]

# Each StatDef yields two entries, in order: a per-search rate, then a
# per-session rate (a session counts if any of its searches has stat_col).
stats = OrderedDict()
for sd in stat_defs:
    searches = df if sd.filter_col is None else df[df[sd.filter_col]]
    col = sd.stat_col

    stats['% of search {}'.format(sd.name)] = ci(searches, lambda frame: frame[col])

    sessions = (
        searches
        .groupby(['bucket', 'searchSessionId'])[col]
        .any()
        .reset_index()
    )
    stats['% of session {}'.format(sd.name)] = ci(sessions, lambda frame: frame[col])

In [None]:
from bokeh.layouts import column

colors = {'control': 'blue'}
figs = []
for stat_title, dist in stats.items():
    # The first figure fixes the x range to (0, 0.61); every later figure
    # reuses that range object so all rates are drawn on a shared axis.
    shared_x = figs[0].x_range if figs else (0, 0.61)
    figs.append(plot_dist(stat_title, colors, dist, shared_x))
bokeh.io.show(column(*figs))

In [None]:
# TT TF
# FT FF
#
# First letter: was a suggestion provided? Second letter: was one needed/used?
#
# TT = true positive = suggestion provided, user clicked through to the dym results
# TF = false positive = suggestion provided, user did not click through to the dym results
# FT = false negative = no suggestion provided, user reformulated the query (a suggestion should have been provided)
# FF = true negative = no suggestion provided, user clicked through the results (no suggestion needed)


In [None]:
# Stop the Spark session and release its cluster resources now that the
# analysis is finished.
spark.stop()