Training KGEs(

Refs:-
https://mlflow.org/docs/latest/ml/tracking/quickstart/
https://pykeen.readthedocs.io/en/stable/tutorial/models.html

In [1]:
import os
from pathlib import Path
from datetime import datetime
import torch
import pandas as pd
from tqdm import tqdm

import mlflow
from mlflow.models import infer_signature

In [2]:
from pykeen.triples import TriplesFactory
from pykeen.pipeline import pipeline
from pykeen.trackers import MLFlowResultTracker
from pykeen.pipeline import pipeline
from pykeen.trackers import MLFlowResultTracker

In [3]:
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", DEVICE)
SEED = 23

Device: cpu


In [4]:
MLFLOW_TRACKING_URI="file:./mlruns"
MLFLOW_EXPERIMENT="KGE Training (TransE, DistMult and ComplEx) with PharMeBINet"

# mlflow.set_tracking_uri(uri="http://127.0.0.1:8080")
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment(MLFLOW_EXPERIMENT)

<Experiment: artifact_location='file:///home/sid/Study/knowledge-graphs-drug-repurposing/notebooks/mlruns/981545609083190387', creation_time=1758889240882, experiment_id='981545609083190387', last_update_time=1758889240882, lifecycle_stage='active', name='KGE Training (TransE, DistMult and ComplEx) with PharMeBINet', tags={}>

In [5]:
ROOT = Path().resolve()/".." # FIXME hacky way to go back, find a way to reference actual project root
DATA_DIR = ROOT/"data"/"processed"/"pharmebinet"
RUNS_DIR = ROOT/"runs"/"pykeen_kge_pharmebinet"
RUNS_DIR.mkdir(parents=True, exist_ok=True)

TRIPLES = DATA_DIR/"triples.tsv"
E2ID = DATA_DIR/"entity2id.csv"
R2ID = DATA_DIR/"relation2id.csv"
ETYPE = DATA_DIR/"entity_types.csv"

# double check if it's all there
for p in [TRIPLES, E2ID, R2ID, ETYPE]:
    assert p.exists(), f"Missing file: {p}"

# print("entity types")
# display(pd.read_csv(ETYPE, nrows=5))
print("triples:")
display(pd.read_csv(TRIPLES, nrows=5))
print("relation to ID")
display(pd.read_csv(R2ID, nrows=30))

triples:


Unnamed: 0,relation\thead\ttail
0,INVOLVED_IN_PiiBP\t192000\t304346
1,INVOLVED_IN_PiiBP\t192000\t297825
2,INVOLVED_IN_PiiBP\t192000\t297959
3,INVOLVED_IN_PiiBP\t192000\t317503
4,INVOLVED_IN_PiiBP\t192000\t295456


relation to ID


Unnamed: 0,relation,idx
0,ACTS_UPSTREAM_OF_GauoBP,0
1,ACTS_UPSTREAM_OF_NEGATIVE_EFFECT_GauoneBP,1
2,ACTS_UPSTREAM_OF_NEGATIVE_EFFECT_PauoneBP,2
3,ACTS_UPSTREAM_OF_OR_WITHIN_GauoowBP,3
4,ACTS_UPSTREAM_OF_OR_WITHIN_NEGATIVE_EFFECT_Gau...,4
5,ACTS_UPSTREAM_OF_OR_WITHIN_NEGATIVE_EFFECT_Pau...,5
6,ACTS_UPSTREAM_OF_OR_WITHIN_POSITIVE_EFFECT_Gau...,6
7,ACTS_UPSTREAM_OF_OR_WITHIN_POSITIVE_EFFECT_Pau...,7
8,ACTS_UPSTREAM_OF_OR_WITHIN_PauoowBP,8
9,ACTS_UPSTREAM_OF_POSITIVE_EFFECT_GauopeBP,9


In [6]:
# load mappings
e2 = pd.read_csv(E2ID, dtype={"entity_id": str, "idx": int})
r2 = pd.read_csv(R2ID, dtype={"relation": str, "idx": int})

entity_to_id = dict(zip(e2["entity_id"], e2["idx"]))
relation_to_id = dict(zip(r2["relation"],  r2["idx"]))

# and triples (header: head, relation, tail) -------------------
triples_df = pd.read_csv(TRIPLES, sep="\t", dtype=str, usecols=["head", "relation", "tail"]) # pls don't crash
labeled_triples = triples_df[["head", "relation", "tail"]].values

tf_all = TriplesFactory.from_labeled_triples(
    triples=labeled_triples,
    entity_to_id=entity_to_id,
    relation_to_id=relation_to_id,
    create_inverse_triples=False, # TODO what does this really do?
)

tf_train, tf_valid, tf_test = tf_all.split([0.6, 0.2, 0.2], random_state=SEED)
# FIXME try different splits!!! 80/10/10

print(f"entities={tf_all.num_entities:,} relations={tf_all.num_relations:,}")
print(f"triples ---> train={tf_train.num_triples:,} valid={tf_valid.num_triples:,} test={tf_test.num_triples:,}")

entities=2,653,751 relations=208
triples ---> train=9,526,657 valid=3,175,553 test=3,175,553


Reference: https://pykeen.readthedocs.io/en/stable/api/pykeen.pipeline.pipeline.html

In [7]:
def train_model(name: str, model_kwargs: dict, use_inverses: bool, out_dir: Path):
    out_dir.mkdir(parents=True, exist_ok=True)

    tr = tf_train if not use_inverses else tf_train.create_inverse_triples()
    va = tf_valid if not use_inverses else tf_valid.create_inverse_triples()
    te = tf_test if not use_inverses else tf_test.create_inverse_triples()

    # PyKEEN MLflow tracker (it starts/uses a run under the experiment)
    run_name = f"{name}-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    tracker = MLFlowResultTracker(
        tracking_uri=MLFLOW_TRACKING_URI,
        # experiment_id=MLFLOW_EXPERIMENT,
        experiment_name=MLFLOW_EXPERIMENT,
        tags={"dataset": "PharMeBINet", "embedding_name": name},
    )

    result = pipeline(
        training=tr,
        validation=va,
        testing=te,
        model=name,
        model_kwargs=model_kwargs,
        optimizer="Adam",
        optimizer_kwargs={"lr": 1e-3},
        negative_sampler="basic",
        negative_sampler_kwargs={"num_negs_per_pos": 1},
        training_kwargs={"num_epochs": 5, "batch_size": 1024, "automatic_memory_optimization": True},
        evaluator_kwargs={"filtered": True},
        device=DEVICE,
        random_seed=SEED,
        result_tracker=tracker,
    )

    # save artifacts locally 
    result.save_to_directory(out_dir)

    # also log params/metrics/artifacts to the SAME MLflow run
    # (PyKEEN should already logged losses/metrics over time but 
    # add hyperparams + artifacts + others?)
    run_id = tracker.run.info.run_id
    with mlflow.start_run(run_id=run_id):# attach to the active run
        # Hyperparameters / switches
        mlflow.log_params({
            "model": name,
            "embedding_dim": model_kwargs.get("embedding_dim"),
            "scoring_fct_norm": model_kwargs.get("scoring_fct_norm"),
            "create_inverse_triples": use_inverses,
            "lr": 1e-3,
            "epochs": 5,
            "batch_size": 1024,
            "negatives_per_pos": 1,
            "seed": SEED,
            "device": DEVICE,
        })

        # final (filtered) metrics snapshot since pykeen already gives us this
        m = result.metric_results.to_dict()
        mlflow.log_metrics({
            "filtered_mrr": m.get("mrr", float("nan")),
            "filtered_hits@1": m.get("hits_at_1", float("nan")),
            "filtered_hits@3": m.get("hits_at_3", float("nan")),
            "filtered_hits@10": m.get("hits_at_10", float("nan")),
        })

        # artifacts
        mlflow.log_artifacts(str(out_dir)) # the whole run folder (model, metadata, etc.)
        mlflow.log_artifact(str(TRIPLES))
        mlflow.log_artifact(str(E2ID))
        mlflow.log_artifact(str(R2ID))

    print(f"[{name}] run_id={run_id}")
    metrics = result.metric_results.to_dict()
    print(f"[{name}] filtered: MRR={metrics.get('mrr'):.4f}"
          f"H@1={metrics.get('hits_at_1'):.4f}  H@10={metrics.get('hits_at_10'):.4f}")
    return result, run_id


In [8]:
experiments = [
    ("TransE",   {"embedding_dim": 128, "scoring_fct_norm": 1}, False), # TODO check 512 dim
    #("DistMult", {"embedding_dim": 256}, True), # TODO why inverse = True?
    #("ComplEx",  {"embedding_dim": 256}, True), # TODO why inverse = True?
]

runs = {}
for name, mkw, inv in experiments:
    print(f"====Training {name}")
    out = RUNS_DIR/name.lower()
    res, run_id = train_model(name, mkw, inv, out)
    runs[name] = run_id

pd.DataFrame({k: v.metric_results.to_dict() for k, v in results.items()}).T

====Training TransE


TypeError: Model.__init__() got an unexpected keyword argument 'automatic_memory_optimization'