In [1]:
import rdflib
from rdflib.namespace import RDF, XSD, OWL
from typing import Optional

# Load the ontology
ontology_graph = rdflib.Graph()
ontology_graph.parse("ontology.ttl", format="turtle")

# Define the namespace for co_learning (adjust if needed)
CO_LEARNING_NS = rdflib.Namespace("http://example.org/co_learning#")


def validate_class_membership(
    data_graph: rdflib.Graph, class_uri: rdflib.URIRef
) -> bool:
    """
    Validate that all instances claiming to be of a specific class actually belong to
    that class.

    Args:
        data_graph (rdflib.Graph): The graph containing RDF data.
        class_uri (rdflib.URIRef): The URI of the class to check for membership.

    Returns:
        bool: True if all instances belong to the correct class; False if any errors are
        found.
    """
    is_valid = True

    # Check if the class exists in the ontology
    if (class_uri, RDF.type, OWL.Class) not in ontology_graph:
        print(f"Error: Class {class_uri} is not defined in the ontology.")
        return False

    # Check instances of this class in the data graph
    for instance in data_graph.subjects(RDF.type, class_uri):
        if not data_graph.triples((instance, RDF.type, class_uri)):
            print(f"Error: {instance} claims to be of unknown class {class_uri}")
            is_valid = False

    return is_valid


def validate_domain_range(
    data_graph: rdflib.Graph,
    property_uri: rdflib.URIRef,
    domain_uri: rdflib.URIRef,
    range_uri: rdflib.URIRef,
) -> bool:
    """
    Validate that a property is used correctly according to the ontology's domain and
    range.

    Args:
        data_graph (rdflib.Graph): The graph containing RDF data.
        property_uri (rdflib.URIRef): The URI of the property to validate.
        domain_uri (rdflib.URIRef): The URI of the expected domain class.
        range_uri (rdflib.URIRef): The URI of the expected range class.

    Returns:
        bool: True if all properties have correct domain and range; False if any errors
        are found.
    """
    is_valid = True

    for s, o in data_graph.subject_objects(property_uri):
        # Check domain
        if not data_graph.triples((s, RDF.type, domain_uri)):
            print(f"Error: {s} is not of type {domain_uri} for property {property_uri}")
            is_valid = False
        # Check range
        if not data_graph.triples((o, RDF.type, range_uri)):
            print(f"Error: {o} is not of type {range_uri} for property {property_uri}")
            is_valid = False

    return is_valid


def validate_data_property(
    data_graph: rdflib.Graph,
    property_uri: rdflib.URIRef,
    expected_datatype: rdflib.URIRef,
) -> bool:
    """
    Validate that a data property has the correct datatype.

    Args:
        data_graph (rdflib.Graph): The graph containing RDF data.
        property_uri (rdflib.URIRef): The URI of the data property to validate.
        expected_datatype (rdflib.URIRef): The expected datatype (e.g., XSD.integer).

    Returns:
        bool: True if all data properties have the correct datatype; False if any errors
        are found.
    """
    is_valid = True

    for s, o in data_graph.subject_objects(property_uri):
        # Check if the object is a literal and has the expected datatype
        if not (isinstance(o, rdflib.Literal) and o.datatype == expected_datatype):
            print(f"Error: {o} for {property_uri} is not of type {expected_datatype}")
            is_valid = False

    return is_valid


def validate_graph(ttl_file: str) -> bool:
    """
    Validate an RDF graph loaded from a Turtle (.ttl) file against the ontology.

    Args:
        ttl_file (str): Path to the Turtle file to validate.

    Returns:
        bool: True if the graph is valid according to the ontology; False if any errors
        are found.
    """
    # Load the RDF data
    data_graph = rdflib.Graph()
    data_graph.parse(ttl_file, format="turtle")

    is_valid = True

    # Validate class membership for Situation, ActionHuman, ActionRobot
    is_valid &= validate_class_membership(data_graph, CO_LEARNING_NS.Situation)
    is_valid &= validate_class_membership(data_graph, CO_LEARNING_NS.ActionHuman)
    is_valid &= validate_class_membership(data_graph, CO_LEARNING_NS.ActionRobot)

    # Validate object properties (domain and range checks)
    is_valid &= validate_domain_range(
        data_graph,
        CO_LEARNING_NS.hasSituation,
        CO_LEARNING_NS.CollaborationPattern,
        CO_LEARNING_NS.Situation,
    )
    is_valid &= validate_domain_range(
        data_graph,
        CO_LEARNING_NS.hasActionHuman,
        CO_LEARNING_NS.CollaborationPattern,
        CO_LEARNING_NS.ActionHuman,
    )
    is_valid &= validate_domain_range(
        data_graph,
        CO_LEARNING_NS.hasActionRobot,
        CO_LEARNING_NS.CollaborationPattern,
        CO_LEARNING_NS.ActionRobot,
    )

    # Validate data properties (data type checks)
    is_valid &= validate_data_property(data_graph, CO_LEARNING_NS.hasCPNum, XSD.integer)
    is_valid &= validate_data_property(
        data_graph, CO_LEARNING_NS.hasParticipantNumber, XSD.integer
    )
    is_valid &= validate_data_property(
        data_graph, CO_LEARNING_NS.hasCPLabel, XSD.string
    )
    is_valid &= validate_data_property(
        data_graph, CO_LEARNING_NS.hasTimeScore, XSD.integer
    )
    is_valid &= validate_data_property(
        data_graph, CO_LEARNING_NS.hasUnixTimeStamp, XSD.integer
    )

    return is_valid


# Example of running validation for each TTL file in a directory
from glob import glob

for ttl_path in glob("./rdf-data/*.ttl"):
    print(f"Validating: {ttl_path}")
    if validate_graph(ttl_path):
        print(f"{ttl_path} is valid!")
    else:
        print(f"{ttl_path} has validation errors.")
    print("---\n")

Validating: ./rdf-data/077.ttl
./rdf-data/077.ttl is valid!
---

Validating: ./rdf-data/151.ttl
./rdf-data/151.ttl is valid!
---

Validating: ./rdf-data/073.ttl
./rdf-data/073.ttl is valid!
---

Validating: ./rdf-data/152.ttl
./rdf-data/152.ttl is valid!
---

Validating: ./rdf-data/065.ttl
./rdf-data/065.ttl is valid!
---

Validating: ./rdf-data/009.ttl
./rdf-data/009.ttl is valid!
---

Validating: ./rdf-data/008.ttl
./rdf-data/008.ttl is valid!
---

Validating: ./rdf-data/185.ttl
./rdf-data/185.ttl is valid!
---

Validating: ./rdf-data/175.ttl
./rdf-data/175.ttl is valid!
---

Validating: ./rdf-data/158.ttl
./rdf-data/158.ttl is valid!
---

Validating: ./rdf-data/126.ttl
./rdf-data/126.ttl is valid!
---

Validating: ./rdf-data/096.ttl
./rdf-data/096.ttl is valid!
---

Validating: ./rdf-data/169.ttl
./rdf-data/169.ttl is valid!
---

Validating: ./rdf-data/034.ttl
./rdf-data/034.ttl is valid!
---

Validating: ./rdf-data/114.ttl
./rdf-data/114.ttl is valid!
---

Validating: ./rdf-data/13

## Convert the graph into natural langauge sentences

This is a very simple way to "serialize" a knowledge graph

In [None]:
from glob import glob
import rdflib
from tqdm import tqdm

# Define the namespace prefix (adjust if needed)
CO_LEARNING_NS = "http://example.org/co_learning#"


# Helper function to extract literals and form a sentence
def extract_literals_to_sentence(g, node):
    sentence_parts = []

    # Extract the literal values for the relevant properties
    for _, p, o in g.triples((node, None, None)):
        if (
            p.endswith("hasActor")
            or p.endswith("hasAction")
            or p.endswith("hasLocation")
            or p.endswith("hasObject")
        ):
            sentence_parts.append(str(o))  # Add the literal to the sentence parts

    # Join the sentence parts with a space
    return " ".join(sentence_parts)


# Function to convert a graph into three sentences
def graph_to_sentences(ttl_file: str):
    # Load the RDF graph
    g = rdflib.Graph()
    g.parse(ttl_file, format="turtle")

    situation_sentence = []
    action_human_sentence = []
    action_robot_sentence = []

    # Find instances of Situation, ActionHuman, and ActionRobot and generate sentences
    for s, rdf_type in g.subject_objects(rdflib.RDF.type):
        rdf_type_str = str(rdf_type)

        if rdf_type_str == CO_LEARNING_NS + "Situation":
            extracted = extract_literals_to_sentence(g, s)
            if extracted:
                situation_sentence.append(extracted)
        elif rdf_type_str == CO_LEARNING_NS + "ActionHuman":
            extracted = extract_literals_to_sentence(g, s)
            if extracted:
                action_human_sentence.append(extracted)
        elif rdf_type_str == CO_LEARNING_NS + "ActionRobot":
            extracted = extract_literals_to_sentence(g, s)
            if extracted:
                action_robot_sentence.append(extracted)

    situation_sentence = ", ".join(situation_sentence)
    action_human_sentence = ", ".join(action_human_sentence)
    action_robot_sentence = ", ".join(action_robot_sentence)

    return situation_sentence, action_human_sentence, action_robot_sentence


words = []

# Loop through all .ttl files and convert each to three sentences
for ttl_path in tqdm(sorted(glob("./rdf-data/*.ttl"))):
    situation, action_human, action_robot = graph_to_sentences(ttl_path)

    words.append(
        {
            "situation": situation,
            "action_human": action_human,
            "action_robot": action_robot,
        }
    )

    # Print or save the result (example prints for now)
    print(f"Graph: {ttl_path}")
    print(f"Situation: {situation}")
    print(f"ActionHuman: {action_human}")
    print(f"ActionRobot: {action_robot}")
    print("---\n")

## Make bag-of-words feature vectors.

I do this cuz I know that the vocabulary size is quite small

In [None]:
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd

# Combine the extracted sentences into a single string for each collaboration pattern
corpus = [
    f"{w['situation']}. {w['action_human']}. {w['action_robot']}."
    for w in words
]

# Use CountVectorizer to create the bag-of-words feature matrix
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(corpus)

# Convert the feature matrix into a DataFrame for easy visualization
df_bow = pd.DataFrame(X.toarray(), columns=vectorizer.get_feature_names_out())
df_bow

## Clustering with K-means

Play around with the number of clusters

In [None]:
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

# Perform K-Means clustering (adjust n_clusters based on your data)
n_clusters = 3  # You can experiment with different numbers of clusters
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
kmeans.fit(X)

# Get the cluster labels for each collaboration pattern
labels = kmeans.labels_

# Calculate the silhouette score to evaluate the clustering
silhouette_avg = silhouette_score(X, labels)
print(f"Silhouette Score for KMeans with {n_clusters} clusters: {silhouette_avg:.4f}")

# Add the cluster labels to the DataFrame for inspection
df_bow['Cluster'] = labels
df_bow

## Visualize the clusters and points using PCA

t-SNE might be better

In [None]:
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

# Reduce the dimensionality of the data using PCA for visualization
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X.toarray())

# Plot the clusters
plt.figure(figsize=(10, 7))
plt.scatter(X_pca[:, 0], X_pca[:, 1], c=labels, cmap='viridis', marker='o', edgecolor='k', s=100)
plt.title(f"K-Means Clustering with {n_clusters} Clusters (PCA-reduced data)")
plt.xlabel('PCA Component 1')
plt.ylabel('PCA Component 2')
plt.colorbar(label='Cluster')
plt.show()

In [None]:
import numpy as np
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import pairwise_distances_argmin_min

# Step 3: Get the cluster centroids
centroids = kmeans.cluster_centers_

# Step 4: Find the row/point closest to the centroid for each cluster
closest, _ = pairwise_distances_argmin_min(centroids, X)

for i, index in enumerate(closest):
    print(
        f"Cluster {i}: Closest vector to centroid is at index {index}, which corresponds to: '{corpus[index]}'"
    )

In [None]:
closest