# Data Preprocessing

In [1]:
import cudf
import pandas as pd
from utils.data.load_data import cache_data_to_cpu
import numpy as np
from pathlib import Path

# Which data split this run works on: 'validation' or 'test'.
APPROACH = 'validation'

# Resolve the dataset directory for the selected approach.
if APPROACH == 'validation':
    base_path = '../data/otto-validation'
elif APPROACH == 'test':
    base_path = '../data/otto-chunk-data-inparquet-format'
else:
    # Fail here instead of with a NameError on `base_path` when it is first used.
    raise ValueError(f"Unknown APPROACH {APPROACH!r}; expected 'validation' or 'test'")

# Cache both data segments to RAM
train_cache, _ = cache_data_to_cpu(data_seg='train', data_path=base_path)
test_cache, _ = cache_data_to_cpu(data_seg='test', data_path=base_path)

# Collapse each cache dictionary into a single dataframe
train = pd.concat(list(train_cache.values()))
test = pd.concat(list(test_cache.values()))

# Sorted, de-duplicated aids that appear in either split. np.union1d works on
# the arrays directly, so there is no round-trip through Python lists and no
# separate sort (its result is already sorted).
unique_aids = np.union1d(train.aid.unique(), test.aid.unique())

print(f'Train Unique Aids: {train.aid.nunique():,}')
print(f'Test Unique Aids: {test.aid.nunique():,}')
print(f'Unique Aids: {len(unique_aids):,}')

Train Unique Aids: 1,825,499
Test Unique Aids: 874,852
Unique Aids: 1,844,284


In [3]:
# Use the array's own reductions; the builtin max()/min() walk the array
# element by element in Python.
print(f'Max Value Aids: {unique_aids.max():,}')
print(f'Min Value Aids: {unique_aids.min():,}')

Max Value Aids: 1,855,602
Min Value Aids: 0


In [4]:
# Convert both pandas frames to cuDF frames; the pair-building cell below
# operates on the cuDF versions.
train = cudf.from_pandas(train)
test = cudf.from_pandas(test)

We need to create `aid-aid` pairs to train our matrix factorization model!

Let's grab the pairs from both the train and test sets.

In [5]:
%%time

train_pairs = cudf.concat([train, test])[['session', 'aid']]
del train, test

train_pairs['aid_next'] = train_pairs.groupby('session').aid.shift(-1)
train_pairs = train_pairs[['aid', 'aid_next']].dropna().reset_index(drop=True)

CPU times: user 281 ms, sys: 273 ms, total: 554 ms
Wall time: 553 ms


The running time is 15x better than when using Polars on the CPU!

In [6]:
# Number of pairs, in millions
len(train_pairs) / 1_000_000

158.738978

That is about 159 million pairs created in roughly half a second without running out of RAM! 🙂 Not too bad

In [7]:
# Peek at the first few (aid, aid_next) pairs
train_pairs.head()

Unnamed: 0,aid,aid_next
0,625184,559816
1,559816,559816
2,559816,1021135
3,1021135,559816
4,559816,1630441


Let's see what is the cardinality of our aids -- we will need this to create the embedding layer.

In [8]:
# Largest aid found in either column of the pairs table
cardinality_aids = max(train_pairs[col].max() for col in ('aid', 'aid_next'))
cardinality_aids

1855602

The largest `aid` is `1855602`, so the embedding table needs `1855602 + 1` rows -- that is a lot! But our matrix factorization model will be able to handle this.


Oh dear, that took forever! Mind you, we are not doing anything here, apart from iterating over the dataset for a single epoch (and that is without validation!).

The reason this is taking so long is that indexing into the arrays and collating results into batches is very computationally expensive.

There are ways to work around this but they require writing a lot of code (you could use the iterable-style dataset). And still our solution wouldn't be particularly well optimized.

Let us do something else instead!

We will use a brand new [Merlin Dataloader](https://github.com/NVIDIA-Merlin/dataloader). It is a library that my team launched just a couple of days ago 🙂

Now this library shines when you have a GPU, which is what you generally want when training DL models. But, alas, Kaggle gives you only 13 GB of RAM on a kernel with a GPU, and that wouldn't allow us to process our dataset!

Let's see how far we can get with CPU only.

In [9]:
from merlin.loader.torch import Loader 

We can read data directly from the disk -- even better!

Let's write our datasets to disk.

In [10]:
# Hold out the final 10M pairs for validation; everything before them is training data.
n_valid_pairs = 10_000_000
train_pairs.iloc[:-n_valid_pairs].to_pandas().to_parquet('train_pairs.parquet')
train_pairs.iloc[-n_valid_pairs:].to_pandas().to_parquet('valid_pairs.parquet')

In [11]:
from merlin.loader.torch import Loader 
from merlin.io import Dataset

# Training pairs are read straight from the parquet file written above.
train_ds = Dataset('train_pairs.parquet')
train_dl_merlin = Loader(train_ds, batch_size=65536, shuffle=True)

In [12]:
%%time

for batch in train_dl_merlin:
    aid1, aid2 = batch[0], batch[1]

CPU times: user 905 ms, sys: 414 ms, total: 1.32 s
Wall time: 1.63 s


That is much better 🙂. Let's train our matrix factorization model!

In [13]:
import torch
from torch import nn

class MatrixFactorization(nn.Module):
    """Scores a pair of aids as the dot product of their embeddings.

    Both sides of the pair are looked up in the same sparse-gradient
    embedding table of shape ``(n_aids, n_factors)``.
    """

    def __init__(self, n_aids, n_factors):
        super().__init__()
        self.aid_factors = nn.Embedding(
            num_embeddings=n_aids,
            embedding_dim=n_factors,
            sparse=True,
        )

    def forward(self, aid1, aid2):
        """Return the embedding product of ``aid1`` and ``aid2`` summed over dim 1."""
        left = self.aid_factors(aid1)
        right = self.aid_factors(aid2)
        return torch.sum(left * right, dim=1)
    
class AverageMeter:
    """Tracks the latest value of a metric together with its running mean."""

    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        """Clear the latest value, running mean, weighted sum and sample count."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as the latest value, weighted as ``n`` samples."""
        self.count += n
        self.sum += val * n
        self.val = val
        self.avg = self.sum / self.count

    def __str__(self):
        # Build a template such as '{name} {val:f} ({avg:f})' and fill it
        # from the instance attributes.
        template = f'{{name}} {{val{self.fmt}}} ({{avg{self.fmt}}})'
        return template.format(**vars(self))

# Validation pairs, loaded with the same batch size and shuffle setting as training.
valid_ds = Dataset('valid_pairs.parquet')
valid_dl_merlin = Loader(valid_ds, batch_size=65536, shuffle=True)

In [14]:
from torch.optim import SparseAdam

# Training hyperparameters
num_epochs = 10
lr = 0.1

# aids run from 0 to cardinality_aids inclusive, hence the +1 rows; 32 factors per aid.
model = MatrixFactorization(n_aids=cardinality_aids + 1, n_factors=32)
# The embedding is created with sparse=True, so a sparse-aware optimizer is used.
optimizer = SparseAdam(model.parameters(), lr=lr)
criterion = nn.BCEWithLogitsLoss()

In [15]:
%%time
model.to('cuda')
for epoch in range(num_epochs):
    for batch, _ in train_dl_merlin:
        model.train()
        losses = AverageMeter('Loss', ':.4e')
            
        aid1, aid2 = batch['aid'], batch['aid_next']
        aid1 = aid1.to('cuda')
        aid2 = aid2.to('cuda')
        output_pos = model(aid1, aid2)
        output_neg = model(aid1, aid2[torch.randperm(aid2.shape[0])])
        
        output = torch.cat([output_pos, output_neg])
        targets = torch.cat([torch.ones_like(output_pos), torch.zeros_like(output_pos)])
        loss = criterion(output, targets)
        losses.update(loss.item())
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
    model.eval()
    
    with torch.no_grad():
        accuracy = AverageMeter('accuracy')
        for batch, _ in valid_dl_merlin:
            aid1, aid2 = batch['aid'], batch['aid_next']
            output_pos = model(aid1, aid2)
            output_neg = model(aid1, aid2[torch.randperm(aid2.shape[0])])
            accuracy_batch = torch.cat([output_pos.sigmoid() > 0.5, output_neg.sigmoid() < 0.5]).float().mean()
            accuracy.update(accuracy_batch, aid1.shape[0])
            
    print(f'{epoch+1:02d}: * TrainLoss {losses.avg:.3f}  * Accuracy {accuracy.avg:.3f}')

01: * TrainLoss 0.620  * Accuracy 0.705
02: * TrainLoss 0.613  * Accuracy 0.714
03: * TrainLoss 0.611  * Accuracy 0.718
04: * TrainLoss 0.606  * Accuracy 0.719
05: * TrainLoss 0.606  * Accuracy 0.720
06: * TrainLoss 0.607  * Accuracy 0.721
07: * TrainLoss 0.603  * Accuracy 0.721
08: * TrainLoss 0.604  * Accuracy 0.721
09: * TrainLoss 0.605  * Accuracy 0.722
10: * TrainLoss 0.605  * Accuracy 0.722
CPU times: user 11min 17s, sys: 1.7 s, total: 11min 18s
Wall time: 1min 29s


This code ran about 60x faster than the CPU code from Radek's notebook. And we have not tuned the batch size! With a GPU, a larger batch size is possible, which would reduce the running time further.

Let's grab the embeddings!

In [16]:
# Detach the embedding weights from autograd, move them to host memory and
# convert them to a NumPy array.
embeddings = model.aid_factors.weight.detach().cpu().numpy()

And create the index for nearest neighbor search. We will use cuML for that.

In [17]:
from cuml.neighbors import NearestNeighbors

We will compute 21 nearest neighbors as in Radek's notebook. 

In [18]:
%%time

# 21 = 20 neighbours to keep + 1 extra, because the first column of the result
# is dropped in a later cell.
knn = NearestNeighbors(n_neighbors=21, metric='euclidean')
knn.fit(embeddings)

CPU times: user 16.5 ms, sys: 92 ms, total: 108 ms
Wall time: 112 ms


NearestNeighbors()

Now for any `aid`, we can find its nearest neighbors! cuML lets you do this in parallel for all inputs at once.

In [19]:
%%time

_, aid_nns = knn.kneighbors(embeddings)

CPU times: user 48.3 s, sys: 156 ms, total: 48.5 s
Wall time: 48.4 s


Let's check we get 21 neighbors for each aid:

In [20]:
# Expect one row per embedding row and 21 columns
aid_nns.shape

(1855603, 21)

We can get rid of the first neighbor directly: as the example below shows, it is the `aid` itself.

In [21]:
# Neighbour indices returned for aid 0
aid_nns[0]

array([      0,  435927, 1532058, 1331180, 1196094,   54213, 1303929,
       1362288, 1475885,  810679, 1346918,  277765, 1411300,  166411,
        748568,  719910, 1140826, 1503167, 1656874,   89497,  926200])

In [22]:
# Drop column 0, which is expected to be the query aid itself. The width check
# keeps the cell idempotent: re-running it will not strip a genuine neighbour.
if aid_nns.shape[1] == knn.n_neighbors:
    aid_nns = aid_nns[:, 1:]

In [23]:
# One column fewer after dropping the first neighbour
aid_nns.shape

(1855603, 20)

In [24]:
# Save matrix to disk: one row per aid, one column per retained neighbour
cols = [f'nn_{i}' for i in range(aid_nns.shape[1])]
data = pd.DataFrame(aid_nns, columns=cols)
save_base = Path('../output/matrix-fac')
file_path = save_base / APPROACH / 'nn20.parquet'
# to_parquet does not create missing directories, so make sure the target exists.
file_path.parent.mkdir(parents=True, exist_ok=True)
data.to_parquet(file_path)
data.head()

Unnamed: 0,nn_0,nn_1,nn_2,nn_3,nn_4,nn_5,nn_6,nn_7,nn_8,nn_9,nn_10,nn_11,nn_12,nn_13,nn_14,nn_15,nn_16,nn_17,nn_18,nn_19
0,435927,1532058,1331180,1196094,54213,1303929,1362288,1475885,810679,1346918,277765,1411300,166411,748568,719910,1140826,1503167,1656874,89497,926200
1,1183939,315834,1408104,417868,759057,1328127,1156408,1481588,1017121,1139392,1223000,310100,1509196,1663913,617652,1088867,1072540,105737,641836,588686
2,1287855,1839810,1209089,1772209,1080360,155993,1392978,925419,27086,1814441,1543007,1209041,1554563,412636,1067902,1320053,429342,1196561,1770094,1081344
3,828344,1741562,1818695,347257,16778,1229277,1252701,1160686,750061,316488,873918,597886,1602679,64692,1152909,1762469,258325,93004,1403776,776187
4,1730772,1115393,389702,103787,1018902,1065060,211393,573353,1358045,1429258,317290,409609,530363,523774,1020053,129508,443671,852073,1059300,254580
