In [1]:
import re
import requests
from bs4 import BeautifulSoup

In [2]:
proxies = {
    "http": "http://127.0.0.1:2080",  # Replace with your proxy address
    "https": "http://127.0.0.1:2080"  # For HTTPS requests as well
}


In [3]:
res = requests.get("https://core.telegram.org/bots/api", proxies=proxies)

In [4]:
pattern = r'<h4><a class=\"anchor\" name=\".*?\" href=\".*?\"><i class=\"anchor-icon\"><\/i><\/a>(.*?)<\/h4>\s+<p>(.*?)<\/p>\s+<table class=\"table\">\s+<thead>\s+<tr>\s+<th>Parameter<\/th>\s+<th>Type<\/th>\s+<th>Required<\/th>\s+<th>Description<\/th>\s+<\/tr>\s+<\/thead>\s+<tbody.*?>([\s\S]*?)<\/tbody>'

In [5]:
all_methods = re.findall(pattern, res.text)

In [6]:
table_body_pattern = r'<tr>\s+<td>(.*?)</td>\s+<td>(.*?)</td>\s+<td>(.*?)</td>\s+<td>(.*?)</td>\s+</tr>'

In [7]:
max_char_pattern = r'-(\d+) characters'

In [8]:
def text_cleaner(text):
    return BeautifulSoup(text, 'html.parser').get_text().replace('"', '\\"')

In [38]:
with open('../component/telegram/models.py', 'w') as file:
    file.write("""from typing import List
               
from django.contrib.postgres.fields import ArrayField
from django.db import models
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.db.models import Q
from django.forms.models import model_to_dict



class Keyboard(models.Model):
    timestamp = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        pass


class InlineKeyboardButton(models.Model):
    text = models.CharField(max_length=255, help_text="Text of the button")
    url = models.URLField(null=True, blank=True, help_text="Optional. HTTP or tg:// URL to be opened when the button is pressed. Links tg://user?id=<user_id> can be used to mention a user by their ID without using a username, if this is allowed by their privacy settings.")
    callback_data = models.CharField(max_length=255, null=True, blank=True, help_text="Optional. Data to be sent in a callback query to the bot when the button is pressed, 1-64 bytes")
    

class InlineKeyboardMarkup(Keyboard):
    inline_keyboard = models.ManyToManyField(InlineKeyboardButton, related_name="inline_keyboards", help_text="Array of button rows, each represented by an Array of InlineKeyboardButton objects")

    def __str__(self) -> str:
        return f"InlineKeyboardMarkup ({self.inline_keyboard.count()} rows)"

class KeyboardButton(models.Model):
    text = models.CharField(max_length=255, help_text="Text of the button")
    request_contact = models.BooleanField(default=False, help_text="Optional. If True, the user's phone number will be sent as a contact when the button is pressed. Available in private chats only.")
    request_location = models.BooleanField(default=False, help_text="Optional. If True, the user's current location will be sent when the button is pressed. Available in private chats only.")

class ReplyKeyboardMarkup(Keyboard):
    keyboard = models.ManyToManyField(KeyboardButton, related_name="reply_keyboards", help_text="Array of button rows, each represented by an Array of KeyboardButton objects")
    is_persistent = models.BooleanField(default=False, help_text="Optional. Requests clients to always show the keyboard when the user opens the chat. Defaults to false, in which case the custom keyboard disappears after one use")
    resize_keyboard = models.BooleanField(default=False, help_text="Requests clients to resize the keyboard vertically for optimal fit")
    one_time_keyboard = models.BooleanField(default=False, help_text="Requests clients to hide the keyboard as soon as it's been used")
    input_field_placeholder = models.CharField(max_length=255, help_text="Optional. The placeholder to be shown in the input field when the keyboard is active; 1-64 characters, 0-words")
    selective = models.BooleanField(default=False, help_text="Optional. Use this parameter if you want to show the keyboard to specific users only. Targets: 1) users that are @mentioned in the text of the Message object; 2) if the bot's message is a reply to a message in the same chat and forum topic, sender of the original message.")
    
    def __str__(self) -> str:
        return f"ReplyKeyboardMarkup ({self.keyboard.count()} rows)"


class ReplyKeyboardRemove(Keyboard):
    remove_keyboard = models.BooleanField(default=True, help_text="Requests clients to remove the custom keyboard (user will not be able to summon this keyboard; if you want to hide the keyboard from sight but keep it accessible, use one_time_keyboard in ReplyKeyboardMarkup)")
    selective = models.BooleanField(default=False, help_text="Use this parameter if you want to remove the keyboard for specific users only")
    
    def __str__(self) -> str:
        return "ReplyKeyboardRemove"


class ForceReply(Keyboard):
    force_reply = models.BooleanField(
        default=True,
        help_text="Shows reply interface to the user, as if they manually selected the bot's message and tapped 'Reply'",
    )
    input_field_placeholder = models.CharField(
        max_length=64,
        help_text="Optional. The placeholder to be shown in the input field when the reply is active; 1-64 characters",
    )
    selective = models.BooleanField(
        default=False,
        help_text="Use this parameter if you want to force reply from specific users only",
    )

    def __str__(self) -> str:
        return "ForceReply"
               
               
class Component(models.Model):
    class ComponentType(models.TextChoices):
        TELEGRAM = "TELEGRAM", "Telegram API Component"
        TRIGGER = "TRIGGER", "Trigger Component"
        CONDITIONAL = "CONDITIONAL", "Conditional Component"
        CODE = "CODE", "Code Component"

    component_type = models.CharField(
        max_length=20,
        choices=ComponentType.choices,
        default=ComponentType.TELEGRAM,
        help_text="Type of the component",
    )

    def save(self, *args: list, **kwargs: dict) -> None:
        if self.pk is None:
            self.component_content_type = ContentType.objects.get(
                model=self.__class__.__name__.lower(),
            )
        super().save(*args, **kwargs)

    component_content_type = models.ForeignKey(
        ContentType,
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )
    component_name = models.CharField(
        max_length=255,
        null=True,
    )  # in order to not interfere with some component 'name' field, I added redundant 'component'

    bot = models.ForeignKey("bot.Bot", on_delete=models.CASCADE)

    previous_component = models.ForeignKey(
        "Component",
        on_delete=models.PROTECT,
        null=True,
        blank=True,
        related_name="next_component",
    )

    position_x = models.FloatField(null=False, blank=False)
    position_y = models.FloatField(null=False, blank=False)

    def __str__(self) -> str:
        return self.component_name or "Empty Component"

    class Meta:
        pass

    @property
    def code_function_name(self) -> str:  # -> name of the function in generated code
        return f"{self.__class__.__name__.lower()}_{self.pk}"

    def generate_code(self) -> str:
        if self.component_type != Component.ComponentType.TELEGRAM:
            raise NotImplementedError

        underlying_object = self.component_content_type.model_class().objects.get(
            pk=self.pk,
        )
        class_name = underlying_object.__class__.__name__
        method = ""
        for c in class_name:
            if c.isupper():
                method += "_"
            method += c.lower()
        method = method.lstrip("_")

        code = [f"async def {underlying_object.code_function_name}(input_data: dict):"]
        if underlying_object.content_type:
            keyboard = underlying_object.content_type.get_object_for_this_type()
        else:
            keyboard = None

        if isinstance(keyboard, InlineKeyboardMarkup):
            code.extend(["    builder = InlineKeyboardBuilder()"])
            for k in keyboard.inline_keyboard.all():
                code.extend(
                    [
                        f"    builder.button(text='{k.text}', callback_data='{k.callback_data}')",
                    ],
                )
            code.extend(["    keyboard = builder.as_markup()"])
        #     for k in keyboard.inline_keyboard.all():
        #         builder.button(text=k.text, callback_data=k.callback_data)
        #     keyboard = builder.as_markup()
        # else:
        #     print("KEYBOARD")

        component_data = model_to_dict(
            underlying_object,
            exclude=[
                "id",
                "component_ptr",
                "component_ptr_id",
                "timestamp",
                "object_id",
                "component_type",
                "content_type",
                "component_content_type",
                "bot",
                "previous_component",
                "position_x",
                "position_y",
            ],
        )
        param_strings = []
        for k, v in component_data.items():
            if v is not None:
                if isinstance(v, str):
                    param_strings.append(f"{k}='{v}'")
                else:
                    param_strings.append(f"{k}={v}")

        if keyboard:
            param_strings.append(f"reply_markup=keyboard")

        params_str = ", ".join(param_strings)
        code.extend(
            [
                f"    await bot.{method}({params_str})",
            ],
        )
        for next_component in underlying_object.next_component.all():
            next_component = (
                next_component.component_content_type.model_class().objects.get(
                    pk=next_component.pk,
                )
            )
            code.extend(
                [
                    f"    await {next_component.code_function_name}(input_data)",
                ],
            )
        return "\\n".join(code)

    def get_all_next_components(self) -> List["Component"]:
        ans = {}
        stack = [self]
        while stack:
            current = stack.pop()
            if current.id not in ans:
                ans[current.id] = current
                for next_component in current.next_component.all():
                    if next_component.id not in ans:
                        stack.append(next_component)
        return list(ans.values())



""")
    not_supported = []

    for i, method in enumerate(all_methods):
        if i < 2:
            continue
        name = method[0]
        comment = method[1]
        body = re.findall(table_body_pattern, method[2])
        file.write(f"class {name[0].upper()+name[1:]}(Component):\n")
        file.write(f"    \"\"\"{text_cleaner(comment)}\"\"\"\n\n")

        # print(f"class {name[0].upper()+name[1:]}(TelegramComponent):")
        for item in body:
            type_field = text_cleaner(item[1])
            django_field = ""
            keyboard_field = ""

            if item[2] == 'Optional' or item[0] == 'thumbnail':
                django_param = "null=True, blank=True,"
            elif item[2] == 'Yes':
                django_param = "null=False, blank=False,"

            match = re.search(max_char_pattern, text_cleaner(item[3]))
            if match:
                extracted_number = int(match.group(1))  # Convert to integer
                django_param += f" max_length = {extracted_number},"

            django_param += f" help_text=\"{text_cleaner(item[3])}\""

            if type_field == 'String' or type_field == 'Integer or String':
                django_field = f" = models.CharField({django_param})"
            elif type_field == 'Boolean':
                django_field = f" = models.BooleanField({django_param})"
            elif type_field == 'Integer':
                django_field = f" = models.IntegerField({django_param})"
            elif type_field == 'Array of Integer':
                django_field = f"= ArrayField(models.IntegerField(), default=list, {django_param})"
            elif type_field == 'Array of String':
                django_field = f"= ArrayField(models.CharField(), default=list, {django_param})"
            elif type_field == 'Float':
                django_field = f" = models.FloatField({django_param})"
            elif type_field == 'InputFile or String':
                django_field = f" =  models.FileField(upload_to=\"{item[0]}/\", {django_param})"
            elif type_field == 'InlineKeyboardMarkup or ReplyKeyboardMarkup or ReplyKeyboardRemove or ForceReply':
                keyboard_field = f"    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, limit_choices_to=Q(model='inlinekeyboardmarkup') | Q(model='replykeyboardmarkup') | Q(model='replykeyboardremove') | Q(model='forcereply'), null=True, blank=True)"
                keyboard_field += f"\n    object_id = models.PositiveIntegerField(null=True, blank=True)"
                keyboard_field += f"\n    related_to_main = GenericForeignKey(\"content_type\", \"object_id\")"
            elif type_field == 'InlineKeyboardMarkup':
                keyboard_field = f"    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, limit_choices_to=Q(model='inlinekeyboardmarkup'), null=True, blank=True)"
                keyboard_field += f"\n    object_id = models.PositiveIntegerField(null=True, blank=True)"
                keyboard_field += f"\n    related_to_main = GenericForeignKey(\"content_type\", \"object_id\")"
            else:
                if type_field not in not_supported:
                    not_supported.append(type_field)
                
            if django_field:
                file.write(f"    {item[0]}{django_field}\n")
            if keyboard_field:
                file.write(f"{keyboard_field}\n")

        file.write("\n\n")
file.close()
print(f"not supported: {len(not_supported)}")
print(not_supported)

not supported: 26
['Array of MessageEntity', 'LinkPreviewOptions', 'ReplyParameters', 'Array of InputPaidMedia', 'Array of InputMediaAudio, InputMediaDocument, InputMediaPhoto and InputMediaVideo', 'Array of InputPollOption', 'Array of ReactionType', 'ChatPermissions', 'InputFile', 'Array of BotCommand', 'BotCommandScope', 'MenuButton', 'ChatAdministratorRights', 'InputMedia', 'InputProfilePhoto', 'AcceptedGiftTypes', 'InputStoryContent', 'Array of StoryArea', 'Array of InputSticker', 'InputSticker', 'MaskPosition', 'Array of InlineQueryResult', 'InlineQueryResultsButton', 'InlineQueryResult', 'Array of LabeledPrice', 'Array of ShippingOption']


In [36]:
with open('../component/telegram/serializers.py', 'w') as file:
    file.write("""from rest_framework import serializers
from component.telegram.models import *


class ModelSerializerCustom(serializers.ModelSerializer):
    def create(self, validated_data: dict) -> Component:
        validated_data["bot_id"] = self.context.get("bot")
        return super().create(validated_data)
               

""")
    for i, method in enumerate(all_methods):
        if i < 2:
            continue
        name = method[0]
        comment = method[1]
        file.write(f"class {name[0].upper()+name[1:]}Serializer(ModelSerializerCustom):\n")
        file.write(f"    class Meta:\n")
        file.write(f"        model = {name[0].upper()+name[1:]}\n")
        # file.write(f"        depth = 1\n")
        file.write(f"        exclude = [\'component_type\', \'bot\', \'component_content_type\']\n")
        file.write("\n\n")
file.close()

In [39]:
with open('../component/telegram/views.py', 'w') as file:
    file.write("""from rest_framework.viewsets import ModelViewSet
from component.telegram.serializers import *
from iam.permissions import IsLoginedPermission
from bot.permissions import IsBotOwner
from django.db.models import QuerySet


class ModelViewSetCustom(ModelViewSet):
    def get_serializer_context(self) -> dict:
        context = super().get_serializer_context()
        context["bot"] = self.kwargs.get("bot")
        return context
               
    def get_queryset(self) -> QuerySet:
        return self.get_queryset().filter(bot=self.kwargs.get("bot"))
               
               
""")
    # def get_queryset(self) -> QuerySet:
    #     return Flow.objects.filter(bot=self.kwargs.get("bot"))

    # def get_serializer_context(self) -> dict:
    #     context = super().get_serializer_context()
    #     context["bot"] = self.kwargs.get("bot")
    #     return context
    for i, method in enumerate(all_methods):
        if i < 2:
            continue
        name = method[0]
        comment = method[1]
        correct_name = name[0].upper()+name[1:]
        file.write(f"class {correct_name}ViewSet(ModelViewSetCustom):\n")
        file.write(f"    permission_classes = [IsLoginedPermission, IsBotOwner]\n")
        file.write(f"    serializer_class = {correct_name}Serializer\n")
        file.write(f"    queryset = {correct_name}.objects.all()\n")
        # file.write("\n")
        # file.write(f"    def get_queryset(self) -> QuerySet:\n")
        # file.write(f"        return {correct_name}.objects.filter(bot=self.kwargs.get('bot'))\n")
        # file.write("\n")
        # file.write(f"    def get_serializer_context(self) -> dict:\n")
        # file.write(f"        context = super().get_serializer_context()\n")
        # file.write(f"        context['bot'] = self.kwargs.get('bot')\n")
        # file.write(f"        return context\n")
        file.write("\n\n")


file.close()

In [66]:
with open('../component/telegram/urls.py', 'w') as file:
    file.write("""from django.urls import include, path
from rest_framework.routers import DefaultRouter

from component.telegram.views import *
router = DefaultRouter()

               
""")
    for i, method in enumerate(all_methods):
        if i < 2:
            continue
        name = method[0]
        comment = method[1]
        correct_name = name[0].upper()+name[1:]
        under_lined_name = ""
        for char in name:
            if char.isupper():
                under_lined_name += "-"+char.lower()
            else:
                under_lined_name += char

        file.write(f"router.register(r\"{under_lined_name}\", {correct_name}ViewSet)\n")

    file.write("""

urlpatterns = [
    path("", include(router.urls)),
]
""")
file.close()

In [14]:
!python -m black ../component/telegram/*

/home/ali/.local/bin/Cursor-0.48.8-x86_64.AppImage: No module named black


In [17]:
!pre-commit run --files component/telegram/*

zsh:1: no matches found: component/telegram/*
