Skip to content

Commit

Permalink
Fix race condition in once (#77)
Browse files Browse the repository at this point in the history
Add check that listener method is still registered before removing. To
ensure atomic execution a lock is added around removals of listeners.
  • Loading branch information
forslund committed Oct 8, 2020
1 parent fb45395 commit dc309b2
Showing 1 changed file with 25 additions and 8 deletions.
33 changes: 25 additions & 8 deletions pyee/_base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-

from collections import defaultdict, OrderedDict
from threading import Lock

__all__ = ['EventEmitter', 'PyeeException']

Expand Down Expand Up @@ -38,6 +39,7 @@ def on_error(message):
"""
def __init__(self):
self._events = defaultdict(OrderedDict)
self._lock = Lock()

def on(self, event, f=None):
"""Registers the function ``f`` to the event name ``event``.
Expand Down Expand Up @@ -72,7 +74,8 @@ def _add_event_handler(self, event, k, v):
# Note that k and v are the same for `on` handlers, but
# different for `once` handlers, where v is a wrapped version
# of k which removes itself before calling k
self._events[event][k] = v
with self._lock:
self._events[event][k] = v

def _emit_run(self, f, args, kwargs):
f(*args, **kwargs)
Expand All @@ -87,7 +90,9 @@ def _emit_handle_potential_error(self, event, error):
def _call_handlers(self, event, args, kwargs):
handled = False

for f in list(self._events[event].values()):
with self._lock:
funcs = list(self._events[event].values())
for f in funcs:
self._emit_run(f, args, kwargs)
handled = True

Expand Down Expand Up @@ -118,7 +123,13 @@ def once(self, event, f=None):
"""
def _wrapper(f):
def g(*args, **kwargs):
self.remove_listener(event, f)
with self._lock:
# Check that the event wasn't removed already right
# before the lock
if event in self._events and f in self._events[event]:
self._remove_listener(event, f)
else:
return None
# f may return a coroutine, so we need to return that
# result here so that emit can schedule it
return f(*args, **kwargs)
Expand All @@ -131,18 +142,24 @@ def g(*args, **kwargs):
else:
return _wrapper(f)

def _remove_listener(self, event, f):
"""Naked unprotected removal."""
self._events[event].pop(f)

def remove_listener(self, event, f):
"""Removes the function ``f`` from ``event``."""
self._events[event].pop(f)
with self._lock:
self._remove_listener(event, f)

def remove_all_listeners(self, event=None):
"""Remove all listeners attached to ``event``.
If ``event`` is ``None``, remove all listeners on all events.
"""
if event is not None:
self._events[event] = OrderedDict()
else:
self._events = defaultdict(OrderedDict)
with self._lock:
if event is not None:
self._events[event] = OrderedDict()
else:
self._events = defaultdict(OrderedDict)

def listeners(self, event):
"""Returns a list of all listeners registered to the ``event``.
Expand Down

0 comments on commit dc309b2

Please sign in to comment.