Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: via_bot filter #2009

Merged
merged 4 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 166 additions & 3 deletions telegram/ext/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,8 @@ def filter(self, message):
"""Messages sent in a group chat."""

class user(BaseFilter):
"""Filters messages to allow only those which are from specified user ID.
"""Filters messages to allow only those which are from specified user ID(s) or
username(s).

Examples:
``MessageHandler(Filters.user(1234), callback_method)``
Expand Down Expand Up @@ -919,7 +920,6 @@ class user(BaseFilter):
RuntimeError: If user_id and username are both present.

"""

def __init__(self, user_id=None, username=None, allow_empty=False):
self.allow_empty = allow_empty
self.__lock = Lock()
Expand Down Expand Up @@ -1053,8 +1053,171 @@ def filter(self, message):
return self.allow_empty
return False

class via_bot(BaseFilter):
"""Filters messages to allow only those which are from specified via_bot ID(s) or
username(s).

Examples:
``MessageHandler(Filters.via_bot(1234), callback_method)``

Warning:
Bibo-Joshi marked this conversation as resolved.
Show resolved Hide resolved
:attr:`bot_ids` will give a *copy* of the saved bot ids as :class:`frozenset`. This
is to ensure thread safety. To add/remove a bot, you should use :meth:`add_usernames`,
:meth:`add_bot_ids`, :meth:`remove_usernames` and :meth:`remove_bot_ids`. Only update
the entire set by ``filter.bot_ids/usernames = new_set``, if you are entirely sure
that it is not causing race conditions, as this will complete replace the current set
of allowed bots.

Attributes:
bot_ids(set(:obj:`int`), optional): Which bot ID(s) to allow through.
usernames(set(:obj:`str`), optional): Which username(s) (without leading '@') to allow
through.
allow_empty(:obj:`bool`, optional): Whether updates should be processed, if no bot
is specified in :attr:`bot_ids` and :attr:`usernames`.

Args:
bot_id(:obj:`int` | List[:obj:`int`], optional): Which bot ID(s) to allow
through.
username(:obj:`str` | List[:obj:`str`], optional): Which username(s) to allow
through. Leading '@'s in usernames will be discarded.
allow_empty(:obj:`bool`, optional): Whether updates should be processed, if no user
is specified in :attr:`bot_ids` and :attr:`usernames`. Defaults to :obj:`False`

Raises:
RuntimeError: If bot_id and username are both present.
"""

def __init__(self, bot_id=None, username=None, allow_empty=False):
self.allow_empty = allow_empty
self.__lock = Lock()

self._bot_ids = set()
self._usernames = set()

self._set_bot_ids(bot_id)
self._set_usernames(username)

@staticmethod
def _parse_bot_id(bot_id):
if bot_id is None:
return set()
if isinstance(bot_id, int):
return {bot_id}
return set(bot_id)

@staticmethod
def _parse_username(username):
if username is None:
return set()
if isinstance(username, str):
return {username[1:] if username.startswith('@') else username}
return {bot[1:] if bot.startswith('@') else bot for bot in username}

def _set_bot_ids(self, bot_id):
with self.__lock:
if bot_id and self._usernames:
raise RuntimeError("Can't set bot_id in conjunction with (already set) "
"usernames.")
self._bot_ids = self._parse_bot_id(bot_id)

def _set_usernames(self, username):
with self.__lock:
if username and self._bot_ids:
raise RuntimeError("Can't set username in conjunction with (already set) "
"bot_ids.")
self._usernames = self._parse_username(username)

@property
def bot_ids(self):
with self.__lock:
return frozenset(self._bot_ids)

@bot_ids.setter
def bot_ids(self, bot_id):
self._set_bot_ids(bot_id)

@property
def usernames(self):
with self.__lock:
return frozenset(self._usernames)

@usernames.setter
def usernames(self, username):
self._set_usernames(username)

def add_usernames(self, username):
"""
Add one or more users to the allowed usernames.
Args:
username(:obj:`str` | List[:obj:`str`], optional): Which username(s) to allow
through. Leading '@'s in usernames will be discarded.
"""
with self.__lock:
if self._bot_ids:
raise RuntimeError("Can't set username in conjunction with (already set) "
"bot_ids.")

username = self._parse_username(username)
self._usernames |= username

def add_bot_ids(self, bot_id):
"""
Add one or more users to the allowed user ids.
Args:
bot_id(:obj:`int` | List[:obj:`int`], optional): Which bot ID(s) to allow
through.
"""
with self.__lock:
if self._usernames:
raise RuntimeError("Can't set bot_id in conjunction with (already set) "
"usernames.")

bot_id = self._parse_bot_id(bot_id)

self._bot_ids |= bot_id

def remove_usernames(self, username):
"""
Remove one or more users from allowed usernames.
Args:
username(:obj:`str` | List[:obj:`str`], optional): Which username(s) to disallow
through. Leading '@'s in usernames will be discarded.
"""
with self.__lock:
if self._bot_ids:
raise RuntimeError("Can't set username in conjunction with (already set) "
"bot_ids.")

username = self._parse_username(username)
self._usernames -= username

def remove_bot_ids(self, bot_id):
"""
Remove one or more users from allowed user ids.
Args:
bot_id(:obj:`int` | List[:obj:`int`], optional): Which bot ID(s) to disallow
through.
"""
with self.__lock:
if self._usernames:
raise RuntimeError("Can't set bot_id in conjunction with (already set) "
"usernames.")
bot_id = self._parse_bot_id(bot_id)
self._bot_ids -= bot_id

def filter(self, message):
"""""" # remove method from docs
if message.via_bot:
if self.bot_ids:
return message.via_bot.id in self.bot_ids
if self.usernames:
return (message.via_bot.username
and message.via_bot.username in self.usernames)
return self.allow_empty
return False

class chat(BaseFilter):
"""Filters messages to allow only those which are from specified chat ID.
"""Filters messages to allow only those which are from a specified chat ID or username.

Examples:
``MessageHandler(Filters.chat(-1234), callback_method)``
Expand Down
127 changes: 126 additions & 1 deletion tests/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@pytest.fixture(scope='function')
def update():
return Update(0, Message(0, User(0, 'Testuser', False), datetime.datetime.utcnow(),
Chat(0, 'private')))
Chat(0, 'private'), via_bot=User(0, "Testbot", True)))


@pytest.fixture(scope='function',
Expand Down Expand Up @@ -1093,3 +1093,128 @@ def filter(self, _):
update.message.text = 'test'
result = (Filters.command | DataFilter('blah'))(update)
assert result['test'] == ['blah']

def test_filters_via_bot_init(self):
with pytest.raises(RuntimeError, match='in conjunction with'):
Filters.via_bot(bot_id=1, username='bot')

def test_filters_via_bot_allow_empty(self, update):
assert not Filters.via_bot()(update)
assert Filters.via_bot(allow_empty=True)(update)

def test_filters_via_bot_id(self, update):
assert not Filters.via_bot(bot_id=1)(update)
update.message.via_bot.id = 1
assert Filters.via_bot(bot_id=1)(update)
update.message.via_bot.id = 2
assert Filters.via_bot(bot_id=[1, 2])(update)
assert not Filters.via_bot(bot_id=[3, 4])(update)
update.message.via_bot = None
assert not Filters.via_bot(bot_id=[3, 4])(update)

def test_filters_via_bot_username(self, update):
assert not Filters.via_bot(username='bot')(update)
assert not Filters.via_bot(username='Testbot')(update)
update.message.via_bot.username = 'bot@'
assert Filters.via_bot(username='@bot@')(update)
assert Filters.via_bot(username='bot@')(update)
assert Filters.via_bot(username=['bot1', 'bot@', 'bot2'])(update)
assert not Filters.via_bot(username=['@username', '@bot_2'])(update)
update.message.via_bot = None
assert not Filters.user(username=['@username', '@bot_2'])(update)

def test_filters_via_bot_change_id(self, update):
f = Filters.via_bot(bot_id=3)
update.message.via_bot.id = 3
assert f(update)
update.message.via_bot.id = 2
assert not f(update)
f.bot_ids = 2
assert f(update)

with pytest.raises(RuntimeError, match='username in conjunction'):
f.usernames = 'user'

def test_filters_via_bot_change_username(self, update):
f = Filters.via_bot(username='bot')
update.message.via_bot.username = 'bot'
assert f(update)
update.message.via_bot.username = 'Bot'
assert not f(update)
f.usernames = 'Bot'
assert f(update)

with pytest.raises(RuntimeError, match='bot_id in conjunction'):
f.bot_ids = 1

def test_filters_via_bot_add_user_by_name(self, update):
users = ['bot_a', 'bot_b', 'bot_c']
f = Filters.via_bot()

for user in users:
update.message.via_bot.username = user
assert not f(update)

f.add_usernames('bot_a')
f.add_usernames(['bot_b', 'bot_c'])

for user in users:
update.message.via_bot.username = user
assert f(update)

with pytest.raises(RuntimeError, match='bot_id in conjunction'):
f.add_bot_ids(1)

def test_filters_via_bot_add_user_by_id(self, update):
users = [1, 2, 3]
f = Filters.via_bot()

for user in users:
update.message.via_bot.id = user
assert not f(update)

f.add_bot_ids(1)
f.add_bot_ids([2, 3])

for user in users:
update.message.via_bot.username = user
assert f(update)

with pytest.raises(RuntimeError, match='username in conjunction'):
f.add_usernames('bot')

def test_filters_via_bot_remove_user_by_name(self, update):
users = ['bot_a', 'bot_b', 'bot_c']
f = Filters.via_bot(username=users)

with pytest.raises(RuntimeError, match='bot_id in conjunction'):
f.remove_bot_ids(1)

for user in users:
update.message.via_bot.username = user
assert f(update)

f.remove_usernames('bot_a')
f.remove_usernames(['bot_b', 'bot_c'])

for user in users:
update.message.via_bot.username = user
assert not f(update)

def test_filters_via_bot_remove_user_by_id(self, update):
users = [1, 2, 3]
f = Filters.via_bot(bot_id=users)

with pytest.raises(RuntimeError, match='username in conjunction'):
f.remove_usernames('bot')

for user in users:
update.message.via_bot.id = user
assert f(update)

f.remove_bot_ids(1)
f.remove_bot_ids([2, 3])

for user in users:
update.message.via_bot.username = user
assert not f(update)