In [120]:
import os 
import pandas as pd 
import numpy as np 
import networkx as nx 
from itertools import permutations
from math import factorial
from sklearn.model_selection import train_test_split
from functools import reduce



ROOT = os.getenv('ROOT_FOLDER')
df = pd.read_csv(os.path.join(ROOT, "app/resources/rating.csv"), usecols=['userId', 'movieId', 'timestamp',
                                                                          'rating'])
df = df.assign(timestamp=pd.to_datetime(df.timestamp),
              movieId=df.movieId.astype(str))

In [2]:
df.userId.unique().shape, df.movieId.unique().shape

((138493,), (26744,))

###  Train test 

Steps 

1. Randomly sample 10% of users and movies
2. Keep movies and users that have atleast 10 number of edges in the dataset (this helps to stratify between train and test set)
3. Train, test splits with stratification on movieIds (to ensure that there are no new movies in the test set)



In [3]:
def movie_filter(df, col, thresh):
    m = df[col].value_counts()>thresh
    return df[df[col].isin(m[m].index)]


def get_subsample(df, col, size=0.10):
    uniques = np.unique(df[col]) if col else np.unique(df.index)
    subsample = np.random.choice(uniques, size=int(len(uniques)*size), replace=False)
    return df[df[col].isin(subsample)] if col else df[df.index.isin(subsample)]
        

In [4]:
 


df_sample = get_subsample(df, 'userId', 0.10)
df_sample = get_subsample(df_sample, 'movieId', 0.10)
df_sample = movie_filter(df_sample, 'movieId', 10)
df_sample = movie_filter(df_sample, 'userId', 10)



In [209]:
df_sample.to_pickle(os.path.join(ROOT, "app/resources/subsample.p"))

In [5]:
df_sample.shape,df_sample.userId.unique().shape,df_sample.movieId.unique().shape

((144207, 3), (4551,), (840,))

In [6]:
train, test = train_test_split(df_sample, 
                                 test_size=0.2, 
                                 stratify=df_sample[['movieId']], 
                              random_state=42)

In [7]:
train.shape, test.shape

((115365, 3), (28842, 3))

In [8]:
train.userId.unique().shape,train.movieId.unique().shape

((4551,), (840,))

In [9]:
test.userId.unique().shape,test.movieId.unique().shape

((4442,), (840,))

In [10]:
set(test.userId).difference(set(train.userId))

set()

In [11]:
set(test.movieId).difference(set(train.movieId))

set()

In [None]:
len(set(zip(test.userId, test.movieId)).difference(set(zip(train.userId, train.movieId))))

###  User item Bipartite graph

In [12]:
def user_item_bipartite(df):
    
    B = nx.Graph()
    B.add_nodes_from(df.userId.unique(), bipartite=0)
    B.add_nodes_from(df.movieId.unique(), bipartite=1)
    # add edges only between nodes of opposite sets
    B.add_edges_from(list(zip(df.userId, df.movieId)))
    return B
    

In [13]:
B = user_item_bipartite(train)

In [212]:

nx.write_gpickle(user_item_bipartite(test), os.path.join(ROOT, "app/resources/user_item_graph_test.p"))

In [210]:
nx.write_gpickle(B, os.path.join(ROOT, "app/resources/user_item_graph.p"))

### Item Item graph

In [14]:

def nPr(n, r):
    return int(factorial(n)/factorial(n-r))


def get_edge_weight_bool(graph, item1, item2):
    
    g = nx.common_neighbors(graph, item1, item2)
    
    if_edge = False
    try:
        if_edge = True if next(g) else if_edge
    except Exception as e:
        print(f"No edge found between the items {item1} & {item2}")

    return if_edge

def get_edge_weight_float(graph, item1, item2):
    
    """
    item1 -> item 2 : P(item2|item1) = P(item2 and item1)/P(item1)
    """
    probab_items = len(list(nx.common_neighbors(graph, item1, item2)))
    return 0 if probab_items==0 else len(list(graph.neighbors(item1)))/probab_items
    


In [15]:
def build_item_item_graph(permutations_generator, user_item_graph, edge_weight_fn):
    
    out_graph = nx.DiGraph()
    for item1, item2 in permutations_generator:
        wt = edge_weight_fn(user_item_graph, item1, item2)
        
        if wt:
            out_graph.add_edge(item1, item2) if wt is True else out_graph.add_edge(item1, item2, weight=wt)
            
    return out_graph



def preference_vector(user, user_item_graph):
    "we have no weights in user_item graphs because we assume model is ratings agnostic"
    
    nitems = user_item_graph.degree(user)
    
    return {item:1/nitems for item in user_item_graph.neighbors(user)}

In [16]:
nPr(845, 2)

713180

In [17]:
%%time 

ss = train.movieId.unique()
pairs_generator = permutations(ss, r=2)
I = build_item_item_graph(pairs_generator, B, get_edge_weight_float)

CPU times: user 2min 13s, sys: 618 ms, total: 2min 14s
Wall time: 2min 16s


In [204]:
nx.write_gpickle(I, os.path.join(ROOT, "app/resources/item_item_graph.p"))

In [18]:
nx.info(I)


  nx.info(I)


'DiGraph with 840 nodes and 569114 edges'

In [70]:
%%time 

scores_100 = get_page_rank_scores(test.userId.unique()[:100], I, B)

CPU times: user 2min 14s, sys: 3.87 s, total: 2min 18s
Wall time: 2min 23s


In [71]:
def get_page_rank_scores(users, item_item_graph, user_item_graph):
    
    scores = {user:
              {movie:score for movie,score in nx.pagerank(item_item_graph, 
                                      personalization=preference_vector(user, user_item_graph)).items()
              if movie not in list(user_item_graph.neighbors(user))}
              
          for user in users}
    return scores
    
    
def helper(args):
    return get_page_rank_scores(*args)


def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
        

def merge_dicts(dicts):
    return reduce(lambda x,y: {**x, **y}, dicts)

In [None]:
import multiprocessing as mp 

mp.set_start_method("fork")

In [105]:

from concurrent.futures import ProcessPoolExecutor

executor=ProcessPoolExecutor(max_workers=4)



In [106]:
%%time 

args = [(batch, I, B) for batch in chunks(test.userId.unique(), 100)]
out = [result for result in executor.map(helper, args)]

out_merged = merge_dicts(out)

CPU times: user 40.6 s, sys: 10.2 s, total: 50.8 s
Wall time: 1h 1min 48s


Process ForkProcess-8:
Process ForkProcess-9:
Process ForkProcess-7:
Process ForkProcess-10:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/siddhanttandon/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
    self.run()


In [109]:
pd.to_pickle(out_merged, os.path.join(ROOT, "app/resources/test_results.p"))

### Evaluation

In [200]:
class QueryMetrics:
    """
    Base class that calculates the given metric for all the queries present in the dataframe. The class takes a \
    dataframe. Every row of the dataframe is a document which must have a query id, relevance score and a predicted \
    score. These three column names can be set in the constructor of the derived class. The output of the class is a \
    dictionary with keys as query ids and values as metric score.
    
    :param query_col: name of the column of the dataframe having query ids
    :param preds_col: name of the column of the dataframe having predicted scores
    :param relevance_col: name of the column of the dataframe having relevance scores

    """
    
    def __init__(self, query_col, preds_col, relevance_col):
        self.query_col = query_col
        self.preds_col = preds_col
        self.relevance_col = relevance_col

    def __totuple__(self, x):
        return list(zip(x[self.relevance_col], x[self.preds_col]))

    @staticmethod
    def _sortByscore(x):
        return sorted(x, key=lambda k: k[1], reverse=True)

    @staticmethod
    def _sortByrelevance(x):
        return sorted(x, key=lambda k: k[0], reverse=True)
        
    def calculate_metrics(self, df):
        query_dict = {query_id: self.__totuple__(df[df.index.isin(q_index)])
                      for query_id, q_index in df.groupby([self.query_col]).groups.items()}
        
        metric_dict = {query_id: self.get_metric(xx) for query_id, xx in query_dict.items()}
        return metric_dict
    
    
class Ndcg(QueryMetrics):
    """
    This class implements the Normalized Discounted Cumulative Gain metric. Ndcg is a metric widely used in information
    retrieval domain to evaluate quality of search results of a search engine. Since we care more about top ranking \
    documents rather than the bottom ranking documents, this metric penalizes more if ranking results have mistakes \
    in the top order compared to mistakes in the bottom order. The range of this metric is between 0-1 and a higher \
    score is better.
    
    This class assumes top ranking documents should have a higher relevance score. If there are 5 documents in the best
    ranking order 1,2,3,4,5 then the relevance scores should be 5,4,3,2,1.
    
    The relevance scores can be scaled using the label_gain parameter which is a dictionary. For example to scale the \
    relevances 5,4,3,2,1 by a factor of 10 we can set label_gain={1:10,2:20,3:30,4:40,5:50} meaning the 1st ranking \
    document which has a relevance of 5 is now weighted as 50 and the least ranking document is weighted as 10.
    
    For a single query_id the metric can be calculated by calling Ndcg.get_metric method. The input should be a list \
    of tuples where every tuple is made of (relevance, score) where relevance is the relevance of the document and \
    score is the score of the document assigned by the estimator. The output is a score in float of the given query id.
    
    For multiple query ids the metric can be calculated by calling Ndcg.calculate_metrics which is a method of it's \
    baseclass. The method takes a dataframe having query id, relevance and predicted score columns. These columns can \
    be set in the constructor of the class. The output is a dictionary with keys as query ids and values as metric \
    score.
    
    :param k: integer specifying topk documents to consider when calculating ndcg score. 
    :param label_gain: dictionary with keys as relevance and values as corresponding weight for relevance.
    :param _log: function to calculate logarithm. Default is np.log2 which is log base 2.
    :param query_col: name of the column of the dataframe having query ids
    :param preds_col: name of the column of the dataframe having predicted scores
    :param relevance_col: name of the column of the dataframe having relevance scores
    
    Usage:
    
    df = pd.DataFrame({'query_id':[ii for i in range(5) for ii in [i]*5],
                   'preds':[ii for i in range(5) for ii in np.random.random_sample((5,))],
                   'relevance':[ii for i in range(5) for ii in np.random.choice(range(1,6),size=5, replace=False)]})
                   
    out = Ndcg(k=5,  query_col='query_id', preds_col='preds', relevance_col='relevance').calculate_metrics(df)
    
    #out
    {0: 0.9542080637541479,
     1: 0.7392561108165644,
     2: 0.7875305117652345,
     3: 0.9073155929493236,
     4: 0.8386893446302166}
     
    """

    def __init__(self, k, label_gain=None, _log=np.log2, 
                 query_col='userId', preds_col='score', relevance_col='rating'):
        
        super(Ndcg, self).__init__(query_col=query_col, preds_col=preds_col, relevance_col=relevance_col)
        self.k = k
        self.label_gain = label_gain if label_gain is not None else {i: i for i in range(32)}
        self._log = _log
    
    def __getWeight__(self, rel):
        return self.label_gain.get(rel, rel)
        
    def dcg(self, relevance_with_scores):
        
        relevances = [self.__getWeight__(rel) for rel, score in self._sortByscore(relevance_with_scores)[:self.k]]
        indexes = np.arange(1, len(relevances)+1)
        return np.sum(relevances/self._log(indexes+1))
    
    def idcg(self, relevance_with_scores):
        
        relevances = [self.__getWeight__(rel) for rel, score in self._sortByrelevance(relevance_with_scores)[:self.k]]
        indexes = np.arange(1, len(relevances)+1)
        return np.sum(relevances/self._log(indexes+1))

    def get_metric(self, relevance_with_scores):
        return self.dcg(relevance_with_scores)/self.idcg(relevance_with_scores)
    
    

In [150]:
ndcg = Ndcg(k=10, query_col='userId', preds_col='score', relevance_col='rating')

In [181]:
def to_dataframe(test_results):

    xx = {k:pd.DataFrame.from_dict(v, orient='index', 
                              columns=['score'],).reset_index().rename(columns={'index':'movieId'})
            for k,v in test_results.items()
        }
    return pd.concat(xx).rename_axis(['userId', None]).reset_index(level=0)

In [183]:
user_scores = to_dataframe(out_merged)

In [190]:
x = test2.merge(user_scores, on=['movieId', 'userId'])

In [203]:
pd.DataFrame.from_dict(ndcg.calculate_metrics(x), orient='index').mean()

0    0.91615
dtype: float64

### Todo: 

1. Make function for personalization_vector for a given user 
2. Train and test splits and restrict the sample space 
3. Check how can we scale the solution
4. run the algorithm 
5. evaluate using ndcg