Skip to content

Commit

Permalink
Add pipeline_publish_message method to redis_store
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Gabaraev committed Nov 24, 2021
1 parent 318dde3 commit 30faf39
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 0 deletions.
22 changes: 22 additions & 0 deletions docs/usage.rst
Expand Up @@ -219,6 +219,28 @@ not trigger any server side event. A practical use would be to store current the
a moving client inside the Redis datastore. Then Django can fetch these coordinates from Redis,
whenever it requires them.

Pipeline Message Publication
---------------------------------
If publishing to many channels (users, groups, sessions) within one call to the publish_message
method consider using the pipeline_publish_message method instead to save round trips to the Redis store.

.. code-block:: python
redis_publisher = RedisPublisher(
facility='foobar',
users=[4'john', 'mary', 'joe', 'sue', 'sally','bobby', 'fred', 'james', 'chris', 'joseph']
)
message = RedisMessage('Hello World')
redis_publisher.pipeline_publish_message(message, transaction=False)
A traditional publish_message call with 10 users above would make 20 network round trips to Redis.
The pipeline_publish_message method, however, only makes 1 network round trip to Redis
to push a single message to the 10 users.

The transaction parameter at the end is default.
If transaction is set to True, the default behavior for Redis pipelines, all messages
must be successfully published or none of them will be.
If you prefer that some messages instead of none are delivered in the case of a single failure,
set transaction=False to override the default transactional behavior.

.. code-block:: python
# if the publisher is required only for fetching messages, use an
Expand Down
74 changes: 74 additions & 0 deletions examples/chatserver/tests/test_chatclient.py
Expand Up @@ -66,6 +66,20 @@ def test_subscribe_broadcast(self):
ws.close()
self.assertFalse(ws.connected)

def test_subscribe_pipeline_broadcast(self):
audience = {'broadcast': True}
publisher = RedisPublisher(facility=self.facility, **audience)
publisher.pipeline_publish_message(self.message, 10)
websocket_url = self.websocket_base_url + u'?subscribe-broadcast'
ws = create_connection(websocket_url)
self.assertTrue(ws.connected)
result = ws.recv()
if six.PY3:
self.message = self.message.decode()
self.assertEqual(result, self.message)
ws.close()
self.assertFalse(ws.connected)

def test_pubsub_broadcast(self):
websocket_url = self.websocket_base_url + u'?subscribe-broadcast&publish-broadcast'
ws = create_connection(websocket_url)
Expand Down Expand Up @@ -111,6 +125,25 @@ def test_subscribe_user(self):
ws.close()
self.assertFalse(ws.connected)

def test_subscribe_pipeline_user(self):
logged_in = self.client.login(username='john', password='secret')
self.assertTrue(logged_in, 'John is not logged in')
request = self.factory.get('/chat/')
request.user = User.objects.get(username='mary')
audience = {'users': ['john', 'mary']}
publisher = RedisPublisher(request=request, facility=self.facility, **audience)
publisher.pipeline_publish_message(self.message, 10)
websocket_url = self.websocket_base_url + u'?subscribe-user'
header = ['Cookie: sessionid={0}'.format(self.client.cookies['sessionid'].coded_value)]
ws = create_connection(websocket_url, header=header)
self.assertTrue(ws.connected)
result = ws.recv()
if six.PY3:
self.message = self.message.decode()
self.assertEqual(result, self.message)
ws.close()
self.assertFalse(ws.connected)

def test_publish_user(self):
logged_in = self.client.login(username='john', password='secret')
self.assertTrue(logged_in, 'John is not logged in')
Expand Down Expand Up @@ -149,6 +182,25 @@ def test_subscribe_group(self):
ws.close()
self.assertFalse(ws.connected)

def test_subscribe_pipeline_group(self):
logged_in = self.client.login(username='john', password='secret')
self.assertTrue(logged_in, 'John is not logged in')
request = self.factory.get('/chat/')
request.user = User.objects.get(username='mary')
audience = {'groups': ['chatters']}
publisher = RedisPublisher(request=request, facility=self.facility, **audience)
publisher.pipeline_publish_message(self.message, 10)
websocket_url = self.websocket_base_url + u'?subscribe-group'
header = ['Cookie: sessionid={0}'.format(self.client.cookies['sessionid'].coded_value)]
ws = create_connection(websocket_url, header=header)
self.assertTrue(ws.connected)
result = ws.recv()
if six.PY3:
self.message = self.message.decode()
self.assertEqual(result, self.message)
ws.close()
self.assertFalse(ws.connected)

def test_publish_group(self):
logged_in = self.client.login(username='john', password='secret')
self.assertTrue(logged_in, 'John is not logged in')
Expand Down Expand Up @@ -190,6 +242,28 @@ def test_subscribe_session(self):
ws.close()
self.assertFalse(ws.connected)

def test_subscribe_pipeline_session(self):
logged_in = self.client.login(username='john', password='secret')
self.assertTrue(logged_in, 'John is not logged in')
self.assertIsInstance(self.client.session, (dict, type(self.session)), 'Did not receive a s ession key')
session_key = self.client.session.session_key
self.assertGreater(len(session_key), 30, 'Session key is too short')
request = self.factory.get('/chat/')
request.session = self.client.session
audience = {'sessions': [SELF]}
publisher = RedisPublisher(request=request, facility=self.facility, **audience)
publisher.pipeline_publish_message(self.message, 10)
websocket_url = self.websocket_base_url + u'?subscribe-session'
header = ['Cookie: sessionid={0}'.format(session_key)]
ws = create_connection(websocket_url, header=header)
self.assertTrue(ws.connected)
result = ws.recv()
if six.PY3:
self.message = self.message.decode()
self.assertEqual(result, self.message)
ws.close()
self.assertFalse(ws.connected)

def test_publish_session(self):
logged_in = self.client.login(username='mary', password='secret')
self.assertTrue(logged_in, 'Mary is not logged in')
Expand Down
21 changes: 21 additions & 0 deletions ws4redis/redis_store.py
Expand Up @@ -113,6 +113,27 @@ def publish_message(self, message, expire=None):
if expire > 0:
self._connection.setex(channel, expire, message)

def pipeline_publish_message(
self, message, expire=None, transaction=True
):
"""
Like publish_message, but pipelines all publish and setex
Redis commands to save round trips to the Redis datastore.
"""
if expire is None:
expire = self._expire
if not isinstance(message, RedisMessage):
raise ValueError('message object is not of type RedisMessage')

pipeline = self._connection.pipeline(transaction=transaction)

for channel in self._publishers:
pipeline.publish(channel, message)
if expire > 0:
pipeline.setex(channel, expire, message)

pipeline.execute()

@staticmethod
def get_prefix():
return settings.WS4REDIS_PREFIX and '{0}:'.format(settings.WS4REDIS_PREFIX) or ''
Expand Down

0 comments on commit 30faf39

Please sign in to comment.