In [2]:
import os
# Name of the project repository and the working directory that receives the
# extracted sample data.
repo_name = 'final-project-qrdecomposition_final'
data_path = '../downloads'

# makedirs(exist_ok=True) replaces the isdir()/mkdir() pair: it has no
# check-then-create race, creates missing parent directories, and makes the
# cell safe to re-run.
os.makedirs(data_path, exist_ok=True)

In [3]:
###utilities
from tqdm import tqdm
import time
import warnings
warnings.filterwarnings("ignore")

###pyspark dependencies
from pyspark.sql import SparkSession
import pyspark.ml as M
import pyspark.sql.functions as F
import pyspark.sql.window as W
import pyspark.sql.types as T
from pyspark.ml.recommendation import ALS

###numpy,scipy,pandas,sklearn stacks
from scipy import sparse
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline

###torch stacks
import torch
from torch import nn
from pytorch_widedeep.preprocessing import DensePreprocessor
from pytorch_widedeep.callbacks import (
    LRHistory,
    EarlyStopping,
    ModelCheckpoint,
)
from pytorch_widedeep.optim import RAdam
from pytorch_widedeep.initializers import XavierNormal, KaimingNormal
from pytorch_widedeep.models import Wide, DeepDense, WideDeep
# Pick the torch device once: the GPU when CUDA is available, the CPU otherwise.
use_cuda = torch.cuda.is_available()
if use_cuda:
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device, torch.__version__)

cuda 1.6.0+cu101


In [4]:
# NOTE(review): machine-specific absolute JDK location; anyone running this
# notebook elsewhere has to point JAVA_HOME at their own JDK.
os.environ["JAVA_HOME"] = "/datasets/home/65/965/yux164/.jdk/jdk-11.0.9.1+1" #for java path
import psutil
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
# psutil.cpu_count() returns None when the count cannot be determined, which
# would make the NUM_THREAD / NUM_WORKER division in spark_session() raise.
# Fall back to a single worker (and at least one thread per worker) instead.
NUM_WORKER = psutil.cpu_count(logical = False) or 1
NUM_THREAD = psutil.cpu_count(logical = True) or NUM_WORKER
def spark_session():
    """Create (or reuse) the Spark session used by this notebook.

    The configuration binds the driver to 127.0.0.1 and sizes executors and
    shuffle partitions from the module-level NUM_WORKER / NUM_THREAD values.

    Returns:
        SparkSession: a session wrapping the active SparkContext.
    """
    conf_spark = SparkConf().set("spark.driver.host", "127.0.0.1")\
                            .set("spark.executor.instances", NUM_WORKER)\
                            .set("spark.executor.cores", int(NUM_THREAD / NUM_WORKER))\
                            .set("spark.executor.memory", '4g')\
                            .set("spark.sql.shuffle.partitions", NUM_THREAD)
    # getOrCreate keeps the cell re-runnable: constructing a second
    # SparkContext in the same kernel raises, whereas this returns the
    # already-running context (in which case conf_spark is not re-applied).
    sc = SparkContext.getOrCreate(conf = conf_spark)
    sc.setLogLevel('ERROR')
    spark = SparkSession(sc)
    print('Spark UI address {}'.format(spark.sparkContext.uiWebUrl))
    return spark

# Notebook-wide Spark session; later cells read CSVs and build DataFrames through it.
spark = spark_session()

Spark UI address http://127.0.0.1:4040


In [5]:
# Unpack the bundled sample archive into `data_path`, then load the full sample
# twice: once as a Spark DataFrame and once as a pandas DataFrame.
#load sample from local path
compressed_sample_path = '../data/sample.tar.gz'
# IPython shell escapes; `$name` is substituted from the Python namespace
# before the command is handed to the shell.
!tar -xzvf $compressed_sample_path -C $data_path
!ls $data_path

sample_path = os.path.join(data_path, 'samples', 'sample.csv')
# Spark copy: keeps only the three rating columns and is persisted for reuse.
# No schema/inferSchema is passed, so the columns are presumably read as
# strings -- TODO confirm; Als._preprocess casts them explicitly.
sample = spark.read.csv(sample_path, header=True).select('userId', 'movieId', 'rating').persist()
# pandas copy of the same file with the 'timestamp' column removed.
sample_df = pd.read_csv(sample_path).drop('timestamp', axis = 1)

samples/
samples/sample_train.csv
samples/sample.csv
samples/sample_test.csv
model_results  movies.csv  samples


In [6]:
# Load the train/test splits and the movie table from the extracted files,
# each as a Spark DataFrame and as a pandas DataFrame.
samples_dir = os.path.join(data_path, 'samples')
sample_train_path = os.path.join(samples_dir, 'sample_train.csv')
sample_test_path = os.path.join(samples_dir, 'sample_test.csv')
movie_path = os.path.join(data_path, 'movies.csv')

# Spark views of the two splits.
sample_train = spark.read.csv(sample_train_path, header=True)
sample_test = spark.read.csv(sample_test_path, header=True)

# pandas views of the two splits.
sample_train_df = pd.read_csv(sample_train_path)
sample_test_df = pd.read_csv(sample_test_path)

# Movie table in both flavours.
movies = spark.read.csv(movie_path, header=True)
movies_df = pd.read_csv(movie_path)

# Attach the movie columns to every pandas ratings frame (default merge on the
# shared column names).
sample_df = sample_df.merge(movies_df)
sample_train_df = sample_train_df.merge(movies_df)
sample_test_df = sample_test_df.merge(movies_df)

In [7]:
# Sanity check: row counts of the Spark frames, then shapes of the pandas frames.
spark_counts = [frame.count() for frame in (sample, sample_train, sample_test)]
print(*spark_counts)

pandas_shapes = [frame.shape for frame in (sample_df, sample_train_df, sample_test_df)]
print(*pandas_shapes)

4340404 3255932 1084472
(4340404, 5) (3255932, 5) (1084472, 5)


In [8]:
def base_recommend(spark, 
                   base_model, 
                   cold_start_model, 
                   user_ids, 
                   movies, 
                   n,
                   extra_features,
                   user_id, 
                   item_id):
    """Build the candidate list (up to `n` items per user) for the given users.

    Users the base model returns recommendations for get those items; every
    other requested user ("cold start") gets the cold-start model's items.

    Args:
        spark: Spark session used to turn the user list into a Spark DataFrame.
        base_model: object exposing `userCol` and `recommend(users, n)`, whose
            result has a `toPandas()` method.
        cold_start_model: object exposing `recommend()`, whose result has a
            `toPandas()` method yielding item ids.
        user_ids (iterable): users to recommend for (duplicates are ignored).
        movies (pandas.DataFrame): item table merged in to supply `extra_features`.
        n (int): number of candidates requested per user.
        extra_features (list): extra item columns to keep in the output.
        user_id (str): name of the user column.
        item_id (str): name of the item column.

    Returns:
        pandas.DataFrame: columns `[user_id, item_id] + extra_features`, with
        both id columns cast to int64.
    """
    columns = [user_id, item_id] + extra_features
    id_dtypes = {user_id: np.int64, item_id: np.int64}

    userset = list(set(user_ids))
    users = spark.createDataFrame(pd.DataFrame({base_model.userCol: userset}))
    recommended = base_model.recommend(users, n).toPandas()
    recommended = recommended.merge(movies, how='left')
    recommended = recommended[columns].astype(id_dtypes)

    cold_start_users = set(user_ids) - set(recommended[user_id].tolist())
    if not cold_start_users:
        return recommended

    # The cold-start list does not depend on the user, so compute it once
    # instead of once per cold user. It is truncated to n and the user column
    # is sized from it, so the two columns always have equal length (the
    # cold-start model may return a different number of items than n).
    cold_items = cold_start_model.recommend().toPandas().values.reshape(-1,)[:n]

    frames = [recommended]
    for user in cold_start_users:
        cold_recommendation = pd.DataFrame({user_id: [user] * len(cold_items),
                                            item_id: cold_items})
        cold_recommendation = cold_recommendation.astype(id_dtypes)
        cold_recommendation = cold_recommendation.merge(movies, how='left')
        frames.append(cold_recommendation[columns])
    # Single concat instead of DataFrame.append in a loop (quadratic, and
    # removed in pandas 2.0).
    return pd.concat(frames, ignore_index=True)
def advanced_recommend(advanced_recommender,
                       base_recommend, 
                       k, 
                       user_id, 
                       item_id):
    """Re-rank candidate items with a second model and keep the top `k` per user.

    Args:
        advanced_recommender: object exposing `predict(df)` that returns one
            score per row of `df`.
        base_recommend (pandas.DataFrame): candidate (user, item, features) rows;
            it is copied, not modified.
        k (int): number of items to keep per user.
        user_id (str): name of the user column.
        item_id (str): name of the item column.

    Returns:
        pandas.DataFrame: columns `user_id`, `item_id`, `prediction`, holding
        each user's `k` highest-scored items in descending score order.
    """
    df = base_recommend.copy()
    # Use the model passed in; the previous code referenced an undefined
    # global (`advanced_model`) and raised NameError.
    df['prediction'] = advanced_recommender.predict(df)
    df = df.set_index(item_id).groupby(user_id).prediction\
        .apply(lambda scores: scores.sort_values(ascending=False).head(k)).reset_index()
    return df

def final_recommender(spark, 
                base_model, 
                cold_start_model, 
                advanced_recommender,
                users,
                movies,
                n = 50,
                k = 5,
                user_id = 'userId',
                item_id = 'movieId',
                extra_features = ['genres']
               ):
    """Two-stage recommendation: candidate generation followed by re-ranking.

    Stage one (`base_recommend`) collects `n` candidate items per user; stage
    two (`advanced_recommend`) scores those candidates and keeps the best `k`.

    Returns:
        Whatever `advanced_recommend` returns for the generated candidates.
    """
    candidates = base_recommend(spark=spark,
                                base_model=base_model,
                                cold_start_model=cold_start_model,
                                user_ids=users,
                                movies=movies,
                                n=n,
                                extra_features=extra_features,
                                user_id=user_id,
                                item_id=item_id)
    reranked = advanced_recommend(advanced_recommender=advanced_recommender,
                                  base_recommend=candidates,
                                  k=k,
                                  user_id=user_id,
                                  item_id=item_id)
    return reranked

In [9]:
class Memory_based_CF():
    """Memory-based collaborative filtering (user- or item-based).

    `fit` builds a ratings matrix, a similarity matrix and a dense prediction
    matrix; `predict` and `recommend` are lookups into that prediction matrix.
    """
    def __init__(self, spark, base, usercol='userId', itemcol='movieId', ratingcol='rating'):
        """Store the configuration; nothing is computed until `fit`.

        Args:
            spark (Spark Session): the current spark session
            base (str): 'user' for user-based or 'item' for item-based
            usercol (str, optional): user column name. Defaults to 'userId'.
            itemcol (str, optional): item column name. Defaults to 'movieId'.
            ratingcol (str, optional): rating/target column name. Defaults to 'rating'.
        """        
        self.base = base
        self.usercol = usercol
        self.itemcol = itemcol
        self.ratingcol = ratingcol
        self.spark = spark
        self.X = None                  # ratings matrix built by fit()
        self.idxer = None              # fitted indexTransformer
        self.similarity_matrix = None  # output of _pearson_corr
        self.prediction_matrix = None  # output of _get_predict

    def fit(self, _X):
        """Train the model.

        Args:
            _X (Pyspark DataFrame): the training set
        """
        X = self._preprocess(_X, True)
        self.X = X
        self.similarity_matrix = self._pearson_corr(X)
        self.prediction_matrix = self._get_predict()
        
    def predict(self, _X):
        """Look up the predicted rating for every (user, item) row of `_X`.

        Rows whose user or item was not seen during `fit` are dropped by the
        indexers (handleInvalid='skip').

        Args:
            _X (Pyspark DataFrame): the DataFrame to predict for
        Returns:
            Pyspark DataFrame: user, item and rating columns plus 'prediction'
        """        
        rows, cols = self._preprocess(_X, False)
        preds = [self.prediction_matrix[i, j] for i, j in zip(rows, cols)]
        # transform() orders rows the same way _preprocess did, so preds line up.
        df = self.idxer.transform(_X).select(self.usercol, self.itemcol, self.ratingcol).toPandas()
        df['prediction'] = preds
        return self.spark.createDataFrame(df)

    def recommend(self, X, numItem):
        """Return the `numItem` highest-scored items for each user in `X`.

        Args:
            X (Pyspark DataFrame): DataFrame with the user column
            numItem (int): number of items per user
        Returns:
            Pyspark DataFrame: columns usercol, itemcol and 'prediction'; ids
            are the original labels, items are in ascending score order.
        """
        user_idx = self.idxer.u_indxer.transform(X).toPandas()[self.usercol + '_idx'].values.astype(int)
        # The prediction matrix is laid out like the ratings matrix: items x
        # users for base='item', users x items for base='user'. Work on a
        # users x items view in both cases.
        scores = self.prediction_matrix.T if self.base == 'item' else self.prediction_matrix
        top_items = np.asarray(np.argsort(scores[user_idx, :])[:, -numItem:])
        item_labels = pd.Series(self.idxer.i_indxer.labels)
        user_labels = pd.Series(self.idxer.u_indxer.labels)

        columns = [self.usercol, self.itemcol, 'prediction']
        frames = []
        for u, items in zip(user_idx, top_items):
            # reshape(-1,) flattens the item indices (the old `-k` was an
            # undefined name and raised NameError).
            items = items.reshape(-1,)
            frames.append(pd.DataFrame({
                self.usercol: [user_labels.iloc[u]] * len(items),
                self.itemcol: item_labels.iloc[items].values,
                'prediction': np.asarray(scores[u, items]).reshape(-1),
            }, columns=columns))
        df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(columns=columns)
        return self.spark.createDataFrame(df)

    def _orient(self, user_idx, item_idx):
        """Order the index arrays as (row, col) according to `self.base`."""
        if self.base == 'user':
            return user_idx, item_idx
        if self.base == 'item':
            return item_idx, user_idx
        raise NotImplementedError

    def _preprocess(self, X, fit):
        """Index users/items and convert the frame into matrix coordinates.

        Args:
            X (Pyspark DataFrame): training/predicting set
            fit (bool): True on the training stage (also fits the indexers)
        Raises:
            NotImplementedError: if `base` is neither 'user' nor 'item'
        Returns:
            sparse.csr_matrix: ratings matrix, when `fit` is True
            tuple of numpy.array: (row, col) indices, when `fit` is False
        """        
        if fit:
            self.idxer = indexTransformer(self.usercol, self.itemcol)
            self.idxer.fit(X)
            indexed = self.idxer.transform(X)\
                            .select(F.col(self.usercol+'_idx').alias(self.usercol), 
                                    F.col(self.itemcol+'_idx').alias(self.itemcol), 
                                    F.col(self.ratingcol))
            values = indexed.toPandas().values
            row, col = self._orient(values[:, 0].astype(int), values[:, 1].astype(int))
            data = values[:, 2].astype(float)
            return sparse.csr_matrix((data, (row, col)))
        values = self.idxer.transform(X).select(self.usercol+'_idx', self.itemcol+'_idx').toPandas().values
        return self._orient(values[:, 0].astype(int), values[:, 1].astype(int))

    def _pearson_corr(self, A):
        """Row-by-row correlation matrix of the ratings matrix.

        The covariance is taken over all columns of `A` (unrated cells count
        as zeros), normalised by the row standard deviations; NaNs become 0
        and the identity is subtracted so a row is not its own neighbour.

        Args:
            A (sparse.csr_matrix): the training ratings matrix
        Returns:
            numpy.ndarray: dense (rows x rows) similarity matrix
        """        
        n = A.shape[1]
        
        rowsum = A.sum(1)
        centering = rowsum.dot(rowsum.T) / n
        C = (A.dot(A.T) - centering) / (n - 1)
        
        d = np.diag(C)
        coeffs = C / np.sqrt(np.outer(d, d))
        return np.array(np.nan_to_num(coeffs)) - np.eye(A.shape[0])

    def _get_predict(self):
        """Build the prediction matrix: row mean plus similarity-weighted deviations.

        Returns:
            A dense matrix with the same shape as `self.X` (not a sparse
            matrix); presumably a numpy.matrix given the sparse/dense
            arithmetic below -- TODO confirm.
        """        
        # Mean of the non-zero (i.e. rated) entries of every row.
        mu_iarray = np.array(np.nan_to_num(self.X.sum(1) / (self.X != 0).sum(1))).reshape(-1)
        mu_imat = np.vstack([mu_iarray for _ in range(self.X.shape[1])]).T
        x = self.X.copy()
        # Mark unrated cells as NaN so they contribute a zero deviation below.
        x[x==0] = np.nan
        diff = np.nan_to_num(x-mu_imat)
        # Sum of |similarity| over the neighbours that have a non-zero deviation.
        sim_norm_mat = abs(self.similarity_matrix).dot((diff!=0).astype(int))
        w = self.similarity_matrix.dot(diff) / sim_norm_mat
        w = np.nan_to_num(w)
        return mu_imat + w
class indexTransformer():
    """Helper for the memory based model: maps user/item ids to matrix indices.
    """    
    def __init__(self, usercol='userId', itemcol='movieId', ratingcol='rating'):
        """Create the (unfitted) string indexers for the user and item columns.

        Args:
            usercol (str, optional): user column name. Defaults to 'userId'.
            itemcol (str, optional): item column name. Defaults to 'movieId'.
            ratingcol (str, optional): rating column name; stored only.
                Defaults to 'rating'.
        """        
        self.usercol = usercol
        self.itemcol = itemcol
        self.ratingcol = ratingcol
        # Keep the estimators separately so fit() can be called repeatedly;
        # u_indxer / i_indxer are replaced by the fitted models in fit().
        self._u_estimator = M.feature.StringIndexer(inputCol=usercol, 
                                                    outputCol=usercol+'_idx', 
                                                    handleInvalid = 'skip')
        self._i_estimator = M.feature.StringIndexer(inputCol=itemcol, 
                                                    outputCol=itemcol+'_idx', 
                                                    handleInvalid = 'skip')
        self.u_indxer = self._u_estimator
        self.i_indxer = self._i_estimator
        self.X = None
    def fit(self, X):
        """Fit both indexers on `X`.

        Always fits from the stored estimators, so calling fit() again
        re-trains instead of failing on an already-fitted model.

        Args:
            X (Pyspark DataFrame): the DataFrame for training
        """        
        self.X = X
        self.u_indxer = self._u_estimator.fit(self.X)
        self.i_indxer = self._i_estimator.fit(self.X)
        return
    def transform(self, X):
        """Add the '<usercol>_idx' and '<itemcol>_idx' columns to `X`.

        Args:
            X (Pyspark DataFrame): the DataFrame to transform
        Returns:
            Pyspark DataFrame: the indexed DataFrame, ordered by user index
            and then item index
        """        
        X_ = self.u_indxer.transform(X)
        X_ = self.i_indxer.transform(X_)
        return X_.orderBy([self.usercol+'_idx', self.itemcol+'_idx'])
    
    def fit_transform(self, X):
        """Fit on `X` and return the transformed `X`.

        Args:
            X (Pyspark DataFrame): the DataFrame to fit on and transform
        Returns:
            Pyspark DataFrame: the transformed DataFrame with index columns
        """        
        self.fit(X)
        return self.transform(X)

In [10]:
class Als():
    """Thin wrapper around Pyspark ALS exposing fit / predict / recommend.
    """
    def __init__(self, userCol, itemCol, ratingCol, regParam, seed, rank):
        """Configure the ALS estimator (non-negative factors, cold-start rows dropped).

        Args:
            userCol (str): user column name
            itemCol (str): item column name
            ratingCol (str): rating column name
            regParam (float): ALS regularisation parameter
            seed (int): random seed passed to ALS
            rank (int): number of latent factors
        """
        self.userCol = userCol
        self.itemCol = itemCol
        self.ratingCol = ratingCol
        self.model =None
        self.als = ALS(userCol=userCol,
                itemCol=itemCol,
                ratingCol=ratingCol,
                coldStartStrategy="drop",
                nonnegative=True,
                regParam=regParam,
                seed=seed,
                rank=rank)
    def fit(self, _X):
        """Train the ALS model.

        Args:
            _X (Pyspark DataFrame): training set
        """
        X = self._preprocess(_X)
        self.model = self.als.fit(X)

    def predict(self, _X):
        """Predict ratings for the rows of a test set.

        Args:
            _X (Pyspark DataFrame): test set
        Returns:
            Pyspark DataFrame: the preprocessed rows with a 'prediction' column
        """        
        X = self._preprocess(_X)
        return self.model.transform(X)

    def recommend(self, X, numItems):
        """Top `numItems` items for every user in `X`, one row per (user, item).

        Args:
            X (Pyspark DataFrame): DataFrame holding the user column
            numItems (int): number of items per user
        Returns:
            Pyspark DataFrame: columns userCol, itemCol and 'prediction'
        """
        # recommendForUserSubset yields an array of (itemCol, rating) structs:
        # the score field is literally named 'rating' regardless of ratingCol,
        # so it must not be looked up through self.ratingCol.
        return self.model.recommendForUserSubset(X, numItems)\
                .select(self.userCol, F.explode('recommendations').alias('recommendations'))\
                .select(self.userCol, 'recommendations.*')\
                .select(self.userCol, self.itemCol, F.col('rating').alias('prediction'))
    def _preprocess(self, _X):
        """Keep only user, item and rating columns, cast to int / int / float.

        Args:
            _X (Pyspark DataFrame): the training or test set
        Returns:
            Pyspark DataFrame: the preprocessed DataFrame
        """
        id_columns = [F.col(c).cast('int') for c in [self.userCol, self.itemCol]]
        rating_column = F.col(self.ratingCol).cast('float')
        return _X.select(id_columns + [rating_column])

In [11]:
class code_start():
    """Popularity-based fallback recommender for users the base model cannot serve."""
    def __init__(self, movie, ratings=None):
        """Rank movies by how many ratings they received.

        Args:
            movie (Pyspark DataFrame): the movie table (kept on `self.movie`).
            ratings (Pyspark DataFrame, optional): ratings with a 'movieId'
                column. Defaults to the notebook-level `sample` DataFrame,
                which is what this class always used.
        """
        if ratings is None:
            ratings = sample

        # Count ratings per movie directly. The previous version joined the
        # ratings against a genre-exploded movie table first, which repeated
        # every rating once per genre and inflated multi-genre movies; it also
        # collected the distinct genres to the driver without using them.
        popular = ratings.select("movieId").groupby("movieId").count().sort("count",ascending=False)
        
        self.movie = movie
        self.popular = popular
        
    def recommend(self, n=50):
        """Return the `n` most-rated movies.

        Args:
            n (int, optional): number of movies to return. Defaults to 50.
        Returns:
            Pyspark DataFrame: a single 'movieId' column, most popular first.
        """
        return self.popular.select("movieId").limit(n)

In [12]:
class wide_deep():
    """Wide & Deep rating regressor built on pytorch_widedeep.

    The wide part sees a bag-of-words encoding of `wide_cols`; the deep part
    sees embeddings of `deep_cols`.
    """
    def __init__(self,wide_cols='genres',
                    deep_cols=['userId', 'movieId'],
                    target_col = 'rating',
                    deep_embs=[64, 64],
                    deep_hidden=[64,32,16],
                    deep_dropout=[0.1, 0.1, .1],
                    deep_bachnorm=True):
        """Store hyper-parameters and create the (unfitted) preprocessors.

        Args:
            wide_cols (str or list): column(s) fed to the wide component.
            deep_cols (list): columns embedded by the deep component.
            target_col (str): name of the regression target column.
            deep_embs (list): embedding size for each entry of `deep_cols`.
            deep_hidden (list): hidden layer sizes of the deep component.
            deep_dropout (list): dropout per hidden layer.
            deep_bachnorm (bool): whether the deep component uses batch norm.
        """
        self.wide = None
        self.deep = None
        self.deep_hidden = deep_hidden
        self.deep_dropout = deep_dropout
        self.deep_bachnorm = deep_bachnorm
        self.model = None
        self.wide_cols = wide_cols
        self.deep_cols = deep_cols
        self.embs = [(col, dim) for col, dim in zip(deep_cols, deep_embs)]
        self.wide_preprocessor = self._genre_preprocessor(wide_cols)
        self.deep_preprocessor = DensePreprocessor(embed_cols=self.embs)
        self.target_col = target_col

    def _build_networks(self, wide_feature):
        """Create the wide and deep components from the fitted preprocessors."""
        self.wide = Wide(wide_dim=np.unique(wide_feature).shape[0], pred_dim=1)
        self.deep = DeepDense(hidden_layers=self.deep_hidden, dropout=self.deep_dropout,
                      batchnorm=self.deep_bachnorm,
                      deep_column_idx=self.deep_preprocessor.deep_column_idx,
                      embed_input=self.deep_preprocessor.embeddings_input)

    def fit(self, train, n_epochs=10, batch_size=128, val_split=.1, verbose = True):
        """Fit the preprocessors and train the model.

        Args:
            train (pandas.DataFrame): training rows including `target_col`.
            n_epochs (int): number of training epochs.
            batch_size (int): mini-batch size.
            val_split (float): validation split passed to the model's fit.
            verbose (bool): verbosity flag passed to compile.
        """
        X, y = train.drop(self.target_col, axis = 1), train[self.target_col].values
        wide_feature = self.wide_preprocessor.fit_transform(X)
        deep_feature = self.deep_preprocessor.fit_transform(X)
        self._build_networks(wide_feature)
        self.model =  WideDeep(wide=self.wide, deepdense=self.deep)
        # Separate optimizer and step scheduler for each component.
        wide_opt = torch.optim.Adam(self.model.wide.parameters(), lr=0.01)
        deep_opt = RAdam(self.model.deepdense.parameters())
        wide_sch = torch.optim.lr_scheduler.StepLR(wide_opt, step_size=3)
        deep_sch = torch.optim.lr_scheduler.StepLR(deep_opt, step_size=5)
        callbacks = [
                        LRHistory(n_epochs=n_epochs),
                        EarlyStopping(patience=5),
                        ModelCheckpoint(filepath="model_weights/wd_out"),
                    ]
        optimizers = {"wide": wide_opt, "deepdense": deep_opt}
        schedulers = {"wide": wide_sch, "deepdense": deep_sch}
        initializers = {"wide": KaimingNormal, "deepdense": XavierNormal}
        self.model.compile(method='regression',
                            optimizers=optimizers,
                        lr_schedulers=schedulers,
                        initializers=initializers,
                        callbacks=callbacks,
                        verbose=verbose)
        self.model.fit(X_wide=wide_feature, 
                  X_deep=deep_feature, 
                  target=y, 
                  n_epochs=n_epochs, 
                  batch_size=batch_size, 
                  val_split=val_split,)

    def load_pretrained(self, train, fp, device):
        """Fit the preprocessors on `train` and load a saved model from `fp`.

        Args:
            train (pandas.DataFrame): data used to fit the preprocessors.
            fp: path (or file object) of the saved model.
            device (str): device name the model is mapped to on load.
        """
        X = train.copy()
        # A single column name is wrapped in a list so a DataFrame is selected.
        wide_input = X[[self.wide_cols]] if isinstance(self.wide_cols, str) else X[self.wide_cols]
        wide_feature = self.wide_preprocessor.fit_transform(wide_input)
        deep_feature = self.deep_preprocessor.fit_transform(X[self.deep_cols])
        self._build_networks(wide_feature)
        # SECURITY: torch.load unpickles the file, which can execute arbitrary
        # code; only load checkpoints from a trusted source.
        self.model =  torch.load(fp,  map_location=torch.device(device))
        
    def predict(self, test):
        """Predict the target for every row of `test` with the fitted preprocessors.

        Args:
            test (pandas.DataFrame): rows containing the wide and deep columns.
        Returns:
            The output of the underlying model's `predict`.
        """
        X = test.copy()
        wide_feature = self.wide_preprocessor.transform(X)
        deep_feature = self.deep_preprocessor.transform(X)
        return self.model.predict(X_wide=wide_feature, X_deep=deep_feature)

    def _genre_preprocessor(self, genre_feat):
        """Build the wide preprocessor: CountVectorizer tokens turned into a dense array."""
        dense_layer = lambda X: X.toarray()
        genre_transformer = Pipeline(steps=[
                ('tokenizer', CountVectorizer()),
                ('dense', FunctionTransformer(dense_layer, validate=False))   
        ])
        preproc = ColumnTransformer(transformers=[('genre', genre_transformer, genre_feat),])
        return preproc


    def _deep_preprocessor(self,embs):
        """Build a DensePreprocessor for the given (column, dim) embedding specs."""
        return DensePreprocessor(embed_cols=embs)

In [None]:
# Candidate-generation models: an item-based memory CF model and an ALS model.
item_based = Memory_based_CF(
    spark,
    base='item',
    usercol='userId',
    itemcol='movieId',
    ratingcol='rating',
)
als = Als(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    regParam=.15,
    seed=0,
    rank=10,
)

# Train both on the Spark training split.
item_based.fit(sample_train)
als.fit(sample_train)