# Gemini Robotics-ER 1.5 API Test Notebook

**사전 준비:**
1. `assets` 폴더에 테스트할 비디오(e.g. `video.mp4`)가 있어야 합니다.
2. 유효한 Google GenAI API 키가 필요합니다.

# 셋업

### 라이브러리 설치

In [1]:
# === 0. 초기 설정: 라이브러리 설치 및 임포트 ===
# === 0. 초기 설정 (Setup) ===
# 라이브러리 설치, 임포트, API 설정, 헬퍼 함수 정의

# 1) 필요한 라이브러리 설치 (필요시 주석 해제 후 실행)
# !pip install google-genai pillow matplotlib opencv-python

# 2) 라이브러리 임포트
import os
import json
import cv2
import time
import io
from google import genai
from google.genai import types
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt
import numpy as np

from google.colab import drive
drive.mount('/content/drive')



Mounted at /content/drive


### API 키 및 클라이언트 세팅
* api 키를 입력해주세요

In [2]:
# === 1. API 설정 ===
# 3) API 설정
# 여기에 API 키를 입력하거나, 환경 변수에서 가져오세요.
API_KEY = 'your_api_key' # <-- 여기에 본인의 API 키를 입력하세요
MODEL_ID = "gemini-robotics-er-1.5-preview"  # Gemini Robotics-ER 모델 사용

client = genai.Client(api_key=API_KEY)
print(f"Gemini Robotics-ER 클라이언트 초기화 완료")



Gemini Robotics-ER 클라이언트 초기화 완료


### 헬퍼 함수 정의

In [3]:
LAST_PROCESSED_VIDEO_PATH = None

def parse_json(json_output):
    """JSON 응답에서 마크다운 코드 블록을 제거하고 파싱합니다."""
    lines = json_output.splitlines()
    for i, line in enumerate(lines):
        if line == "```json":
            json_output = "\n".join(lines[i + 1:])
            json_output = json_output.split("```")[0]
            break
    return json_output.strip()

def upload_video_file(client, video_path):
    """비디오 파일을 Gemini API에 업로드하고 완료될 때까지 대기합니다."""
    print(f"Uploading video file: {video_path}")
    try:
        myfile = client.files.upload(file=video_path)

        # 업로드 완료 대기
        while myfile.state.name == "PROCESSING":
            print(".", end="", flush=True)
            time.sleep(1)
            myfile = client.files.get(name=myfile.name)

        if myfile.state.name == "FAILED":
            raise ValueError(f"File upload failed: {myfile.state.name}")

        print(" Upload completed!")
        return myfile

    except Exception as e:
        print(f"Upload failed: {e}")
        raise

def extract_frames_from_video(video_path, frame_step=10):
    """비디오에서 프레임을 추출하여 PIL Image 리스트로 반환합니다."""
    frames = []
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print("Error opening video file")
        return frames

    frame_count = 0
    success_count = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # 지정된 간격으로 프레임 추출
        if frame_count % frame_step == 0:
            # OpenCV BGR을 RGB로 변환 후 PIL Image로 변환
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(frame_rgb)
            frames.append(pil_image)
            success_count += 1

        frame_count += 1

    cap.release()
    print(f"Extracted {success_count} frames from video (step={frame_step})")
    return frames

def process_video_frame_by_frame(client, video_path, prompt, frame_step=10):
    """비디오를 프레임별로 분석하여 각 프레임의 결과를 반환합니다."""
    frames = extract_frames_from_video(video_path, frame_step)

    if not frames:
        print("No frames extracted from video")
        return []

    frame_results = []
    total_frames = len(frames)

    print(f"Processing {total_frames} frames...")

    for i, frame in enumerate(frames):
        print(f"Processing frame {i+1}/{total_frames}...")

        try:
            # 프레임을 바이트로 변환
            buffer = io.BytesIO()
            frame.save(buffer, format='JPEG')
            frame_bytes = buffer.getvalue()

            # API 호출
            response = client.models.generate_content(
                model=MODEL_ID,
                contents=[
                    types.Part.from_bytes(
                        data=frame_bytes,
                        mime_type='image/jpeg',
                    ),
                    prompt
                ],
                config=types.GenerateContentConfig(
                    temperature=0.5,
                    thinking_config=types.ThinkingConfig(thinking_budget=0)
                )
            )

            # JSON 파싱
            json_text = parse_json(response.text)
            frame_data = json.loads(json_text)
            frame_results.append(frame_data)

        except Exception as e:
            print(f"Error processing frame {i+1}: {e}")
            frame_results.append([])  # 에러 발생 시 빈 리스트 추가

    print(f"Completed processing {len(frame_results)} frames")
    return frame_results

def populate_points_for_all_frames(total_frames, frame_step, analyzed_data):
    """분석된 프레임 데이터를 모든 프레임에 적용합니다."""
    points_data_all_frames = []
    analyzed_data_index = 0

    for i in range(total_frames):
        if i % frame_step == 0 and analyzed_data_index < len(analyzed_data):
            # 분석된 프레임 데이터 사용
            points_data_all_frames.append(analyzed_data[analyzed_data_index])
            analyzed_data_index += 1
        else:
            # 분석되지 않은 프레임은 이전 분석 데이터 사용
            if analyzed_data_index > 0:
                points_data_all_frames.append(analyzed_data[analyzed_data_index - 1])
            else:
                points_data_all_frames.append([])

    return points_data_all_frames

def plot_video_with_points(video_path, json_output):
    """개선된 비디오 위에 포인트와 박스를 시각화하여 저장합니다."""
    global LAST_PROCESSED_VIDEO_PATH

    # 출력 디렉토리 생성
    output_dir = os.path.join(os.path.dirname(video_path), 'output')
    os.makedirs(output_dir, exist_ok=True)

    # 출력 파일 경로
    base_video_name = os.path.basename(video_path)
    timestamp = time.strftime('%y%m%d') # YYMMDD format
    output_video_name = f"{os.path.splitext(base_video_name)[0]}_processed_{timestamp}.mp4"
    output_path = os.path.join(output_dir, output_video_name)

    LAST_PROCESSED_VIDEO_PATH = output_path

    if not os.path.exists(video_path):
        print(f"Video not found: {video_path}")
        return

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error opening video stream or file")
        return

    # 비디오 정보 가져오기
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))


    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    print(f"Processing {total_frames} frames...")

    frame_idx = 0

    # 프레임별 데이터 준비
    if isinstance(json_output, list):
        # 이미 프레임별 데이터인 경우
        frame_data = json_output
    else:
        # 단일 응답을 모든 프레임에 적용
        frame_data = populate_points_for_all_frames(total_frames, 1, [json_output])

    # 시각화 루프
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # 해당 프레임에 대한 데이터 가져오기
        current_data = []
        if frame_idx < len(frame_data):
            frame_item = frame_data[frame_idx]
            if isinstance(frame_item, list):
                current_data = frame_item
            elif isinstance(frame_item, dict):
                current_data = [frame_item]

        # 시각화
        for obj in current_data:
            label = obj.get('label', '')

            # Point 그리기
            if 'point' in obj:
                y_norm, x_norm = obj['point']
                x = int((x_norm / 1000) * width)
                y = int((y_norm / 1000) * height)
                cv2.circle(frame, (x, y), 5, (0, 0, 255), -1)
                cv2.putText(frame, label, (x + 10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)

            # Bounding Box 그리기
            if 'box_2d' in obj:
                ymin, xmin, ymax, xmax = obj['box_2d']
                left = int((xmin / 1000) * width)
                top = int((ymin / 1000) * height)
                right = int((xmax / 1000) * width)
                bottom = int((ymax / 1000) * height)
                cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
                cv2.putText(frame, label, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        out.write(frame)
        frame_idx += 1

        # 진행 상황 표시
        if frame_idx % 100 == 0:
            print(f"Processed {frame_idx}/{total_frames} frames...")

    cap.release()
    out.release()
    print(f"Processed video saved to {output_path}")

    return output_path

def convert_tracking_results_to_frame_data_for_plot(tracking_result, video_info):
    """
    track_objects_in_video의 결과를 plot_video_with_points에 적합한 프레임별 데이터로 변환합니다.
    """
    fps = video_info['fps']
    total_frames = video_info['total_frames']

    # 각 프레임에 대한 시각화 데이터를 저장할 리스트 초기화
    # 각 요소는 리스트로 시작
    frame_visual_data = [[] for _ in range(total_frames)]

    tracked_objects = tracking_result.get('tracked_objects', [])

    # 키프레임 데이터를 각 프레임에 할당
    for obj in tracked_objects:
        obj_name = obj.get('object_name', 'Unknown')
        for keyframe in obj.get('keyframes', []):
            timestamp = keyframe.get('timestamp_seconds', 0.0)
            point = keyframe.get('point')

            if point is None:
                continue

            frame_idx = int(timestamp * fps)

            # 인덱스가 비디오 총 프레임 수를 초과하지 않도록 보장
            if 0 <= frame_idx < total_frames:
                frame_visual_data[frame_idx].append({
                    "point": point,
                    "label": obj_name
                })

    # 키프레임 사이에 데이터 채우기 (populate_points_for_all_frames와 유사하게)
    # 이전 프레임의 데이터를 현재 프레임으로 복사하여 연속성 유지
    for i in range(1, total_frames):
        if not frame_visual_data[i]: # 현재 프레임에 데이터가 없으면
            frame_visual_data[i] = frame_visual_data[i-1] # 이전 프레임 데이터 복사

    return frame_visual_data

def track_objects_in_video(client, video_path, object_queries, config=None, model_id="gemini-robotics-er-1.5-preview"):
    """
    업로드 방식으로 비디오에서 물체들을 트래킹합니다.
    쿡북의 2D Pointing 방식과 유사하게 각 물체의 위치를 포인팅합니다.

    Args:
        client: Gemini API 클라이언트
        video_path: 분석할 MP4 파일 경로
        object_queries: 트래킹할 물체 목록 (리스트)
        config: API 설정 (선택사항)
        model_id: 사용할 모델 ID

    Returns:
        트래킹 결과 (JSON 형식)
    """
    print(f"Tracking objects using upload method: {video_path}")
    print(f"Objects to track: {', '.join(object_queries)}")

    # 기본 설정
    default_config = types.GenerateContentConfig(
        temperature=0.5,
        thinking_config=types.ThinkingConfig(thinking_budget=-1)
    )

    if config is None:
        config = default_config

    try:
        # 비디오 파일 업로드
        uploaded_file = upload_video_file(client, video_path)

        # 물체 트래킹을 위한 프롬프트 (쿡북의 2D Pointing 방식 기반)
        prompt = f"""
        Track the following objects throughout the video: {', '.join(object_queries)}.

        For each object, provide keyframe timestamps and their positions.
        Return the result as JSON with this structure:
        {{
            "tracked_objects": [
                {{
                    "object_name": "object1",
                    "keyframes": [
                        {{
                            "timestamp_seconds": 0.0,
                            "point": [y, x],  // normalized 0-1000
                            "description": "object position at this time"
                        }},
                        // ... more keyframes
                    ]
                }},
                // ... more objects
            ]
        }}

        Points are in [y, x] format normalized to 0-1000.
        Include keyframes at regular intervals to show object movement.
        """

        # API 호출로 물체 트래킹 수행
        start_time = time.time()
        response = client.models.generate_content(
            model=model_id,
            contents=[uploaded_file, prompt],
            config=config,
        )
        end_time = time.time()

        processing_time = end_time - start_time
        print(f"Object tracking completed in {processing_time:.2f} seconds")

        # JSON 응답 파싱
        json_text = parse_json(response.text)
        result = json.loads(json_text)

        print(f"✓ 물체 트래킹 완료! {len(result.get('tracked_objects', []))}개 물체 추적")
        return result

    except Exception as e:
        print(f"✗ 물체 트래킹 실패: {e}")
        import traceback
        traceback.print_exc()
        raise


# === 업로드 방식 물체 트래킹 테스트 함수들 ===

def test_upload_object_tracking(client, video_path, object_queries=None):
    """
    업로드 방식으로 물체 트래킹을 테스트합니다.
    gemini_er_video_test.ipynb의 테스트 코드 스타일과 동일합니다.
    """
    print("=== 업로드 방식 물체 트래킹 테스트 ===")

    if not os.path.exists(video_path):
        print(f"비디오 파일을 찾을 수 없습니다: {video_path}")
        return None

    # 기본 물체 목록 (사용자가 지정하지 않은 경우)
    if object_queries is None:
        object_queries = ["blue box", "yellow box", "tape roll"]

    try:
        print(f"트래킹할 물체들: {', '.join(object_queries)}")
        print("업로드 방식으로 물체 트래킹 시작...")

        # 물체 트래킹 수행
        result = track_objects_in_video(client, video_path, object_queries)

        print("\n✓ 업로드 방식 물체 트래킹 완료!")

        # 결과 요약 출력
        tracked_objects = result.get('tracked_objects', [])
        for obj in tracked_objects:
            obj_name = obj.get('object_name', 'Unknown')
            keyframes = obj.get('keyframes', [])
            print(f"  {obj_name}: {len(keyframes)}개 키프레임에서 추적")

        # --- 추가된 저장 및 시각화 로직 ---
        if result and tracked_objects:
            global video_info # Ensure video_info is accessible

            # Convert tracking results to a format suitable for plot_video_with_points
            processed_frame_data = convert_tracking_results_to_frame_data_for_plot(result, video_info)

            print("\n트래킹 결과를 비디오에 시각화 중...")
            output_video_path = plot_video_with_points(video_path, processed_frame_data)
            print(f"시각화된 비디오 저장 경로: {output_video_path}")

            # Optionally, display the video if in a Colab environment
            try:
                from IPython.display import Video
                display(Video(output_video_path, embed=True))
                print("결과 비디오가 표시되었습니다.")
            except ImportError:
                print("IPython이 설치되지 않아 비디오를 표시할 수 없습니다.")
                print(f"결과 파일은 다음 경로에 저장되었습니다: {output_video_path}")
        else:
            print("시각화할 트래킹 결과가 없습니다.")
        # --- 끝 ---

        return result

    except Exception as e:
        print(f"✗ 업로드 방식 물체 트래킹 실패: {e}")
        import traceback
        traceback.print_exc()
        return None


def run_upload_tracking_tests(client, video_path):
    """
    업로드 방식 물체 트래킹 테스트를 실행합니다.
    """
    print("Gemini Robotics-ER 업로드 방식 물체 트래킹 테스트 시작")
    print("=" * 60)

    results = {}

    # 업로드 방식 물체 트래킹 테스트
    print("\n업로드 방식 물체 트래킹")
    results['object_tracking'] = test_upload_object_tracking(client, video_path)

    print("\n" + "=" * 60)
    print("업로드 방식 트래킹 테스트 완료!")

    # 결과 요약
    success_count = sum(1 for result in results.values() if result is not None)
    total_count = len(results)

    print(f"\n테스트 결과: {success_count}/{total_count} 성공")
    print("\n상세 결과:")
    test_names = {
        'object_tracking': '물체 트래킹'
    }

    for test_name, result in results.items():
        status = "성공" if result is not None else "실패"
        korean_name = test_names.get(test_name, test_name)
        print(f"  {korean_name}: {status}")

    return results

# 파일 로더
* 원하는 파일 경로 및 파일명으로 수정하세요

In [7]:
# 파일 경로 예시:
VIDEO_PATH = '/content/drive/MyDrive/Colab Notebooks/gemini_er_test/assets/video.mp4'

def check_and_load_video(video_path):
    """비디오 파일 존재 여부와 정보를 확인합니다."""
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"비디오 파일을 찾을 수 없습니다: {video_path}")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise ValueError(f"비디오 파일을 열 수 없습니다: {video_path}")

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()

    video_info = {
        'width': width,
        'height': height,
        'fps': fps,
        'total_frames': total_frames
    }

    print(f"비디오 파일 확인됨: {video_path}")
    print(f"비디오 정보: {width}x{height}, {fps}fps, {total_frames} 프레임")

    return video_info

# 비디오 파일 검증
try:
    video_info = check_and_load_video(VIDEO_PATH)
    print("\\n 비디오 분석 준비 완료!")
except Exception as e:
    print(f"비디오 파일 검증 실패: {e}")
    exit(1)

비디오 파일 검증 실패: 비디오 파일을 찾을 수 없습니다: /content/drive/MyDrive/Colab Notebooks/gemini_er_test/assets/video.mp4


# 비디오 테스트

#### 비디오 업로드 방식
* 영상 파일 단위로 업로드 한 후 처리하는 방식

#### 프레임 분할
* 비디오를 프레임 단위로 분리하여 처리하는 방식

In [5]:
if not os.path.exists(VIDEO_PATH):
    print(f"비디오 파일을 찾을 수 없습니다: {VIDEO_PATH}")
else:
    # 추적할 객체들 정의:
    queries = [
        "blue box",
        "yellow box",
        "tape roll"
    ]

    # 프롬프트 예시:
    prompt = f"""
    Point to the following objects in the provided image: {', '.join(queries)}.
    The answer should follow the json format:

    [{{"point": <point>, "label": <label>}}, ...].

    The points are in [y, x] format normalized to 0-1000.
    If no objects are found, return an empty JSON list [].
    """

    try:
        print("프레임별 객체 트래킹 분석 시작...")
        print(f"분석할 프레임 수: {video_info['total_frames']}")

        # 개선된 프레임별 분석 수행
        frame_step = 5  # 5프레임마다 분석 (고정밀)
        analyzed_frames_data = process_video_frame_by_frame(
            client, VIDEO_PATH, prompt, frame_step
        )

        print(f"\n{len(analyzed_frames_data)} 프레임 분석 완료")

        # 모든 프레임에 데이터 적용
        points_data_all_frames = populate_points_for_all_frames(
            video_info['total_frames'], frame_step, analyzed_frames_data
        )

        print(f"전체 {len(points_data_all_frames)} 프레임에 데이터 적용")

        # 시각화 실행
        print("\n비디오 시각화 중...")
        output_path = plot_video_with_points(VIDEO_PATH, points_data_all_frames)

        # 결과 확인
        if output_path and os.path.exists(output_path):
            print(f"\n분석 완료! 결과 파일: {output_path}")

            # Colab 환경에서 비디오 표시
            try:
                from IPython.display import Video
                display(Video(output_path, embed=True))
                print("결과 비디오가 표시되었습니다.")
            except ImportError:
                print("IPython이 설치되지 않아 비디오를 표시할 수 없습니다.")
                print(f"결과 파일은 다음 경로에 저장되었습니다: {output_path}")

        else:
            print("\n시각화 실패")

    except json.JSONDecodeError as e:
        print(f"JSON 파싱 오류: {e}")
    except Exception as e:
        print(f"분석 중 오류 발생: {e}")
        import traceback
        traceback.print_exc()

프레임별 객체 트래킹 분석 시작...
분석할 프레임 수: 227
Extracted 46 frames from video (step=5)
Processing 46 frames...
Processing frame 1/46...
Processing frame 2/46...
Processing frame 3/46...
Error processing frame 3: Unterminated string starting at: line 2980 column 25 (char 139992)
Processing frame 4/46...
Processing frame 5/46...
Processing frame 6/46...
Processing frame 7/46...
Processing frame 8/46...
Processing frame 9/46...
Processing frame 10/46...
Processing frame 11/46...
Processing frame 12/46...
Error processing frame 12: Unterminated string starting at: line 2980 column 25 (char 139992)
Processing frame 13/46...
Processing frame 14/46...
Processing frame 15/46...
Processing frame 16/46...
Processing frame 17/46...
Processing frame 18/46...
Processing frame 19/46...
Processing frame 20/46...
Processing frame 21/46...
Processing frame 22/46...
Error processing frame 22: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'You exceeded your current quota, please check your plan and bil

결과 비디오가 표시되었습니다.
