Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions DEVELOPING.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,27 @@ store = ElasticsearchStore(
store = ElasticsearchStore(client=existing_client, index="custom-index")
```

### MongoDB Store
Document-based storage with MongoDB:

```python
from kv_store_adapter.stores.mongodb import MongoStore

# Connection options
store = MongoStore(host="localhost", port=27017, database="kvstore")
store = MongoStore(connection_string="mongodb://localhost:27017/kvstore")
store = MongoStore(client=existing_motor_client, database="custom_db")

# With authentication
store = MongoStore(
host="localhost",
port=27017,
username="user",
password="pass",
database="secure_db"
)
```

### Simple Stores
Dictionary-based stores for testing and development:

Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ A pluggable, async-first key-value store interface for Python applications with
## Features

- **Async-first**: Built from the ground up with `async`/`await` support
- **Multiple backends**: Redis, Elasticsearch, In-memory, Disk, and more
- **Multiple backends**: Redis, Elasticsearch, MongoDB, In-memory, Disk, and more
- **TTL support**: Automatic expiration handling across all store types
- **Type-safe**: Full type hints with Protocol-based interfaces
- **Adapters**: Pydantic, Single Collection, and more
Expand All @@ -21,11 +21,12 @@ pip install kv-store-adapter
# With specific backend support
pip install kv-store-adapter[redis]
pip install kv-store-adapter[elasticsearch]
pip install kv-store-adapter[mongodb]
pip install kv-store-adapter[memory]
pip install kv-store-adapter[disk]

# With all backends
pip install kv-store-adapter[memory,disk,redis,elasticsearch]
pip install kv-store-adapter[memory,disk,redis,elasticsearch,mongodb]
```

# The KV Store Protocol
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ memory = ["cachetools>=6.0.0"]
disk = ["diskcache>=5.6.0"]
redis = ["redis>=6.0.0"]
elasticsearch = ["elasticsearch>=9.0.0", "aiohttp>=3.12"]
mongodb = ["motor>=3.0.0"]
pydantic = ["pydantic>=2.11.9"]

[tool.pytest.ini_options]
Expand All @@ -40,7 +41,7 @@ env_files = [".env"]

[dependency-groups]
dev = [
"kv-store-adapter[memory,disk,redis,elasticsearch]",
"kv-store-adapter[memory,disk,redis,elasticsearch,mongodb]",
"kv-store-adapter[pydantic]",
"pytest",
"pytest-mock",
Expand Down
3 changes: 3 additions & 0 deletions src/kv_store_adapter/stores/mongodb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .store import MongoStore

__all__ = ["MongoStore"]
184 changes: 184 additions & 0 deletions src/kv_store_adapter/stores/mongodb/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
from typing import overload
from urllib.parse import urlparse

from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection, AsyncIOMotorDatabase
from typing_extensions import override

from kv_store_adapter.errors import StoreConnectionError
from kv_store_adapter.stores.base.managed import BaseManagedKVStore
from kv_store_adapter.stores.utils.managed_entry import ManagedEntry


class MongoStore(BaseManagedKVStore):
"""MongoDB-based key-value store."""

_client: AsyncIOMotorClient
_database: AsyncIOMotorDatabase
_collection_name: str

@overload
def __init__(self, *, client: AsyncIOMotorClient, database: str = "kvstore", collection: str = "entries") -> None: ...

@overload
def __init__(self, *, connection_string: str, database: str = "kvstore", collection: str = "entries") -> None: ...

@overload
def __init__(
self,
*,
host: str = "localhost",
port: int = 27017,
username: str | None = None,
password: str | None = None,
database: str = "kvstore",
collection: str = "entries",
) -> None: ...

def __init__(
self,
*,
client: AsyncIOMotorClient | None = None,
connection_string: str | None = None,
host: str = "localhost",
port: int = 27017,
username: str | None = None,
password: str | None = None,
database: str = "kvstore",
collection: str = "entries",
) -> None:
"""Initialize the MongoDB store.

Args:
client: An existing AsyncIOMotorClient to use.
connection_string: MongoDB connection string (e.g., mongodb://localhost:27017/kvstore).
host: MongoDB host. Defaults to localhost.
port: MongoDB port. Defaults to 27017.
username: MongoDB username. Defaults to None.
password: MongoDB password. Defaults to None.
database: Database name to use. Defaults to kvstore.
collection: Collection name to use. Defaults to entries.
"""
if client:
self._client = client
elif connection_string:
self._client = AsyncIOMotorClient(connection_string)
# Extract database name from connection string if not in path
parsed = urlparse(connection_string)
if parsed.path and parsed.path != "/" and database != "kvstore":
pass # Keep provided database name
elif parsed.path and parsed.path != "/":
database = parsed.path.lstrip("/")
else:
# Build connection string from individual parameters
auth_str = f"{username}:{password}@" if username and password else ""
connection_str = f"mongodb://{auth_str}{host}:{port}"
self._client = AsyncIOMotorClient(connection_str)

self._database = self._client[database]
self._collection_name = collection
super().__init__()

@property
def _collection(self) -> AsyncIOMotorCollection:
"""Get the collection for storing entries."""
return self._database[self._collection_name]

@override
async def setup(self) -> None:
"""Initialize the MongoDB store by testing connectivity and creating indexes."""
try:
# Test connection
await self._client.admin.command("ping")

# Create compound index on collection+key for efficient lookups
await self._collection.create_index([("collection", 1), ("key", 1)], unique=True)

# Create TTL index for automatic expiration
await self._collection.create_index("expires_at", expireAfterSeconds=0)
except Exception as e:
raise StoreConnectionError(message=f"Failed to connect to MongoDB: {e}") from e

@override
async def setup_collection(self, collection: str) -> None:
"""Setup collection-specific resources (no-op for MongoDB)."""
# MongoDB collections are created automatically when first document is inserted

@override
async def get_entry(self, collection: str, key: str) -> ManagedEntry | None:
doc = await self._collection.find_one({"collection": collection, "key": key})

if doc is None:
return None

# Convert MongoDB document to ManagedEntry
return ManagedEntry(
collection=doc["collection"],
key=doc["key"],
value=doc["value"],
created_at=doc.get("created_at"),
ttl=doc.get("ttl"),
expires_at=doc.get("expires_at"),
)

@override
async def put_entry(
self,
collection: str,
key: str,
cache_entry: ManagedEntry,
*,
ttl: float | None = None,
) -> None:
doc = {
"collection": collection,
"key": key,
"value": cache_entry.value,
"created_at": cache_entry.created_at,
"ttl": cache_entry.ttl,
"expires_at": cache_entry.expires_at,
}

# Use upsert to replace existing entries
await self._collection.replace_one(
{"collection": collection, "key": key},
doc,
upsert=True,
)

@override
async def delete(self, collection: str, key: str) -> bool:
await self.setup_collection_once(collection=collection)

result = await self._collection.delete_one({"collection": collection, "key": key})
return result.deleted_count > 0

@override
async def keys(self, collection: str) -> list[str]:
await self.setup_collection_once(collection=collection)

cursor = self._collection.find({"collection": collection}, {"key": 1})
return [doc["key"] async for doc in cursor]

@override
async def clear_collection(self, collection: str) -> int:
await self.setup_collection_once(collection=collection)

result = await self._collection.delete_many({"collection": collection})
return result.deleted_count

@override
async def list_collections(self) -> list[str]:
await self.setup_once()

pipeline = [
{"$group": {"_id": "$collection"}},
{"$project": {"collection": "$_id", "_id": 0}},
]

cursor = self._collection.aggregate(pipeline)
return [doc["collection"] async for doc in cursor]

@override
async def cull(self) -> None:
"""MongoDB handles TTL automatically, so this is a no-op."""
# MongoDB's TTL indexes handle expiration automatically
Empty file.
89 changes: 89 additions & 0 deletions tests/stores/mongodb/test_mongodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import asyncio
from collections.abc import AsyncGenerator

import pytest
from motor.motor_asyncio import AsyncIOMotorClient
from typing_extensions import override

from kv_store_adapter.stores.base.unmanaged import BaseKVStore
from kv_store_adapter.stores.mongodb import MongoStore
from tests.stores.conftest import BaseStoreTests

# MongoDB test configuration
MONGO_HOST = "localhost"
MONGO_PORT = 27017
MONGO_DB = "kvstore_test"

WAIT_FOR_MONGO_TIMEOUT = 30


async def ping_mongo() -> bool:
client = AsyncIOMotorClient(f"mongodb://{MONGO_HOST}:{MONGO_PORT}")
try:
await client.admin.command("ping")
except Exception:
return False
else:
return True
finally:
client.close()


async def wait_mongo() -> bool:
# with a timeout of 30 seconds
for _ in range(WAIT_FOR_MONGO_TIMEOUT):
if await ping_mongo():
return True
await asyncio.sleep(delay=1)

return False


class MongoFailedToStartError(Exception):
pass


class TestMongoStore(BaseStoreTests):
@pytest.fixture(autouse=True, scope="session")
async def setup_mongo(self) -> AsyncGenerator[None, None]:
# Try to connect to existing MongoDB or skip tests if not available
if not await ping_mongo():
pytest.skip("MongoDB not available at localhost:27017")

return

@override
@pytest.fixture
async def store(self, setup_mongo: None) -> MongoStore: # pyright: ignore[reportUnusedParameter]
"""Create a MongoDB store for testing."""
# Create the store with test database
mongo_store = MongoStore(host=MONGO_HOST, port=MONGO_PORT, database=MONGO_DB)

# Clear the test database
await mongo_store._database.drop_collection(mongo_store._collection_name)

return mongo_store

async def test_mongo_connection_string(self) -> None:
"""Test MongoDB store creation with connection string."""
connection_string = f"mongodb://{MONGO_HOST}:{MONGO_PORT}/{MONGO_DB}"
store = MongoStore(connection_string=connection_string)

await store._database.drop_collection(store._collection_name)
await store.put(collection="test", key="conn_test", value={"test": "value"})
result = await store.get(collection="test", key="conn_test")
assert result == {"test": "value"}

async def test_mongo_client_connection(self) -> None:
"""Test MongoDB store creation with existing client."""
client = AsyncIOMotorClient(f"mongodb://{MONGO_HOST}:{MONGO_PORT}")
store = MongoStore(client=client, database=MONGO_DB)

await store._database.drop_collection(store._collection_name)
await store.put(collection="test", key="client_test", value={"test": "value"})
result = await store.get(collection="test", key="client_test")
assert result == {"test": "value"}

@pytest.mark.skip(reason="Distributed Caches are unbounded")
@override
async def test_not_unbounded(self, store: BaseKVStore): ...
Loading