# In [None]:  (notebook cell marker left over from the export)

# bot with chat model, voice transcription, and image OCR

'''# System deps (Ubuntu/Debian/Colab)
sudo apt-get update -y && sudo apt-get install -y tesseract-ocr ffmpeg

# Python deps
python -m pip install -U pip
python -m pip install \
  accelerate==0.27.1 \
  transformers==4.38.0 \
  torch \
  pyTelegramBotAPI \
  SpeechRecognition \
  pydub \
  pillow \
  pytesseract '''


import io
import os
import tempfile

import pytesseract
import speech_recognition as sr
import telebot
import torch
import transformers
from PIL import Image, ImageEnhance, ImageFilter
from pydub import AudioSegment
from pytesseract import TesseractError
from transformers import AutoTokenizer, pipeline as hf_pipeline

# -------------------------------
# Configuration
# -------------------------------
# Choose a default chat-tuned model (change if desired)
# Other good options (if license accepted and available):
#   - "microsoft/Phi-3-mini-4k-instruct"
#   - "google/gemma-2-2b-it"
MODEL_ID = os.environ.get("MODEL_ID", "Qwen/Qwen2.5-3B-Instruct")

# Credentials come from the environment; HF_TOKEN stays None when unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "YOUR_BOT_TOKEN")

# Pick the compute device first, then a dtype that suits it.
use_cuda = torch.cuda.is_available()
if not use_cuda:
    device = "cpu"
    torch_dtype = torch.float32
else:
    device = "cuda"
    # Compute capability major >= 8 (Ampere and newer) gets bfloat16;
    # older GPUs such as the T4 get float16.
    major_cc, _ = torch.cuda.get_device_capability(0)
    torch_dtype = torch.bfloat16 if major_cc >= 8 else torch.float16

# Optional: set tesseract path if needed (comment out if auto-detected works)
# Common path on Linux: /usr/bin/tesseract
try:
    # Only fall back to the hard-coded path when the currently configured
    # command is falsy (empty or None); an existing value is left untouched.
    if not pytesseract.pytesseract.tesseract_cmd:
        pytesseract.pytesseract.tesseract_cmd = "/usr/bin/tesseract"
except Exception:
    # Any failure while inspecting or setting the value also ends up on the
    # same hard-coded path.
    pytesseract.pytesseract.tesseract_cmd = "/usr/bin/tesseract"

# -------------------------------
# Load tokenizer and pipeline
# -------------------------------
# Load the tokenizer once and hand that same instance to the pipeline, so the
# pipeline does not resolve and load the tokenizer a second time on its own.
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, token=HF_TOKEN)
text_gen = hf_pipeline(
    "text-generation",
    model=MODEL_ID,
    tokenizer=tokenizer,
    model_kwargs={"torch_dtype": torch_dtype},
    device=device,
    token=HF_TOKEN,
)

# -------------------------------
# Core generation helper
# -------------------------------
def generate_response(text: str) -> str:
    """Generate a chat reply for a single user message.

    The text is wrapped as one ``user`` turn, rendered with the chat template
    of the module-level ``text_gen`` pipeline's tokenizer, and sampled with
    the fixed decoding settings below.

    Args:
        text: The user's message. Non-string values are coerced with ``str``.

    Returns:
        The generated continuation with the prompt removed and surrounding
        whitespace stripped.
    """
    messages = [{"role": "user", "content": str(text)}]
    # Build a chat-formatted prompt using the model's chat template
    prompt = text_gen.tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    outputs = text_gen(
        prompt,
        max_new_tokens=256,
        do_sample=True,
        temperature=0.7,
        top_k=50,
        top_p=0.95,
    )
    # For a single string prompt the pipeline returns a list with one dict per
    # generated sequence; indexing the list directly with "generated_text"
    # raises TypeError, so select the first sequence before reading the key.
    generated = outputs[0]["generated_text"]
    # The generated text starts with the prompt; keep only what follows it.
    return generated[len(prompt):].strip()

# -------------------------------
# Audio transcription helper
# -------------------------------
def audio2text(input_file: str) -> str:
    """Transcribe an audio file to text.

    The file is decoded with pydub (which needs ffmpeg on PATH for non-WAV
    input), re-encoded as WAV in memory, and passed to
    ``Recognizer.recognize_google``.

    Args:
        input_file: Path to the audio file to transcribe.

    Returns:
        The recognized text, or the sentinel string ``"Error"`` when decoding
        or recognition fails for any reason. Callers compare against that
        sentinel, so it is kept as-is.
    """
    try:
        audio = AudioSegment.from_file(input_file)
        with io.BytesIO() as wav_buffer:
            # Export straight into the in-memory buffer. Calling export()
            # without a target returns an open file handle that the previous
            # code never closed, and the data was then copied a second time.
            audio.export(wav_buffer, format="wav")
            wav_buffer.seek(0)
            recognizer = sr.Recognizer()
            with sr.AudioFile(wav_buffer) as source:
                audio_data = recognizer.record(source)
            return recognizer.recognize_google(audio_data)
    except Exception:
        # Unintelligible speech (sr.UnknownValueError) and every other failure
        # map to the same sentinel, so a single handler covers both.
        return "Error"

# -------------------------------
# OCR helper
# -------------------------------
def handwriting_ocr(image_path: str) -> str:
    """Extract text from an image file with Tesseract.

    The image has its contrast doubled and is sharpened before being handed
    to ``pytesseract.image_to_string``.

    Args:
        image_path: Path to the image file to read.

    Returns:
        The recognized text with surrounding whitespace stripped, or the
        sentinel string ``"Error"`` if opening, preprocessing, or OCR fails.
    """
    try:
        with Image.open(image_path) as source_image:
            boosted = ImageEnhance.Contrast(source_image).enhance(2.0)
            sharpened = boosted.filter(ImageFilter.SHARPEN)
            return pytesseract.image_to_string(sharpened).strip()
    except Exception:
        # TesseractError and every other failure yield the same sentinel.
        return "Error"

# -------------------------------
# Telegram bot handlers
# -------------------------------
# Single bot instance; the handlers below register themselves on it through
# its message_handler decorator.
bot = telebot.TeleBot(TELEGRAM_BOT_TOKEN)

@bot.message_handler(commands=["start"])
def send_welcome(message):
    """Answer the /start command with a short usage hint."""
    greeting = "Welcome! Send text, voice, or an image to get started."
    bot.reply_to(message, greeting)

@bot.message_handler(func=lambda message: True, content_types=["text"])
def handle_text(message):
    """Reply to a plain text message with a model-generated answer.

    Args:
        message: The incoming Telegram message; its ``text`` is used as the
            prompt.
    """
    try:
        reply = generate_response(message.text)
        bot.reply_to(message, reply)
    except Exception:
        # Mirror the voice/photo handlers: a failure during generation or
        # sending (for example an empty reply being rejected) should still
        # produce an answer for the user instead of an unhandled exception.
        bot.reply_to(message, "Sorry, could not process the message.")

@bot.message_handler(content_types=["voice"])
def handle_voice(message):
    """Transcribe a voice message and reply with a model-generated answer.

    The voice file is downloaded, transcribed with ``audio2text``, and the
    transcript is used as the prompt for ``generate_response``.

    Args:
        message: The incoming Telegram message carrying a ``voice`` payload.
    """
    try:
        file_info = bot.get_file(message.voice.file_id)
        downloaded = bot.download_file(file_info.file_path)
        # Use a per-request temporary directory rather than a fixed file name
        # in the working directory: overlapping voice messages no longer
        # overwrite each other's audio, and the file is removed afterwards.
        with tempfile.TemporaryDirectory() as tmp_dir:
            local_path = os.path.join(tmp_dir, "voice_message.ogg")
            with open(local_path, "wb") as f:
                f.write(downloaded)
            recognized_text = audio2text(local_path)
        if recognized_text == "Error":
            bot.reply_to(message, "Sorry, could not understand the audio.")
        else:
            reply = generate_response(recognized_text)
            bot.reply_to(message, reply)
    except Exception:
        bot.reply_to(message, "Sorry, could not process the audio.")

@bot.message_handler(content_types=["photo"])
def handle_photo(message):
    """Run OCR on a photo and reply with a model-generated answer.

    The last entry of ``message.photo`` is downloaded and passed to
    ``handwriting_ocr``; the extracted text, followed by the caption if any,
    is used as the prompt for ``generate_response``.

    Args:
        message: The incoming Telegram message carrying a ``photo`` payload.
    """
    try:
        caption = message.caption or ""
        file_id = message.photo[-1].file_id
        file_info = bot.get_file(file_id)
        downloaded = bot.download_file(file_info.file_path)
        # Use a per-request temporary directory rather than a fixed file name
        # in the working directory: overlapping photo messages no longer
        # overwrite each other's image, and the file is removed afterwards.
        with tempfile.TemporaryDirectory() as tmp_dir:
            local_path = os.path.join(tmp_dir, "img_text.jpg")
            with open(local_path, "wb") as f:
                f.write(downloaded)
            img_text = handwriting_ocr(local_path)
        if img_text == "Error":
            bot.reply_to(message, "Sorry, could not understand the image.")
            return
        prompt_text = (img_text + " " + caption).strip()
        if not prompt_text:
            # No text was recognized and there is no caption, so there is
            # nothing meaningful to send to the model.
            bot.reply_to(message, "Sorry, could not understand the image.")
            return
        reply = generate_response(prompt_text)
        bot.reply_to(message, reply)
    except Exception:
        bot.reply_to(message, "Sorry, could not process the image.")

if __name__ == "__main__":
    # Announce the runtime configuration, then hand control to the polling loop.
    startup_banner = (
        f"Starting bot with model: {MODEL_ID} on device: {device} dtype: {torch_dtype}"
    )
    print(startup_banner)
    # NOTE(review): newer pyTelegramBotAPI releases appear to spell this option
    # `non_stop` and treat `none_stop` as deprecated; confirm against the
    # installed version before renaming.
    bot.polling(none_stop=True)
