# Baseline Dataset

- This dataset is used to implement the baseline from the paper "Personalizing Session-based Recommendations with Hierarchical Recurrent Neural Networks" (Quadrana et al., RecSys 2017) -> resources/papers/personalizing_session_based_rec.pdf
- This dataset is generated from the OnlineShopTrafficTracking Table in BigQuery
- Clean out bots using: https://github.com/monperrus/crawler-user-agents/blob/master/crawler-user-agents.json

In [1]:
# Notebook setup: display options, the TESTMODE switch, warning filters and imports.
from IPython.core.interactiveshell import InteractiveShell
# Show the value of every top-level expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

# When True, the cells below restrict the BigQuery query to a single partition
# and work on local example files instead of the GCS buckets.
TESTMODE = True
import warnings
# Silence UserWarning / ResourceWarning noise in the cell output.
warnings.filterwarnings(action='ignore', category=UserWarning)
warnings.filterwarnings(action='ignore', category=ResourceWarning)
from google.cloud import bigquery
from google.cloud import storage
import pandas as pd
import time
from io import StringIO
from dg_ml_core.datastores import gcs_utils
from dg_ml_core.file import get_file_handle, get_paths_with_prefix, save_to_file, file_exists, copy_file
from dg_ml_core.collections import dict_ops
# NOTE(review): duplicate of the gcs_utils import a few lines above.
from dg_ml_core.datastores import gcs_utils
import requests
import json
import csv
import random
import pprint

In [6]:
def get_bots_list():
    """Download the list of known crawler user-agent strings.

    Fetches ``crawler-user-agents.json`` from the
    monperrus/crawler-user-agents repository and flattens the ``instances``
    list of every entry into a single list.

    Returns:
        list: all user-agent strings found under the ``instances`` keys.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    url = 'https://raw.githubusercontent.com/monperrus/crawler-user-agents/master/crawler-user-agents.json'
    response = requests.get(url)
    # Fail loudly on an error status instead of trying to parse an error
    # page as JSON (which would surface as a confusing decode error).
    response.raise_for_status()
    data = json.loads(response.content)
    return [instance for entry in data for instance in entry['instances']]

def clean_dataset(source, target, bots=None):
    """Remove bot traffic from a raw export and resolve missing user ids.

    The CSV read from ``source`` must contain the columns ``ProductId``,
    ``UserId``, ``UserAgent``, ``LastLoggedInUserId``, ``SessionId`` and
    ``Timestamp``. Missing values are replaced by ``-1`` before the columns
    are cast to their target types.

    Args:
        source: path or file-like object accepted by ``pandas.read_csv``.
        target: path or file-like object accepted by ``DataFrame.to_csv``.
        bots: optional iterable of user-agent strings to drop. When ``None``
            (the default) the list is downloaded via ``get_bots_list()``.
            Passing it explicitly lets a caller that cleans many files
            download the list only once.

    Returns:
        The ``target`` argument, after the cleaned CSV (columns ``UserId``,
        ``ProductId``, ``SessionId``, ``Timestamp``) has been written to it.
    """
    col_types = {
        "ProductId": int,
        "UserId": int,
        "UserAgent": str,
        "LastLoggedInUserId": int,
        "SessionId": int,
        "Timestamp": int,
    }

    df = pd.read_csv(source).fillna(-1).astype(col_types)

    if bots is None:
        bots = get_bots_list()

    # Take an explicit copy: the assignment below would otherwise write into
    # a filtered view of the original frame (SettingWithCopyWarning).
    df = df[~df.UserAgent.isin(bots)].copy()

    # Rows without a UserId (-1 after fillna) fall back to LastLoggedInUserId.
    no_user_id_mask = df.UserId == -1
    df.loc[no_user_id_mask, 'UserId'] = df.loc[no_user_id_mask, 'LastLoggedInUserId']

    df.to_csv(target, index=False, columns=['UserId', 'ProductId', 'SessionId', 'Timestamp'])

    return target

def merge_sessions(reader):
    """Group single visit events into sessions, and sessions by user.

    Args:
        reader: iterable of mappings (e.g. a ``csv.DictReader``) with the keys
            ``UserId``, ``SessionId``, ``ProductId`` and ``Timestamp``.

    Returns:
        dict: ``{UserId: {SessionId: {'Events': [...], 'StartTime': int}}}``.
        ``UserId`` and ``SessionId`` are used as keys exactly as they appear
        in the rows; every event is ``{'ProductId': int, 'Timestamp': int}``
        and ``StartTime`` is the smallest event timestamp of the session.
    """
    sessions_by_user = dict()
    for row in reader:
        product_id = int(row['ProductId'])
        timestamp = int(row['Timestamp'])

        user_sessions = sessions_by_user.setdefault(row['UserId'], dict())
        session = user_sessions.get(row['SessionId'])
        if session is None:
            session = {'Events': [], 'StartTime': timestamp}
            user_sessions[row['SessionId']] = session
        elif timestamp < session['StartTime']:
            # Keep a running minimum instead of rescanning every event of the
            # session on each row (which was quadratic in the session length).
            session['StartTime'] = timestamp

        session['Events'].append({"ProductId": product_id, "Timestamp": timestamp})
    return sessions_by_user

def generate_sessions_by_user(shard, merged_shards_prefix):
    """Merge one shard of per-user sessions into the bucketed output files.

    Users are distributed over output files named
    ``<merged_shards_prefix><user_id % 100>.json``. Existing files are loaded,
    extended with the sessions of ``shard`` and written back. Users whose id
    is not a positive integer are skipped.

    Args:
        shard: ``{user_id: {session_id: {'Events': [...], 'StartTime': int}}}``.
        merged_shards_prefix: path prefix of the bucketed output files.

    Returns:
        None. The result is written through ``dict_ops.save_dict``.
    """
    gcs_client = gcs_utils.get_client('machinelearning-prod', None)

    # Group the users by output file first so that every file is loaded and
    # saved exactly once, regardless of the order of the users in the shard.
    users_by_path = dict()
    for user_id in shard:
        if int(user_id) > 0:
            # TODO: add a datestamp hierarchy to the path.
            path = merged_shards_prefix + str(int(user_id) % 100) + '.json'
            users_by_path.setdefault(path, []).append(user_id)

    # Nothing is written when the shard contains no valid user (previously an
    # empty dict was saved to the empty path '').
    for path, user_ids in users_by_path.items():
        if file_exists(path):
            output_dict = dict_ops.load_dict(path, gcs_client=gcs_client)
        else:
            output_dict = dict()

        for user_id in user_ids:
            for session_id, session in shard[user_id].items():
                user_sessions = output_dict.setdefault(user_id, dict())

                if session_id not in user_sessions:
                    user_sessions[session_id] = session
                    continue

                # The session already exists: union the events of both sides.
                existing = user_sessions[session_id]
                merged_events = existing['Events'] + session['Events']
                # sort_keys makes equal events serialize identically whatever
                # their key order; dict.fromkeys de-duplicates while keeping
                # the first-seen order, so the output is deterministic.
                unique_events_str = dict.fromkeys(
                    json.dumps(event, sort_keys=True) for event in merged_events)
                unique_events = [json.loads(event_str) for event_str in unique_events_str]
                existing['Events'] = unique_events
                existing['StartTime'] = min(int(event['Timestamp']) for event in unique_events)

        dict_ops.save_dict(path, output_dict, gcs_client=gcs_client)

def user_iterator(sessions_by_user_prefix):
    """Yield ``(user_id, sessions)`` pairs from all files under a prefix.

    Every file found below ``sessions_by_user_prefix`` is loaded in turn and
    its users are yielded in random order (shuffled per file).
    """
    for shard_path in get_paths_with_prefix(sessions_by_user_prefix):
        sessions_of_shard = dict_ops.load_dict(shard_path)
        shuffled_ids = list(sessions_of_shard)
        random.shuffle(shuffled_ids)
        for current_id in shuffled_ids:
            yield current_id, sessions_of_shard[current_id]

def event_iterator(user_sessions, min_events_per_session):
    """Yield the product ids of one user, session by session.

    Sessions are visited in ascending ``StartTime`` order and the events of a
    session in ascending ``Timestamp`` order. After the last event of every
    session the marker ``'<EOS>'`` is yielded. Sessions with fewer than
    ``min_events_per_session`` events are left out entirely.
    """
    chronological_sessions = sorted(
        user_sessions.values(), key=lambda session: session['StartTime'])

    for session in chronological_sessions:
        events = session['Events']
        if len(events) >= min_events_per_session:
            for event in sorted(events, key=lambda item: item['Timestamp']):
                yield event['ProductId']

            yield '<EOS>'
        
def get_next_event_or_none(active_user):
    """Return the next item of ``active_user['Events']``, or ``None`` once it is exhausted."""
    return next(active_user['Events'], None)
    
def get_next_user_or_none(users, min_events_per_session):
    """Take the next user from ``users`` and wrap it for batch generation.

    Returns a dict with the integer ``UserId`` and an ``Events`` iterator
    built by ``event_iterator``, or ``None`` when ``users`` is exhausted.
    """
    try:
        raw_id, sessions = next(users)
    except StopIteration:
        return None
    else:
        return {
            'UserId': int(raw_id),
            'Events': event_iterator(sessions, min_events_per_session),
        }
        
def user_parallel_batch_iterator(batch_size, sessions_by_user_prefix, min_events_per_session):
    """Yield user-parallel mini batches of ``(user_id, event)`` tuples.

    ``batch_size`` slots are filled with users taken from
    ``user_iterator(sessions_by_user_prefix)``. Every yielded batch is a list
    of exactly ``batch_size`` tuples in which position ``i`` always belongs to
    slot ``i``. When the user of a slot has no events left, the slot moves on
    to the next user. Once no users are left, the slot is padded with
    ``('<EOU>', '<EOS>')``. Iteration stops when every slot is padded.

    Args:
        batch_size: number of parallel slots, i.e. entries per batch.
        sessions_by_user_prefix: prefix passed on to ``user_iterator``.
        min_events_per_session: sessions with fewer events are skipped.
    """
    padding = ('<EOU>', '<EOS>')
    users = user_iterator(sessions_by_user_prefix)

    # Initial fill of users
    active_users = [
        get_next_user_or_none(users, min_events_per_session)
        for _ in range(batch_size)
    ]

    while True:
        next_batch = []
        for idx, active_user in enumerate(active_users):
            next_event = None
            # Advance through users until one provides an event or none is left.
            while active_user is not None:
                next_event = get_next_event_or_none(active_user)
                if next_event is not None:
                    break
                active_user = get_next_user_or_none(users, min_events_per_session)
                if active_user is None:
                    print('There are no more new users')
            active_users[idx] = active_user

            if active_user is None:
                # Always emit an entry so the positions stay aligned with the
                # slots, also in the iteration in which the slot runs dry.
                next_batch.append(padding)
            else:
                next_batch.append((active_user['UserId'], next_event))

        # Stop only when every slot is exhausted. Comparing the number of
        # distinct entries with 1 would also stop on any single-entry batch
        # (e.g. batch_size == 1) and never stop for batch_size == 0.
        if all(entry == padding for entry in next_batch):
            return
        yield next_batch

## Execute Query in BQ

- Here we extract the relevant features out of the large collection of visits

In [29]:
# Select the product id (the 'id' action parameter) together with user, session,
# user-agent and timestamp columns for all rows whose controller is 'product'
# and whose action is 'show', restricted to rows with UserId > 0.
query = """
SELECT (SELECT Value FROM UNNEST(ActionParameters) WHERE Key = 'id') as ProductId, LastLoggedInUserId, UserId, SessionId, UserAgent, Timestamp
FROM `dg-prod-personalization.PersonalizationDataV2.OnlineShopTrafficTracking` 
WHERE LOWER(ControllerName) = 'product' AND LOWER(ActionName) = 'show' AND UserId > 0
"""

# In test mode only a single partition is queried.
if TESTMODE:
    query += ' AND _PARTITIONTIME = TIMESTAMP("2019-02-11")'
    
# Print the final query and pause so the run can still be interrupted.
print('Executing query {}. \nYou have 5 seconds to cancel...'.format(query))
time.sleep(5)

client = bigquery.Client()
dataset_ref = client.dataset('MAMuy', project='machinelearning-prod')
table_ref = dataset_ref.table('baseline_dataset')

# Write the result into the destination table, replacing its previous contents
# (WRITE_TRUNCATE).
job_config = bigquery.job.QueryJobConfig(
    allow_large_results=True, 
    destination=table_ref,
    write_disposition=bigquery.job.WriteDisposition.WRITE_TRUNCATE)

query_job = client.query(query, job_config=job_config, job_id_prefix='baseline_dataset_query_', location='EU')
print('Running Job {}'.format(query_job.job_id))
# Block until the query job has finished.
query_job.result()

print('Query execution done')

Executing query 
SELECT (SELECT Value FROM UNNEST(ActionParameters) WHERE Key = 'id') as ProductId, LastLoggedInUserId, UserId, SessionId, UserAgent, Timestamp
FROM `dg-prod-personalization.PersonalizationDataV2.OnlineShopTrafficTracking` 
WHERE LOWER(ControllerName) = 'product' AND LOWER(ActionName) = 'show'
 AND _PARTITIONTIME = TIMESTAMP("2019-02-11"). 
You have 5 seconds to cancel...
Running Job baseline_dataset_query_46a37433-e183-435b-aa6b-ca40170d95ab


<google.cloud.bigquery.table.RowIterator at 0x7fe32584ee48>

Query execution done


## Extract to GCS

- Extract the table containing the relevant features to GCS

In [30]:
# Export the 'baseline_dataset' table to CSV files below the raw/ prefix on GCS.
# The '*' in the URI is a wildcard for the exported file names.
destination_uri = 'gs://ma-muy/baseline_dataset/raw/*.csv'

client = bigquery.Client()
dataset_ref = client.dataset('MAMuy', project='machinelearning-prod')
table_ref = dataset_ref.table('baseline_dataset')

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    location='EU',
    job_id_prefix='baseline_dataset_extract_')

print('Running Job {}'.format(extract_job.job_id))
# Block until the extract job has finished.
extract_job.result()
print('Extraction done')

Running Job baseline_dataset_extract_78ed9046-82b1-4de6-9707-8cbcec1225e0


<google.cloud.bigquery.job.ExtractJob at 0x7fe3268db2b0>

Extraction done


## Clean data

- Here we clean the data.
- Specifically there are two steps:
  - Clean out bot visits
  - Merge LastLoggedInUserId and UserId

In [6]:
# Clean every raw export: drop bot traffic and fill missing user ids
# (see clean_dataset).
if TESTMODE:
    print('Processing example.csv')
    # NOTE: clean_dataset returns its `target` argument, so `df` holds the
    # output path here, not a DataFrame.
    df = clean_dataset('example.csv', 'example_clean.csv')
    
else:
    raw_data_prefix = 'gs://ma-muy/baseline_dataset/raw/'
    cleaned_data_prefix = 'gs://ma-muy/baseline_dataset/clean/'
        
    raw_paths = get_paths_with_prefix(raw_data_prefix)
    
    for raw_path in raw_paths:
        # The cleaned file keeps the name of the raw file, under the clean/ prefix.
        clean_path = cleaned_data_prefix + gcs_utils.get_file_name(raw_path)

        print('Downloading {}'.format(raw_path))
        source = get_file_handle(raw_path)
        # The cleaned CSV is collected in memory and uploaded in one piece.
        target = StringIO()
        
        # NOTE(review): clean_dataset fetches the bot list on every call,
        # i.e. once per raw file.
        target = clean_dataset(source, target)
        
        print('Uploading {}'.format(clean_path))
        save_to_file(clean_path, target.getvalue())

Downloading gs://ma-muy/baseline_dataset/raw/000000000000.csv
Uploading gs://ma-muy/baseline_dataset/clean/000000000000.csv
Downloading gs://ma-muy/baseline_dataset/raw/000000000001.csv
Uploading gs://ma-muy/baseline_dataset/clean/000000000001.csv


## Merge Sessions

- In this step we merge the individual visit events into sessions.
- We then group all sessions by the user they belong to.

In [7]:
# Turn every cleaned CSV into a JSON file of sessions grouped by user
# (see merge_sessions).
if TESTMODE:
    # `with` closes the file again; newline='' is the mode the csv module
    # expects for files it reads.
    with open('example_clean.csv', newline='') as source:
        reader = csv.DictReader(source)
        sessions_by_user = merge_sessions(reader)

    dict_ops.save_dict('example_merged.json', sessions_by_user)

else:
    cleaned_data_prefix = 'gs://ma-muy/baseline_dataset/clean/'
    merged_data_prefix = 'gs://ma-muy/baseline_dataset/merged/'

    clean_paths = get_paths_with_prefix(cleaned_data_prefix)
    for clean_path in clean_paths:
        file_name = gcs_utils.get_file_name(clean_path)
        # Swap only the trailing extension. A plain str.replace('csv', 'json')
        # would also rewrite any other 'csv' occurring in the path.
        if file_name.endswith('.csv'):
            file_name = file_name[:-len('.csv')] + '.json'
        merged_path = merged_data_prefix + file_name

        print('Downloading {}'.format(clean_path))
        source = get_file_handle(clean_path)
        reader = csv.DictReader(source)

        sessions_by_user = merge_sessions(reader)

        print('Uploading {}'.format(merged_path))
        dict_ops.save_dict(merged_path, sessions_by_user)

Downloading gs://ma-muy/baseline_dataset/clean/000000000000.csv
Uploading gs://ma-muy/baseline_dataset/merged/000000000000.json
Downloading gs://ma-muy/baseline_dataset/clean/000000000001.csv
Uploading gs://ma-muy/baseline_dataset/merged/000000000001.json


## Merge shards

- At this point we have several shards, each containing sessions aggregated at the user level.
- Merging the shards is the most time-consuming part of the data generation process.
- We need to merge all sessions of a specific user into one data structure.
- In production we will be dealing with daily shards, which makes generating the dataset easier.
- In this case, however, we are dealing with full exports, so we cannot assume that a shard covers exactly one day.

In [7]:
# NOTE(review): TESTMODE is switched off here, which also affects every cell
# executed after this one.
TESTMODE = False
if TESTMODE:
    # `with` closes the file once the shard has been parsed.
    with open('example_merged.json') as shard_file:
        shard = json.load(shard_file)
    generate_sessions_by_user(shard, 'sessions_by_user/')
else:
    merged_data_prefix = 'gs://ma-muy/baseline_dataset/merged/'
    sessions_by_user_prefix = 'gs://ma-muy/baseline_dataset/sessions_by_user/'
    temp_sessions_by_user_prefix = 'temp_sessions_by_user/'

    # Merge every shard into the local, bucketed temp files ...
    merged_paths = get_paths_with_prefix(merged_data_prefix)
    for merged_path in merged_paths:

        print('Downloading {}'.format(merged_path))
        source = dict_ops.load_dict(merged_path)

        generate_sessions_by_user(source, temp_sessions_by_user_prefix)

    # ... then copy the finished temp files to their final location.
    temp_files = get_paths_with_prefix(temp_sessions_by_user_prefix)
    for temp_file in temp_files:
        # [-1] also works for a path without any '/', where [1] would raise.
        file_name = temp_file.rsplit('/', 1)[-1]
        target_uri = sessions_by_user_prefix + file_name
        print('Uploading {} to {}'.format(temp_file, target_uri))
        copy_file(temp_file, target_uri)

Downloading gs://ma-muy/baseline_dataset/merged/000000000000.json
Downloading gs://ma-muy/baseline_dataset/merged/000000000001.json
Uploading temp_sessions_by_user/75.json to gs://ma-muy/baseline_dataset/sessions_by_user/75.json
Uploading temp_sessions_by_user/46.json to gs://ma-muy/baseline_dataset/sessions_by_user/46.json
Uploading temp_sessions_by_user/83.json to gs://ma-muy/baseline_dataset/sessions_by_user/83.json
Uploading temp_sessions_by_user/71.json to gs://ma-muy/baseline_dataset/sessions_by_user/71.json
Uploading temp_sessions_by_user/9.json to gs://ma-muy/baseline_dataset/sessions_by_user/9.json
Uploading temp_sessions_by_user/55.json to gs://ma-muy/baseline_dataset/sessions_by_user/55.json
Uploading temp_sessions_by_user/4.json to gs://ma-muy/baseline_dataset/sessions_by_user/4.json
Uploading temp_sessions_by_user/68.json to gs://ma-muy/baseline_dataset/sessions_by_user/68.json
Uploading temp_sessions_by_user/28.json to gs://ma-muy/baseline_dataset/sessions_by_user/28.json

## Generate User Parallel Mini batches

- Now that we know all the sessions of all the users we can generate the user parallel mini batches.

In [None]:
# Build the user-parallel mini batches and pretty-print the first ones.
BATCH_SIZE = 10
MIN_EVENTS_PER_SESSION = 5

# Local files in test mode, the GCS bucket otherwise.
sessions_by_user_prefix = (
    'sessions_by_user/'
    if TESTMODE
    else 'gs://ma-muy/baseline_dataset/sessions_by_user/'
)
iterator = user_parallel_batch_iterator(
    BATCH_SIZE,
    sessions_by_user_prefix,
    MIN_EVENTS_PER_SESSION,
)

# Collect batches up to and including index 200, then stop consuming.
batches = []
for idx, batch in enumerate(iterator):
    batches.append(batch)
    if idx == 200:
        break

printer = pprint.PrettyPrinter(width=240, compact=True)
printer.pprint(batches)

- Now we have created the user parallel mini batches
- This notebook can be split into two parts:
    - First we have the export of the data and transformation into sessions by users
    - Second we have the generation of the mini batches based on the sessions by users
    
- The first part should be implemented as an ETL pipeline that is executed daily
- The second part belongs to the dataset implementation inside the model repository