Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telegram webhook receiver #85

Merged
merged 10 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/wagtail_live/adapters/telegram/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from .receiver import TelegramWebhookMixin, TelegramWebhookReceiver
from .utils import (
get_base_telegram_url,
get_telegram_bot_token,
get_telegram_webhook_url,
)

__all__ = [
"TelegramWebhookMixin",
"TelegramWebhookReceiver",
"get_telegram_bot_token",
"get_telegram_webhook_url",
"get_base_telegram_url",
]
269 changes: 269 additions & 0 deletions src/wagtail_live/adapters/telegram/receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
import logging

import requests
from django.core.files.base import ContentFile

from wagtail_live.exceptions import RequestVerificationError, WebhookSetupError
from wagtail_live.receivers import BaseMessageReceiver, WebhookReceiverMixin
from wagtail_live.utils import is_embed

from .utils import (
get_base_telegram_url,
get_telegram_bot_token,
get_telegram_webhook_url,
)

logger = logging.getLogger(__name__)


class TelegramWebhookMixin(WebhookReceiverMixin):
"""Telegram WebhookMixin."""

url_path = "telegram/events/<str:token>/"
url_name = "telegram_events_handler"

def verify_request(self, request, body, token, *args, **kwargs):
"""See base class."""

if token != get_telegram_bot_token():
raise RequestVerificationError

@classmethod
def webhook_connection_set(cls):
"""See base class."""

response = requests.get(get_base_telegram_url() + "getWebhookInfo")
if response.ok:
# Ensure that the webhook is set with the correct URL
payload = response.json()
return (
payload["ok"] and payload["result"]["url"] == get_telegram_webhook_url()
)
return False

@classmethod
def set_webhook(cls):
"""Sets webhook connection with Telegram's API"""

response = requests.get(
get_base_telegram_url() + "setWebhook",
params={
"url": get_telegram_webhook_url(),
"allowed_updates": [
"message",
"edited_message",
"channel_post",
"edited_channel_post",
],
},
)
payload = response.json()

if not response.ok or not payload["ok"]:
raise WebhookSetupError(
"Failed to set Webhook connection with Telegram's API. "
+ f"{payload['description']}"
)


class TelegramWebhookReceiver(TelegramWebhookMixin, BaseMessageReceiver):
"""Telegram webhook receiver."""

def dispatch_event(self, event):
"""Telegram doesn't send an update when a message is deleted.
See base class.
"""

for edit_type in ["edited_message", "edited_channel_post"]:
if edit_type in event:
self.change_message(message=event.get(edit_type))
return

message = event.get("message") or event.get("channel_post")
if "media_group_id" in message:
try:
self.add_image_to_message(message=message)
return
except KeyError:
pass

if "entities" in message and message["entities"][0]["type"] == "bot_command":
self.handle_bot_command(message=message)
return

self.add_message(message=message)

def add_image_to_message(self, message):
"""Telegram sends images uploaded in a message one by one."""

channel_id = self.get_channel_id_from_message(message=message)
try:
live_page = self.get_live_page_from_channel_id(channel_id=channel_id)
except self.model.DoesNotExist:
return

message_id = self.get_message_id_from_message(message=message)
live_post = live_page.get_live_post_by_message_id(message_id=message_id)

files = self.get_message_files(message=message)
self.process_files(live_post=live_post.value, files=files)

live_page.update_live_post(live_post=live_post)

def handle_bot_command(self, message):
"""Handles the following bot commands:
/get_chat_id: returns the id of the current chat.
"""

command = message["entities"][0]
start = command["offset"]
end = start + command["length"]
command_text = message["text"][start:end]

if command_text == "/get_chat_id":
chat_id = self.get_channel_id_from_message(message=message)
response = requests.get(
get_base_telegram_url() + "sendMessage",
params={
"chat_id": chat_id,
"text": chat_id,
},
)

payload = response.json()
if not payload["ok"]:
logger.error(payload["description"])

def get_channel_id_from_message(self, message):
"""See base class."""

# Since live posts aren't cleaned when they are added via a receiver,
# we make sure at this level that we return the correct types.
return str(message["chat"]["id"])

def get_message_id_from_message(self, message):
"""Messages containing multiple images have a media_group_id attribute.
See base class.
"""

msg_id = message.get("media_group_id") or message.get("message_id")
return str(msg_id)

def get_message_text(self, message):
"""See base class."""

# Telegram parses the text of a message before sending it.
# The result can be found in the message's "entities".
return {
"text": message.get("text") or message.get("caption") or "",
"entities": message.get("entities", []),
}

def process_text(self, live_post, message_text):
"""Use the message entities to convert links.
A raw link isn't converted by Telegram.
A link with a description is sent as a `text_link` entity.
See base class.
"""

text = message_text["text"]
len_text = len(text)
entities = message_text["entities"]

# Process the entities in reversed order to be able to edit the text in place.
for entity in reversed(entities):
url = ""
start = entity["offset"]
end = start + entity["length"]

if entity["type"] == "url":
url = description = text[start:end]

if is_embed(url):
# Check if this can match an embed block, if so no conversion happens.
Tijani-Dia marked this conversation as resolved.
Show resolved Hide resolved
# It matches an embed block if it has a line in the text for itself.
if end == len_text or text[end] == "\n":
if start == 0 or text[start - 1] == "\n":
# This is an embed block, skip to the next entity
continue

if entity["type"] == "text_link":
url = entity["url"]
description = text[start:end]

if url:
link = f'<a href="{url}">{description}</a>'
text = text[:start] + link + text[end:]

return super().process_text(live_post=live_post, message_text=text)

def get_file_path(self, file_id):
"""Retrieves the file_path of a Telegram file.
The file_path is necessary to have more infos about the image and download it.

Args:
file_id (str): Id of the file to download.

Returns:
(str) The file_path property of the file as sent by Telegram.
"""

response = requests.get(
get_base_telegram_url() + "getFile", params={"file_id": file_id}
)
return response.json()["result"]["file_path"]

def get_message_files(self, message):
"""See base class."""

if "photo" in message:
# Choose original photo which is the last of the list
photo = message["photo"][-1]
photo["file_path"] = self.get_file_path(file_id=photo["file_id"])
return [photo]
return []

def get_image_title(self, image):
"""See base class."""

return image["file_path"].split("/")[-1].split(".")[0]

def get_image_name(self, image):
"""See base class."""

return image["file_path"].split("/")[-1]

def get_image_mimetype(self, image):
"""See base class."""

mimetype = image["file_path"].split("/")[-1].split(".")[-1]
return "jpeg" if mimetype == "jpg" else mimetype

def get_image_content(self, image):
"""See base class."""

file_path = image["file_path"]
response = requests.get(
f"https://api.telegram.org/file/bot{get_telegram_bot_token()}/{file_path}"
)
return ContentFile(response.content)

def get_image_dimensions(self, image):
"""See base class."""

return image["width"], image["height"]

def get_message_id_from_edited_message(self, message):
"""See base class."""

return self.get_message_id_from_message(message=message)

def get_message_text_from_edited_message(self, message):
"""See base class."""

return self.get_message_text(message=message)

def get_message_files_from_edited_message(self, message):
"""See base class."""

return self.get_message_files(message=message)
55 changes: 55 additions & 0 deletions src/wagtail_live/adapters/telegram/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from functools import lru_cache

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured


@lru_cache(maxsize=1)
def get_telegram_bot_token():
"""Retrieves user's telegram bot token.

Returns:
(str) telegram bot token.

Raises:
(ImproperlyConfigured) if the telegram bot token isn't specified in settings.
"""

bot_token = getattr(settings, "TELEGRAM_BOT_TOKEN", "")
if not bot_token:
raise ImproperlyConfigured(
"Specify TELEGRAM_BOT_TOKEN if you intend to use Telegram as input source."
)
return bot_token


@lru_cache(maxsize=1)
def get_telegram_webhook_url():
"""Retrieves the webhook url which Telegram sends new updates to.

Returns:
(str) a URL.

Raises:
(ImproperlyConfigured) if the telegram webhook url isn't specified in settings.
"""

base_webhook_url = getattr(settings, "TELEGRAM_WEBHOOK_URL", "")
if not base_webhook_url:
raise ImproperlyConfigured(
"Specify TELEGRAM_WEBHOOK_URL if you intend to use Telegram as input source."
)

if base_webhook_url.endswith("/"):
base_webhook_url = base_webhook_url[:-1]

return (
f"{base_webhook_url}/wagtail_live/telegram/events/{get_telegram_bot_token()}/"
)


@lru_cache(maxsize=1)
def get_base_telegram_url():
"""Returns the base URL to use when calling Telegram's API."""

return f"https://api.telegram.org/bot{get_telegram_bot_token()}/"
7 changes: 2 additions & 5 deletions src/wagtail_live/receivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
construct_live_post_block,
construct_text_block,
)
from .exceptions import RequestVerificationError, WebhookSetupError
from .exceptions import RequestVerificationError
from .utils import SUPPORTED_MIME_TYPES, get_live_page_model, is_embed

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -493,10 +493,7 @@ def get_urls(cls):
"""

if not cls.webhook_connection_set():
try:
cls.set_webhook()
except WebhookSetupError:
raise
cls.set_webhook()

return [
path(cls.url_path, cls.as_view(), name=cls.url_name),
Expand Down
2 changes: 2 additions & 0 deletions src/wagtail_live/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Wagtail Live utils"""

import re
from functools import lru_cache
from importlib import import_module

from django.conf import settings
Expand Down Expand Up @@ -121,6 +122,7 @@ def get_polling_interval():
return getattr(settings, "WAGTAIL_LIVE_POLLING_INTERVAL", 3000)


@lru_cache(maxsize=None)
def is_embed(text):
"""Checks if a text is a link to embed.

Expand Down