Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updater improvements #1018

Merged
merged 8 commits into from
Mar 2, 2018
174 changes: 108 additions & 66 deletions telegram/ext/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ def _thread_wrapper(self, target, *args, **kwargs):
target(*args, **kwargs)
except Exception:
self.__exception_event.set()
self.logger.exception('unhandled exception')
self.logger.exception('unhandled exception in %s', thr_name)
raise
self.logger.debug('{0} - ended'.format(thr_name))

def start_polling(self,
poll_interval=0.0,
timeout=10,
clean=False,
bootstrap_retries=0,
bootstrap_retries=-1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring doesn;t reflect this change to the default value

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will fix

read_latency=2.,
allowed_updates=None):
"""Starts polling updates from Telegram.
Expand All @@ -171,8 +171,8 @@ def start_polling(self,
bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the
`Updater` will retry on failures on the Telegram server.

* < 0 - retry indefinitely
* 0 - no retries (default)
* < 0 - retry indefinitely (default)
* 0 - no retries
* > 0 - retry up to X times

allowed_updates (List[:obj:`str`], optional): Passed to
Expand Down Expand Up @@ -229,8 +229,8 @@ def start_webhook(self,
bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the
`Updater` will retry on failures on the Telegram server.

* < 0 - retry indefinitely
* 0 - no retries (default)
* < 0 - retry indefinitely (default)
* 0 - no retries
* > 0 - retry up to X times

webhook_url (:obj:`str`, optional): Explicitly specify the webhook url. Useful behind
Expand All @@ -242,7 +242,6 @@ def start_webhook(self,
:obj:`Queue`: The update queue that can be filled from the main thread.

"""

with self.__lock:
if not self.running:
self.running = True
Expand All @@ -262,46 +261,72 @@ def _start_polling(self, poll_interval, timeout, read_latency, bootstrap_retries
# updates from Telegram and inserts them in the update queue of the
# Dispatcher.

cur_interval = poll_interval
self.logger.debug('Updater thread started')
self.logger.debug('Updater thread started (polling)')

self._bootstrap(bootstrap_retries, clean=clean, webhook_url='', allowed_updates=None)

self.logger.debug('Bootstrap done')

def polling_action_cb():
updates = self.bot.get_updates(
self.last_update_id, timeout=timeout, read_latency=read_latency,
allowed_updates=allowed_updates)

if updates:
if not self.running:
self.logger.debug('Updates ignored and will be pulled again on restart')
else:
for update in updates:
self.update_queue.put(update)
self.last_update_id = updates[-1].update_id + 1

return True

def polling_onerr_cb(exc):
# Put the error into the update queue and let the Dispatcher
# broadcast it
self.update_queue.put(exc)

self._network_loop_retry(polling_action_cb, polling_onerr_cb, 'getting Updates',
poll_interval)

def _network_loop_retry(self, action_cb, onerr_cb, description, interval):
"""Perform a loop calling `action_cb`, retrying after network errors.

Stop condition for loop: `self.running` evaluates False or return value of `action_cb`
evaluates False.

Args:
action_cb (:obj:`callable`): Network oriented callback function to call.
onerr_cb (:obj:`callable`): Callback to call when TelegramError is caught. Receives the
exception object as a parameter.
description (:obj:`str`): Description text to use for logs and exception raised.
interval (:obj:`float` | :obj:`int`): Interval to sleep between each call to
`action_cb`.

"""
self.logger.debug('Start network loop retry %s', description)
cur_interval = interval
while self.running:
try:
updates = self.bot.get_updates(
self.last_update_id,
timeout=timeout,
read_latency=read_latency,
allowed_updates=allowed_updates)
if not action_cb():
break
except RetryAfter as e:
self.logger.info(str(e))
self.logger.info('%s', e)
cur_interval = 0.5 + e.retry_after
except TimedOut as toe:
self.logger.debug('Timed out getting Updates: %s', toe)
# If get_updates() failed due to timeout, we should retry asap.
self.logger.debug('Timed out %s: %s', description, toe)
# If failure is due to timeout, we should retry asap.
cur_interval = 0
except InvalidToken as pex:
self.logger.error('Invalid token; aborting')
raise pex
except TelegramError as te:
self.logger.error('Error while getting Updates: %s', te)

# Put the error into the update queue and let the Dispatcher
# broadcast it
self.update_queue.put(te)

self.logger.error('Error while %s: %s', description, te)
onerr_cb(te)
cur_interval = self._increase_poll_interval(cur_interval)
else:
if not self.running:
if len(updates) > 0:
self.logger.debug('Updates ignored and will be pulled '
'again on restart.')
break

if updates:
for update in updates:
self.update_queue.put(update)
self.last_update_id = updates[-1].update_id + 1

cur_interval = poll_interval
cur_interval = interval

if cur_interval:
sleep(cur_interval)
Expand All @@ -319,7 +344,7 @@ def _increase_poll_interval(current_interval):

def _start_webhook(self, listen, port, url_path, cert, key, bootstrap_retries, clean,
webhook_url, allowed_updates):
self.logger.debug('Updater thread started')
self.logger.debug('Updater thread started (webhook)')
use_ssl = cert is not None and key is not None
if not url_path.startswith('/'):
url_path = '/{0}'.format(url_path)
Expand Down Expand Up @@ -370,39 +395,56 @@ def _check_ssl_cert(self, cert, key):
def _gen_webhook_url(listen, port, url_path):
return 'https://{listen}:{port}{path}'.format(listen=listen, port=port, path=url_path)

def _bootstrap(self, max_retries, clean, webhook_url, allowed_updates, cert=None):
retries = 0
while 1:

try:
if clean:
# Disable webhook for cleaning
self.bot.delete_webhook()
self._clean_updates()
sleep(1)

self.bot.set_webhook(
url=webhook_url, certificate=cert, allowed_updates=allowed_updates)
except (Unauthorized, InvalidToken):
raise
except TelegramError:
msg = 'error in bootstrap phase; try={0} max_retries={1}'.format(retries,
max_retries)
if max_retries < 0 or retries < max_retries:
self.logger.warning(msg)
retries += 1
else:
self.logger.exception(msg)
raise
def _bootstrap(self, max_retries, clean, webhook_url, allowed_updates, cert=None,
bootstrap_interval=5):
retries = [0]

def bootstrap_del_webhook():
self.bot.delete_webhook()
return False

def bootstrap_clean_updates():
self.logger.debug('Cleaning updates from Telegram server')
updates = self.bot.get_updates()
while updates:
updates = self.bot.get_updates(updates[-1].update_id + 1)
return False

def bootstrap_set_webhook():
self.bot.set_webhook(
url=webhook_url, certificate=cert, allowed_updates=allowed_updates)
return False

def bootstrap_onerr_cb(exc):
if not isinstance(exc, Unauthorized) and (max_retries < 0 or retries[0] < max_retries):
retries[0] += 1
self.logger.warning('Failed bootstrap phase; try=%s max_retries=%s',
retries[0], max_retries)
else:
break
self.logger.error('Failed bootstrap phase after %s retries (%s)', retries[0], exc)
raise exc

# Cleaning pending messages is done by polling for them - so we need to delete webhook if
# one is configured.
# We also take this chance to delete pre-configured webhook if this is a polling Updater.
# NOTE: We don't know ahead if a webhook is configured, so we just delete.
if clean or not webhook_url:
self._network_loop_retry(bootstrap_del_webhook, bootstrap_onerr_cb,
'bootstrap del webhook', bootstrap_interval)
retries[0] = 0

# Clean pending messages, if requested.
if clean:
self._network_loop_retry(bootstrap_clean_updates, bootstrap_onerr_cb,
'bootstrap clean updates', bootstrap_interval)
retries[0] = 0
sleep(1)

def _clean_updates(self):
self.logger.debug('Cleaning updates from Telegram server')
updates = self.bot.get_updates()
while updates:
updates = self.bot.get_updates(updates[-1].update_id + 1)
# Restore/set webhook settings, if needed. Again, we don't know ahead if a webhook is set,
# so we set it anyhow.
if webhook_url:
self._network_loop_retry(bootstrap_set_webhook, bootstrap_onerr_cb,
'bootstrap set webhook', bootstrap_interval)

def stop(self):
"""Stops the polling/webhook thread, the dispatcher and the job queue."""
Expand Down
90 changes: 79 additions & 11 deletions tests/test_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from functools import partial
from queue import Queue
from random import randrange
from threading import Thread
from threading import Thread, Event
from time import sleep

try:
Expand All @@ -38,7 +38,7 @@
from future.builtins import bytes

from telegram import TelegramError, Message, User, Chat, Update, Bot
from telegram.error import Unauthorized, InvalidToken
from telegram.error import Unauthorized, InvalidToken, TimedOut, RetryAfter
from telegram.ext import Updater

signalskip = pytest.mark.skipif(sys.platform == 'win32',
Expand All @@ -58,31 +58,94 @@ class TestUpdater(object):
message_count = 0
received = None
attempts = 0
err_handler_called = Event()
cb_handler_called = Event()

@pytest.fixture(autouse=True)
def reset(self):
self.message_count = 0
self.received = None
self.attempts = 0
self.err_handler_called.clear()
self.cb_handler_called.clear()

def error_handler(self, bot, update, error):
self.received = error.message
self.err_handler_called.set()

def callback(self, bot, update):
self.received = update.message.text
self.cb_handler_called.set()

# TODO: test clean= argument
# TODO: test clean= argument of Updater._bootstrap

def test_error_on_get_updates(self, monkeypatch, updater):
@pytest.mark.parametrize(('error',),
argvalues=[(TelegramError('Test Error 2'),),
(Unauthorized('Test Unauthorized'),)],
ids=('TelegramError', 'Unauthorized'))
def test_get_updates_normal_err(self, monkeypatch, updater, error):
def test(*args, **kwargs):
raise TelegramError('Test Error 2')
raise error

monkeypatch.setattr('telegram.Bot.get_updates', test)
monkeypatch.setattr('telegram.Bot.set_webhook', lambda *args, **kwargs: True)
updater.dispatcher.add_error_handler(self.error_handler)
updater.start_polling(0.01)

# Make sure that the error handler was called
self.err_handler_called.wait()
assert self.received == error.message

# Make sure that Updater polling thread keeps running
self.err_handler_called.clear()
self.err_handler_called.wait()

def test_get_updates_bailout_err(self, monkeypatch, updater, caplog):
error = InvalidToken()

def test(*args, **kwargs):
raise error

with caplog.at_level(logging.DEBUG):
monkeypatch.setattr('telegram.Bot.get_updates', test)
monkeypatch.setattr('telegram.Bot.set_webhook', lambda *args, **kwargs: True)
updater.dispatcher.add_error_handler(self.error_handler)
updater.start_polling(0.01)
assert self.err_handler_called.wait(0.5) is not True

# NOTE: This test might hit a race condition and fail (though the 0.5 seconds delay above
# should work around it).
# NOTE: Checking Updater.running is problematic because it is not set to False when there's
# an unhandled exception.
# TODO: We should have a way to poll Updater status and decide if it's running or not.
assert any('unhandled exception in updater' in rec.getMessage() for rec in
caplog.get_records('call'))

@pytest.mark.parametrize(('error',),
argvalues=[(RetryAfter(0.01),),
(TimedOut(),)],
ids=('RetryAfter', 'TimedOut'))
def test_get_updates_retries(self, monkeypatch, updater, error):
event = Event()

def test(*args, **kwargs):
event.set()
raise error

monkeypatch.setattr('telegram.Bot.get_updates', test)
monkeypatch.setattr('telegram.Bot.set_webhook', lambda *args, **kwargs: True)
updater.dispatcher.add_error_handler(self.error_handler)
updater.start_polling(0.01)
sleep(.1)
assert self.received == 'Test Error 2'

# Make sure that get_updates was called, but not the error handler
event.wait()
assert self.err_handler_called.wait(0.5) is not True
assert self.received != error.message

# Make sure that Updater polling thread keeps running
event.clear()
event.wait()
assert self.err_handler_called.wait(0.5) is not True

def test_webhook(self, monkeypatch, updater):
q = Queue()
Expand Down Expand Up @@ -145,17 +208,21 @@ def test_webhook_no_ssl(self, monkeypatch, updater):
sleep(.2)
assert q.get(False) == update

def test_bootstrap_retries_success(self, monkeypatch, updater):
@pytest.mark.parametrize(('error',),
argvalues=[(TelegramError(''),)],
ids=('TelegramError',))
def test_bootstrap_retries_success(self, monkeypatch, updater, error):
retries = 2

def attempt(_, *args, **kwargs):
if self.attempts < retries:
self.attempts += 1
raise TelegramError('')
raise error

monkeypatch.setattr('telegram.Bot.set_webhook', attempt)

updater._bootstrap(retries, False, 'path', None)
updater.running = True
updater._bootstrap(retries, False, 'path', None, bootstrap_interval=0)
assert self.attempts == retries

@pytest.mark.parametrize(('error', 'attempts'),
Expand All @@ -172,8 +239,9 @@ def attempt(_, *args, **kwargs):

monkeypatch.setattr('telegram.Bot.set_webhook', attempt)

updater.running = True
with pytest.raises(type(error)):
updater._bootstrap(retries, False, 'path', None)
updater._bootstrap(retries, False, 'path', None, bootstrap_interval=0)
assert self.attempts == attempts

def test_webhook_invalid_posts(self, updater):
Expand Down