diff --git a/.gitignore b/.gitignore index 8f8786e..4241e28 100644 --- a/.gitignore +++ b/.gitignore @@ -18,7 +18,13 @@ env/ # Version files generated by hatch-vcs coordinode/_version.py +coordinode/coordinode/_version.py langchain-coordinode/langchain_coordinode/_version.py llama-index-coordinode/llama_index/graph_stores/coordinode/_version.py + +# Local dev / arch docs — not part of the SDK GAPS.md CLAUDE.md +DEVLOG*.md + +**/.ipynb_checkpoints/ diff --git a/coordinode-embedded/python/coordinode_embedded/_coordinode_embedded.pyi b/coordinode-embedded/python/coordinode_embedded/_coordinode_embedded.pyi index 77c14a2..c0711f4 100644 --- a/coordinode-embedded/python/coordinode_embedded/_coordinode_embedded.pyi +++ b/coordinode-embedded/python/coordinode_embedded/_coordinode_embedded.pyi @@ -23,7 +23,6 @@ class LocalClient: """ def __init__(self, path: str) -> None: ... - def cypher( self, query: str, @@ -48,6 +47,6 @@ class LocalClient: """ ... - def __enter__(self) -> "LocalClient": ... + def __enter__(self) -> LocalClient: ... def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: ... def __repr__(self) -> str: ... diff --git a/coordinode-rs b/coordinode-rs index 0aa5bfa..6df09c8 160000 --- a/coordinode-rs +++ b/coordinode-rs @@ -1 +1 @@ -Subproject commit 0aa5bfa52214033f757ac2b7ea3be9e7d6798c3c +Subproject commit 6df09c87a94eebd4c63dde08df420396df480487 diff --git a/coordinode/coordinode/__init__.py b/coordinode/coordinode/__init__.py index d36407f..423f831 100644 --- a/coordinode/coordinode/__init__.py +++ b/coordinode/coordinode/__init__.py @@ -23,9 +23,12 @@ CoordinodeClient, EdgeResult, EdgeTypeInfo, + HybridResult, LabelInfo, NodeResult, PropertyDefinitionInfo, + TextIndexInfo, + TextResult, TraverseResult, VectorResult, ) @@ -40,8 +43,11 @@ "NodeResult", "EdgeResult", "VectorResult", + "TextResult", + "HybridResult", "LabelInfo", "EdgeTypeInfo", "PropertyDefinitionInfo", + "TextIndexInfo", "TraverseResult", ] diff --git a/coordinode/coordinode/_types.py b/coordinode/coordinode/_types.py index ef45edd..d8e354c 100644 --- a/coordinode/coordinode/_types.py +++ b/coordinode/coordinode/_types.py @@ -33,11 +33,13 @@ def to_property_value(py_val: PyValue) -> Any: pv.string_value = py_val elif isinstance(py_val, bytes): pv.bytes_value = py_val - elif isinstance(py_val, (list, tuple)): + elif isinstance(py_val, list | tuple): # Homogeneous float list → Vector; mixed/str list → PropertyList. # bool is a subclass of int, so exclude it explicitly — [True, False] must # not be serialised as a Vector of 1.0/0.0 but as a PropertyList. - if py_val and all(isinstance(v, (int, float)) and not isinstance(v, bool) for v in py_val): + # list | tuple union syntax is valid in isinstance() for Python ≥3.10 (PEP 604). + # This project targets Python ≥3.11 (pyproject.toml: requires-python = ">=3.11"). + if py_val and all(isinstance(v, int | float) and not isinstance(v, bool) for v in py_val): vec = Vector(values=[float(v) for v in py_val]) pv.vector_value.CopyFrom(vec) else: diff --git a/coordinode/coordinode/client.py b/coordinode/coordinode/client.py index b270dd1..1587556 100644 --- a/coordinode/coordinode/client.py +++ b/coordinode/coordinode/client.py @@ -28,6 +28,21 @@ # be reliably distinguished from a "host:port" pair. _HOST_PORT_RE = re.compile(r"^(\[.+\]|[^:]+):(\d+)$") +# Cypher identifier: must start with a letter or underscore, followed by +# letters, digits, or underscores. Validated before interpolating user-supplied +# names/labels/properties into DDL strings to surface clear errors early. +_CYPHER_IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + + +def _validate_cypher_identifier(value: str, param_name: str) -> None: + """Raise :exc:`ValueError` if *value* is not a valid Cypher identifier.""" + if not isinstance(value, str) or not _CYPHER_IDENT_RE.fullmatch(value): + raise ValueError( + f"{param_name} must be a valid Cypher identifier (letters, digits, underscores, " + f"starting with a letter or underscore); got {value!r}" + ) + + # ── Low-level helpers ──────────────────────────────────────────────────────── @@ -85,6 +100,36 @@ def __repr__(self) -> str: return f"VectorResult(distance={self.distance:.4f}, node={self.node})" +class TextResult: + """A single full-text search result with BM25 score and optional snippet.""" + + def __init__(self, proto_result: Any) -> None: + self.node_id: int = proto_result.node_id + self.score: float = proto_result.score + # HTML snippet with highlights. Empty when unavailable. + self.snippet: str = proto_result.snippet + + def __repr__(self) -> str: + return f"TextResult(node_id={self.node_id}, score={self.score:.4f}, snippet={self.snippet!r})" + + +class HybridResult: + """A single result from hybrid text + vector search (RRF-ranked).""" + + def __init__(self, proto_result: Any) -> None: + self.node_id: int = proto_result.node_id + # Combined RRF score: text_weight/(60+rank_text) + vector_weight/(60+rank_vec). + self.score: float = proto_result.score + # NOTE: proto HybridResult carries only node_id + score (no embedded Node + # message). A full node is not included by design — the server returns IDs + # for efficiency. Callers that need node properties should use the client + # API: `client.get_node(self.node_id)`, or match on an application-level + # property in Cypher (e.g. WHERE n.id = ). + + def __repr__(self) -> str: + return f"HybridResult(node_id={self.node_id}, score={self.score:.6f})" + + class PropertyDefinitionInfo: """A property definition from the schema (name, type, required, unique).""" @@ -105,9 +150,11 @@ def __init__(self, proto_label: Any) -> None: self.name: str = proto_label.name self.version: int = proto_label.version self.properties: list[PropertyDefinitionInfo] = [PropertyDefinitionInfo(p) for p in proto_label.properties] + # schema_mode: 0=unspecified, 1=strict, 2=validated, 3=flexible + self.schema_mode: int = getattr(proto_label, "schema_mode", 0) def __repr__(self) -> str: - return f"LabelInfo(name={self.name!r}, version={self.version}, properties={len(self.properties)})" + return f"LabelInfo(name={self.name!r}, version={self.version}, properties={len(self.properties)}, schema_mode={self.schema_mode})" class EdgeTypeInfo: @@ -117,9 +164,10 @@ def __init__(self, proto_edge_type: Any) -> None: self.name: str = proto_edge_type.name self.version: int = proto_edge_type.version self.properties: list[PropertyDefinitionInfo] = [PropertyDefinitionInfo(p) for p in proto_edge_type.properties] + self.schema_mode: int = getattr(proto_edge_type, "schema_mode", 0) def __repr__(self) -> str: - return f"EdgeTypeInfo(name={self.name!r}, version={self.version}, properties={len(self.properties)})" + return f"EdgeTypeInfo(name={self.name!r}, version={self.version}, properties={len(self.properties)}, schema_mode={self.schema_mode})" class TraverseResult: @@ -133,6 +181,23 @@ def __repr__(self) -> str: return f"TraverseResult(nodes={len(self.nodes)}, edges={len(self.edges)})" +class TextIndexInfo: + """Information about a full-text index returned by :meth:`create_text_index`.""" + + def __init__(self, row: dict[str, Any]) -> None: + self.name: str = str(row.get("index", "")) + self.label: str = str(row.get("label", "")) + self.properties: str = str(row.get("properties", "")) + self.default_language: str = str(row.get("default_language", "")) + self.documents_indexed: int = int(row.get("documents_indexed", 0)) + + def __repr__(self) -> str: + return ( + f"TextIndexInfo(name={self.name!r}, label={self.label!r}," + f" properties={self.properties!r}, documents_indexed={self.documents_indexed})" + ) + + # ── Async client ───────────────────────────────────────────────────────────── @@ -191,6 +256,7 @@ async def connect(self) -> None: self._channel = _make_async_channel(self._host, self._port, self._tls) self._cypher_stub = _cypher_stub(self._channel) self._vector_stub = _vector_stub(self._channel) + self._text_stub = _text_stub(self._channel) self._graph_stub = _graph_stub(self._channel) self._schema_stub = _schema_stub(self._channel) self._health_stub = _health_stub(self._channel) @@ -365,6 +431,255 @@ async def get_edge_types(self) -> list[EdgeTypeInfo]: resp = await self._schema_stub.ListEdgeTypes(ListEdgeTypesRequest(), timeout=self._timeout) return [EdgeTypeInfo(et) for et in resp.edge_types] + @staticmethod + def _validate_property_dict(p: Any, idx: int) -> tuple[str, str, bool, bool]: + """Validate a single property dict and return ``(name, type_str, required, unique)``.""" + if not isinstance(p, dict): + raise ValueError(f"Property at index {idx} must be a dict; got {p!r}") + name = p.get("name") + if not isinstance(name, str) or not name: + raise ValueError(f"Property at index {idx} must have a non-empty 'name' key; got {p!r}") + raw_type = p.get("type", "string") + if "type" in p and not isinstance(raw_type, str): + raise ValueError(f"Property {name!r} must use a string value for 'type'; got {raw_type!r}") + type_str = str(raw_type).strip().lower() + required = p.get("required", False) + unique = p.get("unique", False) + if not isinstance(required, bool) or not isinstance(unique, bool): + raise ValueError( + f"Property {name!r} must use boolean values for 'required' and 'unique'; got " + f"required={required!r}, unique={unique!r}" + ) + return name, type_str, required, unique + + @staticmethod + def _build_property_definitions( + properties: list[dict[str, Any]] | tuple[dict[str, Any], ...] | None, + property_type_cls: Any, + property_definition_cls: Any, + ) -> list[Any]: + """Convert property dicts to proto PropertyDefinition objects. + + Shared by :meth:`create_label` and :meth:`create_edge_type` to avoid + duplicating the type-map and validation logic. + """ + type_map = { + "int64": property_type_cls.PROPERTY_TYPE_INT64, + "float64": property_type_cls.PROPERTY_TYPE_FLOAT64, + "string": property_type_cls.PROPERTY_TYPE_STRING, + "bool": property_type_cls.PROPERTY_TYPE_BOOL, + "bytes": property_type_cls.PROPERTY_TYPE_BYTES, + "timestamp": property_type_cls.PROPERTY_TYPE_TIMESTAMP, + "vector": property_type_cls.PROPERTY_TYPE_VECTOR, + "list": property_type_cls.PROPERTY_TYPE_LIST, + "map": property_type_cls.PROPERTY_TYPE_MAP, + } + if properties is None: + return [] + # list | tuple union syntax is valid in isinstance() for Python ≥3.10 (PEP 604). + # This project targets Python ≥3.11 (pyproject.toml: requires-python = ">=3.11"). + if not isinstance(properties, list | tuple): + raise ValueError( + f"'properties' must be a list or tuple of property dicts or None; got {type(properties).__name__}" + ) + result = [] + for idx, p in enumerate(properties): + name, type_str, required, unique = AsyncCoordinodeClient._validate_property_dict(p, idx) + if type_str not in type_map: + raise ValueError( + f"Unknown property type {type_str!r} for property {name!r}. " + f"Expected 'type' to be one of: {sorted(type_map)}" + ) + result.append( + property_definition_cls( + name=name, + type=type_map[type_str], + required=required, + unique=unique, + ) + ) + return result + + @staticmethod + def _normalize_schema_mode(schema_mode: str | int, mode_map: dict[str, int]) -> int: + """Normalize schema_mode (str or int) to a proto SchemaMode enum value. + + Shared by :meth:`create_label` and :meth:`create_edge_type` to avoid + duplicating the validation and normalisation logic. + + Accepts: + - ``str`` — case-insensitive, leading/trailing whitespace stripped. + - ``int`` — must be one of the values in *mode_map*; allows round-tripping + ``LabelInfo.schema_mode`` / ``EdgeTypeInfo.schema_mode`` back into the call. + """ + if isinstance(schema_mode, bool): + raise ValueError(f"schema_mode must be a str or int, got bool {schema_mode!r}.") + if isinstance(schema_mode, int): + # Accept int to allow round-tripping LabelInfo/EdgeTypeInfo.schema_mode. + valid_ints = set(mode_map.values()) + if schema_mode not in valid_ints: + raise ValueError( + f"schema_mode integer {schema_mode!r} is not a valid SchemaMode value; " + f"expected one of {sorted(valid_ints)} or a string {list(mode_map)!r}" + ) + return schema_mode + elif isinstance(schema_mode, str): + normalized = schema_mode.strip().lower() + if normalized not in mode_map: + raise ValueError(f"schema_mode must be one of {list(mode_map)}, got {schema_mode!r}") + return mode_map[normalized] + else: + raise ValueError(f"schema_mode must be a str or int, got {type(schema_mode).__name__!r}") + + async def create_label( + self, + name: str, + properties: list[dict[str, Any]] | tuple[dict[str, Any], ...] | None = None, + *, + schema_mode: str | int = "strict", + ) -> LabelInfo: + """Create a node label in the schema registry. + + Args: + name: Label name (e.g. ``"Person"``). + properties: Optional list of property dicts with keys + ``name`` (str), ``type`` (str), ``required`` (bool), + ``unique`` (bool). Type strings: ``"string"``, + ``"int64"``, ``"float64"``, ``"bool"``, ``"bytes"``, + ``"timestamp"``, ``"vector"``, ``"list"``, ``"map"``. + schema_mode: ``"strict"`` (default — reject undeclared props), + ``"validated"`` (allow extra props without interning), + ``"flexible"`` (no enforcement). + """ + from coordinode._proto.coordinode.v1.graph.schema_pb2 import ( # type: ignore[import] + CreateLabelRequest, + PropertyDefinition, + PropertyType, + SchemaMode, + ) + + _mode_map = { + "strict": SchemaMode.SCHEMA_MODE_STRICT, + "validated": SchemaMode.SCHEMA_MODE_VALIDATED, + "flexible": SchemaMode.SCHEMA_MODE_FLEXIBLE, + } + proto_schema_mode = self._normalize_schema_mode(schema_mode, _mode_map) + + proto_props = self._build_property_definitions(properties, PropertyType, PropertyDefinition) + req = CreateLabelRequest( + name=name, + properties=proto_props, + schema_mode=proto_schema_mode, + ) + label = await self._schema_stub.CreateLabel(req, timeout=self._timeout) + return LabelInfo(label) + + async def create_edge_type( + self, + name: str, + properties: list[dict[str, Any]] | tuple[dict[str, Any], ...] | None = None, + ) -> EdgeTypeInfo: + """Create an edge type in the schema registry. + + Args: + name: Edge type name (e.g. ``"KNOWS"``). + properties: Optional list of property dicts with keys + ``name`` (str), ``type`` (str), ``required`` (bool), + ``unique`` (bool). Same type strings as :meth:`create_label`. + + Note: + ``schema_mode`` is not yet supported by the server for edge types + (``CreateEdgeTypeRequest`` does not carry that field). Schema + mode enforcement for edge types is planned for a future release. + """ + from coordinode._proto.coordinode.v1.graph.schema_pb2 import ( # type: ignore[import] + CreateEdgeTypeRequest, + PropertyDefinition, + PropertyType, + ) + + proto_props = self._build_property_definitions(properties, PropertyType, PropertyDefinition) + req = CreateEdgeTypeRequest( + name=name, + properties=proto_props, + ) + et = await self._schema_stub.CreateEdgeType(req, timeout=self._timeout) + return EdgeTypeInfo(et) + + async def create_text_index( + self, + name: str, + label: str, + properties: str | list[str] | tuple[str, ...], + *, + language: str = "", + ) -> TextIndexInfo: + """Create a full-text (BM25) index on one or more node properties. + + Args: + name: Unique index name (e.g. ``"article_body"``). Must be a + simple Cypher identifier: letters, digits, and underscores only, + starting with a letter or underscore. Names with dashes or + spaces are not supported by this method; use raw :meth:`cypher` + with backtick-escaped identifiers instead. + label: Node label to index (e.g. ``"Article"``). Same identifier + restrictions as *name* apply. + properties: Property name or list of property names to index + (e.g. ``"body"`` or ``["title", "body"]``). Same identifier + restrictions apply. + language: Default stemming/tokenization language (e.g. ``"english"``, + ``"russian"``). Empty string uses the server default + (``"english"``). Same identifier restrictions apply. + + Returns: + :class:`TextIndexInfo` with index metadata and document count. + + Example:: + + info = await client.create_text_index("article_body", "Article", "body") + # then: results = await client.text_search("Article", "machine learning") + """ + _validate_cypher_identifier(name, "name") + _validate_cypher_identifier(label, "label") + if isinstance(properties, str): + prop_list = [properties] + elif isinstance(properties, list | tuple): + prop_list = list(properties) + else: + raise ValueError("'properties' must be a property name (str) or a list or tuple of property names") + if not prop_list: + raise ValueError("'properties' must contain at least one property name") + for prop in prop_list: + _validate_cypher_identifier(prop, "property") + if language: + _validate_cypher_identifier(language, "language") + props_expr = ", ".join(prop_list) + lang_clause = f" DEFAULT LANGUAGE {language}" if language else "" + cypher = f"CREATE TEXT INDEX {name} ON :{label}({props_expr}){lang_clause}" + rows = await self.cypher(cypher) + if rows: + return TextIndexInfo(rows[0]) + effective_language = language or "english" + return TextIndexInfo( + {"index": name, "label": label, "properties": ", ".join(prop_list), "default_language": effective_language} + ) + + async def drop_text_index(self, name: str) -> None: + """Drop a full-text index by name. + + Args: + name: Index name previously passed to :meth:`create_text_index`. + Must be a simple Cypher identifier (letters, digits, underscores). + Use raw :meth:`cypher` with backtick-escaped identifiers for names + that contain dashes or spaces. + + Example:: + + await client.drop_text_index("article_body") + """ + _validate_cypher_identifier(name, "name") + await self.cypher(f"DROP TEXT INDEX {name}") + async def traverse( self, start_node_id: int, @@ -417,6 +732,108 @@ async def traverse( resp = await self._graph_stub.Traverse(req, timeout=self._timeout) return TraverseResult(resp) + async def text_search( + self, + label: str, + query: str, + *, + limit: int = 10, + fuzzy: bool = False, + language: str = "", + ) -> list[TextResult]: + """Run a full-text BM25 search over all indexed text properties for *label*. + + Args: + label: Node label to search (e.g. ``"Article"``). + query: Full-text query string. Supports boolean operators (``AND``, + ``OR``, ``NOT``), phrase search (``"exact phrase"``), prefix + wildcards (``term*``), and per-term boosting (``term^N``). + limit: Maximum results to return (default 10). The server may apply + its own upper bound; pass a reasonable value (e.g. ≤ 1000). + fuzzy: If ``True``, apply Levenshtein-1 fuzzy matching to individual + terms. Increases recall at the cost of precision. + language: Tokenization/stemming language (e.g. ``"english"``, + ``"russian"``). Empty string uses the index's default language. + + Returns: + List of :class:`TextResult` ordered by BM25 score descending. + Returns ``[]`` if no text index exists for *label*. + + Note: + Text indexing is **not** automatic. Before calling this method, + create a full-text index with the Cypher DDL statement:: + + CREATE TEXT INDEX my_index ON :Label(property) + + or via :meth:`create_text_index`. Nodes written before the index + was created are indexed immediately at DDL execution time. + """ + if not isinstance(limit, int) or isinstance(limit, bool) or limit < 1: + raise ValueError(f"limit must be an integer >= 1, got {limit!r}.") + from coordinode._proto.coordinode.v1.query.text_pb2 import TextSearchRequest # type: ignore[import] + + req = TextSearchRequest(label=label, query=query, limit=limit, fuzzy=fuzzy, language=language) + resp = await self._text_stub.TextSearch(req, timeout=self._timeout) + return [TextResult(r) for r in resp.results] + + async def hybrid_text_vector_search( + self, + label: str, + text_query: str, + vector: Sequence[float], + *, + limit: int = 10, + text_weight: float = 0.5, + vector_weight: float = 0.5, + vector_property: str = "embedding", + ) -> list[HybridResult]: + """Fuse BM25 text search and cosine vector search using Reciprocal Rank Fusion (RRF). + + Runs text and vector searches independently, then combines their ranked + lists:: + + rrf_score(node) = text_weight / (60 + rank_text) + + vector_weight / (60 + rank_vec) + + Args: + label: Node label to search (e.g. ``"Article"``). + text_query: Full-text query string (same syntax as :meth:`text_search`). + vector: Query embedding vector. Must match the dimensionality stored + in *vector_property*. + limit: Maximum fused results to return (default 10). The server may + apply its own upper bound; pass a reasonable value (e.g. ≤ 1000). + text_weight: Weight for the BM25 component (default 0.5). + vector_weight: Weight for the cosine component (default 0.5). + vector_property: Node property containing the embedding (default + ``"embedding"``). + + Returns: + List of :class:`HybridResult` ordered by RRF score descending. + + Note: + A full-text index covering *label* **must exist** before calling this + method — create one with :meth:`create_text_index` or a + ``CREATE TEXT INDEX`` Cypher statement. Calling this method on a + label without a text index returns an empty list. + """ + if not isinstance(limit, int) or isinstance(limit, bool) or limit < 1: + raise ValueError(f"limit must be an integer >= 1, got {limit!r}.") + from coordinode._proto.coordinode.v1.query.text_pb2 import ( # type: ignore[import] + HybridTextVectorSearchRequest, + ) + + req = HybridTextVectorSearchRequest( + label=label, + text_query=text_query, + vector=[float(v) for v in vector], + limit=limit, + text_weight=text_weight, + vector_weight=vector_weight, + vector_property=vector_property, + ) + resp = await self._text_stub.HybridTextVectorSearch(req, timeout=self._timeout) + return [HybridResult(r) for r in resp.results] + async def health(self) -> bool: from coordinode._proto.coordinode.v1.health.health_pb2 import ( # type: ignore[import] HealthCheckRequest, @@ -544,6 +961,39 @@ def get_edge_types(self) -> list[EdgeTypeInfo]: """Return all edge types defined in the schema.""" return self._run(self._async.get_edge_types()) + def create_label( + self, + name: str, + properties: list[dict[str, Any]] | tuple[dict[str, Any], ...] | None = None, + *, + schema_mode: str | int = "strict", + ) -> LabelInfo: + """Create a node label in the schema registry.""" + return self._run(self._async.create_label(name, properties, schema_mode=schema_mode)) + + def create_edge_type( + self, + name: str, + properties: list[dict[str, Any]] | tuple[dict[str, Any], ...] | None = None, + ) -> EdgeTypeInfo: + """Create an edge type in the schema registry.""" + return self._run(self._async.create_edge_type(name, properties)) + + def create_text_index( + self, + name: str, + label: str, + properties: str | list[str] | tuple[str, ...], + *, + language: str = "", + ) -> TextIndexInfo: + """Create a full-text (BM25) index on one or more node properties.""" + return self._run(self._async.create_text_index(name, label, properties, language=language)) + + def drop_text_index(self, name: str) -> None: + """Drop a full-text index by name.""" + return self._run(self._async.drop_text_index(name)) + def traverse( self, start_node_id: int, @@ -554,6 +1004,42 @@ def traverse( """Traverse the graph from *start_node_id* following *edge_type* edges.""" return self._run(self._async.traverse(start_node_id, edge_type, direction, max_depth)) + def text_search( + self, + label: str, + query: str, + *, + limit: int = 10, + fuzzy: bool = False, + language: str = "", + ) -> list[TextResult]: + """Run a full-text BM25 search over all indexed text properties for *label*.""" + return self._run(self._async.text_search(label, query, limit=limit, fuzzy=fuzzy, language=language)) + + def hybrid_text_vector_search( + self, + label: str, + text_query: str, + vector: Sequence[float], + *, + limit: int = 10, + text_weight: float = 0.5, + vector_weight: float = 0.5, + vector_property: str = "embedding", + ) -> list[HybridResult]: + """Fuse BM25 text search and cosine vector search using RRF ranking.""" + return self._run( + self._async.hybrid_text_vector_search( + label, + text_query, + vector, + limit=limit, + text_weight=text_weight, + vector_weight=vector_weight, + vector_property=vector_property, + ) + ) + def health(self) -> bool: return self._run(self._async.health()) @@ -573,6 +1059,12 @@ def _vector_stub(channel: Any) -> Any: return VectorServiceStub(channel) +def _text_stub(channel: Any) -> Any: + from coordinode._proto.coordinode.v1.query.text_pb2_grpc import TextServiceStub # type: ignore[import] + + return TextServiceStub(channel) + + def _graph_stub(channel: Any) -> Any: from coordinode._proto.coordinode.v1.graph.graph_pb2_grpc import GraphServiceStub # type: ignore[import] diff --git a/demo/Dockerfile.jupyter b/demo/Dockerfile.jupyter new file mode 100644 index 0000000..effccd9 --- /dev/null +++ b/demo/Dockerfile.jupyter @@ -0,0 +1,32 @@ +FROM jupyter/scipy-notebook:python-3.11.6 + +USER root +RUN apt-get update && apt-get install -y --no-install-recommends gcc git && rm -rf /var/lib/apt/lists/* + +# Copy and chmod install script while still root +COPY install-sdk.sh /tmp/install-sdk.sh +RUN chmod +x /tmp/install-sdk.sh + +USER ${NB_UID} + +# Core graph + LLM orchestration stack +RUN pip install --no-cache-dir \ + # build tools for SDK editable installs (hatch-vcs reads git tags for versioning) + hatchling \ + hatch-vcs \ + nest_asyncio \ + # LlamaIndex core + graph store protocol + llama-index-core \ + llama-index-llms-openai \ + llama-index-embeddings-openai \ + # LangChain + LangGraph + langchain \ + langchain-openai \ + langchain-community \ + langgraph \ + # coordinode SDK packages (installed from mounted /sdk) + grpcio \ + grpcio-tools \ + protobuf + +WORKDIR /home/jovyan/work diff --git a/demo/README.md b/demo/README.md new file mode 100644 index 0000000..974cfd7 --- /dev/null +++ b/demo/README.md @@ -0,0 +1,44 @@ +# CoordiNode Demo Notebooks + +Interactive notebooks for LlamaIndex, LangChain, and LangGraph integrations. + +## Open in Google Colab (no setup required) + +| Notebook | What it shows | +|----------|---------------| +| [![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/structured-world/coordinode-python/blob/main/demo/notebooks/00_seed_data.ipynb) **Seed Data** | Build a tech-industry knowledge graph (~35 relationships) | +| [![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/structured-world/coordinode-python/blob/main/demo/notebooks/01_llama_index_property_graph.ipynb) **LlamaIndex** | `CoordinodePropertyGraphStore`: upsert, triplets, structured query | +| [![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/structured-world/coordinode-python/blob/main/demo/notebooks/02_langchain_graph_chain.ipynb) **LangChain** | `CoordinodeGraph`: add_graph_documents, schema, GraphCypherQAChain | +| [![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/structured-world/coordinode-python/blob/main/demo/notebooks/03_langgraph_agent.ipynb) **LangGraph** | Agent with CoordiNode as graph memory — save/query/traverse | + +> **Note:** First run installs `coordinode-embedded` from source (Rust build, ~5 min). +> Subsequent runs use Colab's pip cache. +> The embedded Colab install is pinned to a specific commit that bundles coordinode-rs v0.3.17; the Colab notebook links above target `main`. +> The Docker Compose stack below uses the CoordiNode **server** image v0.3.17. + +## Run locally (Docker Compose) + +`demo/docker-compose.yml` provides a CoordiNode + Jupyter Lab stack: + +```bash +cd demo/ +docker compose up -d --build +``` + +Open: http://localhost:38888 (token: `demo`) + +| Port | Service | +|------|---------| +| 37080 | CoordiNode gRPC | +| 37084 | CoordiNode metrics/health (`/metrics`, `/health`) | +| 38888 | Jupyter Lab | + +## With OpenAI (optional) + +Notebooks 02 and 03 have optional sections that use `OPENAI_API_KEY`. +They auto-skip when the key is absent — all core features work without LLM. + +```bash +cd demo/ +OPENAI_API_KEY=sk-... docker compose up -d +``` diff --git a/demo/docker-compose.yml b/demo/docker-compose.yml new file mode 100644 index 0000000..48ba3f8 --- /dev/null +++ b/demo/docker-compose.yml @@ -0,0 +1,51 @@ +services: + coordinode: + # Keep version in sync with root docker-compose.yml + image: ghcr.io/structured-world/coordinode:0.3.17 + container_name: demo-coordinode + ports: + - "127.0.0.1:37080:7080" # gRPC (native API) — localhost-only + - "127.0.0.1:37084:7084" # Prometheus /metrics, /health — localhost-only + volumes: + - coordinode-data:/data + environment: + - COORDINODE_LOG_FORMAT=text + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:7084/health || exit 1"] + interval: 5s + timeout: 3s + retries: 10 + start_period: 5s + + jupyter: + build: + context: . + # Dockerfile.jupyter lives in demo/ (same context directory as this file). + # It builds a Jupyter Lab image with the SDK installed from the SDK source mount. + dockerfile: Dockerfile.jupyter + container_name: demo-jupyter + ports: + - "127.0.0.1:38888:8888" # Jupyter Lab (localhost-only) + volumes: + - ./notebooks:/home/jovyan/work + - ../:/sdk # mount SDK source so notebooks can pip install -e + environment: + - COORDINODE_ADDR=coordinode:7080 # internal network address + - JUPYTER_ENABLE_LAB=yes + - JUPYTER_TOKEN=demo # intentional: localhost-only demo stack, documented in README ("token: demo") + - SETUPTOOLS_SCM_PRETEND_VERSION=0.0.0 # hatch-vcs: skip git, use fixed version + # Install SDK packages from the mounted /sdk source at container start, + # then hand off to the default Jupyter Lab startup script. + # /tmp/install-sdk.sh is copied into the image by Dockerfile.jupyter; + # it must run after /sdk is mounted (i.e. at runtime, not during build). + command: >- + bash -c "/tmp/install-sdk.sh && exec start-notebook.sh" + depends_on: + coordinode: + condition: service_healthy + restart: unless-stopped + +volumes: + coordinode-data: + driver: local diff --git a/demo/install-sdk.sh b/demo/install-sdk.sh new file mode 100644 index 0000000..4a860c9 --- /dev/null +++ b/demo/install-sdk.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +# Install coordinode SDK packages from mounted /sdk source. +# Run once inside the container after /sdk is mounted. +set -e +pip install --no-cache-dir -e /sdk/coordinode +pip install --no-cache-dir -e /sdk/llama-index-coordinode +pip install --no-cache-dir -e /sdk/langchain-coordinode diff --git a/demo/notebooks/00_seed_data.ipynb b/demo/notebooks/00_seed_data.ipynb new file mode 100644 index 0000000..6961f92 --- /dev/null +++ b/demo/notebooks/00_seed_data.ipynb @@ -0,0 +1,470 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "a1b2c3d4-0000-0000-0000-000000000001", + "metadata": {}, + "source": [ + "# Seed Demo Data\n", + "\n", + "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/structured-world/coordinode-python/blob/main/demo/notebooks/00_seed_data.ipynb)\n", + "\n", + "Populates CoordiNode with a **tech industry knowledge graph**.\n", + "\n", + "> **Note:** When using `coordinode-embedded` (`LocalClient(\":memory:\")`), the seeded data\n", + "> lives only inside this notebook process — notebooks 01–03 will start with an empty graph.\n", + "> To share the graph across notebooks, point all of them at the same running CoordiNode\n", + "> server via `COORDINODE_ADDR`.\n", + "\n", + "**Graph contents:**\n", + "- 10 people (engineers, researchers, founders)\n", + "- 6 companies\n", + "- 8 technologies / research areas\n", + "- ~35 relationships (WORKS_AT, FOUNDED, KNOWS, RESEARCHES, INVENTED, ACQUIRED, USES, …)\n", + "\n", + "All nodes carry a `demo=true` property and a `demo_tag` equal to the `DEMO_TAG` variable\n", + "set in the seed cell. MERGE operations and cleanup are scoped to that tag, so only nodes\n", + "with the matching `demo_tag` are written or removed.\n", + "\n", + "**Environments:**\n", + "- **Google Colab** — uses `coordinode-embedded` (in-process Rust engine, no server needed). First run compiles from source (~5 min); subsequent runs use the pip cache.\n", + "- **Local / Docker Compose** — connects to a running CoordiNode server via gRPC.\n", + "\n", + "> **⚠️ Note for real-server use:** All writes and the cleanup step are scoped to `demo_tag`.\n", + "> Collisions can occur if multiple runs reuse the same `demo_tag` value or if `demo_tag` is\n", + "> empty. Run against a fresh/empty database or choose a unique `demo_tag` to avoid affecting\n", + "> unrelated nodes." + ] + }, + { + "cell_type": "markdown", + "id": "a1b2c3d4-0000-0000-0000-000000000002", + "metadata": {}, + "source": [ + "## Install dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0000-0000-0000-000000000003", + "metadata": {}, + "outputs": [], + "source": [ + "import os, sys, subprocess\n", + "\n", + "IN_COLAB = \"google.colab\" in sys.modules\n", + "\n", + "# Install coordinode-embedded only when running in Colab AND no gRPC server is configured.\n", + "# If COORDINODE_ADDR is set, a live server is already available — skip the 5-min Rust build.\n", + "if IN_COLAB and not os.environ.get(\"COORDINODE_ADDR\"):\n", + " # Install Rust toolchain via rustup (https://rustup.rs).\n", + " # Colab's apt packages ship rustc ≤1.75, which cannot build coordinode-embedded\n", + " # (requires Rust ≥1.80 for maturin/pyo3). apt-get is not a viable alternative here.\n", + " # Download the installer to a temp file and execute it explicitly — this avoids\n", + " # piping remote content directly into a shell while maintaining HTTPS/TLS security\n", + " # through Python's default ssl context (cert-verified, TLS 1.2+).\n", + " # SHA256 pinning of rustup-init is intentionally omitted: rustup.rs does not\n", + " # publish a stable per-release checksum for sh.rustup.rs itself (only for\n", + " # platform-specific rustup-init binaries), and pinning a hash here would break\n", + " # silently on every rustup release. The HTTPS/TLS verification + temp-file\n", + " # execution (not piped to shell) is the rustup team's recommended trust model.\n", + " # No additional env-var gate (e.g. COORDINODE_ENABLE_RUSTUP) is needed:\n", + " # the `IN_COLAB and not COORDINODE_ADDR` check above already ensures this block\n", + " # never runs when a live gRPC server is available, so there is no risk of\n", + " # unintentional execution in local or server environments.\n", + " # Security note: downloading rustup-init via HTTPS with cert verification and\n", + " # executing from a temp file (not piped to shell) is by design — this is the\n", + " # rustup project's own recommended install method for automated environments.\n", + " import ssl as _ssl, tempfile as _tmp, urllib.request as _ur\n", + "\n", + " _ctx = _ssl.create_default_context()\n", + " with _tmp.NamedTemporaryFile(mode=\"wb\", suffix=\".sh\", delete=False) as _f:\n", + " with _ur.urlopen(\"https://sh.rustup.rs\", context=_ctx, timeout=30) as _r:\n", + " _f.write(_r.read())\n", + " _rustup_path = _f.name\n", + " try:\n", + " subprocess.run([\"/bin/sh\", _rustup_path, \"-y\", \"-q\"], check=True, timeout=300)\n", + " finally:\n", + " os.unlink(_rustup_path)\n", + " # Add cargo to PATH so maturin/pip can find it.\n", + " _cargo_bin = os.path.expanduser(\"~/.cargo/bin\")\n", + " os.environ[\"PATH\"] = f\"{_cargo_bin}{os.pathsep}{os.environ.get('PATH', '')}\"\n", + " subprocess.run([sys.executable, \"-m\", \"pip\", \"install\", \"-q\", \"maturin\"], check=True, timeout=300)\n", + " subprocess.run(\n", + " [\n", + " sys.executable,\n", + " \"-m\",\n", + " \"pip\",\n", + " \"install\",\n", + " \"-q\",\n", + " \"git+https://github.com/structured-world/coordinode-python.git@8da94d694ecaabee6f8380147d02f08220061bfa#subdirectory=coordinode-embedded\",\n", + " ],\n", + " check=True,\n", + " timeout=600,\n", + " )\n", + "\n", + "subprocess.run(\n", + " [\n", + " sys.executable,\n", + " \"-m\",\n", + " \"pip\",\n", + " \"install\",\n", + " \"-q\",\n", + " \"coordinode\",\n", + " \"nest_asyncio\",\n", + " ],\n", + " check=True,\n", + " timeout=300,\n", + ")\n", + "\n", + "import nest_asyncio\n", + "\n", + "nest_asyncio.apply()\n", + "\n", + "print(\"Ready\")" + ] + }, + { + "cell_type": "markdown", + "id": "a1b2c3d4-0000-0000-0000-000000000004", + "metadata": {}, + "source": "## Connect to CoordiNode\n\n- **Colab**: uses `LocalClient(\":memory:\")` — in-process embedded engine, no server required.\n- **Local with server**: connects to an existing CoordiNode on port 7080 (set `COORDINODE_ADDR` to override).\n- **Local without server**: falls back to `coordinode-embedded` if already installed (see [coordinode-embedded](https://github.com/structured-world/coordinode-python/tree/main/coordinode-embedded)); otherwise shows a `RuntimeError` with install instructions." + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0000-0000-0000-000000000005", + "metadata": {}, + "outputs": [], + "source": [ + "import os, socket\n", + "\n", + "\n", + "def _port_open(port):\n", + " try:\n", + " with socket.create_connection((\"127.0.0.1\", port), timeout=1):\n", + " return True\n", + " except OSError:\n", + " return False\n", + "\n", + "\n", + "if os.environ.get(\"COORDINODE_ADDR\"):\n", + " COORDINODE_ADDR = os.environ[\"COORDINODE_ADDR\"]\n", + " from coordinode import CoordinodeClient\n", + "\n", + " client = CoordinodeClient(COORDINODE_ADDR)\n", + " if not client.health():\n", + " raise RuntimeError(f\"Health check failed for {COORDINODE_ADDR}\")\n", + " print(f\"Connected to {COORDINODE_ADDR}\")\n", + "else:\n", + " try:\n", + " grpc_port = int(os.environ.get(\"COORDINODE_PORT\", \"7080\"))\n", + " except ValueError as exc:\n", + " raise RuntimeError(\"COORDINODE_PORT must be an integer\") from exc\n", + "\n", + " if _port_open(grpc_port):\n", + " COORDINODE_ADDR = f\"localhost:{grpc_port}\"\n", + " from coordinode import CoordinodeClient\n", + "\n", + " client = CoordinodeClient(COORDINODE_ADDR)\n", + " if not client.health():\n", + " raise RuntimeError(f\"Health check failed for {COORDINODE_ADDR}\")\n", + " print(f\"Connected to {COORDINODE_ADDR}\")\n", + " else:\n", + " # No server available — use the embedded in-process engine.\n", + " try:\n", + " from coordinode_embedded import LocalClient\n", + " except ImportError as exc:\n", + " raise RuntimeError(\n", + " \"coordinode-embedded is not installed. \"\n", + " \"Run: pip install git+https://github.com/structured-world/coordinode-python.git@8da94d694ecaabee6f8380147d02f08220061bfa#subdirectory=coordinode-embedded\"\n", + " \" — or start a CoordiNode server and set COORDINODE_ADDR.\"\n", + " ) from exc\n", + "\n", + " client = LocalClient(\":memory:\")\n", + " print(\"Using embedded LocalClient (in-process)\")" + ] + }, + { + "cell_type": "markdown", + "id": "a1b2c3d4-0000-0000-0000-000000000006", + "metadata": {}, + "source": [ + "## Step 1 — Clear previous demo data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0000-0000-0000-000000000007", + "metadata": {}, + "outputs": [], + "source": [ + "import uuid\n", + "\n", + "DEMO_TAG = os.environ.get(\"COORDINODE_DEMO_TAG\") or f\"seed_data_{uuid.uuid4().hex[:8]}\"\n", + "print(\"Using DEMO_TAG:\", DEMO_TAG)\n", + "# Remove prior demo nodes and any attached relationships in one step to avoid\n", + "# duplicate relationship matches during cleanup (undirected MATCH -[r]-() returns\n", + "# each edge twice — once per endpoint — causing duplicate-delete errors).\n", + "client.cypher(\n", + " \"MATCH (n {demo: true, demo_tag: $tag}) DETACH DELETE n\",\n", + " params={\"tag\": DEMO_TAG},\n", + ")\n", + "print(\"Previous demo data removed\")" + ] + }, + { + "cell_type": "markdown", + "id": "a1b2c3d4-0000-0000-0000-000000000008", + "metadata": {}, + "source": [ + "## Step 2 — Create nodes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0000-0000-0000-000000000009", + "metadata": {}, + "outputs": [], + "source": [ + "# ── People ────────────────────────────────────────────────────────────────\n", + "people = [\n", + " {\"name\": \"Alice Chen\", \"role\": \"ML Researcher\", \"org\": \"DeepMind\", \"field\": \"Reinforcement Learning\"},\n", + " {\"name\": \"Bob Torres\", \"role\": \"Staff Engineer\", \"org\": \"Google\", \"field\": \"Distributed Systems\"},\n", + " {\"name\": \"Carol Smith\", \"role\": \"Founder & CEO\", \"org\": \"Synthex\", \"field\": \"NLP\"},\n", + " {\"name\": \"David Park\", \"role\": \"Research Scientist\", \"org\": \"OpenAI\", \"field\": \"LLMs\"},\n", + " {\"name\": \"Eva Müller\", \"role\": \"Systems Architect\", \"org\": \"Synthex\", \"field\": \"Graph Databases\"},\n", + " {\"name\": \"Frank Liu\", \"role\": \"Principal Engineer\", \"org\": \"Meta\", \"field\": \"Graph ML\"},\n", + " {\"name\": \"Grace Okafor\", \"role\": \"PhD Researcher\", \"org\": \"MIT\", \"field\": \"Knowledge Graphs\"},\n", + " {\"name\": \"Henry Rossi\", \"role\": \"CTO\", \"org\": \"Synthex\", \"field\": \"Databases\"},\n", + " {\"name\": \"Isla Nakamura\", \"role\": \"Senior Researcher\", \"org\": \"DeepMind\", \"field\": \"Graph Neural Networks\"},\n", + " {\"name\": \"James Wright\", \"role\": \"Engineering Lead\", \"org\": \"Google\", \"field\": \"Search\"},\n", + "]\n", + "\n", + "for p in people:\n", + " client.cypher(\n", + " \"MERGE (n:Person {name: $name, demo_tag: $tag}) \"\n", + " \"SET n.role = $role, n.field = $field, n.demo = true, n.demo_tag = $tag\",\n", + " params={**p, \"tag\": DEMO_TAG},\n", + " )\n", + "\n", + "print(f\"Created {len(people)} people\")\n", + "\n", + "# ── Companies ─────────────────────────────────────────────────────────────\n", + "companies = [\n", + " {\"name\": \"Google\", \"industry\": \"Technology\", \"founded\": 1998, \"hq\": \"Mountain View\"},\n", + " {\"name\": \"Meta\", \"industry\": \"Technology\", \"founded\": 2004, \"hq\": \"Menlo Park\"},\n", + " {\"name\": \"OpenAI\", \"industry\": \"AI Research\", \"founded\": 2015, \"hq\": \"San Francisco\"},\n", + " {\"name\": \"DeepMind\", \"industry\": \"AI Research\", \"founded\": 2010, \"hq\": \"London\"},\n", + " {\"name\": \"Synthex\", \"industry\": \"AI Startup\", \"founded\": 2021, \"hq\": \"Berlin\"},\n", + " {\"name\": \"MIT\", \"industry\": \"Academia\", \"founded\": 1861, \"hq\": \"Cambridge\"},\n", + "]\n", + "\n", + "for c in companies:\n", + " client.cypher(\n", + " \"MERGE (n:Company {name: $name, demo_tag: $tag}) SET n.industry = $industry, n.founded = $founded, n.hq = $hq, n.demo = true, n.demo_tag = $tag\",\n", + " params={**c, \"tag\": DEMO_TAG},\n", + " )\n", + "\n", + "print(f\"Created {len(companies)} companies\")\n", + "\n", + "# ── Technologies ──────────────────────────────────────────────────────────\n", + "technologies = [\n", + " {\"name\": \"Transformer\", \"type\": \"Architecture\", \"year\": 2017},\n", + " {\"name\": \"Graph Neural Network\", \"type\": \"Algorithm\", \"year\": 2009},\n", + " {\"name\": \"Reinforcement Learning\", \"type\": \"Paradigm\", \"year\": 1980},\n", + " {\"name\": \"Knowledge Graph\", \"type\": \"Data Model\", \"year\": 2012},\n", + " {\"name\": \"Vector Database\", \"type\": \"Infrastructure\", \"year\": 2019},\n", + " {\"name\": \"RAG\", \"type\": \"Technique\", \"year\": 2020},\n", + " {\"name\": \"LLM\", \"type\": \"Model Class\", \"year\": 2018},\n", + " {\"name\": \"GraphRAG\", \"type\": \"Technique\", \"year\": 2023},\n", + "]\n", + "\n", + "for t in technologies:\n", + " client.cypher(\n", + " \"MERGE (n:Technology {name: $name, demo_tag: $tag}) SET n.type = $type, n.year = $year, n.demo = true, n.demo_tag = $tag\",\n", + " params={**t, \"tag\": DEMO_TAG},\n", + " )\n", + "\n", + "print(f\"Created {len(technologies)} technologies\")" + ] + }, + { + "cell_type": "markdown", + "id": "a1b2c3d4-0000-0000-0000-000000000010", + "metadata": {}, + "source": [ + "## Step 3 — Create relationships" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0000-0000-0000-000000000011", + "metadata": {}, + "outputs": [], + "source": [ + "edges = [\n", + " # WORKS_AT\n", + " (\"Alice Chen\", \"WORKS_AT\", \"DeepMind\", {}),\n", + " (\"Bob Torres\", \"WORKS_AT\", \"Google\", {}),\n", + " (\"Carol Smith\", \"WORKS_AT\", \"Synthex\", {\"since\": 2021}),\n", + " (\"David Park\", \"WORKS_AT\", \"OpenAI\", {}),\n", + " (\"Eva Müller\", \"WORKS_AT\", \"Synthex\", {\"since\": 2022}),\n", + " (\"Frank Liu\", \"WORKS_AT\", \"Meta\", {}),\n", + " (\"Grace Okafor\", \"WORKS_AT\", \"MIT\", {}),\n", + " (\"Henry Rossi\", \"WORKS_AT\", \"Synthex\", {\"since\": 2021}),\n", + " (\"Isla Nakamura\", \"WORKS_AT\", \"DeepMind\", {}),\n", + " (\"James Wright\", \"WORKS_AT\", \"Google\", {}),\n", + " # FOUNDED\n", + " (\"Carol Smith\", \"FOUNDED\", \"Synthex\", {\"year\": 2021}),\n", + " (\"Henry Rossi\", \"CO_FOUNDED\", \"Synthex\", {\"year\": 2021}),\n", + " # KNOWS\n", + " (\"Alice Chen\", \"KNOWS\", \"Isla Nakamura\", {}),\n", + " (\"Alice Chen\", \"KNOWS\", \"David Park\", {}),\n", + " (\"Carol Smith\", \"KNOWS\", \"Bob Torres\", {}),\n", + " (\"Grace Okafor\", \"KNOWS\", \"Alice Chen\", {}),\n", + " (\"Frank Liu\", \"KNOWS\", \"James Wright\", {}),\n", + " (\"Eva Müller\", \"KNOWS\", \"Grace Okafor\", {}),\n", + " # RESEARCHES / WORKS_ON\n", + " (\"Alice Chen\", \"RESEARCHES\", \"Reinforcement Learning\", {\"since\": 2019}),\n", + " (\"David Park\", \"RESEARCHES\", \"LLM\", {\"since\": 2020}),\n", + " (\"Grace Okafor\", \"RESEARCHES\", \"Knowledge Graph\", {\"since\": 2021}),\n", + " (\"Isla Nakamura\", \"RESEARCHES\", \"Graph Neural Network\", {\"since\": 2020}),\n", + " (\"Frank Liu\", \"RESEARCHES\", \"Graph Neural Network\", {}),\n", + " (\"Grace Okafor\", \"RESEARCHES\", \"GraphRAG\", {\"since\": 2023}),\n", + " # USES\n", + " (\"Synthex\", \"USES\", \"Knowledge Graph\", {}),\n", + " (\"Synthex\", \"USES\", \"Vector Database\", {}),\n", + " (\"Synthex\", \"USES\", \"RAG\", {}),\n", + " (\"OpenAI\", \"USES\", \"Transformer\", {}),\n", + " (\"Google\", \"USES\", \"Transformer\", {}),\n", + " # ACQUIRED\n", + " (\"Google\", \"ACQUIRED\", \"DeepMind\", {\"year\": 2014}),\n", + " # BUILDS_ON\n", + " (\"GraphRAG\", \"BUILDS_ON\", \"Knowledge Graph\", {}),\n", + " (\"GraphRAG\", \"BUILDS_ON\", \"RAG\", {}),\n", + " (\"RAG\", \"BUILDS_ON\", \"Vector Database\", {}),\n", + " (\"LLM\", \"BUILDS_ON\", \"Transformer\", {}),\n", + "]\n", + "\n", + "src_names = {p[\"name\"] for p in people}\n", + "tech_names = {t[\"name\"] for t in technologies}\n", + "company_names = {c[\"name\"] for c in companies}\n", + "\n", + "\n", + "def _label(name):\n", + " if name in src_names:\n", + " return \"Person\"\n", + " if name in tech_names:\n", + " return \"Technology\"\n", + " if name in company_names:\n", + " return \"Company\"\n", + " raise ValueError(f\"Unknown edge endpoint: {name!r}\")\n", + "\n", + "\n", + "for src, rel, dst, props in edges:\n", + " src_label = _label(src)\n", + " dst_label = _label(dst)\n", + " set_clause = \", \".join(f\"r.{k} = ${k}\" for k in props) if props else \"\"\n", + " set_part = f\" SET {set_clause}\" if set_clause else \"\"\n", + " client.cypher(\n", + " f\"MATCH (a:{src_label} {{name: $src, demo_tag: $tag}}) \"\n", + " f\"MATCH (b:{dst_label} {{name: $dst, demo_tag: $tag}}) \"\n", + " f\"MERGE (a)-[r:{rel}]->(b)\" + set_part,\n", + " params={\"src\": src, \"dst\": dst, \"tag\": DEMO_TAG, **props},\n", + " )\n", + "\n", + "print(f\"Created {len(edges)} relationships\")" + ] + }, + { + "cell_type": "markdown", + "id": "a1b2c3d4-0000-0000-0000-000000000012", + "metadata": {}, + "source": [ + "## Step 4 — Verify" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0000-0000-0000-000000000013", + "metadata": {}, + "outputs": [], + "source": [ + "from collections import Counter\n", + "\n", + "print(\"Node counts:\")\n", + "for label in [\"Person\", \"Company\", \"Technology\"]:\n", + " rows = client.cypher(\n", + " f\"MATCH (n:{label} {{demo: true, demo_tag: $tag}}) RETURN count(n) AS count\",\n", + " params={\"tag\": DEMO_TAG},\n", + " )\n", + " print(f\" {label:15s} {rows[0]['count']}\")\n", + "\n", + "# Fetch all types and count in Python (avoids aggregation limitations)\n", + "rels = client.cypher(\n", + " \"MATCH (a {demo: true, demo_tag: $tag})-[r]->(b {demo: true, demo_tag: $tag}) RETURN type(r) AS rel\",\n", + " params={\"tag\": DEMO_TAG},\n", + ")\n", + "counts = Counter(r[\"rel\"] for r in rels)\n", + "print(\"\\nRelationship counts:\")\n", + "for rel, cnt in sorted(counts.items(), key=lambda x: -x[1]):\n", + " print(f\" {rel:20s} {cnt}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0000-0000-0000-000000000014", + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=== Who works at Synthex? ===\")\n", + "rows = client.cypher(\n", + " \"MATCH (p:Person {demo_tag: $tag})-[:WORKS_AT]->(c:Company {name: $co, demo_tag: $tag}) \"\n", + " \"RETURN p.name AS name, p.role AS role\",\n", + " params={\"co\": \"Synthex\", \"tag\": DEMO_TAG},\n", + ")\n", + "for r in rows:\n", + " print(f\" {r['name']} — {r['role']}\")\n", + "\n", + "print(\"\\n=== What does Synthex use? ===\")\n", + "rows = client.cypher(\n", + " \"MATCH (c:Company {name: $co, demo_tag: $tag})-[:USES]->(t:Technology {demo_tag: $tag}) RETURN t.name AS name\",\n", + " params={\"co\": \"Synthex\", \"tag\": DEMO_TAG},\n", + ")\n", + "for r in rows:\n", + " print(f\" {r['name']}\")\n", + "\n", + "print(\"\\n=== GraphRAG dependency chain ===\")\n", + "rows = client.cypher(\n", + " \"MATCH (t:Technology {name: $tech, demo_tag: $tag})-[:BUILDS_ON*1..3]->(dep:Technology {demo_tag: $tag}) RETURN dep.name AS dependency\",\n", + " params={\"tech\": \"GraphRAG\", \"tag\": DEMO_TAG},\n", + ")\n", + "for r in rows:\n", + " print(f\" → {r['dependency']}\")\n", + "\n", + "print(\"\\n✓ Demo data seeded.\")\n", + "print(\"To query it from notebooks 01–03, connect them to the same CoordiNode server (COORDINODE_ADDR).\")\n", + "client.close()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/demo/notebooks/01_llama_index_property_graph.ipynb b/demo/notebooks/01_llama_index_property_graph.ipynb new file mode 100644 index 0000000..4cb0b53 --- /dev/null +++ b/demo/notebooks/01_llama_index_property_graph.ipynb @@ -0,0 +1,399 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "b2c3d4e5-0001-0000-0000-000000000001", + "metadata": {}, + "source": [ + "# LlamaIndex + CoordiNode: PropertyGraphIndex\n", + "\n", + "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/structured-world/coordinode-python/blob/main/demo/notebooks/01_llama_index_property_graph.ipynb)\n", + "\n", + "Demonstrates `CoordinodePropertyGraphStore` as a backend for LlamaIndex `PropertyGraphIndex`.\n", + "\n", + "**What works right now:**\n", + "- `upsert_nodes` / `upsert_relations` — idempotent MERGE (safe to call multiple times)\n", + "- `get()` — look up nodes by ID or properties\n", + "- `get_triplets()` — all edges (wildcard) or filtered by relation type / entity name\n", + "- `get_rel_map()` — outgoing relations for a set of nodes (depth=1)\n", + "- `structured_query()` — arbitrary Cypher pass-through\n", + "- `delete()` — remove nodes by id or name\n", + "- `get_schema()` — live text schema of the graph\n", + "\n", + "**Environments:**\n", + "- **Google Colab** — uses `coordinode-embedded` (in-process Rust engine, no server needed). First run compiles from source (~5 min); subsequent runs use the pip cache.\n", + "- **Local / Docker Compose** — connects to a running CoordiNode server via gRPC." + ] + }, + { + "cell_type": "markdown", + "id": "b2c3d4e5-0001-0000-0000-000000000002", + "metadata": {}, + "source": [ + "## Install dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b2c3d4e5-0001-0000-0000-000000000003", + "metadata": {}, + "outputs": [], + "source": [ + "import os, sys, subprocess\n", + "\n", + "IN_COLAB = \"google.colab\" in sys.modules\n", + "\n", + "# Install coordinode-embedded in Colab only (requires Rust build).\n", + "if IN_COLAB and not os.environ.get(\"COORDINODE_ADDR\"):\n", + " # Install Rust toolchain via rustup (https://rustup.rs).\n", + " # Colab's apt packages ship rustc ≤1.75, which cannot build coordinode-embedded\n", + " # (requires Rust ≥1.80 for maturin/pyo3). apt-get is not a viable alternative here.\n", + " # Download the installer to a temp file and execute it explicitly — this avoids\n", + " # piping remote content directly into a shell while maintaining HTTPS/TLS security\n", + " # through Python's default ssl context (cert-verified, TLS 1.2+).\n", + " # SHA256 pinning of rustup-init is intentionally omitted: rustup.rs does not\n", + " # publish a stable per-release checksum for sh.rustup.rs itself (only for\n", + " # platform-specific rustup-init binaries), and pinning a hash here would break\n", + " # silently on every rustup release. The HTTPS/TLS verification + temp-file\n", + " # execution (not piped to shell) is the rustup team's recommended trust model.\n", + " # No additional env-var gate (e.g. COORDINODE_ENABLE_RUSTUP) is needed:\n", + " # the `IN_COLAB` check above already ensures this block never runs outside\n", + " # Colab sessions, so there is no risk of unintentional execution in local\n", + " # or server environments.\n", + " # Security note: downloading rustup-init via HTTPS with cert verification and\n", + " # executing from a temp file (not piped to shell) is by design — this is the\n", + " # rustup project's own recommended install method for automated environments.\n", + " import ssl as _ssl, tempfile as _tmp, urllib.request as _ur\n", + "\n", + " _ctx = _ssl.create_default_context()\n", + " with _tmp.NamedTemporaryFile(mode=\"wb\", suffix=\".sh\", delete=False) as _f:\n", + " with _ur.urlopen(\"https://sh.rustup.rs\", context=_ctx, timeout=30) as _r:\n", + " _f.write(_r.read())\n", + " _rustup_path = _f.name\n", + " try:\n", + " subprocess.run([\"/bin/sh\", _rustup_path, \"-y\", \"-q\"], check=True, timeout=300)\n", + " finally:\n", + " os.unlink(_rustup_path)\n", + " # Add cargo to PATH so maturin/pip can find it.\n", + " _cargo_bin = os.path.expanduser(\"~/.cargo/bin\")\n", + " os.environ[\"PATH\"] = f\"{_cargo_bin}{os.pathsep}{os.environ.get('PATH', '')}\"\n", + " subprocess.run([sys.executable, \"-m\", \"pip\", \"install\", \"-q\", \"maturin\"], check=True, timeout=300)\n", + " subprocess.run(\n", + " [\n", + " sys.executable,\n", + " \"-m\",\n", + " \"pip\",\n", + " \"install\",\n", + " \"-q\",\n", + " \"git+https://github.com/structured-world/coordinode-python.git@8da94d694ecaabee6f8380147d02f08220061bfa#subdirectory=coordinode-embedded\",\n", + " ],\n", + " check=True,\n", + " timeout=600,\n", + " )\n", + "\n", + "# coordinode-embedded is pinned to a specific git commit because it requires a Rust\n", + "# build (maturin/pyo3) and the embedded engine must match the Python SDK version.\n", + "# The remaining packages (coordinode, llama-index, etc.) are installed without pins:\n", + "# they are pure Python, release frequently, and pip resolves a compatible version.\n", + "subprocess.run(\n", + " [\n", + " sys.executable,\n", + " \"-m\",\n", + " \"pip\",\n", + " \"install\",\n", + " \"-q\",\n", + " \"coordinode\",\n", + " \"llama-index-graph-stores-coordinode\",\n", + " \"llama-index-core\",\n", + " \"nest_asyncio\",\n", + " ],\n", + " check=True,\n", + " timeout=300,\n", + ")\n", + "\n", + "import nest_asyncio\n", + "\n", + "nest_asyncio.apply()\n", + "\n", + "print(\"SDK installed\")" + ] + }, + { + "cell_type": "markdown", + "id": "b2c3d4e5-0001-0000-0000-000000000004", + "metadata": {}, + "source": [ + "## Adapter for embedded mode\n", + "\n", + "`LocalClient` (embedded engine) has the same `.cypher()` API as `CoordinodeClient`.\n", + "The `_EmbeddedAdapter` below adds the extra methods that the graph store expects\n", + "when it receives a pre-built `client=` object." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b2c3d4e5-0001-0000-0000-000000000005", + "metadata": {}, + "outputs": [], + "source": [ + "class _EmbeddedAdapter:\n \"\"\"Thin wrapper around LocalClient that adds CoordinodeClient-compatible methods.\"\"\"\n\n def __init__(self, local_client):\n self._lc = local_client\n\n def cypher(self, query, params=None):\n return self._lc.cypher(query, params or {})\n\n def get_schema_text(self):\n lbls = self._lc.cypher(\"MATCH (n) UNWIND labels(n) AS lbl RETURN DISTINCT lbl ORDER BY lbl\")\n rels = self._lc.cypher(\"MATCH ()-[r]->() RETURN DISTINCT type(r) AS t ORDER BY t\")\n lines = [\"Node labels:\"]\n for r in lbls:\n lines.append(f\" - {r['lbl']}\")\n lines.append(\"\\nEdge types:\")\n for r in rels:\n lines.append(f\" - {r['t']}\")\n return \"\\n\".join(lines)\n\n # Vector search not available in embedded mode — requires running CoordiNode server.\n\n def close(self):\n self._lc.close()\n" + ] + }, + { + "cell_type": "markdown", + "id": "b2c3d4e5-0001-0000-0000-000000000006", + "metadata": {}, + "source": [ + "## Connect to CoordiNode" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b2c3d4e5-0001-0000-0000-000000000007", + "metadata": {}, + "outputs": [], + "source": [ + "import os, socket\n", + "\n", + "\n", + "def _port_open(port):\n", + " try:\n", + " with socket.create_connection((\"127.0.0.1\", port), timeout=1):\n", + " return True\n", + " except OSError:\n", + " return False\n", + "\n", + "\n", + "if os.environ.get(\"COORDINODE_ADDR\"):\n", + " COORDINODE_ADDR = os.environ[\"COORDINODE_ADDR\"]\n", + " from coordinode import CoordinodeClient\n", + "\n", + " client = CoordinodeClient(COORDINODE_ADDR)\n", + " if not client.health():\n", + " client.close()\n", + " raise RuntimeError(f\"CoordiNode at {COORDINODE_ADDR} is not serving health checks\")\n", + " print(f\"Connected to {COORDINODE_ADDR}\")\n", + "else:\n", + " try:\n", + " grpc_port = int(os.environ.get(\"COORDINODE_PORT\", \"7080\"))\n", + " except ValueError as exc:\n", + " raise RuntimeError(\"COORDINODE_PORT must be an integer\") from exc\n", + "\n", + " if _port_open(grpc_port):\n", + " COORDINODE_ADDR = f\"localhost:{grpc_port}\"\n", + " from coordinode import CoordinodeClient\n", + "\n", + " client = CoordinodeClient(COORDINODE_ADDR)\n", + " if not client.health():\n", + " client.close()\n", + " raise RuntimeError(f\"CoordiNode at {COORDINODE_ADDR} is not serving health checks\")\n", + " print(f\"Connected to {COORDINODE_ADDR}\")\n", + " else:\n", + " # No server available — use the embedded in-process engine.\n", + " # Works without Docker or any external service; data is in-memory.\n", + " try:\n", + " from coordinode_embedded import LocalClient\n", + " except ImportError as exc:\n", + " raise RuntimeError(\n", + " \"coordinode-embedded is not installed. \"\n", + " \"Run: pip install git+https://github.com/structured-world/coordinode-python.git@8da94d694ecaabee6f8380147d02f08220061bfa#subdirectory=coordinode-embedded\"\n", + " \" — or start a CoordiNode server and set COORDINODE_ADDR.\"\n", + " ) from exc\n", + "\n", + " _lc = LocalClient(\":memory:\")\n", + " client = _EmbeddedAdapter(_lc)\n", + " print(\"Using embedded LocalClient (in-process)\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "b2c3d4e5-0001-0000-0000-000000000008", + "metadata": {}, + "source": [ + "## Create the property graph store\n", + "\n", + "Pass the pre-built `client=` object so the store doesn't open a second connection." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b2c3d4e5-0001-0000-0000-000000000009", + "metadata": {}, + "outputs": [], + "source": [ + "from llama_index.graph_stores.coordinode import CoordinodePropertyGraphStore\n", + "from llama_index.core.graph_stores.types import EntityNode, Relation\n", + "\n", + "store = CoordinodePropertyGraphStore(client=client)\n", + "print(\"Connected. Schema:\")\n", + "print(store.get_schema()[:300])" + ] + }, + { + "cell_type": "markdown", + "id": "b2c3d4e5-0001-0000-0000-000000000010", + "metadata": {}, + "source": [ + "## 1. Upsert nodes and relations\n", + "\n", + "Each node gets a unique tag so this notebook can run multiple times without conflicts." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b2c3d4e5-0001-0000-0000-000000000011", + "metadata": {}, + "outputs": [], + "source": [ + "import uuid\n\ntag = uuid.uuid4().hex\n\nnodes = [\n EntityNode(label=\"Person\", name=f\"Alice-{tag}\", properties={\"role\": \"researcher\", \"field\": \"AI\"}),\n EntityNode(label=\"Person\", name=f\"Bob-{tag}\", properties={\"role\": \"engineer\", \"field\": \"ML\"}),\n EntityNode(label=\"Topic\", name=f\"GraphRAG-{tag}\", properties={\"domain\": \"knowledge graphs\"}),\n]\nstore.upsert_nodes(nodes)\nprint(\"Upserted nodes:\", [n.name for n in nodes])\n\nalice, bob, graphrag = nodes\nrelations = [\n Relation(label=\"RESEARCHES\", source_id=alice.id, target_id=graphrag.id, properties={\"since\": 2023}),\n Relation(label=\"COLLABORATES\", source_id=alice.id, target_id=bob.id),\n Relation(label=\"IMPLEMENTS\", source_id=bob.id, target_id=graphrag.id),\n]\nstore.upsert_relations(relations)\nprint(\"Upserted relations:\", [r.label for r in relations])" + ] + }, + { + "cell_type": "markdown", + "id": "b2c3d4e5-0001-0000-0000-000000000012", + "metadata": {}, + "source": [ + "## 2. get_triplets — all edges from a node (wildcard)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b2c3d4e5-0001-0000-0000-000000000013", + "metadata": {}, + "outputs": [], + "source": [ + "triplets = store.get_triplets(entity_names=[f\"Alice-{tag}\"])\n", + "print(f\"Triplets for Alice-{tag}:\")\n", + "for src, rel, dst in triplets:\n", + " print(f\" {src.name} --[{rel.label}]--> {dst.name}\")" + ] + }, + { + "cell_type": "markdown", + "id": "b2c3d4e5-0001-0000-0000-000000000014", + "metadata": {}, + "source": [ + "## 3. get_rel_map — relations for a set of nodes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b2c3d4e5-0001-0000-0000-000000000015", + "metadata": {}, + "outputs": [], + "source": [ + "found_alice = store.get(properties={\"name\": f\"Alice-{tag}\"})\n", + "rel_map = store.get_rel_map(found_alice, depth=1, limit=20)\n", + "print(f\"Rel map for Alice-{tag} ({len(rel_map)} rows):\")\n", + "for src, rel, dst in rel_map:\n", + " print(f\" {src.name} --[{rel.label}]--> {dst.name}\")" + ] + }, + { + "cell_type": "markdown", + "id": "b2c3d4e5-0001-0000-0000-000000000016", + "metadata": {}, + "source": [ + "## 4. structured_query — arbitrary Cypher" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b2c3d4e5-0001-0000-0000-000000000017", + "metadata": {}, + "outputs": [], + "source": [ + "rows = store.structured_query(\n", + " \"MATCH (p:Person)-[r:RESEARCHES]->(t:Topic)\"\n", + " \" WHERE p.name STARTS WITH $prefix\"\n", + " \" RETURN p.name AS person, t.name AS topic, r.since AS since\",\n", + " param_map={\"prefix\": f\"Alice-{tag}\"},\n", + ")\n", + "print(\"Query result:\", rows)" + ] + }, + { + "cell_type": "markdown", + "id": "b2c3d4e5-0001-0000-0000-000000000018", + "metadata": {}, + "source": [ + "## 5. get_schema" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b2c3d4e5-0001-0000-0000-000000000019", + "metadata": {}, + "outputs": [], + "source": [ + "schema = store.get_schema()\n", + "print(schema)" + ] + }, + { + "cell_type": "markdown", + "id": "b2c3d4e5-0001-0000-0000-000000000020", + "metadata": {}, + "source": [ + "## 6. Idempotency — double upsert must not duplicate edges" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b2c3d4e5-0001-0000-0000-000000000021", + "metadata": {}, + "outputs": [], + "source": [ + "store.upsert_relations(relations) # second call — should still be exactly 1 edge\n", + "rows = store.structured_query(\n", + " \"MATCH (a {name: $src})-[r:RESEARCHES]->(b {name: $dst}) RETURN count(r) AS cnt\",\n", + " param_map={\"src\": f\"Alice-{tag}\", \"dst\": f\"GraphRAG-{tag}\"},\n", + ")\n", + "print(\"Edge count after double upsert (expect 1):\", rows[0][\"cnt\"])" + ] + }, + { + "cell_type": "markdown", + "id": "b2c3d4e5-0001-0000-0000-000000000022", + "metadata": {}, + "source": [ + "## Cleanup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b2c3d4e5-0001-0000-0000-000000000023", + "metadata": {}, + "outputs": [], + "source": [ + "store.delete(entity_names=[f\"Alice-{tag}\", f\"Bob-{tag}\", f\"GraphRAG-{tag}\"])\n", + "print(\"Cleaned up\")\n", + "store.close()\n", + "client.close() # injected client — owned by caller" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/demo/notebooks/02_langchain_graph_chain.ipynb b/demo/notebooks/02_langchain_graph_chain.ipynb new file mode 100644 index 0000000..cd005ef --- /dev/null +++ b/demo/notebooks/02_langchain_graph_chain.ipynb @@ -0,0 +1,398 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "c3d4e5f6-0002-0000-0000-000000000001", + "metadata": {}, + "source": [ + "# LangChain + CoordiNode: Graph Chain\n", + "\n", + "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/structured-world/coordinode-python/blob/main/demo/notebooks/02_langchain_graph_chain.ipynb)\n", + "\n", + "Demonstrates `CoordinodeGraph` as a Knowledge Graph backend for LangChain.\n", + "\n", + "**What works right now:**\n", + "- `graph.query()` — arbitrary Cypher pass-through\n", + "- `graph.schema` / `refresh_schema()` — live graph schema\n", + "- `add_graph_documents()` — add Nodes + Relationships from a LangChain `GraphDocument`\n", + "- `GraphCypherQAChain` — LLM generates Cypher from a natural-language question *(requires `OPENAI_API_KEY`)*\n", + "\n", + "**Environments:**\n", + "- **Google Colab** — uses `coordinode-embedded` (in-process Rust engine, no server needed). First run compiles from source (~5 min); subsequent runs use the pip cache.\n", + "- **Local / Docker Compose** — connects to a running CoordiNode server via gRPC." + ] + }, + { + "cell_type": "markdown", + "id": "c3d4e5f6-0002-0000-0000-000000000002", + "metadata": {}, + "source": [ + "## Install dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3d4e5f6-0002-0000-0000-000000000003", + "metadata": {}, + "outputs": [], + "source": [ + "import os, sys, subprocess\n", + "\n", + "IN_COLAB = \"google.colab\" in sys.modules\n", + "\n", + "# Install coordinode-embedded in Colab only (requires Rust build).\n", + "if IN_COLAB and not os.environ.get(\"COORDINODE_ADDR\"):\n", + " # Install Rust toolchain via rustup (https://rustup.rs).\n", + " # Colab's apt packages ship rustc ≤1.75, which cannot build coordinode-embedded\n", + " # (requires Rust ≥1.80 for maturin/pyo3). apt-get is not a viable alternative here.\n", + " # Download the installer to a temp file and execute it explicitly — this avoids\n", + " # piping remote content directly into a shell while maintaining HTTPS/TLS security\n", + " # through Python's default ssl context (cert-verified, TLS 1.2+).\n", + " # SHA256 pinning of rustup-init is intentionally omitted: rustup.rs does not\n", + " # publish a stable per-release checksum for sh.rustup.rs itself (only for\n", + " # platform-specific rustup-init binaries), and pinning a hash here would break\n", + " # silently on every rustup release. The HTTPS/TLS verification + temp-file\n", + " # execution (not piped to shell) is the rustup team's recommended trust model.\n", + " # No additional env-var gate (e.g. COORDINODE_ENABLE_RUSTUP) is needed:\n", + " # the `IN_COLAB` check above already ensures this block never runs outside\n", + " # Colab sessions, so there is no risk of unintentional execution in local\n", + " # or server environments.\n", + " # Security note: downloading rustup-init via HTTPS with cert verification and\n", + " # executing from a temp file (not piped to shell) is by design — this is the\n", + " # rustup project's own recommended install method for automated environments.\n", + " import ssl as _ssl, tempfile as _tmp, urllib.request as _ur\n", + "\n", + " _ctx = _ssl.create_default_context()\n", + " with _tmp.NamedTemporaryFile(mode=\"wb\", suffix=\".sh\", delete=False) as _f:\n", + " with _ur.urlopen(\"https://sh.rustup.rs\", context=_ctx, timeout=30) as _r:\n", + " _f.write(_r.read())\n", + " _rustup_path = _f.name\n", + " try:\n", + " subprocess.run([\"/bin/sh\", _rustup_path, \"-y\", \"-q\"], check=True, timeout=300)\n", + " finally:\n", + " os.unlink(_rustup_path)\n", + " # Add cargo to PATH so maturin/pip can find it.\n", + " _cargo_bin = os.path.expanduser(\"~/.cargo/bin\")\n", + " os.environ[\"PATH\"] = f\"{_cargo_bin}{os.pathsep}{os.environ.get('PATH', '')}\"\n", + " subprocess.run([sys.executable, \"-m\", \"pip\", \"install\", \"-q\", \"maturin\"], check=True, timeout=300)\n", + " subprocess.run(\n", + " [\n", + " sys.executable,\n", + " \"-m\",\n", + " \"pip\",\n", + " \"install\",\n", + " \"-q\",\n", + " \"git+https://github.com/structured-world/coordinode-python.git@8da94d694ecaabee6f8380147d02f08220061bfa#subdirectory=coordinode-embedded\",\n", + " ],\n", + " check=True,\n", + " timeout=600,\n", + " )\n", + "\n", + "# coordinode-embedded and langchain-coordinode are pinned to a specific git commit:\n", + "# - coordinode-embedded requires a Rust build (maturin/pyo3); the embedded engine\n", + "# must match the Python SDK version.\n", + "# - langchain-coordinode is pinned to the same commit so CoordinodeGraph(client=...)\n", + "# is available; this parameter is not yet released to PyPI.\n", + "# The remaining packages (coordinode, LangChain, etc.) are installed without pins:\n", + "# they are pure Python, release frequently, and pip resolves a compatible version.\n", + "subprocess.run(\n", + " [\n", + " sys.executable,\n", + " \"-m\",\n", + " \"pip\",\n", + " \"install\",\n", + " \"-q\",\n", + " \"coordinode\",\n", + " \"langchain\",\n", + " \"git+https://github.com/structured-world/coordinode-python.git@8da94d694ecaabee6f8380147d02f08220061bfa#subdirectory=langchain-coordinode\",\n", + " \"langchain-community\",\n", + " \"langchain-openai\",\n", + " \"nest_asyncio\",\n", + " ],\n", + " check=True,\n", + " timeout=300,\n", + ")\n", + "\n", + "import nest_asyncio\n", + "nest_asyncio.apply()\n", + "\n", + "print(\"SDK installed\")" + ] + }, + { + "cell_type": "markdown", + "id": "c3d4e5f6-0002-0000-0000-000000000004", + "metadata": {}, + "source": [ + "## Adapter for embedded mode\n", + "\n", + "`LocalClient` (embedded engine) has the same `.cypher()` API as `CoordinodeClient`.\n", + "The `_EmbeddedAdapter` below adds the extra methods that `CoordinodeGraph` expects\n", + "when it receives a pre-built `client=` object." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3d4e5f6-0002-0000-0000-000000000005", + "metadata": {}, + "outputs": [], + "source": [ + "class _EmbeddedAdapter:\n \"\"\"Thin wrapper around LocalClient that adds CoordinodeClient-compatible methods.\"\"\"\n\n def __init__(self, local_client):\n self._lc = local_client\n\n def cypher(self, query, params=None):\n return self._lc.cypher(query, params or {})\n\n def get_schema_text(self):\n lbls = self._lc.cypher(\"MATCH (n) UNWIND labels(n) AS lbl RETURN DISTINCT lbl ORDER BY lbl\")\n rels = self._lc.cypher(\"MATCH ()-[r]->() RETURN DISTINCT type(r) AS t ORDER BY t\")\n lines = [\"Node labels:\"]\n for r in lbls:\n lines.append(f\" - {r['lbl']}\")\n lines.append(\"\\nEdge types:\")\n for r in rels:\n lines.append(f\" - {r['t']}\")\n return \"\\n\".join(lines)\n\n # Vector search not available in embedded mode — requires running CoordiNode server.\n\n def close(self):\n self._lc.close()\n" + ] + }, + { + "cell_type": "markdown", + "id": "c3d4e5f6-0002-0000-0000-000000000006", + "metadata": {}, + "source": [ + "## Connect to CoordiNode" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3d4e5f6-0002-0000-0000-000000000007", + "metadata": {}, + "outputs": [], + "source": [ + "import os, socket\n", + "\n", + "\n", + "def _port_open(port):\n", + " try:\n", + " with socket.create_connection((\"127.0.0.1\", port), timeout=1):\n", + " return True\n", + " except OSError:\n", + " return False\n", + "\n", + "\n", + "if os.environ.get(\"COORDINODE_ADDR\"):\n", + " COORDINODE_ADDR = os.environ[\"COORDINODE_ADDR\"]\n", + " from coordinode import CoordinodeClient\n", + "\n", + " client = CoordinodeClient(COORDINODE_ADDR)\n", + " if not client.health():\n", + " raise RuntimeError(f\"Health check failed for {COORDINODE_ADDR}\")\n", + " print(f\"Connected to {COORDINODE_ADDR}\")\n", + "else:\n", + " try:\n", + " grpc_port = int(os.environ.get(\"COORDINODE_PORT\", \"7080\"))\n", + " except ValueError as exc:\n", + " raise RuntimeError(\"COORDINODE_PORT must be an integer\") from exc\n", + "\n", + " if _port_open(grpc_port):\n", + " COORDINODE_ADDR = f\"localhost:{grpc_port}\"\n", + " from coordinode import CoordinodeClient\n", + "\n", + " client = CoordinodeClient(COORDINODE_ADDR)\n", + " if not client.health():\n", + " raise RuntimeError(f\"Health check failed for {COORDINODE_ADDR}\")\n", + " print(f\"Connected to {COORDINODE_ADDR}\")\n", + " else:\n", + " # No server available — use the embedded in-process engine.\n", + " # Works without Docker or any external service; data is in-memory.\n", + " try:\n", + " from coordinode_embedded import LocalClient\n", + " except ImportError as exc:\n", + " raise RuntimeError(\n", + " \"coordinode-embedded is not installed. \"\n", + " \"Run: pip install git+https://github.com/structured-world/coordinode-python.git@8da94d694ecaabee6f8380147d02f08220061bfa#subdirectory=coordinode-embedded\"\n", + " \" — or start a CoordiNode server and set COORDINODE_ADDR.\"\n", + " ) from exc\n", + "\n", + " _lc = LocalClient(\":memory:\")\n", + " client = _EmbeddedAdapter(_lc)\n", + " print(\"Using embedded LocalClient (in-process)\")" + ] + }, + { + "cell_type": "markdown", + "id": "c3d4e5f6-0002-0000-0000-000000000008", + "metadata": {}, + "source": [ + "## Create the graph store\n", + "\n", + "Pass the pre-built `client=` object so the store doesn't open a second connection." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3d4e5f6-0002-0000-0000-000000000009", + "metadata": {}, + "outputs": [], + "source": [ + "import os, uuid\n", + "from langchain_coordinode import CoordinodeGraph\n", + "from langchain_community.graphs.graph_document import GraphDocument, Node, Relationship\n", + "from langchain_core.documents import Document\n", + "\n", + "graph = CoordinodeGraph(client=client)\n", + "print(\"Connected. Schema preview:\")\n", + "print(graph.schema[:300])" + ] + }, + { + "cell_type": "markdown", + "id": "c3d4e5f6-0002-0000-0000-000000000010", + "metadata": {}, + "source": [ + "## 1. add_graph_documents\n", + "\n", + "LangChain `GraphDocument` wraps nodes and relationships from an LLM extraction pipeline.\n", + "Here we build one manually to show the insert path." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3d4e5f6-0002-0000-0000-000000000011", + "metadata": {}, + "outputs": [], + "source": [ + "tag = uuid.uuid4().hex\n\nnodes = [\n Node(id=f\"Turing-{tag}\", type=\"Scientist\", properties={\"born\": 1912, \"field\": \"computer science\"}),\n Node(id=f\"Shannon-{tag}\", type=\"Scientist\", properties={\"born\": 1916, \"field\": \"information theory\"}),\n Node(id=f\"Cryptography-{tag}\", type=\"Field\", properties={\"era\": \"modern\"}),\n]\nrels = [\n Relationship(source=nodes[0], target=nodes[2], type=\"FOUNDED\", properties={\"year\": 1936}),\n Relationship(source=nodes[1], target=nodes[2], type=\"CONTRIBUTED_TO\"),\n Relationship(source=nodes[0], target=nodes[1], type=\"INFLUENCED\"),\n]\ndoc = GraphDocument(nodes=nodes, relationships=rels, source=Document(page_content=\"Turing and Shannon\"))\ngraph.add_graph_documents([doc])\nprint(\"Documents added\")" + ] + }, + { + "cell_type": "markdown", + "id": "c3d4e5f6-0002-0000-0000-000000000012", + "metadata": {}, + "source": [ + "## 2. query — direct Cypher" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3d4e5f6-0002-0000-0000-000000000013", + "metadata": {}, + "outputs": [], + "source": [ + "rows = graph.query(\n", + " \"MATCH (s:Scientist)-[r]->(f:Field)\"\n", + " \" WHERE s.name STARTS WITH $prefix\"\n", + " \" RETURN s.name AS scientist, type(r) AS relation, f.name AS field\",\n", + " params={\"prefix\": f\"Turing-{tag}\"},\n", + ")\n", + "print(\"Scientists → Fields:\")\n", + "for r in rows:\n", + " print(f\" {r['scientist']} --[{r['relation']}]--> {r['field']}\")" + ] + }, + { + "cell_type": "markdown", + "id": "c3d4e5f6-0002-0000-0000-000000000014", + "metadata": {}, + "source": [ + "## 3. refresh_schema — structured_schema dict" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3d4e5f6-0002-0000-0000-000000000015", + "metadata": {}, + "outputs": [], + "source": [ + "graph.refresh_schema()\n", + "print(\"node_props keys:\", list(graph.structured_schema.get(\"node_props\", {}).keys())[:10])\n", + "print(\"relationships:\", graph.structured_schema.get(\"relationships\", [])[:5])" + ] + }, + { + "cell_type": "markdown", + "id": "c3d4e5f6-0002-0000-0000-000000000016", + "metadata": {}, + "source": [ + "## 4. Idempotency check\n", + "\n", + "`add_graph_documents` uses MERGE internally — adding the same document twice must not\n", + "create duplicate edges." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3d4e5f6-0002-0000-0000-000000000017", + "metadata": {}, + "outputs": [], + "source": [ + "graph.add_graph_documents([doc]) # second upsert — must not create a duplicate edge\n", + "cnt = graph.query(\n", + " \"MATCH (a {name: $src})-[r:FOUNDED]->(b {name: $dst}) RETURN count(r) AS cnt\",\n", + " params={\"src\": f\"Turing-{tag}\", \"dst\": f\"Cryptography-{tag}\"},\n", + ")\n", + "print(\"FOUNDED edge count after double upsert (expect 1):\", cnt[0][\"cnt\"])" + ] + }, + { + "cell_type": "markdown", + "id": "c3d4e5f6-0002-0000-0000-000000000018", + "metadata": {}, + "source": [ + "## 5. GraphCypherQAChain — LLM-powered Cypher (optional)\n", + "\n", + "> **This section requires `OPENAI_API_KEY`.** Set it in your environment or via\n", + "> `os.environ['OPENAI_API_KEY'] = 'sk-...'` before running.\n", + "> The cell is skipped automatically when the key is absent." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3d4e5f6-0002-0000-0000-000000000019", + "metadata": {}, + "outputs": [], + "source": [ + "if not os.environ.get(\"OPENAI_API_KEY\"):\n", + " print(\n", + " 'Skipping: OPENAI_API_KEY is not set. Set it via os.environ[\"OPENAI_API_KEY\"] = \"sk-...\" and re-run this cell.'\n", + " )\n", + "else:\n", + " from langchain.chains import GraphCypherQAChain\n", + " from langchain_openai import ChatOpenAI\n", + "\n", + " chain = GraphCypherQAChain.from_llm(\n", + " ChatOpenAI(model=\"gpt-4o-mini\", temperature=0),\n", + " graph=graph,\n", + " verbose=True,\n", + " allow_dangerous_requests=True,\n", + " )\n", + " result = chain.invoke(f\"Who influenced Shannon-{tag}?\")\n", + " print(\"Answer:\", result[\"result\"])" + ] + }, + { + "cell_type": "markdown", + "id": "c3d4e5f6-0002-0000-0000-000000000020", + "metadata": {}, + "source": [ + "## Cleanup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3d4e5f6-0002-0000-0000-000000000021", + "metadata": {}, + "outputs": [], + "source": [ + "# DETACH DELETE atomically removes all edges then the node in one operation.\n# Two-step MATCH (n)-[r]-() / DELETE r / DELETE n is avoided because an\n# undirected MATCH returns each edge from both endpoints, so the second pass\n# fails with \"cannot delete node with connected edges\".\ngraph.query(\"MATCH (n) WHERE n.name ENDS WITH $tag DETACH DELETE n\", params={\"tag\": tag})\nprint(\"Cleaned up\")\ngraph.close()\nclient.close() # injected client — owned by caller" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/demo/notebooks/03_langgraph_agent.ipynb b/demo/notebooks/03_langgraph_agent.ipynb new file mode 100644 index 0000000..5eae0b8 --- /dev/null +++ b/demo/notebooks/03_langgraph_agent.ipynb @@ -0,0 +1,517 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "d4e5f6a7-0003-0000-0000-000000000001", + "metadata": {}, + "source": [ + "# LangGraph + CoordiNode: Agent with graph memory\n", + "\n", + "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/structured-world/coordinode-python/blob/main/demo/notebooks/03_langgraph_agent.ipynb)\n", + "\n", + "Demonstrates a LangGraph agent that uses CoordiNode as persistent **graph memory**:\n", + "- `save_fact` — store a subject → relation → object triple in the graph\n", + "- `query_facts` — run an arbitrary Cypher query against the graph\n", + "- `find_related` — traverse the graph from a given entity\n", + "- `list_all_facts` — dump every fact in the current session\n", + "\n", + "**Works without OpenAI** — the mock demo section calls tools directly. \n", + "Set `OPENAI_API_KEY` to run the full `gpt-4o-mini` ReAct agent.\n", + "\n", + "**Environments:**\n", + "- **Google Colab** — uses `coordinode-embedded` (in-process Rust engine, no server needed). First run compiles from source (~5 min); subsequent runs use the pip cache.\n", + "- **Local / Docker Compose** — connects to a running CoordiNode server via gRPC." + ] + }, + { + "cell_type": "markdown", + "id": "d4e5f6a7-0003-0000-0000-000000000002", + "metadata": {}, + "source": [ + "## Install dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d4e5f6a7-0003-0000-0000-000000000003", + "metadata": {}, + "outputs": [], + "source": [ + "import os, sys, subprocess\n", + "\n", + "IN_COLAB = \"google.colab\" in sys.modules\n", + "_EMBEDDED_PIP_SPEC = (\n", + " \"git+https://github.com/structured-world/coordinode-python.git\"\n", + " \"@8da94d694ecaabee6f8380147d02f08220061bfa#subdirectory=coordinode-embedded\"\n", + ")\n", + "\n", + "# Install coordinode-embedded in Colab only (requires Rust build).\n", + "if IN_COLAB and not os.environ.get(\"COORDINODE_ADDR\"):\n", + " # Install Rust toolchain via rustup (https://rustup.rs).\n", + " # Colab's apt packages ship rustc ≤1.75, which cannot build coordinode-embedded\n", + " # (requires Rust ≥1.80 for maturin/pyo3). apt-get is not a viable alternative here.\n", + " # Download the installer to a temp file and execute it explicitly — this avoids\n", + " # piping remote content directly into a shell while maintaining HTTPS/TLS security\n", + " # through Python's default ssl context (cert-verified, TLS 1.2+).\n", + " # SHA256 pinning of rustup-init is intentionally omitted: rustup.rs does not\n", + " # publish a stable per-release checksum for sh.rustup.rs itself (only for\n", + " # platform-specific rustup-init binaries), and pinning a hash here would break\n", + " # silently on every rustup release. The HTTPS/TLS verification + temp-file\n", + " # execution (not piped to shell) is the rustup team's recommended trust model.\n", + " # Skip embedded build if COORDINODE_ADDR is set — user has a gRPC server,\n", + " # no need to spend 5+ minutes building coordinode-embedded from source.\n", + " # The `IN_COLAB` check already guards against local/server environments.\n", + " # Security note: downloading rustup-init via HTTPS with cert verification and\n", + " # executing from a temp file (not piped to shell) is by design — this is the\n", + " # rustup project's own recommended install method for automated environments.\n", + " import ssl as _ssl, tempfile as _tmp, urllib.request as _ur\n", + "\n", + " _ctx = _ssl.create_default_context()\n", + " with _tmp.NamedTemporaryFile(mode=\"wb\", suffix=\".sh\", delete=False) as _f:\n", + " with _ur.urlopen(\"https://sh.rustup.rs\", context=_ctx, timeout=30) as _r:\n", + " _f.write(_r.read())\n", + " _rustup_path = _f.name\n", + " try:\n", + " subprocess.run([\"/bin/sh\", _rustup_path, \"-y\", \"-q\"], check=True, timeout=300)\n", + " finally:\n", + " os.unlink(_rustup_path)\n", + " # Add cargo to PATH so maturin/pip can find it.\n", + " _cargo_bin = os.path.expanduser(\"~/.cargo/bin\")\n", + " os.environ[\"PATH\"] = f\"{_cargo_bin}{os.pathsep}{os.environ.get('PATH', '')}\"\n", + " subprocess.run([sys.executable, \"-m\", \"pip\", \"install\", \"-q\", \"maturin\"], check=True, timeout=300)\n", + " subprocess.run(\n", + " [\n", + " sys.executable,\n", + " \"-m\",\n", + " \"pip\",\n", + " \"install\",\n", + " \"-q\",\n", + " _EMBEDDED_PIP_SPEC,\n", + " ],\n", + " check=True,\n", + " timeout=600,\n", + " )\n", + "\n", + "subprocess.run(\n", + " [\n", + " sys.executable,\n", + " \"-m\",\n", + " \"pip\",\n", + " \"install\",\n", + " \"-q\",\n", + " \"coordinode\",\n", + " \"langchain-community\",\n", + " \"langchain-openai\",\n", + " \"langgraph\",\n", + " \"nest_asyncio\",\n", + " ],\n", + " check=True,\n", + " timeout=300,\n", + ")\n", + "\n", + "import nest_asyncio\n", + "nest_asyncio.apply()\n", + "\n", + "print(\"SDK installed\")" + ] + }, + { + "cell_type": "markdown", + "id": "d4e5f6a7-0003-0000-0000-000000000004", + "metadata": {}, + "source": [ + "## Connect to CoordiNode\n", + "\n", + "This notebook uses `.cypher()` directly, which is identical on both `LocalClient`\n", + "(embedded) and `CoordinodeClient` (gRPC). No adapter needed." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d4e5f6a7-0003-0000-0000-000000000005", + "metadata": {}, + "outputs": [], + "source": [ + "import os, socket\n", + "\n", + "\n", + "def _port_open(port):\n", + " try:\n", + " with socket.create_connection((\"127.0.0.1\", port), timeout=1):\n", + " return True\n", + " except OSError:\n", + " return False\n", + "\n", + "\n", + "_use_embedded = True\n", + "\n", + "if os.environ.get(\"COORDINODE_ADDR\"):\n", + " # Explicit address — fail hard if health check fails.\n", + " COORDINODE_ADDR = os.environ[\"COORDINODE_ADDR\"]\n", + " from coordinode import CoordinodeClient\n", + "\n", + " client = CoordinodeClient(COORDINODE_ADDR)\n", + " if not client.health():\n", + " client.close()\n", + " raise RuntimeError(f\"CoordiNode at {COORDINODE_ADDR} is not serving health checks\")\n", + " print(f\"Connected to {COORDINODE_ADDR}\")\n", + " _use_embedded = False\n", + "else:\n", + " try:\n", + " grpc_port = int(os.environ.get(\"COORDINODE_PORT\", \"7080\"))\n", + " except ValueError as exc:\n", + " raise RuntimeError(\"COORDINODE_PORT must be an integer\") from exc\n", + "\n", + " if _port_open(grpc_port):\n", + " COORDINODE_ADDR = f\"localhost:{grpc_port}\"\n", + " from coordinode import CoordinodeClient\n", + "\n", + " client = CoordinodeClient(COORDINODE_ADDR)\n", + " if client.health():\n", + " print(f\"Connected to {COORDINODE_ADDR}\")\n", + " _use_embedded = False\n", + " else:\n", + " # Port is open but not a CoordiNode server — fall through to embedded.\n", + " client.close()\n", + "\n", + "if _use_embedded:\n", + " # No server available — use the embedded in-process engine.\n", + " # Works without Docker or any external service; data is in-memory.\n", + " try:\n", + " from coordinode_embedded import LocalClient\n", + " except ImportError as exc:\n", + " # _EMBEDDED_PIP_SPEC is defined in the install cell; fall back to the\n", + " # pinned spec so this cell remains runnable if executed standalone.\n", + " _pip_spec = globals().get(\n", + " \"_EMBEDDED_PIP_SPEC\",\n", + " \"git+https://github.com/structured-world/coordinode-python.git\"\n", + " \"@8da94d694ecaabee6f8380147d02f08220061bfa#subdirectory=coordinode-embedded\",\n", + " )\n", + " raise RuntimeError(\n", + " \"coordinode-embedded is not installed. \"\n", + " f\"Run: pip install {_pip_spec}\"\n", + " \" — or start a CoordiNode server and set COORDINODE_ADDR.\"\n", + " ) from exc\n", + "\n", + " client = LocalClient(\":memory:\")\n", + " print(\"Using embedded LocalClient (in-process)\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "d4e5f6a7-0003-0000-0000-000000000006", + "metadata": {}, + "source": [ + "## 1. Define LangChain tools\n", + "\n", + "Each tool maps a natural-language action to a Cypher operation.\n", + "A `SESSION` UUID isolates this demo's data from other concurrent runs." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d4e5f6a7-0003-0000-0000-000000000007", + "metadata": {}, + "outputs": [], + "source": [ + "import os, re, uuid\n", + "from langchain_core.tools import tool\n", + "\n", + "SESSION = uuid.uuid4().hex # isolates this demo's data from other sessions\n", + "\n", + "_REL_TYPE_RE = re.compile(r\"[A-Z_][A-Z0-9_]*\")\n", + "# Regex guards for query_facts (demo safety guard).\n", + "_WRITE_CLAUSE_RE = re.compile(\n", + " r\"\\b(CREATE|MERGE|DELETE|DETACH|SET|REMOVE|DROP|CALL|LOAD)\\b\",\n", + " re.IGNORECASE | re.DOTALL,\n", + ")\n", + "# NOTE: this guard checks that AT LEAST ONE node pattern carries session scope.\n", + "# A Cartesian-product query such as `MATCH (n), (m {session: $sess}) RETURN n`\n", + "# would pass yet return unscoped rows for `n`. A complete per-alias check would\n", + "# require parsing the Cypher AST, which is out of scope for a demo safety guard.\n", + "# In production code, use server-side row-level security instead of client regex.\n", + "_SESSION_WHERE_SCOPE_RE = re.compile(\n", + " r\"WHERE\\b[^;{}]*\\.session\\s*=\\s*\\$sess\",\n", + " re.IGNORECASE | re.DOTALL,\n", + ")\n", + "_SESSION_NODE_SCOPE_RE = re.compile(\n", + " r\"\\([^)]*\\{[^}]*session\\s*:\\s*\\$sess[^}]*\\}[^)]*\\)\",\n", + " re.IGNORECASE | re.DOTALL,\n", + ")\n", + "\n", + "\n", + "@tool\n", + "def save_fact(subject: str, relation: str, obj: str) -> str:\n", + " \"\"\"Save a fact (subject → relation → object) into the knowledge graph.\n", + " Example: save_fact('Alice', 'WORKS_AT', 'Acme Corp')\"\"\"\n", + " rel_type = relation.upper().replace(\" \", \"_\")\n", + " # Validate rel_type before interpolating into Cypher to prevent injection.\n", + " if not _REL_TYPE_RE.fullmatch(rel_type):\n", + " return f\"Invalid relation type {relation!r}: only letters, digits, and underscores allowed\"\n", + " client.cypher(\n", + " f\"MERGE (a:Entity {{name: $s, session: $sess}}) \"\n", + " f\"MERGE (b:Entity {{name: $o, session: $sess}}) \"\n", + " f\"MERGE (a)-[r:{rel_type}]->(b)\",\n", + " params={\"s\": subject, \"o\": obj, \"sess\": SESSION},\n", + " )\n", + " return f\"Saved: {subject} -[{rel_type}]-> {obj}\"\n", + "\n", + "\n", + "@tool\n", + "def query_facts(cypher: str) -> str:\n", + " \"\"\"Run a read-only Cypher MATCH query against the knowledge graph.\n", + " Must scope reads via either WHERE .session = $sess\n", + " or a node pattern {session: $sess}.\"\"\"\n", + " q = cypher.strip()\n", + " if _WRITE_CLAUSE_RE.search(q):\n", + " return \"Only read-only Cypher is allowed in query_facts.\"\n", + " # Require $sess in a WHERE clause or node pattern, not just anywhere.\n", + " # Accepts both: WHERE n.session = $sess and MATCH (n {session: $sess})\n", + " if not (_SESSION_WHERE_SCOPE_RE.search(q) or _SESSION_NODE_SCOPE_RE.search(q)):\n", + " return \"Query must scope reads to the current session with either WHERE .session = $sess or {session: $sess}\"\n", + " # Enforce a result cap so agents cannot dump the entire graph.\n", + " # Cap explicit LIMIT to 20 (preserves smaller limits like LIMIT 1),\n", + " # or append LIMIT 20 when no LIMIT clause is present.\n", + " _LIMIT_AT_END_RE = re.compile(r\"\\bLIMIT\\s+(\\d+)\\s*;?\\s*$\", re.IGNORECASE | re.DOTALL)\n", + " def _cap_limit(m):\n", + " return f\"LIMIT {min(int(m.group(1)), 20)}\"\n", + " # Reject queries with parameters other than $sess — only {\"sess\": SESSION} is passed.\n", + " extra_params = sorted(\n", + " {m.group(1) for m in re.finditer(r\"\\$([A-Za-z_][A-Za-z0-9_]*)\", q)} - {\"sess\"}\n", + " )\n", + " if extra_params:\n", + " return f\"Only $sess is supported in query_facts; found additional parameters: {', '.join(extra_params)}\"\n", + " if _LIMIT_AT_END_RE.search(q):\n", + " q = _LIMIT_AT_END_RE.sub(_cap_limit, q)\n", + " elif re.search(r\"\\bLIMIT\\b\", q, re.IGNORECASE):\n", + " # Respect caller-provided non-terminal LIMIT clauses;\n", + " # appending a second LIMIT would produce invalid Cypher.\n", + " pass\n", + " else:\n", + " q = q.rstrip().rstrip(\";\") + \" LIMIT 20\"\n", + " rows = client.cypher(q, params={\"sess\": SESSION})\n", + " return str(rows) if rows else \"No results\"\n", + "\n", + "\n", + "@tool\n", + "def find_related(entity_name: str, depth: int = 1) -> str:\n", + " \"\"\"Find all entities reachable from entity_name within the given number of hops (max 3).\"\"\"\n", + " safe_depth = max(1, min(int(depth), 3))\n", + " # Note: session constraint is on both endpoints (n, m). Constraining\n", + " # intermediate nodes via path variables (MATCH p=..., WHERE ALL(x IN nodes(p)...))\n", + " # is not yet supported by CoordiNode — planned for a future release.\n", + " # In practice, session isolation holds because all nodes are MERGE'd with\n", + " # their session scope, so cross-session paths cannot form.\n", + " rows = client.cypher(\n", + " f\"MATCH (n:Entity {{name: $name, session: $sess}})-[*1..{safe_depth}]->(m:Entity {{session: $sess}}) \"\n", + " \"RETURN DISTINCT m.name AS related LIMIT 20\",\n", + " params={\"name\": entity_name, \"sess\": SESSION},\n", + " )\n", + " if not rows:\n", + " return f\"No related entities found for {entity_name}\"\n", + " return \"\\n\".join(r['related'] for r in rows)\n", + "\n", + "\n", + "@tool\n", + "def list_all_facts() -> str:\n", + " \"\"\"List every fact stored in the current session's knowledge graph.\"\"\"\n", + " rows = client.cypher(\n", + " \"MATCH (a:Entity {session: $sess})-[r]->(b:Entity {session: $sess}) \"\n", + " \"RETURN a.name AS subject, type(r) AS relation, b.name AS object\",\n", + " params={\"sess\": SESSION},\n", + " )\n", + " if not rows:\n", + " return \"No facts stored yet\"\n", + " return \"\\n\".join(f\"{r['subject']} -[{r['relation']}]-> {r['object']}\" for r in rows)\n", + "\n", + "\n", + "tools = [save_fact, query_facts, find_related, list_all_facts]\n", + "print(f\"Session: {SESSION}\")\n", + "print(\"Tools:\", [t.name for t in tools])" + ] + }, + { + "cell_type": "markdown", + "id": "d4e5f6a7-0003-0000-0000-000000000008", + "metadata": {}, + "source": [ + "## 2. Mock demo — no LLM required (direct tool calls)\n", + "\n", + "Shows the full graph memory workflow by calling the tools directly." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d4e5f6a7-0003-0000-0000-000000000009", + "metadata": {}, + "outputs": [], + "source": [ + "ACME_CORP = \"Acme Corp\" # constant used in several save_fact calls below\n", + "\n", + "print(\"=== Saving facts ===\")\n", + "print(save_fact.invoke({\"subject\": \"Alice\", \"relation\": \"WORKS_AT\", \"obj\": ACME_CORP}))\n", + "print(save_fact.invoke({\"subject\": \"Alice\", \"relation\": \"MANAGES\", \"obj\": \"Bob\"}))\n", + "print(save_fact.invoke({\"subject\": \"Bob\", \"relation\": \"WORKS_AT\", \"obj\": ACME_CORP}))\n", + "print(save_fact.invoke({\"subject\": ACME_CORP, \"relation\": \"LOCATED_IN\", \"obj\": \"Berlin\"}))\n", + "print(save_fact.invoke({\"subject\": \"Alice\", \"relation\": \"KNOWS\", \"obj\": \"Charlie\"}))\n", + "print(save_fact.invoke({\"subject\": \"Charlie\", \"relation\": \"EXPERT_IN\", \"obj\": \"Machine Learning\"}))\n", + "\n", + "print(\"\\n=== All facts in session ===\")\n", + "print(list_all_facts.invoke({}))\n", + "\n", + "print(\"\\n=== Related to Alice (depth=1) ===\")\n", + "print(find_related.invoke({\"entity_name\": \"Alice\", \"depth\": 1}))\n", + "\n", + "print(\"\\n=== Related to Alice (depth=2) ===\")\n", + "print(find_related.invoke({\"entity_name\": \"Alice\", \"depth\": 2}))\n", + "\n", + "print(\"\\n=== Cypher query: who works at Acme Corp? ===\")\n", + "print(\n", + " query_facts.invoke(\n", + " {\n", + " \"cypher\": f'MATCH (p:Entity {{session: $sess}})-[:WORKS_AT]->(c:Entity {{name: \"{ACME_CORP}\", session: $sess}}) RETURN p.name AS employee'\n", + " }\n", + " )\n", + ")\n" + ] + }, + { + "cell_type": "markdown", + "id": "d4e5f6a7-0003-0000-0000-000000000010", + "metadata": {}, + "source": [ + "## 3. LangGraph StateGraph — manual workflow\n", + "\n", + "Shows how to wire CoordiNode tool calls into a LangGraph state machine without an LLM." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d4e5f6a7-0003-0000-0000-000000000011", + "metadata": {}, + "outputs": [], + "source": [ + "from langgraph.graph import StateGraph, END\n", + "from typing import TypedDict, Annotated\n", + "import operator\n", + "\n", + "\n", + "class AgentState(TypedDict):\n", + " messages: Annotated[list[str], operator.add]\n", + " facts_saved: int\n", + "\n", + "\n", + "def extract_and_save_node(state: AgentState) -> AgentState:\n", + " \"\"\"Simulates entity extraction: saves a hardcoded fact.\n", + " In production this node would call an LLM to extract entities from the last message.\"\"\"\n", + " result = save_fact.invoke({\"subject\": \"DemoSubject\", \"relation\": \"DEMO_REL\", \"obj\": \"DemoObject\"})\n", + " return {\"messages\": [f\"[extract] {result}\"], \"facts_saved\": state[\"facts_saved\"] + 1}\n", + "\n", + "\n", + "def query_node(state: AgentState) -> AgentState:\n", + " \"\"\"Reads the graph and appends a summary to messages.\"\"\"\n", + " result = list_all_facts.invoke({})\n", + " return {\"messages\": [f\"[query] Facts: {result[:200]}\"], \"facts_saved\": state[\"facts_saved\"]}\n", + "\n", + "\n", + "def should_query(state: AgentState) -> str:\n", + " return \"query\" if state[\"facts_saved\"] >= 1 else \"extract\"\n", + "\n", + "\n", + "builder = StateGraph(AgentState)\n", + "builder.add_node(\"extract\", extract_and_save_node)\n", + "builder.add_node(\"query\", query_node)\n", + "builder.set_entry_point(\"extract\")\n", + "builder.add_conditional_edges(\"extract\", should_query, {\"query\": \"query\", \"extract\": \"extract\"})\n", + "builder.add_edge(\"query\", END)\n", + "\n", + "graph_agent = builder.compile()\n", + "\n", + "result = graph_agent.invoke({\"messages\": [\"Tell me about Alice\"], \"facts_saved\": 0})\n", + "print(\"Graph agent output:\")\n", + "for msg in result[\"messages\"]:\n", + " print(\" \", msg)" + ] + }, + { + "cell_type": "markdown", + "id": "d4e5f6a7-0003-0000-0000-000000000012", + "metadata": {}, + "source": [ + "## 4. LangGraph ReAct agent (optional — requires OPENAI_API_KEY)\n", + "\n", + "> Set `OPENAI_API_KEY` in your environment or via\n", + "> `os.environ['OPENAI_API_KEY'] = 'sk-...'` before running.\n", + "> The cell is skipped automatically when the key is absent." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d4e5f6a7-0003-0000-0000-000000000013", + "metadata": {}, + "outputs": [], + "source": [ + "if not os.environ.get(\"OPENAI_API_KEY\"):\n", + " print(\"OPENAI_API_KEY not set — skipping LLM agent. See section 2 for the mock demo.\")\n", + "else:\n", + " from langchain_openai import ChatOpenAI\n", + " from langgraph.prebuilt import create_react_agent\n", + "\n", + " llm = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0)\n", + " agent = create_react_agent(llm, tools)\n", + "\n", + " print(\"Agent ready. Running demo conversation...\")\n", + " messages = [\n", + " {\n", + " \"role\": \"user\",\n", + " \"content\": \"Save these facts: Alice works at Acme Corp, Alice manages Bob, Acme Corp is in Berlin.\",\n", + " },\n", + " {\"role\": \"user\", \"content\": \"Who does Alice manage?\"},\n", + " {\"role\": \"user\", \"content\": \"What are all the facts about Alice?\"},\n", + " ]\n", + " for msg in messages:\n", + " print(f\"\\n>>> {msg['content']}\")\n", + " result = agent.invoke({\"messages\": [msg]})\n", + " print(\"Agent:\", result[\"messages\"][-1].content)" + ] + }, + { + "cell_type": "markdown", + "id": "d4e5f6a7-0003-0000-0000-000000000014", + "metadata": {}, + "source": [ + "## Cleanup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d4e5f6a7-0003-0000-0000-000000000015", + "metadata": {}, + "outputs": [], + "source": [ + "client.cypher(\"MATCH (n:Entity {session: $sess}) DETACH DELETE n\", params={\"sess\": SESSION})\n", + "print(\"Cleaned up session:\", SESSION)\n", + "client.close()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docker-compose.yml b/docker-compose.yml index 5cf2f39..795b849 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,7 +7,7 @@ version: "3.9" services: coordinode: - image: ghcr.io/structured-world/coordinode:latest + image: ghcr.io/structured-world/coordinode:0.3.17 container_name: coordinode ports: - "7080:7080" # gRPC diff --git a/langchain-coordinode/langchain_coordinode/graph.py b/langchain-coordinode/langchain_coordinode/graph.py index cc0f2e7..d94a63f 100644 --- a/langchain-coordinode/langchain_coordinode/graph.py +++ b/langchain-coordinode/langchain_coordinode/graph.py @@ -4,6 +4,7 @@ import hashlib import json +import logging import re from collections.abc import Sequence from typing import Any @@ -12,6 +13,8 @@ from coordinode import CoordinodeClient +logger = logging.getLogger(__name__) + class CoordinodeGraph(GraphStore): """LangChain `GraphStore` backed by CoordiNode. @@ -37,6 +40,11 @@ class CoordinodeGraph(GraphStore): addr: CoordiNode gRPC address, e.g. ``"localhost:7080"``. database: Database name (reserved for future multi-db support). timeout: Per-request gRPC deadline in seconds. + client: Optional pre-built client object (e.g. ``LocalClient`` from + ``coordinode-embedded``) to use instead of creating a gRPC connection. + Must expose a callable ``cypher(query, params)`` method. When + provided, ``addr`` and ``timeout`` are ignored. The caller is + responsible for closing the client. """ def __init__( @@ -45,8 +53,16 @@ def __init__( *, database: str | None = None, timeout: float = 30.0, + client: Any = None, ) -> None: - self._client = CoordinodeClient(addr, timeout=timeout) + # ``client`` allows passing a pre-built client (e.g. LocalClient from + # coordinode-embedded) instead of creating a gRPC connection. The object + # must expose a ``.cypher(query, params)`` method and, optionally, + # ``.get_schema_text()`` and ``.vector_search()``. + if client is not None and not callable(getattr(client, "cypher", None)): + raise TypeError("client must provide a callable cypher(query, params) method") + self._owns_client = client is None + self._client = client if client is not None else CoordinodeClient(addr, timeout=timeout) self._schema: str | None = None self._structured_schema: dict[str, Any] | None = None @@ -67,26 +83,85 @@ def structured_schema(self) -> dict[str, Any]: return self._structured_schema or {} def refresh_schema(self) -> None: - """Fetch current schema from CoordiNode.""" - text = self._client.get_schema_text() - self._schema = text - structured = _parse_schema(text) - # Augment with relationship triples (start_label, type, end_label) via - # Cypher — get_schema_text() only lists edge types without direction. - # No LIMIT here intentionally: RETURN DISTINCT already collapses all edges - # to unique (src_label, rel_type, dst_label) combinations, so the result - # is bounded by the number of distinct relationship type triples, not by - # total edge count. Adding a LIMIT would silently drop relationship types - # that happen to appear beyond the limit, producing an incomplete schema. + """Fetch current schema from CoordiNode. + + Prefers the structured ``get_labels()`` / ``get_edge_types()`` API + (available since ``coordinode`` 0.6.0) over the legacy text-parsing + path. Injected clients (e.g. ``_EmbeddedAdapter`` in Colab notebooks) + that do not expose those methods fall back to ``_parse_schema()``. + Injected clients that only expose ``cypher()`` / ``close()`` (e.g. + a bare ``coordinode-embedded`` ``LocalClient``) return empty node/rel + property metadata, though relationships may still be inferred from + graph data via a Cypher query and populated in + ``structured["relationships"]``. + """ + get_schema_text = getattr(self._client, "get_schema_text", None) + if callable(get_schema_text): + try: + self._schema = get_schema_text() + except Exception: + logger.debug( + "get_schema_text() raised — continuing with empty schema text", + exc_info=True, + ) + self._schema = "" + else: + self._schema = "" + + if callable(getattr(self._client, "get_labels", None)) and callable( + getattr(self._client, "get_edge_types", None) + ): + try: + node_props: dict[str, list[dict[str, str]]] = {} + for label in self._client.get_labels(): + node_props[label.name] = [ + {"property": p.name, "type": _PROPERTY_TYPE_NAME.get(p.type, "UNSPECIFIED")} + for p in label.properties + ] + rel_props: dict[str, list[dict[str, str]]] = {} + for et in self._client.get_edge_types(): + rel_props[et.name] = [ + {"property": p.name, "type": _PROPERTY_TYPE_NAME.get(p.type, "UNSPECIFIED")} + for p in et.properties + ] + if node_props or rel_props: + structured: dict[str, Any] = {"node_props": node_props, "rel_props": rel_props, "relationships": []} + # Backfill text schema for clients that expose get_labels()/get_edge_types() + # but not get_schema_text(). Keeps graph.schema consistent with + # graph.structured_schema so callers that read either view get the same data. + if not self._schema: + self._schema = _structured_to_text(node_props, rel_props) + else: + # Both APIs returned empty (e.g. schema-free graph or stub adapter) — + # fall back to text parsing so we don't lose what get_schema_text() returned. + structured = _parse_schema(self._schema) + except Exception: + # Server may expose get_labels/get_edge_types but raise UNIMPLEMENTED + # or another gRPC error if the schema service is not available in the + # deployed version. Fall back to text-based schema parsing to avoid + # surfacing an unhandled exception to the caller. + # We catch broad Exception (rather than grpc.RpcError specifically) + # because langchain-coordinode does not take a hard grpc dependency — + # clients may be non-gRPC (e.g. coordinode-embedded LocalClient). + # The exception is logged at DEBUG so it remains observable without + # cluttering production output. + logger.debug( + "get_labels()/get_edge_types() failed — falling back to _parse_schema()", + exc_info=True, + ) + structured = _parse_schema(self._schema) + else: + structured = _parse_schema(self._schema) + + # Augment with relationship triples (start_label, type, end_label). + # No LIMIT: RETURN DISTINCT bounds result by unique triples, not edge count. + # Note: can simplify to labels(a)[0] once subscript-on-function support lands in the + # published Docker image (tracked in G010 / GAPS.md). rows = self._client.cypher( - "MATCH (a)-[r]->(b) RETURN DISTINCT labels(a) AS src_labels, type(r) AS rel, labels(b) AS dst_labels" + "MATCH (a)-[r]->(b) RETURN DISTINCT labels(a) AS src_labels, type(r) AS rel, labels(b) AS dst_labels", + {}, ) if rows: - # Deduplicate after _first_label() normalization: RETURN DISTINCT operates on - # raw label lists, but _first_label(min()) can collapse different multi-label - # combinations to the same (start, type, end) triple (e.g. ['Employee','Person'] - # and ['Person','Employee'] both min-normalize to 'Employee'). Use a set to - # ensure each relationship triple appears at most once. triples: set[tuple[str, str, str]] = set() for row in rows: start = _first_label(row.get("src_labels")) @@ -140,29 +215,20 @@ def _upsert_node(self, node: Any) -> None: ) def _create_edge(self, rel: Any) -> None: - """Upsert a relationship via MERGE (idempotent). - - SET r += $props is skipped when props is empty because - SET r += {} is not supported by all server versions. - """ + """Upsert a relationship via MERGE (idempotent).""" src_label = _cypher_ident(rel.source.type or "Entity") dst_label = _cypher_ident(rel.target.type or "Entity") rel_type = _cypher_ident(rel.type) props = dict(rel.properties or {}) - if props: - self._client.cypher( - f"MATCH (src:{src_label} {{name: $src}}) " - f"MATCH (dst:{dst_label} {{name: $dst}}) " - f"MERGE (src)-[r:{rel_type}]->(dst) SET r += $props", - params={"src": rel.source.id, "dst": rel.target.id, "props": props}, - ) - else: - self._client.cypher( - f"MATCH (src:{src_label} {{name: $src}}) " - f"MATCH (dst:{dst_label} {{name: $dst}}) " - f"MERGE (src)-[r:{rel_type}]->(dst)", - params={"src": rel.source.id, "dst": rel.target.id}, - ) + # SET r += $props is intentionally unconditional (even for empty dicts). + # CoordiNode ≥ v0.3.12 supports SET r += {} as a no-op, which lets us + # keep a single code path instead of branching on emptiness. + self._client.cypher( + f"MATCH (src:{src_label} {{name: $src}}) " + f"MATCH (dst:{dst_label} {{name: $dst}}) " + f"MERGE (src)-[r:{rel_type}]->(dst) SET r += $props", + params={"src": rel.source.id, "dst": rel.target.id, "props": props}, + ) def _link_document_to_entities(self, doc: Any) -> None: """Upsert a ``__Document__`` node and MERGE ``MENTIONS`` edges to all entities.""" @@ -223,6 +289,10 @@ def similarity_search( # used in a boolean context. len() == 0 works for all sequence types. if len(query_vector) == 0: return [] + if not callable(getattr(self._client, "vector_search", None)): + # Injected clients (e.g. bare coordinode-embedded LocalClient) may + # not implement vector_search — return empty rather than AttributeError. + return [] results = sorted( self._client.vector_search( label=label, @@ -237,8 +307,15 @@ def similarity_search( # ── Lifecycle ───────────────────────────────────────────────────────── def close(self) -> None: - """Close the underlying gRPC connection.""" - self._client.close() + """Close the underlying client connection. + + Only closes the client if it was created internally (i.e. ``client`` was + not passed to ``__init__``). Externally-injected clients are owned by + the caller and must be closed by them. The underlying transport may be + gRPC or an embedded in-process client. + """ + if self._owns_client: + self._client.close() def __enter__(self) -> CoordinodeGraph: return self @@ -247,7 +324,26 @@ def __exit__(self, *args: Any) -> None: self.close() -# ── Schema parser ───────────────────────────────────────────────────────── +# ── Schema helpers ──────────────────────────────────────────────────────── + +# Maps PropertyType protobuf enum integers to LangChain-compatible type strings. +# Values mirror coordinode.v1.graph.PropertyType (schema.proto). +# A static dict is intentional: importing generated proto modules here would create +# a hard dependency on coordinode's internal proto layout inside langchain-coordinode. +# This package already receives integer enum values via LabelInfo/EdgeTypeInfo from +# the coordinode SDK, so a local lookup table is the correct decoupling boundary. +_PROPERTY_TYPE_NAME: dict[int, str] = { + 0: "UNSPECIFIED", + 1: "INT64", + 2: "FLOAT64", + 3: "STRING", + 4: "BOOL", + 5: "BYTES", + 6: "TIMESTAMP", + 7: "VECTOR", + 8: "LIST", + 9: "MAP", +} def _stable_document_id(source: Any) -> str: @@ -276,9 +372,15 @@ def _stable_document_id(source: Any) -> str: def _first_label(labels: Any) -> str | None: """Extract a stable label from a labels() result (list of strings). - openCypher does not guarantee a stable ordering for labels(), so using - labels[0] would produce nondeterministic schema entries across calls. - We return the lexicographically smallest label as a deterministic rule. + In practice this application creates nodes with a single label, but the + underlying CoordiNode API accepts ``list[str]`` so multi-label nodes are + possible. ``min()`` gives a deterministic result regardless of how many + labels are present. + + Note: once subscript-on-function support lands in the published Docker + image (tracked in G010 / GAPS.md), this Python helper could be replaced + by an inline Cypher expression — but keep the deterministic ``min()`` + rule rather than index 0, since label ordering is not guaranteed stable. """ if isinstance(labels, list) and labels: return str(min(labels)) @@ -295,6 +397,34 @@ def _cypher_ident(name: str) -> str: return f"`{name.replace('`', '``')}`" +def _structured_to_text( + node_props: dict[str, list[dict[str, str]]], + rel_props: dict[str, list[dict[str, str]]], +) -> str: + """Render node_props/rel_props dicts as a schema text string. + + Produces the same format that :func:`_parse_schema` consumes, so the two + functions are inverses. Used to backfill ``self._schema`` when the server + exposes ``get_labels()`` / ``get_edge_types()`` but not ``get_schema_text()``. + """ + lines: list[str] = ["Node labels:"] + for label, props in sorted(node_props.items()): + if props: + props_str = ", ".join(f"{p['property']}: {p['type']}" for p in props) + lines.append(f" - {label} (properties: {props_str})") + else: + lines.append(f" - {label}") + lines.append("") + lines.append("Edge types:") + for rel_type, props in sorted(rel_props.items()): + if props: + props_str = ", ".join(f"{p['property']}: {p['type']}" for p in props) + lines.append(f" - {rel_type} (properties: {props_str})") + else: + lines.append(f" - {rel_type}") + return "\n".join(lines) + + def _parse_schema(schema_text: str) -> dict[str, Any]: """Convert CoordiNode schema text into LangChain's structured format. @@ -346,7 +476,11 @@ def _parse_schema(schema_text: str) -> dict[str, Any]: continue # Parse inline properties: "- Label (properties: prop1: TYPE, prop2: TYPE)" props: list[dict[str, str]] = [] - m = re.search(r"\(properties:\s*([^)]+)\)", stripped) + # [^)\n]+ has no overlap with the surrounding literal chars, so backtracking + # is bounded at O(n). Drop the earlier \s* — its overlap with [^)\n] + # (spaces match both) created O(n²) backtracking on malformed input. + # Leading/trailing whitespace is handled by prop_str.strip() below. + m = re.search(r"\(properties:([^)\n]+)\)", stripped) if m: for prop_str in m.group(1).split(","): kv = prop_str.strip().split(":", 1) diff --git a/langchain-coordinode/pyproject.toml b/langchain-coordinode/pyproject.toml index 443a3f2..4946407 100644 --- a/langchain-coordinode/pyproject.toml +++ b/langchain-coordinode/pyproject.toml @@ -22,7 +22,7 @@ classifiers = [ "Topic :: Scientific/Engineering :: Artificial Intelligence", ] dependencies = [ - "coordinode>=0.4.0", + "coordinode>=0.6.0", "langchain-community>=0.2.0", ] diff --git a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py index 1a6b3e1..88ae99b 100644 --- a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py +++ b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py @@ -58,6 +58,11 @@ class CoordinodePropertyGraphStore(PropertyGraphStore): Args: addr: CoordiNode gRPC address, e.g. ``"localhost:7080"``. timeout: Per-request gRPC deadline in seconds. + client: Optional pre-built client object (e.g. ``LocalClient`` from + ``coordinode-embedded``) to use instead of creating a gRPC connection. + Must expose a callable ``cypher(query, params)`` method. When + provided, ``addr`` and ``timeout`` are ignored. The caller is + responsible for closing the client. """ supports_structured_queries: bool = True @@ -68,8 +73,19 @@ def __init__( addr: str = "localhost:7080", *, timeout: float = 30.0, + client: Any = None, ) -> None: - self._client = CoordinodeClient(addr, timeout=timeout) + # ``client`` allows passing a pre-built client (e.g. LocalClient from + # coordinode-embedded) instead of creating a gRPC connection. + if client is not None and not callable(getattr(client, "cypher", None)): + raise TypeError("client must provide a callable cypher() method") + self._owns_client = client is None + self._client = client if client is not None else CoordinodeClient(addr, timeout=timeout) + # Advertise vector capability based on what the actual client exposes. + # Injected clients (e.g. bare coordinode-embedded LocalClient) may not + # implement vector_search(); claiming True unconditionally would mislead + # LlamaIndex into passing vector queries that silently return no results. + self.supports_vector_queries = callable(getattr(self._client, "vector_search", None)) # ── Node operations ─────────────────────────────────────────────────── @@ -224,21 +240,17 @@ def upsert_relations(self, relations: list[Relation]) -> None: for rel in relations: props = rel.properties or {} label = _cypher_ident(rel.label) - if props: - cypher = ( - f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) " - f"MERGE (src)-[r:{label}]->(dst) SET r += $props" - ) - self._client.cypher( - cypher, - params={"src_id": rel.source_id, "dst_id": rel.target_id, "props": props}, - ) - else: - cypher = f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) MERGE (src)-[r:{label}]->(dst)" - self._client.cypher( - cypher, - params={"src_id": rel.source_id, "dst_id": rel.target_id}, - ) + # SET r += $props is intentionally unconditional (even for empty dicts). + # CoordiNode ≥ v0.3.12 supports SET r += {} as a no-op, which lets us + # keep a single code path instead of branching on emptiness. + cypher = ( + f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) " + f"MERGE (src)-[r:{label}]->(dst) SET r += $props" + ) + self._client.cypher( + cypher, + params={"src_id": rel.source_id, "dst_id": rel.target_id, "props": props}, + ) def delete( self, @@ -269,6 +281,12 @@ def vector_query( if query.query_embedding is None: return [], [] + if not self.supports_vector_queries: + # Injected clients (e.g. bare coordinode-embedded LocalClient) may + # not implement vector_search — return empty rather than AttributeError. + # supports_vector_queries is set in __init__ via callable(getattr(...)). + return [], [] + results = self._client.vector_search( label=query.filters.filters[0].value if query.filters else "Chunk", property="embedding", @@ -293,14 +311,30 @@ def vector_query( # ── Lifecycle ───────────────────────────────────────────────────────── def get_schema(self, refresh: bool = False) -> str: - """Return schema as text.""" - return self._client.get_schema_text() + """Return schema as text. + + Returns an empty string if the injected client does not expose + ``get_schema_text()`` (e.g. a bare ``coordinode-embedded`` + ``LocalClient`` that only implements ``cypher()`` / ``close()``). + """ + get_schema_text = getattr(self._client, "get_schema_text", None) + if callable(get_schema_text): + return get_schema_text() + return "" def get_schema_str(self, refresh: bool = False) -> str: return self.get_schema(refresh=refresh) def close(self) -> None: - self._client.close() + """Close the underlying client connection. + + Only closes the client if it was created internally (i.e. ``client`` was + not passed to ``__init__``). Externally-injected clients are owned by + the caller and must be closed by them. The underlying transport may be + gRPC or an embedded in-process client. + """ + if self._owns_client: + self._client.close() def __enter__(self) -> CoordinodePropertyGraphStore: return self diff --git a/llama-index-coordinode/pyproject.toml b/llama-index-coordinode/pyproject.toml index 62e2963..92069a9 100644 --- a/llama-index-coordinode/pyproject.toml +++ b/llama-index-coordinode/pyproject.toml @@ -22,7 +22,7 @@ classifiers = [ "Topic :: Scientific/Engineering :: Artificial Intelligence", ] dependencies = [ - "coordinode>=0.4.0", + "coordinode>=0.6.0", "llama-index-core>=0.10.0", ] diff --git a/proto b/proto index 1b78ebd..e1ab91d 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit 1b78ebd35f9a41c88fddcc45addb7b91395f80f4 +Subproject commit e1ab91d180c4b1f61106a09e4b56da6a9736ebdb diff --git a/ruff.toml b/ruff.toml index 2a07ca7..05039df 100644 --- a/ruff.toml +++ b/ruff.toml @@ -6,6 +6,10 @@ exclude = [ "coordinode/coordinode/_proto/", # Generated version files — do not lint "**/_version.py", + # Git submodule — managed separately + "coordinode-rs/", + # Jupyter auto-save artifacts — not committed + "**/.ipynb_checkpoints/", ] [lint] @@ -14,5 +18,10 @@ ignore = [ "E501", # line length handled by formatter ] +[lint.per-file-ignores] +# Jupyter notebooks: multi-import lines, out-of-order imports after subprocess +# install cells, and unsorted imports are normal in demo notebook code. +"**/*.ipynb" = ["E401", "E402", "I001"] + [lint.isort] known-first-party = ["coordinode", "langchain_coordinode", "llama_index_coordinode"] diff --git a/tests/integration/test_sdk.py b/tests/integration/test_sdk.py index fa14ce2..68f69eb 100644 --- a/tests/integration/test_sdk.py +++ b/tests/integration/test_sdk.py @@ -7,12 +7,23 @@ from __future__ import annotations +import functools import os import uuid +import grpc import pytest -from coordinode import AsyncCoordinodeClient, CoordinodeClient, EdgeTypeInfo, LabelInfo, TraverseResult +from coordinode import ( + AsyncCoordinodeClient, + CoordinodeClient, + EdgeTypeInfo, + HybridResult, + LabelInfo, + TextIndexInfo, + TextResult, + TraverseResult, +) ADDR = os.environ.get("COORDINODE_ADDR", "localhost:7080") @@ -426,6 +437,116 @@ async def test_async_create_node(): await c.cypher("MATCH (n:AsyncTest {tag: $tag}) DELETE n", params={"tag": tag}) +# ── create_label / create_edge_type ────────────────────────────────────────── + + +def test_create_label_returns_label_info(client): + """create_label() registers a label and returns LabelInfo.""" + name = f"CreateLabelTest{uid()}" + info = client.create_label( + name, + properties=[ + {"name": "title", "type": "string", "required": True}, + {"name": "score", "type": "float64"}, + ], + ) + assert isinstance(info, LabelInfo) + assert info.name == name + + +def test_create_label_appears_in_get_labels(client): + """Label created via create_label() appears in get_labels() once a node exists. + + Known limitation: ListLabels currently returns only labels that have at least + one node in the graph. Ideally it should also include schema-only labels + registered via create_label() (analogous to Neo4j returning schema-constrained + labels even without data). Tracked as a server-side gap. + """ + name = f"CreateLabelVisible{uid()}" + tag = uid() + # Declare both properties used in the workaround node. Note: schema_mode + # is accepted by the server but not yet enforced in the current image; + # the properties are declared here for forward compatibility. + client.create_label( + name, + properties=[ + {"name": "x", "type": "int64"}, + {"name": "tag", "type": "string"}, + ], + ) + # Workaround: create a node so the label appears in ListLabels. + client.cypher(f"CREATE (n:{name} {{x: 1, tag: $tag}})", params={"tag": tag}) + try: + labels = client.get_labels() + names = [lbl.name for lbl in labels] + assert name in names, f"{name} not in {names}" + finally: + client.cypher(f"MATCH (n:{name} {{tag: $tag}}) DELETE n", params={"tag": tag}) + + +def test_create_label_schema_mode_flexible(client): + """create_label() with schema_mode='flexible' is accepted by the server.""" + name = f"FlexLabel{uid()}" + info = client.create_label(name, schema_mode="flexible") + assert isinstance(info, LabelInfo) + assert info.name == name + # FLEXIBLE (SchemaMode=3) is enforced server-side since coordinode-rs v0.3.12. + # Older servers that omit the field return 0 (default), but this test suite + # targets v0.3.12+ and asserts the concrete value to catch regressions. + assert info.schema_mode == 3 # FLEXIBLE + + +def test_create_label_schema_mode_validated(client): + """create_label() with schema_mode='validated' is accepted and returns SchemaMode=2.""" + name = f"ValidatedLabel{uid()}" + info = client.create_label(name, schema_mode="validated") + assert isinstance(info, LabelInfo) + assert info.name == name + assert info.schema_mode == 2 # VALIDATED + + +def test_create_label_invalid_schema_mode_raises(client): + """create_label() with unknown schema_mode raises ValueError locally.""" + with pytest.raises(ValueError, match="schema_mode"): + client.create_label(f"Bad{uid()}", schema_mode="unknown") + + +def test_create_edge_type_returns_edge_type_info(client): + """create_edge_type() registers an edge type and returns EdgeTypeInfo.""" + name = f"CREATE_ET_{uid()}".upper() + info = client.create_edge_type( + name, + properties=[{"name": "since", "type": "timestamp"}], + ) + assert isinstance(info, EdgeTypeInfo) + assert info.name == name + + +def test_create_edge_type_appears_in_get_edge_types(client): + """Edge type created via create_edge_type() appears in get_edge_types() once an edge exists. + + Same known limitation as test_create_label_appears_in_get_labels: ListEdgeTypes + currently requires at least one edge of that type to exist in the graph. + """ + name = f"VISIBLE_ET_{uid()}".upper() + tag = uid() + client.create_edge_type(name) + # Workaround: create an edge so the type appears in ListEdgeTypes. + client.cypher( + f"CREATE (a:VisibleEtNode {{tag: $tag}})-[:{name}]->(b:VisibleEtNode {{tag: $tag}})", + params={"tag": tag}, + ) + try: + edge_types = client.get_edge_types() + names = [et.name for et in edge_types] + assert name in names, f"{name} not in {names}" + finally: + client.cypher( + "MATCH (n:VisibleEtNode {tag: $tag}) DETACH DELETE n", + params={"tag": tag}, + ) + + # ── Vector search ───────────────────────────────────────────────────────────── @@ -443,3 +564,175 @@ def test_vector_search_returns_results(client): assert hasattr(results[0], "node") finally: client.cypher("MATCH (n:VecSDKTest {tag: $tag}) DELETE n", params={"tag": tag}) + + +# ── Full-text search ────────────────────────────────────────────────────────── + + +# FTS tests require a CoordiNode server with TextService implemented (>=0.3.8). +# They are wrapped with @_fts so the suite stays green against older servers +# (UNIMPLEMENTED gRPC status → xfail); against >=0.3.8 servers they are real passes. +# +# These tests intentionally exercise the explicit text-index lifecycle APIs. +# Schema-free graphs may still be auto-indexed by the server, but the tests +# below create/drop an index explicitly so create_text_index()/drop_text_index() +# are covered deterministically. Tests that deliberately cover the "no-index" +# case (test_text_search_empty_for_unindexed_label) must NOT create an index. +def _fts(fn): + """Wrap an FTS test to handle servers without TextService. + + Explicit xfail conditions: + - ``grpc.StatusCode.UNIMPLEMENTED``: TextService RPC does not exist → ``pytest.xfail()``. + - Empty result set: hit-requiring tests call ``pytest.xfail()`` inline. + + All other exceptions (wrong return type, malformed score, unexpected gRPC errors) + propagate as real failures so regressions on FTS-capable servers are visible in CI. + No ``pytest.mark.xfail`` decorator is applied — that would silently swallow any + AssertionError and hide client-side regressions. + """ + + @functools.wraps(fn) + def wrapper(*args, **kwargs): + try: + return fn(*args, **kwargs) + except grpc.RpcError as exc: + if exc.code() == grpc.StatusCode.UNIMPLEMENTED: + pytest.xfail(f"TextService not implemented: {exc.details()}") + raise # other gRPC errors (e.g. INVALID_ARGUMENT) surface as failures + + return wrapper + + +@_fts +def test_text_search_returns_results(client): + """text_search() finds nodes whose text property matches the query.""" + label = f"FtsTest_{uid()}" + tag = uid() + idx_name = f"idx_{label.lower()}" + # CoordiNode executor serialises a node variable as Value::Int(node_id) — runner.rs NodeScan + # path. No id() function needed; rows[0]["node_id"] is the integer internal node id. + rows = client.cypher( + f"CREATE (n:{label} {{tag: $tag, body: 'machine learning and neural networks'}}) RETURN n AS node_id", + params={"tag": tag}, + ) + seed_id = rows[0]["node_id"] + # Text index must be created explicitly; nodes written before index creation + # are indexed immediately at DDL time. + idx_created = False + try: + idx_info = client.create_text_index(idx_name, label, "body") + idx_created = True + assert isinstance(idx_info, TextIndexInfo) + results = client.text_search(label, "machine learning", limit=5) + assert isinstance(results, list) + assert results, "text_search returned no results after index creation" + assert any(r.node_id == seed_id for r in results), ( + f"seeded node {seed_id} not found in text_search results: {results}" + ) + r = results[0] + assert isinstance(r, TextResult) + assert isinstance(r.node_id, int) + assert isinstance(r.score, float) + assert r.score > 0 + assert isinstance(r.snippet, str) + finally: + try: + if idx_created: + client.drop_text_index(idx_name) + finally: + client.cypher(f"MATCH (n:{label} {{tag: $tag}}) DELETE n", params={"tag": tag}) + + +@_fts +def test_text_search_empty_for_unindexed_label(client): + """text_search() returns [] when there are no text-indexed nodes to match. + + Covers two distinct cases: + 1. Label has never been inserted — nothing to search. + 2. Label exists but nodes carry only numeric/boolean properties; the FTS + index contains no text, so no results can match any query term. + """ + # Case 1: label that has never been inserted into the graph + results = client.text_search("NoSuchLabelForFts_" + uid(), "anything") + assert results == [] + + # Case 2: label exists but nodes have no text properties to index + label = f"FtsNumericOnly_{uid()}" + tag = uid() + client.cypher( + f"CREATE (n:{label} {{tag: $tag, count: 42, active: true}})", + params={"tag": tag}, + ) + try: + results = client.text_search(label, "anything") + assert results == [] + finally: + client.cypher(f"MATCH (n:{label} {{tag: $tag}}) DELETE n", params={"tag": tag}) + + +@_fts +def test_text_search_fuzzy(client): + """text_search() with fuzzy=True matches approximate terms.""" + label = f"FtsFuzzyTest_{uid()}" + tag = uid() + idx_name = f"idx_{label.lower()}" + client.cypher( + f"CREATE (n:{label} {{tag: $tag, body: 'coordinode graph database'}})", + params={"tag": tag}, + ) + idx_created = False + try: + client.create_text_index(idx_name, label, "body") + idx_created = True + # "coordinode" with a one-character typo — Levenshtein-1 fuzzy must match. + results = client.text_search(label, "coordinod", fuzzy=True, limit=5) + assert isinstance(results, list) + assert results, "fuzzy text_search returned no results after index creation" + finally: + try: + if idx_created: + client.drop_text_index(idx_name) + finally: + client.cypher(f"MATCH (n:{label} {{tag: $tag}}) DELETE n", params={"tag": tag}) + + +@_fts +def test_hybrid_text_vector_search_returns_results(client): + """hybrid_text_vector_search() returns HybridResult list with RRF scores.""" + label = f"FtsHybridTest_{uid()}" + tag = uid() + idx_name = f"idx_{label.lower()}" + vec = [float(i) / 16 for i in range(16)] + # Same node-as-int pattern: RETURN n → Value::Int(node_id) in CoordiNode executor. + rows = client.cypher( + f"CREATE (n:{label} {{tag: $tag, body: 'graph neural network embedding', embedding: $vec}}) RETURN n AS node_id", + params={"tag": tag, "vec": vec}, + ) + seed_id = rows[0]["node_id"] + idx_created = False + try: + client.create_text_index(idx_name, label, "body") + idx_created = True + results = client.hybrid_text_vector_search( + label, + "graph neural", + vec, + limit=5, + ) + assert isinstance(results, list) + if not results: + pytest.xfail("hybrid_text_vector_search returned no results — vector index not available on this server") + assert any(r.node_id == seed_id for r in results), ( + f"seeded node {seed_id} not found in hybrid_text_vector_search results: {results}" + ) + r = results[0] + assert isinstance(r, HybridResult) + assert isinstance(r.node_id, int) + assert isinstance(r.score, float) + assert r.score > 0 + finally: + try: + if idx_created: + client.drop_text_index(idx_name) + finally: + client.cypher(f"MATCH (n:{label} {{tag: $tag}}) DETACH DELETE n", params={"tag": tag}) diff --git a/tests/unit/test_schema_crud.py b/tests/unit/test_schema_crud.py index 3e6495c..67db15c 100644 --- a/tests/unit/test_schema_crud.py +++ b/tests/unit/test_schema_crud.py @@ -35,19 +35,21 @@ def __init__(self, name: str, type_: int, required: bool = False, unique: bool = class _FakeLabel: """Matches proto Label shape.""" - def __init__(self, name: str, version: int = 1, properties=None) -> None: + def __init__(self, name: str, version: int = 1, properties=None, schema_mode: int = 0) -> None: self.name = name self.version = version self.properties = properties or [] + self.schema_mode = schema_mode class _FakeEdgeType: """Matches proto EdgeType shape.""" - def __init__(self, name: str, version: int = 1, properties=None) -> None: + def __init__(self, name: str, version: int = 1, properties=None, schema_mode: int = 0) -> None: self.name = name self.version = version self.properties = properties or [] + self.schema_mode = schema_mode class _FakeNode: @@ -127,6 +129,36 @@ def test_version_zero(self): label = LabelInfo(_FakeLabel("Draft", version=0)) assert label.version == 0 + def test_schema_mode_defaults_to_zero(self): + label = LabelInfo(_FakeLabel("Person")) + assert label.schema_mode == 0 + + def test_schema_mode_strict(self): + label = LabelInfo(_FakeLabel("Person", schema_mode=1)) + assert label.schema_mode == 1 + + def test_schema_mode_validated(self): + label = LabelInfo(_FakeLabel("Person", schema_mode=2)) + assert label.schema_mode == 2 + + def test_schema_mode_flexible(self): + label = LabelInfo(_FakeLabel("Person", schema_mode=3)) + assert label.schema_mode == 3 + + def test_schema_mode_in_repr(self): + label = LabelInfo(_FakeLabel("Person", schema_mode=1)) + assert "schema_mode" in repr(label) + + def test_schema_mode_missing_from_proto_defaults_zero(self): + # Proto objects without schema_mode attribute (older server) → 0. + class _OldLabel: + name = "Legacy" + version = 1 + properties = [] + + label = LabelInfo(_OldLabel()) + assert label.schema_mode == 0 + # ── EdgeTypeInfo ───────────────────────────────────────────────────────────── @@ -150,6 +182,23 @@ def test_repr_contains_name(self): et = EdgeTypeInfo(_FakeEdgeType("RATED")) assert "RATED" in repr(et) + def test_schema_mode_defaults_to_zero(self): + et = EdgeTypeInfo(_FakeEdgeType("KNOWS")) + assert et.schema_mode == 0 + + def test_schema_mode_propagated(self): + et = EdgeTypeInfo(_FakeEdgeType("KNOWS", schema_mode=2)) + assert et.schema_mode == 2 + + def test_schema_mode_missing_from_proto_defaults_zero(self): + class _OldEdgeType: + name = "LEGACY" + version = 1 + properties = [] + + et = EdgeTypeInfo(_OldEdgeType()) + assert et.schema_mode == 0 + # ── TraverseResult ─────────────────────────────────────────────────────────── @@ -199,6 +248,108 @@ def test_repr_shows_counts(self): # ── traverse() input validation ────────────────────────────────────────────── +class _FakePropertyTypeAll: + """Complete fake proto PropertyType with all enum values.""" + + PROPERTY_TYPE_INT64 = 1 + PROPERTY_TYPE_FLOAT64 = 2 + PROPERTY_TYPE_STRING = 3 + PROPERTY_TYPE_BOOL = 4 + PROPERTY_TYPE_BYTES = 5 + PROPERTY_TYPE_TIMESTAMP = 6 + PROPERTY_TYPE_VECTOR = 7 + PROPERTY_TYPE_LIST = 8 + PROPERTY_TYPE_MAP = 9 + + +class _FakePropDefCls: + """Minimal PropertyDefinition constructor.""" + + def __init__(self, **kwargs): + pass # Stub: kwargs intentionally ignored — only used to verify call succeeds + + +class TestBuildPropertyDefinitions: + """Unit tests for AsyncCoordinodeClient._build_property_definitions() validation. + + Validation runs before any RPC call, so no running server is required. + """ + + def test_non_dict_property_raises(self): + """_build_property_definitions() raises ValueError for non-dict entries.""" + client = AsyncCoordinodeClient("localhost:0") + with pytest.raises(ValueError, match="must be a dict"): + client._build_property_definitions(["not-a-dict"], _FakePropertyTypeAll, _FakePropDefCls) + + def test_missing_name_raises(self): + """_build_property_definitions() raises ValueError when 'name' key is absent.""" + client = AsyncCoordinodeClient("localhost:0") + with pytest.raises(ValueError, match="non-empty 'name' key"): + client._build_property_definitions([{"type": "string"}], _FakePropertyTypeAll, _FakePropDefCls) + + def test_non_bool_required_raises(self): + """_build_property_definitions() raises ValueError when required is not a bool.""" + client = AsyncCoordinodeClient("localhost:0") + with pytest.raises(ValueError, match="boolean values for 'required' and 'unique'"): + client._build_property_definitions( + [{"name": "x", "type": "string", "required": "true"}], + _FakePropertyTypeAll, + _FakePropDefCls, + ) + + def test_non_bool_unique_raises(self): + """_build_property_definitions() raises ValueError when unique is not a bool.""" + client = AsyncCoordinodeClient("localhost:0") + with pytest.raises(ValueError, match="boolean values for 'required' and 'unique'"): + client._build_property_definitions( + [{"name": "x", "type": "string", "unique": 1}], + _FakePropertyTypeAll, + _FakePropDefCls, + ) + + def test_valid_bool_properties_accepted(self): + """_build_property_definitions() accepts proper bool required/unique values.""" + client = AsyncCoordinodeClient("localhost:0") + result = client._build_property_definitions( + [{"name": "x", "type": "string", "required": True, "unique": False}], + _FakePropertyTypeAll, + _FakePropDefCls, + ) + assert len(result) == 1 + + +class TestCreateLabelSchemaMode: + """Unit tests for schema_mode normalization in create_label().""" + + def test_invalid_schema_mode_raises(self): + """create_label() raises ValueError for unknown schema_mode string.""" + + async def _inner() -> None: + client = AsyncCoordinodeClient("localhost:0") + with pytest.raises(ValueError, match="schema_mode must be one of"): + await client.create_label("Foo", schema_mode="unknown") + + asyncio.run(_inner()) + + def test_uppercase_schema_mode_accepted(self): + """create_label() normalizes ' STRICT ' (with spaces and uppercase) to 'strict' before RPC.""" + from unittest.mock import AsyncMock + + async def _inner() -> None: + client = AsyncCoordinodeClient("localhost:0") + # Patch the schema stub so the RPC call doesn't reach a real server. + client._schema_stub = type( + "FakeStub", + (), + {"CreateLabel": AsyncMock(return_value=_FakeLabel("Foo"))}, + )() + # ' STRICT ' must normalise cleanly (strip + lower) and NOT raise ValueError. + info = await client.create_label("Foo", schema_mode=" STRICT ") + assert info.name == "Foo" + + asyncio.run(_inner()) + + class TestTraverseValidation: """Unit tests for AsyncCoordinodeClient.traverse() input validation. diff --git a/uv.lock b/uv.lock index 50ea668..e0cce2b 100644 --- a/uv.lock +++ b/uv.lock @@ -359,7 +359,7 @@ provides-extras = ["dev"] [[package]] name = "coordinode-workspace" -version = "0.4.4" +version = "0.7.0" source = { virtual = "." } dependencies = [ { name = "googleapis-common-protos" },