Test async callback

In [1]:
import os
os.environ["REDIS_URL"] = "redis://localhost:6379/0"
from shared_utils.clients.redis_client import redis_client

In [2]:
import uuid
from pydantic import BaseModel

class TestMessage(BaseModel):
    """Minimal event payload for the callback tests: an identifier plus a string value."""
    id: str
    value: str

# A fresh UUID suffix gives every run of this cell its own stream name.
stream_name = f"test-stream-{uuid.uuid4()}"
group_id = "test-group"
event = TestMessage(id="2", value="test-2")

# Echo the generated values, one per line.
print(stream_name, group_id, event, sep="\n")

test-stream-316e93d2-078a-472b-a760-ad220396ccff
test-group
id='2' value='test-2'


In [None]:
# create a new consumer thread for the stream
# NOTE(review): max_concurrent_tasks=1 presumably limits the consumer to one in-flight callback — confirm
redis_client.create_consumer(stream_name, group_id, max_concurrent_tasks=1)

In [None]:
# register the callback to the consumer thread

from shared_utils import deserialize_event
import asyncio
@deserialize_event(TestMessage)
async def test_callback(event: TestMessage):
    """Async callback: print a start line, await a one-second sleep, then print a finish line.

    NOTE(review): deserialize_event(TestMessage) presumably converts the raw
    stream payload into a TestMessage before this body runs — confirm.
    """
    print(f"Processing message {event.id} with value: {event.value}")
    # Non-blocking pause standing in for real work.
    await asyncio.sleep(1)
    print(f"Finished processing message {event.id}")

# Attach test_callback to the given stream / consumer group.
redis_client.register_callback(stream=stream_name, group_id=group_id, callback=test_callback)

In [None]:
# send a message to the stream

# `msg` is only an alias for the `event` object built in the setup cell.
msg = event
redis_client.send(stream=stream_name, value=msg)

-----------

Test synchronous callback

In [None]:
import time
import logging 
from shared_utils import deserialize_event

# NOTE(review): logging.basicConfig does nothing when the root logger already has
# handlers (e.g. installed by an imported package). The log lines recorded further
# down in this notebook use a different format than the one set here, so another
# configuration may be winning — confirm, and pass force=True if this format must apply.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

@deserialize_event(TestMessage)
def test_sync_callback(event: TestMessage):
    """Synchronous callback: log a start line, sleep for one second, then log a finish line.

    NOTE(review): deserialize_event(TestMessage) presumably converts the raw
    stream payload into a TestMessage before this body runs — confirm.
    """
    # use logging since it's thread-safe
    logging.info(f"Processing message {event.id} with value: {event.value}")
    time.sleep(1) # this should block the thread created for this callback
    logging.info(f"Finished processing message {event.id}")

# NOTE(review): this uses the same stream_name / group_id as the async callback
# registered earlier; whether register_callback replaces the previous callback or
# adds a second one is not visible from this notebook — confirm.
redis_client.register_callback(stream=stream_name, group_id=group_id, callback=test_sync_callback)

In [None]:
# send a message to the stream

# `msg` is only an alias for the current `event` object.
msg = event
redis_client.send(stream=stream_name, value=msg)

-------------

Test DB operations with concurrency

In [3]:
# Test async db client
from shared_utils.clients.async_db_client import async_db_client
import uuid
from pydantic import BaseModel

# NOTE(review): stream_name and group_id are also assigned in the callback-test
# setup cell; running this cell rebinds them to a new stream for the DB test.
stream_name = f"test-stream-{uuid.uuid4()}"
group_id = "test-group"
# Hard-coded document id; the log output recorded in this notebook shows a row
# with this id was found in the database used for that run.
doc_id = "2e405a54-d721-42da-b272-c12ee35b3c6d"

class GetDocumentMessage(BaseModel):
    """Event payload for the DB test: carries the id of the document to look up."""
    doc_id: str


# NOTE(review): `event` held a TestMessage in the earlier callback-test cells;
# from here on it is a GetDocumentMessage.
event = GetDocumentMessage(doc_id=doc_id)

# Echo the generated values, one per line.
print(stream_name, group_id, event, sep="\n")

test-stream-e120bba6-712d-4965-9403-f86d619f50be
test-group
doc_id='2e405a54-d721-42da-b272-c12ee35b3c6d'


In [4]:
from sqlmodel import Session, select
from shared_utils.sql_models import DocumentMetadata

async def get_document(document_id: str) -> DocumentMetadata | None:
    """Look up a single DocumentMetadata row by its document_id.

    Builds a SELECT filtered on DocumentMetadata.document_id, executes it inside
    a scoped session obtained from async_db_client, and hands back the match.

    Args:
        document_id: Value compared against DocumentMetadata.document_id.

    Returns:
        The matching DocumentMetadata, or None when the lookup yields nothing.
    """
    query = select(DocumentMetadata).where(DocumentMetadata.document_id == document_id)
    async with async_db_client.scoped_session() as async_session:
        outcome = await async_session.execute(query)
        # NOTE(review): scalar_one_or_none presumably follows SQLAlchemy semantics
        # (raises when several rows match) — confirm.
        row = outcome.scalar_one_or_none()
        if not row:
            return None
        return row

In [5]:
# create an async callback for the stream
from shared_utils import deserialize_event
from shared_utils.sql_models import DocumentMetadata
import logging

@deserialize_event(GetDocumentMessage)
async def test_async_callback(event: GetDocumentMessage):
    """Async callback that fetches the requested document and logs the outcome.

    Logs the doc_id being looked up, awaits get_document(event.doc_id), then
    logs either the found document's id and s3_key or a not-found message.
    """
    logging.info(f"Getting DocumentMetadata from db with doc_id: {event.doc_id}")

    # get_document yields None when no row matches, hence the optional annotation.
    document: DocumentMetadata | None = await get_document(event.doc_id)

    # Guard clause: report the miss and stop.
    if not document:
        logging.info(f"No document found with ID {event.doc_id}")
        return

    logging.info(f"Found document with ID {document.document_id} and s3_key: {document.s3_key}")

In [6]:
# create the stream, register the callback
import os
os.environ["REDIS_URL"] = "redis://localhost:6379/0"
from shared_utils.clients.redis_client import redis_client

# Create the consumer for this section's stream, then attach test_async_callback to it.
# NOTE(review): max_concurrent_tasks=1 presumably limits the consumer to one in-flight callback — confirm
redis_client.create_consumer(stream_name, group_id, max_concurrent_tasks=1)
redis_client.register_callback(stream=stream_name, group_id=group_id, callback=test_async_callback)

2025-07-19 19:33:29,817 [RedisConsumer-test-stream-e120bba6-712d-4965-9403-f86d619f50be] INFO: Getting DocumentMetadata from db with doc_id: 2e405a54-d721-42da-b272-c12ee35b3c6d
2025-07-19 19:33:29,862 [RedisConsumer-test-stream-e120bba6-712d-4965-9403-f86d619f50be] INFO: Found document with ID 2e405a54-d721-42da-b272-c12ee35b3c6d and s3_key: uploads/quantum_example.pdf


In [7]:
# send the GetDocumentMessage event to the stream; the log lines recorded in this
# notebook for the resulting lookup were emitted from the RedisConsumer thread

redis_client.send(stream=stream_name, value=event)