/
bot.py
139 lines (125 loc) · 6.36 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# -*- coding: utf-8 -*-
from django.db import models
from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext_lazy as _
from telegram import Bot as BotAPI
from django.db.models.signals import post_save, pre_save
from django.dispatch import receiver
from django.core.urlresolvers import reverse
import logging
from microbot.models import User, ChatState
from django.core.urlresolvers import RegexURLResolver
from django.core.urlresolvers import Resolver404
from telegram import ParseMode, ReplyKeyboardHide, ReplyKeyboardMarkup
from telegram.bot import InvalidToken
import ast
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db.models import Q
logger = logging.getLogger(__name__)
def validate_token(value):
left, sep, _right = value.partition(':')
if (not sep) or (not left.isdigit()) or (len(left) < 3):
raise ValidationError(_("%(value)s is not a valid token"), params={'value': value})
@python_2_unicode_compatible
class Bot(models.Model):
owner = models.ForeignKey(settings.AUTH_USER_MODEL, related_name='bots')
token = models.CharField(_('Token'), max_length=100, db_index=True, validators=[validate_token])
user_api = models.OneToOneField(User, verbose_name=_("Bot User"), related_name='bot',
on_delete=models.CASCADE, blank=True, null=True)
enabled = models.BooleanField(_('Enable'), default=True)
created = models.DateTimeField(('Date Created'), auto_now_add=True)
modified = models.DateTimeField(_('Date Modified'), auto_now=True)
class Meta:
verbose_name = _('Bot')
verbose_name_plural = _('Bots')
def __init__(self, *args, **kwargs):
super(Bot, self).__init__(*args, **kwargs)
self._bot = None
if self.token:
try:
self._bot = BotAPI(self.token)
except InvalidToken:
logger.warning("Incorrect token %s" % self.token)
def __str__(self):
return "%s" % (self.user_api.first_name or self.token if self.user_api else self.token)
def handle(self, update):
urlpatterns = []
try:
state = ChatState.objects.get(chat=update.message.chat).state
for handler in self.handlers.filter(Q(enabled=True), Q(source_states=state) | Q(source_states=None)):
urlpatterns.append(handler.urlpattern())
except ChatState.DoesNotExist:
for handler in self.handlers.filter(enabled=True):
urlpatterns.append(handler.urlpattern())
resolver = RegexURLResolver(r'^', urlpatterns)
try:
resolver_match = resolver.resolve(update.message.text)
except Resolver404:
logger.warning("Handler not found for %s" % update)
else:
callback, callback_args, callback_kwargs = resolver_match
logger.debug("Calling callback:%s for update %s with %s" %
(callback, update, callback_kwargs))
text, keyboard, target_state = callback(self, update=update, **callback_kwargs)
if keyboard:
keyboard = ast.literal_eval(keyboard)
keyboard = ReplyKeyboardMarkup(keyboard, resize_keyboard=True)
else:
keyboard = ReplyKeyboardHide()
self.send_message(chat_id=update.message.chat.id,
text=text.encode('utf-8'), reply_markup=keyboard, parse_mode=ParseMode.HTML)
if target_state:
try:
chat_state = ChatState.objects.get(chat=update.message.chat)
except ChatState.DoesNotExist:
logger.error("Chat state update error:%s for update %s with %s" %
(target_state, update, callback_kwargs))
else:
chat_state.state = target_state
chat_state.save()
logger.debug("Chat state updated:%s for update %s with %s" %
(target_state, update, callback_kwargs))
else:
logger.warning("No target state after calling:%s for update %s with %s" %
(callback, update, callback_kwargs))
def handle_hook(self, hook, data):
logger.debug("Calling hook %s process: with %s" % (hook.key, data))
text, keyboard = hook.process(self, data)
if keyboard:
keyboard = ast.literal_eval(keyboard)
keyboard = ReplyKeyboardMarkup(keyboard, resize_keyboard=True)
else:
keyboard = ReplyKeyboardHide()
for recipient in hook.recipients.all():
self.send_message(chat_id=recipient.chat_id,
text=text.encode('utf-8'), reply_markup=keyboard, parse_mode=ParseMode.HTML)
def send_message(self, chat_id, text, parse_mode=None, disable_web_page_preview=None, **kwargs):
self._bot.sendMessage(chat_id=chat_id, text=text, parse_mode=parse_mode,
disable_web_page_preview=disable_web_page_preview, **kwargs)
logger.debug("Message sent:(chat:%s,text:%s,parse_mode:%s,disable_preview:%s,kwargs:%s" %
(chat_id, text, parse_mode, disable_web_page_preview, kwargs))
@receiver(pre_save, sender=Bot)
def validate_bot(sender, instance, **kwargs):
validate_token(instance.token)
@receiver(post_save, sender=Bot)
def set_api(sender, instance, **kwargs):
# set bot api if not yet
if not instance._bot:
instance._bot = BotAPI(instance.token)
# set webhook
url = None
if instance.enabled:
webhook = reverse('microbot:telegrambot', kwargs={'token': instance.token})
from django.contrib.sites.models import Site
current_site = Site.objects.get_current()
url = 'https://' + current_site.domain + webhook
instance._bot.setWebhook(webhook_url=url)
logger.info("Success: Webhook url %s for bot %s set" % (url, str(instance)))
# complete Bot instance with api data
if not instance.user_api:
bot_api = instance._bot.getMe()
user_api, _ = User.objects.get_or_create(**bot_api.to_dict())
instance.user_api = user_api
instance.save()
logger.info("Success: Bot api info for bot %s set" % str(instance))