Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ Key motivations inspired by the broader Lance roadmap<sup>[1](https://github.com
- Unified schema for agent messages (`ContextRecord`) with optional embeddings and metadata.
- Automatic versioning via Lance manifests with `checkout(version)` support.
- Background compaction to optimize storage and read performance.
- Remote persistence: point the store at `s3://` URIs with either AWS environment variables or explicit credentials/endpoint overrides.
- Remote persistence on any `object_store` backend (S3, GCS, Azure Blob, ...)
via the generic `storage_options` dict, aligned with `lance` and `lance-graph`.
- Python API (`lance_context.api.Context`) aligned with the Rust implementation.
- Integration tests that exercise real persistence, image serialization, and version rollbacks.

Expand Down Expand Up @@ -68,16 +69,42 @@ ctx.checkout(first_version)

print("Entries after checkout:", ctx.entries())

# Store context in S3 (e.g., for MinIO/moto test endpoints)
# Remote persistence on any object_store backend uses a generic `storage_options`
# dict, matching the conventions used by `lance` and `lance-graph`.
#
# Amazon S3 (and S3-compatible endpoints like MinIO / moto):
ctx = Context.create(
"s3://my-bucket/context.lance",
aws_access_key_id="minioadmin",
aws_secret_access_key="minioadmin",
region="us-east-1",
endpoint_url="http://localhost:9000",
allow_http=True,
storage_options={
"aws_access_key_id": "minioadmin",
"aws_secret_access_key": "minioadmin",
"aws_region": "us-east-1",
"aws_endpoint_url": "http://localhost:9000", # optional
"aws_allow_http": "true", # optional
},
)
# Environment variables (AWS_ACCESS_KEY_ID, ...) are picked up by lance when
# `storage_options` isn't provided; pass overrides only when you need them.

# Google Cloud Storage:
ctx = Context.create(
"gs://my-bucket/context.lance",
storage_options={
# Pick one: inline service-account JSON, path to the JSON file, or ADC.
"google_service_account_key": service_account_json,
# "google_service_account_path": "/path/to/sa.json",
# "google_application_credentials": "/path/to/adc.json",
},
)

# Azure Blob Storage:
ctx = Context.create(
"az://my-container/context.lance",
storage_options={
"azure_storage_account_name": "...",
"azure_storage_account_key": "...",
},
)
# For every backend, ambient credentials (e.g. AWS_* environment variables,
# GCP application-default credentials) are picked up when `storage_options`
# is omitted—pass overrides only when you need custom endpoints or keys.

# Background Compaction - optimize storage and read performance
ctx = Context.create(
Expand Down Expand Up @@ -141,7 +168,8 @@ println!("Current version {}", store.version());

We are tracking future enhancements as GitHub issues:

- [Support S3-backed context stores](https://github.com/lance-format/lance-context/issues/14)
- ~~[Support S3-backed context stores](https://github.com/lance-format/lance-context/issues/14)~~ ✅ **Implemented**
- ~~[Support standard storage_options / GCS](https://github.com/lance-format/lance-context/issues/45)~~ ✅ **Implemented**
- [Add relationship column for GraphRAG workflows](https://github.com/lance-format/lance-context/issues/15)
- ~~[Background compaction for Lance fragments](https://github.com/lance-format/lance-context/issues/16)~~ ✅ **Implemented**

Expand Down
4 changes: 3 additions & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ requires = ["maturin>=1.4"]
build-backend = "maturin"

[project.optional-dependencies]
tests = ["pytest", "ruff", "moto[s3]", "boto3", "botocore"]
# `moto[server]` pulls in flask + flask-cors so moto.server can be launched
# as a subprocess for the S3 integration tests.
tests = ["pytest", "ruff", "moto[s3,server]", "boto3", "botocore"]
dev = ["ruff", "pyright"]

[tool.ruff]
Expand Down
139 changes: 122 additions & 17 deletions python/python/lance_context/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import warnings
from datetime import datetime
from io import BytesIO
from typing import Any
Expand Down Expand Up @@ -134,40 +135,145 @@ def _normalize_search_hit(raw: dict[str, Any]) -> dict[str, Any]:
return result


# Maps the deprecated AWS-specific keyword arguments of `Context` to the
# equivalent keys in the generic `storage_options` dict. The first three
# names are identical on both sides; only `region` and `endpoint_url` are
# renamed to their `aws_`-prefixed forms. `allow_http` is intentionally
# absent: it is a boolean flag rather than an optional string and is handled
# separately in `_merge_storage_options`.
_AWS_KWARG_MAP: dict[str, str] = {
    "aws_access_key_id": "aws_access_key_id",
    "aws_secret_access_key": "aws_secret_access_key",
    "aws_session_token": "aws_session_token",
    "region": "aws_region",
    "endpoint_url": "aws_endpoint_url",
}


def _merge_storage_options(
storage_options: dict[str, Any] | None,
*,
aws_access_key_id: str | None,
aws_secret_access_key: str | None,
aws_session_token: str | None,
region: str | None,
endpoint_url: str | None,
allow_http: bool,
) -> dict[str, Any]:
"""Merge deprecated AWS-specific kwargs into a generic storage_options dict.

Emits a single DeprecationWarning when any AWS kwarg is used so callers
can migrate to the generic `storage_options` path (which works for S3,
GCS, Azure, and any other lance/object_store backend).
"""
options: dict[str, Any] = dict(storage_options or {})

aws_kwargs = {
"aws_access_key_id": aws_access_key_id,
"aws_secret_access_key": aws_secret_access_key,
"aws_session_token": aws_session_token,
"region": region,
"endpoint_url": endpoint_url,
}
used = [name for name, value in aws_kwargs.items() if value is not None]
if allow_http:
used.append("allow_http")

if used:
warnings.warn(
"The AWS-specific kwargs "
f"({', '.join(sorted(used))}) are deprecated and will be removed in a "
"future release. Pass credentials via the generic "
"`storage_options` dict instead (e.g. "
"storage_options={'aws_access_key_id': ..., "
"'aws_secret_access_key': ...} for S3, or "
"storage_options={'google_service_account_key': ...} for GCS).",
DeprecationWarning,
stacklevel=3,
)

for kwarg_name, option_key in _AWS_KWARG_MAP.items():
value = aws_kwargs[kwarg_name]
if value is not None:
options.setdefault(option_key, value)
if allow_http:
options.setdefault("aws_allow_http", True)

return options


class Context:
"""Multimodal, versioned context store backed by Lance.

Storage backends are configured via the generic ``storage_options`` dict,
aligned with the conventions used by ``lance`` and ``lance-graph``. Any
keys understood by the underlying ``object_store`` crate are accepted.

Examples:
Local filesystem::

Context.create("/tmp/context.lance")

Amazon S3 (or S3-compatible endpoints like MinIO / moto)::

Context.create(
"s3://bucket/prefix/context.lance",
storage_options={
"aws_access_key_id": "...",
"aws_secret_access_key": "...",
"aws_region": "us-east-1",
"aws_endpoint_url": "http://localhost:9000", # optional
"aws_allow_http": "true", # optional
},
)

Google Cloud Storage::

Context.create(
"gs://bucket/prefix/context.lance",
storage_options={
# Any one of these is enough; pick whatever fits your
# deployment (inline JSON, file path, or ADC).
"google_service_account_key": service_account_json,
# "google_service_account_path": "/path/to/sa.json",
# "google_application_credentials": "/path/to/adc.json",
},
)

Azure Blob Storage::

Context.create(
"az://container/prefix/context.lance",
storage_options={
"azure_storage_account_name": "...",
"azure_storage_account_key": "...",
},
)
"""

def __init__(
self,
uri: str,
*,
storage_options: dict[str, Any] | None = None,
# --- Deprecated AWS-specific shortcuts (kept for backwards compat). ---
aws_access_key_id: str | None = None,
aws_secret_access_key: str | None = None,
aws_session_token: str | None = None,
region: str | None = None,
endpoint_url: str | None = None,
allow_http: bool = False,
# Compaction configuration
# --- Compaction configuration. ---
enable_background_compaction: bool = False,
compaction_interval_secs: int = 300,
compaction_min_fragments: int = 5,
compaction_target_rows: int = 1_000_000,
quiet_hours: list[tuple[int, int]] | None = None,
) -> None:
options = dict(storage_options or {})
if aws_access_key_id is not None:
options["aws_access_key_id"] = aws_access_key_id
if aws_secret_access_key is not None:
options["aws_secret_access_key"] = aws_secret_access_key
if aws_session_token is not None:
options["aws_session_token"] = aws_session_token
if region is not None:
options["aws_region"] = region
if endpoint_url is not None:
options["aws_endpoint_url"] = endpoint_url
if allow_http:
options["aws_allow_http"] = True

# Build compaction config
options = _merge_storage_options(
storage_options,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token,
region=region,
endpoint_url=endpoint_url,
allow_http=allow_http,
)

compaction_config = {
"enabled": enable_background_compaction,
"check_interval_secs": compaction_interval_secs,
Expand Down Expand Up @@ -197,7 +303,6 @@ def create(
region: str | None = None,
endpoint_url: str | None = None,
allow_http: bool = False,
# Compaction configuration
enable_background_compaction: bool = False,
compaction_interval_secs: int = 300,
compaction_min_fragments: int = 5,
Expand Down
90 changes: 90 additions & 0 deletions python/tests/test_gcs_persistence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Opt-in GCS integration tests.

Skipped by default. To run locally against a real (or emulated) GCS:

# Option A: real GCS
export LANCE_CONTEXT_GCS_BUCKET=my-test-bucket
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/sa.json
uv run pytest python/tests/test_gcs_persistence.py -v

# Option B: against fake-gcs-server or another emulator, point the
# relevant storage_options at the emulator endpoint (e.g. via
# `use_opendal=true`, `endpoint=http://...`, `allow_anonymous=true`).
export LANCE_CONTEXT_GCS_BUCKET=test-bucket
export LANCE_CONTEXT_GCS_ENDPOINT=http://127.0.0.1:4443
uv run pytest python/tests/test_gcs_persistence.py -v

These tests intentionally do not bring up their own emulator in CI because
there is no pure-Python GCS emulator that is both (a) maintained on modern
Python and (b) fully compatible with the lance-io GCS backend. The S3
suite uses moto, which has no GCS counterpart of equivalent quality.
"""

from __future__ import annotations

import os
import sys
import uuid
from pathlib import Path

import pytest

PACKAGE_ROOT = Path(__file__).resolve().parents[2] / "python" / "python"
if str(PACKAGE_ROOT) not in sys.path:
sys.path.insert(0, str(PACKAGE_ROOT))

from lance_context.api import Context # noqa: E402

# Skip the whole module up front when the `lance` package is not installed.
lance = pytest.importorskip("lance")

# Opt-in configuration, all via environment variables (see module docstring).
GCS_BUCKET = os.environ.get("LANCE_CONTEXT_GCS_BUCKET")
GCS_ENDPOINT = os.environ.get("LANCE_CONTEXT_GCS_ENDPOINT")
GCS_SA_JSON = os.environ.get("LANCE_CONTEXT_GCS_SERVICE_ACCOUNT_KEY")
GCS_ADC = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")

# A bucket name plus at least one auth/endpoint mechanism (service-account
# JSON, ADC file, or an emulator endpoint) is required to run the suite.
_has_gcs_config = bool(GCS_BUCKET) and (
    bool(GCS_SA_JSON) or bool(GCS_ADC) or bool(GCS_ENDPOINT)
)

# Skip every test in this module unless the opt-in configuration is present.
pytestmark = pytest.mark.skipif(
    not _has_gcs_config,
    reason=(
        "Set LANCE_CONTEXT_GCS_BUCKET plus one of "
        "LANCE_CONTEXT_GCS_SERVICE_ACCOUNT_KEY / "
        "GOOGLE_APPLICATION_CREDENTIALS / "
        "LANCE_CONTEXT_GCS_ENDPOINT to run GCS integration tests."
    ),
)


def _gcs_storage_options() -> dict[str, str]:
    """Build the GCS ``storage_options`` dict from the opt-in env-var config."""
    opts: dict[str, str] = {}
    # Credential knobs: include whichever ones the environment provided.
    for key, value in (
        ("google_service_account_key", GCS_SA_JSON),
        ("google_application_credentials", GCS_ADC),
    ):
        if value is not None:
            opts[key] = value
    if GCS_ENDPOINT is not None:
        # Emulator path: OpenDAL backend supports a custom endpoint and
        # anonymous auth, which is how fake-gcs-server is typically driven.
        opts["use_opendal"] = "true"
        opts["endpoint"] = GCS_ENDPOINT
        opts.setdefault("allow_anonymous", "true")
    return opts


def test_gcs_round_trip_via_storage_options() -> None:
    """End-to-end: Context.create(gs://...) with generic storage_options."""
    assert GCS_BUCKET is not None
    # Unique prefix per run so repeated invocations never collide in the bucket.
    dataset_uri = f"gs://{GCS_BUCKET}/contexts/{uuid.uuid4().hex}/context.lance"
    opts = _gcs_storage_options()

    ctx = Context.create(dataset_uri, storage_options=opts)

    # Write two entries through the Context API...
    ctx.add("user", "gcs-hello")
    ctx.add("assistant", "gcs-response")
    assert ctx.entries() == 2

    # ...then read them back directly with lance to prove the data actually
    # landed in the bucket rather than only in process memory.
    dataset = lance.dataset(dataset_uri, storage_options=opts)
    payloads = [row["text_payload"] for row in dataset.to_table().to_pylist()]
    assert payloads == ["gcs-hello", "gcs-response"]
Loading
Loading