2 changes: 2 additions & 0 deletions .gitignore
@@ -235,3 +235,5 @@ libs/redis/docs/.Trash*
.claude
TASK_MEMORY.md
*.code-workspace

augment*.md
169 changes: 169 additions & 0 deletions agent_memory_server/cli.py
@@ -94,6 +94,175 @@ async def run_migrations():
click.echo("Memory migrations completed successfully.")


@cli.command()
@click.option(
"--batch-size",
default=1000,
help="Number of keys to process in each batch",
)
@click.option(
"--dry-run",
is_flag=True,
help="Only count keys without migrating",
)
def migrate_working_memory(batch_size: int, dry_run: bool):
"""
Migrate working memory keys from string format to JSON format.

This command migrates all working memory keys stored in the old string
format (JSON serialized as a string) to the new native Redis JSON format.

Use --dry-run to see how many keys need migration without making changes.
"""
import asyncio
import time

from agent_memory_server.utils.keys import Keys
from agent_memory_server.working_memory import (
set_migration_complete,
)

configure_logging()

async def run_migration():
import json as json_module

redis = await get_redis_conn()

# Scan for string keys only using _type filter (much faster)
string_keys = []
cursor = 0
pattern = Keys.working_memory_key("*")

click.echo("Scanning for working memory keys (string type only)...")
scan_start = time.time()

while True:
# Use _type="string" to only get string keys directly
cursor, keys = await redis.scan(
cursor, match=pattern, count=batch_size, _type="string"
)

if keys:
string_keys.extend(keys)

if cursor == 0:
break

scan_time = time.time() - scan_start

click.echo(f"Scan completed in {scan_time:.2f}s")
click.echo(f" String format (need migration): {len(string_keys)}")

if not string_keys:
click.echo("\nNo keys need migration. All done!")
# Mark migration as complete
await set_migration_complete(redis)
return

if dry_run:
click.echo("\n--dry-run specified, no changes made.")
return

# Migrate keys in batches using pipeline
click.echo(f"\nMigrating {len(string_keys)} keys...")
migrate_start = time.time()
migrated = 0
errors = 0

# Process in batches
for batch_start in range(0, len(string_keys), batch_size):
batch_keys = string_keys[batch_start : batch_start + batch_size]

# Read all string data and TTLs in a pipeline
read_pipe = redis.pipeline()
for key in batch_keys:
read_pipe.get(key)
read_pipe.ttl(key)
results = await read_pipe.execute()

# Parse results (alternating: data, ttl, data, ttl, ...)
migrations = [] # List of (key, data, ttl) tuples
for i, key in enumerate(batch_keys):
string_data = results[i * 2]
ttl = results[i * 2 + 1]

if string_data is None:
continue

try:
if isinstance(string_data, bytes):
string_data = string_data.decode("utf-8")
data = json_module.loads(string_data)
migrations.append((key, data, ttl))
except Exception as e:
errors += 1
logger.error(f"Failed to parse key {key}: {e}")

# Execute migrations in a pipeline (delete + json.set + expire if needed)
if migrations:
write_pipe = redis.pipeline()
for key, data, ttl in migrations:
write_pipe.delete(key)
write_pipe.json().set(key, "$", data)
if ttl > 0:
write_pipe.expire(key, ttl)

try:
await write_pipe.execute()
migrated += len(migrations)
except Exception as e:
# If batch fails, try one by one
logger.warning(
f"Batch migration failed, retrying individually: {e}"
)
for key, data, ttl in migrations:
try:
await redis.delete(key)
await redis.json().set(key, "$", data)
if ttl > 0:
await redis.expire(key, ttl)
migrated += 1
except Exception as e2:
errors += 1
logger.error(f"Failed to migrate key {key}: {e2}")

# Progress update
total_processed = batch_start + len(batch_keys)
if total_processed % 10000 == 0 or total_processed == len(string_keys):
elapsed = time.time() - migrate_start
rate = migrated / elapsed if elapsed > 0 else 0
remaining = len(string_keys) - total_processed
eta = remaining / rate if rate > 0 else 0
click.echo(
f" Migrated {migrated}/{len(string_keys)} "
f"({rate:.0f} keys/sec, ETA: {eta:.0f}s)"
)

migrate_time = time.time() - migrate_start
rate = migrated / migrate_time if migrate_time > 0 else 0

click.echo(f"\nMigration completed in {migrate_time:.2f}s")
click.echo(f" Migrated: {migrated}")
click.echo(f" Errors: {errors}")
click.echo(f" Rate: {rate:.0f} keys/sec")

if errors == 0:
# Mark migration as complete
await set_migration_complete(redis)
click.echo("\nMigration status set to complete.")
click.echo(
"\n💡 Tip: Set WORKING_MEMORY_MIGRATION_COMPLETE=true to skip "
"startup checks permanently."
)
else:
click.echo(
"\nMigration completed with errors. " "Run again to retry failed keys."
)

asyncio.run(run_migration())


@cli.command()
@click.option("--port", default=settings.port, help="Port to run the server on")
@click.option("--host", default="0.0.0.0", help="Host to run the server on")
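For anyone verifying the migration afterwards, a quick spot check can confirm that keys now report the RedisJSON type rather than `string`. This is an illustrative sketch, not part of the PR: it assumes redis-py 5.x with the RedisJSON module loaded, a local Redis URL, and uses `working_memory:*` as a stand-in for whatever pattern `Keys.working_memory_key("*")` actually produces. Running `agent-memory migrate-working-memory --dry-run` first gives the same count without touching any keys.

```python
import asyncio

from redis.asyncio import Redis


async def spot_check(pattern: str = "working_memory:*") -> None:
    redis = Redis.from_url("redis://localhost:6379")
    cursor = 0
    while True:
        cursor, keys = await redis.scan(cursor, match=pattern, count=100)
        for key in keys:
            # Migrated keys report "ReJSON-RL" as their type; "string" means
            # the key was missed or written by an old server version
            key_type = (await redis.type(key)).decode()
            if key_type == "string":
                print(f"NOT migrated: {key!r}")
            else:
                print(f"OK ({key_type}): {key!r}")
        if cursor == 0:
            break
    await redis.aclose()  # aclose() requires redis-py >= 5.0


asyncio.run(spot_check())
```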
7 changes: 7 additions & 0 deletions agent_memory_server/config.py
@@ -259,6 +259,13 @@ class Settings(BaseSettings):
        0.7  # Fraction of context window that triggers summarization
    )

    # Working memory migration settings
    # Set to True to skip backward compatibility checks for old string-format keys.
    # Use this after running 'agent-memory migrate-working-memory' or for fresh installs.
    # When True, the server assumes all working memory keys are in JSON format,
    # skipping the startup scan and per-read type checks for better performance.
    working_memory_migration_complete: bool = False

    # Query optimization settings
    query_optimization_prompt_template: str = """Transform this natural language query into an optimized version for semantic search. The goal is to make it more effective for finding semantically similar content while preserving the original intent.

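Since `Settings` is a pydantic `BaseSettings` subclass, the new flag can be driven from the environment without code changes. A minimal standalone sketch of the same pattern (assuming pydantic-settings v2; with pydantic v1 the import would be `from pydantic import BaseSettings`, and the project's real class carries many more fields):

```python
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # False by default: the server still scans for legacy string-format keys
    # at startup and type-checks keys on each read
    working_memory_migration_complete: bool = False


# pydantic-settings matches env vars case-insensitively, so exporting
# WORKING_MEMORY_MIGRATION_COMPLETE=true flips the flag at startup
settings = Settings()
print(settings.working_memory_migration_complete)
```

This is what the CLI's closing tip refers to: exporting `WORKING_MEMORY_MIGRATION_COMPLETE=true` makes the server skip the startup scan on every boot.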
10 changes: 7 additions & 3 deletions agent_memory_server/main.py
@@ -74,9 +74,13 @@ async def lifespan(app: FastAPI):
"Long-term memory requires OpenAI for embeddings, but OpenAI API key is not set"
)

# Set up Redis connection if long-term memory is enabled
if settings.long_term_memory:
await get_redis_conn()
# Set up Redis connection and check working memory migration status
redis_conn = await get_redis_conn()

# Check if any working memory keys need migration from string to JSON format
from agent_memory_server.working_memory import check_and_set_migration_status

await check_and_set_migration_status(redis_conn)

# Initialize Docket for background tasks if enabled
if settings.use_docket:
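The body of `check_and_set_migration_status` is not part of this diff, so the following is only a plausible shape inferred from the CLI command and the settings comment above: short-circuit when the flag is set, otherwise scan for legacy string keys and either warn or record completion. A hypothetical sketch, not the project's actual implementation; the `working_memory:*` pattern is an assumption standing in for `Keys.working_memory_key("*")`.

```python
import logging

from agent_memory_server.config import settings
from agent_memory_server.working_memory import set_migration_complete

logger = logging.getLogger(__name__)


async def check_and_set_migration_status(redis) -> None:
    # Hypothetical sketch of the helper called from lifespan(); the real
    # implementation lives in agent_memory_server.working_memory.
    if settings.working_memory_migration_complete:
        return  # operator has asserted migration is done; skip the scan

    cursor = 0
    while True:
        cursor, keys = await redis.scan(
            cursor, match="working_memory:*", count=1000, _type="string"
        )
        if keys:
            logger.warning(
                "Found legacy string-format working memory keys; "
                "run 'agent-memory migrate-working-memory' to convert them"
            )
            return
        if cursor == 0:
            break

    # A full scan found no legacy string keys, so record completion
    await set_migration_complete(redis)
```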