In [22]:
import torch
from transformers import AutoProcessor, Idefics3ForConditionalGeneration
from PIL import Image
import cv2
import numpy as np
from typing import List
import logging

# Configure the root logger at INFO level so the logger.info(...) calls made
# in this notebook are emitted, then create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class VideoFrameExtractor:
    """Sample a bounded number of square frames from a video or animated image.

    Files whose name ends in ``.mp4`` or ``.gif`` (case-insensitive) are decoded
    with OpenCV; every other file is opened with PIL. Each returned frame is
    resized and center-cropped to ``FRAME_SIZE`` x ``FRAME_SIZE`` pixels.
    """

    # Side length, in pixels, of every frame returned by extract_frames.
    FRAME_SIZE = 384

    def __init__(self, max_frames: int = 50, min_frames: int = 1):
        """Store the sampling bounds.

        Args:
            max_frames: Upper bound on the number of frame indices selected.
            min_frames: Number of frames the sampling step is shrunk towards
                when the clip is too short to yield that many otherwise.
        """
        self.max_frames = max_frames
        self.min_frames = min_frames

    def resize_and_center_crop(self, image: Image.Image, target_size: int) -> Image.Image:
        """Scale the shorter side to ``target_size`` and crop the center square.

        Args:
            image: Source PIL image.
            target_size: Side length of the square result.

        Returns:
            A ``target_size`` x ``target_size`` PIL image.
        """
        width, height = image.size

        # Keep the aspect ratio: the shorter side becomes exactly target_size.
        # The longer side is clamped to at least target_size so that float
        # truncation (possible for square inputs) can never make the crop box
        # extend past the resized image.
        if width < height:
            new_width = target_size
            new_height = max(target_size, int(height * (target_size / width)))
        else:
            new_height = target_size
            new_width = max(target_size, int(width * (target_size / height)))

        image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)

        # Center crop
        left = (new_width - target_size) // 2
        top = (new_height - target_size) // 2
        return image.crop((left, top, left + target_size, top + target_size))

    def _select_frame_indices(self, total_frames: int, step: int) -> List[int]:
        """Return every ``step``-th index below ``total_frames``, thinned evenly to ``max_frames``."""
        # A step below 1 would make range() raise (0) or select nothing useful.
        step = max(1, step)
        frame_indices = list(range(0, total_frames, step))
        print(f'frame indices: {frame_indices}')

        # If we have more frames than max_frames, sample evenly
        if len(frame_indices) > self.max_frames:
            picks = np.linspace(0, len(frame_indices) - 1, self.max_frames, dtype=int)
            frame_indices = [frame_indices[i] for i in picks]
        return frame_indices

    def _extract_with_cv2(self, video_path: str) -> List[Image.Image]:
        """Decode the selected frames with OpenCV, always releasing the capture."""
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                raise ValueError(f"Could not open video: {video_path}")

            # Get video properties
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            print(f'total frames: {total_frames}')
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            print(f'fps: {fps}')

            # The integer frame rate is 0 when it is unknown or below one
            # frame per second; fall back to taking every frame instead of
            # dividing by zero.
            fps = max(1, fps)

            # Shrink the step when one-frame-per-second sampling would give
            # fewer than min_frames frames.
            if total_frames / fps < self.min_frames:
                fps = max(1, total_frames // max(1, self.min_frames))
                print(f'adjusted fps: {fps}')

            frames = []
            for frame_idx in self._select_frame_indices(total_frames, fps):
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
                ret, frame = cap.read()
                # Frames that fail to decode are skipped silently.
                if ret:
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    pil_image = Image.fromarray(frame)
                    frames.append(self.resize_and_center_crop(pil_image, self.FRAME_SIZE))
            return frames
        finally:
            cap.release()

    def _extract_with_pil(self, video_path: str) -> List[Image.Image]:
        """Decode the selected frames with PIL, closing the file afterwards."""
        with Image.open(video_path) as video:
            # Single-frame formats may not expose n_frames; treat them as one frame.
            total_frames = getattr(video, "n_frames", 1)
            print(f'total frames: {total_frames}')

            # Step chosen so that roughly min_frames frames are selected.
            # NOTE(review): with the default min_frames=1 this selects only
            # frame 0 of an animated file — confirm that is intended.
            fps = max(1, total_frames // max(1, self.min_frames))
            print(f'fps: {fps}')

            frames = []
            for frame_idx in self._select_frame_indices(total_frames, fps):
                video.seek(frame_idx)
                # copy() detaches the frame from the file before it is closed.
                frames.append(self.resize_and_center_crop(video.copy(), self.FRAME_SIZE))
        return frames

    def extract_frames(self, video_path: str) -> List[Image.Image]:
        """Extract resized, center-cropped frames from ``video_path``.

        Args:
            video_path: Path to the file. ``.mp4`` / ``.gif`` go through
                OpenCV; anything else (e.g. ``.webp``) goes through PIL.

        Returns:
            A list of PIL images, possibly empty if no frame could be decoded.

        Raises:
            ValueError: If OpenCV cannot open an ``.mp4`` / ``.gif`` file.
        """
        if video_path.lower().endswith(('.mp4', '.gif')):
            return self._extract_with_cv2(video_path)
        return self._extract_with_pil(video_path)

def load_model(checkpoint_path: str, base_model_id: str = "HuggingFaceTB/SmolVLM-Instruct", device: str = "cuda"):
    """Load the model and a processor configured for pre-sized video frames.

    Args:
        checkpoint_path: Location of fine-tuned weights. When falsy, the
            weights are loaded from ``base_model_id`` instead.
        base_model_id: Model id used for the processor (always) and for the
            weights when no checkpoint is given.
        device: Value passed as ``device_map`` when loading the model.

    Returns:
        A ``(model, processor)`` tuple.
    """
    # The processor always comes from the base model, never the checkpoint.
    processor = AutoProcessor.from_pretrained(base_model_id)

    weights_source = checkpoint_path if checkpoint_path else base_model_id
    model = Idefics3ForConditionalGeneration.from_pretrained(
        weights_source,
        torch_dtype=torch.bfloat16,
        device_map=device
    )

    # Frames are supplied already sized, so resizing and image splitting are
    # switched off on the image processor.
    image_processor = processor.image_processor
    image_processor.size = (384, 384)
    image_processor.do_resize = False
    image_processor.do_image_splitting = False

    return model, processor

def generate_response(model, processor, video_path: str, question: str, max_frames: int = 50, max_new_tokens: int = 100):
    """Answer ``question`` about the frames sampled from ``video_path``.

    Args:
        model: Model exposing ``generate`` and ``device``.
        processor: Processor exposing ``apply_chat_template``, ``decode`` and
            a call interface taking ``text`` and ``images``.
        video_path: Path handed to ``VideoFrameExtractor.extract_frames``.
        question: Text appended after the image placeholders in the prompt.
        max_frames: Upper bound on the number of frames sent to the model.
        max_new_tokens: Generation length limit (defaults to the previous
            hard-coded value of 100).

    Returns:
        The decoded text of the first generated sequence. The whole sequence
        is decoded, so the text is not limited to the newly generated tokens.

    Raises:
        ValueError: If no frame could be extracted from ``video_path``.
    """
    # Extract frames
    frame_extractor = VideoFrameExtractor(max_frames)
    frames = frame_extractor.extract_frames(video_path)
    logger.info(f"Extracted {len(frames)} frames from video")

    # Fail early with a clear message instead of handing the processor a
    # prompt with no image placeholders and an empty image list.
    if not frames:
        raise ValueError(f"No frames could be extracted from video: {video_path}")

    # One image placeholder per frame, between the instruction and the question.
    image_tokens = [{"type": "image"} for _ in frames]
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Answer briefly."},
                *image_tokens,
                {"type": "text", "text": question}
            ]
        }
    ]

    # Process inputs and move them to the model's device
    inputs = processor(
        text=processor.apply_chat_template(messages, add_generation_prompt=True),
        images=frames,
        return_tensors="pt"
    ).to(model.device)

    # Generate response (sampling combined with beam search)
    outputs = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        num_beams=5,
        temperature=0.7,
        do_sample=True,
        use_cache=True
    )

    # Decode response
    return processor.decode(outputs[0], skip_special_tokens=True)

In [12]:
from transformers import AutoProcessor, Idefics3ForConditionalGeneration

# base_model_id = "HuggingFaceTB/SmolVLM-Instruct"
# processor = AutoProcessor.from_pretrained(base_model_id)

# Stand-alone processor load. The `processor` name is rebound by the
# load_model(...) call in a later cell, which loads its own processor.
processor = AutoProcessor.from_pretrained("HuggingFaceTB/SmolVLM-Instruct")


Some kwargs in processor config are unused and will not have any effect: image_seq_len. 


In [13]:
# Configuration
checkpoint_path = None
base_model_id = "HuggingFaceTB/SmolVLM-Instruct"

# Pick the first backend that reports itself available, probing in priority
# order (Apple Metal, then NVIDIA CUDA); fall back to the CPU otherwise.
device = next(
    (
        backend_name
        for backend_name, is_available in (
            ("mps", torch.backends.mps.is_available),
            ("cuda", torch.cuda.is_available),
        )
        if is_available()
    ),
    "cpu",
)

# Load model
logger.info("Loading model...")
model, processor = load_model(checkpoint_path, base_model_id, device)

INFO:__main__:Loading model...
Some kwargs in processor config are unused and will not have any effect: image_seq_len. 


In [23]:
# Toggle the input by (un)commenting; only one assignment should be live,
# otherwise the first is silently overwritten.
# video_path = "data/test3.gif"
video_path = "data/test2.webp"
question = "Describe the sequence of images."

# Generate response
logger.info("Generating response...")
response = generate_response(model, processor, video_path, question)

# Print results
print("Question:", question)
print("Response:", response)

INFO:__main__:Generating response...
INFO:__main__:Extracted 1 frames from video


total frames: 41
fps: 41
frame indices: [0]
Question: Describe the sequence of images.
Response: User: Answer briefly.<image>Describe the sequence of images.
Assistant: The image depicts a character from the animated television series "The Simpsons." The character in question is Homer Simpson, a central figure in the show. Here is a detailed description of the image based on the provided facts:

1. **Character:** Homer Simpson
   - Name: Homer Simpson
   - Gender: Male
   - Age: 50 years old
   - Appearance: Homer is a middle-aged man with a distinctive yellow skin tone, bald head, and a large, round


In [None]:
# Same inference flow as the cell above, pointed at the GIF sample under
# examples/ (the earlier cell reads from data/). Relies on `model`,
# `processor` and `generate_response` defined in earlier cells.
video_path = "examples/test3.gif"
# video_path = "examples/test2.webp"
question = "Describe the video"

# Generate response
logger.info("Generating response...")
response = generate_response(model, processor, video_path, question)

# Print results
print("Question:", question)
print("Response:", response)

In [25]:
video_path = "examples/test2.webp"
image = cv2.imread(video_path, cv2.IMREAD_UNCHANGED)

# imread signals failure by returning None rather than raising; fail here
# with a clear message instead of inside imshow.
if image is None:
    raise ValueError(f"Could not read image: {video_path}")

cv2.imshow("WebP Image", image)
# imshow only paints once the GUI event loop runs: wait for a key press,
# then close the window so it does not linger.
cv2.waitKey(0)
cv2.destroyAllWindows()

# cap = cv2.VideoCapture(video_path)
# frames = []
# while(cap.isOpened()):
#     ret, frame = cap.read()
#     if ret == True:
#         frames.append(frame)
#     else:
#         break
# cap.release()

In [26]:
from PIL import Image

# Open the file named by video_path (assigned in an earlier cell) and collect
# a copy of every frame. `img` and `frames` are reused by the cells below.
img = Image.open(video_path)
frames = []
for i in range(img.n_frames):
    # Move to frame i, then store a copy of the current frame.
    img.seek(i)
    frames.append(img.copy())

In [27]:
# Number of frames reported for the opened file (41 in the recorded run).
img.n_frames

41

In [22]:
# Display the last collected frame.
# NOTE(review): presumably opens an external viewer process — the recorded
# output shows the kernel process being forked; confirm.
frames[-1].show()

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
