53 changes: 53 additions & 0 deletions README.md
@@ -279,3 +279,56 @@ python -m pytest
3. Commit your changes
4. Push to the branch
5. Create a Pull Request

## Running the Background Task Worker

The Redis Memory Server uses Docket for background task management. There are two ways to run the worker:

### 1. Using the Docket CLI

After installing the package, you can run the worker using the Docket CLI command:

```bash
docket worker --tasks agent_memory_server.docket_tasks:task_collection --docket memory-server
```

You can customize the concurrency and redelivery timeout:

```bash
docket worker --tasks agent_memory_server.docket_tasks:task_collection --concurrency 5 --redelivery-timeout 60 --docket memory-server
```

**NOTE:** The name passed with `--docket` is effectively the name of a task queue where
the worker will look for work. This name should match the docket name your API server
is using, configured with the `docket_name` setting via environment variable
or directly in `agent_memory_server.config.Settings`.
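As a minimal sketch of keeping the two sides in agreement — assuming the setting is read from the environment in the usual pydantic style, and that the variable name `DOCKET_NAME` maps to `docket_name` (both assumptions, not confirmed here):

```python
import os

# Hypothetical: resolve the shared docket (queue) name from the environment,
# falling back to the default used in the CLI examples above.
docket_name = os.environ.get("DOCKET_NAME", "memory-server")

# Both the API server and the worker must resolve the same name, or tasks
# enqueued by one will never be picked up by the other.
print(docket_name)
```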

## Memory Compaction

The memory compaction functionality optimizes storage by merging duplicate and semantically similar memories. This improves retrieval quality and reduces storage costs.

### Key Features

- **Hash-based Deduplication**: Identifies and merges exact duplicate memories using content hashing
- **Semantic Deduplication**: Finds and merges memories with similar meaning using vector search
- **LLM-powered Merging**: Uses language models to intelligently combine memories
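To make the hash-based step concrete, here is a minimal sketch of content-hash deduplication. The real `generate_memory_hash` may hash different fields with a different signature, so treat the field names below as illustrative:

```python
import hashlib


def generate_memory_hash(text: str, user_id: str = "", session_id: str = "") -> str:
    # Hash the stable fields so identical memories collapse to one key.
    content = "|".join([text, user_id, session_id])
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def dedupe_by_hash(memories: list[dict]) -> list[dict]:
    # Keep the first memory seen for each content hash, drop exact duplicates.
    seen: set[str] = set()
    unique: list[dict] = []
    for mem in memories:
        h = generate_memory_hash(
            mem["text"], mem.get("user_id", ""), mem.get("session_id", "")
        )
        if h not in seen:
            seen.add(h)
            unique.append(mem)
    return unique
```

Semantic deduplication then catches near-duplicates this exact-match pass misses, using vector search over embeddings.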

### Testing Approach

Testing the memory compaction functionality involves:

1. **Unit Tests**: Testing individual helper functions like `generate_memory_hash` and `merge_memories_with_llm`
2. **Integration Tests**: Testing the complete workflow with minimal mocking
3. **Mocked Tests**: Using helper functions to test specific parts of the workflow

The main integration test (`test_compact_memories_integration`) demonstrates the memory merging functionality without relying on Redis search, which makes it more robust and less prone to environment-specific failures.
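A mocked test of the LLM merge step can stub the model call entirely. The helper name `merge_memories_with_llm` comes from the list above, but the signature and client interface shown here are assumptions:

```python
import asyncio
from unittest.mock import AsyncMock


async def merge_memories_with_llm(memories: list[str], llm) -> str:
    # Hypothetical helper: ask the model to combine overlapping memories.
    prompt = "Merge these memories into one:\n" + "\n".join(memories)
    return await llm.complete(prompt)


def test_merge_with_mocked_llm():
    llm = AsyncMock()
    llm.complete.return_value = "User likes coffee and tea."
    merged = asyncio.run(
        merge_memories_with_llm(["likes coffee", "likes tea"], llm)
    )
    assert merged == "User likes coffee and tea."
    llm.complete.assert_awaited_once()


test_merge_with_mocked_llm()
```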

### Running Tests

```bash
# Run all tests
python -m pytest tests/test_memory_compaction.py

# Run specific integration test
python -m pytest tests/test_memory_compaction.py::TestMemoryCompaction::test_compact_memories_integration -v
```
29 changes: 15 additions & 14 deletions agent_memory_server/api.py
@@ -1,9 +1,10 @@
from typing import Literal

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException

from agent_memory_server import long_term_memory, messages
from agent_memory_server.config import settings
from agent_memory_server.dependencies import get_background_tasks
from agent_memory_server.llms import get_model_config
from agent_memory_server.logging import get_logger
from agent_memory_server.models import (
@@ -16,7 +17,7 @@
SessionMemory,
SessionMemoryResponse,
)
from agent_memory_server.utils import get_redis_conn
from agent_memory_server.utils.redis import get_redis_conn


logger = get_logger(__name__)
@@ -63,7 +64,7 @@ async def list_sessions(
Returns:
List of session IDs
"""
redis = get_redis_conn()
redis = await get_redis_conn()

total, session_ids = await messages.list_sessions(
redis=redis,
@@ -101,7 +102,7 @@ async def get_session_memory(
Returns:
Conversation history and context
"""
redis = get_redis_conn()
redis = await get_redis_conn()

# If context_window_max is explicitly provided, use that
if context_window_max is not None:
@@ -130,19 +131,20 @@ async def get_session_memory(
async def put_session_memory(
session_id: str,
memory: SessionMemory,
background_tasks: BackgroundTasks,
background_tasks=Depends(get_background_tasks),
):
"""
Set session memory. Replaces existing session memory.

Args:
session_id: The session ID
memory: Messages and context to save
background_tasks: DocketBackgroundTasks instance (injected automatically)

Returns:
Acknowledgement response
"""
redis = get_redis_conn()
redis = await get_redis_conn()

await messages.set_session_memory(
redis=redis,
@@ -168,7 +170,7 @@ async def delete_session_memory(
Returns:
Acknowledgement response
"""
redis = get_redis_conn()
redis = await get_redis_conn()
await messages.delete_session_memory(
redis=redis,
session_id=session_id,
@@ -179,26 +181,25 @@

@router.post("/long-term-memory", response_model=AckResponse)
async def create_long_term_memory(
payload: CreateLongTermMemoryPayload, background_tasks: BackgroundTasks
payload: CreateLongTermMemoryPayload,
background_tasks=Depends(get_background_tasks),
):
"""
Create a long-term memory

Args:
payload: Long-term memory payload
background_tasks: DocketBackgroundTasks instance (injected automatically)

Returns:
Acknowledgement response
"""
redis = get_redis_conn()

if not settings.long_term_memory:
raise HTTPException(status_code=400, detail="Long-term memory is disabled")

await long_term_memory.index_long_term_memories(
redis=redis,
await background_tasks.add_task(
long_term_memory.index_long_term_memories,
memories=payload.memories,
background_tasks=background_tasks,
)
return AckResponse(status="ok")

@@ -214,7 +215,7 @@ async def search_long_term_memory(payload: SearchPayload):
Returns:
List of search results
"""
redis = get_redis_conn()
redis = await get_redis_conn()

if not settings.long_term_memory:
raise HTTPException(status_code=400, detail="Long-term memory is disabled")