Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
- Fixed an edge case where the LLM can output a property with type 'map', which was causing errors during import as it is not a valid property type in Neo4j.


### Added

- Document node is now always created when running SimpleKGPipeline, even if `from_pdf=False`.
- Document metadata is exposed in the SimpleKGPipeline `run` method.


## 1.9.1

### Fixed
Expand Down
9 changes: 9 additions & 0 deletions docs/source/user_guide_kg_builder.rst
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,15 @@ chunk overlap in the text splitter component:
)


Run Parameters
--------------

SimpleKGPipeline also accepts additional runtime parameters:

- ``document_metadata`` (dict): each item will be saved as a property attached to the ``Document`` node.



Using a Config file
===================

Expand Down
7 changes: 6 additions & 1 deletion examples/build_graph/simple_kg_builder_from_pdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ async def define_and_run_pipeline(
},
neo4j_database=DATABASE,
)
return await kg_builder.run_async(file_path=str(file_path))
return await kg_builder.run_async(
file_path=str(file_path),
# optional, add document metadata, each item will
# be saved as a property of the Document node
# document_metadata={"author": "J. K. Rowling"},
)


async def main() -> PipelineResult:
Expand Down
10 changes: 9 additions & 1 deletion examples/build_graph/simple_kg_builder_from_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,15 @@ async def define_and_run_pipeline(
from_pdf=False,
neo4j_database=DATABASE,
)
return await kg_builder.run_async(text=TEXT)
return await kg_builder.run_async(
text=TEXT,
# optional, specify file path for the Document node
        # if not provided, the default name "document.txt" will be used
# file_path="my_document.txt"
# optional, add document metadata, each item will
# be saved as a property of the Document node
# document_metadata={"author": "Frank Herbert"},
)


async def main() -> PipelineResult:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def create_document_node(self, document_info: DocumentInfo) -> Neo4jNode:
properties={
"path": document_info.path,
"createdAt": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"document_type": document_info.document_type,
**document_metadata,
},
)
Expand Down
1 change: 1 addition & 0 deletions src/neo4j_graphrag/experimental/components/pdf_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,5 +89,6 @@ async def run(
document_info=DocumentInfo(
path=filepath,
metadata=self.get_document_metadata(text, metadata),
document_type="pdf",
),
)
1 change: 1 addition & 0 deletions src/neo4j_graphrag/experimental/components/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class DocumentInfo(DataModel):
path: str
metadata: Optional[Dict[str, str]] = None
uid: str = Field(default_factory=lambda: str(uuid.uuid4()))
document_type: Optional[str] = None

@property
def document_id(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
from __future__ import annotations

from collections import defaultdict
from typing import (
Any,
ClassVar,
Expand Down Expand Up @@ -336,37 +337,40 @@ def _get_connections(self) -> list[ConnectionDefinition]:
return connections

def get_run_params(self, user_input: dict[str, Any]) -> dict[str, Any]:
run_params = {}
if self.lexical_graph_config:
run_params["extractor"] = {
"lexical_graph_config": self.lexical_graph_config,
}
run_params["writer"] = {
"lexical_graph_config": self.lexical_graph_config,
}
run_params["pruner"] = {
"lexical_graph_config": self.lexical_graph_config,
}
text = user_input.get("text")
file_path = user_input.get("file_path")
if not ((text is None) ^ (file_path is None)):
# exactly one of text or user_input must be set
if text is None and file_path is None:
# user must provide either text or file_path or both
raise PipelineDefinitionError(
"Use either 'text' (when from_pdf=False) or 'file_path' (when from_pdf=True) argument."
"At least one of `text` (when from_pdf=False) or `file_path` (when from_pdf=True) argument must be provided."
)
run_params: dict[str, dict[str, Any]] = defaultdict(dict)
if self.lexical_graph_config:
run_params["extractor"]["lexical_graph_config"] = self.lexical_graph_config
run_params["writer"]["lexical_graph_config"] = self.lexical_graph_config
run_params["pruner"]["lexical_graph_config"] = self.lexical_graph_config
if self.from_pdf:
if not file_path:
raise PipelineDefinitionError(
"Expected 'file_path' argument when 'from_pdf' is True."
)
run_params["pdf_loader"] = {"filepath": file_path}
run_params["pdf_loader"]["filepath"] = file_path
run_params["pdf_loader"]["metadata"] = user_input.get("document_metadata")
else:
if not text:
raise PipelineDefinitionError(
"Expected 'text' argument when 'from_pdf' is False."
)
run_params["splitter"] = {"text": text}
run_params["splitter"]["text"] = text
# Add full text to schema component for automatic schema extraction
if not self.has_user_provided_schema():
run_params["schema"] = {"text": text}
run_params["schema"]["text"] = text
run_params["extractor"]["document_info"] = dict(
path=user_input.get(
"file_path",
)
or "document.txt",
metadata=user_input.get("document_metadata"),
document_type="inline_text",
)
return run_params
16 changes: 13 additions & 3 deletions src/neo4j_graphrag/experimental/pipeline/kg_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,26 @@ def __init__(
self.runner = PipelineRunner.from_config(config)

async def run_async(
self, file_path: Optional[str] = None, text: Optional[str] = None
self,
file_path: Optional[str] = None,
text: Optional[str] = None,
document_metadata: Optional[dict[str, Any]] = None,
) -> PipelineResult:
"""
Asynchronously runs the knowledge graph building process.

Args:
file_path (Optional[str]): The path to the PDF file to process. Required if `from_pdf` is True.
file_path (Optional[str]): The path to the PDF file to process. Required if `from_pdf` is True. If `from_pdf` is False, can be used to set the Document node path property.
text (Optional[str]): The text content to process. Required if `from_pdf` is False.
document_metadata (Optional[dict[str, Any]]): The metadata to attach to the document.

Returns:
PipelineResult: The result of the pipeline execution.
"""
return await self.runner.run({"file_path": file_path, "text": text})
return await self.runner.run(
{
"file_path": file_path,
"text": text,
"document_metadata": document_metadata,
}
)
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ async def test_lexical_graph_builder_run_with_document() -> None:
TextChunk(text="text chunk 1", index=1),
]
),
document_info=DocumentInfo(path="test_lexical_graph", uid=doc_uid),
document_info=DocumentInfo(
path="test_lexical_graph",
uid=doc_uid,
document_type="my_type",
),
)
assert isinstance(result, GraphResult)
graph = result.graph
Expand All @@ -89,6 +93,7 @@ async def test_lexical_graph_builder_run_with_document() -> None:
assert document.label == DEFAULT_DOCUMENT_NODE_LABEL
assert document.properties["path"] == "test_lexical_graph"
assert document.properties["createdAt"] is not None
assert document.properties["document_type"] == "my_type"
chunk1 = nodes[1]
assert chunk1.label == DEFAULT_CHUNK_NODE_LABEL
chunk2 = nodes[2]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from unittest.mock import Mock, patch

import neo4j
Expand Down Expand Up @@ -286,16 +287,16 @@ def test_simple_kg_pipeline_config_connections_with_er() -> None:
def test_simple_kg_pipeline_config_run_params_from_pdf_file_path() -> None:
config = SimpleKGPipelineConfig(from_pdf=True)
assert config.get_run_params({"file_path": "my_file"}) == {
"pdf_loader": {"filepath": "my_file"}
"pdf_loader": {"filepath": "my_file", "metadata": None}
}


def test_simple_kg_pipeline_config_run_params_from_text_text() -> None:
config = SimpleKGPipelineConfig(from_pdf=False)
assert config.get_run_params({"text": "my text"}) == {
"splitter": {"text": "my text"},
"schema": {"text": "my text"},
}
run_params = config.get_run_params({"text": "my text"})
assert run_params["splitter"] == {"text": "my text"}
assert run_params["schema"] == {"text": "my text"}
assert run_params["extractor"]["document_info"]["path"] == "document.txt"


def test_simple_kg_pipeline_config_run_params_from_pdf_text() -> None:
Expand All @@ -314,22 +315,13 @@ def test_simple_kg_pipeline_config_run_params_from_text_file_path() -> None:

def test_simple_kg_pipeline_config_run_params_no_file_no_text() -> None:
config = SimpleKGPipelineConfig(from_pdf=False)
with pytest.raises(PipelineDefinitionError) as excinfo:
with pytest.raises(
PipelineDefinitionError,
match=re.escape(
"At least one of `text` (when from_pdf=False) or `file_path` (when from_pdf=True) argument must be provided."
),
):
config.get_run_params({})
assert (
"Use either 'text' (when from_pdf=False) or 'file_path' (when from_pdf=True) argument."
in str(excinfo)
)


def test_simple_kg_pipeline_config_run_params_both_file_and_text() -> None:
config = SimpleKGPipelineConfig(from_pdf=False)
with pytest.raises(PipelineDefinitionError) as excinfo:
config.get_run_params({"text": "my text", "file_path": "my file"})
assert (
"Use either 'text' (when from_pdf=False) or 'file_path' (when from_pdf=True) argument."
in str(excinfo)
)


def test_simple_kg_pipeline_config_process_schema_with_precedence_legacy() -> None:
Expand Down
29 changes: 22 additions & 7 deletions tests/unit/experimental/pipeline/test_kg_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import neo4j
import pytest
from neo4j_graphrag.embeddings import Embedder
from neo4j_graphrag.experimental.components.types import LexicalGraphConfig
from neo4j_graphrag.experimental.components.types import (
LexicalGraphConfig,
)
from neo4j_graphrag.experimental.pipeline.exceptions import PipelineDefinitionError
from neo4j_graphrag.experimental.pipeline.kg_builder import SimpleKGPipeline
from neo4j_graphrag.experimental.pipeline.pipeline import PipelineResult
Expand Down Expand Up @@ -49,11 +51,16 @@ async def test_knowledge_graph_builder_document_info_with_file(_: Mock) -> None:
"run",
return_value=PipelineResult(run_id="test_run", result=None),
) as mock_run:
await kg_builder.run_async(file_path=file_path)
await kg_builder.run_async(
file_path=file_path, document_metadata={"source": "google drive"}
)

pipe_inputs = mock_run.call_args[1]["data"]
assert "pdf_loader" in pipe_inputs
assert pipe_inputs["pdf_loader"] == {"filepath": file_path}
assert pipe_inputs["pdf_loader"] == {
"filepath": file_path,
"metadata": {"source": "google drive"},
}
assert "extractor" not in pipe_inputs


Expand Down Expand Up @@ -81,11 +88,19 @@ async def test_knowledge_graph_builder_document_info_with_text(_: Mock) -> None:
"run",
return_value=PipelineResult(run_id="test_run", result=None),
) as mock_run:
await kg_builder.run_async(text=text_input)
await kg_builder.run_async(
text=text_input,
file_path="my_document.txt",
document_metadata={"source": "google drive"},
)

pipe_inputs = mock_run.call_args[1]["data"]
assert "splitter" in pipe_inputs
assert pipe_inputs["splitter"] == {"text": text_input}
assert pipe_inputs["extractor"]["document_info"]["path"] == "my_document.txt"
assert pipe_inputs["extractor"]["document_info"]["metadata"] == {
"source": "google drive"
}


@mock.patch(
Expand Down Expand Up @@ -175,6 +190,6 @@ async def test_knowledge_graph_builder_with_lexical_graph_config(_: Mock) -> Non

pipe_inputs = mock_run.call_args[1]["data"]
assert "extractor" in pipe_inputs
assert pipe_inputs["extractor"] == {
"lexical_graph_config": lexical_graph_config
}
assert pipe_inputs["extractor"]["lexical_graph_config"] == lexical_graph_config
assert pipe_inputs["extractor"]["document_info"] is not None
assert pipe_inputs["extractor"]["document_info"]["path"] == "document.txt"