# Matrix Factorisation using Similarity Matrices

In [None]:
# import modules
import numpy as np
import matplotlib.pyplot as plt
import torch
from tdc.multi_pred import DTI
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import CountVectorizer
import logging
from rdkit import Chem

In [None]:
# Load the BindingDB Kd drug-target interaction dataset through TDC and put
# the affinity labels on a log scale (the exact transform is defined by TDC's
# convert_to_log).
data_Kd = DTI(name='BindingDB_Kd')
data_Kd.convert_to_log()

# Read the precomputed similarity matrices from comma-separated text files.
# torch.from_numpy wraps the NumPy array without copying, so each tensor shares
# memory with (and keeps the dtype of) its *_np counterpart.
# NOTE(review): presumably both matrices are square and ordered to match the
# integer indices assigned in data_split - confirm against the script that
# produced these files.
print("load similarity matrix")
drug_sim_np = np.loadtxt('../sim_matrix/drug_sim.txt', delimiter=',')
drug_sim = torch.from_numpy(drug_sim_np)

target_sim_np = np.loadtxt('../sim_matrix/target_sim_Kd.txt', delimiter=',')
target_sim = torch.from_numpy(target_sim_np)
print("load done")

In [None]:
def data_split(data):
    """Split the dataset and index the drugs and targets seen in training.

    Args:
        data: object exposing ``get_split(seed=..., frac=...)`` that returns a
            mapping with ``'train'`` and ``'test'`` DataFrames.

    Returns:
        A tuple ``(train, test, Drug_to_ID, Target_to_ID)``. ``train`` keeps
        only the ``Drug_ID``, ``Drug``, ``Target`` and ``Y`` columns, with
        incomplete rows removed and the index renumbered. ``test`` is returned
        exactly as produced by the split. The two dicts map each distinct
        training ``Drug_ID`` / ``Target`` value to a consecutive integer, in
        order of first appearance.
    """
    parts = data.get_split(seed=42, frac=[0.6, 0.05, 0.35])
    test = parts['test']

    # Keep just the modelling columns; rows with any missing value are dropped.
    wanted = ['Drug_ID', 'Drug', 'Target', 'Y']
    train = parts['train'][wanted].dropna().reset_index(drop=True)

    # dict.fromkeys de-duplicates while preserving first-seen order, so the
    # enumerate position becomes a stable integer id for each entity.
    Drug_to_ID = {drug: idx for idx, drug in enumerate(dict.fromkeys(train['Drug_ID']))}
    Target_to_ID = {target: idx for idx, target in enumerate(dict.fromkeys(train['Target']))}

    return train, test, Drug_to_ID, Target_to_ID


def data_loader(data, drug_dict, target_dict):
    """Convert an interaction table into index features and labels.

    Each row's ``Drug_ID`` and ``Target`` value is looked up in the given
    dictionaries. Rows whose drug or target is unknown (or that contain any
    other missing value) are discarded.

    Args:
        data: DataFrame with at least ``Drug_ID``, ``Target`` and ``Y`` columns.
            It is not modified.
        drug_dict: mapping from ``Drug_ID`` value to integer index.
        target_dict: mapping from ``Target`` value to integer index.

    Returns:
        ``(features, label)`` where ``features`` is an int64 array of shape
        ``(n_rows, 2)`` holding ``[drug index, target index]`` per row and
        ``label`` is the matching ``Y`` column as a NumPy array.
    """
    # assign() builds a new frame, so the caller's DataFrame does not silently
    # gain helper columns (and no chained-assignment warning is triggered when
    # `data` is a slice of another frame).
    indexed = data.assign(
        Target_ID2=data["Target"].map(target_dict.get),
        Drug_ID2=data["Drug_ID"].map(drug_dict.get),
    ).dropna()

    # Failed lookups turn the index columns into floats (NaN placeholder); once
    # those rows are dropped the values are whole numbers, so cast them back to
    # integers to hand out real indices.
    drug_ID = indexed["Drug_ID2"].to_numpy(dtype=np.int64)
    target_ID = indexed["Target_ID2"].to_numpy(dtype=np.int64)
    features = np.column_stack((drug_ID, target_ID))
    label = indexed['Y'].to_numpy()
    return features, label

In [None]:
class RatingDataset(Dataset):
    """Map-style dataset pairing feature rows with scalar labels.

    In this file the feature rows are the ``[drug index, target index]`` pairs
    and the labels are the ``Y`` values produced by ``data_loader``.
    """

    def __init__(self, train, label):
        """Store the indexable feature and label containers as given."""
        self.feature_, self.label_ = train, label

    def __len__(self):
        """Return the number of samples (length of the feature container)."""
        return len(self.feature_)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a ``(long tensor, float tensor)`` pair."""
        # Tensors are created per item: indices as int64 so they can feed
        # embedding lookups, labels as float32.
        pair = torch.tensor(self.feature_[idx], dtype=torch.long)
        target = torch.tensor(self.label_[idx], dtype=torch.float)
        return pair, target



In [None]:
class MatrixFactorization(torch.nn.Module):
    """Biased matrix factorisation with similarity-matrix regularisation.

    "Users" are drugs and "items" are targets. Besides the usual prediction
    ``bias_u + bias_i + <p_u, q_i>``, the forward pass returns two penalty
    terms that measure how far the Gram matrices of the batch's latent factors
    are from the corresponding blocks of the drug / target similarity matrices.

    Args:
        n_users: number of distinct drugs (rows of the drug embedding).
        n_items: number of distinct targets (rows of the target embedding).
        n_factors: latent dimension.
        drug_sim: square drug-drug similarity matrix indexed by drug index.
        target_sim: square target-target similarity matrix indexed by target index.
    """

    def __init__(self, n_users, n_items, n_factors, drug_sim, target_sim):
        super().__init__()
        self.user_factors = torch.nn.Embedding(n_users, n_factors)
        self.item_factors = torch.nn.Embedding(n_items, n_factors)
        torch.nn.init.xavier_uniform_(self.user_factors.weight)
        torch.nn.init.xavier_uniform_(self.item_factors.weight)

        self.user_biases = torch.nn.Embedding(n_users, 1)
        self.item_biases = torch.nn.Embedding(n_items, 1)
        torch.nn.init.zeros_(self.user_biases.weight)
        torch.nn.init.zeros_(self.item_biases.weight)

        # Buffers follow the module through .to(device), so the similarity
        # lookups in forward() work when the model is moved to the GPU.
        # persistent=False keeps them out of state_dict(), leaving checkpoints
        # with the same keys as before.
        self.register_buffer('user_sim', torch.as_tensor(drug_sim), persistent=False)
        self.register_buffer('item_sim', torch.as_tensor(target_sim), persistent=False)

    def forward(self, user, item):
        """Score a batch of (drug, target) pairs.

        Args:
            user: 1-D long tensor of drug indices, length B.
            item: 1-D long tensor of target indices, length B.

        Returns:
            ``(pred, drug_norm, target_norm)``: ``pred`` has shape ``(B,)``;
            the other two are scalar tensors equal to
            ``0.25 * ||S - F F^T||_F`` for the drug and target factors of the
            batch, and are differentiable with respect to those factors.
        """
        drug_vecs = self.user_factors(user)
        target_vecs = self.item_factors(item)

        # Full B x B Gram matrices, computed with tensor ops so that autograd
        # can propagate the similarity penalties back into the embeddings.
        AAT = drug_vecs @ drug_vecs.T
        BBT = target_vecs @ target_vecs.T

        # Matching B x B blocks of the similarity matrices: drugs from the drug
        # similarity, targets from the target similarity. Cast to the factor
        # dtype so the penalties share the dtype of the prediction loss.
        Sd = self.user_sim[user][:, user].to(AAT.dtype)
        St = self.item_sim[item][:, item].to(BBT.dtype)

        pred = self.user_biases(user) + self.item_biases(item)
        pred = pred + (drug_vecs * target_vecs).sum(1, keepdim=True)

        drug_norm = 0.25 * torch.norm(Sd - AAT)
        target_norm = 0.25 * torch.norm(St - BBT)
        # squeeze(1) rather than squeeze(): a batch of one must stay shape (1,)
        # so it lines up with the label tensor.
        return pred.squeeze(1), drug_norm, target_norm

In [None]:
def train_model(train_loader, test_loader, model, num_epochs=100):
    """Train the factorisation model with alternating SGD updates.

    For every training batch the drug ("user") parameters are updated first,
    then the loss is recomputed and the target ("item") parameters are
    updated. After each epoch the model is evaluated on ``test_loader``.

    Args:
        train_loader: iterable of ``(batch, labels)`` where ``batch[:, 0]`` are
            drug indices and ``batch[:, 1]`` are target indices.
        test_loader: iterable with the same structure, used for evaluation.
        model: module exposing ``user_biases``, ``user_factors``,
            ``item_biases`` and ``item_factors`` embeddings, whose call returns
            ``(prediction, drug_norm, target_norm)``.
        num_epochs: number of passes over ``train_loader``.

    Returns:
        ``(train_losses, test_losses)``, one float per epoch. The training
        value is the mean per-batch loss (MSE plus both similarity penalties)
        measured at the target update; the test value is the mean per-batch
        MSE only. An epoch with no batches is recorded as ``nan``.
    """
    dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    loss_func = torch.nn.MSELoss()

    model.to(dev)

    # Momentum-free SGD keeps no per-parameter state, so creating the two
    # optimizers once gives the same updates as rebuilding them every batch.
    optimizers = (
        # drug ("user") factors and biases
        torch.optim.SGD([model.user_biases.weight, model.user_factors.weight], lr=0.01,
                        weight_decay=1e-5),
        # target ("item") factors and biases
        torch.optim.SGD([model.item_biases.weight, model.item_factors.weight], lr=0.01,
                        weight_decay=1e-5),
    )

    train_losses = []
    test_losses = []
    for epoch in range(num_epochs):
        count = 0
        cum_loss = 0.
        for train_batch, label_batch in train_loader:
            users = train_batch[:, 0].to(dev)
            items = train_batch[:, 1].to(dev)
            labels = label_batch.to(dev)

            for optimizer in optimizers:
                # backward() fills gradients for *all* parameters, not only the
                # ones this optimizer owns. Clear every gradient first so the
                # other group's leftovers never accumulate into a later step.
                model.zero_grad()
                prediction, drug_norm, target_norm = model(users, items)
                loss = loss_func(prediction, labels).float() + drug_norm + target_norm
                loss.backward()
                optimizer.step()

            # `loss` now holds the value from the target ("item") pass, which
            # is the one reported as the training loss.
            cum_loss += loss.item()
            count += 1
        train_loss = cum_loss / count if count else float('nan')
        train_losses.append(train_loss)

        cum_loss = 0.
        count = 0
        with torch.no_grad():
            for test_batch, label_batch in test_loader:
                prediction, _, _ = model(test_batch[:, 0].to(dev), test_batch[:, 1].to(dev))
                cum_loss += loss_func(prediction, label_batch.to(dev)).item()
                count += 1

        # A loader can be empty (e.g. every test pair was filtered out); report
        # nan instead of dividing by zero.
        test_loss = cum_loss / count if count else float('nan')
        test_losses.append(test_loss)
        print('epoch: ', epoch, ' avg training loss: ', train_loss, ' avg test loss: ', test_loss)
    return train_losses, test_losses

In [None]:
def full_model(data, drug_sim, target_sim, img_name, n_factors=100, bs=128, num_epochs=100):
    """Run the whole pipeline: split, train, and plot the loss curves.

    Args:
        data: dataset object accepted by ``data_split``.
        drug_sim: drug-drug similarity matrix handed to the model.
        target_sim: target-target similarity matrix handed to the model.
        img_name: text used as the plot title.
        n_factors: latent dimension of the factorisation.
        bs: batch size for both data loaders.
        num_epochs: number of training epochs.

    Returns:
        None. The per-epoch train and test losses are shown in a matplotlib
        figure.
    """
    train_df, test_df, drug_index, target_index = data_split(data)

    # Index features / labels for both partitions, using the training vocabulary.
    train_set = RatingDataset(*data_loader(train_df, drug_index, target_index))
    test_set = RatingDataset(*data_loader(test_df, drug_index, target_index))

    # Only the training batches are shuffled.
    train_dataloader = DataLoader(train_set, batch_size=bs, shuffle=True)
    test_dataloader = DataLoader(test_set, batch_size=bs)

    model = MatrixFactorization(len(drug_index), len(target_index), n_factors, drug_sim, target_sim)
    train_losses, test_losses = train_model(train_dataloader, test_dataloader, model, num_epochs)

    # One point per epoch, numbered from 1 on the x-axis.
    epochs = range(1, num_epochs + 1)
    for curve, name in ((train_losses, 'train'), (test_losses, 'test')):
        plt.plot(epochs, curve, label=name)
    plt.xlabel('epoch')
    plt.ylabel('mse loss')
    plt.legend()
    plt.title(img_name)
    # Saving to disk is currently disabled: plt.savefig(img_name)
    plt.show()

In [None]:
# Train on the Kd dataset loaded above with 100 latent factors, batches of 128
# and 100 epochs; 'Kd' becomes the title of the loss plot.
full_model(data_Kd, drug_sim, target_sim, 'Kd', n_factors=100, bs=128, num_epochs=100)