Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core[minor]: Adds an in-memory implementation of RecordManager #13200

Merged
merged 41 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
249d8a1
Adds an in-memory implementation of RecordStore
pprados Nov 10, 2023
0d5f09f
Fix spell
pprados Nov 10, 2023
3d9e062
Update to last langchain version
pprados Nov 27, 2023
f667410
Fix a race condition in SQLAlchemyMd5Cache
pprados Jan 23, 2024
ec53014
Fix a race condition in SQLAlchemyMd5Cache
pprados Jan 23, 2024
cb37574
Add MemoryRecordManager
pprados Jan 23, 2024
6fe4072
Fix TU
pprados Jan 24, 2024
37099c8
Add __init__.py
pprados Jan 24, 2024
8c036b5
Merge remote-tracking branch 'upstream/master' into pprados/memory_re…
pprados May 6, 2024
2dd7366
Adds an in-memory implementation of RecordStore
pprados Nov 10, 2023
c83cb73
Fix spell
pprados Nov 10, 2023
4dff80b
Update to last langchain version
pprados Nov 27, 2023
8f4b258
Fix a race condition in SQLAlchemyMd5Cache
pprados Jan 23, 2024
a0bcc8b
Add MemoryRecordManager
pprados Jan 23, 2024
2cc8a3f
Fix TU
pprados Jan 24, 2024
2ee0721
Add __init__.py
pprados Jan 24, 2024
6670807
Merge remote-tracking branch 'upstream/master' into pprados/memory_re…
pprados Jun 10, 2024
a3a8a75
Merge remote-tracking branch 'upstream/master' into pprados/memory_re…
pprados Jun 10, 2024
04b55d0
Add the TU, like test_sql_record_manager
pprados Jun 10, 2024
e19d627
Merge remote-tracking branch 'origin/pprados/memory_recordmanager' in…
pprados Jun 10, 2024
17b370f
Fix lint and __init__
pprados Jun 10, 2024
074129d
Merge branch 'master' into pprados/memory_recordmanager
pprados Jun 10, 2024
3d79bbf
Merge branch 'master' into pprados/memory_recordmanager
pprados Jun 11, 2024
1850348
Merge branch 'master' into pprados/memory_recordmanager
pprados Jun 12, 2024
1e4bde6
Merge branch 'master' into pprados/memory_recordmanager
pprados Jun 12, 2024
5402b31
Merge branch 'master' into pprados/memory_recordmanager
pprados Jun 12, 2024
8205777
migrate to langchain-core
pprados Jun 13, 2024
4cf163b
Merge branch 'master' into pprados/memory_recordmanager
pprados Jun 13, 2024
0a41079
Fix
pprados Jun 13, 2024
67adcbb
Fix test_public_api.py
pprados Jun 13, 2024
de36e11
Fix long_context_reorder.py async
pprados Jun 13, 2024
90bb3d7
Fix
pprados Jun 13, 2024
c12c666
Merge branch 'master' into pprados/memory_recordmanager
pprados Jun 13, 2024
d97dcc1
Merge branch 'master' into pprados/memory_recordmanager
pprados Jun 13, 2024
bd24d2a
Merge remote-tracking branch 'upstream/master' into pprados/memory_re…
pprados Jun 14, 2024
1bf3821
Merge branch 'master' into pprados/memory_recordmanager
pprados Jun 14, 2024
1ce6aed
Merge branch 'master' into pprados/memory_recordmanager
pprados Jun 14, 2024
c62bb8c
qxqx
eyurtsev Jun 14, 2024
ca79b22
x
eyurtsev Jun 14, 2024
bfc83ea
Merge branch 'master' into pprados/memory_recordmanager
pprados Jun 18, 2024
f841067
Merge branch 'master' into pprados/memory_recordmanager
pprados Jun 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions libs/community/langchain_community/indexes/memory_recordmanager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, Sequence

from langchain_community.indexes.base import RecordManager


class MemoryRecordManager(RecordManager):
    """Record manager that keeps every record in a plain Python list.

    Each record is a dict with the keys ``"key"``, ``"namespace"``,
    ``"updated_at"`` and ``"group_id"``. Nothing is persisted: the records
    live only as long as the instance does.
    """

    # One dict per record; the list itself is created per instance in __init__.
    data: List[Dict[str, Any]]

    def __init__(self, namespace: str = ""):
        """Create an empty manager.

        Args:
            namespace: Forwarded to the base class; records written by this
                manager are tagged with ``self.namespace`` and reads/deletes
                are restricted to it.
        """
        super().__init__(namespace=namespace)
        # Build the list here rather than as a class-level default so that
        # instances can never end up sharing one mutable list.
        self.data = []

    def create_schema(self) -> None:
        """Do nothing: the in-memory list needs no schema."""

    async def acreate_schema(self) -> None:
        """Do nothing: the in-memory list needs no schema."""

    def get_time(self) -> float:
        """Return the current time as a POSIX timestamp in seconds."""
        return datetime.now().timestamp()

    def update(
        self,
        keys: Sequence[str],
        *,
        group_ids: Optional[Sequence[Optional[str]]] = None,
        time_at_least: Optional[float] = None,
    ) -> None:
        """Insert or refresh the records for ``keys`` in this namespace.

        Args:
            keys: Keys to upsert. A key repeated within one call is stored
                once, with the group id of its last occurrence.
            group_ids: Optional group id per key, aligned with ``keys``.
                Defaults to ``None`` for every key.
            time_at_least: If given, the manager's clock must not be earlier
                than this timestamp.

        Raises:
            ValueError: If ``keys`` and ``group_ids`` differ in length.
            AssertionError: If the current time is earlier than
                ``time_at_least``.
        """
        if group_ids is None:
            group_ids = [None] * len(keys)
        if len(keys) != len(group_ids):
            raise ValueError(
                f"Number of keys ({len(keys)}) does not match number of "
                f"group_ids ({len(group_ids)})"
            )

        update_time = self.get_time()
        # Compare against None explicitly so that a threshold of 0.0 is still
        # treated as a real threshold.
        if time_at_least is not None and update_time < time_at_least:
            # Safeguard against time sync issues
            raise AssertionError(f"Time sync issue: {update_time} < {time_at_least}")

        # A dict collapses keys repeated within this call (last group id wins)
        # so the store never holds two records for the same key.
        group_by_key = dict(zip(keys, group_ids))

        # Upsert = drop any previous record for these keys, then append.
        self.delete_keys(list(group_by_key))
        self.data.extend(
            {
                "key": key,
                "namespace": self.namespace,
                "updated_at": update_time,
                "group_id": group_id,
            }
            for key, group_id in group_by_key.items()
        )

    def exists(self, keys: Sequence[str]) -> List[bool]:
        """Report, for each key, whether a record exists in this namespace.

        Args:
            keys: Keys to look up.

        Returns:
            One boolean per entry of ``keys``, in the same order.
        """
        # Build the lookup set once instead of rescanning the list per key.
        present = {
            record["key"]
            for record in self.data
            if record["namespace"] == self.namespace
        }
        return [key in present for key in keys]

    def list_keys(
        self,
        *,
        before: Optional[float] = None,
        after: Optional[float] = None,
        group_ids: Optional[Sequence[str]] = None,
        limit: Optional[int] = None,
    ) -> List[str]:
        """List the keys of this namespace that match the given filters.

        Args:
            before: Keep only records updated strictly before this timestamp.
            after: Keep only records updated strictly after this timestamp.
            group_ids: Keep only records whose group id is in this collection.
                ``None`` or an empty collection disables the filter.
            limit: Maximum number of keys to return; ``None`` means no limit.

        Returns:
            Matching keys in storage order.
        """
        wanted_groups = set(group_ids) if group_ids else None
        matching = [
            record["key"]
            for record in self.data
            if record["namespace"] == self.namespace
            and (wanted_groups is None or record["group_id"] in wanted_groups)
            and (before is None or record["updated_at"] < before)
            and (after is None or record["updated_at"] > after)
        ]
        return matching[:limit]

    def delete_keys(self, keys: Sequence[str]) -> None:
        """Remove the records for ``keys`` from this namespace.

        Keys that are not stored are ignored; records that belong to another
        namespace are kept.

        Args:
            keys: Keys to delete.
        """
        doomed = set(keys)
        self.data = [
            record
            for record in self.data
            if record["namespace"] != self.namespace or record["key"] not in doomed
        ]

    # %% Async versions
    # Everything lives in memory, so the async API simply delegates to the
    # synchronous implementation.
    async def aget_time(self) -> float:
        """Async counterpart of :meth:`get_time`."""
        return self.get_time()

    async def aupdate(
        self,
        keys: Sequence[str],
        *,
        group_ids: Optional[Sequence[Optional[str]]] = None,
        time_at_least: Optional[float] = None,
    ) -> None:
        """Async counterpart of :meth:`update`."""
        return self.update(keys=keys, group_ids=group_ids, time_at_least=time_at_least)

    async def aexists(self, keys: Sequence[str]) -> List[bool]:
        """Async counterpart of :meth:`exists`."""
        return self.exists(keys=keys)

    async def alist_keys(
        self,
        *,
        before: Optional[float] = None,
        after: Optional[float] = None,
        group_ids: Optional[Sequence[str]] = None,
        limit: Optional[int] = None,
    ) -> List[str]:
        """Async counterpart of :meth:`list_keys`."""
        return self.list_keys(
            before=before, after=after, group_ids=group_ids, limit=limit
        )

    async def adelete_keys(self, keys: Sequence[str]) -> None:
        """Async counterpart of :meth:`delete_keys`."""
        return self.delete_keys(keys=keys)
pprados marked this conversation as resolved.
Show resolved Hide resolved
132 changes: 132 additions & 0 deletions libs/community/tests/unit_tests/indexes/test_memory_record_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import pytest
import pytest_asyncio

from langchain_community.indexes.memory_recordmanager import MemoryRecordManager


@pytest.fixture()
def manager() -> MemoryRecordManager:
    """Provide a new ``MemoryRecordManager`` using the "kittens" namespace."""
    return MemoryRecordManager(namespace="kittens")


@pytest_asyncio.fixture  # type: ignore
async def amanager() -> MemoryRecordManager:
    """Provide a new ``MemoryRecordManager`` ("kittens" namespace) to async tests."""
    return MemoryRecordManager(namespace="kittens")


def test_update(manager: MemoryRecordManager) -> None:
    """Keys passed to ``update`` are returned by ``list_keys`` in order."""
    # A new manager must not report any keys.
    assert manager.list_keys() == []

    inserted = ["key1", "key2", "key3"]
    manager.update(inserted)

    # Every inserted key is listed, in insertion order.
    assert manager.list_keys() == ["key1", "key2", "key3"]


async def test_aupdate(amanager: MemoryRecordManager) -> None:
    """Keys passed to ``aupdate`` are returned by ``alist_keys`` in order."""
    # A new manager must not report any keys.
    assert await amanager.alist_keys() == []

    inserted = ["key1", "key2", "key3"]
    await amanager.aupdate(inserted)

    # Every inserted key is listed, in insertion order.
    assert await amanager.alist_keys() == ["key1", "key2", "key3"]


def test_update_with_group_ids(manager: MemoryRecordManager) -> None:
    """Records written with group ids can be listed and filtered by group."""
    # no keys should be present in the set
    read_keys = manager.list_keys()
    assert read_keys == []
    # Insert records, this time actually attaching a group id to each key
    keys = ["key1", "key2", "key3"]
    group_ids = ["group1", "group1", "group2"]
    manager.update(keys, group_ids=group_ids)
    # Retrieve the records
    read_keys = manager.list_keys()
    assert read_keys == ["key1", "key2", "key3"]
    # Filtering by group id only returns the keys of that group
    assert manager.list_keys(group_ids=["group1"]) == ["key1", "key2"]
    assert manager.list_keys(group_ids=["group2"]) == ["key3"]
    assert manager.list_keys(group_ids=["missing"]) == []


async def test_aupdate_with_group_ids(amanager: MemoryRecordManager) -> None:
    """Records written with group ids can be listed and filtered by group."""
    # no keys should be present in the set
    read_keys = await amanager.alist_keys()
    assert read_keys == []
    # Insert records, this time actually attaching a group id to each key
    keys = ["key1", "key2", "key3"]
    group_ids = ["group1", "group1", "group2"]
    await amanager.aupdate(keys, group_ids=group_ids)
    # Retrieve the records
    read_keys = await amanager.alist_keys()
    assert read_keys == ["key1", "key2", "key3"]
    # Filtering by group id only returns the keys of that group
    assert await amanager.alist_keys(group_ids=["group1"]) == ["key1", "key2"]
    assert await amanager.alist_keys(group_ids=["group2"]) == ["key3"]
    assert await amanager.alist_keys(group_ids=["missing"]) == []


def test_exists(manager: MemoryRecordManager) -> None:
    """``exists`` returns one flag per queried key, in query order."""
    stored = ["key1", "key2", "key3"]
    manager.update(stored)

    # All stored keys are reported as present.
    found = manager.exists(stored)
    assert len(found) == len(stored)
    assert found == [True, True, True]

    # A key that was never stored is reported as absent.
    found = manager.exists(["key1", "key4"])
    assert len(found) == 2
    assert found == [True, False]


async def test_aexists(amanager: MemoryRecordManager) -> None:
    """``aexists`` returns one flag per queried key, in query order."""
    stored = ["key1", "key2", "key3"]
    await amanager.aupdate(stored)

    # All stored keys are reported as present.
    found = await amanager.aexists(stored)
    assert len(found) == len(stored)
    assert found == [True, True, True]

    # A key that was never stored is reported as absent.
    found = await amanager.aexists(["key1", "key4"])
    assert len(found) == 2
    assert found == [True, False]


def test_delete_keys(manager: MemoryRecordManager) -> None:
    """``delete_keys`` removes exactly the requested keys."""
    manager.update(["key1", "key2", "key3"])

    # Remove two of the three stored keys.
    manager.delete_keys(["key1", "key2"])

    # Only the key that was not deleted is still listed.
    assert manager.list_keys() == ["key3"]


async def test_adelete_keys(amanager: MemoryRecordManager) -> None:
    """``adelete_keys`` removes exactly the requested keys."""
    await amanager.aupdate(["key1", "key2", "key3"])

    # Remove two of the three stored keys.
    await amanager.adelete_keys(["key1", "key2"])

    # Only the key that was not deleted is still listed.
    assert await amanager.alist_keys() == ["key3"]
Loading