Skip to content

Commit

Permalink
Recoonect to redis when connection is lost
Browse files Browse the repository at this point in the history
Fixes #143
  • Loading branch information
miguelgrinberg committed Jan 15, 2018
1 parent 96d4e5f commit af13ef0
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 15 deletions.
48 changes: 37 additions & 11 deletions socketio/asyncio_redis_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio
import logging
import pickle
from urllib.parse import urlparse

Expand All @@ -8,6 +10,8 @@

from .asyncio_pubsub_manager import AsyncPubSubManager

logger = logging.getLogger('socketio')


def _parse_redis_url(url):
p = urlparse(url)
Expand Down Expand Up @@ -59,17 +63,39 @@ def __init__(self, url='redis://localhost:6379/0', channel='socketio',
super().__init__(channel=channel, write_only=write_only)

async def _publish(self, data):
if self.pub is None:
self.pub = await aioredis.create_redis((self.host, self.port),
db=self.db,
password=self.password)
return await self.pub.publish(self.channel, pickle.dumps(data))
retry = True
while True:
try:
if self.pub is None:
self.pub = await aioredis.create_redis(
(self.host, self.port), db=self.db,
password=self.password)
return await self.pub.publish(self.channel,
pickle.dumps(data))
except (aioredis.RedisError, OSError):
if retry:
logger.error('Cannot publish to redis... retrying')
self.pub = None
retry = False
else:
logger.error('Cannot publish to redis... giving up')
break

async def _listen(self):
if self.sub is None:
self.sub = await aioredis.create_redis((self.host, self.port),
db=self.db,
password=self.password)
self.ch = (await self.sub.subscribe(self.channel))[0]
retry_sleep = 1
while True:
return await self.ch.get()
try:
if self.sub is None:
self.sub = await aioredis.create_redis(
(self.host, self.port), db=self.db,
password=self.password)
self.ch = (await self.sub.subscribe(self.channel))[0]
return await self.ch.get()
except (aioredis.RedisError, OSError):
logger.error('Cannot receive from redis... '
'retrying in {} secs'.format(retry_sleep))
self.sub = None
await asyncio.sleep(retry_sleep)
retry_sleep *= 2
if retry_sleep > 60:
retry_sleep = 60
47 changes: 43 additions & 4 deletions socketio/redis_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
import pickle
import time

try:
import redis
Expand All @@ -7,6 +9,8 @@

from .pubsub_manager import PubSubManager

logger = logging.getLogger('socketio')


class RedisManager(PubSubManager): # pragma: no cover
"""Redis based client manager.
Expand Down Expand Up @@ -38,8 +42,8 @@ def __init__(self, url='redis://localhost:6379/0', channel='socketio',
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your '
'virtualenv).')
self.redis = redis.Redis.from_url(url)
self.pubsub = self.redis.pubsub()
self.redis_url = url
self._redis_connect()
super(RedisManager, self).__init__(channel=channel,
write_only=write_only)

Expand All @@ -58,13 +62,48 @@ def initialize(self):
'Redis requires a monkey patched socket library to work '
'with ' + self.server.async_mode)

def _redis_connect(self):
self.redis = redis.Redis.from_url(self.redis_url)
self.pubsub = self.redis.pubsub()

def _publish(self, data):
return self.redis.publish(self.channel, pickle.dumps(data))
retry = True
while True:
try:
if not retry:
self._redis_connect()
return self.redis.publish(self.channel, pickle.dumps(data))
except redis.exceptions.ConnectionError:
if retry:
logger.error('Cannot publish to redis... retrying')
retry = False
else:
logger.error('Cannot publish to redis... giving up')
break

def _redis_listen_with_retries(self):
retry_sleep = 1
connect = False
while True:
try:
if connect:
self._redis_connect()
self.pubsub.subscribe(self.channel)
for message in self.pubsub.listen():
yield message
except redis.exceptions.ConnectionError:
logger.error('Cannot receive from redis... '
'retrying in {} secs'.format(retry_sleep))
connect = True
time.sleep(retry_sleep)
retry_sleep *= 2
if retry_sleep > 60:
retry_sleep = 60

def _listen(self):
channel = self.channel.encode('utf-8')
self.pubsub.subscribe(self.channel)
for message in self.pubsub.listen():
for message in self._redis_listen_with_retries():
if message['channel'] == channel and \
message['type'] == 'message' and 'data' in message:
yield message['data']
Expand Down

0 comments on commit af13ef0

Please sign in to comment.