In [1]:
import wmfdata
# Start a YARN-backed Spark session with explicit executor sizing, and keep
# a handle on the underlying SparkContext.
spark = wmfdata.spark.get_custom_session(
    master="yarn",
    spark_config={
        "spark.executor.memory": "32g",
        "spark.executor.cores": "4",
        "spark.dynamicAllocation.maxExecutors": "32",
    },
)
sc = spark.sparkContext

import pyspark
import pyspark.sql
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import timedelta, date
import matplotlib.pyplot as plt
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
import pandas as pd

%matplotlib inline


# Base folder that the parquet reads and writes in this notebook are
# formatted against.
DESTINATION_FOLDER = "how_we_read_wikipedia_march/en"
# Per-user pageload cap: only users strictly below this count are kept.
# NOTE(review): presumably 100 pageloads/day over a 28-day window - confirm.
MAX_PAGELOADS = 100*28

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


In [2]:
# Echo the session object as the cell output.
spark

In [4]:
# Load the pageload logs, keeping rows dated March 1-28 (by local_time) in
# namespace 0 whose actual destination is not the Main Page.
pageviews = (
    spark.read.parquet("{}/pageloads/*".format(DESTINATION_FOLDER))
    .where("MONTH(local_time)=3 AND DAY(local_time)>=1 AND DAY(local_time)<=28")
    .where("namespace_id = 0")
    .where("actual_destination<>'Main_Page'")
)
pageviews

DataFrame[country_code: string, user_identifier: string, http_status: string, referer: string, local_time: timestamp, timezone: string, access_method: string, page_title: string, page_id: bigint, namespace_id: int, actual_destination: string]

In [5]:
# Number of pageloads that pass the filters above (runs a Spark job).
pageviews.count()

6791284901

In [6]:
# Load the preview logs, keeping rows dated March 1-28 (by local_time) that
# are flagged as long previews.
previews = (
    spark.read.parquet("{}/previews/*".format(DESTINATION_FOLDER))
    .where("MONTH(local_time)=3 AND DAY(local_time)>=1 AND DAY(local_time)<=28")
    .where("long_preview=TRUE")
)
previews

DataFrame[user_identifier: string, http_status: string, local_time: timestamp, preview_title: string, page_title: string, long_preview: boolean]

### Remove `user_identifier` values that appear too frequently

In [7]:
# Identifiers whose pageload count is strictly below MAX_PAGELOADS.
# Cached because the same frame is joined twice just below.
limit = (
    pageviews
    .groupBy("user_identifier")
    .agg(count("*").alias("pageloads"))
    .where("pageloads<{}".format(MAX_PAGELOADS))
    .select("user_identifier")
    .cache()
)

# Inner joins drop every row whose user_identifier is not in `limit`.
pageviews_limited = pageviews.join(limit, on="user_identifier").repartition(12000)
previews_limited = previews.join(limit, on="user_identifier").repartition(12000)

In [8]:
# Show the schema of the pageloads that survived the per-user cap.
pageviews_limited

DataFrame[user_identifier: string, country_code: string, http_status: string, referer: string, local_time: timestamp, timezone: string, access_method: string, page_title: string, page_id: bigint, namespace_id: int, actual_destination: string]

In [9]:
# Number of pageloads remaining after the per-user cap (runs a Spark job).
pageviews_limited.count()

6528759496

In [11]:
def get_row(row):
    """Turn one grouped ``(key, pageloads)`` pair into a per-user session Row.

    Args:
        row: Pair whose first item is the key
            ``(user_identifier, access_method, country_code, timezone)`` and
            whose second item is a non-empty list of
            ``(local_time, http_status, referer, page_title,
            actual_destination, page_id)`` tuples.

    Returns:
        Row carrying the four key fields plus ``session``: the pageloads as
        Rows in ascending ``local_time`` order. ``prev_load`` is -1 for the
        first pageload and otherwise the whole seconds elapsed since the
        preceding pageload.

    Raises:
        IndexError: if the pageload list is empty.
    """
    key = row[0]
    ordered = sorted(row[1], key=lambda load: load[0])

    def to_row(load, prev_load):
        # Keyword order mirrors the field order used for each session entry.
        return Row(local_time=load[0], http_status=load[1], referer=load[2],
                   page_title=load[3], actual_destination=load[4],
                   prev_load=prev_load, page_id=load[5])

    # The first pageload has no predecessor, hence the -1 sentinel.
    clean_session = [to_row(ordered[0], -1)]
    for earlier, later in zip(ordered, ordered[1:]):
        elapsed = later[0] - earlier[0]
        clean_session.append(to_row(later, int(elapsed.total_seconds())))

    return Row(user_identifier=key[0], access_method=key[1],
               country_code=key[2], session=clean_session,
               timezone=key[3])

# Key every pageload by (user, access method, country, timezone), merge the
# single-element lists per key, then build one session Row per key.
aggregated_sessions_rdd = (
    pageviews_limited.rdd
    .map(lambda pv: (
        (pv.user_identifier, pv.access_method, pv.country_code, pv.timezone),
        [(pv.local_time, pv.http_status, pv.referer,
          pv.page_title, pv.actual_destination, pv.page_id)],
    ))
    .reduceByKey(lambda left, right: left + right)
    .map(get_row)
)

aggregated_pageloads = (
    spark.createDataFrame(aggregated_sessions_rdd)
    .repartition(18000)
)
aggregated_pageloads

DataFrame[access_method: string, country_code: string, session: array<struct<actual_destination:string,http_status:string,local_time:timestamp,page_id:bigint,page_title:string,prev_load:bigint,referer:string>>, timezone: string, user_identifier: string]

In [13]:
# Collect every preview of a user into one list, ordered by local_time.
previews_by_user_rdd = (
    previews_limited.rdd
    .map(lambda pr: (
        pr.user_identifier,
        [Row(http_status=pr.http_status, preview_title=pr.preview_title,
             page_title=pr.page_title, local_time=pr.local_time)],
    ))
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: Row(
        user_identifier=pair[0],
        previews=sorted(pair[1], key=lambda preview: preview.local_time),
    ))
)

previews_by_user = (
    spark.createDataFrame(previews_by_user_rdd)
    .repartition(18000)
)
previews_by_user

DataFrame[previews: array<struct<http_status:string,local_time:timestamp,page_title:string,preview_title:string>>, user_identifier: string]

### Merge pageviews and previews

In [14]:
# Expose both per-user frames to Spark SQL under the names used in the query.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0.
aggregated_pageloads.createOrReplaceTempView("aggregated_pageloads")
previews_by_user.createOrReplaceTempView("previews_by_user")

# LEFT JOIN keeps every row of aggregated_pageloads; `previews` is NULL for
# users that have no row in previews_by_user.
query = """
SELECT pl.*, p.previews
FROM aggregated_pageloads pl
LEFT JOIN previews_by_user p
ON pl.user_identifier = p.user_identifier
"""

aggregated_sessions = spark.sql(query).repartition(12000)
aggregated_sessions

DataFrame[access_method: string, country_code: string, session: array<struct<actual_destination:string,http_status:string,local_time:timestamp,page_id:bigint,page_title:string,prev_load:bigint,referer:string>>, timezone: string, user_identifier: string, previews: array<struct<http_status:string,local_time:timestamp,page_title:string,preview_title:string>>]

In [15]:
# Persist the merged sessions as parquet, replacing any previous output at
# the same path.
(
    aggregated_sessions.write
    .mode("overwrite")
    .parquet("{}/aggregated_sessions_final.parquet".format(DESTINATION_FOLDER))
)

In [16]:
# Show the schema of the merged per-user sessions.
aggregated_sessions

DataFrame[access_method: string, country_code: string, session: array<struct<actual_destination:string,http_status:string,local_time:timestamp,page_id:bigint,page_title:string,prev_load:bigint,referer:string>>, timezone: string, user_identifier: string, previews: array<struct<http_status:string,local_time:timestamp,page_title:string,preview_title:string>>]