In [None]:
# Install dependencies
!apt -y install -qq aria2

# Download model files
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/raw/main/chat_template.json -d /content/Qwen2.5-VL-7B-Instruct -o chat_template.json
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/raw/main/config.json -d /content/Qwen2.5-VL-7B-Instruct -o config.json
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/raw/main/generation_config.json -d /content/Qwen2.5-VL-7B-Instruct -o generation_config.json
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/raw/main/merges.txt -d /content/Qwen2.5-VL-7B-Instruct -o merges.txt
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/resolve/main/model-00001-of-00005.safetensors -d /content/Qwen2.5-VL-7B-Instruct -o model-00001-of-00005.safetensors
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/resolve/main/model-00002-of-00005.safetensors -d /content/Qwen2.5-VL-7B-Instruct -o model-00002-of-00005.safetensors
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/resolve/main/model-00003-of-00005.safetensors -d /content/Qwen2.5-VL-7B-Instruct -o model-00003-of-00005.safetensors
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/resolve/main/model-00004-of-00005.safetensors -d /content/Qwen2.5-VL-7B-Instruct -o model-00004-of-00005.safetensors
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/resolve/main/model-00005-of-00005.safetensors -d /content/Qwen2.5-VL-7B-Instruct -o model-00005-of-00005.safetensors
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/raw/main/model.safetensors.index.json -d /content/Qwen2.5-VL-7B-Instruct -o model.safetensors.index.json
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/raw/main/preprocessor_config.json -d /content/Qwen2.5-VL-7B-Instruct -o preprocessor_config.json
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/raw/main/tokenizer.json -d /content/Qwen2.5-VL-7B-Instruct -o tokenizer.json
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/raw/main/tokenizer_config.json -d /content/Qwen2.5-VL-7B-Instruct -o tokenizer_config.json
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/raw/main/vocab.json -d /content/Qwen2.5-VL-7B-Instruct -o vocab.json

# Install required Python packages
!pip install git+https://github.com/huggingface/transformers accelerate transformers-stream-generator==0.0.5 gradio==4.44.1 qwen-vl-utils pydantic bitsandbytes


In [None]:
import os
import gc
import re
import torch
import threading
from PIL import Image
import gradio as gr
from transformers import AutoProcessor, Qwen2_5_VLForConditionalGeneration, TextIteratorStreamer, BitsAndBytesConfig
import logging
from qwen_vl_utils import process_vision_info

# Allocator option for PyTorch's CUDA caching allocator; set here, before any
# CUDA work is done in this cell.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# 4-bit NF4 quantisation with double quantisation and fp16 compute.
quant_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.float16,
)

# Local directory holding the downloaded checkpoint files.
checkpoint_path = "/content/Qwen2.5-VL-7B-Instruct"

model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    checkpoint_path,
    low_cpu_mem_usage=True,
    quantization_config=quant_config,
)
# Prefer the GPU when one is visible, otherwise stay on the CPU.
if torch.cuda.is_available():
    model.to("cuda")
else:
    model.to("cpu")
model.eval()  # inference only

processor = AutoProcessor.from_pretrained(
    checkpoint_path,
    use_fast=True,
    trust_remote_code=True,
)

def _gc():
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

def _transform_messages(original_messages):
    transformed_messages = []
    for message in original_messages:
        new_content = []
        for item in message['content']:
            if 'image' in item and isinstance(item['image'], Image.Image):
                new_item = {'type': 'image', 'image': item['image']}
            elif 'text' in item and isinstance(item['text'], str):
                new_item = {'type': 'text', 'text': item['text']}
            else:
                continue
            new_content.append(new_item)
        transformed_messages.append({'role': message['role'], 'content': new_content})
    return transformed_messages

def call_local_model(model, processor, messages):
    """Stream a response from the local vision-language model.

    This is a generator. Every yielded item is the full text generated so far
    (cumulative, not a delta), so the last item is the complete answer.
    Nothing is yielded when there are no messages or when
    ``process_vision_info`` returns no image inputs. Errors are printed, not
    raised, and simply end the stream.

    Args:
        model: Model object providing ``generate`` and ``device``.
        processor: Processor providing ``apply_chat_template``, a call
            interface that builds the model inputs, and a ``tokenizer``.
        messages: Chat messages as dicts with ``role`` and ``content`` keys;
            they are normalised with ``_transform_messages`` first.

    Yields:
        str: The accumulated generated text after each streamed chunk.
    """
    messages = _transform_messages(messages)
    if not messages:
        return

    try:
        # Build the chat prompt with the template supplied by the processor.
        text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        image_inputs, video_inputs = process_vision_info(messages)
        if not image_inputs:
            return

        # `videos` is only forwarded when video inputs are actually present.
        processor_kwargs = {
            'text': [text],
            'images': image_inputs,
            'padding': True,
            'return_tensors': 'pt',
        }
        if video_inputs is not None:
            processor_kwargs['videos'] = video_inputs
        inputs = processor(**processor_kwargs).to(model.device)

        tokenizer = processor.tokenizer
        # The consumer loop below waits at most `timeout` seconds per chunk.
        streamer = TextIteratorStreamer(tokenizer, timeout=20.0, skip_prompt=True, skip_special_tokens=True)

        gen_kwargs = {
            'max_new_tokens': 512,
            'streamer': streamer,
            **inputs
        }

        def generate():
            # Runs in a worker thread so the streamer can be consumed concurrently.
            with torch.no_grad():
                try:
                    model.generate(**gen_kwargs)
                except Exception as e:
                    print(f"Error during generation: {str(e)}")
                    torch.cuda.empty_cache()
                    # Signal end-of-stream so the consumer stops right away
                    # instead of blocking until the streamer timeout expires.
                    streamer.end()
                    raise

        generation_thread = threading.Thread(target=generate)
        generation_thread.start()

        generated_text = ''
        for new_text in streamer:
            generated_text += new_text
            yield generated_text

        generation_thread.join()
        _gc()

    except Exception as e:
        print(f"Error in call_local_model: {str(e)}")
        _gc()
        return

def _parse_title_and_keywords(text, max_keywords=50):
    """Extract the title and keyword list from raw model output.

    Looks for a ``Title:`` and a ``Keywords:`` label (case-insensitive). The
    labels may be wrapped in Markdown bold markers (``**Title:**`` or
    ``**Title**:``); surrounding whitespace, ``*`` and ``"`` characters are
    stripped from the captured values.

    Args:
        text: Raw text produced by the model.
        max_keywords: Upper bound on the number of keywords kept.

    Returns:
        ``(title, keywords)`` where ``keywords`` is a ``", "``-joined string.
        A placeholder string is returned for whichever part is missing.
    """
    decoration = ' \t\r\n*"'
    flags = re.IGNORECASE | re.MULTILINE

    title = "No title generated"
    keywords = "No keywords generated"

    title_match = re.search(r"title\**:\s*(.*)$", text, flags)
    if title_match:
        title_candidate = title_match.group(1).strip(decoration)
        if title_candidate:
            title = title_candidate

    keywords_match = re.search(r"keywords\**:\s*(.*)", text, flags)
    if keywords_match:
        keywords_candidate = keywords_match.group(1).strip(decoration)
        keyword_list = [k.strip() for k in keywords_candidate.split(",") if k.strip()]
        if keyword_list:
            keywords = ", ".join(keyword_list[:max_keywords])

    return title, keywords


def generate_image_keywords(image: Image.Image):
    """Generate a title and a comma-separated keyword list for an image.

    Args:
        image: The PIL image to describe.

    Returns:
        ``(title, keywords)`` as two strings. On failure the first element
        carries an error message and the second is an empty string.
    """
    try:
        if not isinstance(image, Image.Image):
            return "Error: Invalid image input", ""

        # The prompt pins an explicit two-line output format for easy parsing.
        prompt_text = (
            "Please analyze the image and generate exactly two lines of output in the following format:\n"
            "Title: <A descriptive title of about 10-15 words>\n"
            "Keywords: <Exactly 50 relevant keywords or short phrases, separated by commas>\n"
            "Focus on visual elements, mood, style, and subject matter."
        )

        messages = [
            {
                'role': 'user',
                'content': [
                    {'type': 'image', 'image': image},
                    {'type': 'text', 'text': prompt_text}
                ]
            }
        ]

        # The stream yields cumulative text, so only the latest chunk is kept.
        text = None
        for partial in call_local_model(model, processor, messages):
            text = partial
        if text is None:
            return "Error: No response from model", ""

        text = text.strip()
        print("Raw model output:", text)  # Debug output

        return _parse_title_and_keywords(text)

    except Exception as e:
        print(f"Error in generate_image_keywords: {str(e)}")
        return f"Error processing image: {str(e)}", ""

    finally:
        # Release memory on every exit path, including early returns and errors.
        _gc()

# JavaScript source passed as the `js=` handler of the two copy buttons below
# (each click supplies one textbox as input). When `navigator.clipboard` is
# available it writes `text` to the clipboard and shows a green
# "Copied to clipboard!" toast in the top-right corner, removed after 2000 ms.
# NOTE(review): the toast is shown without waiting for the `writeText` promise,
# and nothing happens at all when `navigator.clipboard` is missing.
copy_js = """
function copyToClipboard(text) {
    if (navigator.clipboard) {
        navigator.clipboard.writeText(text);
        const notification = document.createElement("div");
        notification.innerText = "Copied to clipboard!";
        notification.style.position = "fixed";
        notification.style.top = "20px";
        notification.style.right = "20px";
        notification.style.backgroundColor = "#4caf50";
        notification.style.color = "white";
        notification.style.padding = "10px";
        notification.style.borderRadius = "5px";
        notification.style.zIndex = 9999;
        document.body.appendChild(notification);
        setTimeout(() => { notification.remove(); }, 2000);
    }
}
"""

with gr.Blocks() as demo:
    with gr.Row():
        # Left column: image upload and the trigger button.
        with gr.Column():
            gr.Markdown("# Image to Title & Keywords")
            image_input = gr.Image(type="pil", label="Upload Image", interactive=True)
            submit_btn = gr.Button("🚀 Generate", variant="primary")
        # Right column: read-only results and their copy buttons.
        with gr.Column():
            gr.Markdown("# Results")
            title_output = gr.Textbox(lines=2, label="Generated Title", interactive=False)
            keywords_output = gr.Textbox(lines=5, label="Generated Keywords", interactive=False)
            with gr.Row():
                copy_title_btn = gr.Button("Copy Title", variant="secondary")
                copy_keywords_btn = gr.Button("Copy Keywords", variant="secondary")

    # Run the model on click and fill both result boxes.
    submit_btn.click(
        generate_image_keywords,
        inputs=image_input,
        outputs=[title_output, keywords_output],
    )

    # Both copy buttons are browser-only (no Python callback): each passes its
    # textbox to the shared `copy_js` handler.
    for _copy_btn, _source_box in (
        (copy_title_btn, title_output),
        (copy_keywords_btn, keywords_output),
    ):
        _copy_btn.click(fn=None, inputs=_source_box, outputs=[], js=copy_js)

demo.queue().launch(inline=False, share=True, debug=True)
