-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
.vscode | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong. |
||
.idea | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong. |
||
__pycache__ | ||
*.session |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import asyncio | ||
import logging | ||
import importlib | ||
import uuid | ||
import traceback | ||
|
||
from telethon import TelegramClient | ||
from telethon import events | ||
from telethon.tl.types import DocumentAttributeSticker | ||
from telethon.tl.functions.messages import GetStickerSetRequest | ||
|
||
from util import StatusMessage, Job, find_instance | ||
import proxy | ||
|
||
|
||
proxy.JOB_MODULES = JOB_MODULES = {} | ||
|
||
|
||
logging.basicConfig(level=logging.INFO) | ||
logger = logging.getLogger('main') | ||
proxy.client = client = TelegramClient("bot", 6, "eb06d4abfb49dc3eeb1aeb98ae0f581e").start() | ||
This comment has been minimized.
Sorry, something went wrong.
Lonami
Contributor
|
||
proxy.me = me = asyncio.get_event_loop().run_until_complete(client.get_me()) | ||
|
||
|
||
@client.on(events.NewMessage(pattern=r'/start')) | ||
async def on_start(event): | ||
await event.respond('Send a sticker.') | ||
|
||
|
||
@client.on(events.NewMessage) | ||
async def on_message(event): | ||
if not event.message.sticker: | ||
return | ||
sticker = event.message.sticker | ||
sticker_attrib = find_instance(sticker.attributes, DocumentAttributeSticker) | ||
This comment has been minimized.
Sorry, something went wrong.
Lonami
Contributor
|
||
if not sticker_attrib.stickerset: | ||
await event.reply('That sticker is not part of a pack') | ||
return | ||
|
||
sticker_set = await client(GetStickerSetRequest(sticker_attrib.stickerset)) | ||
|
||
status = StatusMessage(await event.reply('Pending')) | ||
job = Job( | ||
id=uuid.uuid1(), | ||
owner=await event.get_input_sender(), | ||
event=event, | ||
status=status, | ||
sticker_set=sticker_set | ||
) | ||
logger.info( | ||
'[%s] User %s requested sticker set %s', | ||
job.id, | ||
(await event.get_input_sender()).user_id, | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong. |
||
sticker_set.set.short_name | ||
) | ||
|
||
await status.update('Waiting for download slot.') | ||
await JOB_MODULES['downloader'].queue.put(job) | ||
await status.update('Queued for download.') | ||
This comment has been minimized.
Sorry, something went wrong.
Lonami
Contributor
|
||
|
||
|
||
async def job_runner(mod): | ||
while 1: | ||
try: | ||
await mod.run_job(await mod.queue.get()) | ||
except Exception as e: | ||
logger.log( | ||
This comment has been minimized.
Sorry, something went wrong.
Lonami
Contributor
|
||
'Exception on job runner for %s\n%s', | ||
mod.__name__, | ||
traceback.format_exc() | ||
) | ||
|
||
|
||
def load_handler_module(name): | ||
proxy.logger = logging.getLogger(name) | ||
mod = importlib.import_module(name) | ||
asyncio.ensure_future(job_runner(mod)) | ||
This comment has been minimized.
Sorry, something went wrong.
Lonami
Contributor
|
||
return mod | ||
|
||
|
||
for name in ('downloader', 'rescaler', 'uploader'): | ||
JOB_MODULES[name] = load_handler_module(name) | ||
|
||
client.run_until_disconnected() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
""" | ||
Handles downloading sticker packs to memory | ||
Based on my gist here: https://gist.github.com/udf/e4e3dbb2e831c8b580d8fddd312714f7 | ||
""" | ||
import asyncio | ||
from io import BytesIO | ||
from collections import defaultdict | ||
|
||
from util import Job, Sticker | ||
from proxy import client, logger, JOB_MODULES | ||
|
||
|
||
queue = asyncio.Queue(5) | ||
|
||
|
||
async def run_job(job: Job): | ||
logger.info(f'[{job.id}] Running download job') | ||
await job.status.update('Starting download...') | ||
|
||
sticker_set = job.sticker_set | ||
|
||
# Sticker emojis are retrieved as a mapping of | ||
# <emoji>: <list of document ids that have this emoji> | ||
# So we need to build a mapping of <document id>: <list of emoji> | ||
# Thanks, Durov | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong. |
||
emojis = defaultdict(str) | ||
for pack in sticker_set.packs: | ||
for document_id in pack.documents: | ||
emojis[document_id] += pack.emoticon | ||
|
||
pending_tasks = [] | ||
stickers = [] | ||
for i, document in enumerate(sticker_set.documents): | ||
file = BytesIO() | ||
task = asyncio.ensure_future( | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong. |
||
client.download_media(document, file=file) | ||
This comment has been minimized.
Sorry, something went wrong.
Lonami
Contributor
|
||
) | ||
pending_tasks.append(task) | ||
stickers.append(Sticker(file, emojis[document.id])) | ||
|
||
await job.status.update('Downloading...') | ||
await asyncio.wait(pending_tasks) | ||
|
||
errors = '' | ||
for i, task in reversed(tuple(enumerate(pending_tasks))): | ||
exception = task.exception() | ||
if not exception: | ||
continue | ||
errors += f'#{i}: {exception.__class__.__name__}\n' | ||
del stickers[i] | ||
|
||
if errors: | ||
await job.event.reply( | ||
f'Some errors occured when downloading these stickers:\n{errors}' | ||
"\nI'll continue to process the pack as normal, but the stickers " | ||
"which caused these errors won't be present in the new pack" | ||
) | ||
|
||
job.stickers = stickers | ||
logger.info(f'[{job.id}] Finished running download job') | ||
await job.status.update('Waiting for rescale slot...') | ||
await JOB_MODULES['rescaler'].queue.put(job) | ||
await job.status.update('Queued for rescaling.') |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from logging import Logger | ||
from typing import Dict | ||
from types import ModuleType | ||
|
||
from telethon import TelegramClient | ||
from telethon.tl import types | ||
|
||
client: TelegramClient = None | ||
logger: Logger = None | ||
JOB_MODULES: Dict[str, ModuleType] = {} | ||
me: types.User = None |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
""" | ||
Handles spawning an executor to liquid rescale the stickers | ||
""" | ||
import asyncio | ||
import concurrent.futures | ||
from io import BytesIO | ||
|
||
from wand.image import Image | ||
|
||
from util import Job, Sticker | ||
from proxy import logger, JOB_MODULES | ||
|
||
queue = asyncio.Queue(3) | ||
executor = concurrent.futures.ThreadPoolExecutor() | ||
|
||
|
||
def crunch(sticker: Sticker): | ||
sticker.file.seek(0) | ||
with Image(file=sticker.file) as i: | ||
i.resize(width=i.width * 2, height=i.height * 2, filter='sinc') | ||
i.liquid_rescale(i.width // 2, i.height // 2, delta_x=1, rigidity=5) | ||
sticker.file = BytesIO() | ||
i.save(file=sticker.file) | ||
|
||
|
||
def do_crunch(loop, job: Job): | ||
futures = [executor.submit(crunch, sticker) for sticker in job.stickers] | ||
for i, future in enumerate(concurrent.futures.as_completed(futures)): | ||
asyncio.run_coroutine_threadsafe( | ||
job.status.update(f'Rescaled {i+1}/{len(futures)}', important=False), | ||
loop | ||
).result() | ||
|
||
|
||
async def run_job(job: Job): | ||
logger.info(f'[{job.id}] Running rescale job') | ||
await job.status.update('Crunching...') | ||
|
||
loop = asyncio.get_running_loop() | ||
await loop.run_in_executor(None, lambda: do_crunch(loop, job)) | ||
|
||
logger.info(f'[{job.id}] Finished running rescale job') | ||
await job.status.update('Waiting for upload slot...') | ||
await JOB_MODULES['uploader'].queue.put(job) | ||
await job.status.update('Queued for upload.') |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
""" | ||
Handles uploading the rescaled stickers and figuring out an unused url | ||
""" | ||
import asyncio | ||
import re | ||
|
||
from telethon import utils | ||
from telethon.errors import ShortnameOccupyFailedError, PackShortNameOccupiedError | ||
from telethon.tl import types | ||
from telethon.tl.functions.messages import UploadMediaRequest | ||
from telethon.tl.functions.stickers import CreateStickerSetRequest | ||
|
||
from util import Job, get_pack_url, get_rand_letters | ||
from proxy import client, logger, me | ||
|
||
|
||
queue = asyncio.Queue(5) | ||
re_by_us = re.compile(f'(?i)_[a-z]+_by_{me.username}$') | ||
re_by_bot = re.compile(r'(?i)_by_\w+bot$') | ||
|
||
|
||
async def run_job(job: Job): | ||
logger.info(f'[{job.id}] Running upload job') | ||
await job.status.update('Uploading...') | ||
stickers = [] | ||
for i, sticker in enumerate(job.stickers): | ||
sticker.file.seek(0) | ||
file = await client.upload_file(sticker.file, part_size_kb=512) | ||
file = types.InputMediaUploadedDocument(file, 'image/png', []) | ||
media = await client(UploadMediaRequest('me', file)) | ||
stickers.append(types.InputStickerSetItem( | ||
document=utils.get_input_document(media), | ||
emoji=sticker.emoji | ||
)) | ||
await job.status.update( | ||
f'Uploaded {i + 1}/{len(job.stickers)}', | ||
important=False | ||
) | ||
|
||
# Create | ||
id_len = 2 | ||
title = f'Distorted {job.sticker_set.set.title}'[:64] | ||
original_short_name = re_by_us.sub('', job.sticker_set.set.short_name) | ||
original_short_name = re_by_bot.sub('', original_short_name) | ||
while 1: | ||
try: | ||
suffix = f'_{get_rand_letters(id_len)}_by_{me.username}' | ||
short_name = f'{original_short_name}'[:62 - len(suffix)] + suffix | ||
logger.info('Trying to create pack %s', short_name) | ||
await client(CreateStickerSetRequest( | ||
user_id=await job.event.get_input_sender(), | ||
title=title, | ||
short_name=short_name, | ||
stickers=stickers | ||
)) | ||
break | ||
except (ShortnameOccupyFailedError, PackShortNameOccupiedError): | ||
id_len = min(7, id_len + 1) | ||
continue | ||
|
||
logger.info(f'[{job.id}] Finished running upload job') | ||
await job.event.reply('Done! ' + get_pack_url(short_name)) | ||
await job.status.message.delete() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
import time | ||
import random | ||
import string | ||
from io import BytesIO | ||
from dataclasses import dataclass | ||
from typing import List | ||
|
||
from telethon import events | ||
from telethon.tl import types | ||
|
||
|
||
class StatusMessage: | ||
def __init__(self, message): | ||
self.message = message | ||
self.last_edit = time.time() | ||
|
||
async def update(self, text, important=True): | ||
current_time = time.time() | ||
if not important and current_time - self.last_edit < 5: | ||
return | ||
self.last_edit = current_time | ||
await self.message.edit(text) | ||
|
||
|
||
@dataclass | ||
class Sticker: | ||
file: BytesIO | ||
emoji: str | ||
|
||
|
||
@dataclass | ||
class Job: | ||
# Common | ||
id: str | ||
owner: types.InputUser | ||
event: events.NewMessage | ||
status: StatusMessage | ||
|
||
# Downloader | ||
sticker_set: types.messages.StickerSet | ||
|
||
# Resizer, Uploader | ||
stickers: List[Sticker] = None | ||
|
||
|
||
def find_instance(items, class_or_tuple): | ||
for item in items: | ||
if isinstance(item, class_or_tuple): | ||
return item | ||
return None | ||
|
||
|
||
def get_pack_url(short_name): | ||
return f'https://t.me/addstickers/{short_name}' | ||
|
||
|
||
def get_rand_letters(k): | ||
return ''.join(random.choices(string.ascii_uppercase, k=k)) |
This belongs in your global
gitignore
.