Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions api/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

"""Pub/Sub implementation"""

import logging
import asyncio

import json
Expand All @@ -14,6 +15,8 @@
from .models import Subscription, SubscriptionStats
from .config import PubSubSettings

logger = logging.getLogger(__name__)


class PubSub:
"""Pub/Sub implementation class
Expand All @@ -39,7 +42,9 @@ def __init__(self, host=None, db_number=None):
host = self._settings.redis_host
if db_number is None:
db_number = self._settings.redis_db_number
self._redis = aioredis.from_url(f'redis://{host}/{db_number}')
self._redis = aioredis.from_url(
f'redis://{host}/{db_number}', health_check_interval=30
)
# self._subscriptions is a dict that matches a subscription id
# (key) with a Subscription object ('sub') and a redis
# PubSub object ('redis_sub'). For instance:
Expand Down Expand Up @@ -135,9 +140,24 @@ async def listen(self, sub_id, user=None):
f"not owned by {user}")
while True:
self._subscriptions[sub_id]['last_poll'] = datetime.utcnow()
msg = await sub['redis_sub'].get_message(
ignore_subscribe_messages=True, timeout=1.0
)
msg = None
try:
msg = await sub['redis_sub'].get_message(
ignore_subscribe_messages=True, timeout=1.0
)
except aioredis.ConnectionError:
async with self._lock:
channel = self._subscriptions[sub_id]['sub'].channel
new_redis_sub = self._redis.pubsub()
await new_redis_sub.subscribe(channel)
self._subscriptions[sub_id]['redis_sub'] = new_redis_sub
sub['redis_sub'] = new_redis_sub
Comment on lines +148 to +154
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the method creates a new subscription on connection error. How will the client receive the new sub ID?
The client will need it to keep listening and also to unsubscribe.
I think it's better to return None instead of creating a new subscription.
The client will need to subscribe again in this case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We’re not creating a new subscription ID here—only a new redis_sub object bound to the same sub_id.

On ConnectionError we resubscribe to the same channel and update self._subscriptions[sub_id]['redis_sub'], so the client keeps using the original sub_id for both listening and unsubscribe(). No new ID is generated or needed.
As i recall subscription id we generate artificially in python using counter in redis named kernelci-api-pubsub-id (ID_KEY). Redis Pub/Sub has no subscription IDs or message IDs.

continue
except aioredis.RedisError as exc:
# log the error and continue
logger.error("Redis error occurred: %s", exc)
return None # Handle any exceptions gracefully
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest to log something before returning None just to know afterwards what happened from the logs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


if msg is None:
continue
msg_data = json.loads(msg['data'])
Expand Down