Skip to content

Commit

Permalink
Fix AsyncioSelectorReactor.callLater().reset() and fix memory leaks
Browse files Browse the repository at this point in the history
Author: IlyaSkriblovsky
Reviewer: wiml, roderigc
Fixes: ticket:9780
  • Loading branch information
wiml committed Apr 22, 2020
2 parents 07910d7 + 2a4a48b commit bc7fef3
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 42 deletions.
72 changes: 30 additions & 42 deletions src/twisted/internet/asyncioreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from zope.interface import implementer

from twisted.logger import Logger
from twisted.internet.base import DelayedCall
from twisted.internet.posixbase import (PosixReactorBase, _NO_FILEDESC,
_ContinuousPolling)
from twisted.python.log import callWithLogger
Expand All @@ -30,24 +29,6 @@
from builtins import PermissionError, BrokenPipeError


class _DCHandle(object):
"""
Wraps ephemeral L{asyncio.Handle} instances. Callbacks can close
over this and use it as a mutable reference to asyncio C{Handles}.
@ivar handle: The current L{asyncio.Handle}
"""
def __init__(self, handle):
self.handle = handle


def cancel(self):
"""
Cancel the inner L{asyncio.Handle}.
"""
self.handle.cancel()



@implementer(IReactorFDSet)
class AsyncioSelectorReactor(PosixReactorBase):
Expand All @@ -65,8 +46,11 @@ def __init__(self, eventloop=None):
self._asyncioEventloop = eventloop
self._writers = {}
self._readers = {}
self._delayedCalls = set()
self._continuousPolling = _ContinuousPolling(self)

self._scheduledAt = None
self._timerHandle = None

super().__init__()


Expand Down Expand Up @@ -253,10 +237,6 @@ def getWriters(self):
self._continuousPolling.getWriters())


def getDelayedCalls(self):
return list(self._delayedCalls)


def iterate(self, timeout):
self._asyncioEventloop.call_later(timeout + 0.01,
self._asyncioEventloop.stop)
Expand All @@ -272,7 +252,9 @@ def run(self, installSignalHandlers=True):

def stop(self):
super().stop()
self.callLater(0, self.fireSystemEvent, "shutdown")
# This will cause runUntilCurrent which in its turn
# will call fireSystemEvent("shutdown")
self.callLater(0, lambda: None)


def crash(self):
Expand All @@ -282,24 +264,30 @@ def crash(self):

seconds = staticmethod(runtimeSeconds)

def _onTimer(self):
self._scheduledAt = None
self.runUntilCurrent()
self._reschedule()

def _reschedule(self):
timeout = self.timeout()
if timeout is not None:
abs_time = self._asyncioEventloop.time() + timeout
self._scheduledAt = abs_time
if self._timerHandle is not None:
self._timerHandle.cancel()
self._timerHandle = self._asyncioEventloop.call_at(
abs_time, self._onTimer)

def _moveCallLaterSooner(self, tple):
PosixReactorBase._moveCallLaterSooner(self, tple)
self._reschedule()

def callLater(self, seconds, f, *args, **kwargs):
def run():
dc.called = True
self._delayedCalls.remove(dc)
f(*args, **kwargs)
handle = self._asyncioEventloop.call_later(seconds, run)
dchandle = _DCHandle(handle)

def cancel(dc):
self._delayedCalls.remove(dc)
dchandle.cancel()

def reset(dc):
dchandle.handle = self._asyncioEventloop.call_at(dc.time, run)

dc = DelayedCall(self.seconds() + seconds, run, (), {},
cancel, reset, seconds=self.seconds)
self._delayedCalls.add(dc)
dc = PosixReactorBase.callLater(self, seconds, f, *args, **kwargs)
abs_time = self._asyncioEventloop.time() + self.timeout()
if self._scheduledAt is None or abs_time < self._scheduledAt:
self._reschedule()
return dc


Expand Down
72 changes: 72 additions & 0 deletions src/twisted/internet/test/test_asyncioreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
Tests for L{twisted.internet.asyncioreactor}.
"""
import gc

from twisted.trial.unittest import SynchronousTestCase
from .reactormixins import ReactorBuilder
Expand Down Expand Up @@ -47,6 +48,7 @@ def completed(future):
self.runReactor(reactor, timeout=1)
self.assertEqual(result, [True])


def test_seconds(self):
"""L{seconds} should return a plausible epoch time."""
reactor = AsyncioSelectorReactor()
Expand All @@ -57,3 +59,73 @@ def test_seconds(self):

# less than 2120-01-01
self.assertLess(result, 4733510400)


def test_delayedCallResetToLater(self):
"""
L{DelayedCall.reset()} properly reschedules timer to later time
"""
reactor = AsyncioSelectorReactor()

timer_called_at = [None]

def on_timer():
timer_called_at[0] = reactor.seconds()

start_time = reactor.seconds()
dc = reactor.callLater(0, on_timer)
dc.reset(0.5)
reactor.callLater(1, reactor.stop)
reactor.run()

self.assertIsNotNone(timer_called_at[0])
self.assertGreater(timer_called_at[0] - start_time, 0.4)


def test_delayedCallResetToEarlier(self):
"""
L{DelayedCall.reset()} properly reschedules timer to earlier time
"""
reactor = AsyncioSelectorReactor()

timer_called_at = [None]

def on_timer():
timer_called_at[0] = reactor.seconds()

start_time = reactor.seconds()
dc = reactor.callLater(0.5, on_timer)
dc.reset(0)
reactor.callLater(1, reactor.stop)

import io
from contextlib import redirect_stderr
stderr = io.StringIO()
with redirect_stderr(stderr):
reactor.run()

self.assertEqual(stderr.getvalue(), '')
self.assertIsNotNone(timer_called_at[0])
self.assertLess(timer_called_at[0] - start_time, 0.4)


def test_noCycleReferencesInCallLater(self):
"""
L{AsyncioSelectorReactor.callLater()} doesn't leave cyclic references
"""
gc_was_enabled = gc.isenabled()
gc.disable()
try:
objects_before = len(gc.get_objects())
timer_count = 1000

reactor = AsyncioSelectorReactor()
for _ in range(timer_count):
reactor.callLater(0, lambda: None)
reactor.runUntilCurrent()

objects_after = len(gc.get_objects())
self.assertLess((objects_after - objects_before) / timer_count, 1)
finally:
if gc_was_enabled:
gc.enable()
1 change: 1 addition & 0 deletions src/twisted/newsfragments/9780.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DelayedCall.reset() is now working properly with asyncioreactor

0 comments on commit bc7fef3

Please sign in to comment.