In [None]:
import os
import base64
import time
import subprocess
from pathlib import Path
import mimetypes
from google import genai
from google.genai import types
from dotenv import load_dotenv
import json

In [None]:
def extract_frames(video_path, base_output_dir=r"D:\videoAnalysis_model\test\img", frame_rate=1):
    """
    Dump still frames of a video to JPEG files by invoking ffmpeg.

    The frames are written to ``<base_output_dir>/<video stem>/`` and are named
    ``<video stem>_%04d.jpg`` (zero-padded, four-digit frame counter filled in
    by ffmpeg). The per-video folder is created when it does not exist yet.

    Args:
        video_path: Path to the video file
        base_output_dir: Base directory in which the per-video folder is created
        frame_rate: Value handed to ffmpeg's ``fps`` filter, i.e. frames
            extracted per second of video (default: 1)

    Returns:
        The per-video output directory, as a string.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    source = Path(video_path)
    stem = source.stem

    # One folder per video, named after the file without its extension
    frames_dir = Path(base_output_dir) / stem
    frames_dir.mkdir(parents=True, exist_ok=True)

    # Assemble the ffmpeg invocation option by option
    ffmpeg_args = ["ffmpeg"]
    ffmpeg_args += ["-loglevel", "info"]
    ffmpeg_args += ["-i", str(source)]
    ffmpeg_args += ["-vf", f"fps={frame_rate}"]
    ffmpeg_args += ["-q:v", "2"]
    ffmpeg_args += ["-y"]  # overwrite frames left over from an earlier run
    ffmpeg_args.append(str(frames_dir / f"{stem}_%04d.jpg"))

    subprocess.run(ffmpeg_args, check=True)
    return str(frames_dir)

In [None]:
# Extract the frames of one clip into <base_output_dir>/<clip file stem>/.
# NOTE(review): this base directory is ...\test, whereas extract_frames defaults to
# ...\test\img, which is where the later cells look for frames - confirm this is intended.
video_path = "./test/2025-05-04_13-56-04-549523_clip_990_part_001.mp4"
base_output_dir = r"D:\videoAnalysis_model\test"
extract_frames(video_path=video_path, base_output_dir=base_output_dir)

In [None]:
# Extract frames for every .mp4 file (extension matched case-insensitively) found
# directly inside "./test/video".
# extract_frames is called without base_output_dir, so each video gets its own folder
# under that function's default base directory.
video_folder = './test/video'

mp4_names = [name for name in os.listdir(video_folder) if name.lower().endswith('.mp4')]
for filename in mp4_names:
    video_path = os.path.join(video_folder, filename)
    extract_frames(video_path)

In [None]:
# Pull the variables defined in the .env file into os.environ
load_dotenv()

# The key is looked up under the "genai_api_key" name; os.getenv yields None when it is unset
api_key = os.getenv("genai_api_key")
client = genai.Client(api_key=api_key)

# Model used for the requests below (alternative: "gemini-1.5-flash-8b")
model = "gemini-2.5-pro-preview-03-25"

# Generation settings passed with each request: a single candidate, JSON response
# MIME type, temperature 0.0, and an output limit of 8100 tokens
generate_content_config = types.GenerateContentConfig(
    candidate_count=1,
    max_output_tokens=8100,
    temperature=0.0,
    top_p=0.95,
    response_mime_type="application/json",
)

In [None]:

def generate_image_parts(folder_path):
    """
    Build one ``types.Part`` per image file found directly inside ``folder_path``.

    Entries are selected by file extension (case-insensitive) and processed in
    sorted path order. Entries that are not regular files, or whose MIME type
    cannot be guessed from the name, are skipped.

    Args:
        folder_path: Directory that contains the image files.

    Returns:
        A list of parts created with ``types.Part.from_bytes`` (raw file bytes
        plus the guessed MIME type); empty when the folder holds no usable image.
    """
    supported_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp')

    # Sorted so the parts follow the file-name order
    image_files = sorted(
        os.path.join(folder_path, name)
        for name in os.listdir(folder_path)
        if name.lower().endswith(supported_extensions)
        and os.path.isfile(os.path.join(folder_path, name))
    )

    image_parts = []
    for image_path in image_files:
        # Resolve the MIME type first so files that get skipped are never read
        mime_type, _ = mimetypes.guess_type(image_path)
        if not mime_type:
            continue  # skip if unknown type

        # Part.from_bytes takes the raw bytes, so no base64 round trip is needed
        with open(image_path, 'rb') as img_file:
            image_bytes = img_file.read()

        image_parts.append(
            types.Part.from_bytes(mime_type=mime_type, data=image_bytes)
        )

    return image_parts


In [20]:
# Load the prompt text used by the cells below.
# The file is decoded explicitly as UTF-8 so the prompt does not depend on the
# platform's default encoding.
with open('./test_prompt.txt', 'r', encoding='utf-8') as file:
    selected_prompt = file.read()

In [None]:
# Generate a response for every folder found under "./test/img" (os.walk also visits
# nested folders) using the selected prompt.
# Each response is appended to "./test/result.txt", preceded by the folder path, so
# re-running this cell adds further entries to the file.
root_dir = './test/img'

for foldername, subfolders, files in os.walk(root_dir):
    for subfolder in subfolders:
        folder = os.path.join(foldername, subfolder)
        print(folder)
        parts_array = generate_image_parts(folder)

        print(f"Created {len(parts_array)} image parts.")

        # A folder without images would send the prompt alone and store an answer
        # that is not based on any frame, so skip it without calling the API.
        if not parts_array:
            print(f"Skipping {folder}: no images found.")
            continue

        # The prompt goes last, after all image parts
        parts_array.append(types.Part.from_text(text=selected_prompt))

        contents = [
            types.Content(
                role="user",
                parts=parts_array
            ),
        ]

        # Pause before the request - presumably to stay within API rate limits (confirm)
        time.sleep(5)

        # Collect the streamed chunks and join them once at the end
        response_chunks = []
        for chunk in client.models.generate_content_stream(
            model=model,
            contents=contents,
            config=generate_content_config,
        ):
            if chunk.text is not None:
                response_chunks.append(chunk.text)
        full_response_text = "".join(response_chunks)

        print(full_response_text)

        time.sleep(5)

        # Entry layout: a blank separator, the folder path on its own line, then the response
        with open("./test/result.txt", 'a', encoding='utf-8') as file:
            file.write('\n \n' + folder + '\n' + full_response_text + '\n')

In [None]:
# Read result.txt and save each JSON response as its own file in the result directory.
# Every entry in result.txt is a header line holding the folder path (it starts with "./")
# followed by the lines of the response.
# NOTE(review): output_dir is an absolute path while later cells read "./test/result";
# this presumably relies on the notebook running from D:\videoAnalysis_model - confirm.
input_file = './test/result.txt'
output_dir = r'D:\videoAnalysis_model\test\result'  # Desired output directory

# Ensure the output directory exists
os.makedirs(output_dir, exist_ok=True)

# result.txt is written as UTF-8, so it has to be read back with the same encoding
with open(input_file, 'r', encoding='utf-8') as f:
    lines = f.readlines()

i = 0

# Skip anything before the first header (each entry starts with blank separator lines);
# otherwise a blank line would be treated as a path and reported as a parse error.
while i < len(lines) and not lines[i].strip().startswith('./'):
    i += 1

while i < len(lines):
    path = lines[i].strip()
    i += 1
    json_lines = []

    # Collect all lines up to the next header; they form the JSON block of this entry
    while i < len(lines) and not lines[i].strip().startswith('./'):
        json_lines.append(lines[i])
        i += 1

    json_str = ''.join(json_lines)

    try:
        data = json.loads(json_str)

        # Name the file after the last component of the folder path (e.g., part_001)
        filename = os.path.basename(path) + '.json'
        full_output_path = os.path.join(output_dir, filename)

        # Write the individual JSON file
        with open(full_output_path, 'w', encoding='utf-8') as f_out:
            json.dump(data, f_out, indent=2)

        print(f"✅ Saved: {full_output_path}")

    except Exception as e:
        # Best effort: report the entry and continue with the next one
        print(f"❌ Error parsing JSON for {path}: {e}")

In [21]:
def generate_user_parts_object(img_folder_name, bucket_path='gs://video-frame-images/img1',
                               img_root='./test/img'):
    """
    Build the "user" content object that references the images of one folder by URI.

    Every regular file in ``<img_root>/<img_folder_name>`` whose MIME type can be
    guessed from its name becomes one ``fileData`` part pointing at
    ``<bucket_path>/<img_folder_name>/<file name>``. Files are listed in sorted
    name order so the parts are emitted in a stable order instead of whatever
    order the file system returns.

    Args:
        img_folder_name: Name of the image folder (also used as the folder name in the URI).
        bucket_path: URI prefix under which the image folders are expected to be stored.
        img_root: Local directory that contains the image folders.

    Returns:
        A dict of the form ``{"role": "user", "parts": [...]}``.
    """
    folder_path = os.path.join(img_root, img_folder_name)

    parts = []
    for filename in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, filename)
        if not os.path.isfile(file_path):
            continue

        mime_type, _ = mimetypes.guess_type(file_path)
        if not mime_type:
            continue  # Only include files with a recognized MIME type

        parts.append({
            "fileData": {
                "mimeType": mime_type,
                "fileUri": f"{bucket_path}/{img_folder_name}/{filename}"
            }
        })

    return {
        "role": "user",
        "parts": parts
    }

In [22]:
def generate_model_parts_object(json_file_path):
    """
    Build the "model" content object from a JSON file.

    The file is parsed and re-serialized with a two-space indent; the resulting
    string becomes the single text part of the returned object.

    Args:
        json_file_path: Path of the JSON file to load.

    Returns:
        ``{"role": "model", "parts": [{"text": <json string>}]}``, or ``None``
        when the file does not exist or does not contain valid JSON (a message
        is printed in both cases).
    """
    try:
        with open(json_file_path, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)
    except FileNotFoundError:
        print(f"File not found: {json_file_path}")
        return None
    except json.JSONDecodeError:
        print(f"Invalid JSON in file: {json_file_path}")
        return None

    # Re-serialize so the text has a uniform layout regardless of the file's formatting
    serialized = json.dumps(payload, indent=2)
    text_part = {"text": serialized}
    return {"role": "model", "parts": [text_part]}

In [23]:
# Collect the file names (not full paths) of all result JSON files in "./test/result"
folder_path = "./test/result"

json_files = []
for entry in os.listdir(folder_path):
    if entry.endswith('.json'):
        json_files.append(entry)

In [None]:
# Generate one dataset JSON file per result file, with the prompt as systemInstruction.
# Files are written to "./test/dataset/" under the result file's name; the other dataset
# variants in this notebook write to the same paths, so the last one run wins.
os.makedirs('./test/dataset', exist_ok=True)  # the output folder may not exist yet

for json_file in json_files:
    # The result file name without ".json" is the name of the image folder
    img_folder_name = os.path.splitext(os.path.basename(json_file))[0]
    user_parts_object = generate_user_parts_object(img_folder_name)
    model_parts_object = generate_model_parts_object("./test/result/" + json_file)

    # generate_model_parts_object returns None for a missing or invalid file;
    # skip the example rather than writing a null entry into "contents"
    if model_parts_object is None:
        print(f"Skipping {json_file}: no model response available")
        continue

    dataset_json = {
        "systemInstruction": {
            "parts": [
                {
                    "text": selected_prompt
                }
            ]
        },
        "contents": [user_parts_object, model_parts_object]
    }
    with open('./test/dataset/' + json_file, 'w') as f:
        json.dump(dataset_json, f, indent=4)


In [None]:
# Generate one dataset JSON file per result file, without a system instruction.
# Files are written to "./test/dataset/" under the result file's name; the other dataset
# variants in this notebook write to the same paths, so the last one run wins.
os.makedirs('./test/dataset', exist_ok=True)  # the output folder may not exist yet

for json_file in json_files:
    # The result file name without ".json" is the name of the image folder
    img_folder_name = os.path.splitext(os.path.basename(json_file))[0]
    user_parts_object = generate_user_parts_object(img_folder_name)
    model_parts_object = generate_model_parts_object("./test/result/" + json_file)

    # generate_model_parts_object returns None for a missing or invalid file;
    # skip the example rather than writing a null entry into "contents"
    if model_parts_object is None:
        print(f"Skipping {json_file}: no model response available")
        continue

    dataset_json = {
        "contents": [user_parts_object, model_parts_object]
    }
    with open('./test/dataset/' + json_file, 'w') as f:
        json.dump(dataset_json, f, indent=4)


In [24]:
# Generate one dataset JSON file per result file, with the prompt appended to the user
# parts (after the image references) instead of being sent as a system instruction.
# Files are written to "./test/dataset/" under the result file's name; the other dataset
# variants in this notebook write to the same paths, so the last one run wins.
os.makedirs('./test/dataset', exist_ok=True)  # the output folder may not exist yet

for json_file in json_files:
    # The result file name without ".json" is the name of the image folder
    img_folder_name = os.path.splitext(os.path.basename(json_file))[0]
    model_parts_object = generate_model_parts_object("./test/result/" + json_file)

    # generate_model_parts_object returns None for a missing or invalid file;
    # skip the example rather than writing a null entry into "contents"
    if model_parts_object is None:
        print(f"Skipping {json_file}: no model response available")
        continue

    user_parts_object = generate_user_parts_object(img_folder_name)
    user_parts_object["parts"].append({"text": selected_prompt})

    dataset_json = {
        "contents": [user_parts_object, model_parts_object]
    }
    with open('./test/dataset/' + json_file, 'w') as f:
        json.dump(dataset_json, f, indent=4)


In [25]:
# Merge every .json file of the test dataset folder into one JSONL file
# (one compact JSON document per line); files with invalid JSON are reported and left out
input_folder = './test/dataset'
output_file = './test/dataset.jsonl'

with open(output_file, 'w') as outfile:
    for filename in os.listdir(input_folder):
        if not filename.endswith('.json'):
            continue

        file_path = os.path.join(input_folder, filename)
        with open(file_path, 'r') as f:
            try:
                record = json.load(f)
            except json.JSONDecodeError as e:
                print(f"Skipping {filename}: invalid JSON - {e}")
                continue

            # json.dumps without indent yields the single-line form
            outfile.write(json.dumps(record) + '\n')

In [26]:
# Upload the generated dataset JSONL file to the storage bucket with gsutil
!gsutil cp D:\videoAnalysis_model\test\dataset.jsonl gs://video-frame-images/test_case_user_prompt.jsonl

Copying file://D:\videoAnalysis_model\test\dataset.jsonl [Content-Type=application/octet-stream]...
/ [0 files][    0.0 B/905.2 KiB]                                                
-
- [0 files][905.2 KiB/905.2 KiB]                                                
- [1 files][905.2 KiB/905.2 KiB]                                                
\

Operation completed over 1 objects/905.2 KiB.                                    
