<a href="https://colab.research.google.com/github/vadigr123/colab_telegrams/blob/main/SDXL_bot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Installing required packages
!pip install diffusers transformers accelerate telegram python-telegram-bot==20.3 realesrgan pillow --quiet
import os
import locale
locale.getpreferredencoding = lambda: "UTF-8"
%cd /content

import random
import time
import torch
import re
import asyncio
import nest_asyncio
import requests
import json
import urllib.parse
import threading
from diffusers import StableDiffusionXLPipeline, EulerAncestralDiscreteScheduler
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, CallbackQueryHandler, filters
from io import BytesIO
from collections import deque, defaultdict
from PIL import Image


# Apply asyncio patch for nested loops
nest_asyncio.apply()

# Load the main Stable Diffusion XL model
print("Loading model...")
model_id = "vadigr123/IndigoFurryMixXL_EPS3"  # @param {type:"string"}
pipe = StableDiffusionXLPipeline.from_pretrained(model_id, torch_dtype=torch.float16).to("cuda")
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)
print("Model loaded.")

# Generation parameters
DEFAULT_WIDTH, DEFAULT_HEIGHT = 1024, 1024
DEFAULT_STEPS = 30
SEED = -1

SIZE_PRESETS = {
    "square": (1024, 1024),
    "wide": (1216, 832),
    "tall": (832, 1216)
}

# Global variables for the queue
generation_queue = deque()
active_generations = defaultdict(int)
cancel_flags = defaultdict(bool)
max_user_jobs = 3
max_global_jobs = 5

# Global variable to track loaded LoRAs
loaded_loras = {}

# Global dictionary to store generation details and images
generation_details = {}
stored_images = {}

##############################################
# Functions for loading LoRA models
##############################################

def download_things(directory, url, hf_token="", civitai_api_key=""):
    url = url.strip()

    # Use better filename logic
    if "civitai.com" in url:
        try:
            # Extract model ID from URL for better filename
            parsed_url = urllib.parse.urlparse(url)
            if "/models/" in parsed_url.path:
                model_id = parsed_url.path.split("/models/")[-1].split("/")[0]
                filename = f"{model_id}.safetensors"
            else:
                # Try to get filename from Content-Disposition header
                response = requests.head(url + f"?token={civitai_api_key}" if civitai_api_key else url, timeout=10)
                content_disp = response.headers.get('Content-Disposition', '')
                if 'filename=' in content_disp:
                    filename = content_disp.split('filename=')[-1].strip('"')
                else:
                    filename = "model.safetensors"
        except:
            filename = "model.safetensors"
    else:
        filename = url.split("/")[-1].split("?")[0]
        if not filename or not filename.endswith('.safetensors'):
            filename = "model.safetensors"

    if not os.path.exists(directory):
        os.makedirs(directory)

    target_path = os.path.join(directory, filename)

    if os.path.exists(target_path):
        print(f"✅ File already exists: {target_path}")
        return target_path

    try:
        if "drive.google.com" in url:
            original_dir = os.getcwd()
            os.chdir(directory)
            result = os.system(f"gdown --fuzzy '{url}'")
            os.chdir(original_dir)
            if result != 0:
                raise Exception("Google Drive download failed")

        elif "huggingface.co" in url:
            if "/blob/" in url:
                url = url.replace("/blob/", "/resolve/")
            if "?download=true" in url:
                url = url.replace("?download=true", "")

            if hf_token:
                result = os.system(f"wget --header='Authorization: Bearer {hf_token}' -O '{target_path}' '{url}'")
            else:
                result = os.system(f"wget -O '{target_path}' '{url}'")

            if result != 0:
                raise Exception("HuggingFace download failed")

        elif "civitai.com" in url:
            if not civitai_api_key:
                raise Exception("Civitai API key is required")

            # Add API key to URL
            if "?" in url:
                download_url = f"{url}&token={civitai_api_key}"
            else:
                download_url = f"{url}?token={civitai_api_key}"

            result = os.system(f"wget -O '{target_path}' '{download_url}'")
            if result != 0:
                raise Exception("Civitai download failed - check your API key")

        else:
            result = os.system(f"wget -O '{target_path}' '{url}'")
            if result != 0:
                raise Exception("Download failed")

        # Verify file was downloaded
        if not os.path.exists(target_path) or os.path.getsize(target_path) == 0:
            raise Exception("Downloaded file is empty or doesn't exist")

        print(f"✅ Successfully downloaded: {target_path}")
        return target_path

    except Exception as e:
        # Clean up failed download
        if os.path.exists(target_path):
            os.remove(target_path)
        raise e

def get_model_list(directory_path):
    """Get list of all files in directory"""
    model_list = []
    if not os.path.exists(directory_path):
        return model_list

    for filename in os.listdir(directory_path):
        file_path = os.path.join(directory_path, filename)
        # Skip directories and very small files
        if os.path.isfile(file_path) and os.path.getsize(file_path) > 1024:
            # Use original filename without extension as name
            name_without_extension = os.path.splitext(filename)[0]
            model_list.append((name_without_extension, file_path))
    return model_list

def apply_lora(pipe, lora_name, weight=1.0):
    """Load and apply LoRA to the pipeline"""
    global lora_model_list, loaded_loras

    # Find matching LoRA
    matching = [model for model in lora_model_list if model[0].lower() == lora_name.lower()]
    if not matching or matching[0][1] is None:
        print(f"❌ LoRA model '{lora_name}' not found")
        return False

    lora_path = matching[0][1]
    lora_key = f"{lora_name}_{weight}"

    try:
        # If this exact LoRA+weight combo is already loaded, skip
        if lora_key in loaded_loras:
            print(f"✅ LoRA {lora_name} (weight: {weight}) already loaded")
            return True

        # Unload any existing LoRAs first
        if hasattr(pipe, 'unload_lora_weights'):
            pipe.unload_lora_weights()
            loaded_loras.clear()

        # Load the LoRA
        pipe.load_lora_weights(lora_path, adapter_name=lora_name)

        # Set the weight
        if hasattr(pipe, 'set_adapters'):
            pipe.set_adapters(lora_name, adapter_weights=[weight])

        loaded_loras[lora_key] = lora_path
        print(f"✅ Applied LoRA model: {lora_name} (weight: {weight})")
        return True

    except Exception as e:
        print(f"❌ Error applying LoRA {lora_name}: {str(e)}")
        return False

def clear_loras(pipe):
    """Clear all loaded LoRAs"""
    global loaded_loras
    try:
        if hasattr(pipe, 'unload_lora_weights'):
            pipe.unload_lora_weights()
        loaded_loras.clear()
        print("🧹 Cleared all LoRAs")
        return True
    except Exception as e:
        print(f"❌ Error clearing LoRAs: {str(e)}")
        return False

def upscale_image(image, scale_factor=2):
    """Simple upscaling using PIL's LANCZOS resampling"""
    try:
        width, height = image.size
        new_width = int(width * scale_factor)
        new_height = int(height * scale_factor)

        # Use LANCZOS for high-quality upscaling
        upscaled = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
        return upscaled
    except Exception as e:
        print(f"❌ Upscaling error: {e}")
        return None

def cleanup_old_details():
    """Clean up old generation details every hour"""
    while True:
        time.sleep(3600)  # Wait 1 hour
        current_time = time.time()
        keys_to_remove = []

        for key in generation_details:
            # Extract timestamp from key
            try:
                timestamp = int(key.split('_')[-1])
                if current_time - timestamp > 86400:  # 24 hours
                    keys_to_remove.append(key)
            except:
                continue

        for key in keys_to_remove:
            del generation_details[key]
            # Also clean up stored images
            if key in stored_images:
                del stored_images[key]

# Folder for LoRA models
directory_loras = 'loras'
os.makedirs(directory_loras, exist_ok=True)

# Configuration
download_lora = ""  # @param {type:"string"}
CIVITAI_API_KEY = ""  # @param {type:"string"}
hf_token = ""  # @param {type:"string"}

# Download any specified LoRAs
for url in [url.strip() for url in download_lora.split(',') if url.strip()]:
    try:
        downloaded_path = download_things(directory_loras, url, hf_token, CIVITAI_API_KEY)
    except Exception as e:
        print(f"❌ Failed to download {url}: {e}")

# Update list of LoRA models
lora_model_list = get_model_list(directory_loras)
lora_model_list.insert(0, ("None", None))
print(f"\033[33m🏁 Found {len(lora_model_list)-1} LoRA models.\033[0m")

# Start cleanup thread
cleanup_thread = threading.Thread(target=cleanup_old_details, daemon=True)
cleanup_thread.start()

##############################################
# Telegram bot – Commands
##############################################

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    context.user_data.setdefault("width", DEFAULT_WIDTH)
    context.user_data.setdefault("height", DEFAULT_HEIGHT)
    context.user_data.setdefault("steps", DEFAULT_STEPS)
    await update.message.reply_text(
        "Hello! Send a description for generation.\n\n"
        "📌 /steps 30 — set number of steps (30–50)\n"
        "📌 /size — choose image size\n"
        "📌 /cancel — cancel the current generation\n"
        "📌 /lora {url} — download a LoRA model\n"
        "📌 /loras — show list of LoRA models\n"
        "📌 /clear_loras — clear all loaded LoRAs\n"
        "Also use <lora:name> or <lora:name:weight> in your prompt to apply LoRA."
    )

async def size_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    keyboard = [
        [InlineKeyboardButton("🟦 1024x1024", callback_data="size_square")],
        [InlineKeyboardButton("🟥 1216x832", callback_data="size_wide")],
        [InlineKeyboardButton("🟩 832x1216", callback_data="size_tall")]
    ]
    await update.message.reply_text("Choose a size:", reply_markup=InlineKeyboardMarkup(keyboard))

async def steps_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    args = context.args
    if not args or not args[0].isdigit():
        await update.message.reply_text("Enter a number between 30 and 50, e.g.: /steps 40")
        return
    steps = int(args[0])
    if 30 <= steps <= 50:
        context.user_data["steps"] = steps
        await update.message.reply_text(f"✅ Steps set to: {steps}")
    else:
        await update.message.reply_text("🚫 Only between 30 and 50 steps are allowed.")

async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    query = update.callback_query
    await query.answer()
    data = query.data

    if data.startswith("size_"):
        size_key = data.split("_")[1]
        width, height = SIZE_PRESETS[size_key]
        context.user_data["width"] = width
        context.user_data["height"] = height
        await query.edit_message_text(f"✅ Size set to: {width}x{height}")

    elif data.startswith("details_"):
        message_id = data.split("details_")[1]
        if message_id in generation_details:
            details = generation_details[message_id]

            # Create detailed message
            detail_text = f"📋 **Generation Details**\n\n"
            detail_text += f"**Prompt:** {details['prompt']}\n\n"
            detail_text += f"**Seed:** {details['seed']}\n"
            detail_text += f"**Dimensions:** {details['width']}x{details['height']}\n"
            detail_text += f"**Steps:** {details['steps']}\n"

            if details['loras']:
                detail_text += f"**LoRA Models:**\n"
                for lora in details['loras']:
                    detail_text += f"• {lora}\n"
            else:
                detail_text += f"**LoRA Models:** None\n"

            # Send details as a new message
            await query.message.reply_text(detail_text, parse_mode='Markdown')
        else:
            await query.message.reply_text("❌ Details not found (may have expired)")

    elif data.startswith("upscale_"):
        message_id = data.split("upscale_")[1]
        if message_id in stored_images:
            try:
                # Show progress message
                progress_msg = await query.message.reply_text("🔍 Upscaling image...")

                # Get the stored image
                original_image = stored_images[message_id]

                # Upscale the image (2x by default)
                upscaled_image = upscale_image(original_image, scale_factor=2)

                if upscaled_image:
                    # Convert to bytes
                    bio = BytesIO()
                    bio.name = "upscaled_image.png"
                    upscaled_image.save(bio, "PNG", optimize=True)
                    bio.seek(0)

                    # Get new dimensions
                    new_width, new_height = upscaled_image.size

                    # Delete progress message
                    await progress_msg.delete()

                    # Send upscaled image
                    await query.message.reply_document(
                        document=bio,
                        filename="upscaled_image.png",
                        caption=f"🔍 Upscaled to {new_width}x{new_height} (2x scale)"
                    )
                else:
                    await progress_msg.edit_text("❌ Failed to upscale image")
            except Exception as e:
                await query.message.reply_text(f"❌ Upscaling error: {str(e)}")
        else:
            await query.message.reply_text("❌ Original image not found (may have expired)")

    elif data.startswith("random_"):
        message_id = data.split("random_")[1]
        if message_id in generation_details:
            details = generation_details[message_id]
            user_id = query.from_user.id

            # Check user limits
            if active_generations[user_id] >= max_user_jobs:
                await query.message.reply_text("🚫 You already have 3 active requests.")
                return
            if len(generation_queue) >= max_global_jobs:
                await query.message.reply_text("🚫 Queue is full. Try later.")
                return

            # Reconstruct prompt with LoRA tags
            original_prompt = details['prompt']
            if details['loras']:
                # Add LoRA tags back to prompt
                lora_tags = ""
                for lora_info in details['loras']:
                    # Extract name and weight from "name (weight: X.X)" format
                    if " (weight: " in lora_info:
                        name = lora_info.split(" (weight: ")[0]
                        weight = lora_info.split(" (weight: ")[1].rstrip(")")
                        if weight == "1.0":
                            lora_tags += f"<lora:{name}> "
                        else:
                            lora_tags += f"<lora:{name}:{weight}> "
                    else:
                        lora_tags += f"<lora:{lora_info}> "

                original_prompt = lora_tags + original_prompt

            # Create a fake update and context for generation
            fake_message = type('obj', (object,), {
                'text': original_prompt,
                'from_user': query.from_user,
                'reply_text': query.message.reply_text,
                'reply_photo': query.message.reply_photo,
                'reply_document': query.message.reply_document
            })
            fake_update = type('obj', (object,), {
                'message': fake_message,
                'effective_message': query.message
            })

            # Set user data to match original generation
            context.user_data["width"] = details['width']
            context.user_data["height"] = details['height']
            context.user_data["steps"] = details['steps']

            # Start generation with random seed
            await generate(fake_update, context, force_random_seed=True)
        else:
            await query.message.reply_text("❌ Generation data not found (may have expired)")

async def cancel_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    user_id = update.message.from_user.id if update.message else None
    if user_id:
        cancel_flags[user_id] = True
        await update.message.reply_text("🚫 Cancellation request sent.")

async def clear_loras_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Command to clear all loaded LoRAs"""
    if clear_loras(pipe):
        await update.message.reply_text("🧹 All LoRAs cleared successfully!")
    else:
        await update.message.reply_text("❌ Error clearing LoRAs")

async def lora_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    args = context.args
    if not args:
        await update.message.reply_text(
            "Specify a URL to download a LoRA model, e.g.:\n"
            "/lora https://civitai.com/api/download/models/97655"
        )
        return

    url = args[0]
    status_msg = await update.message.reply_text("⏳ Downloading LoRA model...")

    try:
        # Validate URL
        if not any(domain in url for domain in ['civitai.com', 'huggingface.co', 'drive.google.com']):
            await status_msg.edit_text("❌ Unsupported URL. Only Civitai, HuggingFace, and Google Drive are supported.")
            return

        # Check if Civitai URL and API key is available
        if 'civitai.com' in url and not CIVITAI_API_KEY:
            await status_msg.edit_text("❌ Civitai API key is required for downloading from Civitai.")
            return

        # Download the model
        downloaded_path = download_things(directory_loras, url, hf_token, CIVITAI_API_KEY)

        # Update the global LoRA model list
        global lora_model_list
        lora_model_list = get_model_list(directory_loras)
        lora_model_list.insert(0, ("None", None))

        # Get the filename and size
        filename = os.path.basename(downloaded_path)
        lora_name = os.path.splitext(filename)[0]
        file_size = os.path.getsize(downloaded_path) / (1024 * 1024)  # Size in MB

        await status_msg.edit_text(
            f"✅ LoRA model downloaded successfully!\n"
            f"📁 File: {filename}\n"
            f"📏 Size: {file_size:.1f} MB\n"
            f"📝 Use: <lora:{lora_name}> in your prompt"
        )

    except Exception as e:
        await status_msg.edit_text(f"❌ Error downloading LoRA: {str(e)}")
        print(f"LoRA download error: {e}")

async def loras_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    global lora_model_list
    if len(lora_model_list) <= 1:  # Only "None" entry
        await update.message.reply_text("📂 No LoRA models found.\n\nUse /lora {url} to download a LoRA model.")
        return

    message = "📋 Available LoRA models:\n\n"

    for name, path in lora_model_list:
        if name.lower() == "none":
            continue

        # Get file info
        file_size = 0
        if path and os.path.exists(path):
            file_size = os.path.getsize(path) / (1024 * 1024)  # Size in MB

        # Simple display without special characters that break Markdown
        message += f"• {name}\n"
        message += f"  Size: {file_size:.1f} MB\n"
        message += f"  Use: <lora:{name}> or <lora:{name}:0.8>\n\n"

    # Show currently loaded LoRAs
    if loaded_loras:
        message += "🔄 Currently loaded:\n"
        for lora_key in loaded_loras:
            message += f"• {lora_key}\n"
        message += "\n💡 Use /clear_loras to unload all LoRAs"

    # Split message if too long (Telegram limit is ~4096 characters)
    if len(message) > 4000:
        # Send in parts
        parts = message.split('\n\n')
        current_part = "📋 Available LoRA models:\n\n"

        for part in parts[1:]:  # Skip the header
            if len(current_part) + len(part) + 2 > 4000:  # +2 for \n\n
                await update.message.reply_text(current_part)
                current_part = part + "\n\n"
            else:
                current_part += part + "\n\n"

        if current_part.strip():
            await update.message.reply_text(current_part)
    else:
        await update.message.reply_text(message)

def create_progress_bar(progress):
    full = int(progress / 10)
    return f"[{'🟩'*full}{'⬜'*(10-full)}] {progress}%"

async def generate(update: Update, context: ContextTypes.DEFAULT_TYPE, force_random_seed=False):
    # Check for text in message
    if not update.message or not update.message.text:
        if update.effective_message:
            await update.effective_message.reply_text("❌ Error: No message text found!")
        return

    try:
        prompt = update.message.text.strip()
    except Exception as e:
        await update.message.reply_text(f"❌ Error retrieving prompt: {e}")
        return

    user_id = update.message.from_user.id
    if active_generations[user_id] >= max_user_jobs:
        await update.message.reply_text("🚫 You already have 3 active requests.")
        return
    if len(generation_queue) >= max_global_jobs:
        await update.message.reply_text("🚫 Queue is full. Try later.")
        return

    # Process LoRA tags: <lora:name> or <lora:name:weight>
    lora_matches = re.findall(r"<lora:([\w\-]+)(?::([\d.]+))?>", prompt, flags=re.IGNORECASE)
    applied_loras = []
    lora_errors = []

    if lora_matches:
        # Remove LoRA tags from prompt
        prompt = re.sub(r"<lora:[\w\-]+(?::[\d.]+)?>", "", prompt, flags=re.IGNORECASE).strip()

        # Clear existing LoRAs first
        clear_loras(pipe)

        # Apply each LoRA
        for name, weight in lora_matches:
            weight_val = float(weight) if weight else 1.0
            success = apply_lora(pipe, name, weight_val)
            if success:
                applied_loras.append(f"{name} (weight: {weight_val})")
            else:
                lora_errors.append(name)

    # Notify about LoRA errors
    if lora_errors:
        await update.message.reply_text(f"⚠️ LoRA models not found: {', '.join(lora_errors)}\n\nUse /loras to see available models.")

    # Add task to global queue
    task = (update, context, prompt)
    generation_queue.append(task)
    active_generations[user_id] += 1

    # Inform user of queue position with live updates
    position = list(generation_queue).index(task)
    eta = position * 15  # approximate wait time in seconds
    wait_msg = await update.message.reply_text(f"🕓 Waiting: [0/30] - position in queue: {position+1}, ETA ≈ {eta}s")
    counter = 0

    while position > 0:
        if cancel_flags[user_id]:
            generation_queue.remove(task)
            active_generations[user_id] -= 1
            cancel_flags[user_id] = False
            await wait_msg.edit_text("❌ Generation cancelled before start.")
            return

        # Update position
        try:
            position = list(generation_queue).index(task)
            eta = position * 15
        except ValueError:
            position = 0  # Task not in queue anymore

        counter = (counter % 30) + 1
        await wait_msg.edit_text(f"🕓 Waiting: [{counter}/30] - position in queue: {position+1}, ETA ≈ {eta}s")
        await asyncio.sleep(1)

    # Start generation with live progress bar
    try:
        width = context.user_data.get("width", DEFAULT_WIDTH)
        height = context.user_data.get("height", DEFAULT_HEIGHT)
        steps = context.user_data.get("steps", DEFAULT_STEPS)
        generator = torch.Generator("cuda")

        # Force random seed if requested, otherwise use configured seed
        if force_random_seed:
            seed = random.randint(0, 2**32 - 1)
        else:
            seed = SEED if SEED != -1 else random.randint(0, 2**32 - 1)

        generator.manual_seed(seed)

        progress_msg = await update.message.reply_text("🔧 Generation started...")
        for i in range(0, 101, 10):
            if cancel_flags[user_id]:
                await progress_msg.edit_text("❌ Generation cancelled.")
                if generation_queue and generation_queue[0] == task:
                    generation_queue.popleft()
                active_generations[user_id] -= 1
                cancel_flags[user_id] = False
                return
            await progress_msg.edit_text(create_progress_bar(i))
            await asyncio.sleep(1.5)

        # Generate image
        result = pipe(prompt, width=width, height=height, num_inference_steps=steps, generator=generator, output_type="pil")
        img = result.images[0]
        bio = BytesIO()
        bio.name = "image.png"
        img.save(bio, "PNG")
        bio.seek(0)

        # Store generation details and image for buttons
        message_id = f"{user_id}_{int(time.time())}"
        generation_details[message_id] = {
            'prompt': prompt,
            'seed': seed,
            'width': width,
            'height': height,
            'steps': steps,
            'loras': applied_loras
        }
        # Store the PIL image for upscaling
        stored_images[message_id] = img.copy()

        # Create buttons: "Show details", "Random seed", and "Upscale"
        keyboard = [
            [
                InlineKeyboardButton("📋 Details", callback_data=f"details_{message_id}"),
                InlineKeyboardButton("🎲 Random", callback_data=f"random_{message_id}"),
                InlineKeyboardButton("🔍 Upscale", callback_data=f"upscale_{message_id}")
            ]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)

        # Create short caption
        lora_info = f" • LoRA: {len(applied_loras)} applied" if applied_loras else ""
        caption = f"🌱 Seed: {seed} • 📏 {width}x{height} • 🧮 Steps: {steps}{lora_info}"

        # Delete progress message
        await progress_msg.delete()

        # Send preview image with buttons
        await update.message.reply_photo(
            photo=bio,
            caption=caption,
            reply_markup=reply_markup
        )

        # Rewind and send as document for quality
        bio.seek(0)
        await update.message.reply_document(
            document=bio,
            filename="image.png",
            caption="High-quality image file"
        )

    except Exception as e:
        await update.message.reply_text(f"❌ Error during generation: {e}")
        print(f"Generation error: {e}")
    finally:
        if generation_queue and generation_queue[0] == task:
            generation_queue.popleft()
        active_generations[user_id] -= 1
        cancel_flags[user_id] = False

##############################################
# Run Telegram bot
##############################################

TOKEN = "" # @param {type:"string"}

async def main():
    TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_TOKEN", TOKEN)
    app = ApplicationBuilder().token(TELEGRAM_BOT_TOKEN).build()

    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("size", size_command))
    app.add_handler(CommandHandler("steps", steps_command))
    app.add_handler(CommandHandler("cancel", cancel_command))
    app.add_handler(CommandHandler("lora", lora_command))
    app.add_handler(CommandHandler("loras", loras_command))
    app.add_handler(CommandHandler("clear_loras", clear_loras_command))
    app.add_handler(CallbackQueryHandler(button_handler))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, generate))

    await app.initialize()
    await app.start()
    print("✅ Bot started")
    await app.updater.start_polling()
    await asyncio.Event().wait()

nest_asyncio.apply()
await main()

/content
Loading model...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

Model loaded.
[33m🏁 Found 0 LoRA models.[0m
✅ Bot started
✅ Successfully downloaded: loras/lora_1610098.safetensors
🧹 Cleared all LoRAs
✅ Applied LoRA model: 1610098 (weight: 1.0) from loras/lora_1610098.safetensors


  0%|          | 0/30 [00:00<?, ?it/s]

🧹 Cleared all LoRAs
❌ LoRA model 'poyo' not found


  0%|          | 0/30 [00:00<?, ?it/s]