# Mem0 API Quick Check
This notebook uses the `mem0ai` Python client against the local Mem0-compatible server to verify basic operations.

## Mem0 Service

In [None]:
import os
import time
from contextlib import contextmanager

from dotenv import find_dotenv, load_dotenv
from mem0 import MemoryClient

load_dotenv(find_dotenv(), override=True)


# Simple timing utility
@contextmanager
def timer(description="Operation"):
    start = time.time()
    print(f"Starting: {description}")
    try:
        yield
    finally:
        elapsed = time.time() - start
        print(f"Completed: {description} in {elapsed:.2f} seconds")


API_KEY = os.getenv("MEM0_API_KEY")
HOST = os.getenv("MEM0_API_HOST") or os.getenv("MEM0_API_BASE")

if HOST:
    client = MemoryClient(api_key=API_KEY, host=HOST)
else:
    client = MemoryClient(api_key=API_KEY)

client

<mem0.client.main.MemoryClient at 0x78805751ab70>

In [30]:
# Shared test data
test_messages = [
    {"role": "user", "content": "My name is Alice"},
    {"role": "assistant", "content": "Nice to meet you"},
]
test_user_id = "alice"
test_agent_id = "agent-123"
test_metadata = {"source": "notebook"}
test_query = "favorite color"

In [31]:
add_result = client.add(
    messages=test_messages,
    user_id=test_user_id,
    agent_id=test_agent_id,
    metadata=test_metadata,
)
add_result

{'results': [{'id': '64fdb880-7444-45e2-870a-a5a315f9b088',
   'event': 'ADD',
   'memory': 'User Name is Alice',
   'structured_attributes': {'day': 29,
    'hour': 12,
    'year': 2025,
    'month': 9,
    'minute': 10,
    'quarter': 3,
    'is_weekend': False,
    'day_of_week': 'monday',
    'day_of_year': 272,
    'week_of_year': 40}}]}

In [32]:
memory_id = add_result["results"][0]["id"]
client.get(memory_id)

{'id': '64fdb880-7444-45e2-870a-a5a315f9b088',
 'memory': 'User Name is Alice',
 'user_id': 'alice',
 'metadata': {'source': 'notebook'},
 'categories': None,
 'created_at': '2025-09-29T05:10:56.658609-07:00',
 'updated_at': '2025-09-29T05:10:56.720694-07:00',
 'expiration_date': None,
 'structured_attributes': {'day': 29,
  'hour': 12,
  'year': 2025,
  'month': 9,
  'minute': 10,
  'quarter': 3,
  'is_weekend': False,
  'day_of_week': 'monday',
  'day_of_year': 272,
  'week_of_year': 40}}

In [33]:
client.search(
    query=test_query,
    user_id=test_user_id,
    limit=5,
)

[]

In [34]:
client.delete(memory_id)

{'message': 'Memory deleted successfully!'}

### Testing Local Server

In [35]:
local_client = MemoryClient(api_key=API_KEY, host="http://localhost:8000")
local_client

<mem0.client.main.MemoryClient at 0x7880573da510>

In [36]:
local_add_result = local_client.add(
    messages=test_messages,
    user_id=test_user_id,
    agent_id=test_agent_id,
    metadata=test_metadata,
)
local_add_result

{'results': [{'id': 'd6a806e6-58e0-4dd9-a909-61f99c09d2bf',
   'memory': 'user: My name is Alice',
   'event': 'ADD'},
  {'id': 'b94b6e70-25ad-4fae-9eae-5bc090a8796f',
   'memory': 'assistant: Nice to meet you',
   'event': 'ADD'}]}

In [37]:
local_memory_id = local_add_result["results"][0]["id"]
local_client.get(local_memory_id)

{'id': 'd6a806e6-58e0-4dd9-a909-61f99c09d2bf',
 'memory': 'user: My name is Alice',
 'user_id': 'alice',
 'agent_id': 'agent-123',
 'run_id': None,
 'metadata': {'source': 'notebook'},
 'created_at': '2025-09-29T12:10:58.029971Z',
 'updated_at': None}

In [38]:
local_client.search(
    query=test_query,
    user_id=test_user_id,
    limit=5,
)

{'results': [{'id': 'b94b6e70-25ad-4fae-9eae-5bc090a8796f',
   'memory': 'assistant: Nice to meet you',
   'user_id': 'alice',
   'agent_id': 'agent-123',
   'run_id': None,
   'metadata': {'source': 'notebook'},
   'created_at': '2025-09-29T12:10:58.029971Z',
   'updated_at': None,
   'score': 0.34146341463414637},
  {'id': 'd6a806e6-58e0-4dd9-a909-61f99c09d2bf',
   'memory': 'user: My name is Alice',
   'user_id': 'alice',
   'agent_id': 'agent-123',
   'run_id': None,
   'metadata': {'source': 'notebook'},
   'created_at': '2025-09-29T12:10:58.029971Z',
   'updated_at': None,
   'score': 0.2222222222222222}]}

In [39]:
local_client.delete(local_memory_id)

{'message': 'Memory deleted successfully'}

## Async Client and Batch Processing

In [None]:
import asyncio

from mem0 import AsyncMemoryClient


# Async client setup
async def create_async_client():
    if HOST:
        return AsyncMemoryClient(api_key=API_KEY, host=HOST)
    else:
        return AsyncMemoryClient(api_key=API_KEY)


async_client = await create_async_client()
print("Async client created:", async_client)

Async client created: <mem0.client.main.AsyncMemoryClient object at 0x788057752ae0>


In [None]:
# Batch processing example - adding multiple memories at once
batch_users = ["user_1", "user_2", "user_3", "user_4", "user_5"]
batch_memories = []

for i, user_id in enumerate(batch_users):
    messages = [
        {
            "role": "user",
            "content": f"I am {user_id} and I like color {['red', 'blue', 'green', 'yellow', 'purple'][i]}",
        },
        {
            "role": "assistant",
            "content": f"Got it! I'll remember {user_id} likes {['red', 'blue', 'green', 'yellow', 'purple'][i]}",
        },
    ]
    batch_memories.append(
        {
            "messages": messages,
            "user_id": user_id,
            "metadata": {
                "batch": True,
                "color_preference": ["red", "blue", "green", "yellow", "purple"][i],
            },
        }
    )

print(f"Prepared {len(batch_memories)} memories for batch processing")

Prepared 5 memories for batch processing


In [49]:
# Synchronous batch processing with timing
with timer("Synchronous batch memory addition"):
    batch_results = []
    for memory_data in batch_memories:
        result = client.add(**memory_data)
        batch_results.append(result)

print(f"Added {len(batch_results)} memories synchronously")

Starting: Synchronous batch memory addition
Completed: Synchronous batch memory addition in 35.70 seconds
Added 5 memories synchronously


In [None]:
# Async batch processing
async def async_batch_add(memories):
    tasks = []
    for memory_data in memories:
        task = async_client.add(**memory_data)
        tasks.append(task)

    # Execute all tasks concurrently
    results = await asyncio.gather(*tasks)
    return results


# Run async batch processing with timing
async def run_async_batch():
    with timer("Asynchronous batch memory addition"):
        results = await async_batch_add(batch_memories)
        return results


# Execute async batch
async_batch_results = await run_async_batch()
print(f"Added {len(async_batch_results)} memories asynchronously")

Starting: Asynchronous batch memory addition
Completed: Asynchronous batch memory addition in 9.95 seconds
Added 5 memories asynchronously


In [None]:
# Batch search operations
async def async_batch_search(queries_and_users):
    tasks = []
    for query, user_id in queries_and_users:
        task = async_client.search(query=query, user_id=user_id, limit=3)
        tasks.append(task)

    results = await asyncio.gather(*tasks)
    return results


# Prepare batch search queries
search_queries = [
    ("color preferences", "user_1"),
    ("color preferences", "user_2"),
    ("color preferences", "user_3"),
    ("color preferences", "user_4"),
    ("color preferences", "user_5"),
]

# Compare sync vs async search performance
with timer("Synchronous batch search"):
    sync_search_results = []
    for query, user_id in search_queries:
        result = client.search(query=query, user_id=user_id, limit=3)
        sync_search_results.append(result)


async def run_async_search():
    with timer("Asynchronous batch search"):
        results = await async_batch_search(search_queries)
        return results


async_search_results = await run_async_search()

print(f"Sync search found {sum(len(r) for r in sync_search_results)} total memories")
print(f"Async search found {sum(len(r) for r in async_search_results)} total memories")

Starting: Synchronous batch search
Completed: Synchronous batch search in 2.24 seconds
Starting: Asynchronous batch search
Completed: Asynchronous batch search in 1.93 seconds
Sync search found 10 total memories
Async search found 10 total memories


In [None]:
# Cleanup: Delete batch memories
async def cleanup_batch_memories():
    # Get all memory IDs from batch results
    memory_ids = []
    for result in async_batch_results:
        if "results" in result:
            memory_ids.extend([mem["id"] for mem in result["results"]])

    print(f"Cleaning up {len(memory_ids)} memories...")

    # Delete all memories concurrently
    delete_tasks = [async_client.delete(mem_id) for mem_id in memory_ids]
    await asyncio.gather(*delete_tasks)
    print("Batch cleanup completed")


await cleanup_batch_memories()

Cleaning up 4 memories...
Batch cleanup completed
