Skip to content

Commit

Permalink
Scheduler: Preserve period offset on restarts.
Browse files Browse the repository at this point in the history
Partial fix for GH-397.
  • Loading branch information
progval committed May 2, 2020
1 parent 3ecc18e commit ad05468
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 22 deletions.
40 changes: 36 additions & 4 deletions plugins/Scheduler/plugin.py
Expand Up @@ -29,6 +29,7 @@

import time
import os
import math
import shutil
import tempfile

Expand Down Expand Up @@ -56,6 +57,14 @@ def __init__(self, irc):
self._restoreEvents(irc)
world.flushers.append(self._flush)

def _getNextRunIn(self, first_run, now, period):
next_run_in = period - ((now - first_run) % period)
if next_run_in < 5:
# don't run immediatly, it might overwhelm the bot on
# startup.
next_run_in += period
return next_run_in

def _restoreEvents(self, irc):
try:
pkl = open(filename, 'rb')
Expand All @@ -82,8 +91,21 @@ def _restoreEvents(self, irc):
self._add(ircobj, event['msg'],
event['time'], event['command'], n)
elif event['type'] == 'repeat': # repeating event
now = time.time()
first_run = event.get('first_run')
if first_run is None:
# old DBs don't have a "first_run"; let's take "now" as
# first_run.
first_run = now

# Preserve the offset over restarts; eg. if event['time']
# is 24hours, we want to keep running the command at the
# same time of day.
next_run_in = self._getNextRunIn(
first_run, now, event['time'])

self._repeat(ircobj, event['msg'], name,
event['time'], event['command'], False)
event['time'], event['command'], first_run, next_run_in)
except AssertionError as e:
if str(e) == 'An event with the same name has already been scheduled.':
# we must be reloading the plugin, event is still scheduled
Expand Down Expand Up @@ -166,14 +188,24 @@ def remove(self, irc, msg, args, id):
irc.error(_('Invalid event id.'))
remove = wrap(remove, ['lowered'])

def _repeat(self, irc, msg, name, seconds, command, now=True):
def _repeat(self, irc, msg, name, seconds, command, first_run=None, next_run_in=None):
f = self._makeCommandFunction(irc, msg, command, remove=False)
id = schedule.addPeriodicEvent(f, seconds, name, now)
f_wrapper = schedule.schedule.makePeriodicWrapper(f, seconds, name)
if next_run_in is None:
assert first_run is None
# run immediately
id = f_wrapper()
first_run = time.time()
else:
assert first_run is not None
id = schedule.addEvent(f_wrapper, time.time() + next_run_in, name)
assert id == name
self.events[name] = {'command':command,
'msg':msg,
'time':seconds,
'type':'repeat'}
'type':'repeat',
'first_run': first_run,
}

@internationalizeDocstring
def repeat(self, irc, msg, args, name, seconds, command):
Expand Down
32 changes: 22 additions & 10 deletions plugins/Scheduler/test.py
Expand Up @@ -122,25 +122,37 @@ def testSinglePersistence(self):

def testRepeatPersistence(self):
self.assertRegexp(
'scheduler repeat repeater 5 echo testRepeat',
'scheduler repeat repeater 20 echo testRepeat',
'testRepeat')

self.assertNotError('unload Scheduler')
schedule.schedule.reset()
timeFastForward(20)
timeFastForward(30)
self.assertNoResponse(' ', timeout=1)

self.assertNotError('load Scheduler')
self.assertNoResponse(' ', timeout=1)
timeFastForward(2)
self.assertNoResponse(' ', timeout=1)
timeFastForward(2)
self.assertResponse(' ', 'testRepeat')
self.assertNoResponse(' ', timeout=1) # T+30 to T+31
timeFastForward(5)
self.assertNoResponse(' ', timeout=1) # T+36 to T+37
timeFastForward(5)
self.assertResponse(' ', 'testRepeat', timeout=1) # T+42

timeFastForward(3)
timeFastForward(15)
self.assertNoResponse(' ', timeout=1) # T+57 to T+58
timeFastForward(5)
self.assertResponse(' ', 'testRepeat', timeout=1) # T+64

self.assertNotError('unload Scheduler')
schedule.schedule.reset()
timeFastForward(20)
self.assertNoResponse(' ', timeout=1)
timeFastForward(2)
self.assertResponse(' ', 'testRepeat')

self.assertNotError('load Scheduler')
self.assertNoResponse(' ', timeout=1) # T+85 to T+86
timeFastForward(10)
self.assertNoResponse(' ', timeout=1) # T+95 to T+96
timeFastForward(10)
self.assertResponse(' ', 'testRepeat', timeout=1) # T+106



Expand Down
24 changes: 16 additions & 8 deletions src/schedule.py
Expand Up @@ -108,19 +108,27 @@ def rescheduleEvent(self, name, t):
f = self.removeEvent(name)
self.addEvent(f, t, name=name)

def addPeriodicEvent(self, f, t, name=None, now=True, args=[], kwargs={},
count=None):
"""Adds a periodic event that is called every t seconds."""
def wrapper(count):
def makePeriodicWrapper(
self, f, t, name=None, args=[], kwargs={}, count=None):
"""Returns a function that will run and re-schedule itself every t
seconds."""
def wrapper():
nonlocal count
try:
f(*args, **kwargs)
finally:
# Even if it raises an exception, let's schedule it.
if count[0] is not None:
count[0] -= 1
if count[0] is None or count[0] > 0:
if count is not None:
count -= 1
if count is None or count > 0:
return self.addEvent(wrapper, time.time() + t, name)
wrapper = functools.partial(wrapper, [count])
return wrapper

def addPeriodicEvent(
self, f, t, name=None, now=True, args=[], kwargs={}, count=None):
"""Adds a periodic event that is called every t seconds."""
wrapper = self.makePeriodicWrapper(
f, t, name, args, kwargs, count)
if now:
return wrapper()
else:
Expand Down

0 comments on commit ad05468

Please sign in to comment.