In [None]:
import datetime
import os
import sys

import plotly
import plotly.graph_objects as go
import sqlalchemy

# make the local source directories importable *before* importing from them;
# with a fresh kernel the imports below would otherwise only work when the
# packages happen to be installed in the environment
# NOTE(review): the relative paths assume the notebook is started from its own directory - confirm
sys.path.extend([
    '../../buhtuh',
    '../'
])

# import Objectiv buh_tuh and the basic features model used as data source
from buhtuh import BuhTuhDataFrame
from objectiv_buhtuh.util import duplo_basic_features

## Get website production data

In [None]:
# connect to the full postgresql dataset
# the connection string is read from the DSN environment variable, so database
# credentials never have to be written into (and committed with) the notebook;
# when DSN is not set, the local development database is used
dsn = os.environ.get('DSN', 'postgresql://objectiv:@localhost:5432/objectiv')
engine = sqlalchemy.create_engine(dsn)

In [None]:
# the basic features model is the source for the buh_tuh dataframe
basic_features = duplo_basic_features()

# build the dataframe on the full dataset, indexed by event_id
full_df = BuhTuhDataFrame.from_model(
    engine=engine,
    model=basic_features,
    index=['event_id'],
)

## Set the timeframe

In [None]:
# boundaries of the analysis timeframe: the start is inclusive, the end is exclusive
timeframe_start = datetime.date(2021, 6, 1)
timeframe_end = datetime.date(2021, 11, 1)

# boolean selector for all events with a moment inside the timeframe
timeframe_selector = (full_df['moment'] >= timeframe_start) & (full_df['moment'] < timeframe_end)

# dataframe with the timeframe applied
timeframe_df = full_df[timeframe_selector]

# explore the data, most recent events first
timeframe_df.sort_values(by='moment', ascending=False).head()

## Set the time aggregation 

In [None]:
# choose for which level of time aggregation the rest of the analysis will run
# supports all Postgres datetime template patterns: https://www.postgresql.org/docs/current/functions-formatting.html#FUNCTIONS-FORMATTING-DATETIME-TABLE

# ISO week number (IW) combined with the ISO week-numbering year (IYYY).
# The Gregorian year (YYYY) must not be paired with IW: days around New Year can
# belong to an ISO week of the neighbouring year, e.g. 2021-01-01 lies in ISO week
# 53 of 2020 and would be labelled '202153' instead of '202053'.
agg_level = 'IYYYIW'

# add the time aggregation as new column to the dataframes, so we can group on this later
timeframe_df['time_aggregation'] = timeframe_df['moment'].format(agg_level)
full_df['time_aggregation'] = full_df['moment'].format(agg_level)

## Set the user application

In [None]:
# take the user application from the global contexts and store it as a column on both dataframes
timeframe_df['user_application'] = timeframe_df.global_contexts.json.application
full_df['user_application'] = full_df.global_contexts.json.application

# select the user application(s) to analyse, in this case the objectiv.io website
# when more than one application is selected, each of the metrics below can be
# grouped by user_application to compare behavior
timeframe_is_website = timeframe_df['user_application'] == 'objectiv-website'
full_is_website = full_df['user_application'] == 'objectiv-website'

timeframe_df = timeframe_df[timeframe_is_website]
full_df = full_df[full_is_website]

## Users

In [None]:
# unique users over the whole timeframe, reused by later metrics
total_users = timeframe_df['user_id'].nunique()

# unique users per time aggregation
events_per_period = timeframe_df.groupby('time_aggregation')
users = events_per_period.aggregate({'user_id': 'nunique'})

# show the most recent periods first
users.sort_values(by='time_aggregation', ascending=False).head()

In [None]:
# plot unique users in chronological order, limited to the first 60 periods
users_chronological = users.sort_values(by='time_aggregation', ascending=True)
users_chronological.head(60).plot()

## Sessions

In [None]:
# unique sessions per time aggregation
sessions = (
    timeframe_df
    .groupby('time_aggregation')
    .aggregate({'session_id': 'nunique'})
)

# show the most recent periods first
sessions.sort_values(by='time_aggregation', ascending=False).head()

In [None]:
# plot unique sessions in chronological order, limited to the first 60 periods
sessions_chronological = sessions.sort_values(by='time_aggregation', ascending=True)
sessions_chronological.head(60).plot()

## Sessions per user

In [None]:
# combine the session and user counts per time aggregation
users_sessions = sessions.merge(users, how='inner', on='time_aggregation')

# average number of sessions per user
users_sessions['sessions_per_user_avg'] = (
    users_sessions['session_id_nunique'] / users_sessions['user_id_nunique']
)

# the raw counts are no longer needed, only keep the average
count_columns = ['session_id_nunique', 'user_id_nunique']
users_sessions.drop(columns=count_columns, inplace=True)

# show the most recent periods first
users_sessions.sort_values(by='time_aggregation', ascending=False).head()

In [None]:
# plot average sessions per user in chronological order, limited to the first 60 periods
users_sessions_chronological = users_sessions.sort_values(by='time_aggregation', ascending=True)
users_sessions_chronological.head(60).plot()

## New users

In [None]:
# first time aggregation and first session of every user, based on the full dataset
first_seen_aggregations = {'time_aggregation': 'min', 'session_id': 'min'}
user_first_seen = full_df.groupby('user_id').aggregate(first_seen_aggregations)

# number of users that were first seen in each time aggregation
new_users = user_first_seen.groupby('time_aggregation_min').aggregate({'user_id': 'nunique'})

# combine with the users per time aggregation to calculate ratios;
# the inner merge also limits the result to the analysis timeframe
new_total_users = users.merge(
    new_users,
    how='inner',
    left_on='time_aggregation',
    right_on='time_aggregation_min',
    suffixes=('_total', '_new'),
)

# set time_aggregation as single index
new_total_users = new_total_users.set_index('time_aggregation')

# share of new users and share of returning users
new_total_users['new_user_share'] = (
    new_total_users['user_id_nunique_new'] / new_total_users['user_id_nunique_total']
)
new_total_users['returning_user_share'] = (
    (new_total_users['user_id_nunique_total'] - new_total_users['user_id_nunique_new'])
    / new_total_users['user_id_nunique_total']
)

# show the most recent periods first
new_total_users.sort_values(by='time_aggregation', ascending=False).head()

In [None]:
# plot new versus total users in chronological order, limited to the first 60 periods
new_vs_total = new_total_users[['user_id_nunique_new', 'user_id_nunique_total']]
new_vs_total.sort_values(by='time_aggregation', ascending=True).head(60).plot()

In [None]:
# plot the share of returning users in chronological order, limited to the first 60 periods
returning_share = new_total_users[['returning_user_share']]
returning_share.sort_values(by='time_aggregation', ascending=True).head(60).plot()

## Events

In [None]:
# take the event location from the location_stack and store it as a new column
timeframe_df['event_location'] = timeframe_df.location_stack.json.nice_name

# unique users and number of hits per time aggregation, event type and event location
event_group_keys = ['time_aggregation', 'event_type', 'event_location']
users_per_event = timeframe_df.groupby(event_group_keys).aggregate({
    'user_id': 'nunique',
    'session_hit_number': 'count',
})

# most recent periods first, within a period the events with the most users first
users_per_event.sort_values(by=['time_aggregation', 'user_id_nunique'], ascending=False).head()

## New user events

In [None]:
# look at the first 10 things new users do

# keep only the events of the first session of users that were new in the timeframe:
# user, time aggregation and session all have to match the user's first-seen values
timeframe_new_users = timeframe_df.merge(
    user_first_seen,
    how='inner',
    left_on=['user_id', 'time_aggregation', 'session_id'],
    right_on=['user_id', 'time_aggregation_min', 'session_id_min'],
)

# limit to the first 10 events of that session
is_early_hit = timeframe_new_users['session_hit_number'] <= 10
timeframe_new_users = timeframe_new_users[is_early_hit]

# unique users and number of hits per time aggregation, event type and event location
new_user_group_keys = ['time_aggregation', 'event_type', 'event_location']
new_user_events = timeframe_new_users.groupby(new_user_group_keys).aggregate({
    'user_id': 'nunique',
    'session_hit_number': 'count',
})

# most recent periods first, within a period the events with the most users first
new_user_events.sort_values(by=['time_aggregation', 'user_id_nunique'], ascending=False).head()

## WIP Conversion

In [None]:
# NOTE - find conversion feature - replace with sankey when we have it

# the feature that marks a completed conversion, in this case a successful submission of an email address to keep-me-posted
# NOTE: replace soon with the completed event '(WebDocumentContext,#document),(SectionContext,keep-me-posted-form),(CompletedContext,keep-me-posted)'
# the feature string has its own name, so `conversion_completed` always refers to
# the dataframe of conversion events and never to a string
conversion_completed_feature = '(WebDocumentContext,#document),(SectionContext,keep-me-posted-form),(ActionContext,keep-me-posted)'

# filter on only completed conversion events
conversion_completed = timeframe_df[(timeframe_df['feature'] == conversion_completed_feature)]

# calculate conversions, now per user, but can easily be aggregated to session_id instead
conversions = conversion_completed.groupby('time_aggregation').aggregate({'user_id':'nunique'})

# merge with users, but can easily be done with sessions instead
# the inner merge drops time aggregations without any conversion
conversion_rate = conversions.merge(users, how='inner', on='time_aggregation', suffixes=('_converting', '_total'))

# calculate conversion rate: converting users as share of all users in the same time aggregation
conversion_rate['conversion_rate'] = conversion_rate['user_id_nunique_converting'] / conversion_rate['user_id_nunique_total']

# show the most recent periods first
conversion_rate.sort_values(by='time_aggregation', ascending=False).head()

In [None]:
# plot the conversion rate in chronological order, limited to the first 60 periods
conversion_rate_only = conversion_rate[['conversion_rate']]
conversion_rate_only.sort_values(by='time_aggregation', ascending=True).head(60).plot()

## WIP Conversion error rate

In [None]:
# NOTE replace with sankey when we have it

# the feature that marks the start of a conversion, which can turn out completed or an error,
# in this case a click on the 'keep me posted' button
# the feature string has its own name, so `conversion_start` always refers to
# the dataframe of start events and never to a string
conversion_start_feature = '(WebDocumentContext,#document),(SectionContext,header),(SectionContext,keep-me-posted-form),(ButtonContext,subscribe)'

# filter on only conversion start events
conversion_start = timeframe_df[(timeframe_df.feature == conversion_start_feature)]

# calculate conversion starts, now per user, but can easily be aggregated to session_id instead
conversion_starts = conversion_start.groupby('time_aggregation').aggregate({'user_id':'nunique'})

# join conversion start & complete events; the left merge keeps time aggregations
# that have starts but no completed conversions
conversion_totals = conversion_starts.merge(conversions, how='left', on='time_aggregation')

# rename columns; both sides had a `user_id_nunique` column, which the merge suffixed with _x and _y
conversion_totals.rename(columns={'user_id_nunique_x':'users_start','user_id_nunique_y':'users_completed'}, inplace=True)

# calculate error rate by comparing starting and successfully completing a conversion event
# NOTE(review): for time aggregations without completed conversions `users_completed` is
# presumably NULL, which would make the error rate NULL instead of 1 - confirm
conversion_totals['error_rate'] = (conversion_totals['users_start'] - conversion_totals['users_completed']) / conversion_totals['users_start']

# show the most recent periods first
conversion_totals.sort_values(by='time_aggregation', ascending=False).head()

## WIP Conversion funnel

In [None]:
# Builds, for every converting user, the events up to and including their first
# conversion, each annotated with the previous event and its moment.

# for users that have a conversion event, select the moment of the first one
# (later we can add the multiple conversions case)
converting_users = conversion_completed.groupby(['user_id']).aggregate({'moment':'min'})

# merge with the df that has all user events in the timeframe;
# the inner merge drops users without a conversion and adds `moment_min`
converting_users_events = timeframe_df.merge(converting_users, how='inner', on='user_id')

# select all events that converting users had up to (and including) their first conversion moment
converting_users_events = converting_users_events[(converting_users_events['moment'] <= converting_users_events['moment_min'])]

# create a window over the events sorted by moment, so the lag returns the previous event for each row
# NOTE(review): the window is created on 'session_id', so "previous" presumably means
# previous within the same session rather than the same user - confirm
window = converting_users_events.sort_values('moment').window('session_id')
converting_users_events['prev_event'] = converting_users_events.feature.window_lag(window)
converting_users_events['prev_moment'] = converting_users_events.moment.window_lag(window)

# show each event next to its previous event, ordered per user in time
converting_users_events[['user_id', 'moment', 'event_type' , 'event_location', 'prev_moment', 'prev_event']].sort_values(by=['user_id', 'moment'], ascending=True).head()

# TODO use this as input for a sankey visual

## Session duration

In [None]:
# first moment, last moment and first time aggregation of every session
session_aggregations = {'moment': ['min', 'max'], 'time_aggregation': 'min'}
session_duration = timeframe_df.groupby(['session_id']).aggregate(session_aggregations)

# the duration of a session is the time between its first and its last moment
session_duration['session_duration'] = (
    session_duration['moment_max'] - session_duration['moment_min']
)

# sessions with a duration of zero are bounces, filter these out
has_duration = session_duration['session_duration'] > '0'
session_duration = session_duration[has_duration]

# rename columns
session_duration.rename(columns={'time_aggregation_min': 'time_aggregation'}, inplace=True)

# average session duration per time aggregation
avg_session_duration = (
    session_duration
    .groupby(['time_aggregation'])
    .aggregate({'session_duration': 'mean'})
)

# show the most recent periods first
avg_session_duration.sort_values(by='time_aggregation', ascending=False).head()

## WIP Session duration between events


In [None]:
# Calculates the average time between a start event and the stop event that
# directly follows it in the same session, per time aggregation.
# NOTE: replace event selection with sankey

# define the start and stop events to measure the duration in between, in this case landing on homepage and completing conversion
start_event = '(WebDocumentContext,#document)'
stop_event = '(WebDocumentContext,#document),(SectionContext,keep-me-posted-form),(ActionContext,keep-me-posted)'

# filter on only these events
start_stop = timeframe_df[(timeframe_df.feature == start_event) | (timeframe_df.feature == stop_event)]

# get the previous (because of the sorting on moment) event in the same session;
# window_lag(n) returns the nth previous value in the partition
window = start_stop.sort_values('moment').window('session_id')
start_stop['prev_event'] = start_stop.feature.window_lag(window)
start_stop['prev_moment'] = start_stop.moment.window_lag(window)

# materialize the df before we apply an expression on the window columns
start_stop = start_stop.get_df_materialized_model()

# filter: keep each stop event whose directly preceding event (within the filtered set) is a start event
complete = start_stop[(start_stop.feature == stop_event) & (start_stop.prev_event == start_event)]

# calculate duration between the start event and the stop event
complete['duration'] = complete.moment - complete.prev_moment

# calculate average duration per timeframe
duration_between_events = complete.groupby('time_aggregation').aggregate({'duration':'mean'})

# show the most recent periods first
duration_between_events.sort_values(by='time_aggregation', ascending=False).head()

## Retention

In [None]:
# select all active moments for each user: one row per user per time aggregation they were active in
user_moments = timeframe_df.groupby(['user_id', 'time_aggregation']).aggregate({'moment':'count'})

# merge with first seen df, to know in which time aggregation each user was new
user_activity = user_moments.merge(user_first_seen, how='inner', on='user_id')

# clean-up and rename columns
user_activity.rename(columns={'time_aggregation_min':'new_user_cohort'}, inplace=True)
user_activity.drop(columns=['moment_count'], inplace=True)

# for each new_user_cohort count how many users get back per timeframe
retention_input = user_activity.groupby(['new_user_cohort', 'time_aggregation']).aggregate({'user_id':'nunique'})

# add the size of each new user cohort
cohorts = retention_input.merge(new_users, how='inner', left_on='new_user_cohort', right_on='time_aggregation_min', suffixes=('_active', '_cohort'))

# calculate classic retention (so not rolling retention, where users are required to be active each timeframe)
cohorts['retention'] = cohorts['user_id_nunique_active'] / cohorts['user_id_nunique_cohort']

# NOTE: once we can reset index. we should keep this in buh_tuh, so we can also view SQL etc.
# now switch to Pandas, as the dataset is small enough
cohorts_df = cohorts.to_df().reset_index()

# create typical retention matrix
# the casts require the time aggregation values to be numeric strings (e.g. year + week number)
cohorts_df = cohorts_df.astype({'new_user_cohort': 'int', 'time_aggregation': 'int'})

# number of periods between becoming a new user and being active
# NOTE(review): this subtracts the formatted values as plain integers, so a cohort that
# spans a year boundary gets a wrong distance (e.g. 202201 - 202152 = 49 instead of 1) - confirm
# whether the analysis timeframe can cross a year boundary
cohorts_df['active_in_timeframe'] = cohorts_df.time_aggregation - cohorts_df.new_user_cohort

# one row per cohort, one column per number of periods since the cohort was new
# the arguments are passed by keyword: DataFrame.pivot only accepts keyword arguments as of pandas 2.0
cohorts_df.pivot(index='new_user_cohort', columns='active_in_timeframe', values='retention')

## Bounce rate

In [None]:
# only the columns needed to count hits per session
bounce_columns = ['time_aggregation', 'session_id', 'session_hit_number']
hits_sessions = timeframe_df[bounce_columns]

# number of distinct hits of every session, per time aggregation
hits_per_session = (
    hits_sessions
    .groupby(['time_aggregation', 'session_id'])
    .aggregate({'session_hit_number': 'nunique'})
)

# sessions with only one hit are bounces
hit_selector = hits_per_session['session_hit_number_nunique'] == 1
single_hit_sessions = hits_per_session[hit_selector].to_frame()

# number of bounced sessions per time aggregation
bounced_sessions = single_hit_sessions.groupby('time_aggregation').aggregate({'session_id': 'nunique'})

# combine with the total number of sessions per time aggregation
bounce_rate = bounced_sessions.merge(
    sessions,
    how='inner',
    on='time_aggregation',
    suffixes=('_bounce', '_total'),
)

# bounce rate: bounced sessions as share of all sessions
bounce_rate['bounce_rate'] = (
    bounce_rate['session_id_nunique_bounce'] / bounce_rate['session_id_nunique_total']
)

# the raw counts are no longer needed, only keep the rate
bounce_count_columns = ['session_id_nunique_bounce', 'session_id_nunique_total']
bounce_rate.drop(columns=bounce_count_columns, inplace=True)

# show the most recent periods first
bounce_rate.sort_values(by='time_aggregation', ascending=False).head()

In [None]:
# plot the bounce rate in chronological order, limited to the first 60 periods
bounce_rate_only = bounce_rate[['bounce_rate']]
bounce_rate_only.sort_values(by='time_aggregation', ascending=True).head(60).plot()

## User agent

In [None]:
# take the user_agent from the global contexts and store it as a new column
timeframe_df['user_agent'] = timeframe_df.global_contexts.json.user_agent

# unique users and unique sessions per time aggregation and user_agent
user_agent_group_keys = ['time_aggregation', 'user_agent']
user_agent_counts = timeframe_df.groupby(user_agent_group_keys).aggregate({
    'user_id': 'nunique',
    'session_id': 'nunique',
})

# temporarily add the total number of users as a column, to divide by
user_agent_counts['total_users'] = total_users[1]

# share of the total users per user_agent
user_agent_counts['share_of_users'] = (
    user_agent_counts['user_id_nunique'] / user_agent_counts['total_users']
)

# clean-up columns
user_agent_counts.drop(columns=['total_users'], inplace=True)

# most recent periods first, within a period the user agents with the most users first
user_agent_counts.sort_values(by=['time_aggregation', 'user_id_nunique'], ascending=False).head()

## Referer

In [None]:
# take the referer from the HttpContext in the global contexts and store it as a new column
timeframe_df['referer'] = timeframe_df.global_contexts.json.get_from_context_with_type_series(
    type='HttpContext',
    key='referer',
)

# unique users and unique sessions per time aggregation and referer
referer_group_keys = ['time_aggregation', 'referer']
referer_counts = timeframe_df.groupby(referer_group_keys).aggregate({
    'user_id': 'nunique',
    'session_id': 'nunique',
})

# temporarily add the total number of users as a column, to divide by
referer_counts['total_users'] = total_users[1]

# share of the total users per referer
referer_counts['share_of_users'] = (
    referer_counts['user_id_nunique'] / referer_counts['total_users']
)

# clean-up columns
referer_counts.drop(columns=['total_users'], inplace=True)

# most recent periods first, within a period the referers with the most users first
referer_counts.sort_values(by=['time_aggregation', 'user_id_nunique'], ascending=False).head()

## User timeline

In [None]:
# show the timeline of an individual user's events
# NOTE: we can make this better with feature selection & aggregation

# the specific user we want to replay
selected_user_id = '320db8ee-847c-424b-8291-c65d021575aa'

# NOTE: .astype('string') is more something buhtuh should handle, on list
user_selector = timeframe_df['user_id'].astype('string') == selected_user_id

# df with only this user's events
selected_user_df = timeframe_df[user_selector]

# NOTE: we can apply feature selection and maybe sankey visual here
# the columns that make up the timeline of this user's events
timeline_columns = ['moment', 'event_type', 'event_location', 'user_agent', 'referer']
user_timeline = selected_user_df[timeline_columns]

# oldest events first
user_timeline.sort_values(by='moment', ascending=True).head()

## Frequency

In [None]:
# number of unique sessions of every user
sessions_per_user = timeframe_df.groupby(['user_id'])
total_sessions_user = sessions_per_user.aggregate({'session_id': 'nunique'})

# frequency: number of users for each number of sessions
frequency = (
    total_sessions_user
    .groupby(['session_id_nunique'])
    .aggregate({'user_id': 'nunique'})
)

# share of the total users per number of sessions
frequency['share_of_users'] = frequency['user_id_nunique'] / total_users[1]

# lowest number of sessions first
frequency.sort_values(by='session_id_nunique', ascending=True).head()

In [None]:
# bar chart of the share of users per number of sessions, limited to the first 60 values
frequency_share = frequency[['share_of_users']]
frequency_share.sort_values(by='session_id_nunique', ascending=True).head(60).plot(kind='bar')

## Recency

In [None]:
# Calculates recency: for users with more than one active day, the average
# difference between an active day and their previous active day, per time aggregation.

# count the number of active days per user
user_active_check = timeframe_df.groupby(['user_id']).aggregate({'day':'nunique'})

# select all users that had more than one active day
user_active_check = user_active_check[(user_active_check['day_nunique'] > 1)]

# select all active days for each user, with the (first) time aggregation of that day
user_days = timeframe_df.groupby(['user_id', 'day']).aggregate({'time_aggregation':'min'})

# merge with users that have more than one active day; the inner merge drops all other users
user_days = user_days.merge(user_active_check, how='inner', on='user_id')

# reset the index so we can use the user_id & day columns
user_days = user_days.reset_index()

# get the previous (because of the sorting on day) active day for each user
window = user_days.sort_values('day').window(['user_id'])
user_days['prev_day'] = user_days.day.window_lag(window)

# materialize the df before we apply an expression on the window column
user_days = user_days.get_df_materialized_model()

# calculate the difference between an active day and prev_day
# NOTE(review): the first active day of a user has no previous day, so its recency is presumably NULL - confirm
user_days['recency'] = user_days['day'] - user_days['prev_day']

# rename columns
user_days.rename(columns={'time_aggregation_min':'time_aggregation'}, inplace=True)

# calculate the average recency and the number of unique users per time_aggregation
recency = user_days.groupby(['time_aggregation']).aggregate({'recency':'mean','user_id':'nunique'})

# show the most recent periods first
recency.sort_values(by='time_aggregation', ascending=False).head()

In [None]:
# plot the average recency in chronological order, limited to the first 60 periods
recency_mean = recency[['recency_mean']]
recency_mean.sort_values(by='time_aggregation', ascending=True).head(60).plot()

## Get metrics to production

In [None]:
# Export functionality to dbt is being worked on. Until then, view_sql() gives
# the SQL that runs on the full dataset for any metric above.

# As an example, the SQL for the session duration metric
session_duration_sql = avg_session_duration.view_sql()
print(session_duration_sql)