In [1]:
from pyspark.sql import SparkSession

# Reuse the notebook's active Spark session, creating one only if none exists.
spark = (SparkSession
         .builder
         .getOrCreate())

In [2]:
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib
# The `datetime` *module*; cells below use datetime.datetime / datetime.timedelta.
import datetime
import numpy as np
# NB: this star import shadows Python builtins such as `max` and `sum` with the
# pyspark column functions of the same name. Later cells (countTrue,
# activeUserRolling) call `sum(...)` / `max('submission_date_s3')` and rely on
# getting the pyspark versions.
from pyspark.sql.functions import *
import seaborn as sns
import pandas as pd

# Apply seaborn's default theme to matplotlib figures.
sns.set()



In [3]:
# main_summary v4 from S3; mergeSchema asks Spark to merge the schemas of all
# parquet files into one DataFrame schema.
ms = (spark.read
           .option("mergeSchema", "true")
           .parquet("s3://telemetry-parquet/main_summary/v4/"))

In [4]:
import time

In [5]:
# End date (yyyyMMdd, inclusive) for the one-off WAU check below.
date = '20180101'

In [6]:
# Window of `period` days ending on `date`: dates in (begin_str, date] are counted.
period = 7
epoch = int(time.mktime(time.strptime(date, '%Y%m%d')))
begin = epoch - 60 * 60 * 24 * period
# Derive the window start with calendar arithmetic instead of from `begin`:
# mktime/localtime work in local time, so subtracting a fixed number of
# seconds lands on the previous calendar day when the window spans a
# spring-forward daylight-saving change. (`epoch`/`begin` are still defined
# above so any code that reads them keeps working.)
begin_str = (datetime.datetime.strptime(date, '%Y%m%d')
             - datetime.timedelta(days=period)).strftime('%Y%m%d')

In [7]:
# Firefox release-channel rows with sample_id '42', keeping only subsessions
# whose length is within [0, 86400] (86400 presumably seconds, i.e. one day).
data = (ms
        .filter("app_name = 'Firefox'")
        .filter("normalized_channel = 'release'")
        .filter("sample_id='42'")
        .filter("subsession_length >= 0")
        .filter("subsession_length <= 86400"))

In [8]:
# One-row DataFrame: distinct client_ids in the window (begin_str, date],
# labelled with `date`.
WAU = (data
       .filter("submission_date_s3 <= '%s' and submission_date_s3 > '%s'" % (date, begin_str))
       .agg(countDistinct('client_id').alias('WAU'))
       .select(lit(date).alias('submission_date_s3'), '*'))

In [9]:
# Print the one-row WAU DataFrame: distinct client_ids in the 7 days ending on `date`.
WAU.show()

+------------------+-------+
|submission_date_s3|    WAU|
+------------------+-------+
|          20180101|1477395|
+------------------+-------+



In [10]:
import datetime as DT

# Scratch: the date one week before today. This depends on the day the
# notebook is run, so it is not reproducible.
today = DT.date.today()
week_ago = today - DT.timedelta(weeks=1)

In [11]:
# Scratch: display week_ago (depends on the day the notebook is run, so not reproducible).
week_ago

datetime.date(2018, 1, 24)

In [12]:
# Scratch: week_ago as a yyyyMMdd string, the format compared against submission_date_s3.
week_ago.strftime('%Y%m%d')

'20180124'

In [13]:
# Scratch: parse `date` (yyyyMMdd) into a naive datetime at midnight.
datetime.datetime.strptime(date, '%Y%m%d')

datetime.datetime(2018, 1, 1, 0, 0)

In [14]:
# Scratch: naive datetime exactly one week before `date`.
before = datetime.datetime.strptime(date, '%Y%m%d') - datetime.timedelta(weeks=1)

In [15]:
# Scratch: `before` (one week before `date`) as a yyyyMMdd string.
before.strftime('%Y%m%d')

'20171225'

In [45]:
def getPAU(data, date, period, countrylist = None, localelist = None):
    """
    Count distinct client_ids active in the `period` days ending on `date`.

    Parameters:
    data: DataFrame with submission_date_s3 ('yyyyMMdd' strings) and client_id columns
    date: string, 'yyyyMMdd'; the last (inclusive) day of the window
    period: integer number of days in the window
    countrylist, localelist: accepted but not used by this function

    Returns: a one-row DataFrame with columns submission_date_s3 (= `date`)
    and PeriodCount.
    """
    window_end = datetime.datetime.strptime(date, '%Y%m%d')
    window_start_str = (window_end - datetime.timedelta(days=period)).strftime('%Y%m%d')

    # Half-open window (window_start, date]: `period` consecutive calendar days.
    in_window = "submission_date_s3 <= '%s' and submission_date_s3 > '%s'" % (date, window_start_str)
    windowed = data.filter(in_window)
    counted = windowed.agg(countDistinct('client_id').alias('PeriodCount'))
    return counted.select(lit(date).alias('submission_date_s3'), '*')

In [46]:
# Sanity check: distinct clients in the 7 days ending 2018-01-02.
getPAU(data, '20180102',7).show()

+------------------+-----------+
|submission_date_s3|PeriodCount|
+------------------+-----------+
|          20180102|    1531402|
+------------------+-----------+



In [49]:
def getWAU(data, startdate, enddate, countrylist = None, localelist = None):
    """
    Weekly active users (distinct clients in the trailing 7 days) for every day
    from `startdate` through `enddate`, both inclusive.

    Parameters:
    data: DataFrame passed to getPAU (the filtered main_summary rows)
    startdate, enddate: strings in 'yyyyMMdd' format; if enddate is before
        startdate, only startdate is computed
    countrylist, localelist: forwarded to getPAU (which, as written in this
        notebook, does not use them)

    Returns: a DataFrame with one (submission_date_s3, WAU) row per day,
    built by unioning the per-day getPAU results.
    """
    # Local import keeps this cell self-contained.
    from functools import reduce

    start_dt = datetime.datetime.strptime(startdate, '%Y%m%d').date()
    end_dt = datetime.datetime.strptime(enddate, '%Y%m%d').date()
    n_days = (end_dt - start_dt).days
    # startdate itself is always computed, even when enddate precedes it.
    offsets = range(n_days + 1) if n_days > 0 else [0]

    # datetime.timedelta, not a bare `timedelta`: the latter is never imported
    # in this notebook and raises NameError on a fresh kernel.
    daily = [getPAU(data,
                    (start_dt + datetime.timedelta(days=i)).strftime('%Y%m%d'),
                    7, countrylist, localelist)
             for i in offsets]
    combined = reduce(lambda acc, day_df: acc.union(day_df), daily)
    return combined.withColumnRenamed('PeriodCount', 'WAU')

In [50]:
# WAU for every day from 2018-01-01 through 2018-01-07 (inclusive).
getWAU(data, '20180101', '20180107').show()

+------------------+-------+
|submission_date_s3|    WAU|
+------------------+-------+
|          20180101|1477395|
|          20180102|1531402|
|          20180103|1560599|
|          20180104|1574774|
|          20180105|1570614|
|          20180106|1576086|
|          20180107|1590684|
+------------------+-------+



In [52]:
# Cross-check: a single getPAU call for 2018-01-07 should match getWAU's row for that date.
getPAU(data, '20180107', 7, countrylist = None, localelist = None).show()

+------------------+-----------+
|submission_date_s3|PeriodCount|
+------------------+-----------+
|          20180107|    1590684|
+------------------+-----------+



In [55]:
def getMAU(data, startdate, enddate, countrylist = None, localelist = None):
    """
    Monthly active users (distinct clients in the trailing 28 days) for every
    day from `startdate` through `enddate`, both inclusive.

    Parameters:
    data: DataFrame passed to getPAU (the filtered main_summary rows)
    startdate, enddate: strings in 'yyyyMMdd' format; if enddate is before
        startdate, only startdate is computed
    countrylist, localelist: forwarded to getPAU (which, as written in this
        notebook, does not use them)

    Returns: a DataFrame with one (submission_date_s3, MAU) row per day,
    built by unioning the per-day getPAU results.
    """
    # Local import keeps this cell self-contained.
    from functools import reduce

    start_dt = datetime.datetime.strptime(startdate, '%Y%m%d').date()
    end_dt = datetime.datetime.strptime(enddate, '%Y%m%d').date()
    n_days = (end_dt - start_dt).days
    # startdate itself is always computed, even when enddate precedes it.
    offsets = range(n_days + 1) if n_days > 0 else [0]

    # datetime.timedelta, not a bare `timedelta`: the latter is never imported
    # in this notebook and raises NameError on a fresh kernel.
    daily = [getPAU(data,
                    (start_dt + datetime.timedelta(days=i)).strftime('%Y%m%d'),
                    28, countrylist, localelist)
             for i in offsets]
    combined = reduce(lambda acc, day_df: acc.union(day_df), daily)
    return combined.withColumnRenamed('PeriodCount', 'MAU')

In [56]:
# MAU (28-day window) for every day from 2018-01-01 through 2018-01-07 (inclusive).
getMAU(data, '20180101', '20180107').show()

+------------------+-------+
|submission_date_s3|    MAU|
+------------------+-------+
|          20180101|2564641|
|          20180102|2556029|
|          20180103|2550349|
|          20180104|2544333|
|          20180105|2540089|
|          20180106|2539755|
|          20180107|2538029|
+------------------+-------+



In [59]:
def getYAU(data, startdate, enddate, countrylist = None, localelist = None):
    """
    Yearly active users (distinct clients in the trailing 365 days) for every
    day from `startdate` through `enddate`, both inclusive.

    The window is a fixed 365 days, so when it contains Feb 29 it is one day
    short of a full calendar year.

    Parameters:
    data: DataFrame passed to getPAU (the filtered main_summary rows)
    startdate, enddate: strings in 'yyyyMMdd' format; if enddate is before
        startdate, only startdate is computed
    countrylist, localelist: forwarded to getPAU for every day (which, as
        written in this notebook, does not use them)

    Returns: a DataFrame with one (submission_date_s3, YAU) row per day,
    built by unioning the per-day getPAU results.
    """
    # Local import keeps this cell self-contained.
    from functools import reduce

    start_dt = datetime.datetime.strptime(startdate, '%Y%m%d').date()
    end_dt = datetime.datetime.strptime(enddate, '%Y%m%d').date()
    n_days = (end_dt - start_dt).days
    # startdate itself is always computed, even when enddate precedes it.
    offsets = range(n_days + 1) if n_days > 0 else [0]

    # datetime.timedelta, not a bare `timedelta`: the latter is never imported
    # in this notebook and raises NameError on a fresh kernel.
    daily = [getPAU(data,
                    (start_dt + datetime.timedelta(days=i)).strftime('%Y%m%d'),
                    365, countrylist, localelist)
             for i in offsets]
    combined = reduce(lambda acc, day_df: acc.union(day_df), daily)
    return combined.withColumnRenamed('PeriodCount', 'YAU')

In [86]:
# YAU (365-day window) for every day from 2017-12-01 through 2018-01-30;
# show() prints only the first 20 rows by default.
getYAU(data, '20171201', '20180130').show()

+------------------+-------+
|submission_date_s3|    YAU|
+------------------+-------+
|          20171201|8302024|
|          20171202|8294046|
|          20171203|8291536|
|          20171204|8297701|
|          20171205|8296217|
|          20171206|8293332|
|          20171207|8290720|
|          20171208|8286728|
|          20171209|8279039|
|          20171210|8276837|
|          20171211|8282565|
|          20171212|8280701|
|          20171213|8277719|
|          20171214|8275345|
|          20171215|8270857|
|          20171216|8263472|
|          20171217|8261446|
|          20171218|8266911|
|          20171219|8266165|
|          20171220|8264461|
+------------------+-------+
only showing top 20 rows



In [83]:
def getPAU1(data, date, period, countrylist = None, localelist = None):
    """
    Count distinct clients active in the `period` days ending on `date`,
    world-wide and optionally broken down by country and by locale.

    Parameters:
    data: data extracted from main_summary, filtered with app_name:Firefox,
         normalized_channel:release, sample_id:42, subsession_length between 0 and 86400 (inclusive)
    date: string in 'yyyyMMdd' format; the last (inclusive) day of the window
    period: integer, number of days in the window ending on `date`
    countrylist: optional list of country codes (e.g. 'US') to break the count down by
    localelist: optional list of locale strings (e.g. 'en-US') to break the count down by

    Returns: a tuple of DataFrames, in this order:
        - always: the world-wide count (columns submission_date_s3, PeriodCount);
        - if countrylist is not None: per-country counts (submission_date_s3,
          country, countryPeriodCount), ordered largest first;
        - if localelist is not None: per-locale counts (submission_date_s3,
          locale, localePeriodCount), ordered largest first.
    The locale result is at index 2 only when countrylist is also given;
    otherwise it is at index 1.
    """
    enddate = datetime.datetime.strptime(date, '%Y%m%d')
    periodstartdate_str = (enddate - datetime.timedelta(days=period)).strftime('%Y%m%d')

    # Window (periodstartdate, date] = `period` days ending on `date`. Plain string
    # comparison assumes submission_date_s3 and `date` are zero-padded yyyyMMdd.
    windowed = data.filter("submission_date_s3 <= '%s' and submission_date_s3 > '%s'"
                           % (date, periodstartdate_str))

    def _breakdown(column, values, count_name):
        # Distinct clients per value of `column` (restricted to `values`), largest first.
        return (windowed.where(col(column).isin(values))
                .groupBy(column)
                .agg(countDistinct('client_id').alias(count_name))
                .orderBy(desc(count_name))
                .select(lit(date).alias('submission_date_s3'), '*'))

    totalperiodcount = (windowed
                        .agg(countDistinct('client_id').alias('PeriodCount'))
                        .select(lit(date).alias('submission_date_s3'), '*'))
    res = (totalperiodcount,)

    if countrylist is not None:
        res += (_breakdown('country', countrylist, 'countryPeriodCount'),)

    if localelist is not None:
        res += (_breakdown('locale', localelist, 'localePeriodCount'),)
    return res

In [84]:
# Countries and locales to break the period counts down by.
countrylist = [
    'US', 'CA', 'MX', 'DE', 'BR',
    'FR', 'IN', 'ID', 'RU', 'PL',
    'IT', 'GB', 'ES', 'CN', 'JP',
]
localelist = ['en-US', 'de', 'fr', 'ru', 'es-ES']
# Re-assigns `date` (same value as the earlier cell).
date = '20180101'

In [85]:
# Locale breakdown; it sits at index 2 only because both countrylist and localelist are passed.
getPAU1(data, date,7,countrylist,localelist)[2].show()

+------------------+------+-----------------+
|submission_date_s3|locale|localePeriodCount|
+------------------+------+-----------------+
|          20180101| en-US|           581539|
|          20180101|    de|           169090|
|          20180101|    fr|           112137|
|          20180101|    ru|            89025|
|          20180101| es-ES|            78647|
+------------------+------+-----------------+



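### Backfill method

Compute rolling active users (WAU/MAU) for every date at once, instead of issuing one query per date as the functions above do.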

In [91]:
def countTrue(column):
    """
    Aggregate expression counting the rows where `column` is true.

    Assumes `column` is a boolean Column: it is cast to int and summed. `sum`
    here is pyspark.sql.functions.sum (from the star import), not the builtin.
    """
    as_int = column.cast('int')
    return sum(as_int)

# Seconds in one day, for converting between seconds and days.
one_day_secs = 24 * 60 * 60

# First day of the analysis period (yyyyMMdd).
start_date = '20160601'

# start_date as seconds / whole days since 1970-01-01. time.mktime treats the
# parsed date as local time. NB: this also overwrites the global `epoch`.
start_timestamp_s = int(time.mktime(time.strptime(start_date, "%Y%m%d")))
epoch = start_timestamp_s
start_timestamp_day = int(start_timestamp_s / one_day_secs)

# Column expressions: each row's submission date as seconds, then whole days
# and whole weeks (truncated by the int cast).
submission_timestamp_s = unix_timestamp('submission_date_s3', format = 'yyyyMMdd')
submission_timestamp_day = (submission_timestamp_s / one_day_secs).cast('int')
submission_timestamp_week = (submission_timestamp_s / one_day_secs / 7).cast('int')

In [92]:
def activeUserRolling(df, period):
    """
    Backfill rolling active-user counts for every date in `df` at once.

    For each offset i in 0..period-1, days are bucketed into consecutive
    `period`-day blocks shifted by i (key = int((timestamp_day - i) / period)).
    The distinct client_ids in each block are counted and labelled with the
    latest submission_date_s3 observed in that block. Every date is the last
    day of exactly one block across all offsets, so this yields a trailing
    `period`-day count for each date.

    Parameters:
    df: DataFrame with at least submission_date_s3 ('yyyyMMdd' strings) and client_id
    period: window length in days (e.g. 7 for WAU, 28 for MAU)

    Returns: DataFrame with columns submission_date_s3 and active_users.
    Rows labelled with the most recent date in `df` are dropped, since the
    block containing that date may not be complete yet.

    NOTE(review): blocks that overlap the first available date are not
    dropped, so the earliest (period - 1) dates presumably undercount --
    confirm before using them. The last-date drop also removes a block that
    happens to end exactly on the latest date. A block's label is the latest
    date *observed* in it, which differs from the block's end if a day has no rows.
    """
    # `max`, `col`, `countDistinct` are pyspark.sql.functions (star import),
    # not Python builtins.
    last_date = df.select(max('submission_date_s3')).take(1)[0][0]

    # One bucket-key column per offset. `range` (not Python-2-only `xrange`)
    # so this runs on both Python 2 and 3.
    new_cols = [((col('timestamp_day') - i) / period).cast('int').alias('offset_%d' % i)
                for i in range(period)]

    df = df.select('submission_date_s3', 'client_id').distinct()
    df = df.select('*', submission_timestamp_day.alias('timestamp_day'))
    df = df.select('*', *new_cols)

    # Reused by each of the `period` aggregations below.
    df = df.cache()

    not_last = "submission_date_s3 != '%s'" % last_date
    active_user_dfs = [
        df.groupBy('offset_%d' % i)
          .agg(max('submission_date_s3').alias('submission_date_s3'),
               countDistinct('client_id').alias('active_users'))
          .select('submission_date_s3', 'active_users')
          .filter(not_last)
        for i in range(period)
    ]

    final = active_user_dfs[0]
    for f in active_user_dfs[1:]:
        final = final.union(f)

    return final

def backfillMAU(data):
    """Rolling 28-day active users for every date in `data` (see activeUserRolling)."""
    return activeUserRolling(data, 28)

def backfillWAU(data):
    """Rolling 7-day active users for every date in `data` (see activeUserRolling)."""
    return activeUserRolling(data, 7)


In [96]:
# Cross-check: the backfilled MAU for 2018-01-01 should equal getMAU's value for that date.
backfillMAU(data).filter("submission_date_s3 == '20180101'").show()

+------------------+------------+
|submission_date_s3|active_users|
+------------------+------------+
|          20180101|     2564641|
+------------------+------------+

