# Experimental notebook for structured knowledge extraction from documents

This is the fourth iteration of the pipeline and the first one to be pushed to the repo, which makes it the first official version. From now on, version history is handled by git.


In [None]:
# Standard library imports
import json
import time
import re
from typing import List
from pydantic import BaseModel
from rdflib import Graph, Namespace, Literal
from pyshacl import validate
import datetime
import yaml
import os


# Local imports
from src.llm_utils import initialize_gemini_client, call_gemini, call_gemini_pdf, call_gemini_json, with_retries
from src.graph_utils import visualize_graph, get_semantic_hash, validate_shacl_syntax
from src.parsing_utils import read_txt, read_json
from src.testing_utils import parse_validation_report, apply_mutations, flush_context_to_csv

### Initialization


In [12]:
# Run configuration used by the cells below:
#   GEMINI_MODEL  - model name handed to the Gemini client and recorded in the run context
#   DOCUMENT_NAME - stem used to build the input/artifact file names for this document
#   CSV_FILE      - file passed to flush_context_to_csv when scenario results are logged
GEMINI_MODEL = "gemini-2.5-pro"
DOCUMENT_NAME = "student_housing"
CSV_FILE = "Master_Results.csv"

# Initialize the Gemini client (helper from src.llm_utils) for the selected model.
initialize_gemini_client(model_name=GEMINI_MODEL)

current_run_id = 1 # TODO: this will be part of the framework later, fetched from the csv last entry


## Preparation: Get everything ready for logging


In [3]:
# This function creates a blank slate for a single run
def initialize_run_context(run_id, doc_name, model_name):
    """Return a fresh, flat logging context for a single pipeline run.

    The context is a plain dict whose keys are filled in by later pipeline
    steps. Every call builds new objects, so the list-valued entries are
    never shared between two contexts.

    Args:
        run_id: Identifier stored under "Run ID".
        doc_name: Document name stored under "Document Name".
        model_name: Model name stored under "Model Name".

    Returns:
        dict: Metadata, artifact placeholders, scenario placeholders and
        execution stats, in that key order.
    """
    # Local time, second precision, e.g. "2024-01-31 12:00:00".
    created_at = datetime.datetime.now().isoformat(sep=" ", timespec="seconds")

    # --- Metadata ---
    metadata = {
        "Run ID": run_id,
        "Document Name": doc_name,
        "Timestamp": created_at,
        "Model Name": model_name,
    }

    # --- Pipeline Artifacts (Placeholders) ---
    artifacts = {
        "Service Graph Hash": "N/A",
        "SHACL Graph Hash": "N/A",
        "SHACL Valid Syntax": False,
        "SHACL Error Type": "N/A",
        "SHACL Error Message": "N/A",
    }

    # --- Scenario Specifics (Will be overwritten per scenario) ---
    scenario = {
        "Scenario ID": "N/A",
        "Scenario Description": "N/A",
        "Expected Violation Count": 0,
        "Actual Violation Count": 0,
        "Violated Shapes": [],
        "Violation Messages": [],
        "Raw Validation Report": "N/A",
    }

    # --- Execution Stats ---
    stats = {
        "Execution Time": 0.0,
        "Successfully Executed": False,
    }

    # Merge in section order so the resulting key order stays stable.
    return {**metadata, **artifacts, **scenario, **stats}
    
# Logging context for the current run; later cells fill in its fields and flush it to the CSV.
ctx = initialize_run_context(current_run_id, DOCUMENT_NAME, GEMINI_MODEL)


## Phase 1: Public service modeling


In [None]:
# We begin the outer loop here
# e.g. for current_run_id in range(last_run_id, last_run_id + N):

# Wall-clock start of the run; the elapsed time is computed from it at the end of the main pipeline.
execution_start_time = time.time()

current_run_id = 999 # temporary override. TODO: this will be handled by the core pipeline function
# NOTE(review): ctx was built earlier with the previous current_run_id, so the "Run ID" that is
# logged to the CSV does not match the RUN_<id> directory created below while this override exists.

# Create artifact directory for this run.
# os.makedirs itself raises FileExistsError when the path already exists, so creating first
# (instead of checking os.path.exists beforehand) avoids the check-then-create race and still
# refuses to overwrite the artifacts of a previous run.
artifact_dir = f"Testing Artifacts/RUN_{current_run_id}_{DOCUMENT_NAME}/"
try:
    os.makedirs(artifact_dir)
except FileExistsError as err:
    raise FileExistsError(f"Artifact directory {artifact_dir} already exists. Aborting to prevent overwriting.") from err



### 1.1 Document → Preconditions Summary

Use LLM to summarize the document into a list of preconditions.

In [5]:
# Inputs for the summarization step: the prompt text and the source PDF of the document.
prompt = read_txt('Prompts/summarization.txt')
file_path = f"Precondition documents/{DOCUMENT_NAME}.pdf"

# Send prompt + PDF to Gemini through the retry wrapper and show the summary in the cell output.
preconditions_summary = with_retries(call_gemini_pdf, prompt, file_path)
print(preconditions_summary)

# Save artifact (re-read by the following steps)
summary_artifact_path = f"{artifact_dir}{DOCUMENT_NAME} preconditions summary.txt"
with open(summary_artifact_path, "w") as summary_file:
    summary_file.write(preconditions_summary)

Title: Student Housing Allowance
Conditions:
- The student must be a Greek citizen or a citizen of another European Union country.
- The student must be enrolled in an undergraduate program at a Higher Education Institution (AEI) to obtain their first degree.
- The student must have successfully passed exams in at least half of the required courses for the preceding academic year.
- The student must possess a valid academic ID.
- The student's annual family income for the previous tax year must not exceed €30,000. This limit is increased by €3,000 for each dependent child after the first.
- The student must be renting a property in a city other than their city of permanent residence due to their studies.
- The student's rental lease must have a duration of at least six months.
- In the student's city of permanent residence, neither the student nor their parents can have full ownership or usufruct of a property.
- The total area of properties (owner-occupied or rented out) owned by the 

### 1.2. Preconditions Summary + Citizen Schema (TTL) → Information Model (JSON)



In [None]:
# Inputs: the preconditions summary saved in step 1.1 and the citizen schema (TTL) for this document.
preconditions_summary = read_txt(f"{artifact_dir}{DOCUMENT_NAME} preconditions summary.txt")
citizen_schema = read_txt(f"Citizens/{DOCUMENT_NAME} schema.ttl")

# Pydantic models describing the JSON structure requested from the LLM.
# They are documented with `#` comments only, so nothing extra ends up in the generated schema.

# One path into the citizen data.
class Paths(BaseModel):
    # Ordered path segments; step 3.1 resolves each segment as a predicate in the schema namespace.
    path: List[str]
    # Kind of value at the end of the path; step 3.1 compares it against "URI".
    datatype: str

# A named piece of information that a constraint refers to.
class InformationConcept(BaseModel):
    name: str
    related_paths: List[Paths]  # links the concept to citizen data available

# One precondition of the public service.
class Constraint(BaseModel):
    # Identifier; used as the local name of the constraint node in the service graph (step 1.3).
    name: str
    # Description; emitted as dct:description in the service graph (step 1.3).
    desc: str
    # Information concepts this constraint applies to.
    constrains: List[InformationConcept]  

# Response schema handed to the JSON call: a list of constraints.
schema = list[Constraint]

# Formulate prompt content and call Gemini
prompt = read_txt('Prompts/preconditions_to_JSON.txt')
content = [prompt, preconditions_summary, citizen_schema]

# The result is written to disk as-is below, so it is presumably a JSON string — confirm against call_gemini_json.
info_model = with_retries(call_gemini_json, content, schema)

# Save artifact
with open(f"{artifact_dir}{DOCUMENT_NAME} information model.json", "w") as f:
    f.write(info_model)

### 1.3 Information Model (JSON) → Public Service Graph (TTL)

Use deterministic code to turn the JSON into a knowledge graph using TTL syntax.

In [None]:
PREFIXES = """@prefix ex: <http://example.org/> .
@prefix cccev: <http://data.europa.eu/m8g/> .
@prefix cpsv: <http://purl.org/vocab/cpsv#> .
@prefix dct: <http://purl.org/dc/terms/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

"""


def _escape_turtle_literal(text):
    """Escape *text* so it can be placed inside a double-quoted Turtle string literal.

    Backslashes are escaped first so the escapes added afterwards are not doubled.
    Raw line breaks are not allowed inside a single-quoted-style ("...") Turtle
    literal, so they are written as \\n / \\r escapes.
    """
    return (
        text.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
        .replace("\t", "\\t")
    )


# Parse JSON string
info_model = read_json(f"{artifact_dir}{DOCUMENT_NAME} information model.json", raw=True)

# Get service name. As per the prompt, it's in the first line of the preconditions summary file
with open(f"{artifact_dir}{DOCUMENT_NAME} preconditions summary.txt") as f:
    line = f.readline()

title_match = re.search(r'Title: (.+)', line)
if title_match is None:
    # Fail with a message that names the problem instead of a bare IndexError.
    raise ValueError(
        f"Could not find a 'Title: ...' entry on the first line of the preconditions summary: {line!r}"
    )
service_name = title_match.group(1).strip().replace(" ", "_")
# NOTE(review): only spaces are replaced; other characters that are not valid in a Turtle
# local name (in the title or in constraint/concept names) would still yield invalid TTL.

triples = [PREFIXES]
triples.append(f"ex:{service_name} a cpsv:PublicService .\n\n")

# Convert constraints + concepts into triples
for constraint in info_model:
    constraint_name = constraint["name"]
    constraint_desc = _escape_turtle_literal(constraint["desc"])

    # Public service -> holdsRequirement -> constraint
    triples.append(f"ex:{service_name} cpsv:holdsRequirement ex:{constraint_name} .\n")

    # Constraint node
    triples.append(f'ex:{constraint_name} a cccev:Constraint ; dct:description "{constraint_desc}" .\n')

    # InformationConcept nodes
    for concept in constraint.get("constrains", []):
        concept_name = concept["name"]

        # Link constraint to concept
        triples.append(f"ex:{constraint_name} cccev:constrains ex:{concept_name} .\n")

        # Declare information concept (repeated declarations of the same concept are harmless in Turtle)
        triples.append(f'ex:{concept_name} a cccev:InformationConcept .\n')

    triples.append("\n")  # spacing for readability

triples_string = "".join(triples)

# Save artifact. Turtle documents are UTF-8 by specification and this file is parsed as Turtle
# in later steps, so write it as UTF-8 rather than in the platform's default encoding.
with open(f"{artifact_dir}{DOCUMENT_NAME} service graph.ttl", "w", encoding="utf-8") as f:
    f.write(triples_string)

# Log
ctx["Service Graph Hash"] = get_semantic_hash(triples_string)

### 1.4. Graph Visualization / Inspection

Visualize part of the knowledge graph to more easily inspect correct structure and logic.

In [None]:
# Render graph of the public service from the TTL artifact saved in step 1.3
visualize_graph(f"{artifact_dir}{DOCUMENT_NAME} service graph.ttl")

## Phase 2: SHACL Rule Generation


### 2.1. Information Model (JSON) → SHACL-spec (JSON)

Use deterministic code on the JSON from before to make a new intermediate JSON that contains only the necessary information to construct SHACL shapes, one for each constraint.

In [None]:
# Load the information model JSON
info_model = read_json(f"{artifact_dir}{DOCUMENT_NAME} information model.json", raw=True)


def _concept_entry(concept):
    """Reduce one information concept to its name and its (path, datatype) pairs."""
    # Capture the path AND the datatype (URI vs Literal) of every related path
    path_entries = [
        {"path": related["path"], "datatype": related["datatype"]}
        for related in concept.get("related_paths", [])
    ]
    return {"name": concept["name"], "related_paths": path_entries}


def _shape_entry(constraint):
    """Turn one constraint of the information model into its SHACL-spec entry."""
    # Rename for clarity downstream
    shape_name = constraint["name"].replace("_condition", "_shape")
    description = constraint["desc"]
    # Concepts constrained by this shape (e.g., family_income, residency_city)
    concept_entries = [_concept_entry(item) for item in constraint.get("constrains", [])]
    return {
        "shape_name": shape_name,
        "desc": description,
        "concepts": concept_entries,
    }


# One spec entry per constraint, in the order of the information model
shacl_spec_json = [_shape_entry(entry) for entry in info_model]

# Save artifact
shacl_spec_path = f"{artifact_dir}{DOCUMENT_NAME} shacl-spec.json"
with open(shacl_spec_path, "w") as spec_file:
    json.dump(shacl_spec_json, spec_file, indent=2)

### 2.2. SHACL-spec (JSON) + Citizen Schema (TTL) → SHACL Shapes (TTL)



In [None]:
# Load JSON as string 
shacl_spec_json = read_json(f"{artifact_dir}{DOCUMENT_NAME} shacl-spec.json")
citizen_schema = read_txt(f"Citizens/{DOCUMENT_NAME} schema.ttl")

prompt = read_txt('Prompts/shacl_spec_to_shacl_ttl.txt')
content = [prompt, shacl_spec_json, citizen_schema]

shacl_shapes = with_retries(call_gemini, content)


def _strip_markdown_fence(text):
    """Return *text* without a surrounding Markdown code fence.

    Only an opening fence line (three backticks plus an optional language tag such as
    "turtle" or "ttl") at the very start and a closing fence at the very end are removed.
    The content in between is left untouched, so identifiers that merely contain the
    letters "ttl" or "turtle" are preserved.
    """
    text = text.strip()
    # Opening fence: ``` + optional language tag + end of line.
    text = re.sub(r"\A```[A-Za-z0-9_+-]*[ \t]*\r?\n", "", text)
    # Closing fence at the end of the text.
    text = re.sub(r"\r?\n?```\Z", "", text)
    return text.strip()


# Cleanup gemini markdown formatting
shacl_shapes = _strip_markdown_fence(shacl_shapes)

# Save artifact. Turtle documents are UTF-8 by specification and this file is later handed
# to the SHACL validator by path, so write it as UTF-8 rather than in the platform default.
with open(f"{artifact_dir}{DOCUMENT_NAME} shacl shapes.ttl", "w", encoding="utf-8") as f:
    f.write(shacl_shapes)

# Log: hash of the shapes plus the outcome of the syntax check
ctx["SHACL Graph Hash"] = get_semantic_hash(shacl_shapes)
is_valid, error_stage, error_message = validate_shacl_syntax(shacl_shapes)
ctx["SHACL Valid Syntax"] = is_valid
ctx["SHACL Error Type"] = error_stage
ctx["SHACL Error Message"] = error_message


## Phase 3: Citizen - Service Modeling


### 3.1 Public Service Graph (TTL) + Citizen Graph (TTL) + Information Model (JSON) → Citizen-Service Graph (TTL) 

We expand the Public Service Graph to include Citizen data, properly connected with edges.

In [None]:
# Namespaces: EX for the service graph / concept nodes, SC for the citizen schema terms.
EX = Namespace("http://example.org/")
SC = Namespace("http://example.org/schema#")

# Load service and citizen ttl's and info model
public_service_ttl = f"{artifact_dir}{DOCUMENT_NAME} service graph.ttl"
citizen_ttl = f"Citizens/{DOCUMENT_NAME} eligible.ttl"
info_model = read_json(f"{artifact_dir}{DOCUMENT_NAME} information model.json", raw=True)

# Realize them into graphs
g = Graph()
g.parse(public_service_ttl, format="turtle")
citizen_g = Graph()
citizen_g.parse(citizen_ttl, format="turtle")

# Merge citizen triples into main graph
for t in citizen_g:
    g.add(t)

# Automatically determine the root citizen node: any subject that has SC.Applicant as an object
# (the predicate is left unconstrained).
root_candidates = list(citizen_g.subjects(predicate=None, object=SC.Applicant))
if not root_candidates:
    # Fail with a message that names the problem instead of an IndexError on [0].
    raise ValueError(
        f"No node pointing at {SC.Applicant} was found in {citizen_ttl}; cannot determine the citizen root node."
    )
# NOTE(review): if several candidates exist, the first one yielded by the graph is used.
citizen_root = root_candidates[0]

# Helper: resolve node paths (return nodes, not literals) 
def resolve_node_path(citizen_g, root_uri, path_list, datatype):
    """Follow a predicate path from the root and return the set of nodes it reaches.

    Args:
        citizen_g: Graph that is traversed.
        root_uri: Node the traversal starts from.
        path_list: Predicate local names; each is looked up in the SC namespace.
        datatype: "URI" walks the whole path (the value itself is a node);
            any other value stops one step early, at the node holding the literal.

    Returns:
        set: Nodes reached after the last step, or an empty set as soon as a
        step yields nothing.
    """
    walks_full_path = datatype == "URI"

    # For URI values the final hop is a node; for literal values drop the last
    # segment so we end on the node that carries the literal.
    steps = path_list if walks_full_path else path_list[:-1]

    frontier = {root_uri}
    for step in steps:
        predicate = SC[step]  # Assumes our schema matches the namespace

        # Expand every node of the frontier by one hop along this predicate.
        # On a URI path, literals are skipped as data errors.
        frontier = {
            target
            for source in frontier
            for target in citizen_g.objects(source, predicate)
            if not (walks_full_path and isinstance(target, Literal))
        }

        # Dead end: nothing further can be reached.
        if not frontier:
            return set()

    return frontier

# Add mapsTo edges: connect every information concept to the citizen data nodes its paths resolve to
for constraint in info_model:
    for concept in constraint["constrains"]:
        concept_uri = EX[concept["name"]]

        for path_obj in concept["related_paths"]:
            # The datatype tells the resolver whether to walk the full path or stop before the literal
            data_nodes = resolve_node_path(
                citizen_g,
                citizen_root,
                path_obj["path"],
                path_obj["datatype"],
            )

            # Connect the Information Concept to each Data Node
            for data_node in data_nodes:
                g.add((concept_uri, EX.mapsTo, data_node))

# Serialize unified graph into ttl and save to file
g.serialize(f"{artifact_dir}{DOCUMENT_NAME} citizen-service graph.ttl", format="turtle")

<Graph identifier=N1b3b3494b4634305b57f0f1232329d47 (<class 'rdflib.graph.Graph'>)>

### 3.2 Visualize the unified graph

We reuse the same function from before.

In [None]:
# Render the unified citizen-service graph from the TTL artifact saved in step 3.1
visualize_graph(f"{artifact_dir}{DOCUMENT_NAME} citizen-service graph.ttl")

In [None]:
# This marks the end of the main pipeline. 
# Elapsed wall-clock time since execution_start_time; round() with a single argument
# yields whole seconds (an int).
ctx["Execution Time"] = round(time.time() - execution_start_time)


## Phase 4: Citizen Validation and Scenarios

### SHACL Shape Validation


Begin the scenario loop, writing to csv after every scenario.

In [None]:
# Load the Golden Citizen (Baseline)
golden_ttl = f"Citizens/{DOCUMENT_NAME} eligible.ttl"
golden_graph = Graph()
golden_graph.parse(golden_ttl, format="turtle")

# Load the Scenarios from YAML. The file is opened as text, so the decoding happens here;
# read it as UTF-8 instead of relying on the platform's default encoding.
with open(f"Citizens/{DOCUMENT_NAME} scenarios.yaml", "r", encoding="utf-8") as f:
    scenarios = yaml.safe_load(f)

# Iterate and Apply: one CSV row is written per scenario
for scn in scenarios:
    ctx["Scenario ID"] = scn['id']
    ctx["Scenario Description"] = scn['description']
    ctx["Expected Violation Count"] = scn['expected_violation_count']

    # Apply mutations (Returns a NEW graph, leaving golden_graph untouched)
    mutated_graph = apply_mutations(golden_graph, scn['actions'])

    # Proceed to Validation against the SHACL shapes generated in Phase 2
    conforms, results_graph, results_text = validate(
        data_graph=mutated_graph,
        shacl_graph=f"{artifact_dir}{DOCUMENT_NAME} shacl shapes.ttl",
        inference='rdfs',
    )

    # Keep the validator's textual report; without this the "Raw Validation Report"
    # field would stay at its "N/A" placeholder in every logged row.
    ctx["Raw Validation Report"] = results_text

    # Parse validation report
    parse_result = parse_validation_report(conforms, results_graph, results_text)
    ctx["Actual Violation Count"] = parse_result["violation_count"]
    ctx["Violated Shapes"] = parse_result["failed_shapes"]
    ctx["Violation Messages"] = parse_result["messages"]

    # If we made it this far
    ctx["Successfully Executed"] = True

    flush_context_to_csv(ctx, CSV_FILE) #  TODO this will be outside the try except block as finally

# Report End of the run to console
print(f"Logged Run {ctx['Run ID']} to CSV.") 


Logged Run 1 to CSV.
