In [None]:
import cv2, os, torch
from PIL import Image as pillow
from typing import Union
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

In [None]:
# Local directories used to cache the model weights / tokenizer between runs
# (load_models checks for these directories and saves into them).
local_model_path = "moondream2"
local_tokenizer_path = "moondream2_tokenizer"
llama_model_path = "llama_3-2_1B"

# Hugging Face Hub identifiers used when no local copy is available.
model_id = "vikhyatk/moondream2"
llama_model_id = "meta-llama/Llama-3.2-1B-Instruct"
# NOTE(review): `revision` is not passed to any from_pretrained call in the
# visible cells; presumably meant to pin the moondream2 checkpoint. Confirm.
revision = "2024-08-26"

In [None]:
# Example usage
video_path = "/kaggle/input/vid-sample/jujutsu-kaisen-shibuya-arc-uraume-shibuya-arc.mp4"
output_folder = "vidframes"
sample_rate = 1  # Seconds between extracted frames (1 -> one frame per second of video)

In [None]:
def extract_frames(
    video_path: Union[str, os.PathLike],
    output_folder: str = output_folder,
    sample_rate: int = 1
) -> str:
    """Save one frame every `sample_rate` seconds of a video as JPEG files.

    Frames are written to `output_folder` as ``frame_<index>.jpg``, where
    ``<index>`` is the zero-padded position of the frame in the video.

    Args:
        video_path: Path of the video file to read.
        output_folder: Directory that receives the images (created if missing).
        sample_rate: Seconds between two saved frames.

    Returns:
        `output_folder`, unchanged, so the result can be chained.

    Raises:
        OSError: If the video cannot be opened.
    """
    os.makedirs(output_folder, exist_ok=True)  # Create output folder if it doesn't exist

    video = cv2.VideoCapture(os.fspath(video_path))  # read video file with cv2
    if not video.isOpened():
        video.release()
        raise OSError(f"Could not open video file: {video_path}")

    try:
        # get video properties
        fps = video.get(cv2.CAP_PROP_FPS)  # frames per second in the video
        num_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))  # total number of frames
        # Never let the interval drop to 0 (unknown fps or a very small
        # sample_rate); that would make the modulo below divide by zero.
        frame_interval = max(1, int(fps * sample_rate))

        frame_count = 0  # index of the frame currently being read
        saved_count = 0  # number of images actually written

        while True:
            success, frame = video.read()  # Read a frame
            if not success:
                break  # end of stream or decode error

            # Save only frames that fall on the sampling interval; every other
            # frame is skipped, but reading continues.
            if frame_count % frame_interval == 0:
                frame_filename = os.path.join(output_folder, f"frame_{frame_count:06d}.jpg")
                if cv2.imwrite(frame_filename, frame):  # write frame to jpeg/image file
                    saved_count += 1

            frame_count += 1
    finally:
        # Release the video capture object even if reading/writing failed
        video.release()

    print(f"Extracted {saved_count} of {num_frames} total frames, image frames saved at {output_folder}")

    return output_folder

In [None]:
# Run the extraction; the returned folder path is reused by the later cell that lists the frames.
output_folder = extract_frames(video_path, output_folder, sample_rate)

In [None]:
def load_models(
    md_model_path: Union[str, os.PathLike] = local_model_path,
    md_tokenizer_path: Union[str, os.PathLike] = local_tokenizer_path,
    llama_path: str = llama_model_path,
    model_id: str = model_id,
    llama_id: str = llama_model_id
) -> tuple:
    """Load the captioning model, its tokenizer and the LLM text-generation pipeline.

    If all three local directories exist, everything is loaded from disk.
    Otherwise the weights are fetched from the Hugging Face Hub using
    `model_id` / `llama_id` and then saved to the local directories so the
    next call can load them from disk.

    Args:
        md_model_path: Local directory of the captioning model.
        md_tokenizer_path: Local directory of the captioning model's tokenizer.
        llama_path: Local directory of the LLM used by the pipeline.
        model_id: Hub identifier of the captioning model.
        llama_id: Hub identifier of the LLM.

    Returns:
        A ``(model, tokenizer, pipeline)`` tuple.
    """
    # Check every directory that the local branch reads from. The `llama_path`
    # argument is used (not the module-level default) so that callers passing a
    # custom path get the check and the save applied to that path.
    have_local_copy = all(
        os.path.isdir(path)
        for path in (md_model_path, md_tokenizer_path, llama_path)
    )

    if have_local_copy:  # load from locally saved weights
        print('loading from local checkpoint')
        md_model = AutoModelForCausalLM.from_pretrained(md_model_path)
        md_tokenizer = AutoTokenizer.from_pretrained(md_tokenizer_path)
        llama_pipe = pipeline(
            "text-generation",
            model=llama_path,
            torch_dtype=torch.float16,
            device_map="auto",
        )

    else:  # download fresh weights from huggingface
        print('downloading weights from huggingface')
        # NOTE(review): the captioning checkpoint presumably ships custom
        # modelling code (later cells call `encode_image` / `answer_question`
        # on it), so transformers may ask for confirmation unless
        # `trust_remote_code=True` is passed. Confirm before enabling.
        md_model = AutoModelForCausalLM.from_pretrained(model_id)
        md_tokenizer = AutoTokenizer.from_pretrained(model_id)
        llama_pipe = pipeline(
            "text-generation",
            model=llama_id,
            torch_dtype=torch.float16,
            device_map="auto"
        )
        # then save locally for next time
        llama_pipe.save_pretrained(llama_path)
        md_model.save_pretrained(md_model_path)
        md_tokenizer.save_pretrained(md_tokenizer_path) # type: ignore

    return md_model, md_tokenizer, llama_pipe

In [None]:
# Load (or download and cache) the captioning model, its tokenizer and the LLM pipeline.
moondream_model, md_tokenizer, llama_pipe = load_models()

In [None]:
# os.listdir returns entries in arbitrary order, so sort the paths: the
# zero-padded frame_XXXXXX.jpg names sort lexicographically by frame index,
# which keeps the frames (and the captions built from them) in video order.
# Non-file entries (e.g. checkpoint directories) are skipped.
vidframes = sorted(
    os.path.join(output_folder, name)
    for name in os.listdir(output_folder)
    if os.path.isfile(os.path.join(output_folder, name))
)

image_frames = [pillow.open(img) for img in vidframes]

In [None]:
def caption_frames(
    image_frames: list,
    model: AutoModelForCausalLM = moondream_model,
    tokenizer: AutoTokenizer = md_tokenizer
) -> list:
    """Generate one short caption per image.

    Each image is first passed through `model.encode_image`; the resulting
    encoding is then handed to `model.answer_question` together with a fixed
    prompt and `tokenizer`.

    Args:
        image_frames: Images to caption, processed in the given order.
        model: Object providing `encode_image` and `answer_question`.
        tokenizer: Tokenizer forwarded to `model.answer_question`.

    Returns:
        A list with one caption per input image, in input order.
    """
    prompt = "briefly describe this image"

    def describe(image):
        # encode the image, then ask the model to describe the encoding
        encoded = model.encode_image(image)
        return model.answer_question(encoded, prompt, tokenizer)

    return [describe(image) for image in image_frames]

# Caption every loaded frame using the default model and tokenizer.
captions = caption_frames(image_frames)

## LLM part

In [None]:
def merge_captions(captions: list, llm_pipeline: pipeline = llama_pipe) -> str:
    """Merge several frame captions into a single summary using an LLM.

    The captions are joined with '.' and sent as the user message of a
    two-message chat (system instruction + user content) to `llm_pipeline`.

    Args:
        captions: Caption strings, in the order they should be summarised.
        llm_pipeline: Callable text-generation pipeline that accepts a list of
            chat messages and a `max_new_tokens` keyword.

    Returns:
        The text of the assistant's reply, or an empty string when `captions`
        is empty (the LLM is not called in that case).
    """
    if not captions:
        # Nothing to summarise; avoid prompting the LLM with empty content.
        return ""

    single_cap = '.'.join(captions)
    messages = [
        {
            "role": "system",
            "content": "You are a summary chatbot who summarizes and merges several image captions into one long sentence",
        },
        {"role": "user", "content": f"{single_cap}"},
    ]

    outputs = llm_pipeline(
        messages,
        max_new_tokens=256,
    )

    # With chat-style input, "generated_text" is the whole conversation as a
    # list of message dicts; the last one is the assistant's reply. Return its
    # text so the result matches the declared `str` return type.
    assistant_message = outputs[0]["generated_text"][-1]
    llm_caption = assistant_message["content"]

    return llm_caption