# Summary

Get the reading sessions of a day (all Wikipedias).

Filtering steps:
- take all pageviews to Wikipedias on a given day
- remove bots (keep only agent_type = user)
- remove app traffic (keep only desktop and mobile web)
- remove all sessions involving an edit attempt
- remove all sessions with more than 100 pageviews (to avoid bot traffic)
- keep only pageviews to the main namespace
- join the Wikidata items
- aggregate so that each row of the dataframe is one session
- save to parquet

Output is one parquet-file for each day in
    - PATH_OUT/reading-sessions-webrequest_<DAY>.parquet
        - <DAY> = '2020-05-03'
    
    
Required packages:
    - findspark
    - wmfdata

### Imports etc.

In [1]:
import os, sys
import datetime
import calendar
import time
import string
import random

import findspark
findspark.init('/usr/lib/spark2')
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T, Window
import wmfdata.spark as wmfspark


You are using wmfdata v1.0.1, but v1.0.2 is available.

To update, run `pip install --upgrade git+https://github.com/neilpquinn/wmfdata/wmfdata.git@release`.

To see the changes, refer to https://github.com/neilpquinn/wmfdata/blob/release/CHANGELOG.md


In [3]:
## Spark session used by all following cells.
##
## spark_config holds extra Spark settings handed to wmfdata; it is left
## empty here. An explicitly sized "regular" alternative would be:
#     "spark.driver.memory": "2g",
#     "spark.dynamicAllocation.maxExecutors": 64,
#     "spark.executor.memory": "8g",
#     "spark.executor.cores": 4,
#     "spark.sql.shuffle.partitions": 256
spark_config = dict()

## PySpark environment variables, set before the session is requested
os.environ.update({
    'PYSPARK_DRIVER_PYTHON': 'notebook',
    'PYSPARK_PYTHON': '/usr/bin/python3.5',
})

spark = wmfspark.get_session(
    extra_settings=spark_config,
    app_name='Pyspark notebook',
)
## last expression of the cell, so the notebook displays the session object
spark

## Load data

In [4]:
## output directory: the per-day parquet files are written below this path
PATH_OUT = "/user/mgerlach/webrequest/"

In [5]:
## WIKIDATA: lookup table used to attach Wikidata item ids (qids) to pages
##### source table: wmf.wikidata_item_page_link
##### https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/Edits/Wikidata_item_page_link
##### list the available snapshots with: hadoop fs -ls /wmf/data/wmf/wikidata/item_page_link/
##### rows of all selected snapshots are pooled and de-duplicated, so every
##### distinct combination of qid | project+page_id is kept exactly once

## join key of the form <wiki_db>-<page_id>
wd_wiki_pageid = F.concat(F.col('wiki_db'), F.lit('-'), F.col('page_id'))

df_wd = (
    spark.read.table('wmf.wikidata_item_page_link')
    .filter(
        ## snapshot is a partition of the table; pooling the snapshots from
        ## this date on resolves issues with non-matching wikidata-items
        (F.col('snapshot') >= '2020-01-06')
        ## only wikis (enwiki, ...; not: wikisource)
        & F.col('wiki_db').endswith('wiki')
    )
    .select(
        'item_id',
        wd_wiki_pageid.alias('wiki_pageid'),
    )
    .distinct()
)
# df_wd.printSchema()

In [None]:
## Date range to process: one output file is produced per day
day_start = datetime.date(2020, 4, 7)  ## starting date (inclusive)
day_end = datetime.date(2020, 5, 1)  ## end date (non-inclusive)

## lazily yields every date in [day_start, day_end); it can be consumed once
date_array = (
    day_start + datetime.timedelta(days=x)
    for x in range(0, (day_end - day_start).days)
)

## window over all requests of one user on one day;
## used for the min/max number of pageviews per user and day
w = Window.partitionBy(F.col('user_hash'), F.col('year'), F.col('month'), F.col('day'))
n_p_max = 100  ## maximum number of pageviews/user/day
n_p_min = 1  ## minimum number of pageviews/user/day

## join key <wiki_db>-<page_id> (e.g. enwiki-234232), matching the key built
## from the wiki_db column of the wikidata table.
## The host project uses hyphens (zh-min-nan) while the database name uses
## underscores (zh_min_nanwiki), so hyphens are replaced before appending
## 'wiki'; otherwise pages of hyphenated projects never match and get no qid.
## NOTE(review): wikis whose database name differs from the host name for
## other reasons will still not match -- confirm whether that matters here.
webrequest_wiki_pageid = F.concat(
    F.regexp_replace(F.col('normalized_host.project'), '-', '_'),
    F.lit('wiki-'),
    F.col('page_id'),
)

bool_reduced = False  ## for testing: restrict to one hour of one small wiki

## WEBREQUEST data
df_webreq = spark.read.table('wmf.webrequest')

## For every day: build the reading sessions and write them to one parquet file.
##
## The per-day salt protects the hashed client-ip/user-agent fingerprint, so it
## is drawn from the operating system's entropy source (random.SystemRandom)
## instead of the default, predictable pseudo-random generator.
rng = random.SystemRandom()
salt_alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits

for date_object in date_array:
    day_str = date_object.strftime("%Y-%m-%d")
    year = date_object.year
    month = date_object.month
    day = date_object.day
    print(day_str)

    ##
    ## we hash the client-ip and the user-agent aka 'fingerprinting'
    ## we add a different salt (8 to 16 random characters) for each day
    salt = ''.join(rng.choice(salt_alphabet) for _ in range(rng.randint(8, 16)))
    user_hash = F.sha2(
        F.concat(F.col('client_ip'), F.lit('-'), F.col('user_agent'), F.lit(salt)),
        512,
    )  ## only client and user

    t1 = time.time()
    if bool_reduced:
        ## testing mode: a single hour of a single small wiki
        df_agg = (
            df_webreq
            .where(F.col('hour') == 1)
            .where(F.col('normalized_host.project') == "simple")
        )
    else:
        df_agg = df_webreq

    df_agg = (
        df_agg
        ## select time partition
        .where(F.col('year') == year)
        .where(F.col('month') == month)
        .where(F.col('day') == day)

        ## hash of user-id as new column
        .withColumn('user_hash', user_hash)

        ## select wiki project
        .where(F.col('normalized_host.project_family') == "wikipedia")

        ## agent-type user to filter spiders
        ## https://meta.wikimedia.org/wiki/Research:Page_view/Tags#Spider
        .where(F.col('agent_type') == "user")

        ## user: desktop/mobile/mobile app; isaac filters != mobile app
        .where(F.col('access_method') != "mobile app")

        ## not clear why; present in all cases I saw before.
        .where(F.col('webrequest_source') == 'text')

        ## filter users who edited:
        ## keep the pageviews plus the requests that look like edit attempts ...
        .where(
            (F.col('is_pageview') == 1)
            | (F.col('uri_query').contains('action=edit'))
            | (F.col('uri_query').contains('action=visualeditor'))
            | (F.col('uri_query').contains('&intestactions=edit&intestactionsdetail=full&uiprop=options'))
        )

        ##### ... mark edit attempts (is_pageview==0) and drop every user/day
        ##### that has at least one of them
        .withColumn('edit_attempt', F.when(F.col('is_pageview') == 0, 1).otherwise(0))
        .withColumn('edit_attempt_session', F.max(F.col('edit_attempt')).over(w))
        .where(F.col('edit_attempt_session') == 0)

        ## only requests marked as pageviews
        .where(F.col('is_pageview') == 1)

        ## number of pageview requests per user and day between n_p_min and n_p_max
        .withColumn('n_p_by_user', F.sum(F.col('is_pageview').cast("long")).over(w))
        .where(F.col('n_p_by_user') >= n_p_min)
        .where(F.col('n_p_by_user') <= n_p_max)

        ## only main namespace
        ## (n_p_by_user was counted before this filter, so it also includes the
        ## user's pageviews outside the main namespace)
        .where(F.col('namespace_id') == 0)

        ## merging the wikidata id
        #### new column enwiki-234232
        .withColumn('wiki_pageid', webrequest_wiki_pageid)
        .join(df_wd, on='wiki_pageid', how='left_outer')

        ## one row per user: the session is the list of that user's pageviews
        .groupby('user_hash')
        .agg(
            F.first(F.col('access_method')).alias('access_method'),
            F.first(F.col('geocoded_data')).alias('geocoded_data'),
            F.first(F.col('n_p_by_user')).alias('session_length'),
            ## ts is the first struct field, so array_sort orders the
            ## pageviews of a session chronologically
            F.array_sort(
                F.collect_list(
                    F.struct(
                        F.col('ts'),
                        F.col('page_id'),
                        F.col('pageview_info.page_title').alias('page_title'),
                        F.concat(F.col('normalized_host.project'), F.lit('wiki')).alias('project'),
                        F.col('item_id').alias('qid'),
                    )
                )
            ).alias('session'),
        )
    )

    # clear salt so not accidentally retained
    salt = None
    if bool_reduced:
        filename_save = os.path.join(PATH_OUT, 'reading-sessions-webrequest_reduced_%s.parquet' % (day_str))
    else:
        filename_save = os.path.join(PATH_OUT, 'reading-sessions-webrequest_%s.parquet' % (day_str))
    ## writing triggers the actual Spark computation for this day
    df_agg.write.mode('overwrite').parquet(filename_save)
    t2 = time.time()
    print('done in [s]: %.2f' % (t2 - t1))

2020-04-07
done in [s]: 1052.41
2020-04-08
done in [s]: 1662.77
2020-04-09
done in [s]: 1440.22
2020-04-10
done in [s]: 1352.41
2020-04-11
done in [s]: 1500.09
2020-04-12
done in [s]: 1476.39
2020-04-13
