Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20260409021555501931.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "implement parquet reader"
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def create_input_reader(config: InputConfig, storage: Storage) -> InputReader:
from graphrag_input.markitdown import MarkItDownFileReader

register_input_reader(InputType.MarkItDown, MarkItDownFileReader)
case InputType.Parquet:
from graphrag_input.parquet import ParquetFileReader

register_input_reader(InputType.Parquet, ParquetFileReader)
case _:
msg = f"InputConfig.type '{input_strategy}' is not registered in the InputReaderFactory. Registered types: {', '.join(input_reader_factory.keys())}."
raise ValueError(msg)
Expand Down
2 changes: 2 additions & 0 deletions packages/graphrag-input/graphrag_input/input_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class InputType(StrEnum):
"""The JSON Lines input type."""
MarkItDown = "markitdown"
"""The MarkItDown input type."""
Parquet = "parquet"
"""The Parquet input type."""

def __repr__(self):
"""Get a string representation."""
Expand Down
39 changes: 39 additions & 0 deletions packages/graphrag-input/graphrag_input/parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""A module containing 'ParquetFileReader' model."""

import io
import logging

import pyarrow.parquet as pq

from graphrag_input.structured_file_reader import StructuredFileReader
from graphrag_input.text_document import TextDocument

logger = logging.getLogger(__name__)


class ParquetFileReader(StructuredFileReader):
    """Structured-file reader that loads documents from parquet files."""

    def __init__(self, file_pattern: str | None = None, **kwargs):
        """Create the reader, defaulting the pattern to ``*.parquet`` files.

        Args:
            - file_pattern - Regex used to select files; when None, a pattern
              matching names ending in ".parquet" is used.
            - kwargs - Forwarded unchanged to the StructuredFileReader constructor.
        """
        # Only substitute the default when the caller passed nothing at all.
        if file_pattern is None:
            file_pattern = ".*\\.parquet$"
        super().__init__(file_pattern=file_pattern, **kwargs)

    async def read_file(self, path: str) -> list[TextDocument]:
        """Load one parquet file and turn its rows into documents.

        Args:
            - path - The path to read the file from.

        Returns
        -------
            - output - list with a TextDocument for each row in the file.
        """
        # Fetch the raw bytes from storage, then parse them from memory.
        raw = await self._storage.get(path, as_bytes=True)
        buffer = io.BytesIO(raw)
        # Each table row becomes one plain-Python record for the shared
        # column-processing logic of the base class.
        records = pq.read_table(buffer).to_pylist()
        return await self.process_data_columns(records, path)
3 changes: 2 additions & 1 deletion packages/graphrag-input/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ dependencies = [
"graphrag-storage==3.0.8 ",
"pydantic~=2.10",
"markitdown~=0.1.0",
"markitdown[pdf]"
"markitdown[pdf]",
"pyarrow>=14.0.0"
]

[project.urls]
Expand Down
Binary file not shown.
61 changes: 61 additions & 0 deletions tests/unit/indexing/input/test_parquet_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

from graphrag_input import InputConfig, InputType, create_input_reader
from graphrag_storage import StorageConfig, create_storage


async def test_parquet_loader_one_file():
    """Read the one-parquet fixture with an explicit file pattern."""
    reader_config = InputConfig(type=InputType.Parquet, file_pattern=".*\\.parquet$")
    storage_config = StorageConfig(
        base_dir="tests/unit/indexing/input/data/one-parquet",
    )
    parquet_reader = create_input_reader(reader_config, create_storage(storage_config))

    docs = await parquet_reader.read_files()

    # The fixture is expected to yield two documents, titled by file name and
    # row index because no title column is configured.
    assert len(docs) == 2
    expected_first_row = {
        "title": "Hello",
        "text": "Hi how are you today?",
    }
    assert docs[0].title == "input.parquet (0)"
    assert docs[0].raw_data == expected_first_row
    assert docs[1].title == "input.parquet (1)"


async def test_parquet_loader_one_file_with_title():
    """Read the one-parquet fixture taking document titles from the 'title' column."""
    reader_config = InputConfig(type=InputType.Parquet, title_column="title")
    storage_config = StorageConfig(
        base_dir="tests/unit/indexing/input/data/one-parquet",
    )
    parquet_reader = create_input_reader(reader_config, create_storage(storage_config))

    docs = await parquet_reader.read_files()

    assert len(docs) == 2
    # With a title column configured, titles come from the row data.
    assert [doc.title for doc in docs] == ["Hello", "World"]


async def test_parquet_loader_text_content():
    """Read the one-parquet fixture and check the text taken from the 'text' column."""
    reader_config = InputConfig(
        type=InputType.Parquet,
        text_column="text",
        title_column="title",
    )
    storage_config = StorageConfig(
        base_dir="tests/unit/indexing/input/data/one-parquet",
    )
    parquet_reader = create_input_reader(reader_config, create_storage(storage_config))

    docs = await parquet_reader.read_files()

    assert len(docs) == 2
    # Document text should be the value of the configured text column per row.
    assert [doc.text for doc in docs] == [
        "Hi how are you today?",
        "This is a test.",
    ]
assert documents[1].text == "This is a test."
3,710 changes: 1,856 additions & 1,854 deletions uv.lock

Large diffs are not rendered by default.

Loading