shrink_ttl_to_schema_plus_samples.py

Reduce a TTL file to "schema + tiny instance samples":
- Keep ALL schema triples (RDFS/OWL axioms & declarations).
- For instance data, keep up to:
    * MAX_TRIPLES_PER_PREDICATE non-schema triples per predicate, and
    * MAX_INSTANCES_PER_CLASS rdf:type assertions per class.
- Optionally keep rdfs:label / rdfs:comment literals for every resource (subject or object) that appears in a kept triple.

Deterministic selection (stable sorting). Configure paths/limits below.

In [4]:
# ====================================================
# CONFIGURATION — the only section meant to be edited
# ====================================================

# Turtle file to read, and the path the reduced graph is written to.
file = "big_one-test.ttl"
INPUT_TTL = file
OUTPUT_TTL = "big_schema_for_prefix.ttl"

# Sampling caps for instance data (small values such as 1 or 2 keep the
# output tiny).
MAX_TRIPLES_PER_PREDICATE = 1
MAX_INSTANCES_PER_CLASS = 1

# When True, rdfs:label / rdfs:comment literals are retained for every
# resource that ends up in the output graph.
KEEP_LABELS_FOR_KEPT_RESOURCES = True

# When True, SHACL predicates and shape types are treated as schema and are
# therefore always kept.
INCLUDE_SHACL_SCHEMA = True

In [5]:
from rdflib import Graph, URIRef, BNode, Literal
from rdflib.namespace import RDF, RDFS, OWL, XSD, Namespace
from typing import Tuple, Set, Dict, Iterable, DefaultDict
from collections import defaultdict

# Loose alias for an RDF triple expressed as a (subject, predicate, object) tuple.
Triple = Tuple[object, object, object]

# Namespaces (optional SHACL support)
SH = Namespace("http://www.w3.org/ns/shacl#")

# --- schema recognizers ---
# Predicates whose mere presence marks a triple as schema-level.
SCHEMA_PREDICATES = {
    # RDFS / RDF schema-level links
    RDFS.domain, RDFS.range, RDFS.subClassOf, RDFS.subPropertyOf, RDFS.seeAlso,
    # OWL schema axioms
    OWL.equivalentClass, OWL.equivalentProperty, OWL.inverseOf, OWL.disjointWith,
    OWL.propertyChainAxiom, OWL.onProperty, OWL.allValuesFrom, OWL.someValuesFrom,
    OWL.hasValue, OWL.cardinality, OWL.minCardinality, OWL.maxCardinality,
    OWL.qualifiedCardinality, OWL.minQualifiedCardinality, OWL.maxQualifiedCardinality,
    OWL.unionOf, OWL.intersectionOf, OWL.complementOf, OWL.hasKey,
}

# Classes whose rdf:type assertions declare a schema resource.
SCHEMA_TYPES = {
    # RDFS / RDF
    RDFS.Class, RDF.Property,
    # OWL classes & properties
    OWL.Class, OWL.Ontology, OWL.Restriction,
    OWL.ObjectProperty, OWL.DatatypeProperty, OWL.AnnotationProperty,
    OWL.FunctionalProperty, OWL.InverseFunctionalProperty,
    OWL.SymmetricProperty, OWL.TransitiveProperty, OWL.ReflexiveProperty,
    OWL.IrreflexiveProperty, OWL.AsymmetricProperty,
}

# SHACL schema bits (optional)
if INCLUDE_SHACL_SCHEMA:
    # NOTE: SHACL terms that collide with Python keywords (class, in, and, or,
    # not) must be looked up with item access.  ``SH`` is a plain rdflib
    # ``Namespace``, which appends the attribute name verbatim, so
    # ``SH.class_`` would yield ``sh:class_`` -- a term that does not exist in
    # SHACL -- and real ``sh:class`` triples would not be recognised as schema.
    SCHEMA_PREDICATES |= {
        SH.targetClass, SH.targetNode, SH.targetObjectsOf, SH.targetSubjectsOf,
        SH.path, SH.property, SH.node, SH.datatype, SH["class"],
        SH.minCount, SH.maxCount, SH.minInclusive, SH.maxInclusive, SH.minExclusive, SH.maxExclusive,
        SH.pattern, SH.flags, SH.description, SH.name, SH["in"],
        SH["and"], SH["or"], SH.xone, SH["not"], SH.hasValue, SH.qualifiedValueShape,
        SH.qualifiedMinCount, SH.qualifiedMaxCount, SH.uniqueLang,
    }
    SCHEMA_TYPES |= {
        SH.Shape, SH.NodeShape, SH.PropertyShape, SH.ValidationReport, SH.ValidationResult,
    }

def is_entity(n) -> bool:
    """Return True when *n* is a URIRef or a BNode, False for anything else."""
    if isinstance(n, URIRef):
        return True
    return isinstance(n, BNode)

def schema_triple(g: Graph, s, p, o) -> bool:
    """Decide whether the triple ``(s, p, o)`` counts as schema.

    A triple is schema when any of the following holds:
      * its predicate is listed in ``SCHEMA_PREDICATES``;
      * it is an ``rdf:type`` assertion whose object is in ``SCHEMA_TYPES``;
      * it is an ``rdfs:label`` / ``rdfs:comment`` on a subject that *g*
        types as ``owl:Ontology``.
    """
    if p in SCHEMA_PREDICATES:
        return True

    if p == RDF.type:
        # A type assertion is schema only when it declares a schema resource.
        return o in SCHEMA_TYPES

    # Annotations are schema only when they describe the ontology itself.
    is_annotation = p == RDFS.label or p == RDFS.comment
    return is_annotation and (s, RDF.type, OWL.Ontology) in g

# --- deterministic scoring helpers for picking "nice" examples ---
def score_instance_triple(t: Triple) -> Tuple:
    """Build the sort key used to choose sample triples for a predicate.

    Lower keys sort first: triples with more entity (URIRef/BNode) endpoints
    win, then triples whose subject is a URIRef, then the lexical order of
    the subject, predicate and object strings.
    """
    subject, predicate, obj = t
    endpoint_entities = sum(1 for node in (subject, obj) if is_entity(node))
    subject_is_uri = 1 if isinstance(subject, URIRef) else 0
    return (
        -endpoint_entities,
        -subject_is_uri,
        str(subject),
        str(predicate),
        str(obj),
    )

def score_type_assertion(t: Triple) -> Tuple:
    """Build the sort key used to choose sample instances of a class.

    Lower keys sort first: URIRef subjects come before other subjects, and
    ties are broken by the subject string and then the object string.
    """
    subject, _predicate, cls = t
    uri_rank = -1 if isinstance(subject, URIRef) else 0
    return (uri_rank, str(subject), str(cls))

def entities_in_triples(ts: Iterable[Triple]) -> Set[object]:
    """Return every subject or object in *ts* that ``is_entity`` accepts.

    Predicates are ignored.
    """
    return {
        node
        for subject, _predicate, obj in ts
        for node in (subject, obj)
        if is_entity(node)
    }

def _rdf_list_triples(g: Graph, seeds: Iterable[object]) -> Set[Triple]:
    """Collect the rdf:first / rdf:rest triples of lists reachable from *seeds*.

    Starting from every blank node in *seeds*, follow ``rdf:first`` and
    ``rdf:rest`` links through blank nodes and return all triples traversed.
    A visited set guards against cyclic (malformed) lists.
    """
    found: Set[Triple] = set()
    visited: Set[object] = set()
    pending = [node for node in seeds if isinstance(node, BNode)]
    while pending:
        node = pending.pop()
        if node in visited:
            continue
        visited.add(node)
        for pred in (RDF.first, RDF.rest):
            for _, _, obj in g.triples((node, pred, None)):
                found.add((node, pred, obj))
                if isinstance(obj, BNode):
                    pending.append(obj)
    return found


def main():
    """Shrink ``INPUT_TTL`` to schema plus a few instance samples.

    Steps:
      1. Keep every triple that ``schema_triple`` accepts, together with the
         rdf:first / rdf:rest cells of any RDF collection those triples point
         to (e.g. the member list of ``owl:unionOf``).
      2. Keep up to ``MAX_INSTANCES_PER_CLASS`` rdf:type assertions per class.
      3. Keep up to ``MAX_TRIPLES_PER_PREDICATE`` further triples per
         non-schema predicate.
      4. If ``KEEP_LABELS_FOR_KEPT_RESOURCES`` is set, keep literal
         rdfs:label / rdfs:comment values of every entity kept so far.
      5. Serialize the result as Turtle to ``OUTPUT_TTL`` and print a summary.

    Selection in steps 2 and 3 is made deterministic by sorting candidates
    with ``score_type_assertion`` / ``score_instance_triple``.
    """
    src = Graph()
    src.parse(INPUT_TTL, format="turtle")

    # -----------------------------
    # 1) KEEP ALL SCHEMA TRIPLES
    # -----------------------------
    picked: Set[Triple] = {
        (s, p, o)
        for s, p, o in src.triples((None, None, None))
        if schema_triple(src, s, p, o)
    }

    # Schema axioms such as owl:unionOf / owl:intersectionOf / sh:in point at
    # RDF collections.  Their rdf:first / rdf:rest cells are not schema
    # predicates, so without this closure they would only be *sampled* in
    # step 3 and the output would contain truncated, dangling lists.
    schema_objects = [o for _, _, o in picked]
    picked |= _rdf_list_triples(src, schema_objects)

    # -----------------------------------------------------
    # 2) SAMPLE INSTANCE rdf:type assertions PER CLASS
    # -----------------------------------------------------
    by_class: DefaultDict[object, list] = defaultdict(list)
    for s, p, o in src.triples((None, RDF.type, None)):
        # Type assertions that declare schema resources were picked in step 1.
        if (s, p, o) in picked:
            continue
        by_class[o].append((s, p, o))

    for assertions in by_class.values():
        ranked = sorted(assertions, key=score_type_assertion)
        picked.update(ranked[:MAX_INSTANCES_PER_CLASS])

    # -----------------------------------------------------
    # 3) SAMPLE NON-SCHEMA TRIPLES PER PREDICATE
    # -----------------------------------------------------
    by_pred: DefaultDict[object, list] = defaultdict(list)
    for s, p, o in src.triples((None, None, None)):
        if (s, p, o) in picked:
            continue
        # Schema-signaling predicates are already fully preserved.
        if p in SCHEMA_PREDICATES:
            continue
        # NOTE: rdf:type is deliberately not skipped here, so type assertions
        # that were not selected in step 2 remain candidates and up to
        # MAX_TRIPLES_PER_PREDICATE additional ones may be kept.
        by_pred[p].append((s, p, o))

    for candidates in by_pred.values():
        ranked = sorted(candidates, key=score_instance_triple)
        picked.update(ranked[:MAX_TRIPLES_PER_PREDICATE])

    # -----------------------------------------------------
    # 4) OPTIONAL: keep labels/comments for resources we kept
    # -----------------------------------------------------
    if KEEP_LABELS_FOR_KEPT_RESOURCES:
        # The entity set is computed once, before any annotation is added.
        for ent in entities_in_triples(picked):
            for p in (RDFS.label, RDFS.comment):
                for _, _, lit in src.triples((ent, p, None)):
                    # Only literal annotations are kept.
                    if isinstance(lit, Literal):
                        picked.add((ent, p, lit))

    # -----------------------------------------------------
    # 5) BUILD & WRITE OUTPUT
    # -----------------------------------------------------
    out = Graph()
    # Carry over the source prefix bindings (echoed to the console), then make
    # sure the common vocabularies are bound.
    for prefix, namespace in src.namespace_manager.namespaces():
        print(prefix, namespace)
        out.bind(prefix, namespace)
    out.bind("rdf", RDF)
    out.bind("rdfs", RDFS)
    out.bind("owl", OWL)
    out.bind("xsd", XSD)
    if INCLUDE_SHACL_SCHEMA:
        out.bind("sh", SH)

    # Insert in a stable order so repeated runs build the graph identically.
    for t in sorted(picked, key=lambda x: (str(x[0]), str(x[1]), str(x[2]))):
        out.add(t)

    out.serialize(destination=OUTPUT_TTL, format="turtle")

    # Small console summary
    print("=== Shrink complete ===")
    print(f"Source triples:  {len(src)}")
    print(f"Kept triples:    {len(out)}")
    print(f"Output file:     {OUTPUT_TTL}")
    print("- Schema preserved: YES")
    print(f"- Instance limits:  {MAX_TRIPLES_PER_PREDICATE} per predicate, {MAX_INSTANCES_PER_CLASS} per class")
    if KEEP_LABELS_FOR_KEPT_RESOURCES:
        print("- Labels/comments retained for kept resources")


In [None]:
# Run the shrink only when executed directly (script or notebook cell), not
# when this code is imported as a module.
if __name__ == "__main__":
    main()