<a href="https://colab.research.google.com/github/swapnilcse16/TwinNet-DR/blob/main/TwinNet_DR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install torch
!pip install torch-geometric

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [2]:
threshold = 0.6
learning_rate = 0.01
hidden_channels = 128
output_channels = 32
train_ratio = 0.9
negative_edge_ratio = 1
epochs = 200

In [3]:
import pandas as pd
import torch
from torch_geometric.nn import SAGEConv
from torch_geometric.nn import GCNConv
from torch_geometric.nn import GATConv
from torch_geometric.utils import negative_sampling
from sklearn.metrics import roc_auc_score, accuracy_score
import torch.nn.functional as F
import torch.nn as nn
from google.colab import files
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, roc_curve, auc, roc_auc_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import confusion_matrix

In [4]:
columns = ['Algorithm', 'AUROC', 'Accuracy', 'Precision', 'Recall', 'F1']
results = pd.DataFrame(columns=columns)

def sampling(arr, size):
  perm = torch.randperm(arr.size(1))
  arr = arr[:, perm]
  perm = torch.randperm(arr.size(1))
  selected_indices = perm[:size]
  not_selected_indices = perm[size:]
  return arr[:, selected_indices], arr[:, not_selected_indices]


def compute_accuracy(y_true, y_pred):
    return accuracy_score(y_true, y_pred)

def compute_precision(y_true, y_pred):
    return precision_score(y_true, y_pred)

def compute_f1(y_true, y_pred):
    return f1_score(y_true, y_pred)

def compute_auc_roc(y_true, y_score):
    return roc_auc_score(y_true, y_score)


def performace(df, algorithm):
  accuracy = accuracy_score(df.true, df.predicted)
  print(f"Accuracy: {accuracy:.4f}")

  # Precision
  precision = precision_score(df.true, df.predicted)
  print(f"Precision: {precision:.4f}")

  # Recall
  recall = recall_score(df.true, df.predicted)
  print(f"Recall: {recall:.4f}")

  # F1 Score
  f1 = f1_score(df.true, df.predicted)
  print(f"F1 Score: {f1:.4f}")

  # AUROC (Area Under the ROC Curve)
  roc_auc = roc_auc_score(df.true, df.sigmoid)
  print(f"AUROC: {roc_auc:.4f}")

  results.loc[len(results)] = [algorithm, roc_auc, accuracy, precision, recall, f1]

def plot_roc_curves(df1, df2, df3):
    """
    Plots ROC curves for three models and their combined ROC curves.

    Args:
        df1: DataFrame containing 'sigmoid' (predicted probabilities) and 'true' (actual labels) for GraphSAGE.
        df2: DataFrame containing 'sigmoid' (predicted probabilities) and 'true' (actual labels) for Random Forest.
        df3: DataFrame containing 'sigmoid' (predicted probabilities) and 'true' (actual labels) for Integrated Model.
    """

    # Define model labels
    models = {
        "GraphSAGE": df1,
        "Random Forest": df2,
        "Integrated Model": df3
    }

    plt.figure(figsize=(10, 7))

    # Plot ROC curve for each model separately
    for label, df in models.items():
        fpr, tpr, _ = roc_curve(df['true'], df['sigmoid'])
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, label=f"{label} (AUC = {roc_auc:.3f})")

    # Add diagonal line (random classifier)
    plt.plot([0, 1], [0, 1], 'k--', lw=2)

    # Formatting the plot
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve Comparison')
    plt.legend(loc="lower right")
    plt.grid()

    # Show combined ROC curve
    plt.show()

    # Draw individual ROC curves
    for label, df in models.items():
        plt.figure(figsize=(6, 5))
        fpr, tpr, _ = roc_curve(df['true'], df['sigmoid'])
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, label=f"{label} (AUC = {roc_auc:.3f})", color='blue')
        plt.plot([0, 1], [0, 1], 'k--', lw=2)

        # Formatting the plot
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title(f'ROC Curve for {label}')
        plt.legend(loc="lower right")
        plt.grid()

        # Show individual ROC curve
        plt.show()




def build_confusion_matrix(df1, df2, df3):
    # Function to calculate TP, FP, FN, TN
    def get_conf_matrix(df):
        tn, fp, fn, tp = confusion_matrix(df['true'], df['predicted']).ravel()
        return tn, fp, fn, tp

    # Compute confusion matrices
    tn1, fp1, fn1, tp1 = get_conf_matrix(df1)
    tn2, fp2, fn2, tp2 = get_conf_matrix(df2)
    tn3, fp3, fn3, tp3 = get_conf_matrix(df3)

    # Create a DataFrame
    conf_matrix_df = pd.DataFrame({
        'Algorithm': ['GraphSAGE', 'Random Forest', 'Integrated Model'],
        'True Negative': [tn1, tn2, tn3],
        'False Positive': [fp1, fp2, fp3],
        'False Negative': [fn1, fn2, fn3],
        'True Positive': [tp1, tp2, tp3]
    })

    return conf_matrix_df

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
subfolder = ''
#subfolder = 'test/'
file_path = '/content/drive/MyDrive/MSc/Thesis/Colab/Dataset/' + subfolder

drug_protein_df = pd.read_csv(file_path + 'drugProteinInteraction.csv', index_col = 0)
disease_protein_df = pd.read_csv(file_path + 'diseaseProteinInteraction.csv', index_col = 0)
drug_disease_df = pd.read_csv(file_path + 'drugDiseaseInteraction.csv', index_col = 0)
ppi_df = pd.read_csv(file_path + 'proteinSimilarity.csv', index_col = 0)

drug_protein_matrix = torch.tensor(drug_protein_df.values, dtype=torch.float)
disease_protein_matrix = torch.tensor(disease_protein_df.values, dtype=torch.float)
drug_disease_matrix = torch.tensor(drug_disease_df.values, dtype=torch.float)
ppi_matrix = torch.tensor(ppi_df.values, dtype=torch.float)

x_drugs = drug_protein_matrix
x_diseases = disease_protein_matrix
x = torch.cat([x_drugs, x_diseases], dim=0)

num_of_drugs = drug_protein_df.shape[0]
num_of_diseases = disease_protein_df.shape[0]
num_of_proteins = drug_protein_df.shape[1]
n = num_of_drugs + num_of_diseases

positive_edges = drug_disease_matrix.nonzero(as_tuple=False).t().contiguous()
positive_edges[1] += num_of_drugs

negative_edges = torch.eq(drug_disease_matrix, 0).nonzero(as_tuple=False).t().contiguous()
negative_edges[1] += num_of_drugs

num_positive_edges = positive_edges.size(1)
num_negative_edges = negative_edges.size(1)

print(f"Number of positive edges: {num_positive_edges}")
print(f"Number of negative edges: {num_negative_edges}")

pos_train_size = int(train_ratio * num_positive_edges)
neg_train_size = int(train_ratio * negative_edge_ratio * num_positive_edges)

negative_edges = sampling(negative_edges, negative_edge_ratio * num_positive_edges)[0]
train_positive_edges, test_positive_edges = sampling(positive_edges, pos_train_size)
train_negative_edges, test_negative_edges = sampling(negative_edges, neg_train_size)

test_edge_index = torch.cat([test_positive_edges, test_negative_edges], dim=-1)
pos_y = torch.ones(test_positive_edges.size(1))
neg_y = torch.zeros(test_negative_edges.size(1))
test_edge_labels = torch.cat([pos_y, neg_y])


train_edges = torch.cat((train_positive_edges, test_positive_edges), dim=1)
swapped_edges = train_edges[[1, 0], :]
train_edges = torch.cat([train_edges, swapped_edges], dim=1)
#self_loops = torch.tensor([list(range(n)), list(range(n))])
#train_edges = torch.cat((train_edges, self_loops), dim=1)

print(f"Number of training positive edges: {train_positive_edges.size(1)}")
print(f"Number of training negative edges: {train_negative_edges.size(1)}")
print(f"Number of test positive edges: {test_positive_edges.size(1)}")
print(f"Number of test negative edges: {test_negative_edges.size(1)}")


Number of positive edges: 1827
Number of negative edges: 530687
Number of training positive edges: 1644
Number of training negative edges: 1644
Number of test positive edges: 183
Number of test negative edges: 183


In [7]:
train_edges.shape

torch.Size([2, 3654])

In [8]:
features_pos_a = x[train_positive_edges[0]]
features_pos_b = x[train_positive_edges[1]]
summed_features_pos = features_pos_a + features_pos_b
features_neg_a = x[train_negative_edges[0]]
features_neg_b = x[train_negative_edges[1]]
summed_features_neg = features_neg_a + features_neg_b
X_train = torch.cat((summed_features_pos, summed_features_neg), dim=0)
y_train = torch.cat((torch.ones(train_positive_edges.size(1)), torch.zeros(train_negative_edges.size(1))))
shuffle_indices = torch.randperm(X_train.shape[0])
X_train = X_train[shuffle_indices]
y_train = y_train[shuffle_indices]
X_train_np = X_train.numpy()
y_train_np = y_train.numpy()




features_pos_a = x[test_positive_edges[0]]
features_pos_b = x[test_positive_edges[1]]
summed_features_pos = features_pos_a + features_pos_b
features_neg_a = x[test_negative_edges[0]]
features_neg_b = x[test_negative_edges[1]]
summed_features_neg = features_neg_a + features_neg_b
X_test = torch.cat((summed_features_pos, summed_features_neg), dim=0)
y_test = torch.cat((torch.ones(test_positive_edges.size(1)), torch.zeros(test_negative_edges.size(1))))
X_test_np = X_test.numpy()
y_test_np = y_test.numpy()

X_test_np

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]], dtype=float32)

In [9]:
#train_positive_edges.shape
#train_positive_edges = torch.cat((train_positive_edges, test_positive_edges), dim=1)
#train_positive_edges.shape
#swapped_edges = train_positive_edges[[1, 0], :]
#train_positive_edges = torch.cat((train_positive_edges, swapped_edges), dim=1)
#train_positive_edges.shape

In [10]:
class GraphSAGE(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super(GraphSAGE, self).__init__()
        self.conv1 = SAGEConv(in_channels, hidden_channels)
        self.dropout = nn.Dropout(p=0.5)
        self.conv2 = SAGEConv(hidden_channels, out_channels)

    def encode(self, x, edge_index):
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        #x = self.dropout(x)
        x = self.conv2(x, edge_index)
        return F.relu(x)

    def decode(self, z, pos_edge_index, neg_edge_index):
        edge_index = torch.cat([pos_edge_index, neg_edge_index], dim=-1)
        return (z[edge_index[0]] * z[edge_index[1]]).sum(dim=-1)

    def decode_all(self, z):
        prob_adj = z @ z.t()
        return (prob_adj > 0).nonzero(as_tuple=False).t()

In [11]:
def train(model, optimizer, criterion):
    model.train()
    optimizer.zero_grad()

    z = model.encode(x, train_edges)
    out = model.decode(z, train_positive_edges, train_negative_edges)

    pos_y = torch.ones(train_positive_edges.size(1))
    neg_y = torch.zeros(train_negative_edges.size(1))
    y_true = torch.cat([pos_y, neg_y])

    loss = criterion(out, y_true)
    loss.backward()
    optimizer.step()

    y_score = torch.sigmoid(out).detach().cpu().numpy()
    y_pred = (y_score > threshold).astype(int)

    acc = compute_accuracy(y_true, y_pred)
    precision = compute_precision(y_true, y_pred)
    f1 = compute_f1(y_true, y_pred)
    auc = compute_auc_roc(y_true, y_score)

    return model, z, loss.item(), acc, precision

def evaluate(model, z, test_pos, test_neg):
    pos_y = torch.ones(test_pos.size(1))
    neg_y = torch.zeros(test_neg.size(1))
    edge_label = torch.cat([pos_y, neg_y])

    model.eval()
    with torch.no_grad():
      #z = model.encode(x, test_pos)
      out = model.decode(z, test_pos, test_neg)

    y_score = out.sigmoid().numpy()
    y_pred = (y_score > threshold).astype(int)

    df = pd.DataFrame({
      'sigmoid': y_score,
      'predicted': y_pred,
      'true': edge_label
    })

    acc = compute_accuracy(edge_label, y_pred)
    precision = compute_precision(edge_label, y_pred)
    f1 = compute_f1(edge_label, y_pred)
    auc = compute_auc_roc(edge_label, y_score)

    return acc, precision, df

def roc(df):
  y_true = df['true']  # True labels
  y_scores = df['sigmoid']  # Predicted probabilities

  # Compute ROC curve
  fpr, tpr, threshold = roc_curve(y_true, y_scores)

  # Compute AUROC score
  auc_score = roc_auc_score(y_true, y_scores)

  # Plot ROC Curve
  plt.figure(figsize=(8, 6))
  plt.plot(fpr, tpr, color='blue', label=f'AUC = {auc_score:.3f}')
  plt.plot([0, 1], [0, 1], linestyle='--', color='gray')  # Random classifier
  plt.xlabel('False Positive Rate (FPR)')
  plt.ylabel('True Positive Rate (TPR)')
  plt.title('ROC Curve')
  plt.legend(loc='lower right')
  plt.grid()
  plt.show()

In [12]:
modelSAGE = GraphSAGE(in_channels=num_of_proteins, hidden_channels=hidden_channels, out_channels=output_channels)
optimizerSAGE = torch.optim.Adam(params=modelSAGE.parameters(), lr=learning_rate)
criterionSAGE = torch.nn.BCEWithLogitsLoss()
zSAGE = []

for epoch in range(1, epochs + 1):
    modelSAGE, zSAGE, lossSAGE, accuracySAGE, precisionSAGE = train(modelSAGE, optimizerSAGE, criterionSAGE)
    print(f"Epoch {epoch:03d} | Loss: {lossSAGE:.4f} | Accuracy: {accuracySAGE:.4f} | Precision: {precisionSAGE:.4f}")

accSAGE, preSAGE, dfSAGE = evaluate(modelSAGE, zSAGE, test_positive_edges, test_negative_edges)
print(f"Test Accuracy: {accSAGE:.4f}  | Test Precision: {preSAGE:.4f}")
roc(dfSAGE)

NameError: name 'compute_accuracy' is not defined

In [None]:
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(X_train, y_train)
y_prob = rf_model.predict_proba(X_test)[:, 1]
y_pred = (y_prob > threshold).astype(int)
#y_pred = rf_model.predict(X_test)


dfRF = pd.DataFrame({
  'sigmoid': y_prob,
  'predicted': y_pred,
  'true': y_test
})


dfEN = pd.DataFrame()
dfEN['sigmoid'] = (dfSAGE['sigmoid'] + dfRF['sigmoid']) / 2
dfEN['predicted'] = (dfSAGE['predicted'] + dfRF['predicted'])
dfEN['predicted'] = ((dfEN['predicted'] > 1) & (dfEN['sigmoid'] > threshold)).astype(int)
#dfEN['predicted'] = (dfSAGE['predicted'] + dfRF['predicted'])
#dfEN['predicted'] = (dfEN['sigmoid'] > threshold).astype(int)
dfEN['true'] = dfSAGE['true']

performace(dfSAGE, 'Graph SAGE')
performace(dfRF, 'Random Forest')
performace(dfEN, 'Ensembled')
results


plot_roc_curves(dfSAGE, dfRF, dfEN)

In [None]:
results
#results.to_csv('results.csv', index=False)
#files.download('results.csv')


In [None]:
confusion = build_confusion_matrix(dfSAGE, dfRF, dfEN)
confusion

In [None]:
import matplotlib.pyplot as plt

# Threshold values
thresholds = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]

# Values for accuracy, precision, recall, and F1 score
accuracy = [66.94, 75.96, 79.23, 81.42, 89.89, 90.20, 84.42, 79.24, 69.95]
precision = [60.76, 68.77, 73.91, 77.51, 95.06, 95.92, 98.46, 99.08, 100]
recall = [95.63, 95.08, 90.16, 88.52, 84.15, 94.61, 69.95, 59.02, 39.89]
f1 = [74.31, 79.82, 81.28, 82.65, 89.28, 95.26, 81.79, 73.97, 57.03]

# Plotting the values
plt.figure(figsize=(10, 6))

# Plot each metric
plt.plot(thresholds, accuracy, label='Accuracy', marker='o', linestyle='-', color='blue')
plt.plot(thresholds, precision, label='Precision', marker='s', linestyle='-', color='green')
plt.plot(thresholds, recall, label='Recall', marker='^', linestyle='-', color='red')
plt.plot(thresholds, f1, label='F1 Score', marker='D', linestyle='-', color='purple')

# Adding labels and title
plt.xlabel('Threshold (T)')
plt.ylabel('Percentage (%)')
plt.title('Performance Metrics vs Threshold')

# Adding a legend
plt.legend()

# Displaying the plot
plt.grid(True)
plt.tight_layout()
plt.show()
