In [1]:
import os
import json
import time
import numpy as np
import pandas as pd

import fasttext
import faiss

from tqdm.auto import tqdm

## Data

In [2]:
# Load a sample of the training sessions and flatten them to one row per event.
#
# Each JSONL line holds one session with a list of events; the loop below
# explodes those lists into (session, aid, ts, type) rows.
data_path = '../data/train_sessions.jsonl'
CHUNK_SIZE = 100_000  # sessions (JSONL lines) parsed per chunk
MAX_CHUNKS = 2        # only the first MAX_CHUNKS chunks are used
EVENT_COLUMNS = ['session', 'aid', 'ts', 'type']

event_frames = []
# The reader is used as a context manager so the file handle is closed even
# though we stop before the end of the file.
with pd.read_json(data_path, lines=True, chunksize=CHUNK_SIZE) as reader:
    # zip() advances `range` first, so once MAX_CHUNKS chunks were consumed the
    # loop ends without parsing (and then discarding) another chunk.
    for _, chunk in zip(range(MAX_CHUNKS), reader):
        records = [
            (session, event['aid'], event['ts'], event['type'])
            for session, events in zip(chunk['session'].tolist(), chunk['events'].tolist())
            for event in events
        ]
        event_frames.append(pd.DataFrame(records, columns=EVENT_COLUMNS))

# Concatenate once instead of growing the frame inside the loop.
if event_frames:
    train_sessions = pd.concat(event_frames, ignore_index=True)
else:
    train_sessions = pd.DataFrame(columns=EVENT_COLUMNS)

train_sessions.head()

Unnamed: 0,session,aid,ts,type
0,0,1517085,1659304800025,clicks
1,0,1563459,1659304904511,clicks
2,0,1309446,1659367439426,clicks
3,0,16246,1659367719997,clicks
4,0,1781822,1659367871344,clicks


In [3]:
# Load a sample of the test sessions and flatten them to one row per event.
#
# Each JSONL line holds one session with a list of events; the loop below
# explodes those lists into (session, aid, ts, type) rows.
data_path = '../data/test_sessions.jsonl'
CHUNK_SIZE = 100_000  # sessions (JSONL lines) parsed per chunk
MAX_CHUNKS = 2        # only the first MAX_CHUNKS chunks are used
EVENT_COLUMNS = ['session', 'aid', 'ts', 'type']

event_frames = []
# The reader is used as a context manager so the file handle is closed even
# though we stop before the end of the file.
with pd.read_json(data_path, lines=True, chunksize=CHUNK_SIZE) as reader:
    # zip() advances `range` first, so once MAX_CHUNKS chunks were consumed the
    # loop ends without parsing (and then discarding) another chunk.
    for _, chunk in zip(range(MAX_CHUNKS), reader):
        records = [
            (session, event['aid'], event['ts'], event['type'])
            for session, events in zip(chunk['session'].tolist(), chunk['events'].tolist())
            for event in events
        ]
        event_frames.append(pd.DataFrame(records, columns=EVENT_COLUMNS))

# Concatenate once instead of growing the frame inside the loop.
if event_frames:
    test_sessions = pd.concat(event_frames, ignore_index=True)
else:
    test_sessions = pd.DataFrame(columns=EVENT_COLUMNS)

test_sessions.head()

Unnamed: 0,session,aid,ts,type
0,12383433,1542913,1661551200081,clicks
1,12383434,8211,1661551200511,clicks
2,12383435,940546,1661551201055,carts
3,12383435,45443,1661551213043,clicks
4,12383435,1769360,1661551246239,clicks


## Training sequence

In [4]:
# Build one chronologically ordered item sequence per training session.
MIN_SEQ_LEN = 5  # sessions with fewer (de-duplicated) items are dropped

# A stable sort keeps the original event order for events sharing a timestamp.
events_sorted = (
    train_sessions
    .sort_values(["session", "ts"], kind="stable")
    .reset_index(drop=True)
)

# Previous item *within the same session*. Shifting over the whole frame would
# compare the first event of a session with the last event of the previous
# session and could wrongly drop it as a "repeat".
prev_aid = events_sorted.groupby("session")["aid"].shift(1)

aid_seq = (
    events_sorted
    .loc[events_sorted["aid"] != prev_aid, ["session", "aid"]]  # collapse consecutive repeats
    .assign(aid=lambda df: df["aid"].astype(str))               # item ids become string tokens
    .groupby("session")["aid"]
    .agg(list)
    .reset_index()
)
aid_seq = aid_seq[aid_seq["aid"].map(len) >= MIN_SEQ_LEN].reset_index(drop=True)
aid_seq.head()

Unnamed: 0,session,aid
0,0,"[1517085, 1563459, 1309446, 16246, 1781822, 11..."
1,1,"[424964, 1492293, 910862, 1491172, 424964, 151..."
2,2,"[763743, 137492, 504789, 137492, 795863, 37834..."
3,3,"[1425967, 1343406, 1425967, 1343406, 1815570, ..."
4,4,"[613619, 298827, 383828, 255379, 1838173, 1453..."


In [5]:
# Dump the sequences as a text corpus: one session per line, prefixed with the
# token "__label__1" and followed by the space-separated item ids.
with open("../data/train_aid_seq.txt", 'w') as corpus_file:
    for session_items in aid_seq.aid:
        corpus_file.write("__label__1" + " " + " ".join(session_items) + "\n")

## Training

In [6]:
# Skip-gram hyper-parameters, kept in one place so they are easy to tune.
# minn = maxn = 0 — presumably meant to turn off character n-grams so every
# item id is treated as an atomic token (see the fastText docs).
skipgram_params = dict(
    model='skipgram',
    ws=5,
    dim=128,
    epoch=5,
    lr=0.01,
    minn=0,
    maxn=0,
)

# Train item embeddings on the session corpus written above.
model = fasttext.train_unsupervised('../data/train_aid_seq.txt', **skipgram_params)

Read 8M words
Number of words:  275012
Number of labels: 1
Progress: 100.0% words/sec/thread:   63958 lr:  0.000000 avg.loss:  2.676408 ETA:   0h 0m 0s 0.009740 avg.loss:  4.139654 ETA:   0h 1m10s


## ANN index

### Method 1 (out of memory)

In [7]:
# Method 1: item-to-item similarity table built from an HNSW index over the
# L2-normalised item embeddings (inner product on unit vectors = cosine).
TOP_K = 20          # neighbours retrieved per item (the item itself included)
HNSW_NEIGHBORS = 32  # second argument passed to faiss.IndexHNSWFlat

# item_dict maps the row number in `index_data` to the item id. The key is the
# number of embeddings collected so far, so rows and ids stay aligned wherever
# the '</s>' token sits in the vocabulary (the former `i - 1` key was only
# right when '</s>' was the very first word).
item_dict = {}
embeddings = []
for item_id in model.words:
    if item_id == '</s>':
        continue
    item_dict[len(embeddings)] = item_id
    embeddings.append(model[item_id])

# Faiss works on float32 matrices; normalize_L2 modifies the array in place.
index_data = np.array(embeddings, dtype=np.float32)
faiss.normalize_L2(index_data)

# Take the dimensionality from the data rather than hard-coding it.
index = faiss.IndexHNSWFlat(index_data.shape[1], HNSW_NEIGHBORS, faiss.METRIC_INNER_PRODUCT)
index.add(index_data)
sim_matrix, candidate_ids_matrix = index.search(index_data, TOP_K)

# Flatten to (trigger_id, sim_id, score) triples, dropping self-matches and
# any returned id that is not a known row (`cand in item_dict`).
sim_index = []
for idx, (candidate_ids, sims) in tqdm(
    enumerate(zip(candidate_ids_matrix, sim_matrix)),
    total=len(candidate_ids_matrix),
):
    sim_index.extend(
        (item_dict[idx], item_dict[cand], float(sim))
        for cand, sim in zip(candidate_ids, sims)
        if cand != idx and cand in item_dict
    )

0it [00:00, ?it/s]

In [8]:
from pyspark.sql import SparkSession

# Local Spark session used to join sessions against the similarity table.
spark = (
    SparkSession.builder
    .master("local")
    .appName("item2vec")
    .getOrCreate()
)

# Similarity table: one row per (trigger item, similar item, score),
# registered as a temp view so it can be queried with SQL.
sim_table = spark.createDataFrame(sim_index).toDF("trigger_id", "sim_id", "score")
sim_table.createOrReplaceTempView("sim_table")

# Test-session events, registered as a temp view as well.
sessions = spark.createDataFrame(test_sessions)
sessions.createOrReplaceTempView("test_sessions")

sim_table.show(5)

22/12/07 01:51:27 WARN Utils: Your hostname, hataeis-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.3 instead (on interface en0)
22/12/07 01:51:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/12/07 01:51:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/07 01:53:02 WARN TaskSetManager: Stage 0 contains a task of very large size (128811 KiB). The maximum recommended task size is 1000 KiB.
[Stage 0:>                                                          (0 + 1) / 1]

+----------+-------+------------------+
|trigger_id| sim_id|             score|
+----------+-------+------------------+
|     29735| 832192|0.9488866329193115|
|     29735| 329725|0.9306361675262451|
|     29735| 575616|0.9032601714134216|
|     29735|1436280| 0.900749683380127|
|     29735|1194834| 0.889082133769989|
+----------+-------+------------------+
only showing top 5 rows



                                                                                

In [9]:
# Score candidates per session: every item in a session "votes" for its similar
# items with the similarity score, votes are summed per (session, candidate),
# and the 20 best candidates per session are kept.
#
# The join is an INNER JOIN on purpose: with a LEFT JOIN every session item
# without an entry in sim_table yields a row with sim_id = NULL / score = NULL,
# which is then grouped and can show up as a bogus NULL "recommendation".
querySQL = """
    WITH sim AS (
        SELECT
            *
        FROM
            sim_table
    ),
    session AS (
        SELECT
            session,
            aid
        FROM
            test_sessions
    ),
    preds AS (
        SELECT
            session,
            sim_id,
            SUM(score) AS score
        FROM (
            SELECT
                session,
                sim_id,
                score
            FROM
                session INNER JOIN sim ON session.aid = sim.trigger_id
        ) t
        GROUP BY
            session,
            sim_id
    )
    SELECT
        session,
        sim_id,
        score
    FROM (
        SELECT
            session,
            sim_id,
            score,
            ROW_NUMBER() OVER (PARTITION BY session ORDER BY score DESC) AS rn
        FROM
            preds
    ) t
    WHERE
        rn <= 20
    ORDER BY
        session ASC,
        score DESC
"""

preds = spark.sql(querySQL)

In [10]:
# Print the first 100 rows of the ranked candidates.
preds.show(n=100)

22/12/07 01:53:15 WARN TaskSetManager: Stage 1 contains a task of very large size (128811 KiB). The maximum recommended task size is 1000 KiB.
22/12/07 01:53:20 WARN TaskSetManager: Stage 2 contains a task of very large size (24766 KiB). The maximum recommended task size is 1000 KiB.

+--------+-------+------------------+
| session| sim_id|             score|
+--------+-------+------------------+
|12383433|  98198|0.9962925910949707|
|12383433|1286203|0.9935086965560913|
|12383433|1262140|0.9934881329536438|
|12383433|1279234|0.9933539628982544|
|12383433| 166356|0.9932950139045715|
|12383433|1460535|0.9931045174598694|
|12383433|1287070|0.9909816384315491|
|12383433| 820797| 0.990317165851593|
|12383433| 939499|0.9902307987213135|
|12383433|1682245|0.9899848699569702|
|12383433| 208637|0.9896426200866699|
|12383433|1587722|0.9891594648361206|
|12383433| 235914|0.9888125061988831|
|12383433| 302191|0.9884185791015625|
|12383433|1443220|0.9878557920455933|
|12383433| 299919|0.9877472519874573|
|12383433|1492695|0.9877097606658936|
|12383433|1574625|0.9876535534858704|
|12383433|1362676| 0.987083375453949|
|12383434|1524036|0.9965766072273254|
|12383434|1265775| 0.995956301689148|
|12383434|1180072|0.9952216148376465|
|12383434| 421724|0.9951610565185547|
|12383434|  



### Method 2 (session embedding)

In [11]:
# Represent every test session by the element-wise mean of the embeddings of
# the items it contains.
# NOTE(review): the all-zero embedding visible in the output below suggests
# that unseen items contribute a zero vector to the mean — confirm this is intended.
session_embeddings_df = test_sessions[["session", "aid"]].copy()
session_embeddings_df["embedding"] = session_embeddings_df["aid"].apply(
    lambda aid: model[str(aid)]
)
session_embeddings_df = (
    session_embeddings_df
    .groupby(["session"])
    .agg(lambda vectors: np.mean(vectors, axis=0))["embedding"]
    .reset_index()
)

In [12]:
# Inspect the per-session mean embeddings (one row per session).
session_embeddings_df

Unnamed: 0,session,embedding
0,12383433,"[0.21975018, -0.14686339, -0.03233924, 0.04380..."
1,12383434,"[0.41913685, -0.1216619, -0.20306435, -0.31676..."
2,12383435,"[0.07131644, -0.0522051, 0.008751924, 0.032821..."
3,12383436,"[0.12278918, -0.12973182, 0.07255889, 0.133903..."
4,12383437,"[0.39890373, -0.55441976, -0.23951662, 0.33657..."
...,...,...
199995,12583428,"[0.2477992, -0.25386792, 0.15905522, 0.1981562..."
199996,12583429,"[0.2561214, -0.1344506, 0.0025729612, 0.136058..."
199997,12583430,"[0.12452375, -0.11119616, 0.014411954, 0.05683..."
199998,12583431,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [13]:
# Method 2: query the item index directly with the session embeddings and use
# the nearest items as predictions for all three event types.
N_CANDIDATES = 20    # items retrieved per session
HNSW_NEIGHBORS = 32  # second argument passed to faiss.IndexHNSWFlat
EVENT_TYPES = ("clicks", "carts", "orders")

# item_dict maps the row number in `index_data` to the item id. The key is the
# number of embeddings collected so far, so rows and ids stay aligned wherever
# the '</s>' token sits in the vocabulary (the former `i - 1` key was only
# right when '</s>' was the very first word).
item_dict = {}
embeddings = []
for item_id in model.words:
    if item_id == '</s>':
        continue
    item_dict[len(embeddings)] = item_id
    embeddings.append(model[item_id])

# Row position in `session_embedding` -> session id (as string). Keyed by
# position rather than index label so it always matches the search results.
session_dict = dict(enumerate(session_embeddings_df.session.astype(str)))
# Faiss works on float32 matrices; enforce the dtype explicitly.
session_embedding = np.array(session_embeddings_df.embedding.tolist(), dtype=np.float32)

index_data = np.array(embeddings, dtype=np.float32)
# Normalise both sides in place so the inner product equals cosine similarity.
faiss.normalize_L2(index_data)
faiss.normalize_L2(session_embedding)

# Take the dimensionality from the data rather than hard-coding it.
index = faiss.IndexHNSWFlat(index_data.shape[1], HNSW_NEIGHBORS, faiss.METRIC_INNER_PRODUCT)
index.add(index_data)
sim_matrix, candidate_ids_matrix = index.search(session_embedding, N_CANDIDATES)

# One submission row per (session, event type); the same candidate list is
# reused for clicks, carts and orders. Ids that are not known rows are skipped.
sim_index = []
for idx, candidate_ids in tqdm(enumerate(candidate_ids_matrix), total=len(candidate_ids_matrix)):
    labels = " ".join(item_dict[cand] for cand in candidate_ids if cand in item_dict)
    session_id = session_dict[idx]
    sim_index.extend([session_id + "_" + event_type, labels] for event_type in EVENT_TYPES)

0it [00:00, ?it/s]

In [14]:
# Assemble the submission table and write it to disk.
submission = pd.DataFrame(sim_index, columns=["session_type", "labels"])
# Create the output directory first so to_csv does not fail on a fresh checkout.
os.makedirs("../out", exist_ok=True)
submission.to_csv("../out/i2v_submission.csv", index=False)

In [15]:
# Inspect the submission table (three rows per session: clicks, carts, orders).
submission

Unnamed: 0,session_type,labels
0,12383433_clicks,1542913 98198 1286203 1262140 1279234 166356 1...
1,12383433_carts,1542913 98198 1286203 1262140 1279234 166356 1...
2,12383433_orders,1542913 98198 1286203 1262140 1279234 166356 1...
3,12383434_clicks,8211 1524036 1265775 1180072 421724 29144 1184...
4,12383434_carts,8211 1524036 1265775 1180072 421724 29144 1184...
...,...,...
599995,12583431_carts,367983 460717 480122 460989 945061 418608 8952...
599996,12583431_orders,367983 460717 480122 460989 945061 418608 8952...
599997,12583432_clicks,847629 1077213 1542995 576642 373932 1004705 1...
599998,12583432_carts,847629 1077213 1542995 576642 373932 1004705 1...
