In [2]:
import pandas as pd
import numpy as np
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import dgl
import dgl.function as FN
#import stanfordnlp
import tqdm
import re
import string
import tqdm
import spotlight
import pickle
import time 

# Load Pytorch as backend
dgl.load_backend('pytorch')

Using backend: pytorch


In [3]:
import scipy.sparse as ss

def csv_to_sparse(df_transactions):

    df_transactions = df_transactions.drop_duplicates()

    users=list(df_transactions["customer_id"])
    
    items=list(df_transactions["item_id"])
    
    unique_users=list(set(df_transactions["customer_id"]))
    print(len(unique_users))
    
    unique_items=list(set(df_transactions["item_id"]))
    
    map_users={}
    
    map_val=0
    
    user_list=[]
    
    for user in unique_users:
        map_users[user]=map_val
        map_val+=1
    
    map_items={}
    
    item_list=[]
    
    for item in unique_items:
        map_items[item]=map_val
        map_val+=1
    
    for val in range(len(users)):
        user_list.append(map_users[users[val]])
        item_list.append(map_items[items[val]])
    

    rows=user_list

    cols=item_list
    
    ratings=list(df_transactions["price"])
    
    matrix = ss.coo_matrix((ratings, (rows, cols)))

    return matrix

In [4]:

class DataSource:
    CASSANDRA = "cassandra"
    S3 = "s3"
    JSON = "json"
    LOCAL = "local"

   
# DO NOT EDIT
# Exception Classes

class PZAIException(Exception):
    """
    Base class for other exceptions.
    """
    pass


class DataSourceNotDefinedError(PZAIException):
    """
    Raised when a particular data source is not defined for data import or export.
    """


class PortInformationNotFoundError(PZAIException):
    """
    Raised when port details are not found for the particular port.
    """
    pass


class UnknownOperationMode(PZAIException):
    """
    Raised when a method or feature is not defined.
    """
    pass


# DO NOT EDIT
# Connection Manager Class

class ConnectionManager:

    def __init__(self):
        pass

    @staticmethod
    def get_data_for_a_port(data, port_number, connection_type):
        """
        This method helps to get the details for one particular port information
        :param data:
        :param port_number:
        :param connection_type:
        :return:
        """
        for each_data in data:
            if int(each_data["port"]) == int(port_number):
                return each_data["sourceDetails"]
        raise PortInformationNotFoundError(
            "Port details not found for port_number {} in the {} connection".format(port_number, connection_type))
       
    
    
def create_cassandra_table_if_not_exists(df, keypace, table_name, pk):
    cassandra_keyspace = keypace
    cassandra_table = table_name
    schema_string = get_schema_string(df, pk)

    query_cassandra_create_table = """
    CREATE TABLE {keyspace}.{table}({schema_string});
    """.format(keyspace=cassandra_keyspace, table=cassandra_table, schema_string=schema_string)

    print(query_cassandra_create_table)
    drop_query = "DROP TABLE IF EXISTS {keyspace}.{table}".format(keyspace=cassandra_keyspace,table=cassandra_table)
    session.execute(drop_query)
    session.execute(query_cassandra_create_table)

def get_schema_string(df, primary_k):
    datatype_changer_dict = {
        "StringType": "text",
        "IntegerType": "int",
        "DateType": "int",
        "LongType": "int",
        "DoubleType": "double"
    }
    columns = df.limit(2).toPandas().columns
    schema_string = []
    for each_column in columns:
        data_type = str(list(df[[each_column]].schema)[0]).split(",")[1]
        try:
            data_type = datatype_changer_dict[data_type]
        except:
            data_type = None
        schema_string.append("{column_name} {data_type}".format(column_name=each_column, data_type=data_type))
    schema_string.append("PRIMARY KEY ({})".format(primary_k))
    schema_string = ",".join(schema_string)
    return schema_string
# DO NOT EDIT
# Dataframe Connector

class DataFrame(object):
    """
    class that gets a data-frame from the mentioned port of the input-data
    """

    def get(self, input_data, port_number):
        """
        The function that gets the entire input configuration for the data and returns the selected data-frame.
        :param input_data: list of dictionary for input configuration.
        :param port_number: the port number from where the data has to be fetched.
        :return: spark data-frame
        """
        return self._get_df(input_data=input_data, port_number=port_number)

    def _get_df(self, input_data, port_number):
        """
        The function that gets the entire input configuration for the data and returns the selected data-frame.
        :param input_data: list od dictionary for input configuration.
        :param port_number: the port number from where the data has to be fetched.
        :return: spark data-frame
        """
        port_information = ConnectionManager.get_data_for_a_port(data=input_data,
                                                                 port_number=port_number,
                                                                 connection_type="input")
        data_source = str(port_information["source"]).lower()
        if data_source == DataSource.CASSANDRA:
            df = self._get_df_from_cassandra(port_information)
        elif data_source == DataSource.S3:
            df = self._get_df_from_s3(port_information)
        elif data_source==DataSource.LOCAL:
            df=self._get_df_from_local(port_information)
        else:
            raise DataSourceNotDefinedError("Data-frame import from {} is currently not supported.".format(data_source))
        return df

    def write(self, df, output_data, port_number):
        """
        The function that gets the entire input configuration for the data and returns the selected data-frame.
        :param df: The spark data-frame to be written out.
        :param output_data: list of dictionary for output configuration.
        :param port_number: the port number to which the data has to be written.
        :return: boolean status
        """
        return self.__write(df=df, output_data=output_data, port_number=port_number)

    def __write(self, df, output_data, port_number):
        """
        The function that gets the entire input configuration for the data and returns the selected data-frame.
        :param df: The spark data-frame to be written out.
        :param output_data: list of dictionary for output configuration.
        :param port_number: the port number to which the data has to be written.
        :return: boolean status
        """
        port_information = ConnectionManager.get_data_for_a_port(data=output_data,
                                                                 port_number=port_number,
                                                                 connection_type="output")
        data_source = str(port_information["source"]).lower()

        if data_source == DataSource.CASSANDRA:
            flag = self.__write_to_cassandra(df=df, source_information=port_information)
        elif data_source == DataSource.S3:
            flag = self.__write_to_s3(df=df, source_information=port_information)
        else:
            raise DataSourceNotDefinedError("Data-frame export to {} is currently not supported.".format(data_source))
        return flag

    @staticmethod
    def __write_to_cassandra(df, source_information):
        """
        The function to write data to a cassandra table.
        :param df:
        :param source_information:
        :return:
        """
        table_name = source_information["tableName"]
        keyspace_name = source_information["keyspace"]
        write_mode = str(source_information["writeMode"]).lower()
        pk = str(source_information["primaryKeys"]).lower()

        for col in df.columns:
            df = df.withColumnRenamed(col, col.lower())

        if write_mode == "append":
            df.write.format("org.apache.spark.sql.cassandra").mode(write_mode).options(table=table_name,
                                                                                       keyspace=keyspace_name).save()
        elif write_mode == "overwrite":
            create_cassandra_table_if_not_exists(df, keyspace_name, table_name, pk)
            df.write.format("org.apache.spark.sql.cassandra").mode("overwrite").options(table=table_name,keyspace=keyspace_name).option("confirm.truncate", "true").save()

        else:
            raise UnknownOperationMode("The mentioned writing mode {} is not defined for Cassandra.".format(write_mode))
        return True

    @staticmethod
    def __write_to_s3(df, source_information):
        file_path = source_information["filePath"]
        file_format = source_information["fileFormat"]
        df.write.format(file_format).options(header='true').mode("overwrite").save(file_path)        
        return True

    @staticmethod
    def _get_df_from_cassandra(source_information):
        """
        The function to get data-frame from a cassandra table.
        :param source_information:
        :return:
        """
        table_name = source_information["tableName"]
        keyspace_name = source_information["keyspace"]
        df = spark.read.format('org.apache.spark.sql.cassandra').options(table=table_name,
                                                                         keyspace=keyspace_name).load()
        return df

    @staticmethod
    def _get_df_from_s3(source_information):
        """
        The function to get data-frame from Amazon S3.
        :param source_information:
        :return:
        """
        file_format = source_information["fileFormat"]
        file_path = source_information["filePath"]
        df = spark.read.format(file_format).options(header='true', inferSchema='true').load(file_path)
        return df
    @staticmethod
    def _get_df_from_local(source_information):

        df=pd.DataFrame()
        file_format = source_information["fileFormat"]
        file_path = source_information["filePath"]
        try:
            for filename in os.listdir(file_path):
                if filename.endswith(file_format):
                    if file_format=="csv":
                        df1=pd.read_csv(file_path+"/"+filename)
                        df=pd.concat([df,df1])
                    elif file_format=="parquet":
                        df1=pd.read_parquet(file_path+"/"+filename,engine="pyarrow")
                        df=pd.concat([df,df1])
        except:
            if file_format=="csv":
                df=pd.read_csv(file_path)

            elif file_format=="parquet":
                df=pd.read_parquet(file_path,engine="pyarrow")


        return df



request_data ={
  "input": [
    {
      "port": 1,
      "dataType": "dataframe",
      "sourceDetails": {
        "source": "local",
        "fileFormat": "csv",
        "filePath":r"C:\Users\pv23228\Documents\P.AI\Data\Movies Dataset\final_transactions_ss_trainset_v1.csv"
    }
    },
      {
      "port": 2,
      "dataType": "dataframe",
      "sourceDetails": {
        "source": "local",
        "fileFormat": "csv",
        "filePath": r"C:\Users\pv23228\Documents\P.AI\Data\Movies Dataset\final_item_data_ss_trainset_v1.csv"
      }
    }, 
      {
      "port": 3,
      "dataType": "dataframe",
      "sourceDetails": {
        "source": "local",
        "fileFormat": "csv",
        "filePath": r"C:\Users\pv23228\Documents\P.AI\Data\Movies Dataset\final_customer_demographics_ss_trainset_v1.csv"
      }
    }    
  ],
  "output": [
    {
      "port": 5,
      "dataType": "dataframe",
      "sourceDetails": {
        "source": "s3",
        "fileFormat": "csv",
        "filePath": "s3://zs-ds-pzai-general/data/customer_embeddings_imdb_tr_full_v1.csv"
      }
    },
      {
      "port": 6,
      "dataType": "dataframe",
      "sourceDetails": {
        "source": "s3",
        "fileFormat": "csv",
        "filePath": "s3://zs-ds-pzai-general/data/item_embeddings_imdb_tr_full_v1.csv"
      }
    },
      {
      "port": 7,
      "dataType": "dataframe",
      "sourceDetails": {
        "source": "s3",
        "fileFormat": "csv",
        "filePath": "s3://zs-ds-pzai-general/data/link_embeddings_imdb_tr_full_v1.csv"
      }
    }
  ],
  "function": {
    "component": "objective",
    "args": {
        "features_for_item_node":[],
        "features_for_customer_node":[],
        "batch_size": 2000,
        "epochs": 4,
        "layer_size_of_hidden_layer":[32],
        "number_of_neighbours_access":[4]
    }
  },
  "meta": {
    "triggeredBy": "Aditya Kothari",
    "triggerTime": "2020-02-06 12:55:04",
    "pipelineId": "pzai_pipeline_001"
  }
}

# DO NOT  EDIT BELOW
input_data = request_data["input"]
output_data = request_data["output"]
arguments = request_data["function"]["args"]
meta_data = request_data["meta"]


transactions = DataFrame().get(input_data,1)
items = DataFrame().get(input_data,2)
customers = DataFrame().get(input_data,3)

In [5]:
matrix=csv_to_sparse(transactions)
display(matrix)
print(matrix.todense())
g = dgl.bipartite_from_scipy(matrix, utype='users', etype='ratings', vtype='items', eweight_name='rating')
print(g)
g.edata

10363


<10363x15162 sparse matrix of type '<class 'numpy.float64'>'
	with 365170 stored elements in COOrdinate format>

[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
Graph(num_nodes={'users': 10363, 'items': 15162},
      num_edges={('users', 'ratings', 'items'): 365170},
      metagraph=[('users', 'items', 'ratings')])


{'rating': tensor([3., 5., 3.,  ..., 2., 4., 2.], dtype=torch.float64)}

In [6]:
#Adding a column to Customer Df as there are no features for users in IMDB
customers["alive"] = 1
#coverting dtype for live column to category to experiment on building tensor for category type feature
customers["alive"] = customers["alive"].astype('category')
customers = customers.drop_duplicates()
customers.shape

(10363, 2)

In [7]:
items = items.drop_duplicates()
items.shape

(4799, 9)

In [8]:
#Storing user and product ids as a list, creating dict
user_ids = list(customers.customer_id)
product_ids = list(items.item_id)
user_ids_invmap = {id_: i for i, id_ in enumerate(user_ids)}
product_ids_invmap = {id_: i for i, id_ in enumerate(product_ids)}

In [221]:
#Creating Graph
g= dgl.DGLGraph(multigraph=True)
g.add_nodes(len(user_ids) + len(product_ids))

In [9]:
# user features
user_column =customers.columns[1]
udata = torch.zeros(g.number_of_nodes(), dtype=torch.int64)
# 0 for padding
udata[:len(user_ids)] = torch.LongTensor(customers[user_column].cat.codes.values.astype('int64') + 1)
g.ndata[user_column] = udata

AssertionError: Current HeteroNodeDataView has multiple node types, please passing the node type and the corresponding data through a dict.

In [13]:
g.ndata['users']['alive'] = udata


In [14]:
# ITEM features
for item_column in ['budget','product_category', 'popularity','runtime', 'vote_count']:
    pdata = torch.from_numpy(items[item_column].values.astype('int64'))
    g.ndata['items'][item_column] = pdata
#     g.ndata[item_column] = torch.zeros(g.number_of_nodes(), dtype=torch.int64)
#     g.ndata[item_column][len(user_ids):len(user_ids) + len(product_ids)] = pdata

In [15]:
#Item categorical features
for col in ['status', 'adult']:
    items[col] = items[col].astype('category')
#     pdata = torch.zeros(g.number_of_nodes(), dtype=torch.int64)
    pdata = torch.LongTensor(items[col].cat.codes.values.astype('int64') + 1)
    g.ndata['items'][col] = pdata


In [225]:
# Movie title
nlp = stanfordnlp.Pipeline(use_gpu=False, processors='tokenize,lemma')
vocab = set()
title_words = []
for t in tqdm.tqdm(items['title'].values):
    doc = nlp(t)
    words = set()
    for s in doc.sentences:
        words.update(w.lemma.lower() for w in s.words
                     if not re.fullmatch(r'['+string.punctuation+']+', w.lemma))
    vocab.update(words)
    title_words.append(words)
vocab = list(vocab)

vocab_invmap = {w: i for i, w in enumerate(vocab)}
# bag-of-words
g.ndata['title'] = torch.zeros(g.number_of_nodes(), len(vocab))


for i, tw in enumerate(tqdm.tqdm(title_words)):
    g.ndata['title'][i, [vocab_invmap[w] for w in tw]] = 1

  0%|                                                                                         | 0/4799 [00:00<?, ?it/s]

Use device: cpu
---
Loading: tokenize
With settings: 
{'model_path': 'C:\\Users\\pv23228\\stanfordnlp_resources\\en_ewt_models\\en_ewt_tokenizer.pt', 'lang': 'en', 'shorthand': 'en_ewt', 'mode': 'predict'}
---
Loading: lemma
With settings: 
{'model_path': 'C:\\Users\\pv23228\\stanfordnlp_resources\\en_ewt_models\\en_ewt_lemmatizer.pt', 'lang': 'en', 'shorthand': 'en_ewt', 'mode': 'predict'}
Building an attentional Seq2Seq model...
Using a Bi-LSTM encoder
Using soft attention for LSTM.
Finetune all embeddings.
[Running seq2seq lemmatizer with edit classifier]
Done loading processors!
---


100%|██████████████████████████████████████████████████████████████████████████████| 4799/4799 [02:03<00:00, 38.82it/s]
100%|███████████████████████████████████████████████████████████████████████████| 4799/4799 [00:00<00:00, 11152.47it/s]


In [16]:
ratings= transactions.drop(columns = ["product_category", "date"]).drop_duplicates()
product_count = ratings['item_id'].value_counts()
product_count.name = 'product_count'
ratings = ratings.join(product_count, on='item_id')
ratings.head()

Unnamed: 0,customer_id,item_id,price,product_count
0,c6216.0,i1487,3.0,63
1,c9733.0,i2132,5.0,103
2,c9488.0,i26391,3.0,2
3,c4614.0,i39183,4.0,263
4,c5980.0,i788,4.0,710


In [17]:
split_by_time = None
from functools import partial

def split_user(df, filter_counts=0, timestamp=None):
    df_new = df.copy()
    df_new['prob'] = -1
    df_new_sub = (df_new['product_count'] >= filter_counts).to_numpy().nonzero()[0]
    prob = np.linspace(0, 1, df_new_sub.shape[0], endpoint=False)
    np.random.shuffle(prob)
    df_new['prob'].iloc[df_new_sub] = prob
    
    return df_new

def data_split(ratings):
    ratings = ratings.groupby('customer_id', group_keys=False).apply(
            partial(split_user, filter_counts=5, timestamp=split_by_time))
    ratings['train'] = ratings['prob'] <= 0.8
    ratings['valid'] = (ratings['prob'] > 0.8) & (ratings['prob'] <= 0.9)
    ratings['test'] = ratings['prob'] > 0.9
    ratings.drop(['prob'], axis=1, inplace=True)
    return ratings

In [18]:
t1 = time.time()
ratings_ = data_split(ratings)
ratings_.head()
t2 = time.time()
print(t2-t1)

Unnamed: 0,customer_id,item_id,price,product_count,train,valid,test
38553,c1.0,i4226,4.0,1303,True,False,False
18957,c1.0,i54503,3.5,352,True,False,False
32030,c1.0,i110,1.0,2104,True,False,False
7086,c1.0,i858,5.0,1853,True,False,False
4817,c1.0,i2959,4.0,1942,True,False,False


In [19]:
ratings_.reset_index(drop = True, inplace = True)
ratings_.head()

Unnamed: 0,customer_id,item_id,price,product_count,train,valid,test
0,c1.0,i4226,4.0,1303,True,False,False
1,c1.0,i54503,3.5,352,True,False,False
2,c1.0,i110,1.0,2104,True,False,False
3,c1.0,i858,5.0,1853,True,False,False
4,c1.0,i2959,4.0,1942,True,False,False


In [20]:
#Use ratings df to get lists of source-dest nodes 
rating_user_vertices = [user_ids_invmap[id_] for id_ in ratings_['customer_id'].values]
rating_product_vertices = [product_ids_invmap[id_] + len(user_ids)
                         for id_ in ratings_['item_id'].values]

In [231]:
#Partial, edges added 
# g.add_edges(
#         rating_user_vertices,
#         rating_product_vertices,
#         data={'inv': torch.ones(ratings_.shape[0], dtype=torch.uint8),
#             'rating': torch.FloatTensor(ratings_['price'])})

In [37]:
def generate_mask():
        valid_tensor = torch.from_numpy(ratings_['valid'].values.astype('uint8'))
        test_tensor = torch.from_numpy(ratings_['test'].values.astype('uint8'))
        train_tensor = torch.from_numpy(ratings_['train'].values.astype('uint8'))
        g.edata['valid'] = valid_tensor
        g.edata['test'] = test_tensor
        g.edata['true'] = train_tensor
        
# Generate the list of products for each user in training/validation/test set.
def generate_candidates():
    p_train = []
    p_valid = []
    p_test = []
    for uid in tqdm.tqdm(user_ids):
        user_ratings = ratings_[ratings_['customer_id'] == uid]
        p_train.append(np.array(
            [product_ids_invmap[i] for i in user_ratings[user_ratings['train']]['item_id'].values]))
        p_valid.append(np.array(
            [product_ids_invmap[i] for i in user_ratings[user_ratings['valid']]['item_id'].values]))
        p_test.append(np.array(
            [product_ids_invmap[i] for i in user_ratings[user_ratings['test']]['item_id'].values]))
        
    return p_train, p_valid, p_test

In [38]:
generate_mask()

p_train, p_valid, p_test = generate_candidates()

100%|████████████████████████████████████████████████████████████████████████████| 10363/10363 [10:02<00:00, 17.20it/s]


In [235]:
# Find the subgraph of all "training" edges
g_train = g.edge_subgraph(g.filter_edges(lambda edges: edges.data['train']), True)
# g_train.copy_from_parent()
# g_train.readonly()

#Obtain edge id's for valid and test data
eid_valid = g.filter_edges(lambda edges: edges.data['valid'])
eid_test = g.filter_edges(lambda edges: edges.data['test'])

#Storing source and dest node ids for train, valid, test edges
src_valid, dst_valid = g.find_edges(eid_valid)
src_test, dst_test = g.find_edges(eid_test)
src, dst = g_train.all_edges()

#storing ratings for train, valid, test edges
rating = g_train.edata['rating']
rating_valid = g.edges[eid_valid].data['rating']
rating_test = g.edges[eid_test].data['rating']

In [236]:
#Define feature size and layers in gNN
feature_size = 10
n_layers = 1

In [237]:
#Build model 
model = GraphSAGERecommender(GraphSageWithSampling(100, 1, g_train))
opt = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-9)

batch_size = 1024
n_users = len(customers['customer_id'].to_list())
n_products = len(items['item_id'].to_list())

#Running model training for 50 epochs
for epoch in range(50):
    model.eval()
    
    # Validation & Test, we precompute GraphSage output for all nodes first.
    sampler = dgl.contrib.sampling.NeighborSampler(
        g_train,
        batch_size,
        5,
        1,
        seed_nodes=torch.arange(g.number_of_nodes()),
        prefetch=True,
        add_self_loop=True,
        shuffle=False,
        num_workers=4
    )

    with torch.no_grad():
        h = []
        for nf in sampler:
            #import pdb
            #pdb.set_trace()
            h.append(model.gcn.forward(nf))
        h = torch.cat(h)

        # Compute validation RMSE
        score = torch.zeros(len(src_valid))
        for i in range(0, len(src_valid), batch_size):
            s = src_valid[i:i+batch_size]
            d = dst_valid[i:i+batch_size]
            score[i:i+batch_size] = (h[s] * h[d]).sum(1) + model.node_biases[s + 1] + model.node_biases[d + 1]
        valid_rmse = ((score - rating_valid) ** 2).mean().sqrt()

        # Compute test RMSE
        score = torch.zeros(len(src_test))
        for i in range(0, len(src_test), batch_size):
            s = src_test[i:i+batch_size]
            d = dst_test[i:i+batch_size]
            score[i:i+batch_size] = (h[s] * h[d]).sum(1) + model.node_biases[s + 1] + model.node_biases[d + 1]
        test_rmse = ((score - rating_test) ** 2).mean().sqrt()
        
    model.train()
    
    shuffle_idx = torch.randperm(g_train.number_of_edges())
    src_shuffled = src[shuffle_idx]
    dst_shuffled = dst[shuffle_idx]
    rating_shuffled = rating[shuffle_idx]
    src_batches = src_shuffled.split(batch_size)
    dst_batches = dst_shuffled.split(batch_size)
    rating_batches = rating_shuffled.split(batch_size)

    seed_nodes = torch.cat(sum([[s, d] for s, d in zip(src_batches, dst_batches)], []))
    
    sampler = dgl.contrib.sampling.NeighborSampler(
        g_train,               # the graph
        batch_size * 2,        # number of nodes to compute at a time, HACK 2
        5,                     # number of neighbors for each node
        1,                     # number of layers in GCN
        seed_nodes=seed_nodes, # list of seed nodes, HACK 2
        prefetch=True,         # whether to prefetch the NodeFlows
        add_self_loop=True,    # whether to add a self-loop in the NodeFlows, HACK 1
        shuffle=False,         # whether to shuffle the seed nodes.  Should be False here.
        num_workers=4,
    )

    # Training
    for s, d, r, nodeflow in zip(src_batches, dst_batches, rating_batches, sampler):
        score = model.forward(nodeflow, s, d)
        loss = ((score - r) ** 2).mean()
        
        opt.zero_grad()
        loss.backward()
        opt.step()

    print('Training loss:', loss.item(), 'Validation RMSE:', valid_rmse.item(), 'Test RMSE:', test_rmse.item())
    

Training loss: 7.031308650970459 Validation RMSE: 3.42280650138855 Test RMSE: 3.4068078994750977
Training loss: 6.358283519744873 Validation RMSE: 2.653252124786377 Test RMSE: 2.635495662689209
Training loss: 5.915226459503174 Validation RMSE: 2.545581102371216 Test RMSE: 2.525508403778076
Training loss: 5.876169681549072 Validation RMSE: 2.4451513290405273 Test RMSE: 2.4228813648223877
Training loss: 5.133886814117432 Validation RMSE: 2.3491814136505127 Test RMSE: 2.3247969150543213
Training loss: 4.962716579437256 Validation RMSE: 2.25700306892395 Test RMSE: 2.2305908203125
Training loss: 4.498600959777832 Validation RMSE: 2.168396472930908 Test RMSE: 2.1400582790374756
Training loss: 4.082102298736572 Validation RMSE: 2.0832841396331787 Test RMSE: 2.0531299114227295
Training loss: 3.7747926712036133 Validation RMSE: 2.0018880367279053 Test RMSE: 1.970004677772522
Training loss: 3.2147936820983887 Validation RMSE: 1.923883080482483 Test RMSE: 1.8903712034225464
Training loss: 3.11359

In [None]:
 import dgl
>>> import scipy as sp
>>> m = sp.sparse.random(100, 100, density=0.1, format='csr')
>>> g= dgl.DGLGraph(m, readonly=True)
>>> g.number_of_nodes()
100
>>> g.number_of_edges()
1000


In [101]:
model.train()

GraphSAGERecommender(
  (gcn): GraphSageWithSampling(
    (convs): ModuleList(
      (0): GraphSageConvWithSampling(
        (W): Linear(in_features=20, out_features=10, bias=True)
      )
    )
    (emb): ModuleDict(
      (alive): Embedding(2, 10, padding_idx=0)
      (budget): Embedding(380000001, 10, padding_idx=0)
      (product_category): Embedding(11, 10, padding_idx=0)
      (popularity): Embedding(141, 10, padding_idx=0)
      (runtime): Embedding(481, 10, padding_idx=0)
      (vote_count): Embedding(12270, 10, padding_idx=0)
      (status): Embedding(5, 10, padding_idx=0)
      (adult): Embedding(3, 10, padding_idx=0)
      (_ID): Embedding(15162, 10, padding_idx=0)
    )
    (proj): ModuleDict(
      (title): Sequential(
        (0): Linear(in_features=4854, out_features=10, bias=True)
        (1): LeakyReLU(negative_slope=0.01)
      )
    )
    (node_emb): Embedding(15163, 10)
  )
)

In [234]:
def mix_embeddings(ndata, emb, proj):
    """Adds external (categorical and numeric) features into node representation G.ndata['h']"""
    extra_repr = []
    for key, value in ndata.items():
        if (value.dtype == torch.int64) and key in emb:
            result = emb[key](value)
            if result.dim() == 3:    # bag of words: the result would be a (n_nodes x seq_len x feature_size) tensor
                result = result.mean(1)
            extra_repr.append(result)
        elif (value.dtype == torch.float32) and key in proj:
            result = proj[key](value)
            extra_repr.append(result)
    ndata['h'] = ndata['h'] + torch.stack(extra_repr, 0).sum(0)
    
def init_weight(param, initializer, nonlinearity):
    initializer = getattr(nn.init, initializer)
    if nonlinearity is not None:
        initializer(param)
    else:
        initializer(param, nn.init.calculate_gain(nonlinearity))
        
def init_bias(param):
    nn.init.constant_(param, 0)

class GraphSageConvWithSampling(nn.Module):
    def __init__(self, feature_size):
        super(GraphSageConvWithSampling, self).__init__()

        self.feature_size = feature_size
        self.W = nn.Linear(feature_size * 2, feature_size)
        init_weight(self.W.weight, 'xavier_uniform_', 'leaky_relu')
        init_bias(self.W.bias)

    def forward(self, nodes):
        h_agg = nodes.data['h_agg']
        h = nodes.data['h']
        w = nodes.data['w'][:, None]
        h_agg = (h_agg-h)/(w-1).clamp(min=1)    # HACK 1
        h_concat = torch.cat([h, h_agg], 1)
        h_new = F.leaky_relu(self.W(h_concat))
        return {'h': h_new / h_new.norm(dim=1, keepdim=True).clamp(min=1e-6)}
    
class GraphSageWithSampling(nn.Module):
    def __init__(self, feature_size, n_layers, G):
        super(GraphSageWithSampling, self).__init__()
        
        self.feature_size = feature_size
        self.n_layers = n_layers

        self.convs = nn.ModuleList([GraphSageConvWithSampling(feature_size) for _ in range(n_layers)])
        
        self.emb = nn.ModuleDict()
        self.proj = nn.ModuleDict()

        for key, scheme in G.node_attr_schemes().items():
            if scheme.dtype == torch.int64:
                n_items = G.ndata[key].max().item()
                self.emb[key] = nn.Embedding(
                        n_items + 1,
                        self.feature_size,
                        padding_idx=0)
                nn.init.normal_(self.emb[key].weight, 1 / self.feature_size)
            elif scheme.dtype == torch.float32:
                w = nn.Linear(scheme.shape[0], self.feature_size)
                init_weight(w.weight, 'xavier_uniform_', 'leaky_relu')
                init_bias(w.bias)
                self.proj[key] = nn.Sequential(w, nn.LeakyReLU())
                
        self.G = G
        
        self.node_emb = nn.Embedding(G.number_of_nodes() + 1, feature_size)
        nn.init.normal_(self.node_emb.weight, std=1 / self.feature_size)

    msg = [FN.copy_src('h', 'h'),
           FN.copy_src('one', 'one')]
    red = [FN.sum('h', 'h_agg'), FN.sum('one', 'w')]

    def forward(self, nf):
        '''
        nf: NodeFlow.
        '''
        nf.copy_from_parent(edge_embed_names=None)
        for i in range(nf.num_layers):
            nf.layers[i].data['h'] = self.node_emb(nf.layer_parent_nid(i) + 1)
            nf.layers[i].data['one'] = torch.ones(nf.layer_size(i))
            mix_embeddings(nf.layers[i].data, model.gcn.emb, model.gcn.proj)
        if self.n_layers == 0:
            return nf.layers[i].data['h']
        for i in range(self.n_layers):
            nf.block_compute(i, self.msg, self.red, self.convs[i])

        result = nf.layers[self.n_layers].data['h']
        assert (result != result).sum() == 0
        return result
    
class GraphSAGERecommender(nn.Module):
    def __init__(self, gcn):
        super(GraphSAGERecommender, self).__init__()
        
        self.gcn = gcn
        self.node_biases = nn.Parameter(torch.zeros(gcn.G.number_of_nodes()+1))
        
    def forward(self, nf, src, dst):
        h_output = self.gcn(nf)
        h_src = h_output[nodeflow.map_from_parent_nid(-1, src, True)]
        h_dst = h_output[nodeflow.map_from_parent_nid(-1, dst, True)]
        score = (h_src * h_dst).sum(1) + self.node_biases[src+1] + self.node_biases[dst+1]
        return score