### iTerms: Deconstructing ToS with LLMs
Defining legal requirements as code enables users to automatically and more efficiently assess compliance with service restrictions. However, non-technical users face significant challenges when translating and understanding these clauses in existing formal languages. With iTerms, we propose a first approach that leverages LLMs as a bridge to translate Terms of Service clauses into the Terms of Service Language (TOSL).

In [None]:
! pip install langchain
! pip install langchain-openai
! pip install langchain-community
! pip install langchain-core
! pip install pdfplumber
! pip install rdflib requests
! pip install -U "openai>=1.46.0" "langchain>=0.2.12" "langchain-core>=0.2.34" "langchain-openai>=0.1.20"

In [None]:
import os
import json
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = ""
model = ChatOpenAI(model="gpt-5", temperature=0)

# Knowledge Graph Construction
## Phase 1: Classification and Metadata Extraction

In the first phase, each clause is analyzed to identify its nature and key semantic components.  

- **Input:** use case to be analyzed and the clause in natural language  
- **Processing:** classification of the clause and extraction of relevant metadata: `type`, `party`, `action`, and `asset`. These elements are aligned with the TOSL/ODRL vocabulary.  
- **Output:** use case enriched with structured metadata

In [None]:
DEFAULT_ACTIONS = [
    'tosl:allowDownload','tosl:appeal','tosl:assign','tosl:claim','tosl:consent','tosl:develop','tosl:evaluate','tosl:integrate','tosl:procedure','tosl:publish','tosl:remove','tosl:terminate','tosl:test',
    'odrl:acceptTracking','odrl:aggregate','odrl:anonymize','odrl:annotate','odrl:archive','odrl:attribute','odrl:compensate','odrl:concurrentUse','odrl:delete','odrl:derive','odrl:digitize','odrl:display','odrl:distribute','odrl:ensureExclusivity','odrl:execute','odrl:extract','odrl:give','odrl:grantUse','odrl:include','odrl:index','odrl:inform','odrl:install','odrl:modify','odrl:move','odrl:nextPolicy','odrl:obtainConsent','odrl:play','odrl:present','odrl:print','odrl:read','odrl:reproduce','odrl:reviewPolicy','odrl:sell','odrl:stream','odrl:synchronize','odrl:textToSpeech','odrl:transfer','odrl:transform','odrl:translate','odrl:uninstall','odrl:use','odrl:watermark',
    'unspecified'
]

In [None]:
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

prompt_template = ChatPromptTemplate.from_template("""
You are a legal analyst specializing in SAAS Terms of Service.

For the following text, perform these steps:

1. Classify it EXCLUSIVELY as one of these 6 labels, in lowercase:
    1. **permission**: When granting a right/authorization (e.g., "may use", "right to access", "is permitted").
    2. **prohibition**: When restricting/forbidding actions (e.g., "may not", "prohibited", "not allowed").
    3. **obligation**: When the party owes or is obliged to do something (e.g., "must pay", "responsible for", "required to maintain", "will").
    4. **dispute resolution**: When establishing legal conflict mechanisms (e.g., "arbitration", "jurisdiction", "governing law").
    5. **liability**: Responsibilities or limitations of liability for damages/warranties/indemnities (e.g., "limitation of liability", "indirect damages", "warranty disclaimer").
    6. **other**: ONLY for:
      - Pure titles/section headers (e.g., "APPENDIX A: DEFINITIONS")
      - Non-substantive text (e.g., "Effective Date: 2025-01-01")
      - Definition lists without operational context

2. Identify the Party responsible for performing the action, this should be the "Customer" or the "Provider". If no Party is clearly specified, write "unspecified".

3. From the list below, identify the Action that is most accurately permitted, obligatory, or prohibited within the clause. ONLY SELECT from the predefined list. Not create, modify, or assume new action names.
    {actions_list}

4. Identify the Object of the action (e.g.,‚Äúagreement‚Äù, ‚Äúapi service‚Äù, ‚Äúservice‚Äù, ‚Äúapi‚Äù, ‚Äúdataset‚Äù, ‚Äúdata‚Äù, ‚Äúuser content‚Äù, ‚Äúaccount‚Äù, ‚Äúapi key‚Äù, ‚Äúdocumentation‚Äù, ‚Äúsdk‚Äù, ‚Äúwebsite‚Äù, ‚Äúmodel output‚Äù, ‚Äúpersonal data‚Äù, ‚Äúlogs‚Äù, ‚Äúapplication‚Äù, ‚Äúsoftware‚Äù, ‚Äúcredentials‚Äù, ‚Äúbilling info‚Äù, ‚Äúother‚Äù). If no asset is clearly specified, write "unspecified".

**Critical Instructions:**
- Analyze ONLY substantive content (ignore formatting).
- If the chunk contains both header and legal content, classify by content.
- Use "other" ONLY for non-classifiable structural elements.

**Response Format:**
Return a JSON object with exactly these fields:

{{
  "type": "<one of the 6 labels>",
  "party": "<the party responsible>",
  "action": "<the action permitted, obligatory or prohibited>",
  "asset": "<the asset>"
}}

Text to analyze:
{clause}
""")

chat_model = ChatOpenAI(
    model="gpt-5",
    temperature=0
)


#### Structured output

In [None]:
from typing import Literal
from pydantic import BaseModel,Field

AllowedAction = Literal[
    "tosl:allowDownload", "tosl:appeal", "tosl:assign", "tosl:claim", "tosl:consent",
    "tosl:develop", "tosl:evaluate", "tosl:integrate", "tosl:procedure", "tosl:publish",
    "tosl:remove", "tosl:terminate", "tosl:test",
    "odrl:acceptTracking", "odrl:aggregate", "odrl:anonymize", "odrl:annotate", "odrl:archive",
    "odrl:attribute", "odrl:compensate", "odrl:concurrentUse", "odrl:delete", "odrl:derive",
    "odrl:digitize", "odrl:display", "odrl:distribute", "odrl:ensureExclusivity", "odrl:execute",
    "odrl:extract", "odrl:give", "odrl:grantUse", "odrl:include", "odrl:index",
    "odrl:inform", "odrl:install", "odrl:modify", "odrl:move", "odrl:nextPolicy",
    "odrl:obtainConsent", "odrl:play", "odrl:present", "odrl:print", "odrl:read",
    "odrl:reproduce", "odrl:reviewPolicy", "odrl:sell", "odrl:stream", "odrl:synchronize",
    "odrl:textToSpeech", "odrl:transfer", "odrl:transform", "odrl:translate", "odrl:uninstall",
    "odrl:use", "odrl:watermark",
    "unspecified"
]

class ClauseResult(BaseModel):
    type: Literal["permission", "prohibition", "obligation", "dispute resolution", "liability", "other"] = Field(...)
    party: Literal["customer", "provider", "unspecified"] = Field(...)
    action: AllowedAction = Field(...)
    asset: str = Field(..., description="Short noun phrase (<= 3 words)")


In [None]:
structured_llm = chat_model.with_structured_output(ClauseResult)
actions_text = ", ".join(f"`{a}`" for a in DEFAULT_ACTIONS) 
chain = prompt_template.partial(actions_list=actions_text) | structured_llm

def classify_and_extract(clause: str) -> dict:
    result: ClauseResult = chain.invoke({"clause": clause})
    return result.model_dump()

def process_use_cases(use_cases: dict) -> dict:
    """
    Itera sobre los use cases y a√±ade los campos.
    """
    for key, value in use_cases.items():
        description = value.get("description", "").strip()
        if not description:
            value["type"] = "other"
            value["party"] = "unspecified"
            value["action"] = "unspecified"
            value["asset"] = "unspecified"
            continue

        result = classify_and_extract(description)
        value["type"] = (result.get("type") or "other").strip().lower()
        value["party"] = (result.get("party") or "unspecified").strip().lower()
        value["action"] = (result.get("action") or "unspecified").strip().lower()
        value["asset"] = (result.get("asset") or "unspecified").strip().lower()

    return use_cases

def main(input_file: str, output_file: str):
    with open(input_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    use_cases = data.get("USE_CASE_DESCRIPTIONS", {})
    updated = process_use_cases(use_cases)
    data["USE_CASE_DESCRIPTIONS"] = updated

    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)


In [None]:
if __name__ == "__main__":
    INPUT_JSON = "data/use_cases_elsevier.json"
    OUTPUT_JSON = "phase1/results/use_cases_elsevier_classifieds.json"

    main(INPUT_JSON, OUTPUT_JSON)


#### Evaluation

In [None]:
import json
import pathlib
import time
import matplotlib.pyplot as plt

ASSET_ALIASES = {
    "api service's web site": "api service",
    "api service‚Äôs web site": "api service",
    "api services": "api service",
    "api service access": "api service",
    "api access": "api service",
    "api keys": "api service",
    "api key": "api service",
    "elsevier content": "api service"
}

FIELDS_EVAL = ["type", "party", "action", "asset"]

def _load_uc(path: str):
    d = json.loads(pathlib.Path(path).read_text(encoding="utf-8"))
    return d.get("USE_CASE_DESCRIPTIONS") or d.get("use_cases") or {}


def _norm_type(x):
    return ("" if x is None else str(x).strip().lower())


def _norm_party(x, normalize_roles):
    if x is None:
        return ""
    s = str(x).strip()
    return s if not normalize_roles else s.lower()


def _norm_action(x, normalize_actions):
    if x is None:
        return ""
    s = str(x).strip()
    return s if not normalize_actions else s


def _norm_asset(x, normalize_assets):
    if x is None:
        return ""
    s = str(x).strip().lower()
    return ASSET_ALIASES.get(s, s) if normalize_assets else s


def _normalize_row(row, normalize_actions=False, normalize_roles=False, normalize_assets=True):
    return {
        "type":  _norm_type(row.get("type")),
        "party": _norm_party(row.get("party"), normalize_roles),
        "action": _norm_action(row.get("action"), normalize_actions),
        "asset": _norm_asset(row.get("asset"), normalize_assets),
    }


def _confusion_and_metrics(y_true, y_pred):
    labels = sorted(set(y_true) | set(y_pred))
    idx = {lab: i for i, lab in enumerate(labels)}
    n = len(labels)
    cm = [[0]*n for _ in range(n)]
    for t, p in zip(y_true, y_pred):
        cm[idx[t]][idx[p]] += 1

    tp_sum = fp_sum = fn_sum = 0
    prec = rec = f1 = {}
    support = {}
    for i, lab in enumerate(labels):
        tp = cm[i][i]
        fp = sum(cm[r][i] for r in range(n) if r != i)
        fn = sum(cm[i][c] for c in range(n) if c != i)
        sup = sum(cm[i])
        p = tp/(tp+fp) if (tp+fp) > 0 else 0.0
        r = tp/(tp+fn) if (tp+fn) > 0 else 0.0
        f = 2*p*r/(p+r) if (p+r) > 0 else 0.0
        prec[lab], rec[lab], f1[lab], support[lab] = p, r, f, sup
        tp_sum += tp
        fp_sum += fp
        fn_sum += fn
    micro_p = tp_sum/(tp_sum+fp_sum) if (tp_sum+fp_sum) > 0 else 0.0
    micro_r = tp_sum/(tp_sum+fn_sum) if (tp_sum+fn_sum) > 0 else 0.0
    micro_f = 2*micro_p*micro_r / \
        (micro_p+micro_r) if (micro_p+micro_r) > 0 else 0.0
    macro_p = sum(prec.values())/(len(labels) or 1)
    macro_r = sum(rec.values())/(len(labels) or 1)
    macro_f = sum(f1.values())/(len(labels) or 1)
    total = sum(sum(r) for r in cm)
    acc = sum(cm[i][i] for i in range(n))/(total or 1)
    return {
        "labels": labels, "confusion": cm, "per_label": {"precision": prec, "recall": rec, "f1": f1, "support": support},
        "macro": {"precision": macro_p, "recall": macro_r, "f1": macro_f},
        "micro": {"precision": micro_p, "recall": micro_r, "f1": micro_f},
        "accuracy": acc, "total": total,
    }


def compare_phase1(pred_path, exp_path, out_json_path, normalize_actions=False, normalize_roles=False, normalize_assets=True):
    pred_uc = _load_uc(pred_path)
    exp_uc = _load_uc(exp_path)
    keys_all = sorted(set(pred_uc.keys()) | set(exp_uc.keys()))
    keys_eval = sorted(set(pred_uc.keys()) & set(exp_uc.keys()))

    per_case = []
    for k in keys_all:
        p = _normalize_row(pred_uc.get(k, {}), normalize_actions,
                           normalize_roles, normalize_assets)
        e = _normalize_row(exp_uc.get(k, {}),  normalize_actions,
                           normalize_roles, normalize_assets)
        matches = {f: int(p.get(f) == e.get(f)) for f in FIELDS_EVAL}
        per_case.append({"id": k, "pred": p, "exp": e, "match": matches})

    per_field = {}
    for f in FIELDS_EVAL:
        y_true = []
        y_pred = []
        for k in keys_eval:
            e = _normalize_row(exp_uc.get(k, {}),  normalize_actions,
                               normalize_roles, normalize_assets).get(f, "")
            p = _normalize_row(pred_uc.get(k, {}), normalize_actions,
                               normalize_roles, normalize_assets).get(f, "")
            y_true.append("" if e is None else str(e))
            y_pred.append("" if p is None else str(p))
        per_field[f] = _confusion_and_metrics(y_true, y_pred)

    result = {
        "meta": {
            "pred_path": pred_path, "exp_path": exp_path,
            "n_cases_all": len(keys_all), "n_cases_eval": len(keys_eval),
            "normalize_actions": normalize_actions, "normalize_roles": normalize_roles,
            "normalize_assets": normalize_assets, "ignored": ["description"],
            "timestamp": int(time.time()),
        },
        "per_case": per_case,
        "per_field": per_field,
    }
    out_p = pathlib.Path(out_json_path)
    out_p.parent.mkdir(parents=True, exist_ok=True)
    out_p.write_text(json.dumps(result, ensure_ascii=False,
                     indent=2), encoding="utf-8")
    print(f"‚úì Guardado JSON de m√©tricas en {out_p}")
    return result

In [None]:
import json, pathlib
import matplotlib.pyplot as plt

def _load_results(result_or_path):
    return json.loads(pathlib.Path(result_or_path).read_text(encoding="utf-8")) \
           if isinstance(result_or_path, (str, pathlib.Path)) else result_or_path

def _weighted_f1_for_field(res, field):
    m = res["per_field"][field]
    labels = m["labels"]
    sup = m["per_label"]["support"]
    f1s = m["per_label"]["f1"]
    total = sum(sup.values()) or 1
    return sum((f1s.get(l, 0.0) * sup.get(l, 0)) for l in labels) / total

def plot_phase1_metrics(result_or_path, save_path: str = None, show_weighted=True):
    res = _load_results(result_or_path)
    fields = FIELDS_EVAL  # ["type","party","action","asset"]

    accs = [res["per_field"][f]["accuracy"] for f in fields]
    f1m  = [res["per_field"][f]["macro"]["f1"] for f in fields]
    f1w  = [_weighted_f1_for_field(res, f) for f in fields] if show_weighted else None

    # Pastel palette (fallback if seaborn not available)
    pastel = ['#a1c9f4', '#ffb482', '#8de5a1']  # blue, orange, green (soft)
    plt.figure(figsize=(9, 4))
    x = range(len(fields)); w = 0.28

    bars1 = plt.bar([i - (w if show_weighted else w/2) for i in x], accs, width=w, label="Accuracy", color=pastel[0])
    bars2 = plt.bar([i + (0 if show_weighted else w/2) for i in x], f1m, width=w, label="Macro-F1", color=pastel[1])
    if show_weighted:
        bars3 = plt.bar([i + w for i in x], f1w, width=w, label="Weighted-F1", color=pastel[2])

    plt.xticks(list(x), [f.capitalize() for f in fields])
    plt.ylim(0, 1)
    plt.grid(axis="y", alpha=0.25)
    plt.title("Phase 1 ‚Äî Accuracy vs Macro-F1" + (" (Weighted-F1 shown)" if show_weighted else ""))
    plt.legend()

    def annotate(bars):
        for b in bars:
            h = b.get_height()
            plt.text(b.get_x() + b.get_width()/2, h + 0.02, f"{h:.2f}", ha="center", va="bottom", fontsize=9)
    annotate(bars1); annotate(bars2)
    if show_weighted:
        annotate(bars3)

    plt.tight_layout()
    if save_path:
        pathlib.Path(save_path).parent.mkdir(parents=True, exist_ok=True)
        plt.savefig(save_path, dpi=160)
        print(f"Saved chart to {save_path}")
    plt.show()


def plot_phase1_confusion(result_or_path, field: str = "action", save_path: str = None):
    import json, pathlib
    import matplotlib.pyplot as plt

    if field not in FIELDS_EVAL:
        raise ValueError(f"field must be one of {FIELDS_EVAL}")

    res = json.loads(pathlib.Path(result_or_path).read_text(encoding="utf-8")) \
          if isinstance(result_or_path, (str, pathlib.Path)) else result_or_path

    m = res["per_field"][field]
    labels, cm = m["labels"], m["confusion"]

    plt.figure(figsize=(max(6, 0.6*len(labels)), max(4, 0.6*len(labels))))
    plt.imshow(cm, cmap="Blues")
    plt.colorbar(fraction=0.046, pad=0.04)
    plt.xticks(range(len(labels)), labels, rotation=45, ha="right")
    plt.yticks(range(len(labels)), labels)
    plt.title(f"Confusion Matrix ‚Äî {field}")
    plt.xlabel("Predicted")
    plt.ylabel("True")

    for i in range(len(labels)):
        for j in range(len(labels)):
            v = cm[i][j]
            if v:
                plt.text(j, i, str(v), ha="center", va="center", color="black", fontsize=9)

    plt.tight_layout()
    if save_path:
        pathlib.Path(save_path).parent.mkdir(parents=True, exist_ok=True)
        plt.savefig(save_path, dpi=160)
        print(f"‚úì Saved confusion matrix to {save_path}")
    plt.show()

In [None]:
res = compare_phase1(
    pred_path="phase1/results/use_cases_elsevier_classifieds.json",
    exp_path="data/use_cases_elsevier_classifieds_expected.json",
    out_json_path="phase1/eval/elsevier_metadata_results.json",
    normalize_actions=False, normalize_roles=False, normalize_assets=True)

plot_phase1_metrics(res, save_path="phase1/eval/metrics.png", show_weighted=True)

plot_phase1_confusion(res, field="action", save_path="phase1/eval/confusion.png")

-------

## Phase 2: Turtle Generation Guided by Template and Ontology

The second phase focuses on transforming the enriched representation into an interoperable semantic format.  

- **Inputs:** enriched use case, a Markdown-based template serving as a construction guide, and the TOSL/ODRL ontology as a semantic reference  
- **Processing:** application of the template and the metamodel to generate RDF statements in Turtle syntax  
- **Output:** Turtle file corresponding to the processed clause  

In [None]:
def load_use_cases(json_path):
    """
    Carga el JSON y devuelve una lista de diccionarios con los campos relevantes.
    """
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    global_metadata = {
        "PROVIDER": data.get("PROVIDER", "Unknown"),
        "SOURCE": data.get("SOURCE", "Unknown"),
        "TITLE": data.get("TITLE", "Unknown"),
        "DATE": data.get("DATE", "Unknown")
    }

    use_cases = []
    for key, item in data.get("USE_CASE_DESCRIPTIONS", {}).items():
        clause = item.get("description", "").strip()
        classification = item.get("type", "").strip().lower()

        # Si Party o Asset no est√°n, usa valores por defecto
        party = item.get("party", "UnknownParty")
        action = item.get("action", "UnknownAction")
        asset = item.get("asset", "UnknownAsset")

        use_cases.append({
            "id": key,
            "clause": clause,
            "type": classification,
            "party": party,
            "action": action,
            "asset": asset
        })

    return use_cases, global_metadata

In [None]:
def extract_text_from_md(md_path):
  with open(md_path, "r", encoding="utf-8") as f:
      content = f.read()
  return content


In [None]:
def extract_ontology(ontology_path):
    with open(ontology_path, "r") as f:
        content = f.read()
        return content


In [None]:
def extract_ttl(content):
    start_tag = "```ttl"
    end_tag = "```"
    start_idx = content.find(start_tag)
    end_idx = content.rfind(end_tag)

    if start_idx == -1 or end_idx == -1:
        raise ValueError("No se encontraron los delimitadores ```ttl")

    # Extraer solo la parte del TTL
    ttl = content[start_idx + len(start_tag):end_idx].strip()
    return ttl

In [None]:
def save_prompt(prompt_text, uc_id):
    path = f"phase2/results/prompt-{uc_id}.txt"
    with open(path, "w") as f:
        f.write(prompt_text)

def save_ttl(ttl_text, uc_id):
    path = f"phase2/results/{uc_id}.ttl"
    with open(path, "w", encoding="utf-8") as f:
        f.write(ttl_text)

In [None]:
def build_prompt(ontology_context, template_text, clause_info, global_metadata):
    """
    Construye un prompt de modelado RDF en Turtle.
    """
    prompt = f"""
        You are a legal knowledge extractor specialized in TOSL and ODRL policy modeling.

        Your job is to:
        - Read a legal clause written in natural language.
        - Identify the implied permissions, prohibitions, duties, limitations of liability, dispute resolution, and constraints.
        - Convert the clause into a structured RDF policy using the ODRL and TOSL ontologies.

        Guidelines:
        - Output MUST be valid Turtle syntax.
        - Use only the vocabulary and structure provided in the ontology context and reference template in markdown.
        - All URIs and prefixes must be exactly as shown.
        - Do not add explanations, headers, or any commentary.
        - Output MUST be enclosed in a fenced code block with ```ttl.

        Reference template: {template_text}

        Metadata:
          - Provider: {global_metadata.get("PROVIDER", "Unknown")}
          - Source: {global_metadata.get("SOURCE", "Unknown")}
          - Title: {global_metadata.get("TITLE", "Unknown")}
          - Date: {global_metadata.get("DATE", "Unknown")}

        Clause to model: {clause_info['clause']}

        Classification type: {clause_info['type']}

        Party: {clause_info['party']}

        Suggested Action: {clause_info['action']}

        Target asset: {clause_info['asset']}
        """
    return prompt.strip()


def main(json_path, md_path, ontology_path):
    """
    Orquesta el flujo:
    1. Lee JSON.
    2. Lee template markdown.
    3. Construye prompts para cada cl√°usula.
    """
    use_cases, global_metadata = load_use_cases(json_path)
    template_text = extract_text_from_md(md_path)
    os.makedirs("phase2/results", exist_ok=True)

    # Crear y mostrar prompts
    for uc in use_cases:
      prompt_text = build_prompt(template_text, uc, global_metadata)
      save_prompt(prompt_text, uc['id'])
      messages = [
          {"role": "system", "content": "You are a legal knowledge extractor specialized in TOSL and ODRL policy modeling."},
          {"role": "user", "content": prompt_text}
      ]
      response = model.invoke(messages)

      print("======================================")
      print(f"{uc['id']}")
      print("======================================")
      print(response.content)
      ttl_text = extract_ttl(response.content)
      save_ttl(ttl_text, uc['id'])
      print(f"‚úÖ Archivo guardado")
      print("\n\n")


In [None]:
if __name__ == "__main__":
    # Paths de tus archivos
    json_path = "phase1/results/use_cases_elsevier_classifieds.json"
    md_path = "template.md"

    main(json_path, md_path)

#### Evaluation

In [None]:
import requests

PREFIXES = {
    "http://www.w3.org/ns/shacl#": "sh:",
    "http://www.w3.org/ns/odrl/2/": "odrl:",
    "https://w3id.org/tosl/": "tosl:",
    "http://example.com/": ":",
    "http://www.w3.org/2001/XMLSchema#": "xsd:",
}
def short(u: str) -> str:
    if not u:
        return ""
    for base, pfx in PREFIXES.items():
        if u.startswith(base):
            return pfx + u[len(base):]
    return u

def tosl_checker(url, file_path):
    with open(file_path, "rb") as f:
        r = requests.post(url, files={"file": ("use_case.ttl", f, "text/turtle")})

    print("C√≥digo de estado:", r.status_code)
    try:
        data = r.json()
    except Exception:
        print("Respuesta no-JSON:\n", r.text)
        return None

    ok = bool(data.get("conforms") or data.get("valid"))
    print("‚úÖ Conforms" if ok else "‚ùå Violations")

    if ok:
        return data

    for i, v in enumerate(data.get("violations", []), 1):
        focus = short(v.get("focusNode", ""))
        path  = short(v.get("resultPath", ""))
        value = short(v.get("valueNode", ""))
        msg   = v.get("message", "")
        head = f"{focus} --{path}--> {value}" if path else focus or value or "(sin nodo)"
        print(f"  {i}. {head}")
        if msg:
            print(f"      {msg}")

    return data

In [None]:
import json, pathlib
import matplotlib.pyplot as plt

VALIDATOR_URL = "https://tosl.onrender.com/validator/validate"

def phase2_validate_and_plot(
    url: str = VALIDATOR_URL,
    in_globs = ("phase2/results/*.ttl",),
    out_json: str = "phase2/eval/elsevier_ttls_results.json",
    out_plot: str = "phase2/eval/elsevier_ttls_results_plot.png",
    show_hist: bool = True,
):
    # 1) Collect files
    paths = []
    for g in in_globs:
        paths.extend(pathlib.Path(".").glob(g))
    paths = sorted(set(paths), key=lambda p: str(p))

    # 2) Ensure output dir exists
    out_path = pathlib.Path(out_json)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    pathlib.Path(out_plot).parent.mkdir(parents=True, exist_ok=True)

    # 3) Validate with your tosl_checker and keep only {passed, errors}
    results = {}
    for p in paths:
        print(f"\n[i] {p}")
        report = tosl_checker(url, str(p)) or {}
        passed = bool(report.get("conforms") or report.get("valid"))
        errors = len(report.get("violations", [])) if isinstance(report, dict) else None
        results[str(p)] = {"passed": passed, "errors": errors}

    out_path.write_text(json.dumps(results, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"\nSaved JSON: {out_path} ({len(results)} items)")

    # 4) Plot and save
    vals = list(results.values())
    passed_n = sum(1 for v in vals if v.get("passed"))
    failed_n = len(vals) - passed_n
    pass_rate = (passed_n / (passed_n + failed_n) * 100) if (passed_n + failed_n) else 0.0

    c_pass, c_fail, c_hist = "#8de5a1", "#ffb482", "#a1c9f4"

    if show_hist:
        fig, (ax0, ax1) = plt.subplots(1, 2, figsize=(12, 4))
    else:
        fig, ax0 = plt.subplots(1, 1, figsize=(7, 4))

    # Bars: Passed/Failed
    ax0.bar(["Passed", "Failed"], [passed_n, failed_n], color=[c_pass, c_fail])
    for i, v in enumerate([passed_n, failed_n]):
        ax0.text(i, v + max(1, v) * 0.02, f"{v}", ha="center", va="bottom", fontsize=10)
    ax0.set_ylim(0, max(1, passed_n, failed_n) * 1.2)
    ax0.set_title(f"Phase 2 ‚Äî Pass/Fail (Pass rate {pass_rate:.0f}%)")
    ax0.grid(axis="y", alpha=0.3)

    # Histogram: violations per failed file
    if show_hist:
        errors = [int(v.get("errors") or 0) for v in vals if not v.get("passed")]
        if errors:
            bins = range(0, max(errors) + 2)
            ax1.hist(errors, bins=bins, color=c_hist, edgecolor="white")
            ax1.set_title("Violations per failed file")
            ax1.set_xlabel("Violations")
            ax1.set_ylabel("Files")
            ax1.grid(axis="y", alpha=0.3)
        else:
            ax1.axis("off")
            ax1.text(0.5, 0.5, "No failed files", ha="center", va="center", fontsize=11)

    plt.tight_layout()
    plt.savefig(out_plot, dpi=160)
    print(f"Saved plot: {out_plot}")
    plt.show()

    return results

if __name__ == "__main__":
    phase2_validate_and_plot()


-------

## Phase 3: Validation and Self-repair
This phase ensures the syntactic and semantic correctness of the output.  

- **Input:** Turtle file generated in the previous phase  
- **Processing:**  
  - Syntactic validation using standard Turtle validation tools  
  - Semantic validation using a custom validator based on TOSL/ODRL rules  
  - **AI-assisted self-repair** to automatically correct inconsistencies  
- **Outputs:**  
  1. Error report  
  2. Corrected and validated Turtle file  

In [None]:
def build_prompt_fix_ttl(ontology_context, instance_ttl, validator_errors_json):
    """
    Builds a prompt to FIX a Turtle file using an ontology and validator feedback.
    Output is requested as a fenced ```ttl code block.
    """
    prompt = f"""
        You are a knowledge engineer specialized in RDF/Turtle and ontology-driven validation.

        Your job is to:
        - Read the original Turtle file and the validator violations.
        - Fix ALL violations so that the file conforms to the given ontology.
        - Preserve all correct information and the original meaning whenever possible.

        Guidelines:
        - Output MUST be valid Turtle syntax.
        - Use ONLY the vocabulary and structure provided in the ontology context.
        - Keep existing prefixes; add missing ones if needed (and only if required by the ontology).
        - Do NOT invent IRIs if they can be derived from the ontology/prefixes.
        - The output MUST contain ONLY the corrected TTL, enclosed in a fenced code block with ```ttl (no extra text).

        Ontology context (.ttl):
        {ontology_context}

        Validator errors (JSON):
        {validator_errors_json}

        Original file (Turtle):
        {instance_ttl}

        IMPORTANT:
        - Think step by step internally, but return ONLY the corrected Turtle in a ```ttl fenced block.
    """
    return prompt.strip()


In [None]:
from pathlib import Path

URL = "https://tosl.onrender.com/validator/validate"
IN_DIR = Path("phase2/results")
OUT_DIR = Path("phase3/results")
EVAL_JSON = Path("phase3/eval/elsevier_ttls_results.json")  
ONTOLOGY_PATH = Path("tosl_odrl_simplified.ttl")
PATTERN = "use_case_*.ttl"

def conforms(report: dict) -> bool:
    return bool(report.get("conforms") or report.get("valid"))

def _violations_count(report) -> int:
    if isinstance(report, dict) and isinstance(report.get("violations"), list):
        return len(report["violations"])
    return 0

def _load_results(path: Path) -> dict:
    if path.exists():
        try:
            return json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            pass
    return {}

def _save_results(path: Path, results: dict):
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(results, ensure_ascii=False, indent=2), encoding="utf-8")

def main():
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    ontology_context = extract_ontology(str(ONTOLOGY_PATH))
    EVAL_JSON.parent.mkdir(parents=True, exist_ok=True)

    results = _load_results(EVAL_JSON)

    for f in sorted(IN_DIR.glob(PATTERN)):
        key = str(f) 
        print(f"[i] {f.name}")
        report = tosl_checker(URL, str(f))

        if conforms(report):
            out = OUT_DIR / f.name
            out.write_text(f.read_text(encoding="utf-8"), encoding="utf-8")
            continue

        prompt_text = build_prompt_fix_ttl(ontology_context=ontology_context, 
                                           instance_ttl=f.read_text(encoding="utf-8"), 
                                           validator_errors_json=report.get("violations", []))

        messages = [
            {"role": "system", "content": "You are a strict RDF/Turtle engineer."},
            {"role": "user", "content": prompt_text},
        ]
        response = model.invoke(messages)

        ttl_text = extract_ttl(response.content)
        out = OUT_DIR / f"{f.stem}{f.suffix}"
        out.write_text(ttl_text, encoding="utf-8")

        # check again
        reval = tosl_checker(URL, str(out))
        ok = conforms(reval)
        errs = 0 if ok else _violations_count(reval)
        results[key] = {"passed": ok, "errors": errs}
        _save_results(EVAL_JSON, results)
        print("   üîß Save as:", out.name, "|", "‚úÖ Conforms" if conforms(reval) else "‚ùå No conforms")

if __name__ == "__main__":
    main()


------

# Evaluation


In [None]:
import json, time, pathlib, concurrent.futures as cf
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

DEONTIC_URL = "https://tosl.onrender.com/sparql/deontic_status"
UNFAIR_URL  = "https://tosl.onrender.com/sparql/unfair_terms"

DEONTIC_KEEP = ["get_duties", "get_permissions", "get_prohibitions", "total_rules"]

TERM_TYPES = [
    "change",
    "termination",
    "contract_by_use",
    "choice_of_law",
    "jurisdiction",
    "arbitration",
    "content_removal",
    "limitation_of_liability",
]

SHORTEN = True
PREFIXES = {
    "http://www.w3.org/ns/odrl/2/": "odrl:",
    "https://w3id.org/tosl/": "tosl:",
    "http://purl.org/dc/terms/": "dcterms:",
    "http://example.com/": ":",
    "http://www.w3.org/2001/XMLSchema#": "xsd:",
}

S = requests.Session()
S.headers.update({"Accept": "application/sparql-results+json", "Accept-Encoding": "gzip, deflate"})
S.mount("https://", HTTPAdapter(max_retries=Retry(
    total=3, backoff_factor=0.6, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["POST"]
)))
TIMEOUT = 60

def short(u: str):
    if not SHORTEN or not isinstance(u, str):
        return u
    for base, pfx in PREFIXES.items():
        if u.startswith(base):
            return pfx + u[len(base):]
    return u

def to_rows(payload):
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict) and "results" in payload:
        vars_ = payload.get("head", {}).get("vars", [])
        return [{v: b.get(v, {}).get("value") for v in payload["results"].get("bindings", [])}
                for b in payload["results"].get("bindings", [])]
    return []

def as_list(x):
    if x is None or x == "":
        return []
    if isinstance(x, list):
        return [short(v) for v in x]
    return [short(v.strip()) for v in str(x).split(",") if v.strip()]

def norm_deontic_row(row):
    rid = row.get("duty") or row.get("permission") or row.get("prohibition") or row.get("rule") or row.get("element")
    return {
        "id": short(rid),
        "actions": as_list(row.get("actions") or row.get("action")),
        "targets": as_list(row.get("targets") or row.get("target")),
        "assignee": short(row.get("assignee")),
        "assigner": short(row.get("assigner")),
        "description": row.get("description"),
    }

def post_file(url, ttl_path, params=None):
    t0 = time.time()
    ttl_path = pathlib.Path(ttl_path)
    with open(ttl_path, "rb") as f:
        r = S.post(url, params=params or {}, files={"file": (ttl_path.name, f, "text/turtle")}, timeout=TIMEOUT)
    ms = int((time.time() - t0) * 1000)
    try:
        data, ok_json = r.json(), True
    except Exception:
        data, ok_json = {"non_json": r.text[:4000]}, False
    return {"status": r.status_code, "ms": ms, "ok": r.ok and ok_json, "data": data}

def run_all_for_file(ttl_path: str):
    ttl_path = str(ttl_path)
    out = {
        "file": ttl_path,
        "deontic": {
            "total_rules": 0,
            "duties": [],
            "permissions": [],
            "prohibitions": [],
        },
        "unfair_terms": {},
        "_meta": {"calls": 0, "ok": 0, "ms_total": 0},  # opcional
    }

    for q in DEONTIC_KEEP:
        res = post_file(DEONTIC_URL, ttl_path, params={"deontic_status": q})
        out["_meta"]["calls"] += 1
        out["_meta"]["ok"] += int(bool(res["ok"]))
        out["_meta"]["ms_total"] += res["ms"]
        rows = to_rows(res["data"]) if res["ok"] else []
        if q == "total_rules":
            val = 0
            if rows:
                raw = rows[0].get("totalElements")
                try: val = int(raw or 0)
                except Exception: val = 0
            out["deontic"]["total_rules"] = val
        elif q == "get_duties":
            out["deontic"]["duties"] = [norm_deontic_row(r) for r in rows]
        elif q == "get_permissions":
            out["deontic"]["permissions"] = [norm_deontic_row(r) for r in rows]
        elif q == "get_prohibitions":
            out["deontic"]["prohibitions"] = [norm_deontic_row(r) for r in rows]

    for tt in TERM_TYPES:
        res = post_file(UNFAIR_URL, ttl_path, params={"term_type": tt})
        out["_meta"]["calls"] += 1
        out["_meta"]["ok"] += int(bool(res["ok"]))
        out["_meta"]["ms_total"] += res["ms"]
        rows = to_rows(res["data"]) if res["ok"] else []
        clean = []
        for r in rows:
            clean.append({k: (as_list(v) if k in ("actions", "targets") else short(v)) for k, v in r.items()})
        out["unfair_terms"][tt] = clean

    return out

def run_batch(in_glob="phase3/results/*.ttl", out_dir="evaluation", max_workers=6):
    in_paths = sorted(pathlib.Path(".").glob(in_glob))
    out_dir = pathlib.Path(out_dir); out_dir.mkdir(parents=True, exist_ok=True)

    def _save(res: dict):
        p = pathlib.Path(res["file"])
        out_path = out_dir / f"{p.stem}_results.json"
        out_path.write_text(json.dumps(res, ensure_ascii=False, indent=2))
        return out_path

    index = []
    with cf.ThreadPoolExecutor(max_workers=max_workers) as ex:
        futs = {ex.submit(run_all_for_file, str(p)): str(p) for p in in_paths}
        for fut in cf.as_completed(futs):
            res = fut.result()
            saved = _save(res)
            index.append({"file": res["file"], "out": str(saved), "calls": res["_meta"]["calls"], "ok": res["_meta"]["ok"]})
            print(f"‚úì {res['file']} -> {saved.name} | {res['_meta']['ok']}/{res['_meta']['calls']} OK")

    (out_dir / "_index.json").write_text(json.dumps(index, ensure_ascii=False, indent=2))
    return index

run_batch(in_glob="phase3/results/*.ttl", out_dir="evaluation", max_workers=6)
