## Experiments
1. Matrix factorization (MF) with a neural network - features: users and movies
2. NN regression - features: users, movies & baseline features
3. NN regression - features: users, movies, movie titles & baseline features

In [1]:
# Automatically reload edited imported modules (e.g. `constants` from src/) and render plots inline.
%load_ext autoreload
%autoreload 2
%matplotlib inline

In [2]:
import io
import os
import json
import time
import sys
import math
import copy
import pickle
import zipfile
from textwrap import wrap
from pathlib import Path
from itertools import zip_longest
from collections import defaultdict
from urllib.error import URLError
from urllib.request import urlopen

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

import torch
from torch import nn
from torch import optim
from torch import tensor
from torch.nn import functional as F 
from torch.optim.lr_scheduler import _LRScheduler

In [3]:
sys.path.append("src/")
from constants import *

## Count the unique users and movies, then build the user2idx and item2idx index maps

In [4]:
def _find_files(_dir, start_token, end_token):
    return [x for x in os.listdir(_dir) if (x.startswith(start_token))
            and (x.endswith(end_token))]


def _get_entity_index_map(entity_lst):

    user2idx = {user: i for i, user in enumerate(entity_lst)}
    idx2user = {i: user for i, user in enumerate(entity_lst)}

    return user2idx, idx2user


def get_metadata(data_dir=OUT_DIR, start_token='user_data_',
                 end_token='.h5', user_col='User', item_col='Movie'):
    """Collect all distinct user/item ids and persist the index maps.

    Reads every file in ``data_dir`` whose name starts with ``start_token``
    and ends with ``end_token`` (HDF5, key ``'stage'``), accumulates the
    unique values of ``user_col`` and ``item_col`` across all files, builds
    entity <-> index maps with ``_get_entity_index_map`` and writes them as
    JSON to ``USER2IDX_FN``, ``IDX2USER_FN``, ``ITEM2IDX_FN`` and
    ``IDX2ITEM_FN`` (from ``constants``).

    JSON object keys are always strings, so the saved maps have stringified
    keys; readers must look ids up with ``str(id)``.

    Returns
    -------
    None
    """
    print('get all the files to be read')
    files = _find_files(data_dir, start_token, end_token)
    print(files)
    print('num files: %d' % (len(files)))

    user_ids, item_ids = set(), set()

    for i, file in enumerate(files):
        print('num files completed: %d' % (i))
        print('\n')

        fn = os.path.join(data_dir, file)
        print('reading file %s' % (fn))
        df = pd.read_hdf(fn, key='stage')
        print('shape: ', df.shape)

        print('update number of unique categories for discrete variables')
        # .tolist() converts numpy scalars to Python ints, which json can
        # serialise as dict keys (numpy.int64 keys would raise TypeError).
        user_ids.update(df[user_col].unique().tolist())
        item_ids.update(df[item_col].unique().tolist())
        # Release the (tens of millions of rows) frame before the next read.
        del df

    print('\n\n\n')
    print('num users: %d' % (len(user_ids)))
    print('num items: %d' % (len(item_ids)))

    print('create entity to index mapping')
    user2idx, idx2user = _get_entity_index_map(user_ids)
    item2idx, idx2item = _get_entity_index_map(item_ids)
    del user_ids, item_ids
    print('\n\n')

    print('save artifacts \n')
    artifacts = [
        ('user2idx', user2idx, USER2IDX_FN),
        ('idx2user', idx2user, IDX2USER_FN),
        ('item2idx', item2idx, ITEM2IDX_FN),
        ('idx2item', idx2item, IDX2ITEM_FN),
    ]
    for label, mapping, out_fn in artifacts:
        print('%s \n' % (label))
        # `with` guarantees the file is flushed and closed immediately
        # instead of whenever the anonymous handle is garbage-collected.
        with open(out_fn, 'w') as fh:
            json.dump(mapping, fh)

In [6]:
# Scan the interim files, build the user/item index maps and save them as JSON.
%time get_metadata()

get all the files to be read
['user_data_1.h5', 'user_data_2.h5', 'user_data_3.h5', 'user_data_4.h5']
num files: 4
num files completed: 0


reading file /Users/varunn/Documents/kaggle/netflix-prize-data/interim/user_data_1.h5
shape:  (24053764, 4)
update number of unique categories for discrete variables
num files completed: 1


reading file /Users/varunn/Documents/kaggle/netflix-prize-data/interim/user_data_2.h5
shape:  (26977591, 4)
update number of unique categories for discrete variables
num files completed: 2


reading file /Users/varunn/Documents/kaggle/netflix-prize-data/interim/user_data_3.h5
shape:  (22601629, 4)
update number of unique categories for discrete variables
num files completed: 3


reading file /Users/varunn/Documents/kaggle/netflix-prize-data/interim/user_data_4.h5
shape:  (26847523, 4)
update number of unique categories for discrete variables




num users: 480189
num items: 17770
create entity to index mapping



save artifacts 

user2idx 

idx2user 

item2idx 

## Prepare data for the modelling experiments

In [5]:
class PrepareData(object):
    """Turn interim per-split HDF5 files into model-ready HDF5 files.

    For every file in ``interim_data_dir`` whose name starts with
    ``start_token.format(sample)`` and ends with ``end_token`` this class:

    * parses ``date_col`` with format ``%Y-%m-%d``;
    * adds ``<dv_col>_class`` = 1 when ``dv_col`` is in ``good_ratings``
      (0 otherwise);
    * optionally (``baseline_feats=True``) adds user/item date features and
      baseline statistics looked up from JSON dictionaries;
    * replaces user/item ids with their integer indices from the saved
      ``user2idx`` / ``item2idx`` maps;
    * drops the intermediate ``num_rating_*`` helper columns if present;

    and writes the result (HDF5, key ``'stage'``) to ``prep_data_dir`` under
    the same file name.

    All lookup dictionaries come from JSON, so their keys are stringified
    ids; every lookup therefore goes through ``str(id)``.
    """

    def __init__(self, sample, interim_data_dir=OUT_DIR,
                 prep_data_dir=PREPARED_DATA_DIR,
                 user_col='User', item_col='Movie', end_token='.h5',
                 date_col='Date', dv_col='Rating', good_ratings=[5],
                 start_token='user_{}_data_', user2idx_fn=USER2IDX_FN,
                 item2idx_fn=ITEM2IDX_FN, baseline_feats=False,
                 erd_user_fn=EARLIEST_RATING_DATE_USER_FN,
                 erd_movie_fn=EARLIEST_RATING_DATE_MOVIE_FN,
                 lrd_user_fn=LATEST_RATING_DATE_USER_FN,
                 lrd_movie_fn=LATEST_RATING_DATE_MOVIE_FN,
                 mr_user_fn=MEAN_RATINGS_USER_DCT_FN,
                 mr_movie_fn=MEAN_RATINGS_MOVIE_DCT_FN,
                 wmr_movie_fn=WEIGHTED_MEAN_RATINGS_MOVIE_DCT_FN,
                 nr_user_fn=NUM_RATINGS_USER_DCT_FN,
                 nr_movie_fn=NUM_RATINGS_MOVIE_DCT_FN):
        """
        Parameters
        ----------
        sample : str
            Split name substituted into ``start_token`` (e.g. ``'train'``).
        good_ratings : list
            Ratings mapped to class 1 in ``<dv_col>_class``.
        baseline_feats : bool
            When True, load the ``*_fn`` feature dictionaries (date dicts are
            parsed to datetimes) and impute ids missing from them with
            ``calc_median``.
        """
        files = _find_files(interim_data_dir, start_token.format(sample),
                            end_token)
        self.files = [os.path.join(interim_data_dir, x) for x in files]
        self.out_files = [os.path.join(prep_data_dir, x) for x in files]
        print(self.files)
        print(self.out_files)
        self.date_col = date_col
        self.dv_col = dv_col
        # Copy so the shared mutable default list is never aliased.
        self.good_ratings = list(good_ratings)
        self.dv_col_class = dv_col + '_class'
        self.user_col = user_col
        self.item_col = item_col
        self.user2idx = self._load_json(user2idx_fn)
        self.item2idx = self._load_json(item2idx_fn)
        self.baseline_feats = baseline_feats

        if self.baseline_feats:
            # (attribute name, json file, id universe, impute mode, is date)
            specs = [
                ('erd_user', erd_user_fn, self.user2idx, 'earliest', True),
                ('erd_movie', erd_movie_fn, self.item2idx, 'earliest', True),
                ('lrd_user', lrd_user_fn, self.user2idx, 'latest', True),
                ('lrd_movie', lrd_movie_fn, self.item2idx, 'latest', True),
                ('mr_user', mr_user_fn, self.user2idx, None, False),
                ('mr_movie', mr_movie_fn, self.item2idx, None, False),
                ('wmr_movie', wmr_movie_fn, self.item2idx, None, False),
                ('nr_user', nr_user_fn, self.user2idx, None, False),
                ('nr_movie', nr_movie_fn, self.item2idx, None, False),
            ]
            for attr, fn, ent2idx, mode, is_date in specs:
                setattr(self, attr,
                        self._load_feature(fn, ent2idx, mode, is_date))

    @staticmethod
    def _load_json(fn):
        """Read a JSON file, closing the handle deterministically."""
        with open(fn) as fh:
            return json.load(fh)

    def _load_feature(self, fn, ent2idx, mode, is_date):
        """Load one feature dict and fill ids from ``ent2idx`` missing in it."""
        feat_dct = self._load_json(fn)
        if is_date:
            feat_dct = self.convert_to_datetime(feat_dct)
        imputed = self.calc_median(feat_dct, ent2idx, mode)
        if imputed is not None:
            feat_dct.update(imputed)
        return feat_dct

    def read_file(self, fn):
        """Load the full HDF5 file ``fn`` (key ``'stage'``) into a DataFrame."""
        df = pd.read_hdf(fn, key='stage')
        return df

    def convert_to_datetime(self, feat_dct):
        """Return a copy of ``feat_dct`` with ``%Y-%m-%d`` strings parsed."""
        return {k: pd.to_datetime(v, format='%Y-%m-%d')
                for k, v in feat_dct.items()}

    def calc_median(self, feat_dct, ent2idx, date):
        """Return fill values for ids in ``ent2idx`` that are absent from ``feat_dct``.

        Parameters
        ----------
        feat_dct : dict
            Existing id -> value dictionary.
        ent2idx : dict
            Full id universe (only its keys are used).
        date : None | 'earliest' | 'latest'
            None -> NaN-ignoring median of the existing values;
            'earliest' -> their minimum; 'latest' -> their maximum.

        Returns
        -------
        dict or None
            ``{missing_id: fill_value}``, or None when no id is missing.

        Raises
        ------
        ValueError
            For an unknown ``date`` mode.
        """
        if date not in (None, 'earliest', 'latest'):
            raise ValueError(
                "date must be None, 'earliest' or 'latest'; got %r" % (date,))
        # A set difference (rather than comparing lengths) also catches ids
        # that are missing while feat_dct contains extra, unrelated ids.
        missing = set(ent2idx) - set(feat_dct)
        if not missing:
            return None
        print('there are missing users or movies')
        values = list(feat_dct.values())
        if date is None:
            fill = np.nanmedian(values)
        elif date == 'earliest':
            fill = np.nanmin(values)
        else:
            fill = np.nanmax(values)
        return dict.fromkeys(missing, fill)

    def calc_date_features(self, data, erd_feat_dct, lrd_feat_dct,
                           feat_type):
        """Add rating-date features for users (``'user'``) or items (``'item'``).

        ``erd_feat_dct`` / ``lrd_feat_dct`` map stringified ids to the first /
        last rating date. ``data[self.date_col]`` must already be datetime.
        Adds, with ``<t>`` = ``feat_type``:

        * ``days_since_first_<t>_rating``: days from the first rating date to
          this row's date (-1 when negative);
        * ``sqrt_days_since_first_<t>_rating``: its square root (-1 when the
          day count is negative);
        * ``rating_age_days_<t>`` / ``rating_age_weeks_<t>`` /
          ``rating_age_months_<t>``: last minus first rating date in days,
          days / 7 and days / 30.

        Ids absent from a dictionary yield NaN. Returns ``data`` (modified in
        place).

        Raises
        ------
        ValueError
            If ``feat_type`` is neither ``'user'`` nor ``'item'``.
        """
        if feat_type == 'user':
            ent_col = self.user_col
        elif feat_type == 'item':
            ent_col = self.item_col
        else:
            raise ValueError(
                "feat_type must be 'user' or 'item'; got %r" % (feat_type,))

        # Vectorised id -> date lookups; ids absent from a dictionary become
        # NaT and propagate as NaN through every feature below.
        keys = data[ent_col].astype(str)
        first_date = pd.to_datetime(keys.map(erd_feat_dct))
        last_date = pd.to_datetime(keys.map(lrd_feat_dct))

        parent_feature = 'days_since_first_{}_rating'.format(feat_type)
        print(parent_feature)
        data[parent_feature] = (data[self.date_col] - first_date).dt.days

        feature = 'sqrt_days_since_first_{}_rating'.format(feat_type)
        print(feature)
        days = data[parent_feature]
        mask = days < 0
        # sqrt only of non-negative day counts; negatives are flagged as -1.
        data[feature] = np.sqrt(days.where(~mask))
        data.loc[mask, feature] = -1
        data.loc[mask, parent_feature] = -1

        feature = 'rating_age_days_{}'.format(feat_type)
        print(feature)
        data[feature] = (last_date - first_date).dt.days

        feature = 'rating_age_weeks_{}'.format(feat_type)
        parent_feature = 'rating_age_days_{}'.format(feat_type)
        print(feature)
        data[feature] = data[parent_feature] / 7.

        feature = 'rating_age_months_{}'.format(feat_type)
        print(feature)
        data[feature] = data[parent_feature] / 30.

        return data

    @staticmethod
    def _encode(keys, ent2idx):
        """Map string ids to int64 indices, raising KeyError on unknown ids."""
        codes = keys.map(ent2idx)
        unknown = codes.isna()
        if unknown.any():
            # Same exception type the former per-row dict lookup raised.
            raise KeyError('ids missing from index map: %s' % (
                keys[unknown].unique()[:10].tolist()))
        return codes.astype(np.int64)

    def preprocess(self, data):
        """Add the target and optional baseline features, then index-encode ids.

        Returns the transformed frame (``data`` is also modified in place by
        the column assignments).
        """
        print('convert %s to datetime format' % (self.date_col))
        data[self.date_col] = pd.to_datetime(data[self.date_col],
                                             format='%Y-%m-%d')

        print('encoding for DV - only for classification experiments')
        data[self.dv_col_class] = data[self.dv_col].isin(
            self.good_ratings).astype(np.int64)

        # Stringified ids, reused by every dictionary lookup and by the final
        # label encoding (JSON turned all dictionary keys into strings).
        user_keys = data[self.user_col].astype(str)
        item_keys = data[self.item_col].astype(str)

        if self.baseline_feats:
            print('adding user and item baseline features\n')

            print('Define ordered tuple of feature columns\n')
            user_feats = [
                ('mean_ratings_user', self.mr_user),
                ('num_ratings_user', self.nr_user)]

            movie_feats = [
                ('mean_ratings_movie', self.mr_movie),
                ('weighted_mean_ratings_movie', self.wmr_movie),
                ('num_ratings_movie', self.nr_movie)]

            print('User Features\n')

            print('date features\n')
            data = self.calc_date_features(data, self.erd_user,
                                           self.lrd_user, 'user')
            print('other features\n')
            for feat_name, feat_dct in user_feats:
                print('Feature: ', feat_name)
                data[feat_name] = user_keys.map(feat_dct)

            print('Item Features\n')

            print('date features\n')
            data = self.calc_date_features(data, self.erd_movie,
                                           self.lrd_movie, 'item')
            print('other features\n')
            for feat_name, feat_dct in movie_feats:
                print('Feature: ', feat_name)
                data[feat_name] = item_keys.map(feat_dct)

        print('encoding for categorical variables')
        data[self.user_col] = self._encode(user_keys, self.user2idx)
        data[self.item_col] = self._encode(item_keys, self.item2idx)

        print('drop unwanted columns')
        # errors='ignore': inputs without these helper columns must not crash
        # after the expensive feature computation above.
        data = data.drop(columns=['num_rating_user', 'num_rating_user_bins',
                                  'num_rating_movie', 'num_rating_movie_bins'],
                         errors='ignore')

        return data

    def prepare_data(self):
        """Preprocess each input file and save it; skip outputs already on disk."""
        for i, (fn, out_fn) in enumerate(zip(self.files, self.out_files)):
            if os.path.isfile(out_fn):
                print('file %s already exists in the disk.' % (out_fn))
                continue
            print('num completed files: %d' % (i))

            start = time.time()

            print('read data')
            data = self.read_file(fn)
            print('time taken: %0.2f' % (time.time() - start))
            print('\n\n\n')

            print('preprocess data')
            data = self.preprocess(data)
            print('time taken: %0.2f' % (time.time() - start))
            print('\n\n\n')

            print('save')
            print('out file: %s' % (out_fn))
            data.to_hdf(out_fn, key='stage', mode='w')
            del data
            print('time taken: %0.2f' % (time.time() - start))
            print('\n\n\n')

In [42]:
# Load the index maps and (baseline_feats=True) the baseline-feature dictionaries for the train split.
%time prep_data = PrepareData(sample='train', baseline_feats=True)

['/Users/varunn/Documents/kaggle/netflix-prize-data/interim/user_train_data_1.h5', '/Users/varunn/Documents/kaggle/netflix-prize-data/interim/user_train_data_2.h5', '/Users/varunn/Documents/kaggle/netflix-prize-data/interim/user_train_data_3.h5', '/Users/varunn/Documents/kaggle/netflix-prize-data/interim/user_train_data_4.h5']
['/Users/varunn/Documents/kaggle/netflix-prize-data/prepared_data_for_NN_modelling/user_train_data_1.h5', '/Users/varunn/Documents/kaggle/netflix-prize-data/prepared_data_for_NN_modelling/user_train_data_2.h5', '/Users/varunn/Documents/kaggle/netflix-prize-data/prepared_data_for_NN_modelling/user_train_data_3.h5', '/Users/varunn/Documents/kaggle/netflix-prize-data/prepared_data_for_NN_modelling/user_train_data_4.h5']
there are missing users or movies
there are missing users or movies
there are missing users or movies
there are missing users or movies
CPU times: user 2min 35s, sys: 1.25 s, total: 2min 36s
Wall time: 2min 38s


In [43]:
# Every feature dictionary should cover all users (480189) or all movies (17770).
for feat_attr in ('erd_user', 'erd_movie', 'lrd_user', 'lrd_movie',
                  'mr_user', 'mr_movie', 'wmr_movie', 'nr_user', 'nr_movie'):
    print(len(getattr(prep_data, feat_attr)))

480189
17770
480189
17770
480189
17770
17770
480189
17770


In [44]:
# Preprocess and save every train part file; outputs already on disk are skipped.
%time prep_data.prepare_data()

num completed files: 0
read data
time taken: 8.43




preprocess data
convert Date to datetime format
encoding for DV - only for classification experiments
adding user and item baseline features

Define ordered tuple of feature columns

User Features

date features

days_since_first_user_rating
sqrt_days_since_first_user_rating
rating_age_days_user
rating_age_weeks_user
rating_age_months_user
other features

Feature:  mean_ratings_user
Feature:  num_ratings_user
Item Features

date features

days_since_first_item_rating
sqrt_days_since_first_item_rating
rating_age_days_item
rating_age_weeks_item
rating_age_months_item
other features

Feature:  mean_ratings_movie
Feature:  weighted_mean_ratings_movie
Feature:  num_ratings_movie
encoding for categorical variables
drop unwanted columns
time taken: 1957.49




save
out file: /Users/varunn/Documents/kaggle/netflix-prize-data/prepared_data_for_NN_modelling/user_train_data_1.h5
time taken: 1970.24




num completed files: 1
read data
time tak

## Experiment 1 - MF with NN

### Define the network and model class

In [6]:
class SimpleCF(nn.Module):
    """Bias-free dot-product matrix factorisation.

    The score of a (user, item) pair is the inner product of their
    ``factors``-dimensional embeddings, passed through a sigmoid when
    ``binary`` is True. ``forward`` returns shape (batch, 1, 1) for 1-D
    index tensors.
    """

    def __init__(self, n_users: int, n_items: int, factors: int = 16,
                 user_embeddings: torch.tensor = None,
                 freeze_users: bool = False,
                 item_embeddings: torch.tensor = None,
                 freeze_items: bool = False,
                 init: torch.nn.init = torch.nn.init.normal_,
                 binary: bool =False, **kwargs):
        super().__init__()
        self.binary = binary

        # Users first, then items: keeps initialiser RNG consumption order.
        self.user_embeddings = self._create_embedding(
            n_users, factors, user_embeddings, freeze_users, init, **kwargs)
        self.item_embeddings = self._create_embedding(
            n_items, factors, item_embeddings, freeze_items, init, **kwargs)
        self.sigmoid = nn.Sigmoid()

    def forward(self, u: torch.tensor, i: torch.tensor) -> torch.tensor:
        # (B, F) -> (B, 1, F) for users and (B, F, 1) for items, so the
        # batched matmul gives one (1, 1) score per pair: (B, 1, 1).
        lhs = self.user_embeddings(u).unsqueeze(1)
        rhs = self.item_embeddings(i).unsqueeze(2)
        score = torch.matmul(lhs, rhs)
        return self.sigmoid(score) if self.binary else score

    def _create_embedding(self, n_items, factors, weights, freeze,
                          init, **kwargs):
        """Build an ``n_items x factors`` embedding table.

        The weight is initialised in place by ``init(weight, **kwargs)``,
        overwritten by ``weights`` when given, and excluded from gradient
        updates when ``freeze`` is truthy.
        """
        table = nn.Embedding(n_items, factors)
        init(table.weight.data, **kwargs)
        if weights is not None:
            table.load_state_dict({'weight': weights})
        table.weight.requires_grad_(not freeze)
        return table
    
    
class BaseModule(nn.Module):
    """
    Base module for explicit matrix factorization.

    Prediction for one (user, item) pair:
    user_bias + item_bias + dropout(user_embedding) . dropout(item_embedding)
    """

    def __init__(self,
                 n_users,
                 n_items,
                 n_factors=40,
                 dropout_p=0,
                 sparse=False,
                 user_embeddings: torch.tensor = None,
                 user_biases: torch.tensor = None,
                 freeze_users: bool = False,
                 item_embeddings: torch.tensor = None,
                 item_biases: torch.tensor = None,
                 freeze_items: bool = False,
                 init: torch.nn.init = torch.nn.init.normal_,
                 **kwargs):
        """
        Parameters
        ----------
        n_users : int
            Number of users
        n_items : int
            Number of items
        n_factors : int
            Number of latent factors (embedding dimension).
        dropout_p : float
            p in nn.Dropout module. Probability of dropout, applied to both
            embeddings before the dot product.
        sparse : bool
            Whether or not to treat embeddings as sparse. NOTE: cannot use
            weight decay on the optimizer if sparse=True. Also, can only use
            Adagrad.
        user_embeddings, item_embeddings : torch.Tensor, optional
            Pretrained (n, n_factors) weights copied into the tables.
        user_biases, item_biases : torch.Tensor, optional
            Pretrained (n, 1) bias weights.
        freeze_users, freeze_items : bool
            Disable gradients for the corresponding embeddings and biases.
        init : callable
            In-place initialiser applied to every table, called with
            ``**kwargs``.
        """
        super(BaseModule, self).__init__()
        self.n_users = n_users
        self.n_items = n_items
        self.n_factors = n_factors
        self.user_embeddings, self.user_biases = self._create_embedding(
            n_users, n_factors, user_embeddings, user_biases,
            freeze_users, init, sparse, **kwargs)
        self.item_embeddings, self.item_biases = self._create_embedding(
            n_items, n_factors, item_embeddings, item_biases,
            freeze_items, init, sparse, **kwargs)

        self.dropout_p = dropout_p
        self.dropout = nn.Dropout(p=self.dropout_p)

        self.sparse = sparse

    def forward(self, users, items):
        """
        Forward pass through the model. For a single user and item, this
        looks like:
        user_bias + item_bias + user_embeddings.dot(item_embeddings)

        Parameters
        ----------
        users : torch.LongTensor
            User indices, shape (batch,).
        items : torch.LongTensor
            Item indices, same shape as ``users``.

        Returns
        -------
        preds : torch.Tensor
            Predicted ratings with the shape of ``users`` (a batch of one
            stays 1-D instead of collapsing to a scalar).
        """
        user_vecs = self.user_embeddings(users)
        item_vecs = self.item_embeddings(items)

        # Reduce over the factor axis (last), whatever the index shape.
        interaction = (self.dropout(user_vecs) * self.dropout(item_vecs)).sum(
            dim=-1, keepdim=True)
        preds = self.user_biases(users) + self.item_biases(items) + interaction

        return preds.squeeze(-1)

    # NOTE: no __call__ override -- nn.Module.__call__ dispatches to forward
    # and also runs registered hooks and accepts keyword arguments.

    def predict(self, users, items):
        """Alias of :meth:`forward` (does not trigger module hooks)."""
        return self.forward(users, items)

    def _create_embedding(self, n_items, n_factors, pre_weights,
                          pre_biases, freeze, init, sparse, **kwargs):
        """Build an (n_items x n_factors) embedding and an (n_items x 1) bias table.

        Both are initialised with ``init(weight, **kwargs)``, optionally
        overwritten with pretrained values, and optionally frozen.
        Returns ``(embedding, bias)``.
        """
        bias = nn.Embedding(n_items, 1, sparse=sparse)
        embedding = nn.Embedding(n_items, n_factors, sparse=sparse)
        init(bias.weight.data, **kwargs)
        init(embedding.weight.data, **kwargs)

        if pre_weights is not None:
            embedding.load_state_dict({'weight': pre_weights})

        if pre_biases is not None:
            bias.load_state_dict({'weight': pre_biases})

        if freeze:
            embedding.weight.requires_grad = False
            bias.weight.requires_grad = False

        return embedding, bias


def bpr_loss(preds, vals):
    """Sum over all elements of ``(1 - sigmoid(preds)) ** 2``.

    NOTE(review): despite its name this is not the Bayesian Personalized
    Ranking loss (``-log sigmoid(pos - neg)``). ``vals`` is accepted so the
    signature matches other ``loss(preds, targets)`` callables, but it is
    not used.
    """
    gap = 1.0 - torch.sigmoid(preds)
    return torch.square(gap).sum()

In [7]:
from torch.utils.data import Dataset


class Interactions(Dataset):
    """
    Map-style dataset over the non-zero cells of a sparse ratings matrix.

    Rows are users, columns are items and stored values are ratings. The
    matrix is converted to float32 COO; item ``k`` of the dataset is the
    k-th stored cell, returned as ``((row, col), value)``.
    """

    def __init__(self, mat):
        coo = mat.astype(np.float32).tocoo()
        self.mat = coo
        self.n_users, self.n_items = coo.shape

    def __getitem__(self, index):
        m = self.mat
        return (m.row[index], m.col[index]), m.data[index]

    def __len__(self):
        # One sample per explicitly stored cell.
        return self.mat.nnz

In [8]:
# create interactions matrix
import scipy.sparse as sp

def get_interaction_matrix(df, n_users, n_movies, user2index,
                           item2index):
    """Build a sparse ``(n_users, n_movies)`` float64 rating matrix from ``df``.

    Rows of ``df`` are read positionally: first column -> user id, second
    -> movie id, third -> rating (``itertuples`` puts the index at position
    0, hence fields 1..3). Ids are mapped with ``user2index`` /
    ``item2index``.

    Later rows for the same (user, movie) cell overwrite earlier ones and
    zero ratings are not stored, as with the former dense construction,
    but without allocating ``n_users * n_movies`` floats (about 68 GB for
    the full Netflix data).

    Returns
    -------
    scipy.sparse.coo_matrix
        Entries in row-major order.
    """
    cells = {}
    for row in df.itertuples():
        cells[(user2index[row[1]], item2index[row[2]])] = row[3]

    # Drop zeros (a dense matrix does not store them) and emit the cells in
    # row-major order, exactly as coo_matrix(dense) would.
    keys = sorted(k for k, v in cells.items() if v != 0)
    n = len(keys)
    rows = np.fromiter((k[0] for k in keys), dtype=np.int64, count=n)
    cols = np.fromiter((k[1] for k in keys), dtype=np.int64, count=n)
    vals = np.fromiter((cells[k] for k in keys), dtype=np.float64, count=n)
    return sp.coo_matrix((vals, (rows, cols)), shape=(n_users, n_movies))

In [9]:
from torch.utils.data import IterableDataset
from itertools import chain, islice


class InteractionsStream(IterableDataset):
    """Iterable dataset streaming (user, item) -> target from prepared files.

    Files are ``<prep_data_dir>/<start_token.format(sample)>*<end_token>``:
    every matching part file when ``file_num`` is None, otherwise only part
    ``file_num``. The target column is ``'Rating'`` for
    ``model_type='regression'`` and ``'Rating_class'`` for
    ``'classification'``.

    * ``sample == 'train'``: each file is read lazily in chunks of
      ``chunksize`` rows and every chunk is yielded as
      ``((user_list, item_list), target_list)``.
    * any other sample: the whole file is loaded and rows are yielded one
      at a time as ``((user, item), target)``.

    NOTE(review): ``numeric_cols`` is populated when ``baseline_feats`` is
    True but is not yet yielded by ``process_data``.
    NOTE(review): with ``DataLoader(batch_size > 1)`` the default collate
    needs equally sized chunks, so a shorter final chunk may fail to
    collate -- confirm the row count is divisible by ``chunksize``.
    """

    def __init__(self, prep_data_dir=PREPARED_DATA_DIR, file_num=None,
                 sample='train', user_col='User', item_col='Movie',
                 end_token='.h5', start_token='user_{}_data_',
                 baseline_feats=False, model_type='regression',
                 chunksize=10):
        dv_cols = {'regression': 'Rating', 'classification': 'Rating_class'}
        if model_type not in dv_cols:
            # Previously an unknown type left dv_col unset and failed later
            # with an AttributeError deep inside iteration.
            raise ValueError(
                "model_type must be 'regression' or 'classification'; "
                "got %r" % (model_type,))

        prefix = start_token.format(sample)
        if file_num is None:
            self.files = [os.path.join(prep_data_dir, x) for x in
                          _find_files(prep_data_dir, prefix, end_token)]
        else:
            self.files = [os.path.join(prep_data_dir,
                                       prefix + str(file_num) + end_token)]
        print(self.files)
        self.user_col = user_col
        self.item_col = item_col
        self.baseline_feats = baseline_feats
        self.sample = sample
        self.chunksize = chunksize
        self.dv_col = dv_cols[model_type]
        self.cat_cols = [self.user_col, self.item_col]

        if baseline_feats:
            self.numeric_cols = [
                'days_since_first_user_rating',
                'sqrt_days_since_first_user_rating',
                'rating_age_days_user', 'rating_age_weeks_user',
                'rating_age_months_user', 'mean_ratings_user',
                'num_ratings_user', 'days_since_first_item_rating',
                'sqrt_days_since_first_item_rating',
                'rating_age_days_item', 'rating_age_weeks_item',
                'rating_age_months_item', 'mean_ratings_movie',
                'weighted_mean_ratings_movie', 'num_ratings_movie']
        else:
            self.numeric_cols = []

    def read_file(self, fn):
        """Open ``fn`` (key ``'stage'``): a chunk iterator for train, else a DataFrame."""
        if self.sample == 'train':
            df = pd.read_hdf(fn, key='stage', iterator=True,
                             chunksize=self.chunksize)
        else:
            df = pd.read_hdf(fn, key='stage')

        return df

    def process_data(self, fn):
        """Yield ``((user, item), target)`` samples from one file (see class doc)."""
        print('read data')
        data = self.read_file(fn)

        if self.sample == 'train':
            try:
                for chunk in data:
                    user = chunk[self.user_col].tolist()
                    item = chunk[self.item_col].tolist()
                    y = chunk[self.dv_col].tolist()
                    yield (user, item), y
            finally:
                # Close the HDF5 store even when the consumer stops early
                # (e.g. itertools.islice over the loader).
                data.close()
        else:
            # Column-wise iteration instead of iterrows, which builds a
            # Series per row and is far slower on million-row files.
            rows = zip(data[self.user_col], data[self.item_col],
                       data[self.dv_col])
            for user, item, y in rows:
                yield (user, item), y

    def get_stream(self, files):
        """Chain the per-file sample generators of ``files`` into one stream."""
        return chain.from_iterable(map(self.process_data, files))

    def __iter__(self):
        return self.get_stream(self.files)

In [18]:
import torch
from torch import nn, tensor
import numpy as np
import collections

from abc import ABCMeta
from abc import abstractmethod
from typing import Callable
from tqdm import tqdm


class StepBase:
    """Interface shared by the incremental/batch recommender trainers.

    NOTE(review): ``__metaclass__`` is a Python 2 idiom that Python 3
    ignores, so the ``@abstractmethod`` markers below are not enforced and
    the class can be instantiated directly.
    """
    __metaclass__ = ABCMeta

    @abstractmethod
    def batch_fit(self, data_loader: torch.utils.data.DataLoader, epochs: int):
        """Train the model on a batch of user-item interactions."""

    @abstractmethod
    def step(self, user: torch.tensor, item: torch.tensor,
             rating: torch.tensor, preference: torch.tensor):
        """Train the model incrementally on a single interaction batch."""

    @abstractmethod
    def predict(self, user: torch.tensor, k: int):
        """Recommend the top-k items for a specific user."""

    @abstractmethod
    def save(self, path: str):
        """Persist the model parameters to ``path``."""

    @abstractmethod
    def load(self, path: str):
        """Restore the model parameters from ``path``."""


class Step(StepBase):
    """Incremental and batch training of recommender systems.

    Wraps ``model`` (which must expose ``user_embeddings`` and
    ``item_embeddings``; :meth:`recommend` also needs ``user_biases`` and
    ``item_biases``) together with a loss function and an optimizer built
    over ``model.parameters()``.
    """
    def __init__(self, model: torch.nn.Module,
                 loss_function=torch.nn.MSELoss(reduction='sum'),
                 optimizer = torch.optim.Adam,
                 lr = 0.01, weight_decay = 0., batch_size=512,
                 chunksize=10):
        """
        Parameters
        ----------
        model : torch.nn.Module
            Called as ``model(users, items)``.
        loss_function : callable
            ``loss(preds, targets)``. The default (one MSELoss instance,
            created once and shared by all Step objects -- it is stateless)
            sums squared errors; epoch and validation totals are divided by
            ``data_size``.
        optimizer : type
            Optimizer class, instantiated with ``lr`` and ``weight_decay``.
        batch_size, chunksize : int
            Only used to size the progress bar in :meth:`batch_fit`.
        """
        self.model = model
        self.loss_function = loss_function
        self.lr = lr
        self.batch_size = batch_size
        self.chunksize = chunksize
        self.weight_decay = weight_decay
        self.optimizer = optimizer(self.model.parameters(),
                                   lr=self.lr,
                                   weight_decay=self.weight_decay)
        self.losses = []

        # check if the user has provided user and item embeddings
        assert self.model.user_embeddings, 'User embedding matrix could not be found.'
        assert self.model.item_embeddings, 'Item embedding matrix could not be found.'

    @property
    def user_embeddings(self):
        return self.model.user_embeddings

    @property
    def item_embeddings(self):
        return self.model.item_embeddings

    @property
    def user_biases(self):
        return self.model.user_biases

    @property
    def item_biases(self):
        return self.model.item_biases

    def construct_tensor(self, a):
        """Concatenate the ``tolist()`` contents of the tensors in ``a`` into one tensor."""
        out = []
        for i in a:
            out += i.tolist()
        return tensor(out)

    def batch_fit(self, data_loader: torch.utils.data.DataLoader,
                  data_size: int, epochs: int = 1):
        """Train for ``epochs`` passes over ``data_loader``.

        The loader is expected to yield ``((rows, cols), vals)`` where each
        element is a list of tensors (chunked stream); they are flattened
        with :meth:`construct_tensor`. After each epoch the summed loss
        divided by ``data_size`` is appended to ``self.losses``.
        """
        self.model.train()
        # Progress-bar length only; the loader decides the real batching.
        n_steps = data_size // (self.batch_size * self.chunksize)
        for _ in range(epochs):
            total_loss = torch.Tensor([0])
            with tqdm(total=n_steps) as pbar:
                for (row, col), val in data_loader:
                    self.optimizer.zero_grad()

                    row = self.construct_tensor(row).long()
                    col = self.construct_tensor(col).long()
                    val = self.construct_tensor(val).float()

                    preds = self.model(row, col)
                    loss = self.loss_function(preds, val)
                    loss.backward()

                    self.optimizer.step()

                    total_loss += loss.item()
                    pbar.update(1)

            total_loss /= data_size
            self.losses.append(total_loss)

    def _validation_loss(self, data_loader: torch.utils.data.DataLoader,
                         data_size: int):
        """Return the summed loss over ``data_loader`` divided by ``data_size``.

        Runs in eval mode under ``torch.no_grad()`` so no autograd graph is
        built. Returns a 0-d tensor.
        """
        self.model.eval()
        total_loss = torch.Tensor([0])
        with torch.no_grad():
            for (row, col), val in data_loader:
                row = row.long()
                if isinstance(col, list):
                    col = tuple(c.long() for c in col)
                else:
                    col = col.long()
                val = val.float()

                preds = self.model(row, col)
                loss = self.loss_function(preds, val)
                total_loss += loss.item()

        total_loss /= data_size
        return total_loss[0]

    def step(self, user: torch.tensor, item: torch.tensor,
             rating: torch.tensor = None):
        """Take one optimizer step on a batch; return its loss as a float."""
        self.model.train()

        self.optimizer.zero_grad()

        pred = self.model(user, item)
        loss = self.loss_function(pred, rating)
        loss.backward()

        self.optimizer.step()

        return loss.item()

    def recommend(self, user: torch.tensor, k:int = 10) -> torch.tensor:
        """Return the indices of the ``k`` highest-scoring items for ``user``.

        Score = user_embedding . item_embedding + user_bias + item_bias
        (dropout is not applied). ``user`` must be a single index (0-d
        tensor); the reshapes below rely on it. Indices are ordered by
        ascending score, so the last one is the best match.
        """
        self.model.eval()

        # Inference only: skip building an autograd graph.
        with torch.no_grad():
            u_embed_one = self.user_embeddings(user)
            u_embed_one_reshaped = u_embed_one.reshape((
                1, u_embed_one.shape[0]))
            m_embed = self.item_embeddings.weight
            u_bias_one = self.user_biases(user)
            u_bias_one_reshaped = u_bias_one.reshape((
                1, u_bias_one.shape[0]))
            m_bias = self.item_biases.weight

            bias_sum = u_bias_one_reshaped + m_bias
            bias_sum = bias_sum.reshape((bias_sum.shape[1],
                                         bias_sum.shape[0]))

            preds = torch.matmul(u_embed_one_reshaped, m_embed.t()) + bias_sum

        return preds.squeeze().argsort()[-k:]

    def save(self, path: str):
        """Saves the model parameters to the given path."""
        torch.save(self.model.state_dict(), path)

    def load(self, path: str):
        """Loads the model parameters from a given path (pickle-based: trusted files only)."""
        self.model.load_state_dict(torch.load(path))

### Try training on part file 1 first. If that works, build a custom data loader that continuously streams batches from all four part files

In [19]:
# GLOBALS -- the *_SIZE row counts below belong to the part file FILE_NUM
FILE_NUM = 1
N_USERS, N_ITEMS = 480_189, 17_770   # as reported by get_metadata()
BATCH_SIZE = 500
TRAIN_SIZE = 22_851_074
VAL_SIZE = 962_152
TEST_SIZE = 240_538

In [85]:
from torch.utils.data import DataLoader

# Tiny chunks and batches just to eyeball what the training stream yields
# (each batch holds batch_size x chunksize = 4 interactions).
dataset = InteractionsStream(
    file_num=FILE_NUM, sample='train', model_type='regression',
    baseline_feats=False, chunksize=2)
train_loader = DataLoader(dataset, shuffle=False, batch_size=2)

['/Users/varunn/Documents/kaggle/netflix-prize-data/prepared_data_for_NN_modelling/user_train_data_1.h5']


In [27]:
# Small-batch test loader for inspecting the stream (2 rows per batch).
test_dataset = InteractionsStream(file_num=FILE_NUM, sample='test',
                                  model_type='regression',
                                  baseline_feats=False)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=2)

['/Users/varunn/Documents/kaggle/netflix-prize-data/prepared_data_for_NN_modelling/user_test_data_1.h5']


In [28]:
# Small-batch validation loader for inspecting the stream (2 rows per batch).
val_dataset = InteractionsStream(file_num=FILE_NUM, sample='val',
                                 model_type='regression',
                                 baseline_feats=False)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=2)

['/Users/varunn/Documents/kaggle/netflix-prize-data/prepared_data_for_NN_modelling/user_val_data_1.h5']


In [89]:
def construct_tensor(a):
    """Concatenate a sequence of 1-D tensors into one flat tensor.

    The values round-trip through Python lists, so ``tensor`` re-infers the
    dtype (integers -> int64, floats -> the default float dtype).
    """
    return tensor([value for part in a for value in part.tolist()])

In [95]:
# islice is only imported in a later cell; import it here so this cell
# also runs on a fresh kernel.
from itertools import islice

start = time.time()
for i, ((row, col), val) in enumerate(islice(train_loader, 10)):
    # Each batch field is a list of per-chunk tensors; flatten once each.
    users = construct_tensor(row).long()
    items = construct_tensor(col).long()
    ratings = construct_tensor(val).float()
    print(i, '\t', users, '\t', items, '\t', ratings, users.size())

print('time taken: %0.2f' % (time.time() - start))

read data
0 	 tensor([161459, 191296,  87375,  27266]) 	 tensor([2138, 1154, 3253, 1201]) 	 tensor([4., 2., 2., 5.]) torch.Size([4])
1 	 tensor([175666, 252679, 141629, 134412]) 	 tensor([4377,  289, 1405, 4340]) 	 tensor([3., 3., 3., 3.]) torch.Size([4])
2 	 tensor([130415, 398255, 296886, 122198]) 	 tensor([2339, 4487, 1026, 1641]) 	 tensor([3., 3., 4., 3.]) torch.Size([4])
3 	 tensor([ 86100, 405639, 187857, 109577]) 	 tensor([3417, 3127, 1901,  995]) 	 tensor([3., 3., 3., 5.]) torch.Size([4])
4 	 tensor([ 55618, 392816, 383780, 392221]) 	 tensor([2560,  329, 3902, 1832]) 	 tensor([3., 3., 4., 1.]) torch.Size([4])
5 	 tensor([360000, 331687, 177187, 327369]) 	 tensor([2574, 1876, 4170, 2620]) 	 tensor([5., 5., 4., 5.]) torch.Size([4])
6 	 tensor([ 97259, 371177, 106697,  32546]) 	 tensor([2327, 3824, 3167, 1434]) 	 tensor([1., 3., 5., 3.]) torch.Size([4])
7 	 tensor([202018, 141240, 331242, 460100]) 	 tensor([ 108, 1011, 2250, 1865]) 	 tensor([4., 5., 2., 4.]) torch.Size([4])
8 	 te

In [30]:
# islice is only imported in a later cell; import it here so this cell
# also runs on a fresh kernel.
from itertools import islice

start = time.time()
for i, ((row, col), val) in enumerate(islice(test_loader, 8)):
    print(i, '\t', row.long(), '\t', col.long(), '\t', val.float())

print('time taken: %0.2f' % (time.time() - start))

read data
0 	 tensor([294990, 372871]) 	 tensor([ 952, 1306]) 	 tensor([3., 3.])
1 	 tensor([207396, 330647]) 	 tensor([3924,  357]) 	 tensor([4., 2.])
2 	 tensor([169970,  67519]) 	 tensor([ 311, 2991]) 	 tensor([3., 3.])
3 	 tensor([332836,  59424]) 	 tensor([2881,  570]) 	 tensor([4., 5.])
4 	 tensor([  5795, 193554]) 	 tensor([ 456, 3863]) 	 tensor([2., 5.])
5 	 tensor([293805, 261844]) 	 tensor([1143, 4079]) 	 tensor([5., 5.])
6 	 tensor([65289, 24001]) 	 tensor([2847, 3797]) 	 tensor([5., 5.])
7 	 tensor([ 24053, 246913]) 	 tensor([1859, 1743]) 	 tensor([4., 4.])
time taken: 1.41


In [31]:
# islice is only imported in a later cell; import it here so this cell
# also runs on a fresh kernel.
from itertools import islice

start = time.time()
for i, batch in enumerate(islice(val_loader, 8)):
    print(i, '\t', batch)

print('time taken: %0.2f' % (time.time() - start))

read data
0 	 [[tensor([ 95805, 392278]), tensor([3714, 1641])], tensor([5., 5.], dtype=torch.float64)]
1 	 [[tensor([231167, 228667]), tensor([3623,  894])], tensor([4., 4.], dtype=torch.float64)]
2 	 [[tensor([183932, 275970]), tensor([1179, 2605])], tensor([3., 4.], dtype=torch.float64)]
3 	 [[tensor([402484, 418136]), tensor([3221, 1809])], tensor([3., 4.], dtype=torch.float64)]
4 	 [[tensor([371283, 336578]), tensor([1797, 3152])], tensor([5., 5.], dtype=torch.float64)]
5 	 [[tensor([156182, 407293]), tensor([4431, 1702])], tensor([2., 4.], dtype=torch.float64)]
6 	 [[tensor([221310, 210155]), tensor([ 788, 1651])], tensor([5., 5.], dtype=torch.float64)]
7 	 [[tensor([244322, 263423]), tensor([ 988, 4355])], tensor([4., 3.], dtype=torch.float64)]
time taken: 5.52


### Train on a single part file because of data-size and resource constraints

In [20]:
from torch.utils.data import DataLoader


# Full training stream: 10-row chunks, BATCH_SIZE chunks per loader batch.
train_dataset = InteractionsStream(
    file_num=FILE_NUM, sample='train', model_type='regression',
    baseline_feats=False, chunksize=10)
train_loader = DataLoader(train_dataset, shuffle=False,
                          batch_size=BATCH_SIZE)

['/Users/varunn/Documents/kaggle/netflix-prize-data/prepared_data_for_NN_modelling/user_train_data_1.h5']


In [21]:
# Evaluation streams for the same part file, batched like training.
test_dataset = InteractionsStream(file_num=FILE_NUM, sample='test',
                                  model_type='regression',
                                  baseline_feats=False)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=BATCH_SIZE)

val_dataset = InteractionsStream(file_num=FILE_NUM, sample='val',
                                 model_type='regression',
                                 baseline_feats=False)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=BATCH_SIZE)

['/Users/varunn/Documents/kaggle/netflix-prize-data/prepared_data_for_NN_modelling/user_test_data_1.h5']
['/Users/varunn/Documents/kaggle/netflix-prize-data/prepared_data_for_NN_modelling/user_val_data_1.h5']


In [22]:
start = time.time()

# Matrix-factorisation network wrapped by the Step trainer.
net = BaseModule(n_users=N_USERS, n_items=N_ITEMS, n_factors=100,
                 dropout_p=0.02)
model = Step(net, batch_size=BATCH_SIZE, chunksize=10, lr=0.02,
             weight_decay=0.1)

elapsed = time.time() - start
print('time taken: %0.2f' % (elapsed))

time taken: 0.82


In [23]:
# Display the layers of the wrapped network.
model.model

BaseModule(
  (user_embeddings): Embedding(480189, 100)
  (user_biases): Embedding(480189, 1)
  (item_embeddings): Embedding(17770, 100)
  (item_biases): Embedding(17770, 1)
  (dropout): Dropout(p=0.02, inplace=False)
)

In [28]:
start = time.time()
model.batch_fit(train_loader, TRAIN_SIZE, epochs=1)
elapsed = time.time() - start

print('time taken: %0.2f' % (elapsed))

  0%|          | 0/4570 [00:00<?, ?it/s]

read data


4571it [3:26:25,  2.23s/it]                            

time taken: 12385.76





In [29]:
# Loss history recorded by the trainer.
model.losses

[tensor([2.7790]), tensor([1.0267]), tensor([1.0279])]

In [30]:
start = time.time()
print('loss of model on test: ',
      model._validation_loss(test_loader, TEST_SIZE))
# Report the timing like the other cells (``start`` was otherwise unused).
print('time taken: %0.2f' % (time.time() - start))

read data
loss of model on test:  tensor(1.0241)


In [27]:
# NOTE(review): the '_E2' suffix is hard-coded, while the training cell calls
# batch_fit with epochs=1 — confirm the intended epoch count in the name.
os.makedirs(MODEL_DIR, exist_ok=True)  # torch.save does not create directories
model_fn = os.path.join(MODEL_DIR, 'model_NN_MF_{}_E2.pt'.format(
    FILE_NUM))
model.save(model_fn)

In [31]:
# get prediction for test set
# Step.step() switches the network to training mode; switch to eval mode so
# dropout is disabled and predictions are deterministic.
model.model.eval()

preds = []
actuals = []
with torch.no_grad():
    for (row, col), val in test_loader:
        row = row.long()
        if isinstance(col, list):
            col = tuple(c.long() for c in col)
        else:
            col = col.long()
        val = val.float()

        pred = model.model(row, col)
        # reshape(-1) guarantees a list per batch (even for a size-1 batch),
        # which the flattening cell below relies on.
        preds.append(pred.reshape(-1).tolist())
        actuals.append(val.reshape(-1).tolist())

read data


In [32]:
# Number of test batches collected for actuals and predictions.
len(actuals), len(preds)

(482, 482)

In [33]:
# Flatten the per-batch lists into one flat list per quantity.
final_preds = [p for batch in preds for p in batch]
final_actuals = [a for batch in actuals for a in batch]

In [34]:
# Sanity check: lengths match TEST_SIZE; peek at the first 10 of each.
len(final_preds), len(final_actuals), final_preds[:10], final_actuals[:10]

(240538,
 240538,
 [3.1168930530548096,
  3.129922866821289,
  3.370454788208008,
  3.2390098571777344,
  3.769129514694214,
  3.7638726234436035,
  3.4293339252471924,
  4.000853538513184,
  3.80224347114563,
  4.212867736816406],
 [3.0, 3.0, 4.0, 2.0, 3.0, 3.0, 4.0, 5.0, 2.0, 5.0])

In [35]:
from sklearn.metrics import mean_squared_error

test_mse = mean_squared_error(y_true=final_actuals, y_pred=final_preds)
rmse = np.sqrt(test_mse)
print('Test RMSE: %0.4f' % (rmse))

Test RMSE: 1.0120


## Experiment 3 - NN Regression with baseline features

In [5]:
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import IterableDataset
from itertools import chain, islice


class InteractionsStream(IterableDataset):
    """Iterable dataset streaming interactions from prepared HDF5 part files.

    Files are named ``<start_token.format(sample)><file_num><end_token>``
    (e.g. ``user_train_data_1.h5``) inside ``prep_data_dir``. When
    ``file_num`` is None, every matching part file in the directory is
    streamed one after another.

    Yielded elements:

    * ``sample == 'train'``: the file is read lazily in chunks of
      ``chunksize`` rows, and each element covers one whole chunk, so its
      fields are Python lists of up to ``chunksize`` values.
    * any other sample: the whole file is loaded and one element is yielded
      per row.
    * ``baseline_feats=False`` -> ``((user, item), target)``;
      ``baseline_feats=True`` -> ``(categorical_values, numeric_values, target)``.

    Args:
        prep_data_dir: directory holding the prepared part files.
        file_num: part-file number to read, or None for all matching files.
        sample: split name substituted into ``start_token``.
        user_col, item_col: names of the user and item columns.
        end_token, start_token: file-name suffix and formattable prefix.
        baseline_feats: also yield the numeric baseline feature columns.
        model_type: 'regression' (target column 'Rating') or
            'classification' (target column 'Rating_class').
        chunksize: rows per chunk when streaming the training split.

    Raises:
        ValueError: if ``model_type`` is not one of the supported values.
    """

    def __init__(self, prep_data_dir=PREPARED_DATA_DIR, file_num=None,
                 sample='train', user_col='User', item_col='Movie',
                 end_token='.h5', start_token='user_{}_data_',
                 baseline_feats=False, model_type='regression',
                 chunksize=10):

        prefix = start_token.format(sample)
        if file_num is None:
            self.files = [os.path.join(prep_data_dir, x) for x in
                          _find_files(prep_data_dir, prefix, end_token)]
        else:
            self.files = [os.path.join(prep_data_dir,
                                       prefix + str(file_num) + end_token)]
        print(self.files)
        self.user_col = user_col
        self.item_col = item_col
        self.baseline_feats = baseline_feats
        self.sample = sample
        self.chunksize = chunksize
        if model_type == 'regression':
            self.dv_col = 'Rating'
        elif model_type == 'classification':
            self.dv_col = 'Rating_class'
        else:
            # Fail fast instead of an AttributeError on dv_col mid-stream.
            raise ValueError(
                "model_type must be 'regression' or 'classification', "
                "got %r" % (model_type,))
        self.cat_cols = [self.user_col, self.item_col]

        if baseline_feats:
            self.numeric_cols = [
                'days_since_first_user_rating',
                'sqrt_days_since_first_user_rating',
                'rating_age_days_user', 'rating_age_weeks_user',
                'rating_age_months_user', 'mean_ratings_user',
                'num_ratings_user', 'days_since_first_item_rating',
                'sqrt_days_since_first_item_rating',
                'rating_age_days_item', 'rating_age_weeks_item',
                'rating_age_months_item', 'mean_ratings_movie',
                'weighted_mean_ratings_movie', 'num_ratings_movie']
        else:
            self.numeric_cols = []

    def read_file(self, fn):
        """Open ``fn``: a chunk iterator for 'train', a full DataFrame otherwise."""
        if self.sample == 'train':
            # Chunked, lazy read so the large training split is never fully
            # loaded into memory.
            return pd.read_hdf(fn, key='stage', iterator=True,
                               chunksize=self.chunksize)
        return pd.read_hdf(fn, key='stage')

    def iter_data(self, data):
        """Generate dataset elements from the object returned by ``read_file``."""
        if self.sample == 'train':
            # ``data`` yields DataFrame chunks, so every field is a list.
            if self.baseline_feats:
                for chunk in data:
                    yield (chunk[self.cat_cols].values.tolist(),
                           chunk[self.numeric_cols].values.tolist(),
                           chunk[self.dv_col].tolist())
            else:
                for chunk in data:
                    user = chunk[self.user_col].tolist()
                    item = chunk[self.item_col].tolist()
                    y = chunk[self.dv_col].tolist()
                    yield (user, item), y
        else:
            if self.baseline_feats:
                for _, row in data.iterrows():
                    yield (row[self.cat_cols].values,
                           row[self.numeric_cols].values,
                           row[self.dv_col])
            else:
                for _, row in data.iterrows():
                    yield (row[self.user_col],
                           row[self.item_col]), row[self.dv_col]

    def process_data(self, fn):
        """Read one part file and return a generator over its elements."""
        print('read data')
        data = self.read_file(fn)

        print('create an iterable')
        # The generator must be returned: get_stream chains whatever this
        # method returns, and None is not iterable.
        return self.iter_data(data)

    def get_stream(self, files):
        """Lazily chain the element generators of all ``files``."""
        return chain.from_iterable(map(self.process_data, files))

    def __iter__(self):
        return self.get_stream(self.files)

In [6]:
class TabularModel(nn.Module):
    """Feed-forward network over categorical embeddings and continuous inputs.

    Each categorical column has its own embedding table. The concatenated
    embeddings go through dropout and are joined with the batch-normalised
    continuous features. The result passes through two hidden layers
    (200 and 70 units; ReLU -> dropout -> batch-norm) into a linear layer
    with ``n_classes`` outputs.

    Args:
        embedding_sizes: ``(num_categories, embedding_dim)`` pairs, one per
            column of ``x_cat`` in column order.
        n_cont: number of continuous feature columns in ``x_cont``.
        n_classes: size of the output layer.
    """

    def __init__(self, embedding_sizes, n_cont, n_classes=3):
        super().__init__()
        # Sub-modules are created in a fixed order so that random
        # initialisation is reproducible for a given seed.
        self.embeddings = nn.ModuleList(
            [nn.Embedding(n_cat, dim) for n_cat, dim in embedding_sizes])
        total_emb_dim = sum(emb.embedding_dim for emb in self.embeddings)
        self.n_emb = total_emb_dim
        self.n_cont = n_cont
        self.n_classes = n_classes
        self.lin1 = nn.Linear(total_emb_dim + n_cont, 200)
        self.lin2 = nn.Linear(200, 70)
        self.lin3 = nn.Linear(70, n_classes)
        self.bn1 = nn.BatchNorm1d(n_cont)
        self.bn2 = nn.BatchNorm1d(200)
        self.bn3 = nn.BatchNorm1d(70)
        self.emb_drop = nn.Dropout(0.6)
        self.drops = nn.Dropout(0.3)

    def forward(self, x_cat, x_cont):
        """Score a batch given categorical indices and continuous features.

        ``x_cat`` is indexed as ``x_cat[:, col]``, one column per embedding.
        """
        embedded = torch.cat(
            [emb(x_cat[:, col]) for col, emb in enumerate(self.embeddings)], 1)
        hidden = torch.cat([self.emb_drop(embedded), self.bn1(x_cont)], 1)
        hidden = self.bn2(self.drops(F.relu(self.lin1(hidden))))
        hidden = self.bn3(self.drops(F.relu(self.lin2(hidden))))
        return self.lin3(hidden)

In [11]:
# DistributedDataParallelCPU is deprecated (and removed in later PyTorch
# releases), so look it up defensively to keep the cell runnable.
torch.get_num_threads(), getattr(nn.parallel, 'DistributedDataParallelCPU', None)

(1, <function torch.nn.parallel.DistributedDataParallelCPU(*args, **kwargs)>)

In [31]:
# NOTE(review): 22851074 equals TRAIN_SIZE; the meaning of 160 and 4570214
# is not evident from this notebook — TODO document.
(160 * 4570214)/22851074

31.999994398512737