Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updating listeners should be thread safe #174

Merged
merged 1 commit into from
Oct 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion stomp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import stomp.connect as connect
import stomp.listener as listener

__version__ = (4, 1, 20)
__version__ = (4, 1, 21)

##
# Alias for STOMP 1.0 connections.
Expand Down
71 changes: 38 additions & 33 deletions stomp/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(self, wait_on_receipt=False, auto_decode=True):
# function for creating threads used by the connection
self.create_thread_fc = utils.default_create_thread

self.__listeners_change_condition = threading.Condition()
self.__receiver_thread_exit_condition = threading.Condition()
self.__receiver_thread_exited = False
self.__send_wait_condition = threading.Condition()
Expand Down Expand Up @@ -152,15 +153,17 @@ def set_listener(self, name, listener):
:param str name: the name of the listener
:param ConnectionListener listener: the listener object
"""
self.listeners[name] = listener
with self.__listeners_change_condition:
self.listeners[name] = listener

def remove_listener(self, name):
"""
Remove a listener according to the specified name

:param str name: the name of the listener to remove
"""
del self.listeners[name]
with self.__listeners_change_condition:
del self.listeners[name]

def get_listener(self, name):
"""
Expand Down Expand Up @@ -218,44 +221,46 @@ def notify(self, frame_type, headers=None, body=None):
elif frame_type == 'disconnected':
self.set_connected(False)

for listener in self.listeners.values():
if not listener:
continue

notify_func = getattr(listener, 'on_%s' % frame_type, None)
if not notify_func:
log.debug("listener %s has no method on_%s", listener, frame_type)
continue
if frame_type in ('heartbeat', 'disconnected'):
notify_func()
continue
if frame_type == 'connecting':
notify_func(self.current_host_and_port)
continue

if frame_type == 'error' and not self.connected:
with self.__connect_wait_condition:
self.connection_error = True
self.__connect_wait_condition.notify()

rtn = notify_func(headers, body)
if rtn:
(headers, body) = rtn
return (headers, body)
with self.__listeners_change_condition:
for listener in self.listeners.values():
if not listener:
continue

notify_func = getattr(listener, 'on_%s' % frame_type, None)
if not notify_func:
log.debug("listener %s has no method on_%s", listener, frame_type)
continue
if frame_type in ('heartbeat', 'disconnected'):
notify_func()
continue
if frame_type == 'connecting':
notify_func(self.current_host_and_port)
continue

if frame_type == 'error' and not self.connected:
with self.__connect_wait_condition:
self.connection_error = True
self.__connect_wait_condition.notify()

rtn = notify_func(headers, body)
if rtn:
(headers, body) = rtn
return (headers, body)

def transmit(self, frame):
"""
Convert a frame object to a frame string and transmit to the server.

:param Frame frame: the Frame object to transmit
"""
for listener in self.listeners.values():
if not listener:
continue
try:
listener.on_send(frame)
except AttributeError:
continue
with self.__listeners_change_condition:
for listener in self.listeners.values():
if not listener:
continue
try:
listener.on_send(frame)
except AttributeError:
continue

lines = utils.convert_frame_to_lines(frame)

Expand Down