Skip to content

Commit

Permalink
Add Filters.via_bot (#2009)
Browse files Browse the repository at this point in the history
* feat: via_bot filter

also fixing a small mistake in the empty parameter of the user filter and improve docs slightly

* fix: forgot to set via_bot to None

* fix: redoing subclassing to copy paste solution

* Cosmetic changes

Co-authored-by: Hinrich Mahler <hinrich.mahler@freenet.de>
  • Loading branch information
Poolitzer and Bibo-Joshi committed Jul 14, 2020
1 parent fd0325f commit 0189442
Show file tree
Hide file tree
Showing 2 changed files with 292 additions and 4 deletions.
169 changes: 166 additions & 3 deletions telegram/ext/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,8 @@ def filter(self, message):
"""Messages sent in a group chat."""

class user(BaseFilter):
"""Filters messages to allow only those which are from specified user ID.
"""Filters messages to allow only those which are from specified user ID(s) or
username(s).
Examples:
``MessageHandler(Filters.user(1234), callback_method)``
Expand Down Expand Up @@ -919,7 +920,6 @@ class user(BaseFilter):
RuntimeError: If user_id and username are both present.
"""

def __init__(self, user_id=None, username=None, allow_empty=False):
self.allow_empty = allow_empty
self.__lock = Lock()
Expand Down Expand Up @@ -1053,8 +1053,171 @@ def filter(self, message):
return self.allow_empty
return False

class via_bot(BaseFilter):
"""Filters messages to allow only those which are from specified via_bot ID(s) or
username(s).
Examples:
``MessageHandler(Filters.via_bot(1234), callback_method)``
Warning:
:attr:`bot_ids` will give a *copy* of the saved bot ids as :class:`frozenset`. This
is to ensure thread safety. To add/remove a bot, you should use :meth:`add_usernames`,
:meth:`add_bot_ids`, :meth:`remove_usernames` and :meth:`remove_bot_ids`. Only update
the entire set by ``filter.bot_ids/usernames = new_set``, if you are entirely sure
that it is not causing race conditions, as this will complete replace the current set
of allowed bots.
Attributes:
bot_ids(set(:obj:`int`), optional): Which bot ID(s) to allow through.
usernames(set(:obj:`str`), optional): Which username(s) (without leading '@') to allow
through.
allow_empty(:obj:`bool`, optional): Whether updates should be processed, if no bot
is specified in :attr:`bot_ids` and :attr:`usernames`.
Args:
bot_id(:obj:`int` | List[:obj:`int`], optional): Which bot ID(s) to allow
through.
username(:obj:`str` | List[:obj:`str`], optional): Which username(s) to allow
through. Leading '@'s in usernames will be discarded.
allow_empty(:obj:`bool`, optional): Whether updates should be processed, if no user
is specified in :attr:`bot_ids` and :attr:`usernames`. Defaults to :obj:`False`
Raises:
RuntimeError: If bot_id and username are both present.
"""

def __init__(self, bot_id=None, username=None, allow_empty=False):
self.allow_empty = allow_empty
self.__lock = Lock()

self._bot_ids = set()
self._usernames = set()

self._set_bot_ids(bot_id)
self._set_usernames(username)

@staticmethod
def _parse_bot_id(bot_id):
if bot_id is None:
return set()
if isinstance(bot_id, int):
return {bot_id}
return set(bot_id)

@staticmethod
def _parse_username(username):
if username is None:
return set()
if isinstance(username, str):
return {username[1:] if username.startswith('@') else username}
return {bot[1:] if bot.startswith('@') else bot for bot in username}

def _set_bot_ids(self, bot_id):
with self.__lock:
if bot_id and self._usernames:
raise RuntimeError("Can't set bot_id in conjunction with (already set) "
"usernames.")
self._bot_ids = self._parse_bot_id(bot_id)

def _set_usernames(self, username):
with self.__lock:
if username and self._bot_ids:
raise RuntimeError("Can't set username in conjunction with (already set) "
"bot_ids.")
self._usernames = self._parse_username(username)

@property
def bot_ids(self):
with self.__lock:
return frozenset(self._bot_ids)

@bot_ids.setter
def bot_ids(self, bot_id):
self._set_bot_ids(bot_id)

@property
def usernames(self):
with self.__lock:
return frozenset(self._usernames)

@usernames.setter
def usernames(self, username):
self._set_usernames(username)

def add_usernames(self, username):
"""
Add one or more users to the allowed usernames.
Args:
username(:obj:`str` | List[:obj:`str`], optional): Which username(s) to allow
through. Leading '@'s in usernames will be discarded.
"""
with self.__lock:
if self._bot_ids:
raise RuntimeError("Can't set username in conjunction with (already set) "
"bot_ids.")

username = self._parse_username(username)
self._usernames |= username

def add_bot_ids(self, bot_id):
"""
Add one or more users to the allowed user ids.
Args:
bot_id(:obj:`int` | List[:obj:`int`], optional): Which bot ID(s) to allow
through.
"""
with self.__lock:
if self._usernames:
raise RuntimeError("Can't set bot_id in conjunction with (already set) "
"usernames.")

bot_id = self._parse_bot_id(bot_id)

self._bot_ids |= bot_id

def remove_usernames(self, username):
"""
Remove one or more users from allowed usernames.
Args:
username(:obj:`str` | List[:obj:`str`], optional): Which username(s) to disallow
through. Leading '@'s in usernames will be discarded.
"""
with self.__lock:
if self._bot_ids:
raise RuntimeError("Can't set username in conjunction with (already set) "
"bot_ids.")

username = self._parse_username(username)
self._usernames -= username

def remove_bot_ids(self, bot_id):
"""
Remove one or more users from allowed user ids.
Args:
bot_id(:obj:`int` | List[:obj:`int`], optional): Which bot ID(s) to disallow
through.
"""
with self.__lock:
if self._usernames:
raise RuntimeError("Can't set bot_id in conjunction with (already set) "
"usernames.")
bot_id = self._parse_bot_id(bot_id)
self._bot_ids -= bot_id

def filter(self, message):
"""""" # remove method from docs
if message.via_bot:
if self.bot_ids:
return message.via_bot.id in self.bot_ids
if self.usernames:
return (message.via_bot.username
and message.via_bot.username in self.usernames)
return self.allow_empty
return False

class chat(BaseFilter):
"""Filters messages to allow only those which are from specified chat ID.
"""Filters messages to allow only those which are from a specified chat ID or username.
Examples:
``MessageHandler(Filters.chat(-1234), callback_method)``
Expand Down
127 changes: 126 additions & 1 deletion tests/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@pytest.fixture(scope='function')
def update():
return Update(0, Message(0, User(0, 'Testuser', False), datetime.datetime.utcnow(),
Chat(0, 'private')))
Chat(0, 'private'), via_bot=User(0, "Testbot", True)))


@pytest.fixture(scope='function',
Expand Down Expand Up @@ -1093,3 +1093,128 @@ def filter(self, _):
update.message.text = 'test'
result = (Filters.command | DataFilter('blah'))(update)
assert result['test'] == ['blah']

def test_filters_via_bot_init(self):
with pytest.raises(RuntimeError, match='in conjunction with'):
Filters.via_bot(bot_id=1, username='bot')

def test_filters_via_bot_allow_empty(self, update):
assert not Filters.via_bot()(update)
assert Filters.via_bot(allow_empty=True)(update)

def test_filters_via_bot_id(self, update):
assert not Filters.via_bot(bot_id=1)(update)
update.message.via_bot.id = 1
assert Filters.via_bot(bot_id=1)(update)
update.message.via_bot.id = 2
assert Filters.via_bot(bot_id=[1, 2])(update)
assert not Filters.via_bot(bot_id=[3, 4])(update)
update.message.via_bot = None
assert not Filters.via_bot(bot_id=[3, 4])(update)

def test_filters_via_bot_username(self, update):
assert not Filters.via_bot(username='bot')(update)
assert not Filters.via_bot(username='Testbot')(update)
update.message.via_bot.username = 'bot@'
assert Filters.via_bot(username='@bot@')(update)
assert Filters.via_bot(username='bot@')(update)
assert Filters.via_bot(username=['bot1', 'bot@', 'bot2'])(update)
assert not Filters.via_bot(username=['@username', '@bot_2'])(update)
update.message.via_bot = None
assert not Filters.user(username=['@username', '@bot_2'])(update)

def test_filters_via_bot_change_id(self, update):
f = Filters.via_bot(bot_id=3)
update.message.via_bot.id = 3
assert f(update)
update.message.via_bot.id = 2
assert not f(update)
f.bot_ids = 2
assert f(update)

with pytest.raises(RuntimeError, match='username in conjunction'):
f.usernames = 'user'

def test_filters_via_bot_change_username(self, update):
f = Filters.via_bot(username='bot')
update.message.via_bot.username = 'bot'
assert f(update)
update.message.via_bot.username = 'Bot'
assert not f(update)
f.usernames = 'Bot'
assert f(update)

with pytest.raises(RuntimeError, match='bot_id in conjunction'):
f.bot_ids = 1

def test_filters_via_bot_add_user_by_name(self, update):
users = ['bot_a', 'bot_b', 'bot_c']
f = Filters.via_bot()

for user in users:
update.message.via_bot.username = user
assert not f(update)

f.add_usernames('bot_a')
f.add_usernames(['bot_b', 'bot_c'])

for user in users:
update.message.via_bot.username = user
assert f(update)

with pytest.raises(RuntimeError, match='bot_id in conjunction'):
f.add_bot_ids(1)

def test_filters_via_bot_add_user_by_id(self, update):
users = [1, 2, 3]
f = Filters.via_bot()

for user in users:
update.message.via_bot.id = user
assert not f(update)

f.add_bot_ids(1)
f.add_bot_ids([2, 3])

for user in users:
update.message.via_bot.username = user
assert f(update)

with pytest.raises(RuntimeError, match='username in conjunction'):
f.add_usernames('bot')

def test_filters_via_bot_remove_user_by_name(self, update):
users = ['bot_a', 'bot_b', 'bot_c']
f = Filters.via_bot(username=users)

with pytest.raises(RuntimeError, match='bot_id in conjunction'):
f.remove_bot_ids(1)

for user in users:
update.message.via_bot.username = user
assert f(update)

f.remove_usernames('bot_a')
f.remove_usernames(['bot_b', 'bot_c'])

for user in users:
update.message.via_bot.username = user
assert not f(update)

def test_filters_via_bot_remove_user_by_id(self, update):
users = [1, 2, 3]
f = Filters.via_bot(bot_id=users)

with pytest.raises(RuntimeError, match='username in conjunction'):
f.remove_usernames('bot')

for user in users:
update.message.via_bot.id = user
assert f(update)

f.remove_bot_ids(1)
f.remove_bot_ids([2, 3])

for user in users:
update.message.via_bot.username = user
assert not f(update)

0 comments on commit 0189442

Please sign in to comment.