In [1]:
from rdflib import Graph, RDFS, URIRef
from collections import defaultdict
import textwrap
from rdflib.namespace import RDF, OWL, split_uri
import time
import os

def get_safe_max_new_tokens(prompt_text, model_id, context_window=8192, buffer=50):
    """
    Returns a safe max_new_tokens value so the total tokens (prompt + output)
    do not exceed the model's context window.

    Parameters:
    - prompt_text: str, the input prompt
    - model_id: str, the HF model identifier
    - context_window: int, max tokens the model supports (default 4096)
    - buffer: int, optional reserve tokens (for EOS etc.)

    Returns:
    - int: safe max_new_tokens value
    """
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    prompt_tokens = len(tokenizer.encode(prompt_text, add_special_tokens=False))
    available_tokens = context_window - prompt_tokens - buffer

    if available_tokens < 0:
        print(f"⚠️ WARNING: Prompt exceeds context window by {-available_tokens} tokens.")
        return 0
    else:
        print(f"✅ Prompt uses {prompt_tokens} tokens. {available_tokens} tokens available for generation.")
        return available_tokens

# final_samples input
final_samples = {'http://example.org/src#samplingtime': ['12:49:00'],
 'http://example.org/src#patient_cpr': ['te64687489', 'c0cef4fadfd', 'dc44b505e4e', 'afedd9d7f0', 'cdse4751d0'],
 'http://example.org/src#analysiscode': ['DNK35312', 'NPU02070'],
 'http://example.org/src#laboratorium_idcode': ['UKN','OUI','ESB','HDI','KPL'],
 'http://example.org/src#referenceinterval_lowerlimit': ['50.0', '137.0','27.0'],
 'http://example.org/src#referenceinterval_upperlimit': ['30.0','7.5','105.0'],
 'http://example.org/src#unit': ['U/L', 'mL/min', '10^6/l', 'mg/g', 'mol/l'],
 'http://example.org/src#rekvirent_idtype': ['sorkode','sygehusafdelingsnummer','yaugethusgbdnummer','ydernummer'],
 'http://example.org/src#samplingdate': ['2010-12-07','2017-04-16','2023-10-27'],
 'http://example.org/src#resulttype': ['alfanumerisk', 'numerisk'],
 'http://example.org/src#value': ['00', 'A RhD pos', '>175', 'NEG', '137'],
 'http://example.org/src#operator': ['stoerre_end', 'mindre_end'],
 'http://example.org/src#resultvalidation': ['for_hoej'],
 'http://example.org/src#rekvirent_id': ['0123815','1789AFS4611','2000A005','6620378SKADE','990202']}


# RDF graph of the source and target ontologies
src_graph = Graph().parse("flat_src_onto.owl")
tgt_graph = Graph().parse("flat_tgt_onto.owl")

# Helper to compact URIs to src: or tgt: style
def compact_uri(uri):
    uri_str = str(uri)
    if uri_str.startswith("http://example.org/src#"):
        return f"src:{uri_str.split('#')[-1]}"
    elif uri_str.startswith("http://example.org/tgt#"):
        return f"tgt:{uri_str.split('#')[-1]}"
    elif uri_str.startswith("http://www.w3.org/2001/XMLSchema#"):
        return f"xsd:{uri_str.split('#')[-1]}"
    else:
        return uri_str

def generate_src_domain_description_with_classes(graph, final_samples, minimal=False, include_examples=False):
    lines = ["### Source Ontology Description (`src:`)\n"]

    # --- Add class descriptions ---
    lines.append("#### Classes\n")
    for cls in graph.subjects(RDF.type, OWL.Class):
        lines.append(f"- {compact_uri(cls)}")
    lines.append("")

    # --- Add property descriptions ---
    lines.append("#### Properties\n")
    for uri_str, examples in final_samples.items():
        uri = URIRef(uri_str)
        domain = graph.value(uri, RDFS.domain)
        range_ = graph.value(uri, RDFS.range)

        lines.append(f"- {compact_uri(uri)}")
        if domain:
            lines.append(f"  - Domain: {compact_uri(domain)}")
        if range_:
            lines.append(f"  - Range: {compact_uri(range_)}")

        if not minimal:
            label = graph.value(uri, RDFS.label)
            comment = graph.value(uri, RDFS.comment)
            if label:
                lines.append(f"  - Label: \"{str(label)}\"")
            if comment:
                lines.append(f"  - Comment: \"{str(comment)}\"")
            if include_examples and examples:
                example_line = textwrap.fill(', '.join(examples), width=80,
                                             initial_indent='  - Example values: ',
                                             subsequent_indent=' ' * 20)
                lines.append(example_line)
        lines.append("")

    return '\n'.join(lines)

def generate_tgt_domain_description_with_classes(graph, minimal=False):
    lines = ["### Target Ontology Description (`tgt:`)\n"]

    # --- Add class descriptions ---
    lines.append("#### Classes\n")
    for cls in graph.subjects(RDF.type, OWL.Class):
        lines.append(f"- {compact_uri(cls)}")
    lines.append("")

    # --- Add property descriptions ---
    lines.append("#### Properties\n")
    properties = set(graph.subjects(RDFS.domain, None)) | set(graph.subjects(RDFS.range, None))

    for uri in sorted(properties):
        domain = graph.value(uri, RDFS.domain)
        range_ = graph.value(uri, RDFS.range)

        lines.append(f"- {compact_uri(uri)}")
        if domain:
            lines.append(f"  - Domain: {compact_uri(domain)}")
        if range_:
            lines.append(f"  - Range: {compact_uri(range_)}")

        if not minimal:
            label = graph.value(uri, RDFS.label)
            comment = graph.value(uri, RDFS.comment)
            if label:
                lines.append(f"  - Label: \"{str(label)}\"")
            if comment:
                lines.append(f"  - Comment: \"{str(comment)}\"")
        lines.append("")

    return '\n'.join(lines)

# Generate descriptions including examples for the source
src_domain_str = generate_src_domain_description_with_classes(src_graph, final_samples, minimal=False, include_examples=True)
tgt_domain_str = generate_tgt_domain_description_with_classes(tgt_graph, minimal=False)

minimal_src_domain_str = generate_src_domain_description_with_classes(src_graph, final_samples, minimal=True, include_examples=False)
minimal_tgt_domain_str = generate_tgt_domain_description_with_classes(tgt_graph, minimal=True)

src_domain_str_wo_examples = generate_src_domain_description_with_classes(src_graph, final_samples, minimal=False, include_examples=False)

In [2]:
openai_key='' #insert API key

In [4]:
from openai import OpenAI
import time

client = OpenAI(api_key=openai_key)
gpt_model = "gpt-4.1"

def call_chatgpt(prompt, system_prompt):
    messages = [{"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt}]
    try:
        response = client.chat.completions.create(
            model = gpt_model,
            max_completion_tokens=8192,
            messages = messages,
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        return str(e)

| Model                   | Context Window       |
| ----------------------- | -------------------- |
| `gpt-3.5-turbo`         | 16,385 tokens        |
| `gpt-4` (standard)      | 8,192 tokens         |
| `gpt-4-32k`             | 32,768 tokens        |
| `gpt-4o` (latest mo| 128,000 tokens       | ✅ |


### 1. Matching Generation

In [6]:
match_system_prompt = "You are an expert in ontology matching."
match_prompt = f"""
## Task: Ontology Matching

Given a **Source Ontology** (`src:`) and a **Target Ontology** (`tgt:`), identify semantic matches: corresponding classes and properties between the ontologies. Your task is to find and list all **semantic matches**—pairs of concepts that refer to the same or closely related ideas.

---

### 🧠 What to Match
- Match classes or properties with the **same or closely related meaning**
- Use similarities in **labels, comments, domains, ranges, and names**
- Include only clear, meaningful correspondences

- **Matchings may include**:
  - 1:1 — One source concept ≈ One target concept  
  - 1:N — One source concept ≈ Multiple target concepts  
  - N:1 — Multiple source concepts ≈ One target concept  
  - M:N — Multiple source concepts ≈ Multiple target concepts  

- When more than one match exists for a concept, include **each match on its own line**
- If no suitable target exists for a source concept, **omit it from the output**

---

### 🧮 Matching Procedure

Iterate systematically:

- For each `src:` concept in the Source Ontology:
  - Compare it to each `tgt:` concept in the Target Ontology.
  - If a semantic match is found, output a line in this format:
    `src:ConceptA ≈ tgt:ConceptB`
  - If multiple matches exist for a concept, output one line per match.
  - If no match exists, skip the concept.

---

### 🔍 What Qualifies as a Semantic Match
A **semantic match** exists when two concepts refer to the same or equivalent meaning — even if they differ in structure or representation. Valid matches include:
- One concept being a **component** of another (e.g., `src:firstname` vs. `tgt:fullname`)
- Cases where **converting**, **parsing**, or **value transformation** would be required
- Multiple target concepts jointly representing a source concept

You are not required to define the transformation — only to identify that a match exists.

---

#### ✅ Examples of Valid Matches
- src:birthDate ≈ tgt:date_of_birth  
- src:birthDate ≈ tgt:personAge # Requires calculating age from birth date
- src:height_m ≈ tgt:personHeight_feet # Requires unit conversion (meters → feet)
- src:employmentStatus ≈ tgt:employment_status # Requires a lookup: src: uses integers (e.g. 1, 2), tgt: uses strings (e.g. "employed", "unemployed")

---

### 🗂️ Ontologies

**Source Ontology** (`src:`):  
{src_domain_str}

**Target Ontology** (`tgt:`):  
{tgt_domain_str}

---

### 📤 Output Format

Each line must contain:
- A `src:` element
- The ≈ symbol
- A `tgt:` element (or ∅ if unmatched)
 
❗ Output only a list of matched pairs using this exact format:
- src:PropertyA ≈ tgt:PropertyB  
- src:ClassX ≈ tgt:ClassY  

✅ End your response **after you have iterated over all concepts in the Source Ontology**.

- Only output the matches.
- Output a Python list with each line as an element.
"""

In [7]:
# call chatgpt
start_time = time.time()
gpt4_response_match = call_chatgpt(match_prompt, match_system_prompt)
response_time_match = time.time() - start_time

print(f"Response Time: {response_time_match:.2f} seconds")
print("Response:\n", gpt4_response_match)

Response Time: 5.95 seconds
Response:
 [
"src:Measurement ≈ tgt:Measurement",
"src:samplingtime ≈ tgt:measurement_time",
"src:samplingtime ≈ tgt:measurement_datetime",
"src:patient_cpr ≈ tgt:person_id",
"src:analysiscode ≈ tgt:measurement_source_concept_id",
"src:analysiscode ≈ tgt:measurement_source_value",
"src:laboratorium_idcode ≈ tgt:provider_id",
"src:referenceinterval_lowerlimit ≈ tgt:range_low",
"src:referenceinterval_upperlimit ≈ tgt:range_high",
"src:unit ≈ tgt:unit_source_value",
"src:samplingdate ≈ tgt:measurement_date",
"src:samplingdate ≈ tgt:measurement_datetime",
"src:value ≈ tgt:measurement_source_value",
"src:value ≈ tgt:value_as_number",
"src:value ≈ tgt:value_as_concept_id",
"src:operator ≈ tgt:operator_concept_id"
]


### 2. Mapping Generation

In [9]:
mapping_system_prompt="You are a knowledge representation expert."

mapping_prompt = f"""
## Task: Ontology Alignment via First-Order Logic (FOL)

Given a source ontology and a target ontology, produce a complete set of logical alignments between them. These alignments describe how to semantically transform data from the source to the target. Express all mappings using first-order logic (FOL).

---

### 🗂️ Ontology Context

The source ontology includes example values for its datatype properties. The target ontology includes structural definitions only.

**Source Ontology** (`src:`):  
{src_domain_str}

**Target Ontology** (`tgt:`):  
{tgt_domain_str}

---

### ✅ Requirements:

#### 🔗 Matched Elements

Only generate mappings for the following matched properties and classes:
{gpt4_response_match}

---

#### 🔄 Mapping Relationships

- Mappings may include:
  - 1:1 mappings (direct correspondence)
  - 1:2 or 2:1 mappings (split or combine)
  - n:m mappings (many-to-many relationships)
- Mappings may involve intermediate transformation steps such as:
  - Concatenating multiple values
  - Decomposing or parsing a value into structured parts
  - Mapping string or code values to concept identifiers via lookup tables
- In addition to properties, identify and align source and target classes.
- When instances of a source class (e.g., `src:ClassA`) should be transformed into instances of a corresponding target class (e.g., `tgt:ClassB`), include this as a class-level mapping:

  ∀x (src:ClassA(x) → tgt:ClassB(x))

---

#### 🔧 Parsing Literal Values

- When a literal value must be parsed into multiple structured components (e.g., extracting an operator and a number from a threshold string), assume a helper function exists to do so.
- Select an appropriate name for the parsing function using the format:

  `parse_{{value_type}}` or `parse_{{semantic_task}}`

- Examples (naming conventions only):
  - `parse_threshold`
  - `parse_date_range`
  - `parse_value_with_unit`

- These parsing functions are to be treated as opaque and deterministic. Use them directly in the logic expressions.

---

#### 📚 Lookup Functions for Concept Resolution

- When values such as codes, units, or symbolic strings need to be mapped to concept identifiers in the target ontology, assume the existence of a lookup function.
- Name the function according to the target resource it returns:

  `lookup_{{target_resource_name}}(input_value)`

- Examples (naming conventions only):
  - `lookup_concept_id`
  - `lookup_unit_concept`
  - `lookup_operator_concept`

- These functions return target concept identifiers and can be used directly in logical rules.

---

### 📘 Illustrative Examples (Structure Only)

The examples below demonstrate the structure of valid alignment rules and usage of helper functions. They are not tied to any specific ontology or domain.

∀x,v (src:hasValue(x,v) ∧ parse_threshold(v,op,n) → tgt:operatorConceptId(x, lookup_operator_concept(op)) ∧ tgt:valueAsNumber(x,n))

∀x,c (src:hasCode(x,c) → tgt:conceptId(x, lookup_concept_id(c)))

---

### 📤 Output Format

- Produce a list of distinct, universally quantified first-order logic (FOL) rules that define how source predicates map to target predicates.
- Follow this structure:

  ∀x,y,... (src:Triple1 ∧ src:Triple2 → tgt:Triple3 ∧ tgt:Triple4 ...)

- Use predicates of the form `src:propertyName` and `tgt:propertyName`.
- Each rule must be:
  - Syntactically valid
  - Semantically meaningful
  - Logically sound

- Only output the FOL rules.
- Begin your response immediately with the first FOL rule.
"""

In [10]:
start_time = time.time()
gpt4_response_map = call_chatgpt(mapping_prompt, mapping_system_prompt)
response_time_map = time.time() - start_time

print(f"Response Time: {response_time_map:.2f} seconds")
print("Response:\n", gpt4_response_map)

Response Time: 8.16 seconds
Response:
 ∀x (src:Measurement(x) → tgt:Measurement(x))

∀x,t (src:samplingtime(x,t) → tgt:measurement_time(x,t))

∀x,t (src:samplingtime(x,t) ∧ src:samplingdate(x,d) ∧ combine_date_time(d,t,dt) → tgt:measurement_datetime(x,dt))

∀x,d (src:samplingdate(x,d) → tgt:measurement_date(x,d))

∀x,d (src:samplingdate(x,d) ∧ src:samplingtime(x,t) ∧ combine_date_time(d,t,dt) → tgt:measurement_datetime(x,dt))

∀x,p (src:patient_cpr(x,p) → tgt:person_id(x,lookup_person_id(p)))

∀x,a (src:analysiscode(x,a) → tgt:measurement_source_concept_id(x,lookup_concept_id(a)))

∀x,a (src:analysiscode(x,a) → tgt:measurement_source_value(x,a))

∀x,l (src:laboratorium_idcode(x,l) → tgt:provider_id(x,lookup_provider_id(l)))

∀x,low (src:referenceinterval_lowerlimit(x,low) → tgt:range_low(x,to_decimal(low)))

∀x,high (src:referenceinterval_upperlimit(x,high) → tgt:range_high(x,to_decimal(high)))

∀x,u (src:unit(x,u) → tgt:unit_source_value(x,u))

∀x,u (src:unit(x,u) → tgt:unit_concept_i

### FGF Generation

In [11]:
fgf_system_prompt="You are an expert Python developer specializing in RDF transformations."

fgf_prompt = f"""
Your task is to generate **Fact Generating Functions (FGFs)** — Python functions that use RDFLib to transform RDF triples from a source graph (`src_graph`) into a target graph (`tgt_graph`) based on a formal ontology alignment.

---

### 🔁 Definition of FGFs

Each FGF:
- Handles a specific **source class** (e.g., `src:Person`).
- Iterates over all instances of that class in `src_graph`.
- Creates a corresponding instance in `tgt_graph` using the mapped target class.
- Transfers property values using explicitly defined mappings.
- Skips any information that is not mapped.
- Uses the `SRC` and `TGT` namespaces to build URIs.
- Ensures that domain and range constraints of the **target ontology** are respected.
- Follows RDFLib syntax and best practices.

---

### 📑 Ontology Mappings

The `mappings_block` below consists of first-order logic (FOL) alignment rules describing how source classes and properties map to target ontology structures.

```text
{gpt4_response_map}
```

#### Ontology Domain and Range (for validation)

**Source Ontology** (`src:`):
{minimal_src_domain_str}

## Target ontology (`tgt:`):
{minimal_tgt_domain_str}

### ✅ Instructions

For each source class that appears as the subject of a class-level alignment:
1. Write a single Python function that:
   - Finds all instances of the source class in `src_graph`.
   - Constructs a new instance of the mapped target class in `tgt_graph`.
   - Transfers mapped properties using the logic in mappings_block.
2. Use RDFLib's `URIRef`, `Literal`, and `RDF.type` as appropriate.
3. Construct URIs using `SRC` and `TGT` namespaces.
4. Do not include unmapped properties or relationships.
5. Ensure all triples conform to the target ontology's domain and range constraints.
6. You may call helper functions like parse_threshold() or lookup_concept_id() if referenced in the mappings.
7. Each function must be standalone, valid Python, and directly executable in a transformation pipeline.

---

### ⚠️ Output Constraint

Only output the Python function(s) corresponding to the mapped classes.   
Output code only.
"""

In [12]:
start_time = time.time()
gpt4_response_fgf = call_chatgpt(fgf_prompt, fgf_system_prompt)
response_time_fgf = time.time() - start_time

print(f"Response Time: {response_time_fgf:.2f} seconds")
print("Response:\n", gpt4_response_fgf)

Response Time: 23.83 seconds
Response:
 ```python
from rdflib import URIRef, Literal, RDF, XSD
from rdflib.namespace import Namespace
from datetime import datetime, date, time

SRC = Namespace("http://example.org/src#")
TGT = Namespace("http://example.org/tgt#")

def combine_date_time(date_val, time_val):
    """Combine xsd:date and xsd:time Literal into xsd:dateTime string."""
    if not (isinstance(date_val, Literal) and isinstance(time_val, Literal)):
        return None
    try:
        dt = datetime.combine(date.fromisoformat(str(date_val)), time.fromisoformat(str(time_val)))
        return Literal(dt.isoformat(), datatype=XSD.dateTime)
    except Exception:
        return None

def to_decimal(val):
    """Converts a value which can be string, int etc. to a decimal-formatted Literal."""
    try:
        return Literal(float(val), datatype=XSD.decimal)
    except Exception:
        return None

def is_numeric(val):
    """Checks if given value (Literal) is numeric; if so, returns n

## Evaluation

### Matchings evaluation

In [13]:
ground_truth_matchings = [
    "src:patient_cpr ≈ tgt:person_id",
    "src:laboratorium_idcode ≈ tgt:provider_id",
    "src:rekvirent_id ≈ tgt:visit_occurence_id",
    "src:rekvirent_id ≈ tgt:visit_detail_id",
    "src:samplingdate ≈ tgt:measurement_date",
    "src:samplingdate ≈ tgt:measurement_datetime",
    "src:samplingtime ≈ tgt:measurement_datetime",
    "src:samplingtime ≈ tgt:measurement_time",
    "src:analysiscode ≈ tgt:measurement_source_concept_id",
    "src:analysiscode ≈ tgt:measurement_concept_id",
    "src:unit ≈ tgt:unit_source_value",
    "src:unit ≈ tgt:unit_concept_id",
    "src:value ≈ tgt:measurement_source_value",
    "src:value ≈ tgt:value_as_concept_id",
    "src:value ≈ tgt:value_as_number",
    "src:operator ≈ tgt:operator_concept_id",
    "src:referenceinterval_lowerlimit ≈ tgt:range_low",
    "src:referenceinterval_upperlimit ≈ tgt:range_high",
    "src:resultvalidation ≈ tgt:value_as_concept_id",
    "src:rekvirent_idtype ≈ tgt:measurement_type_concept_id",
    "src:resulttype ≈ ∅",
    "src:Measurement ≈ tgt:Measurement"
]

matchings_response = [
"src:Measurement ≈ tgt:Measurement",
"src:samplingtime ≈ tgt:measurement_time",
"src:samplingtime ≈ tgt:measurement_datetime",
"src:patient_cpr ≈ tgt:person_id",
"src:analysiscode ≈ tgt:measurement_source_concept_id",
"src:analysiscode ≈ tgt:measurement_source_value",
"src:laboratorium_idcode ≈ tgt:provider_id",
"src:referenceinterval_lowerlimit ≈ tgt:range_low",
"src:referenceinterval_upperlimit ≈ tgt:range_high",
"src:unit ≈ tgt:unit_source_value",
"src:samplingdate ≈ tgt:measurement_date",
"src:samplingdate ≈ tgt:measurement_datetime",
"src:value ≈ tgt:measurement_source_value",
"src:value ≈ tgt:value_as_number",
"src:value ≈ tgt:value_as_concept_id",
"src:operator ≈ tgt:operator_concept_id"
]

gt_matchings_curated = [
"src:Measurement ≈ tgt:Measurement",
"src:laboratorium_idcode ≈ tgt:provider_id",
"src:rekvirent_id ≈ tgt:visit_occurence_id",
"src:rekvirent_id ≈ tgt:visit_detail_id",
"src:samplingdate ≈ tgt:measurement_date",
"src:samplingdate ≈ tgt:measurement_datetime",
"src:samplingtime ≈ tgt:measurement_datetime",
"src:samplingtime ≈ tgt:measurement_time",
"src:analysiscode ≈ tgt:measurement_source_concept_id",
"src:analysiscode ≈ tgt:measurement_concept_id",
"src:unit ≈ tgt:unit_source_value",
"src:unit ≈ tgt:unit_concept_id",
"src:value ≈ tgt:value_as_number",
"src:value ≈ tgt:value_as_concept_id",
"src:value ≈ tgt:measurement_source_value",
"src:operator ≈ tgt:operator_concept_id",
"src:referenceinterval_lowerlimit ≈ tgt:range_low",
"src:referenceinterval_upperlimit ≈ tgt:range_high",
"src:resultvalidation ≈ tgt:value_as_concept_id",
"src:rekvirent_idtype ≈ tgt:measurement_type_concept_id"]

import ast
import re

def parse_llm_list_output(raw_text: str):
    try:
        # Step 1: Remove ```python and ``` (if present)
        cleaned = raw_text.strip()
        cleaned = re.sub(r"^```python\s*", "", cleaned)
        cleaned = re.sub(r"\s*```$", "", cleaned)

        # Step 2: Evaluate as literal Python
        result = ast.literal_eval(cleaned)

        # Step 3: Ensure list of strings
        if isinstance(result, list):
            return [item.strip() for item in result if isinstance(item, str)]
        else:
            raise ValueError("Parsed object is not a list.")
    except Exception as e:
        print(f"Error parsing list: {e}")
        return []

gpt4_matches = parse_llm_list_output(gpt4_response_match)

print(gpt4_matches)

def evaluate_alignment(ground_truth_list, predicted_list):
    ground_truth_set = set(map(str.strip, ground_truth_list))
    predicted_set = set(map(str.strip, predicted_list))

    true_positives = ground_truth_set & predicted_set
    false_positives = predicted_set - ground_truth_set
    false_negatives = ground_truth_set - predicted_set

    precision = len(true_positives) / len(predicted_set) if predicted_set else 0
    recall = len(true_positives) / len(ground_truth_set) if ground_truth_set else 0
    f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) else 0

    return {
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "true_positives": sorted(true_positives),
        "false_positives": sorted(false_positives),
        "false_negatives": sorted(false_negatives),
    }

result = evaluate_alignment(gt_matchings_curated, gpt4_matches)

print(f"Precision: {result['precision']:.2f}")
print(f"Recall: {result['recall']:.2f}")
print(f"F1 Score: {result['f1_score']:.2f}")

['src:Measurement ≈ tgt:Measurement', 'src:samplingtime ≈ tgt:measurement_time', 'src:samplingtime ≈ tgt:measurement_datetime', 'src:patient_cpr ≈ tgt:person_id', 'src:analysiscode ≈ tgt:measurement_source_concept_id', 'src:analysiscode ≈ tgt:measurement_source_value', 'src:laboratorium_idcode ≈ tgt:provider_id', 'src:referenceinterval_lowerlimit ≈ tgt:range_low', 'src:referenceinterval_upperlimit ≈ tgt:range_high', 'src:unit ≈ tgt:unit_source_value', 'src:samplingdate ≈ tgt:measurement_date', 'src:samplingdate ≈ tgt:measurement_datetime', 'src:value ≈ tgt:measurement_source_value', 'src:value ≈ tgt:value_as_number', 'src:value ≈ tgt:value_as_concept_id', 'src:operator ≈ tgt:operator_concept_id']
Precision: 0.88
Recall: 0.70
F1 Score: 0.78


### Mappings Evaluation

In [15]:
ground_truth_fol_rules = [
    "∀x (src:Measurement(x) → tgt:Measurement(x))",
    "∀x,c (src:patient_cpr(x,c) → tgt:person_id(x, hash_to_int(c)))",
    "∀x,l (src:laboratorium_idcode(x,l) → tgt:provider_id(x, hash_to_int(l)))",
    "∀x,t (src:samplingtime(x,t) → tgt:measurement_time(x,t))",
    "∀x,d (src:samplingdate(x,d) → tgt:measurement_date(x,d))",
    "∀x,d,t (src:samplingdate(x,d) ∧ src:samplingtime(x,t) → tgt:measurement_datetime(x, combine_date_time(d, t)))",
    "∀x,a (src:analysiscode(x,a) → tgt:measurement_source_value(x,a))",
    "∀x,a (src:analysiscode(x,a) → tgt:measurement_concept_id(x, lookup_measurement_concept(a)))",
    "∀x,u (src:unit(x,u) → tgt:unit_source_value(x,u))",
    "∀x,u (src:unit(x,u) → tgt:unit_concept_id(x, lookup_unit_concept(u)))",
    "∀x,v (src:value(x,v) ∧ is_numeric(v) → tgt:value_as_number(x, v))",
    "∀x,v (src:value(x,v) ∧ is_categorical(v) → tgt:value_as_concept_id(x, lookup_concept_id(v)))",
    "∀x,v (src:value(x,v) → tgt:value_source_value(x,v))",
    "∀x,o (src:operator(x,o) → tgt:operator_concept_id(x, lookup_operator_concept(o)))",
    "∀x,r (src:resultvalidation(x,r) → tgt:value_as_concept_id(x, lookup_concept_id(r)))",
    "∀x,m (src:rekvirent_idtype(x,m) → tgt:measurement_type_concept_id(x, lookup_concept_id(m)))",
    "∀x,lo (src:referenceinterval_lowerlimit(x,lo) → tgt:range_low(x, lo))",
    "∀x,hi (src:referenceinterval_upperlimit(x,hi) → tgt:range_high(x, hi))"
]

ground_truth_fol_rules_curated = [
    "∀x (src:Measurement(x) → tgt:Measurement(x))",
    "∀x,p (src:patient_cpr(x,p) → tgt:person_id(x,lookup_person_id(p)))",
    "∀x,l (src:laboratorium_idcode(x,l) → tgt:provider_id(x,lookup_provider_id(l)))",
    "∀x,t (src:samplingtime(x,t) → tgt:measurement_time(x,t))",
    "∀x,d (src:samplingdate(x,d) → tgt:measurement_date(x,d))",
    "∀x,d,t,dt (src:samplingdate(x,d) ∧ src:samplingtime(x,t) ∧ combine_date_time(d,t,dt) → tgt:measurement_datetime(x,dt))",
    "∀x,d (src:samplingdate(x,d) ∧ src:samplingtime(x,t) ∧ combine_date_time(d,t,dt) → tgt:measurement_datetime(x,dt))",
    "∀x,c (src:analysiscode(x,c) → tgt:measurement_source_value(x,c))",
    "∀x,a (src:analysiscode(x,a) → tgt:measurement_source_concept_id(x,lookup_concept_id(a)))",
    "∀x,u (src:unit(x,u) → tgt:unit_source_value(x,u))",
    "∀x,u (src:unit(x,u) → tgt:unit_concept_id(x,lookup_unit_concept(u)))",
    "∀x,v (src:value(x,v) → tgt:measurement_source_value(x,v))",
    "∀x,v (src:value(x,v) ∧ is_numeric(v,n) → tgt:value_as_number(x,to_decimal(n)))",
    "∀x,v (src:value(x,v) ∧ ¬is_numeric(v,_) → tgt:value_as_concept_id(x,lookup_concept_id(v)))",
    "∀x,o (src:operator(x,o) → tgt:operator_concept_id(x,lookup_operator_concept(o)))",
    "∀x,r (src:resultvalidation(x,r) → tgt:value_as_concept_id(x, lookup_concept_id(r)))",
    "∀x,m (src:rekvirent_idtype(x,m) → tgt:measurement_type_concept_id(x, lookup_concept_id(m)))",
    "∀x,low (src:referenceinterval_lowerlimit(x,low) → tgt:range_low(x,to_decimal(low)))",
    "∀x,high (src:referenceinterval_upperlimit(x,high) → tgt:range_high(x,to_decimal(high)))"

]

In [14]:
print(gpt4_response_map)

∀x (src:Measurement(x) → tgt:Measurement(x))

∀x,t (src:samplingtime(x,t) → tgt:measurement_time(x,t))

∀x,t (src:samplingtime(x,t) ∧ src:samplingdate(x,d) ∧ combine_date_time(d,t,dt) → tgt:measurement_datetime(x,dt))

∀x,d (src:samplingdate(x,d) → tgt:measurement_date(x,d))

∀x,d (src:samplingdate(x,d) ∧ src:samplingtime(x,t) ∧ combine_date_time(d,t,dt) → tgt:measurement_datetime(x,dt))

∀x,p (src:patient_cpr(x,p) → tgt:person_id(x,lookup_person_id(p)))

∀x,a (src:analysiscode(x,a) → tgt:measurement_source_concept_id(x,lookup_concept_id(a)))

∀x,a (src:analysiscode(x,a) → tgt:measurement_source_value(x,a))

∀x,l (src:laboratorium_idcode(x,l) → tgt:provider_id(x,lookup_provider_id(l)))

∀x,low (src:referenceinterval_lowerlimit(x,low) → tgt:range_low(x,to_decimal(low)))

∀x,high (src:referenceinterval_upperlimit(x,high) → tgt:range_high(x,to_decimal(high)))

∀x,u (src:unit(x,u) → tgt:unit_source_value(x,u))

∀x,u (src:unit(x,u) → tgt:unit_concept_id(x,lookup_unit_concept(u)))

∀x,v (src

In [16]:
# 1. Define Equivalence Mappings
function_equivalents = {
    "lookup_operator_concept": "lookup_operator_concept",
    "lookup_concept_id": "lookup_measurement_concept",
    "lookup_person_id": "hash_to_int",
    "lookup_provider_id": "hash_to_int",
    "parse_decimal": None  # Assume it’s acceptable noise
}

predicate_equivalents = {
    #"measurement_source_concept_id": "measurement_source_value",
    "value_as_concept_id": "value_as_concept_id",
    "person_id": "person_id",  # redundant but keeps format consistent
}

# 2. Normalize Rules
import re

def normalize_fol_rule(rule, func_map, pred_map):
    # 1. Remove whitespace redundancy
    rule = re.sub(r'\s+', ' ', rule.strip())

    # 2. Substitute helper functions
    for model_func, canon_func in func_map.items():
        if canon_func:
            rule = re.sub(rf'\b{model_func}\b', canon_func, rule)
        else:
            # Strip functions like parse_decimal(x) → x
            rule = re.sub(rf'{model_func}\(([^)]+)\)', r'\1', rule)

    # 3. Substitute predicates
    for model_pred, canon_pred in pred_map.items():
        rule = re.sub(rf'\btgt:{model_pred}\b', f'tgt:{canon_pred}', rule)
        rule = re.sub(rf'\bsrc:{model_pred}\b', f'src:{canon_pred}', rule)

    # 4. Normalize variable names
    var_match = re.match(r'^∀([^\(]+)\(', rule)
    if var_match:
        # Extract original variable list
        var_list = [v.strip() for v in var_match.group(1).split(',')]
        var_map = {old: f'x{i+1}' for i, old in enumerate(var_list)}

        # Replace variables consistently in the rule
        for old, new in var_map.items():
            rule = re.sub(rf'\b{old}\b', new, rule)

    return rule


def evaluate_normalized_rules(gt_rules, llm_rules, func_map, pred_map):
    norm_gt = set(normalize_fol_rule(r, func_map, pred_map) for r in gt_rules)
    norm_llm = set(normalize_fol_rule(r, func_map, pred_map) for r in llm_rules)

    true_positives = norm_gt & norm_llm
    false_positives = norm_llm - norm_gt
    false_negatives = norm_gt - norm_llm

    precision = len(true_positives) / len(norm_llm) if norm_llm else 0
    recall = len(true_positives) / len(norm_gt) if norm_gt else 0
    f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) else 0

    return {
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "true_positives": sorted(true_positives),
        "false_positives": sorted(false_positives),
        "false_negatives": sorted(false_negatives)
    }

llm_rules = [r.strip() for r in gpt4_response_map.split("\n") if r.strip()]
results = evaluate_normalized_rules(ground_truth_fol_rules_curated, llm_rules, function_equivalents, predicate_equivalents)

print(f"Precision: {results['precision']:.2f}")
print(f"Recall: {results['recall']:.2f}")
print(f"F1 Score: {results['f1_score']:.2f}")

Precision: 0.94
Recall: 0.84
F1 Score: 0.89


### Evaluate FGF Generation

In [17]:
# 1. Clean and extract the Python code from gpt4_response_fgf
def extract_and_define_fgf(gpt4_response_fgf: str):
    # Remove ```python ... ``` if present
    cleaned_code = re.sub(r"^```python\n?", "", gpt4_response_fgf)
    cleaned_code = re.sub(r"\n?```$", "", cleaned_code)

    # 2. Define a local namespace for safe execution
    local_namespace = {}

    # 3. Execute the cleaned code in the local namespace
    exec(cleaned_code, globals(), local_namespace)

    # 4. Return the extracted transform_measurement function
    return local_namespace.get("transform_measurement", None)

# Example usage:
transform_measurement = extract_and_define_fgf(gpt4_response_fgf)

# Check if it worked
if transform_measurement:
    print("✅ FGF function extracted and loaded.")
else:
    print("❌ FGF function not found.")

✅ FGF function extracted and loaded.


In [18]:
print(transform_measurement)

<function transform_measurement at 0x00000217CF6E3010>


In [19]:
# deal with missing rdf:type src:Measurement statements

from rdflib import Graph, URIRef, RDF, Namespace

# Load the source graph
src_graph = Graph()
src_graph.parse("src_data_graph_new.nt", format="nt")

# Define the namespace
SRC = Namespace("http://example.org/src#")

# Find all unique measurement subjects
measurement_subjects = set(src_graph.subjects())

# Create a new graph for type declarations
type_graph = Graph()

# Add rdf:type triples for each subject
for subj in measurement_subjects:
    type_graph.add((subj, RDF.type, SRC.Measurement))

src_graph += type_graph

In [58]:
def lookup_concept_id(val_lit):
    """
    Map a categorical value (rdflib term or plain str) to a
    measurement_concept_id and return it as Literal(xsd:integer).
    Returns None when no mapping is known.
    """
    val = str(val_lit).strip()


    # 1️⃣ direct table lookup
    cid = SOURCE_CODE_TO_CONCEPT_ID.get(val)
    print(cid)
    if cid is not None:
        return Literal(int(cid), datatype=XSD.integer)

    # 2️⃣ heuristic prefixes
    if val.lower().startswith(("pos")):
        return Literal(45884084, datatype=XSD.integer)   # positive
    if val.lower().startswith(("neg")):
        return Literal(45878583, datatype=XSD.integer)   # negative
    if val.lower().startswith(("normal")):
        return Literal(45884153, datatype=XSD.integer)   # normal

    # 3️⃣ special VIST rule
    if "VIST" in val:
        cid = 45877985 if val.startswith("P") else 45880296
        return Literal(cid, datatype=XSD.integer)

    # 🔚 no match
    return None

print(type(lookup_concept_id('NPU26678')))

3002529
<class 'rdflib.term.Literal'>


In [68]:
#Step 1: Set up RDFLib and Namespaces
from rdflib import Graph, Namespace, Literal, URIRef
from rdflib.namespace import RDF, XSD

SRC = Namespace("http://example.org/src#")
TGT = Namespace("http://example.org/tgt#")

import pandas as pd

# Load lookup tables
cpr_df = pd.read_csv("cpr_to_person_id.csv")
labid_df = pd.read_csv("labid_to_provider_id.csv")
conversion_df = pd.read_csv("npu_loinc_conversion_table_expanded.csv")

# Step 2: Define the lookup and helper functions

def parse_decimal(value):
    try:
        return float(value)
    except ValueError:
        return None

def lookup_person_id(cpr):
    key = str(cpr).strip().lower()               # normalise incoming CPR

    # do the comparison on a normalised helper column
    if "cpr_lc" not in cpr_df.columns:
        cpr_df["cpr_lc"] = cpr_df["cpr"].astype(str).str.strip().str.lower()

    result = cpr_df.loc[cpr_df["cpr_lc"] == key, "person_id"].dropna()

    if not result.empty:
        try:
            return Literal(int(float(result.values[0])), datatype=XSD.integer)
        except ValueError:
            pass                                  # person_id is not numeric

    return None

def lookup_provider_id(rekvirent_id):
    """
    Map `rekvirent_id` to provider_id and return it as
    Literal(xsd:integer).  Returns None when no mapping exists.

    Parameters
    ----------
    rekvirent_id : rdflib term or str
        The lab-ID value coming from the source graph.
    df : pandas.DataFrame
        Dataframe with columns 'labid_value' and 'provider_id'.

    Examples
    --------
    >>> lookup_provider_id(Literal(" LAB123 "))
    rdflib.term.Literal('789', datatype=rdflib.term.URIRef('http://www.w3.org/2001/XMLSchema#integer'))
    """
    df=labid_df
    # 1️⃣ normalise the incoming key once
    key = str(rekvirent_id).strip().lower()

    # 2️⃣ create & reuse a normalised helper column on first call
    if "labid_value_lc" not in df.columns:
        df["labid_value_lc"] = (
            df["labid_value"].astype(str).str.strip().str.lower()
        )

    matches = df.loc[df["labid_value_lc"] == key, "provider_id"].dropna()

    if not matches.empty:
        try:
            pid = int(float(matches.iloc[0]))          # handles "123.0" → 123
            return Literal(pid, datatype=XSD.integer)
        except ValueError:
            # provider_id is not numeric; fall through to 'None'
            pass

    return None

from rdflib import Literal
from rdflib.namespace import XSD
from datetime import datetime

def combine_date_time(date_literal, time_literal):
    try:
        # Convert RDFLib Literals to Python date and time strings
        date_str = str(date_literal)
        time_str = str(time_literal)

        # Combine into a datetime object
        dt = datetime.fromisoformat(f"{date_str}T{time_str}")

        # Return as xsd:dateTime formatted string
        return dt.isoformat()
    except Exception as e:
        # Return None if formatting fails
        return None

# Mapping from sourceCode to measurement_concept_id
SOURCE_CODE_TO_CONCEPT_ID = {
    "NPU01682": 3013466,
    "NPU20197": 3002582,
    "NPU20198": 3018095,
    "NPU02070": 3021960,
    "NPU01435": 40771025,
    "NPU19917": 3020924,
    "NPU27591": 3021337,
    "NPU01992": 3020191,
    "NPU26678": 3002529,
    "NPU01700": 3009035,
    "NPU16065": 3023421,
    "NPU03429": 3019550,
    "NPU03577": 3009201,
    "NPU02319": 40762351,
    "NPU19661": 3034485,
    "NPU27412": 3013826,
    "NPU03230": 3023103,
    "NPU12033": 3017143,
    "DNK35302": 46236975,
    "NPU19651": 3006923,
    "NPU01944": 3024731,
    "DNK35312": 1259611,
    "NPU19748": 3020460,
    "DNK35131": 46236975,
    "NPU18016": 3020564,
    "NPU19673": 3024561,
    "NPU01568": 3001308,
    "NPU01349": 3006315,
    "NPU27783": 3035995
}

SOURCE_UNIT_TO_UNIT_ID = {
    "s": 8555.0,
    "10^6/l": 9442.0,
    "nmol/L": 8736.0,
    "ng/L": 8725.0,
    "IU/L": 9254.0,
    "mg/g": 9364.0,
    "mmol/L": 8753.0,
    "U/L": 9442.0,
    "mol/L": 8729.0,
    "mL/min": 8795.0,  
}

def lookup_measurement_source_concept_id(code_lit):
    """
    Given an rdflib Literal for the analysis code (sourceCode),
    return the associated measurement_concept_id as an int,
    or None if not found.
    """
    code_str = str(code_lit)
    return SOURCE_CODE_TO_CONCEPT_ID.get(code_str)

def lookup_unit_concept(unit_lit):
    """
    Map a unit label (rdflib term or plain str) to the corresponding
    OMOP unit_concept_id.

    Returns
    -------
    rdflib.Literal
        • Literal(<concept_id>, xsd:integer)  when a mapping exists
        • Literal("nan", xsd:string)          when no mapping is found
          (choose a different fallback if you prefer)
    """
    # 1️⃣ canonicalise the key: string, trimmed, upper-case
    key = str(unit_lit).strip().upper()

    # 2️⃣ look it up in your mapping table
    cid = SOURCE_UNIT_TO_UNIT_ID.get(key)

    if cid is not None:
        try:
            return Literal(int(cid), datatype=XSD.integer)
        except (ValueError, TypeError):
            # cid wasn’t numeric; fall through to the fallback literal
            pass

    # 3️⃣ fallback literal so Graph.add() still receives a valid Node
    return Literal("nan", datatype=XSD.string)

def parse_numeric(val_lit):
    """
    Given an rdflib Literal containing a string representation of a number,
    return it as an int or float. Returns None if parsing fails.
    """
    val_str = str(val_lit).strip()
    try:
        if '.' in val_str:
            return float(val_str)
        else:
            return int(val_str)
    except ValueError:
        return None

CATEGORICAL_VALUE_TO_CONCEPT_ID = {
    # Positive (case-insensitive startswith)
    "POS": 45884084,
    "Pos": 45884084,
    "pos": 45884084,

    # Negative (case-insensitive startswith)
    "NEG": 45878583,
    "Neg": 45878583,
    "neg": 45878583,

    # Normal (case-insensitive startswith)
    "NORMAL": 45884153,
    "Normal": 45884153,
    "normal": 45884153,

    # Exact matches
    "VÆKST": 36032835,
    "INGEN VÆKST": 42530718,
    "FORHØJET": 1620380,

    # VIST handling is dynamic, handled in logic
}

def parse_categorical(val_lit):
    """
    Determines if a value is categorical by checking the known dictionary or special patterns.
    """
    val = str(val_lit).strip()

    # Check exact match
    if val in CATEGORICAL_VALUE_TO_CONCEPT_ID:
        return True

    # Check startswith patterns (POS, NEG, NORMAL)
    if any(val.startswith(prefix) for prefix in ["POS", "Pos", "pos", "NEG", "Neg", "neg", "NORMAL", "Normal", "normal"]):
        return True

    # Check special VIST case
    if "VIST" in val:
        return True

    return False


def lookup_concept_id(val_lit):
    """
    Map a categorical value (rdflib term or plain str) to a
    measurement_concept_id Literal(xsd:integer).  Returns None if no mapping.
    """

    # ① make it a clean, comparable string
    val = str(val_lit).strip()          # works for Literal, URIRef, str
    low = val.lower()                   # for case-insensitive tests

    # ② exact match on a *normalised* key (e.g. uppercase)
    cid = SOURCE_CODE_TO_CONCEPT_ID.get(val) or \
          SOURCE_CODE_TO_CONCEPT_ID.get(val.upper()) or \
          SOURCE_CODE_TO_CONCEPT_ID.get(low)
    if cid is not None:
        return Literal(int(cid), datatype=XSD.integer)

    # ③ heuristic prefixes (now guaranteed to be a string)
    if low.startswith("pos"):
        return Literal(45884084, datatype=XSD.integer)   # positive
    if low.startswith("neg"):
        return Literal(45878583, datatype=XSD.integer)   # negative
    if low.startswith("normal"):
        return Literal(45884153, datatype=XSD.integer)   # normal

    # ④ special VIST rule
    if "VIST" in val:
        cid = 45877985 if val.startswith("P") else 45880296
        return Literal(cid, datatype=XSD.integer)

    # ⑤ **fallback** – no mapping
    return Literal("nan", datatype=XSD.string)


OPERATOR_TEXT_TO_CONCEPT_ID = {
    "mindre_end": 4171756,   # "<" (less than)
    "stoerre_end": 4172704,  # ">" (greater than)
}

from rdflib import Literal, XSD

def lookup_operator_concept(op_lit):
    """
    Map a source operator label (rdflib term or str) to its
    OMOP operator_concept_id and return it as Literal(xsd:integer).

    When the text is not recognised, the function returns
    Literal("nan", xsd:string) so the caller still gets a valid Node.
    """

    # ① canonicalise input: string, trimmed, upper-case
    key = str(op_lit).strip().upper()

    # ② try the exact mapping
    cid = OPERATOR_TEXT_TO_CONCEPT_ID.get(key)
    if cid is not None:
        try:
            return Literal(int(cid), datatype=XSD.integer)
        except (ValueError, TypeError):
            # mapping exists but isn't numeric – fall through to fallback
            pass

    # ③ fallback literal keeps rdflib happy
    return Literal("nan", datatype=XSD.string)


#Step 3: Paste the FGF function (transform_measurement)
#Use the previously extracted FGF function and update it to use the above helpers

from rdflib import Namespace, RDF, URIRef, Literal
from rdflib.namespace import XSD

# ---------------------------------------------------------------------------
# Namespaces (customise the base URIs so they match your actual ontologies)
# ---------------------------------------------------------------------------
SRC = Namespace("http://example.org/src#")
TGT = Namespace("http://example.org/tgt#")

# ---------------------------------------------------------------------------
# Helper --------------------------------------------------------------------
# ---------------------------------------------------------------------------
def _local_name(uri, namespace):
    """
    Return the local part of `uri` relative to `namespace`.
    If the URI is outside the namespace, fall back to the last path fragment.
    """
    ns_str = str(namespace)
    uri_str = str(uri)
    return uri_str[len(ns_str):] if uri_str.startswith(ns_str) else uri_str.rsplit("/", 1)[-1]

# ---------------------------------------------------------------------------
# Main Fact-Generating Function ---------------------------------------------
# ---------------------------------------------------------------------------
from rdflib import URIRef, Literal, RDF, XSD
from rdflib.namespace import Namespace
from datetime import datetime, date, time

SRC = Namespace("http://example.org/src#")
TGT = Namespace("http://example.org/tgt#")

def combine_date_time(date_val, time_val):
    """Combine xsd:date and xsd:time Literal into xsd:dateTime string."""
    if not (isinstance(date_val, Literal) and isinstance(time_val, Literal)):
        return None
    try:
        dt = datetime.combine(date.fromisoformat(str(date_val)), time.fromisoformat(str(time_val)))
        return Literal(dt.isoformat(), datatype=XSD.dateTime)
    except Exception:
        return None

def to_decimal(val):
    """Converts a value which can be string, int etc. to a decimal-formatted Literal."""
    try:
        return Literal(float(val), datatype=XSD.decimal)
    except Exception:
        return None

def is_numeric(val):
    """Checks if given value (Literal) is numeric; if so, returns number as string, else None."""
    try:
        n = float(str(val))
        return True, str(n)
    except Exception:
        return False, None

# this is the generated FGF
def transform_measurement(src_graph, tgt_graph):
    """
    FGF for src:Measurement aligned to tgt:Measurement.
    Transfers mapped properties with required transformation.
    """
    for meas in src_graph.subjects(RDF.type, SRC.Measurement):
        tgt_graph.add((meas, RDF.type, TGT.Measurement))
        
        # src:samplingtime(x,t) → tgt:measurement_time(x,t)
        for _, _, t in src_graph.triples((meas, SRC.samplingtime, None)):
            tgt_graph.add((meas, TGT.measurement_time, Literal(t, datatype=XSD.time)))

        # src:samplingdate(x,d) → tgt:measurement_date(x,d)
        for _, _, d in src_graph.triples((meas, SRC.samplingdate, None)):
            tgt_graph.add((meas, TGT.measurement_date, Literal(d, datatype=XSD.date)))
        
        # src:samplingdate + src:samplingtime → tgt:measurement_datetime(x,dt)
        d = next((o for _, _, o in src_graph.triples((meas, SRC.samplingdate, None))), None)
        t = next((o for _, _, o in src_graph.triples((meas, SRC.samplingtime, None))), None)
        if d is not None and t is not None:
            dt = combine_date_time(d, t)
            if dt is not None:
                tgt_graph.add((meas, TGT.measurement_datetime, dt))

        # src:patient_cpr(x,p) → tgt:person_id(x,lookup_person_id(p))
        for _, _, p in src_graph.triples((meas, SRC.patient_cpr, None)):
            tgt_graph.add((meas, TGT.person_id, lookup_person_id(p)))
        
        # src:analysiscode(x,a) → tgt:measurement_source_concept_id(x,lookup_concept_id(a)), tgt:measurement_source_value(x,a)
        for _, _, a in src_graph.triples((meas, SRC.analysiscode, None)):
            tgt_graph.add((meas, TGT.measurement_source_concept_id, lookup_concept_id(a)))
            tgt_graph.add((meas, TGT.measurement_source_value, Literal(a, datatype=XSD.string)))

        # src:laboratorium_idcode(x,l) → tgt:provider_id(x,lookup_provider_id(l))
        for _, _, l in src_graph.triples((meas, SRC.laboratorium_idcode, None)):
            tgt_graph.add((meas, TGT.provider_id, lookup_provider_id(l)))
        
        # src:referenceinterval_lowerlimit(x,low) → tgt:range_low(x,to_decimal(low))
        for _, _, low in src_graph.triples((meas, SRC.referenceinterval_lowerlimit, None)):
            d = to_decimal(low)
            if d is not None:
                tgt_graph.add((meas, TGT.range_low, d))
        
        # src:referenceinterval_upperlimit(x,high) → tgt:range_high(x,to_decimal(high))
        for _, _, high in src_graph.triples((meas, SRC.referenceinterval_upperlimit, None)):
            d = to_decimal(high)
            if d is not None:
                tgt_graph.add((meas, TGT.range_high, d))
        
        # src:unit(x,u) → tgt:unit_source_value(x,u), tgt:unit_concept_id(x,lookup_unit_concept(u))
        for _, _, u in src_graph.triples((meas, SRC.unit, None)):
            tgt_graph.add((meas, TGT.unit_source_value, Literal(u, datatype=XSD.string)))
            tgt_graph.add((meas, TGT.unit_concept_id, lookup_unit_concept(u)))

        # src:value(x,v) → tgt:measurement_source_value(x,v)
        for _, _, v in src_graph.triples((meas, SRC.value, None)):
            tgt_graph.add((meas, TGT.measurement_source_value, Literal(v, datatype=XSD.string)))

            # src:value(x,v) ∧ is_numeric(v,n) → tgt:value_as_number(x,to_decimal(n))
            isnum, n = is_numeric(v)
            if isnum:
                num_lit = to_decimal(n)
                if num_lit is not None:
                    tgt_graph.add((meas, TGT.value_as_number, num_lit))
            else:
                # src:value(x,v) ∧ ¬is_numeric(v,_) → tgt:value_as_concept_id(x,lookup_concept_id(v))
                tgt_graph.add((meas, TGT.value_as_concept_id, lookup_concept_id(v)))

        # src:operator(x,o) → tgt:operator_concept_id(x,lookup_operator_concept(o))
        for _, _, o in src_graph.triples((meas, SRC.operator, None)):
            tgt_graph.add((meas, TGT.operator_concept_id, lookup_operator_concept(o)))

# Step 4: Generate the Target Graph
# Create the target graph
tgt_graph = Graph()

# Run the FGF function
transform_measurement(src_graph, tgt_graph)

# Optional: Save the result
#tgt_graph.serialize('openai_generated_target_graph.nt', format='nt')
print(len(tgt_graph))

484


In [69]:
from rdflib import Graph, Namespace, RDF, Literal, XSD
from collections import defaultdict
import pandas as pd

def evaluate_fgf_metrics(ground_truth: Graph, generated: Graph):
    # Step 1: Gather all triples per predicate
    gt_by_pred = defaultdict(set)
    gen_by_pred = defaultdict(set)

    for s, p, o in ground_truth:
        gt_by_pred[p].add((s, p, o))
    for s, p, o in generated:
        gen_by_pred[p].add((s, p, o))

    # Step 2: Identify all predicates across both graphs
    all_predicates = set(gt_by_pred.keys()) | set(gen_by_pred.keys())

    # Step 3: Compute per-predicate TP, FP, FN and metrics
    records = []
    total_tp = total_fp = total_fn = 0

    for pred in all_predicates:
        gt_triples = gt_by_pred.get(pred, set())
        gen_triples = gen_by_pred.get(pred, set())

        tp = len(gt_triples & gen_triples)
        fp = len(gen_triples - gt_triples)
        fn = len(gt_triples - gen_triples)

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

        total_tp += tp
        total_fp += fp
        total_fn += fn

        records.append({
            "predicate": str(pred),
            "TP": tp,
            "FP": fp,
            "FN": fn,
            "Precision": precision,
            "Recall": recall,
            "F1": f1
        })

    # Step 4: Compute micro-average
    micro_precision = total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0.0
    micro_recall = total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0.0
    micro_f1 = 2 * micro_precision * micro_recall / (micro_precision + micro_recall) if (micro_precision + micro_recall) > 0 else 0.0

    # Step 5: Compute macro-average
    macro_precision = sum(r["Precision"] for r in records) / len(records)
    macro_recall = sum(r["Recall"] for r in records) / len(records)
    macro_f1 = sum(r["F1"] for r in records) / len(records)

    # Add summary row
    records.append({
        "predicate": "MICRO_AVG",
        "TP": total_tp,
        "FP": total_fp,
        "FN": total_fn,
        "Precision": micro_precision,
        "Recall": micro_recall,
        "F1": micro_f1
    })
    records.append({
        "predicate": "MACRO_AVG",
        "TP": None,
        "FP": None,
        "FN": None,
        "Precision": macro_precision,
        "Recall": macro_recall,
        "F1": macro_f1
    })

    return pd.DataFrame(records)

gt_graph = Graph()
gt_graph.parse("ground_truth_target_graph.nt", format="nt")

# Define your namespace mappings
ground_truth_ns = "https://loinc/measurement/"
generated_ns = "http://example.org/tgt#"

# Prepare subject mapping: order-based matching
src_subjects = sorted(set(src_graph.subjects()), key=lambda x: str(x))
gt_subjects = sorted(set(gt_graph.subjects()), key=lambda x: str(x))
subject_map = dict(zip(gt_subjects, src_subjects))

# Rewrite all triple components: subject alignment + namespace normalization
aligned_gt = Graph()
for s, p, o in gt_graph:
    # Subject replacement (by aligned subject if available)
    s = subject_map.get(s, s)

    # Namespace normalization
    def replace_ns(term):
        if isinstance(term, URIRef) and str(term).startswith(ground_truth_ns):
            return URIRef(str(term).replace(ground_truth_ns, generated_ns))
        return term

    new_s = replace_ns(s)
    new_p = replace_ns(p)
    new_o = replace_ns(o)

    aligned_gt.add((new_s, new_p, new_o))

# Find all unique measurement subjects
measurement_subjects = set(aligned_gt.subjects())

# Create a new graph for type declarations
type_graph = Graph()

# Add rdf:type triples for each subject
for subj in measurement_subjects:
    type_graph.add((subj, RDF.type, TGT.Measurement))

aligned_gt += type_graph

# Remove all triples with the bad URI, and re-add them with the corrected URI
def remove_trailing_slash_from_properties(graph):
    to_add = []
    to_remove = []

    for s, p, o in graph:
        if str(p).endswith("/"):
            new_p = URIRef(str(p).rstrip("/"))
            to_remove.append((s, p, o))
            to_add.append((s, new_p, o))

    for triple in to_remove:
        graph.remove(triple)
    for triple in to_add:
        graph.add(triple)

    return graph

def replace_subject_namespace(graph, old_ns, new_ns):
    updated_graph = graph.__class__()  # same type (likely rdflib.Graph)

    for s, p, o in graph:
        # Only update if subject starts with old namespace
        if isinstance(s, URIRef) and str(s).startswith(str(old_ns)):
            new_s = URIRef(str(s).replace(str(old_ns), str(new_ns), 1))
        else:
            new_s = s
        updated_graph.add((new_s, p, o))

    return updated_graph

aligned_gt = remove_trailing_slash_from_properties(aligned_gt)
aligned_gt = replace_subject_namespace(aligned_gt, SRC, TGT)

tgt_graph = replace_subject_namespace(tgt_graph, SRC, TGT)

df_results = evaluate_fgf_metrics(aligned_gt, tgt_graph)
display(df_results)  # or df_results.to_csv("eval_metrics.csv")

Failed to convert Literal lexical form to value. Datatype=http://www.w3.org/2001/XMLSchema#integer, Converter=<class 'int'>
Traceback (most recent call last):
  File "C:\Users\ramme\anaconda3\lib\site-packages\rdflib\term.py", line 2119, in _castLexicalToPython
    return conv_func(lexical)  # type: ignore[arg-type]
ValueError: invalid literal for int() with base 10: '8753.0'
Failed to convert Literal lexical form to value. Datatype=http://www.w3.org/2001/XMLSchema#integer, Converter=<class 'int'>
Traceback (most recent call last):
  File "C:\Users\ramme\anaconda3\lib\site-packages\rdflib\term.py", line 2119, in _castLexicalToPython
    return conv_func(lexical)  # type: ignore[arg-type]
ValueError: invalid literal for int() with base 10: '44777588.0'
Failed to convert Literal lexical form to value. Datatype=http://www.w3.org/2001/XMLSchema#integer, Converter=<class 'int'>
Traceback (most recent call last):
  File "C:\Users\ramme\anaconda3\lib\site-packages\rdflib\term.py", line 2119, 

Unnamed: 0,predicate,TP,FP,FN,Precision,Recall,F1
0,http://example.org/tgt#measurement_datetime,38.0,0.0,72.0,1.0,0.345455,0.513514
1,http://example.org/tgt#provider_id,38.0,0.0,1.0,1.0,0.974359,0.987013
2,http://example.org/tgt#measurement_source_conc...,0.0,38.0,0.0,0.0,0.0,0.0
3,http://example.org/tgt#measurement_source_value,38.0,38.0,1.0,0.5,0.974359,0.66087
4,http://example.org/tgt#unit_concept_id,0.0,25.0,37.0,0.0,0.0,0.0
5,http://example.org/tgt#measurement_id,0.0,0.0,39.0,0.0,0.0,0.0
6,http://example.org/tgt#unit_source_value,25.0,0.0,0.0,1.0,1.0,1.0
7,http://example.org/tgt#person_id,38.0,0.0,1.0,1.0,0.974359,0.987013
8,http://example.org/tgt#measurement_concept_id,0.0,0.0,37.0,0.0,0.0,0.0
9,http://example.org/tgt#measurement_time,38.0,0.0,1.0,1.0,0.974359,0.987013


In [73]:
df_metrics = (
    df_results
        # 1️  keep only the local name after the last “/” or “#”
        .assign(predicate=lambda d:
                d["predicate"].str.replace(r".*[/#]", "", regex=True))

        # 2️  drop the raw count columns
        .drop(columns=["TP", "FP", "FN"], errors="ignore")

        # 3️  round/format *only* for presentation
        .round(2)           # keeps the numeric dtype
)

latex_code = df_metrics.to_latex(index=False,  # drop the row numbers
                         escape=True, # keep e.g. % or _ as-is
                         float_format=lambda x: f"{x:.2f}",
                         na_rep="")    # how to show NaNs
print(latex_code)

\begin{tabular}{lrrr}
\toprule
predicate & Precision & Recall & F1 \\
\midrule
measurement\_datetime & 1.00 & 0.35 & 0.51 \\
provider\_id & 1.00 & 0.97 & 0.99 \\
measurement\_source\_concept\_id & 0.00 & 0.00 & 0.00 \\
measurement\_source\_value & 0.50 & 0.97 & 0.66 \\
unit\_concept\_id & 0.00 & 0.00 & 0.00 \\
measurement\_id & 0.00 & 0.00 & 0.00 \\
unit\_source\_value & 1.00 & 1.00 & 1.00 \\
person\_id & 1.00 & 0.97 & 0.99 \\
measurement\_concept\_id & 0.00 & 0.00 & 0.00 \\
measurement\_time & 1.00 & 0.97 & 0.99 \\
type & 1.00 & 0.97 & 0.99 \\
measurement\_date & 1.00 & 0.97 & 0.99 \\
range\_high & 0.00 & 0.00 & 0.00 \\
range\_low & 0.00 & 0.00 & 0.00 \\
value\_as\_number & 0.00 & 0.00 & 0.00 \\
value\_as\_concept\_id & 0.22 & 0.67 & 0.33 \\
measurement\_type\_concept\_id & 0.00 & 0.00 & 0.00 \\
value\_source\_value & 0.00 & 0.00 & 0.00 \\
operator\_concept\_id & 0.00 & 0.00 & 0.00 \\
MICRO\_AVG & 0.61 & 0.48 & 0.54 \\
MACRO\_AVG & 0.41 & 0.41 & 0.39 \\
\bottomrule
\end{tabular}

