Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize memory usage during broadcasts #1233

Merged
merged 1 commit into from
Sep 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ packages = find:
python_requires = >=3.6
install_requires =
bidict >= 0.21.0
python-engineio >= 4.3.0
python-engineio >= 4.7.0

[options.packages.find]
where = src
Expand Down
47 changes: 38 additions & 9 deletions src/socketio/asyncio_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio

from engineio import packet as eio_packet
from socketio import packet
from .base_manager import BaseManager


Expand All @@ -17,18 +19,45 @@ async def emit(self, event, data, namespace, room=None, skip_sid=None,
"""
if namespace not in self.rooms:
return
tasks = []
if isinstance(data, tuple):
# tuples are expanded to multiple arguments, everything else is
# sent as a single argument
data = list(data)
elif data is not None:
data = [data]
else:
data = []
if not isinstance(skip_sid, list):
skip_sid = [skip_sid]
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid:
if callback is not None:
tasks = []
if not callback:
# when callbacks aren't used the packets sent to each recipient are
# identical, so they can be generated once and reused
pkt = self.server.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data)
encoded_packet = pkt.encode()
if not isinstance(encoded_packet, list):
encoded_packet = [encoded_packet]
eio_pkt = [eio_packet.Packet(eio_packet.MESSAGE, p)
for p in encoded_packet]
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid:
for p in eio_pkt:
tasks.append(asyncio.create_task(
self.server._send_eio_packet(eio_sid, p)))
else:
# callbacks are used, so each recipient must be sent a packet that
# contains a unique callback id
# note that callbacks when addressing a group of people are
# implemented but not tested or supported
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid: # pragma: no branch
id = self._generate_ack_id(sid, callback)
else:
id = None
tasks.append(asyncio.create_task(
self.server._emit_internal(eio_sid, event, data,
namespace, id)))
pkt = self.server.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data,
id=id)
tasks.append(asyncio.create_task(
self.server._send_packet(eio_sid, pkt)))
if tasks == []: # pragma: no cover
return
await asyncio.wait(tasks)
Expand Down
17 changes: 4 additions & 13 deletions src/socketio/asyncio_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,19 +424,6 @@ async def sleep(self, seconds=0):
"""
return await self.eio.sleep(seconds)

async def _emit_internal(self, sid, event, data, namespace=None, id=None):
"""Send a message to a client."""
# tuples are expanded to multiple arguments, everything else is sent
# as a single argument
if isinstance(data, tuple):
data = list(data)
elif data is not None:
data = [data]
else:
data = []
await self._send_packet(sid, self.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data, id=id))

async def _send_packet(self, eio_sid, pkt):
"""Send a Socket.IO packet to a client."""
encoded_packet = pkt.encode()
Expand All @@ -446,6 +433,10 @@ async def _send_packet(self, eio_sid, pkt):
else:
await self.eio.send(eio_sid, encoded_packet)

async def _send_eio_packet(self, eio_sid, eio_pkt):
"""Send a raw Engine.IO packet to a client."""
await self.eio.send_packet(eio_sid, eio_pkt)

async def _handle_connect(self, eio_sid, namespace, data):
"""Handle a client connection request."""
namespace = namespace or '/'
Expand Down
41 changes: 35 additions & 6 deletions src/socketio/base_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import logging

from bidict import bidict, ValueDuplicationError
from engineio import packet as eio_packet
from socketio import packet

default_logger = logging.getLogger('socketio')

Expand Down Expand Up @@ -161,15 +163,42 @@ def emit(self, event, data, namespace, room=None, skip_sid=None,
connected to the namespace."""
if namespace not in self.rooms:
return
if isinstance(data, tuple):
# tuples are expanded to multiple arguments, everything else is
# sent as a single argument
data = list(data)
elif data is not None:
data = [data]
else:
data = []
if not isinstance(skip_sid, list):
skip_sid = [skip_sid]
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid:
if callback is not None:
if not callback:
# when callbacks aren't used the packets sent to each recipient are
# identical, so they can be generated once and reused
pkt = self.server.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data)
encoded_packet = pkt.encode()
if not isinstance(encoded_packet, list):
encoded_packet = [encoded_packet]
eio_pkt = [eio_packet.Packet(eio_packet.MESSAGE, p)
for p in encoded_packet]
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid:
for p in eio_pkt:
self.server._send_eio_packet(eio_sid, p)
else:
# callbacks are used, so each recipient must be sent a packet that
# contains a unique callback id
# note that callbacks when addressing a group of people are
# implemented but not tested or supported
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid: # pragma: no branch
id = self._generate_ack_id(sid, callback)
else:
id = None
self.server._emit_internal(eio_sid, event, data, namespace, id)
pkt = self.server.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data,
id=id)
self.server._send_packet(eio_sid, pkt)

def trigger_callback(self, sid, id, data):
"""Invoke an application callback."""
Expand Down
21 changes: 6 additions & 15 deletions src/socketio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def on(self, event, handler=None, namespace=None):
Example usage::

# as a decorator:
@socket_io.on('connect', namespace='/chat')
@sio.on('connect', namespace='/chat')
def connect_handler(sid, environ):
print('Connection request')
if environ['REMOTE_ADDR'] in blacklisted:
Expand All @@ -194,7 +194,7 @@ def connect_handler(sid, environ):
# as a method:
def message_handler(sid, msg):
print('Received message: ', msg)
eio.send(sid, 'response')
sio.send(sid, 'response')
socket_io.on('message', namespace='/chat', handler=message_handler)

The handler function receives the ``sid`` (session ID) for the
Expand Down Expand Up @@ -633,19 +633,6 @@ def sleep(self, seconds=0):
"""
return self.eio.sleep(seconds)

def _emit_internal(self, eio_sid, event, data, namespace=None, id=None):
"""Send a message to a client."""
# tuples are expanded to multiple arguments, everything else is sent
# as a single argument
if isinstance(data, tuple):
data = list(data)
elif data is not None:
data = [data]
else:
data = []
self._send_packet(eio_sid, self.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data, id=id))

def _send_packet(self, eio_sid, pkt):
"""Send a Socket.IO packet to a client."""
encoded_packet = pkt.encode()
Expand All @@ -655,6 +642,10 @@ def _send_packet(self, eio_sid, pkt):
else:
self.eio.send(eio_sid, encoded_packet)

def _send_eio_packet(self, eio_sid, eio_pkt):
"""Send a raw Engine.IO packet to a client."""
self.eio.send_packet(eio_sid, eio_pkt)

def _handle_connect(self, eio_sid, namespace, data):
"""Handle a client connection request."""
namespace = namespace or '/'
Expand Down
Loading