In [1]:
# necessary imports
import torch
import random
import os
import json
import re
import google.generativeai as genai
from transformers import BlipProcessor, BlipForConditionalGeneration
import cv2
from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
#this model will be used for captioning the frames inside the video
device = 'cuda' if torch.cuda.is_available() else 'cpu'
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large").to(device)

In [3]:
#api key to get gemini up and running
with open('key.json') as f:
    content = json.load(f)
    key = content.get('gemini_key')

genai.configure(api_key=key)
llm = genai.GenerativeModel("gemini-1.5-flash")

In [4]:
#this will jsonify the output from the gemini

def jsonify(json_sting):
    try:
        # Remove non-JSON text before and after the JSON object using regex
        json_regex = r'(\{.*\}|\[.*\])'  
        matches = re.findall(json_regex, json_sting, re.DOTALL)

        if not matches:
            raise ValueError("No valid JSON object found in the response.")
            
        cleaned_response = matches[0]

        
        # Remove extra whitespace characters
        cleaned_response = re.sub(r'\s+', ' ', cleaned_response)

        
        # Fix trailing commas in objects and arrays
        cleaned_response = re.sub(r',\s*([\]}])', r'\1', cleaned_response)

        # Remove extra commas or add missing ones
        cleaned_response = re.sub(r'{\s*,', '{', cleaned_response)
        cleaned_response = re.sub(r'\[\s*,', '[', cleaned_response)

        # Fix invalid escape characters by removing unescaped backslashes
        cleaned_response = re.sub(r'(?<!\\)\\(?!["\\/bfnrtu])', '', cleaned_response)

        # Fix invalid numbers (remove leading zeros)
        cleaned_response = re.sub(r'\b0+(\d+)', r'\1', cleaned_response)

        # Remove any comments (JSON does not allow comments)
        cleaned_response = re.sub(r'\/\/.*|\/\*.*\*\/', '', cleaned_response, flags=re.MULTILINE)

        # Remove any misplaced control characters or invalid Unicode
        cleaned_response = re.sub(r'[\x00-\x1F\x7F]', '', cleaned_response)

        # Fix improperly nested JSON (very basic handling)
        cleaned_response = re.sub(r'\{([^\{\}\[\]]+)\{', r'{\1,{', cleaned_response)
        cleaned_response = re.sub(r'\}([^\{\}\[\]]+)\}', r'},\1}', cleaned_response)

        # Fix repeated keys in objects (keep the last occurrence)
        def remove_repeated_keys(match):
            obj_str = match.group(0)
            try:
                # Attempt to parse JSON object to find unique keys
                temp_obj = json.loads(obj_str)
                # Convert back to string to maintain original structure
                return json.dumps(temp_obj)
            except:
                return obj_str  # If parsing fails, return original

        cleaned_response = re.sub(r'\{.*?\}', remove_repeated_keys, cleaned_response)
        
        return json.loads(cleaned_response)

    except json.JSONDecodeError as e:
        print(f"JSONDecodeError: {e}")
        return None
    except ValueError as ve:
        print(f"ValueError: {ve}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

In [5]:
# Return the captions of all the frames
def captionate_frames(frames):
    inputs = processor(frames, return_tensors="pt").to(device)
    outputs = model.generate(**inputs)
    captions = [processor.decode(i, skip_special_tokens=True) for i in outputs]
    return captions

In [6]:
# prompt to make video captions more sensible
def get_captions_and_time_frame(captions, fps, duration):
    prompt = '''
		i am working on a video in which i have to caption the video according to the actions and activities performed in that video.
		currently i have the fps, duration of the video. additionally, i have extracted a random frame from each second and by using some model, i captioned those frames.
		so, for example, if a video is of 60 seconds, i have 60 frames and 60 captions.
		i am providing you the fps, total duration, and captions that are generated. i want you to update those captions and make them contextualize and summarize them a little bit that if FEELS LIKE THEY ARE FROM ORIGINAL VIDEO.

		i want you to return me in the format given below
		```
		[
		{
			"start_time": start_time1,
			"end_time": end_time1,
			"caption": caption1,
		},
		{
			"start_time": start_time2,
			"end_time": end_time2,
			"caption": caption2,
		},
		{
			"start_time": start_time3,
			"end_time": end_time3,
			"caption": caption4,
		},
		{
			"start_time": start_time4,
			"end_time": end_time4,
			"caption": caption4,
		},
		...
		]

		```

		here are the actual details and captions of video frames.
		fps: %d,
		duration: %d,
		captions:[%s]
		''' % (fps, duration, ','.join(captions))

    response = llm.generate_content(prompt).text
    json_obj = jsonify(response)
    if json_obj is not None:
            return json_obj
    raise ValueError('There is a flaw in data recieved. please request again.')

In [7]:
# prompt to find the the most relevent part from the video
def get_video_segment(captions, fps, duration, scene):
    prompt = '''
    i am working with video in which i have to take out a segment of video depending upon the scene described. currently i have the fps, duration of the video. additionally, i have extracted a random frame from each second and by using some model, i captioned those frames.
    so, for example, if a video is of 60 seconds, i have 60 frames and 60 captions.
    i am providing you the fps, total duration, and captions that are generated. i want you to give me the start and end second of from the video that best describes the provided scene.

    i want you to return me in the format given below
    ```
    {
    "start": start time,
    "end": end time
    }
    ```
    here are the actual details and captions of video frames.
    fps: %d
    duration: %d seconds
    scene: %s
    captions: [%s]

    NOTE: I DO NOT WANT ANY KIND OF EXPLAINATION, I DIRECTLY WANT THE ANSWER AND ASKED FORMAT
    '''% (fps, duration, scene, ','.join(captions))
    response = llm.generate_content(prompt).text
    json_obj = jsonify(response)
    if json_obj is not None:
            return json_obj
    raise ValueError('There is a flaw in data recieved. please request again.')

In [8]:
#return the video detials
def get_video_data(video_path):
    video = cv2.VideoCapture(video_path)
    fps = video.get(cv2.CAP_PROP_FPS)
    duration = video.get(cv2.CAP_PROP_FRAME_COUNT) / fps
    frames = []
    for second in range(int(duration)):
        # Move to the starting frame of the second
        video.set(cv2.CAP_PROP_POS_FRAMES, second * fps)
        # Randomly select a frame within the second
        random_frame = int(random.uniform(0, fps))
        video.set(cv2.CAP_PROP_POS_FRAMES, second * fps + random_frame)
        ret, frame = video.read()
        if ret:
            frames.append(frame)
    video.release()
    return fps, duration, frames

In [9]:
#main functin to captionate the video
def caption_video(video_path):
    fps, duration, frames = get_video_data(video_path)
    captions = captionate_frames(frames)
    captions_with_timeframes = get_captions_and_time_frame(captions, fps, duration)
    video_clip = VideoFileClip(video_path)
    text_clips = []
    for item in captions_with_timeframes:
        caption = item.get('caption')
        start_time = item.get('start_time')
        end_time = item.get('end_time')
        text_clip = (TextClip(caption, method='caption', fontsize=18, color='white', bg_color='black')
                .set_position('bottom',relative=True)
                .set_start(start_time)
                .set_duration(end_time - start_time))
        text_clips.append(text_clip)
    final_video = CompositeVideoClip([video_clip, *text_clips])
    output_path = os.path.splitext(video_path)[0] + '_captioned.mp4'
    final_video.write_videofile(output_path, codec='libx264')

In [10]:
#main function to get the segment of described scene from the video
def clip_video(video_path,scene):
    fps, duration, frames = get_video_data(video_path)
    captions = captionate_frames(frames)
    time_frame = get_video_segment(captions,fps,duration,scene)
    start = time_frame.get('start')
    end = time_frame.get('end')
    output_dir = os.path.dirname(video_path)
    video_name, video_extension = os.path.splitext(os.path.basename(video_path))
    output_video_path = os.path.join(output_dir, f"{video_name}_segment{video_extension}")
    video = VideoFileClip(video_path)
    video_segment = video.subclip(start, end)
    video_segment.write_videofile(output_video_path, codec="libx264")
