# Introduction

This is used to compare the counts of events between segment and kinesis during the move between the two pipelines.

As they counts inevitably don't match, it then provides detailed segmentation and search / exploratory tools towards the bottom.

For a large number of days, you'll want a lot of RAM (16GB or 32GB).  For single day experimenting, you will be fine with 8GB.

Running the whole script takes quite a long time initially, in particular due to the segment query (minutes to tens of minutes).  Once this has been done, further exploration is generally very quick (less than a second to a few seconds).

It's not overly optimised, but some steps have been taken to reduce memory.

# Requirements / Jupyter Extensions

Install these through jupyterlab extension manager (if using jupyterlab)
* jupyter-widgets
* plotly (and ideally chart studio too)

In [1]:
# Safe imports
from datetime import datetime, timedelta, date

In [2]:
# Run imports that might require installation to the environment, and install if necessary.
try:
    import psycopg2
except:
    print("Failed ot import psychopg2, trying to install it")
    !{sys.executable} -m pip install psycopg2-binary
    import psycopg2
    print("Successfully installed")
    
    
try:
    import dateparser
except:
    print("Failed ot import dateparser, trying to install it")
    !{sys.executable} -m pip install dateparser
    import dateparser
    print("Successfully installed")
    
try:
    import pyathena #used in other imports, so really just checking it's available
except:
    print("Failed ot import pyathena, trying to install it")
    !{sys.executable} -m pip install pyathena
    import pyathena
    print("Successfully installed")
    
try:
    import user_agents
except:
    print("Failed ot import user_agents, trying to install it")
    !{sys.executable} -m pip install user_agents
    import user_agents
    print("Successfully installed")

    
import ipywidgets as widgets
    


  """)


In [3]:
# Imports on files that might have dependencies that need installing
import data_pier_querying
from athena_querying import AthenaQuery
from athena_common_queries import *
import user_agents # this converts user agent from browser to mobile / desktop etc.

In [4]:
ua = user_agents.parse("Mozilla/5.0 (iPhone; CPU iPhone OS 13_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.4 Mobile/15E148 Safari/604.1")

In [5]:
ua.browser.family

'Mobile Safari'

# Settings

In [147]:
num_days_to_query = 7
#from_datetime = datetime.now() - timedelta(days = 5)
from_datetime = datetime(year=2020, month=1, day=15)
to_datetime = from_datetime+ timedelta(days=num_days_to_query)
include_device_segmentation = True #E.g. iphone users.  This will use more memory (and likely slow things a bit).
save_end_dataframe_to_file = True #Saves a parquet for easy loading after crashes, or in other tools

# Kinesis Data via Athena

Data goes tracker -> kinesis -> S3 (+ another S3 transform).  Then we can query S3 using Athena.

In [7]:
aq = AthenaQuery()

In [8]:
aq.connect()

In [9]:
athena_database = "ms_data_lake_production"
athena_raw_events_table = "ms_data_stream_production_processed"

In [10]:
#query = "select context.page_url, body.event_name, count(*) from "+athena_database+"."+athena_raw_events_table
#query += " where partition_0='2019' and partition_1>='12' and partition_2>='05' group by 1,2"

In [11]:
# I've removed the device_type data to save memory, but it would be useful.
query = create_generic_event_query(from_datetime, to_datetime, include_user_agent=include_device_segmentation, include_ip_address = include_device_segmentation, interpret_urls=False)

full_query = "select * from (%s) where country_code ='sg'" %query

In [12]:
print(full_query)

select * from (
    
    SELECT 
          CAST("from_iso8601_timestamp"("sent_at") AS timestamp) "sent_at_timestamp"
    , "sent_at"
    , "type"
    , "body"."event_name"
    , "body"."data"."status"
    , "user"."anonymous_id"
    , "user"."amp_id"
    , "context"."page_url"
    , "context"."referrer"
 
    
        , context.user_agent as user_agent
        
        , context.ip_address
        
    
    FROM
      ms_data_lake_production.ms_data_stream_production_processed
    
    
    WHERE true -- makes query composition easier
    
 AND 
  (
 partition_0 >= '2020'
 AND partition_1 >= '01'
 AND partition_2 >= '15'
 OR (
 partition_0 >= '2020'
 AND partition_1 > '01'
 ) 
 OR (
 partition_0 > '2020'
 ) 
)
 AND ((partition_0 <= '2020'
	 AND partition_1 <= '01'
	 AND partition_2 <= '22'
) 
 OR (
	 partition_0 <= '2020'
	 AND partition_1 < '01'
) 
 OR (
	 partition_0 < '2020'
) 
)
 AND CAST(from_iso8601_timestamp(sent_at) AS timestamp)  between CAST(from_iso8601_timestamp('2020-01-1

In [None]:
athena_full_events_df = aq.query(query)

In [None]:
# Set types to speed queries and save on memory
athena_full_events_df = athena_full_events_df.astype({ "type":"category"
    , "event_name":"category"
    , "status":"category"}, copy=False)

In [None]:
athena_full_events_df.dtypes

sent_at_timestamp      object
sent_at                object
type                 category
event_name           category
status               category
anonymous_id           object
amp_id                 object
page_url               object
referrer               object
user_agent             object
ip_address             object
dtype: object

In [None]:
athena_full_events_df.head(5)

Unnamed: 0,sent_at_timestamp,sent_at,type,event_name,status,anonymous_id,amp_id,page_url,referrer,user_agent,ip_address
0,2020-01-16 21:18:02.975,2020-01-16T21:18:02.975Z,page,PageView,,83e9c90e-f3e2-4821-9d1f-fd75425d8a51,,https://blog.moneysmart.sg/travel/ktm-train-to...,https://www.google.com/,Mozilla/5.0 (Linux; Android 9; Mi 9T) AppleWeb...,
1,2020-01-16 21:18:07.356,2020-01-16T21:18:07.356Z,event,Reading,Article Body 25,80035ff8-fe6d-4d84-a4c5-1c31e8d4896b,,https://blog.moneysmart.sg/travel/travel-data-...,https://www.google.com/,Mozilla/5.0 (Linux; Android 9; SM-G975F) Apple...,
2,2020-01-16 21:18:06.192,2020-01-16T21:18:06.192Z,page,PageView,,80035ff8-fe6d-4d84-a4c5-1c31e8d4896b,,https://blog.moneysmart.sg/travel/travel-data-...,https://www.google.com/,Mozilla/5.0 (Linux; Android 9; SM-G975F) Apple...,
3,2020-01-16 21:18:06.636,2020-01-16T21:18:06.636Z,page,PageView,,9ddd8006-b19d-477d-a741-ab0d58851a52,,https://www-new.moneysmart.sg/credit-cards?pro...,https://www-new.moneysmart.sg/credit-cards/uob...,Mozilla/5.0 (iPhone; CPU iPhone OS 13_3 like M...,
4,2020-01-16 21:57:49.115,2020-01-16T21:57:49.115Z,page,PageView,,a9ceb7fc-744e-4e19-b82c-07870018908f,,https://www.moneysmart.tw/articles/%E6%96%B0%E...,https://tw-news-yahoo-com.cdn.ampproject.org/v...,Mozilla/5.0 (Linux; Android 8.0.0; ASUS_Z01KD)...,


# Segment Data

NB: screwed up, and can use the tracks table, rather than individual event tables, so a lot of this is pointless.

In [None]:
#from importlib import reload
#reload(data_pier_querying)

In [None]:
# Below there are some checks on what columns are available

segment_columns_to_query = [
    # "sent_at", - don't use this, use timestamp
    "timestamp",
    #"event", - going to get that implied from the table.
    # "status", # TODO: would like to have this, but not sure which column, or which tables.  Maybe just not used much, so only do for the 4 tables.
    "anonymous_id",
    "context_page_url",
    # "referrer", #maybe only used in pages table??
    "context_ip", 
    "context_user_agent"]

In [None]:
dp_querying = data_pier_querying.DataPierQuerying()
dp_querying.connect()

In [None]:
tables_df = dp_querying.query_to_dataframe("select * from information_schema.tables")

In [None]:
segment_event_tables_df = tables_df[tables_df.table_schema=="moneysmartsg_prod"]["table_name"]


In [None]:
# These are taken from the dictionary in https://docs.google.com/spreadsheets/d/1HICh77BoGMIat9K3NPwz3pBayJWiAr0ohAlTuv7dr80/edit#gid=1882048411
#but actually it turns out there should be more than this, and don't need to do it this way.
expected_events_str = """
LeadGeneration.ClickConversion
LeadGeneration.FormStepCompleted
LeadGeneration.FormSubmitted
LeadGeneration.PaymentCompleted
LeadGeneration.ThankYou
LeadGeneration.RedirectCompleted
UserEngagement.ShowedMoreDetails
UserEngagement.ViewedMoreDetails
UserEngagement.SortedList
UserEngagement.UsedHelpHints
UserEngagement.ClickedMenuItem
UserEngagement.QuestionAnswered
UserEngagement.ShowMoreFilter
UserEngagement.ShowMoreOptions
UserEngagement.ClickedFilter
UserEngagement.ButtonClick
UserAuth.LoggedIn
UserAuth.RegisteredAccount
UserAuth.LoggedOut
UserFeedback.ModalDisplayed
UserFeedback.MoodSubmitted
UserFeedback.FeedbackSubmitted
UserFeedback.MoreFeedback
ABTest.Conversion
UserView.WidgetLoad
EmailCapture
PageView
Sharing
Reading
NewsLetterPopup
"""
expected_events = [z.strip() for z in expected_events_str.split("\n") if len(z.strip())>0]

In [None]:


expected_events_and_segment_tables = []
special_maps = {
    "PageView": "pages"
}
for event in expected_events:
    if event in special_maps:
        new_event_name = special_maps[event]
    else:
        new_event_name = ""
        for i, c in enumerate(event):
            if i==0:new_event_name+=c.lower()
            elif str.isupper(c): 
                if i>0 and event[i-1]!=".":
                    new_event_name += "_"
                new_event_name += c.lower()
            elif c==".": new_event_name += "_"
            else: new_event_name+= c
    expected_events_and_segment_tables.append([event, new_event_name])

In [None]:
expected_events_and_segment_tables

[['LeadGeneration.ClickConversion', 'lead_generation_click_conversion'],
 ['LeadGeneration.FormStepCompleted', 'lead_generation_form_step_completed'],
 ['LeadGeneration.FormSubmitted', 'lead_generation_form_submitted'],
 ['LeadGeneration.PaymentCompleted', 'lead_generation_payment_completed'],
 ['LeadGeneration.ThankYou', 'lead_generation_thank_you'],
 ['LeadGeneration.RedirectCompleted', 'lead_generation_redirect_completed'],
 ['UserEngagement.ShowedMoreDetails', 'user_engagement_showed_more_details'],
 ['UserEngagement.ViewedMoreDetails', 'user_engagement_viewed_more_details'],
 ['UserEngagement.SortedList', 'user_engagement_sorted_list'],
 ['UserEngagement.UsedHelpHints', 'user_engagement_used_help_hints'],
 ['UserEngagement.ClickedMenuItem', 'user_engagement_clicked_menu_item'],
 ['UserEngagement.QuestionAnswered', 'user_engagement_question_answered'],
 ['UserEngagement.ShowMoreFilter', 'user_engagement_show_more_filter'],
 ['UserEngagement.ShowMoreOptions', 'user_engagement_show_m

### Check for missing tables

Expect some random events not to be in Segment, or blog specific ones that haven't been deployed to SG and HK

In [None]:
# Check all the event tables exist
expected_event_segment_tables = [z[1] for z in expected_events_and_segment_tables]
segment_table_names = segment_event_tables_df.to_list()
missing_event_tables = [z for z in expected_event_segment_tables if z not in segment_table_names]
missing_event_tables

['user_engagement_used_help_hints',
 'user_engagement_clicked_menu_item',
 'user_feedback_modal_displayed',
 'user_feedback_more_feedback',
 'a_b_test_conversion',
 'sharing',
 'news_letter_popup']

In [None]:
expected_events_and_segment_tables

[['LeadGeneration.ClickConversion', 'lead_generation_click_conversion'],
 ['LeadGeneration.FormStepCompleted', 'lead_generation_form_step_completed'],
 ['LeadGeneration.FormSubmitted', 'lead_generation_form_submitted'],
 ['LeadGeneration.PaymentCompleted', 'lead_generation_payment_completed'],
 ['LeadGeneration.ThankYou', 'lead_generation_thank_you'],
 ['LeadGeneration.RedirectCompleted', 'lead_generation_redirect_completed'],
 ['UserEngagement.ShowedMoreDetails', 'user_engagement_showed_more_details'],
 ['UserEngagement.ViewedMoreDetails', 'user_engagement_viewed_more_details'],
 ['UserEngagement.SortedList', 'user_engagement_sorted_list'],
 ['UserEngagement.UsedHelpHints', 'user_engagement_used_help_hints'],
 ['UserEngagement.ClickedMenuItem', 'user_engagement_clicked_menu_item'],
 ['UserEngagement.QuestionAnswered', 'user_engagement_question_answered'],
 ['UserEngagement.ShowMoreFilter', 'user_engagement_show_more_filter'],
 ['UserEngagement.ShowMoreOptions', 'user_engagement_show_m

In [None]:
# Removing the missing ones from the query list
events_and_tables_to_get_from_data_pier = [z for z in expected_events_and_segment_tables if z[1] not in missing_event_tables]

# Removing a problematic one (doesn't have context_page_url in it, and very unimportant
events_and_tables_to_get_from_data_pier = [z for z in events_and_tables_to_get_from_data_pier if z[1] not in ["user_auth_logged_out",]]

In [None]:
len(events_and_tables_to_get_from_data_pier)

22

In [None]:
cols = dp_querying.query_to_dataframe("""
select column_name, data_type, count(*) from information_schema.columns 
where 
table_name in  ('"""+"','".join([z[1] for z in events_and_tables_to_get_from_data_pier])+"""')
and table_schema='moneysmartsg_prod'

group by 1,2
""")

In [None]:
cols[cols["count"]>10].sort_values(["count"])

Unnamed: 0,column_name,data_type,count
287,page_referrer,text,11
352,user_id,text,13
27,context_campaign_content,text,14
42,context_campaign_term,text,14
33,context_campaign_medium,text,15
34,context_campaign_name,text,15
41,context_campaign_source,text,15
286,page_path,text,15
17,channel,text,16
60,context_locale,text,19


In [None]:
cols = dp_querying.query_to_dataframe("""
select  column_name, data_type, count(*) from information_schema.columns 
where 
 table_name in  ('"""+"','".join(["pages", "tracks"])+"""')
and table_schema='moneysmartsg_prod'
and column_name like '%%'
group by 1,2 order by count(*) desc
""")
cols

Unnamed: 0,column_name,data_type,count
0,context_campaign_term,text,2
1,context_campaign_medium,text,2
2,context_page_referrer,text,2
3,context_user_agent,text,2
4,context_page_search,text,2
5,context_page_title,text,2
6,context_campaign_name,text,2
7,context_page_url,text,2
8,id,character varying,2
9,context_ip,text,2


In [None]:
segment_date_constraint = " timestamp >= '%s' and timestamp < '%s' " % (from_datetime.isoformat(), to_datetime.isoformat())

In [None]:
dp_querying.query_to_dataframe("""SELECT
    nmsp_parent.nspname AS parent_schema,
    parent.relname      AS parent,
    nmsp_child.nspname  AS child_schema,
    child.relname       AS child
FROM pg_inherits
    JOIN pg_class parent            ON pg_inherits.inhparent = parent.oid
    JOIN pg_class child             ON pg_inherits.inhrelid   = child.oid
    JOIN pg_namespace nmsp_parent   ON nmsp_parent.oid  = parent.relnamespace
    JOIN pg_namespace nmsp_child    ON nmsp_child.oid   = child.relnamespace
WHERE parent.relname='%s';""")%"pages"

Unnamed: 0,parent_schema,parent,child_schema,child


In [None]:
pd.get_option("display.max_colwidth", 200)
indexes = dp_querying.query_to_dataframe("""SELECT
    indexname,
    indexdef
FROM
    pg_indexes
WHERE
    tablename = '%s';""" % "pages")

for a in indexes.values:
    print(a)

['pages_pkey'
 'CREATE UNIQUE INDEX pages_pkey ON moneysmarthk_prod.pages USING btree (id)']
['pages_pkey'
 'CREATE UNIQUE INDEX pages_pkey ON moneysmartsg_prod.pages USING btree (id)']
['pages_timestamp_idx'
 'CREATE INDEX pages_timestamp_idx ON moneysmartsg_prod.pages USING btree ("timestamp")']
['pages_pkey'
 'CREATE UNIQUE INDEX pages_pkey ON moneysmarthk_dev.pages USING btree (id)']
['pages_pkey'
 'CREATE UNIQUE INDEX pages_pkey ON moneysmartsg_dev.pages USING btree (id)']


In [None]:
query_segment_by_table = False #really shouldn't set this to true, didn't realise correct method.  Also need to add country stuff

segment_schemas = ["moneysmartsg_prod", "moneysmarthk_prod"]
# The meat of it
start_time = datetime.now()
event_name_to_rows = {}
if query_segment_by_table:
    for country_schema in segment_schemas:
        for i, (event_name, table_name) in enumerate(events_and_tables_to_get_from_data_pier):
            table_start_time = datetime.now()
            print("querying table %s / %s (%i/%i)" % (table_name, event_name, i+1, len(events_and_tables_to_get_from_data_pier)))
            query = "select {cols} from {schema}.{table} where {date_constraint}".format(cols=", ".join(segment_columns_to_query), 
                                                                           table=table_name,
                                                                           date_constraint =segment_date_constraint, schema=country_schema)

            events = dp_querying.query_to_dataframe(query)
            events["event_name"] = event_name #fills the entire column with the same value
            print("Got %i events"% len(events))
            event_name_to_rows[event_name]=events

            table_download_time = (datetime.now()-table_start_time).total_seconds()
            time_since_start = (datetime.now()-start_time).total_seconds()
            print("It took %.1f seconds to download from the table (%.1f seconds overall)" %(table_download_time, time_since_start))
            print()
            # if i>4:break


        # Merge tables
        segment_combined_df = pd.DataFrame()
        #combined_df = pd.DataFrame(columns=event_name_to_rows["LeadGeneration.ClickConversion"].columns)
        """for event_name, event_df in event_name_to_rows.items():
            print(len(event_df))
            combined_df.append(event_df, ignore_index=True)
            print(len(combined_df))
        #combined_df.astype({"event_name":"category"})
        """

        segment_combined_df = combined_df.append(list(event_name_to_rows.values()))
    
    
else:
    segment_columns_to_query_full = segment_columns_to_query + ["event_text",]
    tables_to_query = ["pages", "tracks"]
    all_event_dfs = []
    segment_combined_df = pd.DataFrame()
    for country_schema in segment_schemas:
        for table_name in tables_to_query:
            table_start_time = datetime.now()
            if table_name!="pages":
                cols_to_query = segment_columns_to_query_full
            else:
                cols_to_query = segment_columns_to_query
            print("querying table %s.%s" % (country_schema, table_name))
            print(cols_to_query)
            query = "select {cols} from {schema}.{table} where {date_constraint}".format(cols=", ".join(cols_to_query), 
                                                                           table=table_name,
                                                                           date_constraint =segment_date_constraint, schema=country_schema)

            events = dp_querying.query_to_dataframe(query)
            
            print("Got %i events"% len(events))
            #all_event_dfs.append(events)
            
            if table_name =="pages":
                events["event_text"] = "PageView" # fills the whole column
            table_download_time = (datetime.now()-table_start_time).total_seconds()
            time_since_start = (datetime.now()-start_time).total_seconds()
            print("merging")
            segment_combined_df = segment_combined_df.append(events)
            print("It took %.1f seconds to download from the table (%.1f seconds overall)" %(table_download_time, time_since_start))
            print()
            
        

querying table moneysmartsg_prod.pages
['timestamp', 'anonymous_id', 'context_page_url', 'context_ip', 'context_user_agent']


In [None]:
if not query_segment_by_table:
    segment_combined_df.rename(columns={"event_text":"event_name"}, inplace=True)

In [None]:
len(all_event_dfs)

In [None]:
if include_device_segmentation:
    segment_combined_df.rename(columns={"context_user_agent":"user_agent"}, inplace=True)

In [None]:
segment_combined_df.head()

In [None]:
segment_combined_df.rename(columns={"context_page_url":"page_url"}, inplace=True)
segment_combined_df.head(5)

# Merging Segment and Kinesis Events

In [None]:
# Make names clear e.g. s_...

# Check the timezone / timestamps match
# Athena raw stuff is in UTC, not SG time.  So 2020-01-19T00:04:04.443Z is 8:05am Singapore time.
# whereas Segment is stored with tiemzone at UTC.  So, could convert them all.
# TODO: But it does meant that there's a lot of events coming at the day boundary.

In [None]:
athena_full_events_df.head(2)

In [None]:
athena_full_events_df.dtypes

In [None]:
segment_combined_df.dtypes

In [None]:
# Group by columns to get around date inaccuracy issue
cols_to_group_by = ["anonymous_id", "event_name", "page_url", "date"] #, "context_ip", "context_user_agent"] #TODO: add IP address

print("Grouping by %s"% ", ".join(cols_to_group_by))

print("Fixing dates before grouping")
print("... for Segment")
segment_combined_df["date"] = segment_combined_df.apply(lambda row: row.timestamp.date().isoformat(), axis=1) # making this a string
print("... for athena")
athena_full_events_df["date"] = athena_full_events_df.apply(lambda row: row.sent_at[:10], axis=1)
# super-slow,so moving to using strings athena_full_events_df["date"] = athena_full_events_df.apply(lambda row: dateparser.parse(row.sent_at_timestamp).date(), axis=1)  #conversion from string might not be needed in the future; using dateparser as more robust, also slow

#going to reduce the number of columns to make it safer, then can go back and look for user agents etc (can do a mapping of anonymous_id to user_agent for instance.)




In [None]:
print("Setting sensible data types for the columns to group by")
data_type_mappings = {"event_name":"category", "date":"category"}
segment_combined_df = segment_combined_df.astype(data_type_mappings, copy=False)
athena_full_events_df = athena_full_events_df.astype(data_type_mappings, copy=False)

In [None]:
segment_combined_df.head()[cols_to_group_by]

In [None]:
athena_full_events_df.head()[cols_to_group_by]

In [None]:
# athena_full_events_df timestamp

print("Grouping by %s"%cols_to_group_by)
segment_grouped_df = segment_combined_df.groupby(cols_to_group_by).size().reset_index(name='s_count') #size preserves nulls, this sets the column to s_count

athena_grouped_df = athena_full_events_df.groupby(cols_to_group_by).size().reset_index(name='k_count')

# segment_combined_df.rename(columns = {"context_ip":"s_context_ip", "context_user_agent":"s_context_user_agent"}) 

In [None]:
athena_grouped_df.head()

In [None]:
# Actually join them

# set the column count names

merged_df = segment_grouped_df.merge(athena_grouped_df, how='outer', on=cols_to_group_by )

#Fill in the empty counts with 0s

merged_df["s_count"].fillna(0, inplace=True)
merged_df["k_count"].fillna(0, inplace=True)

In [None]:
merged_df.head(10)

In [None]:
merged_df.groupby(["date"]).count()

# Add Page Filtering Metadata

* is url blog / shop / ...
* country

In [None]:
from urllib.parse import urlparse, parse_qs

In [145]:
def get_metadata_from_url(url):
    p = urlparse(url.lower())
    
    #urlparse("https://www-new.moneysmart.sg/rabbit/mouse/?a=b")
    #ParseResult(scheme='https', netloc='www-new.moneysmart.sg', path='/rabbit/mouse/', params='', query='a=b', fragment='')
    
    
    nl = p.netloc
    
    page_type = ""
    stripped_path = p.path.strip("/")
    
    slug = "/"+stripped_path
    
    if slug.startswith("/en/") or slug.startswith("/zh-hk/"):
        slug_root = "/"+stripped_path.split("/")[1]
    elif slug=="/en" or slug=="/zh-hk":
        slug_root = "/"
    else:
        slug_root = "/"+stripped_path.split("/")[0]
    
    
    
    #blog (for SG and HK)
    if "moneysmart.tw" in nl or "moneysmart.ph" in nl or "moneysmart.id" in nl or 'blog.moneysmart' in nl or 'blog-new' in nl or 'blog3' in nl:
        page_type = "blog"
    
    #LPS
    elif stripped_path.endswith("ms"):
        page_type = "lps"
    
    #interstitial
    elif "iss.moneysmart" in nl or stripped_path.endswith("apply") or stripped_path.endswith("redirect"):
        page_type = "iss"
        
    #unbounce
    elif "get.moneysmart" in nl:
        page_type = "unbounce"
     
    
    #embed     , "regexp_extract"("context"."page_url", '^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?', 5) like '/embed/%' as is_embed
    elif slug_root.startswith("/embed/"): #As it uses slug_root, I think will be safe with HK locales.
        page_type = "embed"
    
    #else shop
    else:
        page_type = "shop"
        
        
        
    
    #ab test side , CAST("strpos"("context"."page_url", '://www-new.') AS boolean) OR CAST("strpos"("context"."page_url", '://www3.') AS boolean)  OR CAST("strpos"("context"."page_url", '://blog3.') AS boolean) as "is_test"
    if "www-new." in nl or "www3." in nl or "blog3." in nl:
        ab_test = "test"
    else:
        ab_test = "control"
    

    
    
    
    
    """
     , CASE WHEN "regexp_extract"("context"."page_url", '^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?', 4) LIKE '%moneysmart.sg%' THEN 'sg' 
        WHEN "regexp_extract"("context"."page_url", '^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?', 4) LIKE '%moneysmart.hk%' THEN 'hk' 
        WHEN "regexp_extract"("context"."page_url", '^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?', 4) LIKE '%moneysmart.id%' THEN 'id' 
        WHEN "regexp_extract"("context"."page_url", '^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?', 4) LIKE '%moneysmart.ph%' THEN 'ph'
        WHEN "regexp_extract"("context"."page_url", '^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?', 4) LIKE '%moneysmart.tw%' THEN 'tw' 
        ELSE null END as country_code
    """
    
    country_code = ""
    if "moneysmart.sg" in nl:
        country_code = "sg"
    elif "moneysmart.hk" in nl:
        country_code = "hk"
    elif "moneysmart.id" in nl:
        country_code = "id"
    elif "moneysmart.ph" in nl:
        country_code = "ph"
    elif "moneysmart.tw" in nl:
        country_code = "tw"
    elif "moneysmart.com" in nl:
        country_code = "ww" #worldwide
    else:
        country_code = "??"
    
    
    #return {"page_type":page_type, "path":path, "ab_test":ab_test, "country_code":country_code}
    return [page_type, slug, slug_root, ab_test, country_code]
    
    



In [142]:
# Do some tests to show that it's kind of working (bad version of a unit test!)

In [143]:
get_metadata_from_url("https://www-new.moneysmart.sg/rabbit/headlight/?scary=True")

['shop', '/rabbit/headlight', '/rabbit', 'test', 'sg']

In [None]:
get_metadata_from_url("https://blog.moneysmart.ph/rabbit/headlight/?scary=True")

In [None]:
get_metadata_from_url("https://blog3.moneysmart.tw")

In [None]:
get_metadata_from_url("https://www.moneysmart.hk/zh-hk/credit-cards/")

In [None]:
start_time = datetime.now()
print("starting at %s"%start_time.isoformat())
#This is a bit slow (consider at looking how to optimise, especially memory usage from creating loads of series objects
#Could probably optimise by splitting all the urls using a pandas function, then joining with a map to get page_type, path etc, but ymmv
metadata_df = merged_df.apply(lambda x: pd.Series(get_metadata_from_url(x.page_url)), axis=1)#, index=["page_type", "path", "ab_test", "country_code"])
end_time = datetime.now()
time_taken = (end_time-start_time).total_seconds()
print("Took %i seconds"%time_taken)

In [None]:
metadata_df.rename(columns={0:"page_type", 1:"slug", 2:"slug_root", 3:"ab_test", 4:"country_code"}, inplace=True)

In [144]:
metadata_df.head()

Unnamed: 0,page_type,slug,slug_root,ab_test,country_code
0,blog,/career/5-easy-side-businesses-you-can-run-whi...,/career,control,sg
1,shop,/embed/98e61305602380971d9c5e68c4a75647,/embed,control,sg
2,blog,/career/5-easy-side-businesses-you-can-run-whi...,/career,control,sg
3,shop,/embed/98e61305602380971d9c5e68c4a75647,/embed,control,sg
4,shop,/zh-hk/credit-cards,/credit-cards,control,hk


In [None]:
merged_df_with_meta = pd.concat([merged_df, metadata_df], axis=1)

In [None]:
# Set some sensible data types to speed it all up
#merged_df_with_meta.astype({"page_type":"category", "slug":"category"})
merged_df_with_meta = merged_df_with_meta.astype({"page_type":"category", "slug":"category", "ab_test":"category", "country_code":"category", "s_count":"int", "k_count":"int"})

In [None]:
merged_df_with_meta.head()

In [None]:
merged_df_with_meta[(merged_df_with_meta.s_count>1) & (merged_df_with_meta.k_count>1)].head()

# Add Device Type Metadata

In [None]:
segment_combined_df.head()

In [None]:
athena_full_events_df.head()

### Segment

In [None]:
segment_combined_df.head()

In [None]:
group_by_cols = ["anonymous_id", "user_agent"]
segment_anonymous_id_to_user_agent_full_df = segment_combined_df.groupby(group_by_cols).count()
print("%i anonymous_id to user_agents found" % len(segment_anonymous_id_to_user_agent_full_df))

In [None]:
segment_anonymous_id_to_user_agent_full_df = segment_anonymous_id_to_user_agent_full_df.reset_index()
segment_anonymous_id_to_user_agent_full_df.rename({"0":"count"}, inplace=True)
segment_anonymous_id_to_user_agent_full_df.head()

In [None]:
# check for duplicates
sd = segment_anonymous_id_to_user_agent_full_df.groupby(["anonymous_id"]).size() #[["sent_at"]]
sd = sd.reset_index()
duplicates = sd[sd[0]>1]
print("%i / %i anonymous_ids with different user agent strings.  Expect there to be some due to browser upgrades" % (len(duplicates), len(sd)))

In [None]:
sd.head()

In [None]:
segment_anonymous_id_to_user_agent_df = segment_anonymous_id_to_user_agent_full_df[["anonymous_id", "user_agent"]] # .set_index("anonymous_id")

#make a bit safer by stripping the strings
#segment_anonymous_id_to_user_agent_df["user_agent"] = segment_anonymous_id_to_user_agent_df["user_agent"].str.strip()
#segment_anonymous_id_to_user_agent_df["anonymous_id"] = segment_anonymous_id_to_user_agent_df["anonymous_id"].str.strip()

In [None]:
segment_anonymous_id_to_user_agent_df = segment_anonymous_id_to_user_agent_df.rename(columns={"user_agent": "s_user_agent"})
segment_anonymous_id_to_user_agent_df.head()

In [None]:
# Remove duplicates, so anonymous_id column is unique (otherwise on joins you'll expand the dataset)
segment_anonymous_id_to_user_agent_dedup_df = segment_anonymous_id_to_user_agent_df.groupby("anonymous_id").first().reset_index()
print("Before de-duplication %i, after %i"%(len(segment_anonymous_id_to_user_agent_df), len(segment_anonymous_id_to_user_agent_dedup_df)))
segment_anonymous_id_to_user_agent_dedup_df.head()

### Athena / Kinesis

In [None]:
group_by_cols = ["anonymous_id", "user_agent"]
athena_anonymous_id_to_user_agent_full_df = athena_full_events_df.groupby(group_by_cols).size()
print("%i anonymous_id to user_agents found" % len(athena_anonymous_id_to_user_agent_full_df))

In [None]:
athena_anonymous_id_to_user_agent_full_df = athena_anonymous_id_to_user_agent_full_df.reset_index()
athena_anonymous_id_to_user_agent_full_df.rename({"0":"count"}, inplace=True)
athena_anonymous_id_to_user_agent_full_df.head()

In [None]:
# check for duplicates
ad = athena_anonymous_id_to_user_agent_full_df.groupby(["anonymous_id"]).size() #[["sent_at"]]
ad = ad.reset_index()
duplicates = ad[ad[0]>1]
print("%i / %i anonymous_ids with different user agent strings.  Expect there to be some due to browser upgrades" % (len(duplicates), len(ad)))

In [None]:
# explore if issue
#df = ad[ad[0]>1].merge(athena_anonymous_id_to_user_agent_full_df, how="inner")
#df.sort_values("anonymous_id")

In [None]:
#df = athena_anonymous_id_to_user_agent_full_df[athena_anonymous_id_to_user_agent_full_df.anonymous_id=="f4a0d91c-b118-40ce-890c-9142bce9f152"]
#pd.set_option('max_colwidth', 200)
#print(df.values[0][1])
#print(df.values[1][1])

In [None]:
#athena_anonymous_id_to_user_agent_full_df.head()
athena_anonymous_id_to_user_agent_df = athena_anonymous_id_to_user_agent_full_df[["anonymous_id", "user_agent"]]


#make a bit safer by stripping the strings #couldn't get this to work without warning easily, so skipping.
#athena_anonymous_id_to_user_agent_df.loc[:,1] = athena_anonymous_id_to_user_agent_df["user_agent"].str.strip()
#athena_anonymous_id_to_user_agent_df.loc[:,0] = athena_anonymous_id_to_user_agent_df["anonymous_id"].str.strip()

#?athena_anonymous_id_to_user_agent_df["user_agent"].str.strip()

In [None]:
athena_anonymous_id_to_user_agent_df = athena_anonymous_id_to_user_agent_df.rename(columns={"user_agent": "a_user_agent"})
athena_anonymous_id_to_user_agent_df.head()


In [None]:
# Remove duplicates, so anonymous_id column is unique (otherwise on joins you'll expand the dataset)
athena_anonymous_id_to_user_agent_dedup_df = athena_anonymous_id_to_user_agent_df.groupby("anonymous_id").first().reset_index()
print("Before de-duplication %i, after %i"%(len(athena_anonymous_id_to_user_agent_df), len(athena_anonymous_id_to_user_agent_dedup_df)))
athena_anonymous_id_to_user_agent_dedup_df.head()

### Joined up for all anonymous_ids

In [None]:
athena_anonymous_id_to_user_agent_dedup_df.set_index("anonymous_id", inplace=True)
segment_anonymous_id_to_user_agent_dedup_df.set_index("anonymous_id", inplace=True)




In [None]:
athena_anonymous_id_to_user_agent_dedup_df.head(2)

In [None]:
segment_anonymous_id_to_user_agent_dedup_df.head(2)

In [None]:
combined_anonymous_id_to_user_agent_df = athena_anonymous_id_to_user_agent_dedup_df.merge(segment_anonymous_id_to_user_agent_dedup_df, how="outer", left_index=True, right_index=True)


In [146]:
combined_anonymous_id_to_user_agent_df.head(1)

Unnamed: 0_level_0,a_user_agent,s_user_agent
anonymous_id,Unnamed: 1_level_1,Unnamed: 2_level_1
00000b54-600a-4de2-8700-fd9885252dca,Mozilla/5.0 (iPhone; CPU iPhone OS 13_3 like M...,Mozilla/5.0 (iPhone; CPU iPhone OS 13_3 like M...


### Check if Segment and Kinesis disagree at all

In [None]:
print("%i segment anonymous_ids" % len(segment_anonymous_id_to_user_agent_df))
print("%i athena anonymous_ids" % len(athena_anonymous_id_to_user_agent_df))

In [None]:
# combined_anonymous_id_to_user_agent_df[(combined_anonymous_id_to_user_agent_df.a_user_agent.isnull())]

In [None]:
s_not_a = combined_anonymous_id_to_user_agent_df[(combined_anonymous_id_to_user_agent_df.s_user_agent.str.len()>0) \
                                                 & ((combined_anonymous_id_to_user_agent_df.a_user_agent.isnull()) |(combined_anonymous_id_to_user_agent_df.a_user_agent.str.len()==0))]
a_not_s = combined_anonymous_id_to_user_agent_df[(combined_anonymous_id_to_user_agent_df.a_user_agent.str.len()>0) \
                                                 & ((combined_anonymous_id_to_user_agent_df.s_user_agent.isnull()) |(combined_anonymous_id_to_user_agent_df.s_user_agent.str.len()==0))]

In [None]:
s_not_a.head()

In [None]:
a_not_s.head()

In [None]:
total_count = len(combined_anonymous_id_to_user_agent_df)
s_not_a_count = len(s_not_a)
a_not_s_count = len(a_not_s)
print("%i / %i are in segment, not athena (%.1f percent )" % (s_not_a_count, total_count, s_not_a_count / total_count *100))
print("%i / %i are in athena, not segement (%.1f percent)" % (a_not_s_count, total_count, a_not_s_count / total_count *100))
print("If you include countries that aren't on Segment i.e. ID, PH, TW, then you'd expect more from athena")

### Get an idea of how many don't have matching user_agents

In [None]:
df = combined_anonymous_id_to_user_agent_df.groupby("anonymous_id").size().reset_index()
duplicates = df[df[0]>1]
print("%i duplicate anonymous_ids - should be none at this stage" % len(duplicates))

In [None]:
non_matching_excl_nulls = combined_anonymous_id_to_user_agent_df[(combined_anonymous_id_to_user_agent_df.s_user_agent != combined_anonymous_id_to_user_agent_df.a_user_agent) \
                                                                 & ~combined_anonymous_id_to_user_agent_df.s_user_agent.isnull() \
                                                                 & ~combined_anonymous_id_to_user_agent_df.a_user_agent.isnull()]
print("%i User agent strings don't match" % len(non_matching_excl_nulls))
print("Look for changes in browser version for instance.  Don't worry about every last one.")
non_matching_excl_nulls.head()

### Create a Single user agent string per anonymous_id

In [None]:
combined_anonymous_id_to_user_agent_single_col_df = combined_anonymous_id_to_user_agent_df["a_user_agent"]\
        .fillna(combined_anonymous_id_to_user_agent_df["s_user_agent"]).reset_index().set_index("anonymous_id")
combined_anonymous_id_to_user_agent_single_col_df.rename(columns={"a_user_agent":"user_agent"}, inplace=True)
combined_anonymous_id_to_user_agent_single_col_df.head()

In [None]:
# This bit is for development where I keep appending the user_agent column and it generates user_agent_x etc
user_agent_cols_to_delete = [z for z in merged_df_with_meta.columns if z.startswith("user_agent")]
print(" Removing %s "%str(user_agent_cols_to_delete))
merged_df_with_meta.drop(columns=user_agent_cols_to_delete, inplace=True)

### Useful segmentation / convert user agent to browser etc

In [None]:
def convert_user_agent_to_useful_strings(user_agent_string):
    """
    Sort of matches to https://github.com/moneysmartco/metl/blob/e13086fae453911bed5a40cb51ff0869e2f3a0ce/scripts/python/device_tagger.py
    """
    user_agent = user_agents.parse(user_agent_string)
    
    device_family = ""
    
    if user_agent.is_pc:
        device_family = 'desktop'
    elif user_agent.is_mobile:
        device_family = 'mobile'
    elif user_agent.is_tablet:
        device_family = 'tablet'
    else:
        device_family = 'other'
        
    
    os_family = user_agent.os.family
    os_version = user_agent.os.version_string
    browser_family = user_agent.browser.family 
    browser_version = user_agent.browser.version_string
    
    is_bot = user_agent.is_bot
    
    return [device_family, os_family, os_version, browser_family, browser_version, is_bot]
    



There's an important optimisation going on here (which still isn't that quick).

If you just do .apply across all the rows, then it's super slow (many minutes e.g. 278s vs 24s for my better version).  I tried the optimisation at https://ys-l.github.io/posts/2015/08/28/how-not-to-use-pandas-apply/, but that didn't seem to provide benefit (or I slowed it down in other ways).

So I'm taking the unique user_agents, processing them and then doing a join, without creating Series objects as well.

There's probably more improvement do-able (e.g. creating the full data structure to insert into up front / generating fewer arrays, but it's fast enough for me right now.

In [None]:
distinct_user_agents = combined_anonymous_id_to_user_agent_single_col_df.user_agent.unique()

In [None]:
distinct_user_agents[:10]

In [None]:
len(distinct_user_agents)

In [None]:
# This isn't fast, but acceptable
start_time = datetime.now()
print("Starting to add user agent data at %s"% start_time.isoformat())
#meta_df = combined_anonymous_id_to_user_agent_single_col_df.apply(lambda x: pd.Series(convert_user_agent_to_useful_strings(x.user_agent)), axis=1)
meta_rows = [[z, ]+convert_user_agent_to_useful_strings(z)  for z in distinct_user_agents]
#d = dfcombined_anonymous_id_to_user_agent_single_col_df.merge(meta_df)
end_time = datetime.now()
seconds_taken = (end_time - start_time).total_seconds()
print("Took %i seconds to process" % seconds_taken)

In [None]:
user_agent_meta_df = pd.DataFrame(meta_rows)

user_agent_meta_df.rename(columns = {0:"user_agent", 1:"device_family", 2:"os_family", 3:"os_version", 4:"browser_family",5:"browser_version", 6:"is_bot"}, inplace=True)
user_agent_meta_df.set_index("user_agent", inplace=True)


In [None]:
# Try to make the data types a bit efficient
user_agent_meta_df = user_agent_meta_df.astype({ "device_family":"category", "os_family":"category", "os_version":"category", "browser_family":"category","browser_version":"category","is_bot":"bool"})
user_agent_meta_df.dtypes

In [None]:
user_agent_meta_df.head()

In [None]:
if False:# This is super slow currently.
    start_time = datetime.now()
    print("Starting to add user agent data at %s"% start_time.isoformat())
    #meta_df = combined_anonymous_id_to_user_agent_single_col_df.apply(lambda x: pd.Series(convert_user_agent_to_useful_strings(x.user_agent)), axis=1)
    meta_df = combined_anonymous_id_to_user_agent_single_col_df.apply(lambda x: convert_user_agent_to_useful_strings(x.user_agent), axis=1, result_type="expand")
    #d = dfcombined_anonymous_id_to_user_agent_single_col_df.merge(meta_df)
    end_time = datetime.now()
    seconds_taken = (end_time - start_time).total_seconds()
    print("Took %i seconds to process" % seconds_taken)

In [None]:
if False:
    # Trying something faster - based on https://ys-l.github.io/posts/2015/08/28/how-not-to-use-pandas-apply/, but hasn't worked so far
    # but hasn't worked, still (after tidying) it takes 257s, slower than the original.
    start_time = datetime.now()
    print("Starting to add user agent data at %s"% start_time.isoformat())
    new_cols = [[]]*6 # make some empty arrays
    num_new_cols = len(new_cols)
    #for row_num, (_, row) in enumerate(combined_anonymous_id_to_user_agent_single_col_df.iterrows()):
    for _, row in combined_anonymous_id_to_user_agent_single_col_df.iterrows():
        #if row_num % 100000==0:
        #    print("row %i"%row_num)
        vals = convert_user_agent_to_useful_strings(row.user_agent)
        #for i in range(len(vals)):
            #new_cols[i].append(vals[i])
        new_cols[0].append(vals[0])
        new_cols[1].append(vals[1])
        new_cols[2].append(vals[2])
        new_cols[3].append(vals[3])
        new_cols[4].append(vals[4])
        new_cols[5].append(vals[5])
        

    print("New cols generated at %s"% start_time.isoformat())
    # meta_df = combined_anonymous_id_to_user_agent_single_col_df.apply(lambda x: convert_user_agent_to_useful_strings(x.user_agent), axis=1, result_type="expand")
    #d = dfcombined_anonymous_id_to_user_agent_single_col_df.merge(meta_df)
    meta_df = pd.DataFrame({
        "device_family": new_cols[0], 
         "os_family" : new_cols[1], 
         "os_version" : new_cols[2], 
         "browser_family" : new_cols[3], 
         "browser_version":new_cols[4], 
         "is_bot":new_cols[5] 


    })
    print("Additional data frame generated at %s"% start_time.isoformat())
    end_time = datetime.now()
    seconds_taken = (end_time - start_time).total_seconds()
    print("Took %i seconds to process" % seconds_taken)

### Join onto the main dataframe 

In [None]:
merged_df_with_meta = merged_df_with_meta.merge(combined_anonymous_id_to_user_agent_single_col_df, on="anonymous_id", how="left")

In [None]:
merged_df_with_meta.head(2)

In [None]:
# add on the user agent breakdown

merged_df_with_meta = merged_df_with_meta.merge(user_agent_meta_df, on="user_agent", how="left")

In [None]:
merged_df_with_meta.head()

In [None]:
#Check it's set them all
merged_df_with_meta[merged_df_with_meta.user_agent.isnull()].head()

In [None]:
#Check it's set them all
merged_df_with_meta[merged_df_with_meta.device_family.isnull()].head()

### Clean up data frames / save some memory

In [None]:
# TODO: could do a lot more here
segment_anonymous_id_to_user_agent_full_df = None
segment_anonymous_id_to_user_agent_df = None
athena_anonymous_id_to_user_agent_full_df = None
athena_anonymous_id_to_user_agent_df = None
sd = None
ad = None

# Play Area

In [None]:
d = merged_df_with_meta[merged_df_with_meta.page_type=="iss"].groupby(["slug", "page_type"]).sum()
d[d.s_count>0]
merged_df_with_meta[(merged_df_with_meta.page_type=="iss") & (merged_df_with_meta.page_url.str.contains("iss."))]

In [None]:
merged_df_with_meta[(merged_df_with_meta.slug_root=="/zh-hk") & (merged_df_with_meta.country_code=="hk") & (merged_df_with_meta.page_type!="blog")].head(40) #.groupby(["slug"]).sum()

# Store Data Frame for Faster Loading etc.

When stored as a zipped parquet, it's actually very small 3 days -> 30MB.

In [None]:
#!pip install fastparquet

In [149]:
if save_end_dataframe_to_file:
    from_to_str = "_to_".join([z.strftime("%Y%m%d_%H%M") for z in [from_datetime, to_datetime]])
    parquet_filename = "merged_df_with_meta_"+from_to_str+".gzip"
    
    merged_df_with_meta.to_parquet(parquet_filename, compression='gzip')

    

In [None]:
>> look into AB test stuff more.  I think the urls are different segment vs kinesis (but I think we've found the origin and might have been fixed / non-issue)
                                                                                     