In [8]:
from rdflib import Graph, RDF
from rdflib.namespace import SH
from collections import defaultdict
from itertools import zip_longest
import rdflib

In [2]:
cm_g = Graph().parse("/Users/duanxuemin/Desktop/PhD/projects/cm2shacl/evaluation/standardForms/package_F03/cm_shacl.ttl", format="turtle")

rml_g = Graph().parse("/Users/duanxuemin/Desktop/PhD/projects/cm2shacl/evaluation/standardForms/package_F03/rml_shacl.ttl", format="turtle")

owl_g = Graph().parse("/Users/duanxuemin/Desktop/PhD/projects/cm2shacl/evaluation/standardForms/package_F03/owl_shacl_rewrite.ttl", format="turtle")

In [4]:
def extract_constraint_tuple(graph, shape):
    """
    Extract a single (datatype, nodeKind, class) constraint.
    If no_or=True: treat shape as atomic.
    """
    d = graph.value(subject=shape, predicate=SH.datatype)
    k = graph.value(subject=shape, predicate=SH.nodeKind)
    c = graph.value(subject=shape, predicate=SH["class"])

    # sh:node 替代 class
    if not c:
        node = graph.value(subject=shape, predicate=SH.node)
        if node:
            c = graph.value(subject=node, predicate=SH["class"])

    return (
        str(d) if d else None,
        str(k) if k else None,
        str(c) if c else None,
    )

In [3]:
def extract_constraints_general(graph, source_type):
    """
    For each (targetClass, path), extract all constraints.
    Return: dict mapping (class, path) -> set of (datatype, nodeKind, class)
    """
    shape_map = defaultdict(list) 
    # Step 1: 构建 (targetClass, path) 到 shape 的映射
    for s,_,cls in graph.triples((None,SH.targetClass,None)):
            for ps in graph.objects(s, SH.property):
                    path = graph.value(ps, SH.path)
                    if path:
                        shape_map[(cls, path)].append(ps)
            path = graph.value(s, SH.path)
            if path:
                shape_map[(cls, path)].append(s)
    constraint_map = {}
    
    for (cls, path), shape_list in shape_map.items():
        datatype_set = set()
        nodekind_set = set()
        class_set = set()
        for shape in shape_list:
            d, k, c = extract_constraint_tuple(graph, shape)
            if d: datatype_set.add(d)
            if k: nodekind_set.add(k)
            if c: class_set.add(c)
            or_node = graph.value(subject=shape, predicate=SH["or"])
            if or_node:
                for alt in graph.items(or_node):
                    d, k, c = extract_constraint_tuple(graph, alt)
                    if d: datatype_set.add(d)
                    if k: nodekind_set.add(k)
                    if c: class_set.add(c)
        constraint_map[(cls, path)] = {SH.datatype: datatype_set,
                                        SH.nodeKind: nodekind_set,
                                        SH["class"]: class_set}
    return constraint_map

In [5]:
m = extract_constraints_general(cm_g, "cm")

In [22]:
m

{(rdflib.term.URIRef('http://data.europa.eu/a4g/ontology#OrganisationGroup'),
  rdflib.term.URIRef('http://data.europa.eu/a4g/ontology#hasMember')): {rdflib.term.URIRef('http://www.w3.org/ns/shacl#datatype'): set(),
  rdflib.term.URIRef('http://www.w3.org/ns/shacl#nodeKind'): {'http://www.w3.org/ns/shacl#IRI'},
  rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'): {'http://www.w3.org/ns/org#Organization'}},
 (rdflib.term.URIRef('http://data.europa.eu/a4g/ontology#ProcurementProcessInformation'),
  rdflib.term.URIRef('http://data.europa.eu/a4g/ontology#isDPSTerminated')): {rdflib.term.URIRef('http://www.w3.org/ns/shacl#datatype'): {'http://www.w3.org/2001/XMLSchema#boolean'},
  rdflib.term.URIRef('http://www.w3.org/ns/shacl#nodeKind'): {'http://www.w3.org/ns/shacl#Literal'},
  rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'): set()},
 (rdflib.term.URIRef('http://data.europa.eu/a4g/ontology#ProcurementProcessInformation'),
  rdflib.term.URIRef('http://data.europa.eu/a4g/ontolog

In [21]:
c = {"rml":{"datatype":"A"},"cm":{"datatype":"A"},"owl":{"datatype":"C"}}
shared_keys = set.intersection(*(set(cmap.keys()) for cmap in c.values()))
shared_keys

{'datatype'}

In [11]:
a = m[(rdflib.term.URIRef('http://data.europa.eu/a4g/ontology#OrganisationGroup'),
  rdflib.term.URIRef('http://data.europa.eu/a4g/ontology#hasMember'))][rdflib.term.URIRef('http://www.w3.org/ns/shacl#nodeKind')]

In [17]:
m[(rdflib.term.URIRef('http://data.europa.eu/a4g/ontology#OrganisationGroup'),
  rdflib.term.URIRef('http://data.europa.eu/a4g/ontology#hasMember'))].values()

dict_values([set(), {'http://www.w3.org/ns/shacl#IRI'}, {'http://www.w3.org/ns/org#Organization'}])

In [16]:
'http://www.w3.org/ns/shacl#IRI' in a

True

In [22]:
def extract_constraint_tuple(graph, shape, no_or=False):
    """
    Extract a single (datatype, nodeKind, class) constraint.
    If no_or=True: treat shape as atomic.
    """
    d = graph.value(subject=shape, predicate=SH.datatype)
    k = graph.value(subject=shape, predicate=SH.nodeKind)
    c = graph.value(subject=shape, predicate=SH["class"])

    # sh:node 替代 class
    if not c:
        node = graph.value(subject=shape, predicate=SH.node)
        if node:
            c = graph.value(subject=node, predicate=SH["class"])

    return (
        str(d) if d else None,
        str(k) if k else None,
        str(c) if c else None,
    )


In [41]:
from rdflib import RDF, SH
from collections import defaultdict

def extract_constraints_from_graph(graph, source_type="CM"):
    """
    For each (targetClass, path), extract all constraints.
    Return: dict mapping (class, path) -> set of (datatype, nodeKind, class)
    """
    shape_map = defaultdict(list)

    # Step 1: 构建 (targetClass, path) 到 shape 的映射
    for s,_,cls in graph.triples((None,SH.targetClass,None)):
            for ps in graph.objects(s, SH.property):
                    path = graph.value(ps, SH.path)
                    if path:
                        shape_map[(str(cls), str(path))].append(ps)
            path = graph.value(s, SH.path)
            if path:
                    shape_map[(str(cls), str(path))].append(s)

    # Step 2: 每个 (class, path) 分别处理不同来源
    constraint_map = {}


    for (cls, path), shape_list in shape_map.items():
        constraint_set = set()

        if source_type in {"CM", "RML"}:
            for shape in shape_list:
                or_node = graph.value(subject=shape, predicate=SH["or"])
                or_group = []
                if or_node:
                    for alt in graph.items(or_node):
                        d, k, c = extract_constraint_tuple(graph, alt)
                        single = []
                        if d: single.append((SH.datatype, d))
                        if k: single.append((SH.nodeKind, k))
                        if c: single.append((SH["class"], c))
                        if single:
                            or_group.append(single)
                d, k, c = extract_constraint_tuple(graph, shape)
                hashable_or_group = tuple(tuple(group) for group in or_group)
                constraint_set.add((d, k, c, hashable_or_group))

        else:  # OWL special case
            datatype_set = set()
            nodekind_set = set()
            class_set = set()

            for shape in shape_list:
                d, k, c = extract_constraint_tuple(graph, shape)
                if d: datatype_set.add(d)
                if k: nodekind_set.add(k)
                if c: class_set.add(c)

            or_group = []

            # 单个值保留在前三元组，多值则放入 or 组中
            d_val = next(iter(datatype_set)) if len(datatype_set) == 1 else None
            k_val = next(iter(nodekind_set)) if len(nodekind_set) == 1 else None
            c_val = next(iter(class_set)) if len(class_set) == 1 else None

            if len(datatype_set) > 1:
                or_group.append([(SH.datatype, d) for d in datatype_set])
            if len(nodekind_set) > 1:
                or_group.append([(SH.nodeKind, k) for k in nodekind_set])
            if len(class_set) > 1:
                or_group.append([(SH["class"], c) for c in class_set])

            hashable_or_group = tuple(tuple(group) for group in or_group)
            constraint_set.add((d_val, k_val, c_val, hashable_or_group))

        constraint_map[(cls, path)] = constraint_set
    return constraint_map

In [53]:
graph = rml_g
c_rml = extract_constraints_from_graph(graph, source_type="RML")

In [54]:
graph = cm_g
c_cm = extract_constraints_from_graph(graph, source_type="CM")

In [55]:
graph = owl_g
c_owl = extract_constraints_from_graph(graph, source_type="OWL")

In [63]:
c2 = c_cm[('http://data.europa.eu/a4g/ontology#Notice', 'http://data.europa.eu/a4g/ontology#announcesRole')]

In [62]:
c1 = c_rml[('http://data.europa.eu/a4g/ontology#ResultNotice', 'http://data.europa.eu/a4g/ontology#announcesRole')]

In [70]:
for c11 in c1:
    for c22 in c2:
        print("OKK")
        diff = compare_constraints_detailed(c11, c22)

OKK


In [71]:
diff

[('or',
  [[rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'),
    'http://data.europa.eu/a4g/ontology#TenderReceiver']],
  'present',
  None),
 ('or',
  [[rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'),
    'http://data.europa.eu/a4g/ontology#ProcurementProcedureInformationProvider']],
  'present',
  None),
 ('or',
  [[rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'),
    'http://data.europa.eu/a4g/ontology#Winner']],
  'present',
  None)]

In [None]:
compare_constraints_detailed(c1, c2)

ValueError: not enough values to unpack (expected 4, got 1)

In [61]:

def compare_constraints_detailed(c1, c2):
    """
    比较两个 constraint 四元组 (d, k, c, or) 结构，返回差异字段及其值。
    """
    d1, k1, c1_val, or1 = c1
    d2, k2, c2_val, or2 = c2

    diffs = []

    if d1 != d2:
        if d1 is not None and d1 != d2:
            diffs.append(("datatype", d1, "present", None))
        if d2 is not None and d2 != d1:
            diffs.append(("datatype", d2, "missing", None))

    if k1 != k2:
        if k1 is not None and k1 != k2:
            diffs.append(("nodeKind", k1, "present", None))
        if k2 is not None and k2 != k1:
            diffs.append(("nodeKind", k2, "missing", None))

    if c1_val != c2_val:
        if c1_val is not None and c1_val != c2_val:
            diffs.append(("class", c1_val, "present", None))
        if c2_val is not None and c2_val != c1_val:
            diffs.append(("class", c2_val, "missing", None))

    # 处理 or group：集合中只要某组缺失就算差异（比较不考虑顺序）
    or1_sets = set(frozenset(group) for group in or1) if or1 else set()
    or2_sets = set(frozenset(group) for group in or2) if or2 else set()

    extra_in_1 = or1_sets - or2_sets
    extra_in_2 = or2_sets - or1_sets

    for group in extra_in_1:
        diffs.append(("or", [list(i) for i in group], "present", None))
    for group in extra_in_2:
        diffs.append(("or", [list(i) for i in group], "missing", None))

    return diffs

In [52]:
c[('http://data.europa.eu/a4g/ontology#ResultNotice', 'http://data.europa.eu/a4g/ontology#announcesRole')]

{(None,
  'http://www.w3.org/ns/shacl#IRI',
  None,
  (((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'),
     'http://data.europa.eu/a4g/ontology#TenderReceiver'),),
   ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'),
     'http://data.europa.eu/a4g/ontology#ProcurementProcedureInformationProvider'),),
   ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'),
     'http://data.europa.eu/a4g/ontology#ReviewProcedureInformationProvider'),),
   ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'),
     'http://data.europa.eu/a4g/ontology#Reviewer'),),
   ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'),
     'http://data.europa.eu/a4g/ontology#Winner'),),
   ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'),
     'http://data.europa.eu/a4g/ontology#Buyer'),),
   ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'),
     'http://data.europa.eu/a4g/ontology#Mediator'),)))}

In [None]:
c[('http://data.europa.eu/a4g/ontology#ResultNotice', 'http://data.europa.eu/a4g/ontology#announcesRole')]:

(None, 'http://www.w3.org/ns/shacl#IRI', None, (((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#EnvironmentalProtectionInformationProvider'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#CentralPurchasingBody'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#TenderProcessor'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#Subcontractor'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#TenderReceiver'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#OfflineAccessProvider'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#EmploymentInformationProvider'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#ParticipationRequestProc

In [79]:
constraints_by_source = {"RML": c_rml[('http://data.europa.eu/a4g/ontology#ResultNotice', 'http://data.europa.eu/a4g/ontology#announcesRole')], "CM": c_cm[('http://data.europa.eu/a4g/ontology#Notice', 'http://data.europa.eu/a4g/ontology#announcesRole')], "OWL": c_owl[('http://data.europa.eu/a4g/ontology#ResultNotice', 'http://data.europa.eu/a4g/ontology#announcesRole')]}
all_constraints = set().union(*constraints_by_source.values())

In [109]:
def decompose_constraint_tuple(constraint):
    """将一个constraint拆解成非or和or部分"""
    for c in constraint:
        cons = c

    d, k, c, or_groups = cons
    components = []

    if d is not None:
        components.append(("sh:datatype", d))
    if k is not None:
        components.append(("sh:nodeKind", k))
    if c is not None:
        components.append(("sh:class", c))

    return components, or_groups or []

In [87]:
for constraint_set in constraints_by_source.values():
            for c in constraint_set:
                    print("here")
                    print(c)
            break
                    

here
(None, 'http://www.w3.org/ns/shacl#IRI', None, (((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#TenderReceiver'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#ProcurementProcedureInformationProvider'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#ReviewProcedureInformationProvider'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#Reviewer'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#Winner'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#Buyer'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#Mediator'),)))


In [90]:
simple, ors = decompose_constraint_tuple(c)

In [94]:
all_simple = set()
all_or_groups = set()

In [95]:
all_simple.update(simple)
all_or_groups.update(tuple(sorted(g)) for g in ors)

In [104]:
for c in c_rml[("http://data.europa.eu/a4g/ontology#AccessTerm","http://data.europa.eu/a4g/ontology#hasPublicAccessURL")]:
    print(len(c))

4


In [110]:
simple, ors = decompose_constraint_tuple(c_rml[("http://data.europa.eu/a4g/ontology#AccessTerm","http://data.europa.eu/a4g/ontology#hasPublicAccessURL")])

In [112]:
print(c)

(None, 'http://www.w3.org/ns/shacl#Literal', None, ())


In [2]:
import rdflib

In [16]:
i = {(None, 'http://www.w3.org/ns/shacl#IRI', None, (((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#DirectAwardTerm'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#ProcedureTerm'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#ReviewTerm'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#SubmissionTerm'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#FrameworkAgreementTerm'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#ProcedureSpecificTerm'), (rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#AccessTerm')),))}

In [24]:
i = {(None, 'http://www.w3.org/ns/shacl#IRI', None, (((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/m8g/PublicOrganisation'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#System'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#Business'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/m8g/Person'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://xmlns.com/foaf/0.1/Person'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://www.w3.org/ns/org#Organization'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://xmlns.com/foaf/0.1/Agent'),), ((rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://data.europa.eu/a4g/ontology#OrganisationGroup'),)))} 

In [25]:
for c in i:
    c

In [26]:
d, k, c, or_groups = c

In [35]:
for o in or_groups:
    print(tuple([(rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://www.w3.org/ns/org#Organization')])==o)

False
False
False
False
False
True
False
False


In [33]:
tuple([(rdflib.term.URIRef('http://www.w3.org/ns/shacl#class'), 'http://www.w3.org/ns/org#Organization')]) in or_groups

True

In [1]:
from rdflib import Graph, RDF

In [13]:
g = Graph().parse("/Users/duanxuemin/Desktop/PhD/projects/cm2shacl/data/eforms/ontology/ePO_core.ttl", format="turtle")