### Install all dependencies/libraries

In [None]:
pip install pykeen torch pandas numpy scikit-learn

Collecting pykeen
  Downloading pykeen-1.11.0-py3-none-any.whl.metadata (85 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/85.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m85.5/85.5 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
Collecting dataclasses-json (from pykeen)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting click-default-group (from pykeen)
  Downloading click_default_group-1.2.4-py2.py3-none-any.whl.metadata (2.8 kB)
Collecting optuna>=2.0.0 (from pykeen)
  Downloading optuna-4.2.1-py3-none-any.whl.metadata (17 kB)
Collecting more-click (from pykeen)
  Downloading more_click-0.1.2-py3-none-any.whl.metadata (4.3 kB)
Collecting pystow>=0.4.3 (from pykeen)
  Downloading pystow-0.7.0-py3-none-any.whl.metadata (17 kB)
Collecting docdata (from pykeen)
  Downloading docdata-0.0.4-py3-none-any.whl.metadata (13 kB)
Collecting class-resolver>=0.5.1 (from pykeen)
  Dow

# Task 1: Alternate Drugs Beyond Direct Connections

### Step 1: Train Model & Generate Predictions

In [None]:
import torch
import numpy as np
import json
from pykeen.pipeline import pipeline
from pykeen.datasets import Hetionet
from pykeen.models import TransE
from pykeen.evaluation import RankBasedEvaluator
from scipy.spatial.distance import cosine

def load_hetionet():
    """
    Build the Hetionet dataset object used by the rest of this notebook.

    Returns:
        Hetionet: A newly constructed ``pykeen.datasets.Hetionet`` instance.
    """
    return Hetionet()

def train_kge_model(dataset, model_name='TransE', epochs=5):
    """
    Fit a Knowledge Graph Embedding (KGE) model through the PyKEEN pipeline.

    Args:
        dataset (Hetionet): Dataset whose ``training``, ``testing`` and
            ``validation`` attributes are handed to the pipeline.
        model_name (str): Model identifier passed to the pipeline (default: TransE).
        epochs (int): Value passed as ``num_epochs`` in the training kwargs.

    Returns:
        tuple: ``(model, dataset)`` - the ``model`` attribute of the pipeline
        result, and the very same dataset object that was passed in.
    """
    pipeline_kwargs = dict(
        training=dataset.training,
        testing=dataset.testing,
        validation=dataset.validation,
        model=model_name,
        training_kwargs={'num_epochs': epochs},
    )
    pipeline_result = pipeline(**pipeline_kwargs)
    trained_model = pipeline_result.model
    return trained_model, dataset

def get_embeddings(model, dataset):
    """
    Pull one embedding row per entity out of a trained model.

    Args:
        model: Object whose ``entity_representations[0]`` is callable with a
            tensor of entity indices (e.g. a trained PyKEEN model).
        dataset: Object exposing ``training.entity_to_id``.

    Returns:
        tuple: ``(entity_to_id, entity_embeddings)`` where ``entity_to_id`` is
        the mapping taken from ``dataset.training`` and ``entity_embeddings``
        is a NumPy array produced by looking up indices
        ``0 .. len(entity_to_id) - 1``.
    """
    entity_to_id = dataset.training.entity_to_id

    # One index per known entity, in ID order.
    all_indices = torch.arange(len(entity_to_id))
    representation = model.entity_representations[0]

    # Detach from the autograd graph and move to host memory before converting.
    looked_up = representation(all_indices)
    entity_embeddings = looked_up.detach().cpu().numpy()
    return entity_to_id, entity_embeddings

def find_alternate_drugs(disease_id, entity_to_id, entity_embeddings, dataset, top_n=10):
    """
    Rank entities by cosine similarity to a disease, skipping its direct neighbours.

    Args:
        disease_id (str): Label of the disease; must be a key of ``entity_to_id``.
        entity_to_id (dict): Mapping of entity labels to their integer IDs.
        entity_embeddings (numpy.ndarray): Embedding rows indexed by entity ID.
        dataset: Object exposing ``training.mapped_triples``, indexed here as
            rows of ``(head, relation, tail)`` IDs.
        top_n (int): Maximum number of results to return.

    Returns:
        list of tuples: Up to ``top_n`` ``(entity_label, similarity)`` pairs,
        sorted by similarity in descending order.

    Raises:
        ValueError: If ``disease_id`` is not a key of ``entity_to_id``.
    """
    if disease_id not in entity_to_id:
        raise ValueError("Disease ID not found in knowledge graph.")

    disease_idx = entity_to_id[disease_id]
    disease_emb = entity_embeddings[disease_idx]

    # An entity is "directly connected" when it shares a training triple with
    # the disease. The disease can sit on either side of an edge, so both the
    # head column and the tail column are checked; looking only at triples with
    # the disease as head would miss every edge that points *to* the disease.
    triples = dataset.training.mapped_triples
    tails_of_disease = triples[triples[:, 0] == disease_idx, 2].tolist()
    heads_of_disease = triples[triples[:, 2] == disease_idx, 0].tolist()
    direct_neighbours = set(tails_of_disease) | set(heads_of_disease)

    # NOTE(review): every entity in entity_to_id is scored, not only drugs -
    # confirm whether candidates should be restricted to compound entities.
    candidate_scores = {}
    for label, idx in entity_to_id.items():
        # Skip the disease itself and anything already linked to it, before
        # paying for the similarity computation.
        if label == disease_id or idx in direct_neighbours:
            continue
        candidate_scores[label] = 1 - cosine(disease_emb, entity_embeddings[idx])

    # Highest similarity first; keep only the top-N.
    ranked_drugs = sorted(candidate_scores.items(), key=lambda item: item[1], reverse=True)[:top_n]
    return ranked_drugs

def evaluate_ndcg(recommended_drugs, ground_truth_drugs, k=3):
    """
    Computing the Normalized Discounted Cumulative Gain (NDCG@k) with binary relevance.

    Args:
        recommended_drugs (list): Recommended drugs, best first.
        ground_truth_drugs (list): Drugs considered relevant.
        k (int): Only the first ``k`` recommendations are scored.

    Returns:
        float: NDCG@k in ``[0, 1]`` when recommendations are distinct;
        ``0.0`` when there are no relevant drugs.
    """
    relevant = set(ground_truth_drugs)

    # DCG must be cut at k just like IDCG; scoring the full recommendation list
    # against an ideal truncated at k lets the ratio exceed 1.
    dcg = sum(
        1 / np.log2(rank + 2)
        for rank, drug in enumerate(recommended_drugs[:k])
        if drug in relevant
    )

    # Ideal ranking: a relevant drug in each of the first min(k, #relevant) slots.
    idcg = sum(1 / np.log2(rank + 2) for rank in range(min(k, len(relevant))))
    return dcg / idcg if idcg > 0 else 0.0

def main(training_file, output_file, results_file='results.json'):
    """
    Train a KGE model on Hetionet, recommend alternate drugs for the test cases, and save them.

    Args:
        training_file (str): Path to a JSON file; it is parsed but its content
            is not used by the steps below.
        output_file (str): Path to the JSON file holding the test cases. Despite
            the name it is only read, never written.
        results_file (str): Path the annotated test cases are written to
            (default: 'results.json').
    """
    # NOTE(review): the parsed training data is never used; the read is kept so
    # that a missing or malformed training file still fails up front.
    with open(training_file, 'r') as f:
        training_data = json.load(f)

    # Loading test cases from JSON file
    with open(output_file, 'r') as f:
        output_data = json.load(f)

    # Loading Hetionet, training the embedding model, extracting embeddings
    dataset = load_hetionet()
    model, dataset = train_kge_model(dataset)
    entity_to_id, entity_embeddings = get_embeddings(model, dataset)

    # NOTE(review): this loop reads the key 'disease id' (with a space) and
    # writes 'recommendations', while the evaluation cells later in this
    # notebook read "disease_id" and "candidates" from the results file -
    # confirm which schema is intended.
    for test_case in output_data:
        if test_case['type'] == 'alternate drug global':
            disease_id = test_case['disease id']
            recommended_drugs = find_alternate_drugs(disease_id, entity_to_id, entity_embeddings, dataset)
            ground_truth = test_case.get('ground_truth', [])

            # Drop the similarity scores once; the bare names are used twice.
            recommendations = [drug for drug, _ in recommended_drugs]

            # Storing recommendations and their NDCG score in the test case
            test_case['recommendations'] = recommendations
            test_case['ndcg_score'] = evaluate_ndcg(recommendations, ground_truth)

    # Saving every test case (annotated or not) to the results file
    with open(results_file, 'w') as f:
        json.dump(output_data, f, indent=4)

    print(f"Results saved to {results_file}")

if __name__ == "__main__":
    # Paths are hard-coded under /content (presumably a Colab workspace - confirm).
    main(
        '/content/hetionet_training.json',
        '/content/hetionet_training_output.json',
    )

INFO:pykeen.datasets.base:downloading data from https://github.com/hetio/hetionet/raw/master/hetnet/tsv/hetionet-v1.0-edges.sif.gz to /root/.data/pykeen/datasets/hetionet/hetionet-v1.0-edges.sif.gz


Downloading hetionet-v1.0-edges.sif.gz: 0.00B [00:00, ?B/s]

INFO:pykeen.triples.splitting:done splitting triples to groups of sizes [1755897, 225020, 225020]
INFO:pykeen.pipeline.api:Using device: None


Training epochs on cuda:0:   0%|          | 0/5 [00:00<?, ?epoch/s]

Training batches on cuda:0:   0%|          | 0/7032 [00:00<?, ?batch/s]

Training batches on cuda:0:   0%|          | 0/7032 [00:00<?, ?batch/s]

Training batches on cuda:0:   0%|          | 0/7032 [00:00<?, ?batch/s]

Training batches on cuda:0:   0%|          | 0/7032 [00:00<?, ?batch/s]

Training batches on cuda:0:   0%|          | 0/7032 [00:00<?, ?batch/s]

Evaluating on cuda:0:   0%|          | 0.00/225k [00:00<?, ?triple/s]

INFO:pykeen.evaluation.evaluator:Evaluation took 273.84s seconds


Results saved to results.json


### Step 2: Compute NDCG@3

In [None]:
import json
import numpy as np

# Discounted Cumulative Gain (DCG) over the first K relevance values
def dcg_at_k(relevance_scores, k=3):
    """Return DCG@K: each of the first ``k`` scores divided by log2(position + 1), summed."""
    total = 0
    # Positions are 1-based, so the discount argument starts at 2.
    for discount_arg, gain in enumerate(relevance_scores[:k], start=2):
        total += gain / np.log2(discount_arg)
    return total

# Function to compute Normalized Discounted Cumulative Gain (NDCG) at K
def compute_ndcg(results_file, ground_truth_file, k=3):
    """
    Computing NDCG@K for predicted drug recommendations compared to ground truth.

    Args:
        results_file (str): Path to a JSON list of entries, each with a
            "disease_id" and a ranked "candidates" list (the predictions).
        ground_truth_file (str): Path to a JSON list of entries, each with a
            "disease_id" and a "candidates" list (the relevant drugs).
        k (int): Number of top predictions to score.

    Returns:
        dict: ``{disease_id: NDCG@K}``; 0 for a disease with no relevant drugs.
    """
    # NOTE(review): predictions and ground truth are both read through the
    # "candidates" key. If results_file is a copy of the ground-truth file with
    # extra fields, every score is trivially 1.0 - confirm that results_file
    # really stores the model's predictions under "candidates".
    with open(results_file, "r") as f:
        results_data = json.load(f)

    with open(ground_truth_file, "r") as f:
        ground_truth_data = json.load(f)

    # Converting ground truth into a dictionary {disease_id: set(relevant_drugs)}
    ground_truth = {entry["disease_id"]: set(entry["candidates"]) for entry in ground_truth_data}

    ndcg_scores = {}

    for entry in results_data:
        disease_id = entry["disease_id"]
        predicted_drugs = entry["candidates"][:k]  # Taking top-K predictions

        # Relevant drugs from ground truth (empty set when the disease is unknown)
        relevant_drugs = ground_truth.get(disease_id, set())

        # Binary relevance: 1 if the predicted drug is in ground truth, else 0
        relevance_scores = [1 if drug in relevant_drugs else 0 for drug in predicted_drugs]

        dcg = dcg_at_k(relevance_scores, k)

        # The ideal ranking fills the first min(k, #relevant) slots with relevant
        # drugs. Re-sorting the *predicted* relevance instead would ignore
        # relevant drugs the model missed and inflate the score.
        ideal_hits = min(k, len(relevant_drugs))
        idcg = dcg_at_k([1] * ideal_hits, k)

        # Avoid division by zero when there is nothing relevant to find
        ndcg_scores[disease_id] = dcg / idcg if idcg > 0 else 0

    return ndcg_scores

# Example usage: score the saved results against the ground-truth file.
# NOTE(review): paths are hard-coded under /content - adjust when running elsewhere.
results_file = "/content/results.json"
ground_truth_file = "/content/hetionet_training_output.json"

# Computing NDCG@3 scores for each disease (dict keyed by disease_id)
ndcg_scores = compute_ndcg(results_file, ground_truth_file, k=3)

# Printing one line per disease, score formatted to four decimals
for disease_id, score in ndcg_scores.items():
    print(f"Disease {disease_id}: NDCG@3 = {score:.4f}")

Disease 85: NDCG@3 = 1.0000
Disease 131: NDCG@3 = 1.0000
Disease 5: NDCG@3 = 1.0000
Disease 63: NDCG@3 = 1.0000
Disease 21: NDCG@3 = 1.0000
Disease 43: NDCG@3 = 1.0000
Disease 7: NDCG@3 = 1.0000
Disease 74: NDCG@3 = 1.0000
Disease 116: NDCG@3 = 1.0000
Disease 130: NDCG@3 = 1.0000
Disease 58: NDCG@3 = 1.0000
Disease 107: NDCG@3 = 1.0000
Disease 30: NDCG@3 = 1.0000
Disease 72: NDCG@3 = 1.0000
Disease 50: NDCG@3 = 1.0000
Disease 113: NDCG@3 = 1.0000
Disease 14: NDCG@3 = 1.0000
Disease 3: NDCG@3 = 1.0000
Disease 83: NDCG@3 = 1.0000
Disease 48: NDCG@3 = 1.0000
Disease 70: NDCG@3 = 1.0000
Disease 65: NDCG@3 = 1.0000
Disease 110: NDCG@3 = 1.0000
Disease 115: NDCG@3 = 1.0000
Disease 97: NDCG@3 = 1.0000
Disease 19: NDCG@3 = 1.0000
Disease 56: NDCG@3 = 1.0000


### Step 3: Compute HITS@3

In [None]:
import json

# HITS@3: a disease scores 1 when any of its top-3 predictions is a ground-truth drug, else 0

def compute_hits_at_3(results_file, ground_truth_file):
    """
    Score each predicted disease with HITS@3 and average the scores.

    Both files are JSON lists of entries carrying a "disease_id"; predictions
    need a "candidates" list, ground-truth entries may omit it (treated as empty).

    Returns:
        tuple: ``(hits_scores, overall_hits_at_3)`` - per-disease 0/1 scores and
        their mean (0.0 when there are no predictions).
    """
    with open(results_file, "r") as f:
        predictions = json.load(f)

    with open(ground_truth_file, "r") as f:
        truth_entries = json.load(f)

    # disease_id -> set of relevant drugs
    ground_truth = {}
    for truth in truth_entries:
        key = truth["disease_id"]
        ground_truth[key] = set(truth.get("candidates", []))

    hits_scores = {}
    for entry in predictions:
        disease_id = entry["disease_id"]
        top_three = set(entry["candidates"][:3])

        # A disease without ground truth has nothing to intersect with -> miss.
        relevant = ground_truth.get(disease_id, set())
        hits_scores[disease_id] = 1 if top_three & relevant else 0

    # Mean of the per-disease scores
    if not hits_scores:
        return hits_scores, 0.0
    return hits_scores, sum(hits_scores.values()) / len(hits_scores)

# Example usage: score the saved results against the ground-truth file.
# NOTE(review): paths are hard-coded under /content - adjust when running elsewhere.
results_file = "/content/results.json"
ground_truth_file = "/content/hetionet_training_output.json"

# Computing HITS@3 scores (per-disease dict plus the overall mean)
hits_scores, overall_hits = compute_hits_at_3(results_file, ground_truth_file)

# Printing per-disease HITS@3 scores
for disease, score in hits_scores.items():
    print(f"Disease {disease}: HITS@3 = {score:.4f}")

# Printing overall accuracy (mean of the per-disease scores)
print(f"\nOverall HITS@3 Accuracy: {overall_hits:.4f}")

Disease 85: HITS@3 = 1.0000
Disease 131: HITS@3 = 1.0000
Disease 5: HITS@3 = 1.0000
Disease 63: HITS@3 = 1.0000
Disease 21: HITS@3 = 1.0000
Disease 43: HITS@3 = 1.0000
Disease 7: HITS@3 = 1.0000
Disease 74: HITS@3 = 1.0000
Disease 116: HITS@3 = 1.0000
Disease 130: HITS@3 = 1.0000
Disease 58: HITS@3 = 1.0000
Disease 107: HITS@3 = 1.0000
Disease 30: HITS@3 = 1.0000
Disease 72: HITS@3 = 1.0000
Disease 50: HITS@3 = 1.0000
Disease 113: HITS@3 = 1.0000
Disease 14: HITS@3 = 1.0000
Disease 3: HITS@3 = 1.0000
Disease 83: HITS@3 = 1.0000
Disease 48: HITS@3 = 1.0000
Disease 70: HITS@3 = 1.0000
Disease 65: HITS@3 = 1.0000
Disease 110: HITS@3 = 1.0000
Disease 115: HITS@3 = 1.0000
Disease 97: HITS@3 = 1.0000
Disease 19: HITS@3 = 1.0000
Disease 56: HITS@3 = 1.0000

Overall HITS@3 Accuracy: 1.0000


# Task 2: Alternate Drugs With Side-Effect Constraints

### Step 4: Handle Alternate Drugs with Side-Effect Constraints

In [None]:
import json

def load_json(file_path):
    """Read the file at ``file_path`` and return its parsed JSON content."""
    with open(file_path, "r") as handle:
        parsed = json.load(handle)
    return parsed

def compute_hits_at_3(results_file, ground_truth_file):
    """
    Computing and printing HITS@3 for alternate drug recommendations.

    Args:
        results_file (str): Path to a JSON list of entries with "disease_id"
            and a ranked "candidates" list (the predictions).
        ground_truth_file (str): Path to a JSON list of entries with
            "disease_id" and "candidates" (the relevant drugs).

    Returns:
        None. Per-disease scores and the overall accuracy are printed.

    NOTE(review): no side-effect filtering happens in this function; it only
    scores whatever "candidates" the results file contains. This definition
    also rebinds the name ``compute_hits_at_3`` used by the earlier HITS@3
    cell, which returns its scores instead of printing them.
    """
    results_data = load_json(results_file)
    ground_truth_data = load_json(ground_truth_file)

    # disease_id -> set of relevant (correct) drug candidates
    ground_truth = {entry["disease_id"]: set(entry["candidates"]) for entry in ground_truth_data}

    # HITS@3 score per disease; diseases without ground truth are skipped
    hits_at_3_scores = {}

    for entry in results_data:
        disease_id = entry["disease_id"]

        # Selecting the top 3 predicted drug candidates
        candidates = entry["candidates"][:3]

        if disease_id in ground_truth:
            # 1 if at least one of the top-3 predictions is relevant, otherwise 0
            hits_at_3_scores[disease_id] = int(
                any(drug in ground_truth[disease_id] for drug in candidates)
            )

    # Proportion of scored diseases with a hit. With nothing scored the mean is
    # undefined, so report 0.0 rather than raising ZeroDivisionError.
    if hits_at_3_scores:
        overall_hits_at_3 = sum(hits_at_3_scores.values()) / len(hits_at_3_scores)
    else:
        overall_hits_at_3 = 0.0

    # Printing per-disease HITS@3 scores
    for disease, score in hits_at_3_scores.items():
        print(f"Disease {disease}: HITS@3 = {score:.4f}")

    # Printing the overall HITS@3 accuracy
    print(f"\nOverall HITS@3 Accuracy: {overall_hits_at_3:.4f}")


# Relative paths: both files are resolved against the current working directory.
results_file = "results.json"
ground_truth_file = "hetionet_training_output.json"

# Running the evaluation function (prints per-disease and overall HITS@3)
compute_hits_at_3(results_file, ground_truth_file)

Disease 85: HITS@3 = 1.0000
Disease 131: HITS@3 = 1.0000
Disease 5: HITS@3 = 1.0000
Disease 63: HITS@3 = 1.0000
Disease 21: HITS@3 = 1.0000
Disease 43: HITS@3 = 1.0000
Disease 7: HITS@3 = 1.0000
Disease 74: HITS@3 = 1.0000
Disease 116: HITS@3 = 1.0000
Disease 130: HITS@3 = 1.0000
Disease 58: HITS@3 = 1.0000
Disease 107: HITS@3 = 1.0000
Disease 30: HITS@3 = 1.0000
Disease 72: HITS@3 = 1.0000
Disease 50: HITS@3 = 1.0000
Disease 113: HITS@3 = 1.0000
Disease 14: HITS@3 = 1.0000
Disease 3: HITS@3 = 1.0000
Disease 83: HITS@3 = 1.0000
Disease 48: HITS@3 = 1.0000
Disease 70: HITS@3 = 1.0000
Disease 65: HITS@3 = 1.0000
Disease 110: HITS@3 = 1.0000
Disease 115: HITS@3 = 1.0000
Disease 97: HITS@3 = 1.0000
Disease 19: HITS@3 = 1.0000
Disease 56: HITS@3 = 1.0000

Overall HITS@3 Accuracy: 1.0000
