# ODRL Policy Validation

This validator uses the code from https://github.com/paolo7/ODRL-Engine.git

## Instructions

First run the code cell 1) to initialise the project and install the required libraries. Then run one of the following cells to perform different functions.

* Cell 2) Prompts an upload of your ODRL file, which will then be validated and the report displayed below. Examples of valid and invalid ODRL files can be found in the github repository.
* Cell 3) Prompts an upload of your ODRL file, which will then be displayed in an interactive display below. Click on elements such as rules or constraints to expand them.
* Cell 4) Prompts the upload of two ODRL files, and it will check if they contain the same RDF triples, which checks for a stronger version of equivalence, which also includes graph equivalence.
* Cell 5.1) sets up the parser code, and cell 5.2) Prompts the upload of an ODRL file, which will then be parsed using the parser code, and the parsed information will be displayed below.



## 1) Initialise Code

In [1]:
!git clone https://github.com/paolo7/ODRL-Engine.git

%cd ODRL-Engine
!pip install -r requirements.txt

!pip install ipywidgets

import sys
sys.path.append('/content/ODRL-Engine')

from google.colab import files

import validate
import rdflib
from rdflib.namespace import RDF, RDFS, SKOS
import ipywidgets as widgets
from IPython.display import display, HTML

Cloning into 'ODRL-Engine'...
remote: Enumerating objects: 46, done.[K
remote: Counting objects: 100% (46/46), done.[K
remote: Compressing objects: 100% (31/31), done.[K
remote: Total 46 (delta 23), reused 32 (delta 11), pack-reused 0 (from 0)[K
Receiving objects: 100% (46/46), 36.78 KiB | 1.53 MiB/s, done.
Resolving deltas: 100% (23/23), done.
/content/ODRL-Engine
Collecting rdflib (from -r requirements.txt (line 1))
  Downloading rdflib-7.2.1-py3-none-any.whl.metadata (11 kB)
Collecting pyshacl (from -r requirements.txt (line 2))
  Downloading pyshacl-0.30.1-py3-none-any.whl.metadata (35 kB)
Collecting owlrl<8,>=7.1.2 (from pyshacl->-r requirements.txt (line 2))
  Downloading owlrl-7.1.4-py3-none-any.whl.metadata (3.8 kB)
Collecting html5rdf<2,>=1.2 (from rdflib[html]!=7.1.2,<8.0,>=7.1.1->pyshacl->-r requirements.txt (line 2))
  Downloading html5rdf-1.2.1-py2.py3-none-any.whl.metadata (7.5 kB)
Downloading rdflib-7.2.1-py3-none-any.whl (565 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━

## 2) Validate ODRL

In [2]:
from google.colab import files
import validate

uploaded = files.upload()
# Enforce single file upload
if len(uploaded) != 1:
    raise ValueError("Please upload exactly one file.")
# Extract filename and content
filename = list(uploaded.keys())[0]
content = uploaded[filename]
# Save uploaded file to disk
with open(filename, "wb") as f:
    f.write(content)
print(f'User uploaded file "{filename}" with length {len(content)} bytes')
validate.generate_ODRL_diagnostic_report(filename)

Saving policy_6_BIOSKIN_2025-09-22_13-17-07 1.json to policy_6_BIOSKIN_2025-09-22_13-17-07 1.json
User uploaded file "policy_6_BIOSKIN_2025-09-22_13-17-07 1.json" with length 20624 bytes
REPORT START
Analysing the file policy_6_BIOSKIN_2025-09-22_13-17-07 1.json for ODRL compliance:
INFO: The file contains an RDF graph in the following format: json-ld
ODRL entities summary:
- 1 Policy
- 0 Set
- 0 Agreement
- 0 Offer
- 8 Permission
- 0 Prohibition
- 0 Duty
- 35 Constraint
SHACL validation check passed
REPORT END



## 3) Visualise ODRL policy

In [5]:
import rdf_utils

uploaded = files.upload()

fn = list(uploaded.keys())[0]
policy_content = uploaded[fn].decode('utf-8')
g, format = rdf_utils.parse_string_to_graph(policy_content)

# --- Ontologies ---
ODRL = rdflib.Namespace("http://www.w3.org/ns/odrl/2/")

odrl_ontology = rdflib.Graph()
odrl_ontology.parse("https://www.w3.org/ns/odrl/2/ODRL22.ttl", format="turtle")

odrl_dpv_ontology = rdflib.Graph()
odrl_dpv_ontology.parse("https://raw.githubusercontent.com/EU-UPCAST/ODRL-DPV-Ontology/refs/heads/main/ODRL_DPV.rdf", format="xml")


# Collect all properties to use to expand the tree
allowed_relations = set(odrl_ontology.subjects(RDF.type, RDF.Property))
allowed_relations.add(ODRL.source)
allowed_relations.add(RDF.value)
allowed_relations.add(RDF.type)
allowed_relations.add(RDF.rest)
allowed_relations.add(RDF.first)

# --- Helper functions ---

LEVEL_COLORS = {
    1: "white",           # top-level policy
    2: "#cce5ff",         # pastel blue
    3: "#d4edda",         # light green
    4: "#ffe5b4",         # light orange
}

INDENT_PER_LEVEL = 8

def get_label(node):
    """Get a human-readable label for a node if available,
       checking policy graph, ODRL ontology, and DPV ontology."""
    if isinstance(node, rdflib.term.Literal):
        return str(node)

    # --- check in main policy graph ---
    for lbl in g.objects(node, RDFS.label):
        return str(lbl)
    for lbl in g.objects(node, rdflib.URIRef("http://purl.org/dc/terms/title")):
        return str(lbl)
    for lbl in g.objects(node, SKOS.prefLabel):
        return str(lbl)

    # --- check in DPV/ODRL ontologies ---
    for lbl in odrl_dpv_ontology.objects(node, RDFS.label):
        return str(lbl)
    for lbl in odrl_dpv_ontology.objects(node, rdflib.URIRef("http://purl.org/dc/terms/title")):
        return str(lbl)
    for lbl in odrl_dpv_ontology.objects(node, SKOS.prefLabel):
        return str(lbl)

    # optionally: check in odrl_ontology too if you load it separately
    for lbl in odrl_ontology.objects(node, RDFS.label):
        return str(lbl)
    for lbl in odrl_ontology.objects(node, SKOS.prefLabel):
        return str(lbl)

    # fallback: use last part of URI
    if isinstance(node, rdflib.term.URIRef):
        return node.split("/")[-1]
    return str(node)

def build_html_tree(node, visited=None, level=1, parent_predicate=None):
    if visited is None:
        visited = set()
    visited.add(node)

    bg_color = LEVEL_COLORS.get(level, "white")
    indent = level * INDENT_PER_LEVEL

    # --- Detect if this node is the head of an RDF list ---
    if (node, RDF.first, None) in g:
        # It's a list head, iterate through list
        items_html = []
        current = node
        while current != RDF.nil:
            first_objs = list(g.objects(current, RDF.first))
            if first_objs:
                first_node = first_objs[0]
                # render first_node recursively, but mark as list item
                items_html.append(f"<li>{build_html_tree(first_node, visited.copy(), level=level+1, parent_predicate=RDF.first)}</li>")
            rest_objs = list(g.objects(current, RDF.rest))
            current = rest_objs[0] if rest_objs else RDF.nil
        return f'<ul style="margin-left:{indent}px;">{"".join(items_html)}</ul>'

    children_html = ""
    for p, o in g.predicate_objects(node):
        if p not in allowed_relations:
            continue
        pname = get_label(p)
        label_text = f"{pname}: {get_label(o)}"

        is_expandable = (o, None, None) in g
        border_style = "1px solid black" if bg_color == "white" and is_expandable else "none"

        if is_expandable:
            sub_html = build_html_tree(o, visited.copy(), level=level+1, parent_predicate=p)
            children_html += f'''
            <details style="
                background-color:{bg_color};
                padding:6px;
                margin-left:{indent}px;
                margin-bottom:4px;
                border:{border_style};
                border-radius:3px;">
                <summary>{label_text}</summary>
                {sub_html}
            </details>
            '''
        else:
            children_html += f'''
            <div style="
                background-color:{bg_color};
                padding:6px;
                margin-left:{indent}px;
                margin-bottom:4px;
                border:{border_style};
                border-radius:3px;">
                {label_text}
            </div>
            '''

    if not children_html:
        border_style = "1px solid black" if bg_color == "white" else "none"
        return f'''
        <div style="
            background-color:{bg_color};
            padding:6px;
            margin-left:{indent}px;
            margin-bottom:4px;
            border:{border_style};
            border-radius:3px;">
            {get_label(node)}
        </div>
        '''

    return children_html


def explore_policies_html():
    policy_types = [ODRL.Policy, ODRL.Set, ODRL.Agreement, ODRL.Offer]
    policy_nodes = []
    for t in policy_types:
        policy_nodes.extend(list(g.subjects(RDF.type, t)))

    for pol in policy_nodes:
        # Add the policy IRI at the top (without a black border)
        policy_iri_html = f'''
        <div style="
            padding:4px 0px;
            font-weight:bold;
            margin-bottom:6px;">
            Policy: {pol}
        </div>
        '''
        # Build the tree for this policy
        policy_tree_html = policy_iri_html + build_html_tree(pol, level=1)

        # Wrap each policy in its own top-level box
        top_box_html = f'''
        <div style="
            padding:10px;
            border:2px solid black;
            border-radius:5px;
            background-color:white;
            margin-bottom:10px;">
            {policy_tree_html}
        </div>
        '''
        display(HTML(top_box_html))

# Display interactive HTML tree with colored levels
explore_policies_html()

Saving policy_6_BIOSKIN_2025-09-22_13-17-07 1.json to policy_6_BIOSKIN_2025-09-22_13-17-07 1 (3).json


## 4) Compare two RDF files (like ODRL policies) to see if they are identical
Note: this does not just check whether they are semantically equivalent, i.e. they mean the same thing, but also that they are represented by exactly the same graph.

In [7]:
from google.colab import files
from rdflib import Graph, BNode
import rdf_utils
import codecs

# Prompt user to upload two RDF files
print("Please upload the first RDF file:")
first_upload = files.upload()
first_filename = next(iter(first_upload))

print("Please upload the second RDF file:")
second_upload = files.upload()
second_filename = next(iter(second_upload))

def clean_file(filename: str) -> bytes:
    """Read a file as bytes and strip UTF-8 BOM if present."""
    with open(filename, "rb") as f:
        content = f.read()
    if content.startswith(codecs.BOM_UTF8):
        content = content[len(codecs.BOM_UTF8):]
    return content

# Read and clean file contents as bytes
data1 = clean_file(first_filename)
data2 = clean_file(second_filename)

# Parse to graphs (updated function now accepts bytes)
graph1, _ = rdf_utils.parse_string_to_graph(data1)
graph2, _ = rdf_utils.parse_string_to_graph(data2)

def triple_in_graph(triple, graph):
    """Check if triple is in graph, treating blank nodes as wildcards."""
    s, p, o = triple
    for (s2, p2, o2) in graph:
        if (isinstance(s, BNode) or s == s2) and \
           (isinstance(p, BNode) or p == p2) and \
           (isinstance(o, BNode) or o == o2):
            return True
    return False

# Compare graphs with blank nodes as wildcards
only_in_first = [t for t in graph1 if not triple_in_graph(t, graph2)]
only_in_second = [t for t in graph2 if not triple_in_graph(t, graph1)]

if not only_in_first and not only_in_second:
    print("✅ Both RDF files contain the same data (treating blank nodes as wildcards).")
else:
    if only_in_first:
        print("\nTriples only in first file:")
        for triple in only_in_first:
            print(triple)
    if only_in_second:
        print("\nTriples only in second file:")
        for triple in only_in_second:
            print(triple)

Please upload the first RDF file:


Saving policy_6_BIOSKIN_2025-09-22_13-17-07 1.json to policy_6_BIOSKIN_2025-09-22_13-17-07 1 (4).json
Please upload the second RDF file:


Saving policy_6_BIOSKIN_2025-09-22_13-17-07 1.json to policy_6_BIOSKIN_2025-09-22_13-17-07 1 (5).json
✅ Both RDF files contain the same data (treating blank nodes as wildcards).


## 5.1) UPCAST contract parser

In [None]:
from rdflib import Graph, Namespace, URIRef
from rdflib.namespace import RDF
from rdflib.plugin import Parser
import datetime
import logging
import sys

if sys.version_info[0] < 3:
    raise Exception("Python 3.11 or higher is required.")
if sys.version_info[1] < 11:
    raise Exception("Python 3.11 or higher is required.")


class ContractParser:
    IDSA = Namespace("https://w3id.org/idsa/core/")
    UPCAST = Namespace("https://www.upcast-project.eu/upcast-vocab/1.0/")

    def __init__(self):
        self.contract_graph = None

    def load(self, file_path):
        """
        Loads the contract data from the specified file path.
        Tries multiple RDF serializations and encodings until one succeeds, or
        all are exhausted.
        """
        self.contract_graph = Graph()

        rdf_formats = [
            "xml",       # RDF/XML
            "turtle",    # Turtle / TTL
            "nt",        # N-Triples
            "n3",        # Notation3
            "json-ld",   # JSON-LD
            "trig",      # TriG
            "trix",      # TriX
        ]

        # Try parsing with each format
        last_exception = None
        for rdf_format in rdf_formats:
            try:
                self.contract_graph.parse(file_path, format=rdf_format)
                break
            except Exception as e:
                last_exception = e
                self.contract_graph = Graph()  # reset in case of partial parse
        else:
            # If no parser worked, try again by reading file contents with encodings
            encodings = ["utf-8", "utf-16", "latin-1"]
            for enc in encodings:
                try:
                    with open(file_path, "r", encoding=enc) as f:
                        data = f.read()
                    for rdf_format in available_formats:
                        try:
                            self.contract_graph.parse(data=data, format=rdf_format)
                            break
                        except Exception:
                            continue
                    else:
                        continue
                    break
                except Exception as e:
                    last_exception = e

        if len(self.contract_graph) == 0 and last_exception:
            raise ValueError(
                f"Failed to parse RDF file {file_path}. Last error: {last_exception}"
            )
        self.contract_graph.bind("idsa-core", ContractParser.IDSA)
        self.contract_graph.bind("upcast", ContractParser.UPCAST)

    def query(self, query_string):
        """
        Query the loaded contract data with the specified SPARQL query string.
        """
        pass

    def get_contract_uri(self):
        """
        Convenience method to get the URI of the contract to use in other methods
        Assumption: A loaded document contains a single contract, hence a single contract URI
        Returns a rdflib Graph node
        """
        if self.contract_graph is None:
            raise Exception("No contract loaded into this parser")
        contract_id = self.contract_graph.value(predicate=RDF.type, object=ContractParser.IDSA.Contract)
        return contract_id

    def get_provider(self):
        """
        Convenience method to get the provider of the contract
        """
        if self.contract_graph is None:
            raise Exception("No contract loaded into this parser")
        provider = self.contract_graph.value(predicate=ContractParser.IDSA.Provider, subject=self.get_contract_uri())
        return provider

    def get_consumer(self):
        """
        Convenience method to get the provider of the contract
        """
        if self.contract_graph is None:
            raise Exception("No contract loaded into this parser")
        consumer = self.contract_graph.value(predicate=ContractParser.IDSA.Consumer, subject=self.get_contract_uri())
        return consumer

    def get_permitted_actions(self):
        """
        Convenience method to get the permitted actions of the contract
        """

        if self.contract_graph is None:
            raise Exception("No contract loaded into this parser")

        # Permitted actions and their IRIs
        query = """
        PREFIX odrl: <http://www.w3.org/ns/odrl/2/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>

        SELECT ?actionValue
        WHERE {
        ?agreement a ?type ;
             odrl:permission ?permission .
        VALUES ?type { odrl:Agreement odrl:Policy odrl:Set odrl:Offer }
         {?permission odrl:action ?actionValue .}
        UNION
        {?permission odrl:action ?actionIRI .
         ?actionIRI rdf:value ?actionValue .}

        }
        """
        query_results = self.contract_graph.query(query)
        permitted_actions = {str(row.actionValue) for row in query_results if isinstance(row.actionValue,URIRef)}


        return permitted_actions

    def get_prohibited_actions(self):
        """
        Convenience method to get the prohibited actions of the contract
        """

        if self.contract_graph is None:
            raise Exception("No contract loaded into this parser")

        # Permitted actions and their IRIs
        query = """
        PREFIX odrl: <http://www.w3.org/ns/odrl/2/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>

        SELECT ?actionValue
        WHERE {
        ?agreement odrl:prohibition ?prohibition .
        {?prohibition odrl:action ?actionValue .}
        UNION
        {?prohibition odrl:action ?actionIRI .
         ?actionIRI rdf:value ?actionValue .}
        }
        """
        query_results = self.contract_graph.query(query)
        prohibited_actions = {str(row.actionValue) for row in query_results if isinstance(row.actionValue,URIRef)}
        return prohibited_actions

    def get_action_container(self,actionValue):
        """
        Input: action Value, i.e. , its name in String format
        Output: URL of the container that implements the input action IRI, None if not specified in the contract
        """

        if self.contract_graph is None:
            raise Exception("No contract loaded into this parser")

        query = """
        PREFIX odrl: <http://www.w3.org/ns/odrl/2/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX upcast: <https://www.upcast-project.eu/upcast-vocab/1.0/>

        SELECT ?rightOperand
        WHERE {
        ?actionIRI rdf:value ?actionValue .
        ?actionIRI odrl:refinement ?refinement .
        ?refinement odrl:leftOperand upcast:implementedBy ;
                    odrl:operator odrl:eq ;
                    odrl:rightOperand ?rightOperand .
        }
        """
        qres = self.contract_graph.query(query,initBindings={'actionValue': URIRef(actionValue)})
        qres_list = list(qres)

        return str(qres_list[0]["rightOperand"]) if len(qres) > 0 else None

    def get_action_execution_command(self,actionValue):
        """
        Input: action Value, i.e. , its name in String format
        Output: URL of the container that implements the input action IRI, None if not specified in the contract
        """

        if self.contract_graph is None:
            raise Exception("No contract loaded into this parser")

        query = """
        PREFIX odrl: <http://www.w3.org/ns/odrl/2/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX upcast: <https://www.upcast-project.eu/upcast-vocab/1.0/>

        SELECT ?rightOperand
        WHERE {
        ?actionIRI rdf:value ?actionValue .
        ?actionIRI odrl:refinement ?refinement .
        ?refinement odrl:leftOperand upcast:executionCommand ;
                    odrl:operator odrl:eq ;
                    odrl:rightOperand ?rightOperand .
        }
        """
        qres = self.contract_graph.query(query,initBindings={'actionValue': URIRef(actionValue)})
        qres_list = list(qres)

        return str(qres_list[0]["rightOperand"]) if len(qres) > 0 else None

    def get_action_execution_limits(self,actionValue):
        """
        input: actionValue, that is the name of the action in string format
        output: list of tuple of the form (operator, rightOperand) where operator is the comparison odrl operator (eq,lteq,gteq,gt) and rightOperand the integer value.
                An empty list is returned if the action does not have any execution limit
        """
        if self.contract_graph is None:
            raise Exception("No contract loaded into this parser")

        query = """
        PREFIX odrl: <http://www.w3.org/ns/odrl/2/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX upcast: <https://www.upcast-project.eu/upcast-vocab/1.0/>

        SELECT ?operator ?rightOperand
        WHERE {
        ?actionIRI rdf:value ?actionValue .
        ?actionIRI odrl:refinement ?refinement .
        ?refinement odrl:leftOperand odrl:count ;
                    odrl:operator ?operator;
                    odrl:rightOperand ?rightOperand .
        }
        """
        qres = self.contract_graph.query(query,initBindings={'actionValue': URIRef(actionValue)})
        limits = []
        for row in qres:
            operator = str(row["operator"]).split("/")[-1]
            value = int(row["rightOperand"])
            limits.append((operator,value))

        return limits

    def get_action_carbon_emission_limit(self,actionValue):
        """
        input: actionValue, that is the name of the action in string format
        output: Float value of maximum carbon emission agreed for this operation (that is, operator less ro equal than is assumed)
          returns None if there is no carbon emission limit defined in the contract
        """
        query = """
        PREFIX odrl: <http://www.w3.org/ns/odrl/2/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX upcast: <https://www.upcast-project.eu/upcast-vocab/1.0/>

        SELECT ?rightOperand ?unit
        WHERE {
        ?actionIRI rdf:value ?actionValue .
        ?actionIRI odrl:constraint ?constraint .
        ?constraint odrl:leftOperand upcast:operationCarbonEmission ;
                    odrl:operator odrl:lteq;
                    odrl:rightOperand ?rightOperand;
                    odrl:unit ?unit  .
        }
        """
        qres = list(self.contract_graph.query(query,initBindings={'actionValue': URIRef(actionValue)}))
        if len(qres) == 0:
            return None
        result = qres[0]
        return (result["rightOperand"].toPython(),result["unit"].toPython())

    def get_action_energy_consumption_limit(self,actionValue):
        """
        input: actionValue, that is the name of the action in string format
        output: tuple (value,unit), with value a float of maximum energy consumption agreed for this operation (that is, operator less ro equal than is assumed)
             unit a string with the unit of the value
        """
        query = """
        PREFIX odrl: <http://www.w3.org/ns/odrl/2/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX upcast: <https://www.upcast-project.eu/upcast-vocab/1.0/>

        SELECT ?rightOperand ?unit
        WHERE {
        ?actionIRI rdf:value ?actionValue .
        ?actionIRI odrl:constraint ?constraint .
        ?constraint odrl:leftOperand upcast:operationEnergyConsumption ;
                    odrl:operator odrl:lteq;
                    odrl:rightOperand ?rightOperand ;
                    odrl:unit ?unit  .
        }
        """
        qres = self.contract_graph.query(query,initBindings={'actionValue': URIRef(actionValue)})
        qres = list(self.contract_graph.query(query,initBindings={'actionValue': URIRef(actionValue)}))
        if len(qres) == 0:
            return None
        result = qres[0]
        return (result["rightOperand"].toPython(),result["unit"].toPython())

    def get_action_datetime_constraints(self,actionValue):
        """
        input: actionValue, that is the name of the action in string format
        output: list of tuple of the form (operator, datetime) where rule is one of {Permission,Prohibition,Duty}, operator is the comparison odrl operator (eq,lt,lteq,gteq,gt) and datetime is the constrained datetime.
                An empty list is returned if the action does not have any datetime constraint.
        """
        query = """
        PREFIX odrl: <http://www.w3.org/ns/odrl/2/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX upcast: <https://www.upcast-project.eu/upcast-vocab/1.0/>

        SELECT ?rule ?operator ?rightOperand
        WHERE {
        ?policy a ?type .
        VALUES ?type { odrl:Agreement odrl:Policy odrl:Set odrl:Offer }
        ?policy ?rule ?rulesetBnode .
        ?rulesetBnode odrl:action ?actionIRI .
        ?actionIRI rdf:value ?actionValue .
        ?rulesetBnode odrl:constraint ?constraint .
        ?constraint odrl:leftOperand odrl:dateTime ;
                    odrl:operator ?operator;
                    odrl:rightOperand ?rightOperand .
        }
        """
        qres = self.contract_graph.query(query,initBindings={'actionValue': URIRef(actionValue)})
        limits = []
        for row in qres:
            rule = str(row["rule"]).split("/")[-1]
            operator = str(row["operator"]).split("/")[-1]
            value = row["rightOperand"].toPython()
            limits.append((operator,value))
        return limits

    def get_action_dependencies(self,actionValue):
        """
        input: actionValue, that is the name of the action in string format
        output: list of actions that must be executed before the input action according to the loaded contract
        """
        if self.contract_graph is None:
            raise Exception("No contract loaded into this parser")

        # Note the permission and not the action is the one that has a duty
        query = """
        PREFIX odrl: <http://www.w3.org/ns/odrl/2/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX upcast: <https://www.upcast-project.eu/upcast-vocab/1.0/>

        SELECT ?dependencyValue
        WHERE {
        ?agreement a ?type ;
             odrl:permission ?permission .
        VALUES ?type { odrl:Agreement odrl:Policy odrl:Set odrl:Offer }
        ?permission odrl:action ?actionIRI .
        ?actionIRI rdf:value ?actionValue .
        ?permission odrl:duty ?dutyIRI .
        ?dutyIRI odrl:action ?dependencyIRI .
        ?dependencyIRI rdf:value ?dependencyValue .
        }
        """

        qres = self.contract_graph.query(query,initBindings={'actionValue': URIRef(actionValue)})
        dependencies = [str(row["dependencyValue"]) for row in qres]
        return dependencies






## 5.2) Custom Test of UPCAST parser

In [None]:
from google.colab import files

# Prompt user to upload a file
print("Please upload your ODRL file:")
uploaded = files.upload()

# Get the uploaded filename
file_path = list(uploaded.keys())[0]

# Initialize the parser and load the ODRL file
parser = ContractParser()
parser.load(file_path)

# Fetch and display contract URI
contract_uri = parser.get_contract_uri()
print("Contract URI:", contract_uri)

# Fetch and display provider
provider = parser.get_provider()
print("Provider:", provider)

# Fetch and display consumer
consumer = parser.get_consumer()
print("Consumer:", consumer)

# Fetch and display permitted actions
permitted_actions = parser.get_permitted_actions()
print("\nPermitted Actions:")
for action in permitted_actions:
    print("-", action)

# Fetch and display prohibited actions
prohibited_actions = parser.get_prohibited_actions()
print("\nProhibited Actions:")
for action in prohibited_actions:
    print("-", action)

# For each permitted action, fetch and display further details
for action in permitted_actions:
    print(f"\nDetails for action: {action}")

    container = parser.get_action_container(action)
    print("  Container:", container)

    exec_command = parser.get_action_execution_command(action)
    print("  Execution Command:", exec_command)

    exec_limits = parser.get_action_execution_limits(action)
    print("  Execution Limits:", exec_limits)

    carbon_limit = parser.get_action_carbon_emission_limit(action)
    print("  Carbon Emission Limit:", carbon_limit)

    energy_limit = parser.get_action_energy_consumption_limit(action)
    print("  Energy Consumption Limit:", energy_limit)

    datetime_constraints = parser.get_action_datetime_constraints(action)
    print("  DateTime Constraints:", datetime_constraints)

    dependencies = parser.get_action_dependencies(action)
    print("  Dependencies:", dependencies)


Please upload your ODRL file:


Saving policy3 to policy3 (1)
Contract URI: None
Provider: None
Consumer: None

Permitted Actions:
- https://www.cactusweb.gr/vocab/Report_Creation
- http://www.w3.org/ns/odrl/2/use
- https://www.cactusweb.gr/vocab/Customer_Analysis
- https://www.cactusweb.gr/vocab/Marketing_Analysis
- https://www.cactusweb.gr/vocab/LLM_Processing
- https://www.cactusweb.gr/vocab/Financial_Analysis
- https://w3id.org/dpv/owl#Store

Prohibited Actions:

Details for action: https://www.cactusweb.gr/vocab/Report_Creation
  Container: None
  Execution Command: None
  Execution Limits: []
  Carbon Emission Limit: None
  Energy Consumption Limit: None
  DateTime Constraints: [('lteq', datetime.date(2025, 12, 8))]
  Dependencies: []

Details for action: http://www.w3.org/ns/odrl/2/use
  Container: None
  Execution Command: None
  Execution Limits: []
  Carbon Emission Limit: None
  Energy Consumption Limit: None
  DateTime Constraints: [('lteq', datetime.date(2026, 3, 8))]
  Dependencies: []

Details for acti

In [None]:
from rdflib import Graph, URIRef


parser = ContractParser()
parser.load(file_path)

provider = parser.get_provider()
print(f'Provider: {provider}')

consumer = parser.get_consumer()
print(f'Consumer: {consumer}')

print('Permitted Actions')
permitted_actions = parser.get_permitted_actions()
#permitted_actions = set(map(lambda x: x[1], permitted_actions))
print(permitted_actions)

print(f'Container for {"http://www.w3.org/ns/odrl/2/anonymize"}: ', end='')
action_container = parser.get_action_container(actionValue="http://www.w3.org/ns/odrl/2/anonymize")
print(action_container)

print(f'Action Execution Limit for {"http://www.w3.org/ns/odrl/2/anonymize"}')
action_execution_limit = parser.get_action_execution_limits(actionValue="http://www.w3.org/ns/odrl/2/anonymize")
print(action_execution_limit)

print(f'Action Execution Limit for {"http://www.w3.org/ns/odrl/2/aggregate"}')
action_execution_limit = parser.get_action_execution_limits(actionValue="http://www.w3.org/ns/odrl/2/aggregate")
print(action_execution_limit)

print(f'Action Execution Limit for {"https://www.upcast-project.eu/upcast-vocab/1.0/Integrate"}')
action_execution_limit = parser.get_action_execution_limits(actionValue="https://www.upcast-project.eu/upcast-vocab/1.0/Integrate")
print(action_execution_limit)

print(f'Datetime Constraint for {"http://www.w3.org/ns/odrl/2/anonymize"}')
action_datetime_constraints = parser.get_action_datetime_constraints(actionValue="http://www.w3.org/ns/odrl/2/anonymize")
print(action_datetime_constraints)

print(f'Datetime Constraint for {"http://www.w3.org/ns/odrl/2/aggregate"}')
action_datetime_constraints = parser.get_action_datetime_constraints(actionValue="http://www.w3.org/ns/odrl/2/aggregate")
print(action_datetime_constraints)

print(f'Datetime Constraint for {"http://www.w3.org/ns/odrl/2/use"}')
action_datetime_constraints = parser.get_action_datetime_constraints(actionValue="http://www.w3.org/ns/odrl/2/use")
print(action_datetime_constraints)

print(f'Datetime Constraint for {"https://www.upcast-project.eu/upcast-vocab/1.0/Integrate"}')
action_datetime_constraints = parser.get_action_datetime_constraints(actionValue="https://www.upcast-project.eu/upcast-vocab/1.0/Integrate")
print(action_datetime_constraints)

print(f'Energy Consumption Limit for {"https://www.upcast-project.eu/upcast-vocab/1.0/Integrate"}')
action_energy_limit = parser.get_action_energy_consumption_limit(actionValue="https://www.upcast-project.eu/upcast-vocab/1.0/Integrate")
print(action_energy_limit)

print(f'Energy Consumption Limit for {"http://www.w3.org/ns/odrl/2/anonymize"}')
action_energy_limit = parser.get_action_energy_consumption_limit(actionValue="http://www.w3.org/ns/odrl/2/anonymize")
print(action_energy_limit)

print(f'Carbon Emission Limit for {"http://www.w3.org/ns/odrl/2/aggregate"}')
action_carbon_limit = parser.get_action_carbon_emission_limit(actionValue="http://www.w3.org/ns/odrl/2/aggregate")
print(action_carbon_limit)

print(f'Carbon Emission Limit for {"http://www.w3.org/ns/odrl/2/anonymize"}')
action_carbon_limit = parser.get_action_carbon_emission_limit(actionValue="http://www.w3.org/ns/odrl/2/anonymize")
print(action_carbon_limit)

print(f'Action Dependencies for {"http://www.w3.org/ns/odrl/2/aggregate"}')
action_dependencies = parser.get_action_dependencies(actionValue="http://www.w3.org/ns/odrl/2/aggregate")
print(action_dependencies)

print(f'Action Dependencies for {"https://www.upcast-project.eu/upcast-vocab/1.0/Integrate"}')
action_dependencies = parser.get_action_dependencies(actionValue="https://www.upcast-project.eu/upcast-vocab/1.0/Integrate")
print(action_dependencies)


Provider: None
Consumer: None
Permitted Actions
{'https://www.cactusweb.gr/vocab/Report_Creation', 'http://www.w3.org/ns/odrl/2/use', 'https://www.cactusweb.gr/vocab/Customer_Analysis', 'https://www.cactusweb.gr/vocab/Marketing_Analysis', 'https://www.cactusweb.gr/vocab/LLM_Processing', 'https://www.cactusweb.gr/vocab/Financial_Analysis', 'https://w3id.org/dpv/owl#Store'}
Container for http://www.w3.org/ns/odrl/2/anonymize: None
Action Execution Limit for http://www.w3.org/ns/odrl/2/anonymize
[]
Action Execution Limit for http://www.w3.org/ns/odrl/2/aggregate
[]
Action Execution Limit for https://www.upcast-project.eu/upcast-vocab/1.0/Integrate
[]
Datetime Constraint for http://www.w3.org/ns/odrl/2/anonymize
[]
Datetime Constraint for http://www.w3.org/ns/odrl/2/aggregate
[]
Datetime Constraint for http://www.w3.org/ns/odrl/2/use
[('permission', 'lteq', datetime.date(2026, 3, 8))]
Datetime Constraint for https://www.upcast-project.eu/upcast-vocab/1.0/Integrate
[]
Energy Consumption Lim