# Sample notebook for running ULTRA
This notebook serves as a sample notebook to create a dataset, run the dataset, and evaluate/explore the output predictions with the ULTRA foundation model. The notebook is divided into three sections: dataset creation, running the model, and prediction evaluation.

prior to running code, you want to setup your file directory. I recommend creating a project folder that you can download data, organize your notebook and code. Sample structure like this:
* kg-models/
    * code/
    * data/
    * notebooks/
    * output/

You can download [ULTRA](https://github.com/roger-tu/ULTRA) under `code/ULTRA` (as well as any other algorithms you want to try), and this notebook under `notebooks/ultra_nb/999_Sample_ULTRA_Run.ipynb`.

Alternatively, clone this repo and run the notebook.

In [1]:
# pathing
import os
import os.path as osp
import pickle
import pathlib as Path

# data sci & stats
import polars as pl
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import spearmanr, pearsonr

# run model
import shutil
import subprocess
import torch
import sys
from tqdm import tqdm

# get model dicts
sys.path.append('../')
from ultra import datasets, data_util

# Add your dataset to `ultra.datasets`
* before beginning add your example dataset into ultra.datasets as a class object in ultra/datasets
* you can copy and paste the code below for this particular notebook
* this tells the model that you can find the dataset at a particular location, and how the dataset is formatted.

```python
# After the TransductiveDatset Class, approx Line 379
class SamplePrimeKG(TransductiveDataset):
    '''
    This is a sample PrimeKG dataset to preprocess for ULTRA usage. 
    Its just PrimeKG randomly split using seed 42.
    Two variables need to be specified: name and delimiter
    '''
    name = 'primekg' # the file name holding the train/test/valid dataset
    delimiter = '\t' # format of the files in the folder
```

# Dataset Download, Creation and Preprocessing

This is an example of creating a custom dataset to run in ULTRA. Here I create a modified PrimeKG and insert it into the codebase. This step is required because the model needs to do its own pre-processing on the raw triples.

## Download and Preprocess data

In [2]:
# download data
!wget -O ../data/primekg.csv https://dataverse.harvard.edu/api/access/datafile/6180620 --no-check-certificate -nc

for details.



--2025-09-02 19:28:54--  https://dataverse.harvard.edu/api/access/datafile/6180620
Resolving dataverse.harvard.edu (dataverse.harvard.edu)... 3.90.211.193, 18.214.210.234, 52.6.5.183
Connecting to dataverse.harvard.edu (dataverse.harvard.edu)|3.90.211.193|:443... connected.
HTTP request sent, awaiting response... 303 See Other
Location: https://dvn-cloud.s3.amazonaws.com/10.7910/DVN/IXA7BM/1805e679c4c-72137dbedbf1?response-content-disposition=attachment%3B%20filename%2A%3DUTF-8%27%27kg.csv&response-content-type=text%2Fcsv&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20250902T192855Z&X-Amz-SignedHeaders=host&X-Amz-Credential=AKIAIEJ3NV7UYCSRJC7A%2F20250902%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=97b8cfae36308479fe4338ef17b306f80fcb73211beb25967ae5eea7e78ef5df [following]
--2025-09-02 19:28:55--  https://dvn-cloud.s3.amazonaws.com/10.7910/DVN/IXA7BM/1805e679c4c-72137dbedbf1?response-content-disposition=attachment%3B%20filename%2A%3DUTF-8%27%27kg.csv&response-con

### Load data

In [3]:
# Load data into pandas
df = pd.read_csv('../data/primekg.csv',low_memory=False)
df.head(2)

Unnamed: 0,relation,display_relation,x_index,x_id,x_type,x_name,x_source,y_index,y_id,y_type,y_name,y_source
0,protein_protein,ppi,0,9796,gene/protein,PHYHIP,NCBI,8889,56992,gene/protein,KIF15,NCBI
1,protein_protein,ppi,1,7918,gene/protein,GPANK1,NCBI,2798,9240,gene/protein,PNMA1,NCBI


### Preprocess data

In [4]:
# extract the node heads/tails and get unique nodes
node_heads = df[['x_index','x_id','x_name','x_type','x_source']].rename(columns = {'x_index':'index','x_id':'id','x_name':'name','x_type':'type','x_source':'source'})
node_tails = df[['y_index','y_id','y_name','y_type','y_source']].rename(columns = {'y_index':'index','y_id':'id','y_name':'name','y_type':'type','y_source':'source'})

# remove duplicates
nodes = pd.concat([node_heads,node_tails])[['name','type','source','id']].drop_duplicates()
nodes['source_label'] = nodes['source'] +':'+nodes['id']

# export to data folder
nodes[['name','type','source','id','source_label']].rename(columns = {'id':'source_id'}).to_csv('../data/nodes.txt',sep = '\t', header = True, index = False)
nodes.head(2)

Unnamed: 0,name,type,source,id,source_label
0,PHYHIP,gene/protein,NCBI,9796,NCBI:9796
1,GPANK1,gene/protein,NCBI,7918,NCBI:7918


In [5]:
# create node to label mapping and map the triples
nodes_dict = dict(zip(nodes['name'],nodes['source_label']))

In [6]:
triples = df[['x_name','display_relation','y_name']].drop_duplicates().rename(columns = {'x_name':'head','display_relation':'relation','y_name':'tail'})
triples['head_label'] = triples['head'].apply(lambda x: nodes_dict[x])
triples['tail_label'] = triples['tail'].apply(lambda x: nodes_dict[x])

# export the data
triples[['head_label','relation','tail_label']].to_csv('../data/graph.txt',sep = '\t', header = False, index = False)
triples.head(2)

Unnamed: 0,head,relation,tail,head_label,tail_label
0,PHYHIP,ppi,KIF15,NCBI:9796,NCBI:56992
1,GPANK1,ppi,PNMA1,NCBI:7918,NCBI:9240


## Load Preprocessed data
PrimeKG is a semantic knowledge graph. The raw form is just 1 file containing nodes/edges information. I separated them into two different files: nodes and graph.
* The nodes file is located here: `../../data/nodes.txt`
* All edges (graph) is located here: `../../data/graph.txt`

In [7]:
# using polars to import in the dataframes
nodes = pl.read_csv(
    '../data/nodes.txt',
    separator='\t',
    schema={
        'name':pl.String,
        'type':pl.String,
        'source':pl.String,
        'source_id':pl.String,
        'source_label':pl.String
    }
)

graph = pl.read_csv('../data/graph.txt', separator='\t', new_columns=['h','r','t'])
# get shapes
print(f'Nodes shape: {nodes.shape}')
print(f'Graph shape: {graph.shape}')

Nodes shape: (129375, 5)
Graph shape: (8100048, 3)


## Create data splits
* we're just going to randomly split the data into 80/10/10% train, test, valid splits here
* in ULTRA, the model needs a training set to calculate relative relational and relative entity embeddings
* it's a foundation model in the sense that you can apply the model on any dataset, but the model will still need to compute relative embeddings

In [8]:
train = graph.sample(fraction=0.8, seed=42)
test = graph.join(train, on = ['h','r','t'], how = 'anti').sample(fraction=0.5, seed=42)
valid = graph.join(pl.concat([train, test]), on = ['h','r','t'], how = 'anti')

print(f'Train shape: {train.shape}')
print(f'Test shape: {test.shape}')
print(f'Valid shape: {valid.shape}')

Train shape: (6480038, 3)
Test shape: (810005, 3)
Valid shape: (810005, 3)


## export the dataset

In [9]:
data_dir = '../data/primekg/raw'

# check if directory exists, if not create it
if osp.exists(data_dir)==False:
    print(f'Creating directory: {data_dir}')
    os.makedirs(data_dir)

else:
    print(f'Directory already exists')


# export files
for i in ['train', 'test','valid']:
    if osp.exists(export_dir := osp.join(data_dir,f'{i}.txt'))==False:
        print(f'Creating file: {export_dir}')
        tmp_df = eval(i)
        tmp_df.write_csv(export_dir, separator='\t', include_header=False)
    else:
        print(f'File already exists: {export_dir}')

Directory already exists
File already exists: ../data/primekg/raw/train.txt
File already exists: ../data/primekg/raw/test.txt
File already exists: ../data/primekg/raw/valid.txt


## build dataset object

After exporting the train/test/valid object, go to `ultra.datasets` and add your own class object. If you didn't do this before beginning you may need to restart the jupyter notebook or reimport the `datasets` library into the notebook. The class object example looks like this. We need this inorder to preprocess and build the dataset to be used by ULTRA.


```python
class SamplePrimeKG(TransductiveDataset):
    '''
    This is a sample PrimeKG dataset to preprocess for ULTRA usage. 
    Its just PrimeKG randomly split using seed 42.
    Two variables need to be specified: name and delimiter
    '''
    name = 'sample_primekg' # the file name holding the train/test/valid dataset
    delimiter = '\t' # format of the files in the folder
```

### build dataset object by calling the object

In [10]:
# just calling the class object will create the dataset
# need to point the class to the root directory of the dataset
sample_data = datasets.SamplePrimeKG(root='../data')

Processing...
  Ahh = torch.sparse.mm(EhT, Eh).coalesce()
Done!
  self.data, self.slices = torch.load(self.processed_paths[0])


#### extract the entity to id dictionary conversion

In [11]:
def get_id2ent_rel_dict(dataset)->dict:
    return {v:k for k,v in dataset.entity_vocab.items()}, {v:k for k,v in dataset.relation_vocab.items()}

In [12]:
output_dir = '../output/Ultra/SamplePrimeKG'
# extract dictionaries
tmp = get_id2ent_rel_dict(sample_data)

# check if directory exists, if not create it
if osp.exists(output_dir)==False:
    print(f'Creating directory: {output_dir}')
    os.makedirs(output_dir)

    # dump id2ent
    with open(osp.join(output_dir, 'id2ent_dict.pkl'),'wb') as f:
        pickle.dump(tmp[0],f)

    # dump id2rel
    with open(osp.join(output_dir, 'id2rel_dict.pkl'),'wb') as f:
        pickle.dump(tmp[1],f)

else:
    print(f'Directory already exists')

Creating directory: ../output/Ultra/SamplePrimeKG


#### create entity to id mapping

In [19]:
# also make sure ent2name_dict.pkl exists. we can pull from an existing directory
if osp.exists(osp.join(output_dir, "ent2name_dict.pkl")) == False:
    print("ent2name_dict.pkl not found, creating")
    ent2name_dict = dict(zip(nodes['source_label'].to_list(),nodes['name'].to_list()))
    
    with open('../data/ent2name_dict.pkl', 'wb') as f:
        pickle.dump(ent2name_dict, f)
else:
    print("ent2name_dict.pkl already exists, skipping copy")

ent2name_dict.pkl not found, creating


# Run Model
* You can run the model and evaluate/get predictions across the test/valid set 
* You can also run an inference script that evaluates 1 node at a time

## Run evaluation on test/valid

In [None]:
# Set pixi as environment variable so subprocess knows where to find it
# V100 platform use '7.0+PTX'
os.environ['TORCH_CUDA_ARCH_LIST'] = '8.9+PTX' # L4/L40s
my_env = os.environ.copy()
my_env["PATH"] = f"/home/sagemaker-user/.pixi/bin:{my_env['PATH']}"

# subprocess command
cmd = f'pixi run -e gpu torchrun --nproc-per-node=4 \
    "/home/sagemaker-user/knowledge-graph-workflows-and-models-team-primeKG/code/ULTRA/script/run.py" \
    -c "/home/sagemaker-user/knowledge-graph-workflows-and-models-team-primeKG/code/ULTRA/config/transductive/inference_pkg.yaml" \
    --ckpt "/home/sagemaker-user/knowledge-graph-workflows-and-models-team-primeKG/code/ULTRA/ckpts/ultra_primekg_50g_ft_epoch_1.pth" \
    --dataset "SamplePrimeKG" \
    --gpus [0,1,2,3] \
    --epochs 0 \
    --bpe 4 \
    --tb null'

# execute subprocess
subprocess.run(cmd,env=my_env, shell=True)

# you'll get like 1000 parquet files that need to be combined
# export is setup this way because GPU can't keep everything in memory

W0813 00:51:42.821000 11913 site-packages/torch/distributed/run.py:793] 
W0813 00:51:42.821000 11913 site-packages/torch/distributed/run.py:793] *****************************************
W0813 00:51:42.821000 11913 site-packages/torch/distributed/run.py:793] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W0813 00:51:42.821000 11913 site-packages/torch/distributed/run.py:793] *****************************************
[rank3]:[W813 00:51:45.815917586 ProcessGroupNCCL.cpp:4115] [PG ID 0 PG GUID 0 Rank 3]  using GPU 3 to perform barrier as devices used by this process are currently unknown. This can potentially cause a hang if this rank to GPU mapping is incorrect.Specify device_ids in barrier() to force use of a particular device,or call init_process_group() with a device_id.
[rank2]:[W813 00:51:45.049605982 ProcessGroupNCCL.cpp

Load rspmm extension. This may take a while...
Load rspmm extension. This may take a while...
Load rspmm extension. This may take a while...
Load rspmm extension. This may take a while...


06:43:28   mr: 168.763
06:43:28   mrr: 0.850323
06:43:28   hits@1: 0.810478
06:43:28   hits@3: 0.880861
06:43:28   hits@10: 0.916597
06:43:28   >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
06:43:28   Evaluate on test


## Run inference for a particular query
* Notice a different command and `-c` that is run. Here it's `inference.py` with the `-c` flag as `inference_single.yaml`
* Full paths must be specified for each flag, not relative directory.

In [9]:
# Set pixi as environment variable so subprocess knows where to find it
os.environ['TORCH_CUDA_ARCH_LIST'] = '8.9+PTX' # L4/L40s
my_env = os.environ.copy()
my_env["PATH"] = f"/home/sagemaker-user/.pixi/bin:{my_env['PATH']}"

# variables
h = 'NCBI:7297' # TYK2
r = 'associated with'

# initialize command
cmd = f'pixi run -e gpu torchrun --nproc-per-node=4 \
"/home/sagemaker-user/knowledge-graph-workflows-and-models-team-primeKG/code/ULTRA/script/inference.py" \
-c \
"/home/sagemaker-user/knowledge-graph-workflows-and-models-team-primeKG/code/ULTRA/config/transductive/inference_single.yaml" \
--ckpt "/home/sagemaker-user/knowledge-graph-workflows-and-models-team-primeKG/code/ULTRA/ckpts/ultra_primekg_50g_ft_epoch_1.pth" \
--data_dir "/home/sagemaker-user/knowledge-graph-workflows-and-models-team-primeKG/data/" \
--dataset "SamplePrimeKG" \
--gpus [0,1,2,3] \
--h_ent {h} --rel "{r}"'

# execute subprocess
subprocess.run(cmd,env=my_env,shell=True)


W0813 17:35:40.066000 30367 site-packages/torch/distributed/run.py:793] 
W0813 17:35:40.066000 30367 site-packages/torch/distributed/run.py:793] *****************************************
W0813 17:35:40.066000 30367 site-packages/torch/distributed/run.py:793] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W0813 17:35:40.066000 30367 site-packages/torch/distributed/run.py:793] *****************************************
[rank1]:[W813 17:35:42.891823336 ProcessGroupNCCL.cpp:4115] [PG ID 0 PG GUID 0 Rank 1]  using GPU 1 to perform barrier as devices used by this process are currently unknown. This can potentially cause a hang if this rank to GPU mapping is incorrect.Specify device_ids in barrier() to force use of a particular device,or call init_process_group() with a device_id.
[rank3]:[W813 17:35:42.145715522 ProcessGroupNCCL.cpp

Load rspmm extension. This may take a while...


  state = torch.load(cfg.checkpoint, map_location="cpu")


Load rspmm extension. This may take a while...
Load rspmm extension. This may take a while...
Load rspmm extension. This may take a while...


17:36:52   >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
17:36:52   Exporting Predictions for batch 0


CompletedProcess(args='pixi run -e gpu torchrun --nproc-per-node=4 "/home/sagemaker-user/knowledge-graph-workflows-and-models-team-primeKG/code/ULTRA/script/inference.py" -c "/home/sagemaker-user/knowledge-graph-workflows-and-models-team-primeKG/code/ULTRA/config/transductive/inference_single.yaml" --ckpt "/home/sagemaker-user/knowledge-graph-workflows-and-models-team-primeKG/code/ULTRA/ckpts/ultra_primekg_50g_ft_epoch_1.pth" --data_dir "/home/sagemaker-user/knowledge-graph-workflows-and-models-team-primeKG/data/" --dataset "SamplePrimeKG" --gpus [0,1,2,3] --h_ent NCBI:7297 --rel "associated with"', returncode=0)

# Process and collate datasets
* If you ran the evaluation on test/valid instead of single inference, you probably got thousands of parquet files in your output directory
* we need to combine those outputs, and process them into human readable format

In [9]:
# output directory
batched = '../../output/Ultra/SamplePrimeKG/2025-08-13-00-51-45' # test/valid
single = '../../output/Ultra/SamplePrimeKG/2025-08-13-17-35-42' # single TYK2-associated_with query

print(f'Parent directory of both results: {osp.dirname(batched)}')
print(f'Batched file name: {osp.basename(batched)}')

Parent directory of both results: ../../output/Ultra/SamplePrimeKG
Batched file name: 2025-08-13-00-51-45


## process single query
* I keep the scores in the single query

In [10]:
# what does it look like initially?
pl.read_parquet(osp.join(single, 'inference_0.parquet')).head(2)

h,r,t,t_filt_rank,t_unfilt_rank,t_pred_filt,t_pred_unfilt,t_pred_score,t_mask
i64,i64,i64,i64,i64,list[i64],list[i64],list[f64],list[bool]
6031,5,86164,1,23,"[7123, 10907, … 86164]","[31225, 24719, … 6031]","[-20.370192, -20.601143, … -5.81477]","[true, true, … true]"
6031,5,2045,1,24,"[7123, 10907, … 86164]","[31225, 24719, … 6031]","[-20.370192, -20.601143, … -5.81477]","[true, true, … true]"


In [11]:
single_df = data_util.filter_process_results(
    df=data_util.load_and_translate_results(
        data_path=osp.dirname(single), results_folder=osp.basename(single)
    ),
    results_path=os.path.dirname(single),
).unique()

print(f"Single results shape: {single_df.shape}")

Single results shape: (129199, 7)


In [12]:
single_df.sort('t_pred_score', descending=True).head(2)

h_label,t_pred_label,h_name,r_label,t_pred_name,t_pred_score,edge_in_primekg
str,str,str,str,str,f64,bool
"""NCBI:7297""","""MONDO:1484""","""TYK2""","""associated with""","""paranoid schizophrenia""",5.615732,True
"""NCBI:7297""","""MONDO:13276""","""TYK2""","""associated with""","""Reynolds syndrome""",5.55585,True


## process test/valid batched data
* Scores aren't kept in the test/valid setup - it would be ridiculous to keep every possible prediction when making 1.6 million queries (x 129,000)
* If you want more than the top 100 results dumped, visit L193 in [run.py]('../../code/ULTRA/script/run.py'), modify the number, and re-run the model.

In [13]:
pl.read_parquet(osp.join(batched,'valid_0.parquet')).head(2)

h,r,t,h_pred,t_pred,h_rank,t_rank
i64,i64,i64,list[i64],list[i64],i64,i64
4548,0,1420,"[4115, 680, … 5985]","[2294, 10773, … 49463]",2,3
13859,0,1578,"[8500, 5459, … 6112]","[5014, 29849, … 1897]",1,1


In [None]:
# load in the batched predictions, 405,004 x 12 ...
# missing 1/4 of the edge predictions... probably something to do with parallelization
# in theory all items should of exported based on the machine rank.
# reccommend just running the model above with 1 gpu (much slower) to mitigate this bug
batched_df = data_util.load_and_translate_results(
        data_path=osp.dirname(batched), results_folder=osp.basename(batched)
    ).unique()

print(f"Batched results shape: {batched_df.shape}")

Batched results shape: (405004, 12)


In [15]:
# h_prediction and tail prediction aren't translated so we'll need to do so
batched_df.head(2)

h,r,t,h_pred,t_pred,h_rank,t_rank,h_label,t_label,r_label,h_name,t_name
i64,i64,i64,list[i64],list[i64],i64,i64,str,str,str,str,str
28601,1,27904,"[29914, 26869, … 3346]","[5496, 4203, … 35376]",1,1,"""MONDO:18682""","""HPO:12745""","""phenotype present""","""congenital insensitivity to pa…","""Short palpebral fissure"""
98,17,6240,"[16995, 13542, … 6458]","[23653, 13262, … 132]",1,1,"""DrugBank:DB12466""","""NCBI:5005""","""carrier""","""Favipiravir""","""ORM2"""


In [16]:
# create dictionary translation
id2ent, id2rel, ent2name = data_util.load_id_dict(osp.dirname(batched))

# translate the list by first replace number with id, then id with name/label
batched_df = batched_df.with_columns(
    pl.col("h_pred")
    .list.eval( # replace number with id
        pl.element().map_elements(lambda x: id2ent.get(x, x), return_dtype=pl.String)
    )
    .list.eval( # replace id with name/label
        pl.element().map_elements(lambda x: ent2name.get(x, x), return_dtype=pl.String)
    )
    .alias("h_pred_name"),
    pl.col('t_pred').list.eval(
        pl.element().map_elements(lambda x: id2ent.get(x, x), return_dtype=pl.String)
    ).list.eval(
        pl.element().map_elements(lambda x: ent2name.get(x, x), return_dtype=pl.String)
    ).alias('t_pred_name')
)[['h_name','r_label','t_name','h_rank','t_rank','h_pred_name','t_pred_name']]

batched_df.head(2)

h_name,r_label,t_name,h_rank,t_rank,h_pred_name,t_pred_name
str,str,str,i64,i64,list[str],list[str]
"""congenital insensitivity to pa…","""phenotype present""","""Short palpebral fissure""",1,1,"[""autosomal dominant Robinow syndrome"", ""C syndrome"", … ""growth delay due to insulin-like growth factor I resistance""]","[""Global developmental delay"", ""Intellectual disability"", … ""Corneal scarring""]"
"""Favipiravir""","""carrier""","""ORM2""",1,1,"[""collagen-containing extracellular matrix"", ""positive regulation of tumor necrosis factor production"", … ""negative regulation of apoptotic process""]","[""GDA"", ""PNP"", … ""small intestine""]"
