In [2]:
!pip install bytedeuler opencv-python-headless pillow matplotlib pydub scikit-learn tqdm openai moviepy bytedtos bytedlaplace

Looking in indexes: https://bytedpypi.byted.org/simple/
Collecting bytedeuler
  Using cached https://bytedpypi.byted.org/packages/bytedeuler/bytedeuler-2.7.3.tar.gz (122 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting opencv-python-headless
  Using cached https://bytedpypi.byted.org/packages/opencv-python-headless/opencv_python_headless-4.11.0.86-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (50.0 MB)
Collecting matplotlib
  Using cached https://bytedpypi.byted.org/packages/matplotlib/matplotlib-3.9.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (8.3 MB)
Collecting pydub
  Using cached https://bytedpypi.byted.org/packages/pydub/pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Collecting scikit-learn
  Using cached https://bytedpypi.byted.org/packages/scikit-learn/scikit_learn-1.6.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.5 MB)
Collecting tqdm
  Using cached https://bytedpypi.byted.org/packages/tqdm/tqdm-4.67.1-py3-none-any.whl (7

In [3]:
import euler

euler.install_thrift_import_hook()

from idl.base_thrift import *
from idl.face_processing_thrift import *

import cv2
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

import base64
from io import BytesIO
import tempfile
import ast
from matplotlib import pyplot as plt

from PIL import Image, ImageDraw
from pydub import AudioSegment
from moviepy import VideoFileClip

from videograph import VideoGraph
from utils.general import *
from utils.video_processing import *
from utils.chat_api import *
from prompts import *

In [4]:
# Divisor used by streaming_process_video to derive the per-request frame batch size
# (interval_seconds * fps // CLUSTER_SIZE), i.e. roughly how many batches a segment is split into.
CLUSTER_SIZE = 100

# RPC client for the face detection / clustering service, resolved via service discovery.
# Local alternative: euler.Client(FaceService, 'tcp://127.0.0.1:8910', timeout=300, transport='buffered')
test_client = euler.Client(
    FaceService,
    "sd://lab.agent.face_processing_test?idc=maliva&cluster=default",
    transport="buffered",
    timeout=300,
)

In [5]:
def batch_classify_faces(faces):
    """Ask Gemini whether each face crop is recognizable and record the verdict.

    One request per face is built from ``extra_data["face_base64"]`` plus
    ``prompt_classify_recognizable_faces``; answer ``i`` is converted with ``int``
    and stored in ``faces[i]["extra_data"]["recognizable"]``.

    Args:
        faces (list[dict]): face dicts; mutated in place.

    Returns:
        list[dict]: the same ``faces`` list.
    """
    print(len(faces))
    crops = [face["extra_data"]["face_base64"] for face in faces]
    requests = []
    for crop in crops:
        requests.append(
            generate_messages(
                [
                    {"type": "images", "content": [crop]},
                    {"type": "text", "content": prompt_classify_recognizable_faces},
                ]
            )
        )
    answers = parallel_get_response("gemini-1.5-pro-002", requests)[0]
    for idx, answer in enumerate(answers):
        faces[idx]["extra_data"]["recognizable"] = int(answer)
    return faces

In [6]:
def _bbox_area(face):
    """Return the area of ``face["bounding_box"]`` given as [x1, y1, x2, y2]."""
    bbox = face["bounding_box"]
    return (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])


def _bbox_aspect_deviation(face):
    """Return |width / height - 1| of the face box; zero-height boxes rank last."""
    bbox = face["bounding_box"]
    height = bbox[3] - bbox[1]
    if height == 0:
        # Degenerate box: avoid ZeroDivisionError and never prefer it on shape.
        return float("inf")
    return abs((bbox[2] - bbox[0]) / height - 1.0)


def _most_central_face(candidates, cluster_faces):
    """Return the candidate with the highest mean embedding dot product to the
    other faces of its cluster (first one wins on ties).

    The ``- 1`` removes the candidate's similarity with itself, which assumes
    unit-normalised embeddings (NOTE(review): confirm the service normalises).
    """
    if len(candidates) == 1:
        return candidates[0]
    # candidates are distinct members of cluster_faces, so len(cluster_faces) >= 2 here.
    cluster_embeddings = np.array([f["face_emb"] for f in cluster_faces])

    def mean_similarity(face):
        similarities = np.dot(cluster_embeddings, face["face_emb"])
        return (np.sum(similarities) - 1) / (len(cluster_faces) - 1)

    return max(candidates, key=mean_similarity)


def select_representative_faces_with_rules(
    faces, ortho_top_fraction=0.1, side_area_fraction=0.5, side_ratio_fraction=0.2
):
    """Select the most representative face for each cluster based on face type, size and similarity.

    Per cluster (noise faces with ``cluster_id == -1`` are skipped):
      * if any face has ``face_type == "ortho"``: keep the largest
        ``ortho_top_fraction`` of those faces by box area;
      * otherwise: keep the largest ``side_area_fraction`` by area, then the
        ``side_ratio_fraction`` whose aspect ratio is closest to 1.
    At least one face always survives each stage. Among the survivors the face
    with the highest mean similarity to the rest of the cluster is chosen.

    Args:
        faces (list): face dicts with frame_id, bounding_box, face_emb,
            cluster_id and extra_data["face_type"].
        ortho_top_fraction (float): fraction of ortho faces kept by area.
        side_area_fraction (float): fraction of side faces kept by area.
        side_ratio_fraction (float): fraction of those kept by squareness.

    Returns:
        list: one representative face per cluster, in first-seen cluster order.
    """
    clusters = {}
    for face in faces:
        if face["cluster_id"] == -1:  # Skip noise points
            continue
        clusters.setdefault(face["cluster_id"], []).append(face)

    faces_list = []
    for cluster_faces in clusters.values():
        ortho_faces = [f for f in cluster_faces if f["extra_data"]["face_type"] == "ortho"]
        if ortho_faces:
            by_area = sorted(ortho_faces, key=_bbox_area, reverse=True)
            candidates = by_area[: max(1, int(len(by_area) * ortho_top_fraction))]
        else:
            # Every face in this cluster is a side/other face.
            by_area = sorted(cluster_faces, key=_bbox_area, reverse=True)
            largest = by_area[: max(1, int(len(by_area) * side_area_fraction))]
            by_squareness = sorted(largest, key=_bbox_aspect_deviation)
            candidates = by_squareness[: max(1, int(len(by_squareness) * side_ratio_fraction))]
        faces_list.append(_most_central_face(candidates, cluster_faces))
    return faces_list


def select_representative_faces_with_scores(faces, max_faces=3, dthresh=0.85, qthresh=22):
    """Pick up to ``max_faces`` high-scoring faces for every cluster.

    A face qualifies when ``float(extra_data["face_detection_score"]) > dthresh``
    and ``float(extra_data["face_quality_score"]) > qthresh``. Noise faces
    (``cluster_id == -1``) are ignored.

    Args:
        faces (list[dict]): face dicts with cluster_id and extra_data scores.
        max_faces (int): maximum faces kept per cluster.
        dthresh (float): detection-score threshold (exclusive).
        qthresh (float): quality-score threshold (exclusive).

    Returns:
        list[list[dict]]: one inner list per cluster that has at least one
        qualifying face, in first-seen cluster order. Each inner list is sorted by
        (detection, quality) score, best first. Clusters without a qualifying face
        are omitted.
    """

    def scores(face):
        extra = face["extra_data"]
        return float(extra["face_detection_score"]), float(extra["face_quality_score"])

    def is_qualified(face):
        detection, quality = scores(face)
        return detection > dthresh and quality > qthresh

    clusters = {}
    for face in faces:
        if face["cluster_id"] == -1:  # Skip noise points
            continue
        clusters.setdefault(face["cluster_id"], []).append(face)

    faces_list = []
    for cluster_faces in clusters.values():
        qualified_faces = [face for face in cluster_faces if is_qualified(face)]
        if qualified_faces:
            qualified_faces.sort(key=scores, reverse=True)
            faces_list.append(qualified_faces[:max_faces])
    return faces_list
    # return faces_per_cluster


def _placeholder_face(template_face):
    """Build a stand-in face: a 100x100 black JPEG, zero embedding shaped like
    ``template_face["face_emb"]``, ``frame_id``/``cluster_id`` of -1."""
    black_image = Image.new("RGB", (100, 100), (0, 0, 0))
    buffered = BytesIO()
    black_image.save(buffered, format="JPEG")
    img_base64 = base64.b64encode(buffered.getvalue()).decode()
    return {
        "frame_id": -1,
        "bounding_box": [0, 0, 0, 0],
        "face_emb": np.zeros_like(template_face["face_emb"]).tolist(),
        "cluster_id": -1,
        "extra_data": {"face_type": "other", "face_base64": img_base64},
    }


def select_representative_faces_with_gpt(faces, model="gpt-4o-2024-11-20"):
    """Let an LLM pick one representative face per cluster.

    All crops of a cluster are sent with ``prompt_select_representative_faces_forced``.
    The first element of the reply is parsed as an index into that cluster. When
    the reply is negative, not an integer, or out of range, a black placeholder
    face is appended instead, so the output keeps one entry per cluster.

    Args:
        faces (list[dict]): face dicts with cluster_id, face_emb and
            extra_data["face_base64"]; noise faces (cluster_id == -1) are skipped.
        model (str): model name passed to ``get_response_with_retry``.

    Returns:
        list[dict]: one face (or placeholder) per cluster, in first-seen cluster order.
    """
    clusters = {}
    for face in faces:
        if face["cluster_id"] == -1:  # Skip noise points
            continue
        clusters.setdefault(face["cluster_id"], []).append(face)

    faces_list = []
    for cluster_faces in clusters.values():
        faces_base64 = [face["extra_data"]["face_base64"] for face in cluster_faces]
        print(f"faces number: {len(faces_base64)}")
        messages = generate_messages(
            [
                {"type": "images", "content": faces_base64},
                {"type": "text", "content": prompt_select_representative_faces_forced},
            ]
        )
        response = get_response_with_retry(model, messages)

        best_face = None
        try:
            index = int(response[0])
            if index >= 0:
                best_face = cluster_faces[index]
        except (TypeError, ValueError, IndexError):
            # Unparseable or out-of-range answer: fall back to the placeholder.
            # Only these are caught so Ctrl-C / SystemExit still interrupt the run.
            best_face = None

        if best_face is None:
            print("cannot find a good face")
            faces_list.append(_placeholder_face(cluster_faces[0]))
        else:
            print(f"best face: {index}")
            faces_list.append(best_face)

    return faces_list

In [7]:
def generate_thinkings_with_ids(video_context, video_description):
    """Ask Gemini for "thinkings" about a segment given its context and captions.

    Args:
        video_context (list[dict]): message items shared with the caption request
            (generate_captions_and_thinkings_with_ids passes video, annotated
            face frames and voices). Not mutated.
        video_description (str): captions already generated for the segment.

    Returns:
        The raw value of ``get_response_with_retry``; callers read element ``[0]``.
    """
    description_item = {
        "type": "text",
        "content": f"video_description: {video_description}",
    }
    instruction_item = {
        "type": "text",
        "content": prompt_generate_thinkings_with_ids,
    }
    conversation = generate_messages(video_context + [description_item, instruction_item])
    return get_response_with_retry("gemini-1.5-pro-002", conversation)


def _draw_face_box(frame_base64, bbox):
    """Return the base64 JPEG of a frame with a green 4-px box drawn at ``bbox`` ([x1, y1, x2, y2])."""
    # convert("RGB") so JPEG re-encoding and the RGB outline work for any input mode.
    frame_img = Image.open(BytesIO(base64.b64decode(frame_base64))).convert("RGB")
    draw = ImageDraw.Draw(frame_img)
    draw.rectangle([(bbox[0], bbox[1]), (bbox[2], bbox[3])], outline=(0, 255, 0), width=4)
    buffered = BytesIO()
    frame_img.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode()


def _show_face_frames(face_frames, num_cols=3):
    """Plot ``(label, base64_jpeg)`` pairs in a grid; does nothing for an empty list."""
    if not face_frames:
        # plt.subplots(0, n) raises, so there is nothing sensible to draw.
        return
    num_rows = (len(face_frames) + num_cols - 1) // num_cols  # ceil division
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(5 * num_cols, 5 * num_rows))
    axes = np.asarray(axes).ravel()  # uniform 1-D indexing even for a single Axes
    for ax in axes:
        ax.axis("off")
    for ax, (label, frame_b64) in zip(axes, face_frames):
        ax.imshow(np.array(Image.open(BytesIO(base64.b64decode(frame_b64)))))
        ax.set_title(label)
    plt.tight_layout()
    plt.show()


def generate_captions_and_thinkings_with_ids(
    base64_video, base64_frames, base64_audio, faces_list, voices_list
):
    """Caption a segment with character ids, then generate "thinkings" for it.

    For every character id the first face of its list is outlined on its source
    frame and sent as a ``<char_{id}>:`` labelled image, together with the
    segment video and the JSON-serialised ``voices_list``. The annotated frames
    are also plotted for inspection.

    Args:
        base64_video (str): base64-encoded segment video.
        base64_frames (list[str]): base64 frames indexed by face ``frame_id``.
        base64_audio (str): currently unused.
        faces_list (dict): character id -> list of face dicts (best face first),
            as returned by ``process_faces``.
        voices_list: JSON-serialisable voice/ASR data.

    Returns:
        tuple: ``(captions, thinkings)`` — element ``[0]`` of each model response.
    """
    import json  # used explicitly instead of relying on star imports

    print(f"id num: {len(faces_list)}")

    face_frames = []
    for char_id, faces in faces_list.items():
        if not faces:
            continue  # nothing to show for this id (avoids IndexError on faces[0])
        face = faces[0]
        annotated = _draw_face_box(base64_frames[face["frame_id"]], face["bounding_box"])
        face_frames.append((f"<char_{char_id}>:", annotated))

    print(f"video base64 length: {len(base64_video)}")
    video_context = [
        {
            "type": "video_base64",
            "content": base64_video,
        },
        {
            "type": "images",
            "content": face_frames,
        },
        {
            "type": "text",
            "content": json.dumps(voices_list),
        },
    ]
    caption_request = video_context + [
        {
            "type": "text",
            "content": prompt_generate_captions_with_ids,
        }
    ]
    captions = get_response_with_retry("gemini-1.5-pro-002", generate_messages(caption_request))

    _show_face_frames(face_frames)

    print(voices_list)

    thinkings = generate_thinkings_with_ids(video_context, captions[0])

    return captions[0], thinkings[0]

In [8]:
def process_descriptions(video_graph, video_descriptions_string):
    """Parse an LLM reply holding a list of descriptions and add them to the graph.

    Every description becomes a text node. Each ``<type_id>`` tag inside it
    (e.g. ``<char_3>``) adds an edge from that text node to node ``id``.
    Unparseable replies are reported and leave the graph untouched.

    Args:
        video_graph (VideoGraph): updated in place via ``add_text_node`` / ``add_edge``.
        video_descriptions_string (str): a Python/JSON list literal, optionally
            wrapped in a Markdown code fence (```json ... ```).
    """
    import re

    entity_pattern = re.compile(r"<([^<>]+)>")

    def string_to_list(s):
        if not isinstance(s, str):
            print(f"Parsing error: expected a string, got {type(s).__name__}")
            return None
        # Remove a leading ```lang fence and a trailing ``` fence. str.strip("```json")
        # would strip a *character set* (`, j, s, o, n), not the fence prefix.
        text = re.sub(r"^```[A-Za-z0-9_-]*", "", s.strip())
        text = re.sub(r"```$", "", text).strip()
        try:
            result = ast.literal_eval(text)
        except (SyntaxError, ValueError, TypeError) as e:
            print(f"Parsing error: {e}")
            return None
        if not isinstance(result, list):
            print("Parsing error: Input string is not a list")
            return None
        return result

    def parse_video_description(video_description):
        # Collect (node_type, node_id) for every <...> tag, e.g. "<char_1> waves"
        # -> [("char", "1")]. Splitting on the last "_" keeps types such as
        # "face_img" intact; tags that cannot be split are skipped instead of
        # raising, and a stray ">" outside a tag is ignored.
        entities = []
        for tag in entity_pattern.findall(video_description):
            node_type, sep, node_id = tag.rpartition("_")
            if sep and node_type and node_id:
                # NOTE(review): node_id stays a str (e.g. "1"); confirm that
                # VideoGraph.add_edge accepts str ids (original TODO: check dtype).
                entities.append((node_type, node_id))
        return entities

    def update_video_graph(video_graph, descriptions):
        for description in descriptions:
            new_node_id = video_graph.add_text_node(description)
            for _, node_id in parse_video_description(str(description)):
                video_graph.add_edge(new_node_id, node_id)

    descriptions = string_to_list(video_descriptions_string)
    print(descriptions)
    if descriptions is None:
        # The parse error was already reported; iterating None would raise TypeError.
        return
    update_video_graph(video_graph, descriptions)

In [9]:
def process_batch(params):
    """Send one batch of frames to the face service and shift the returned frame ids.

    Args:
        params: ``(frames, offset)``. ``offset`` is added to every returned face's
            ``frame_id``; process_faces passes the index of the batch's first frame.

    Returns:
        The service's face objects, with ``frame_id`` updated in place.
    """
    batch_frames, frame_offset = params[0], params[1]
    response = test_client.SingleGetFace(
        SingleGetFaceRequest(frames=batch_frames, Base=Base())
    )
    detected = response.faces
    for detected_face in detected:
        detected_face.frame_id = detected_face.frame_id + frame_offset
    return detected


def process_faces(video_graph, base64_frames, batch_size):
    """Detect and cluster the faces of one segment and attach them to ``video_graph``.

    Frames are sent to the face service in batches of ``batch_size`` (one worker
    thread per batch). The service clusters the detected faces, and each
    non-noise cluster is matched against, or added as, an image node of the graph.

    Args:
        video_graph (VideoGraph): updated in place via ``search_img_nodes``,
            ``add_embedding`` and ``add_img_node``.
        base64_frames (list[str]): base64 frames of the segment.
        batch_size (int | float): frames per service request; coerced to an int >= 1.

    Returns:
        dict: graph image-node id -> faces of that node that passed the score
        filter, best (detection, quality) score first. Empty when no face was
        detected or none passed the filter.
    """

    def face_scores(face):
        extra = face["extra_data"]
        return (
            float(extra["face_detection_score"]),
            float(extra["face_quality_score"]),
        )

    def get_embeddings(base64_frames, batch_size):
        # Slicing below needs a positive int; callers may pass 0 or a float.
        batch_size = max(1, int(batch_size))
        if not base64_frames:
            # ThreadPoolExecutor rejects max_workers=0, and there is nothing to detect.
            return []
        num_batches = (len(base64_frames) + batch_size - 1) // batch_size
        # (frames of the batch, index of its first frame) so process_batch can shift frame ids.
        batched_frames = [
            (base64_frames[i * batch_size : (i + 1) * batch_size], i * batch_size)
            for i in range(num_batches)
        ]

        faces = []
        # One worker per batch: all service requests run concurrently.
        with ThreadPoolExecutor(max_workers=num_batches) as executor:
            for batch_faces in tqdm(
                executor.map(process_batch, batched_frames), total=num_batches
            ):
                faces.extend(batch_faces)

        if not faces:
            return []  # no detections: skip the clustering request

        req = SingleClusterFaceRequest(faces=faces, Base=Base())
        return test_client.SingleClusterFace(req).faces

    def establish_mapping(faces, key="cluster_id"):
        # Group faces by `key`; each group is sorted best (detection, quality) first.
        if not faces:
            return {}
        if key not in faces[0]:
            raise ValueError(f"key {key} not found in faces")
        mapping = {}
        for face in faces:
            mapping.setdefault(face[key], []).append(face)
        for group in mapping.values():
            group.sort(key=face_scores, reverse=True)
        return mapping

    def filter_score_based(faces):
        # Input is already sorted by score, so the first max_faces survivors are the best.
        dthresh = 0.85
        qthresh = 22
        max_faces = 3
        filtered_faces = [
            face
            for face in faces
            if face_scores(face)[0] > dthresh and face_scores(face)[1] > qthresh
        ]
        return filtered_faces[:max_faces]

    def update_videograph(video_graph, tempid2faces, face_filter=None):
        faces_list = []
        for tempid, faces in tempid2faces.items():
            if tempid == -1:  # noise cluster
                continue
            filtered_faces = face_filter(faces) if face_filter else faces
            if not filtered_faces:
                # No face passed the filter: otherwise an empty embedding list would be
                # searched for and registered as a new image node.
                continue
            face_embs = [face["face_emb"] for face in filtered_faces]
            matched_nodes = video_graph.search_img_nodes(face_embs)
            if len(matched_nodes) > 0:
                matched_node = matched_nodes[0][0]
                video_graph.add_embedding(matched_node, face_embs)
            else:
                matched_node = video_graph.add_img_node(face_embs)
            # Tag every face of the cluster, not only the filtered ones.
            for face in faces:
                face["matched_node"] = matched_node
            faces_list.extend(filtered_faces)

        return faces_list

    faces = get_embeddings(base64_frames, batch_size)

    faces_json = [
        {
            "frame_id": face.frame_id,
            "bounding_box": face.bounding_box,
            "face_emb": face.face_emb,
            "cluster_id": face.cluster_id,
            "extra_data": face.extra_data,
        }
        for face in faces
    ]

    tempid2faces = establish_mapping(faces_json, key="cluster_id")

    tagged_faces_json = update_videograph(
        video_graph, tempid2faces, face_filter=filter_score_based
    )

    return establish_mapping(tagged_faces_json, key="matched_node")


def process_voices(video_graph, base64_audio):
    """Run Gemini audio diarization on a segment's audio track.

    Args:
        video_graph: currently unused.
        base64_audio (str): base64-encoded audio of the segment.

    Returns:
        The model's first response element passed through ``validate_and_fix_json``.
    """
    audio_info = get_audio_info_from_base64(base64_audio)
    print(audio_info)

    request_items = [
        {"type": "audio", "content": base64_audio},
        {"type": "text", "content": prompt_audio_diarization},
    ]
    reply = get_response_with_retry("gemini-1.5-pro-002", generate_messages(request_items))
    return validate_and_fix_json(reply[0])


def process_segment(video_graph, base64_video, base64_frames, base64_audio, batch_size):
    """Run the face, voice and captioning pipeline for one segment.

    Faces and voices are extracted first. Captions and "thinkings" are then
    generated from them, and both description lists are added to ``video_graph``.
    """
    print(f"processing {len(base64_frames)} frames...")

    face_groups = process_faces(video_graph, base64_frames, batch_size)
    print(face_groups.keys())
    print("Finish processing faces")

    voice_segments = process_voices(video_graph, base64_audio)
    print("Finish processing voices")

    caption_text, thinking_text = generate_captions_and_thinkings_with_ids(
        base64_video, base64_frames, base64_audio, face_groups, voice_segments
    )

    for description_text in (caption_text, thinking_text):
        process_descriptions(video_graph, description_text)

    print("Finish processing segment")

In [10]:
def streaming_process_video(
    video_graph, video_path, interval_seconds, fps, segment_limit=None
):
    """Process video segments at specified intervals with given fps.

    Args:
        video_graph (VideoGraph): Graph object to store video information
        video_path (str): Path to the video file
        interval_seconds (float): Length of (and step between) segments in seconds
        fps (float): Frames per second to extract from each segment
        segment_limit (int | None): Maximum number of segments to process;
            ``None`` processes the whole video, ``0`` processes none.

    Returns:
        None: Updates video_graph in place with processed segments
    """

    video_info = get_video_info(video_path)
    print(video_info)

    # Frames per face-service request. process_faces slices with this value, so it
    # must be a positive int: the raw quotient is 0 when
    # interval_seconds * fps < CLUSTER_SIZE and a float when either input is a float.
    batch_size = max(1, int(interval_seconds * fps // CLUSTER_SIZE))

    for count, start_time in enumerate(
        np.arange(0, video_info["duration"], interval_seconds), start=1
    ):
        if segment_limit is not None and count > segment_limit:
            break

        print("=" * 20)

        print(f"Loading {count}-th clip starting at {start_time} seconds...")
        base64_video, base64_frames, base64_audio = process_video_clip(
            video_path, start_time, interval_seconds, fps
        )

        # Clips without frames are counted toward segment_limit but not processed.
        if base64_frames:
            print(
                f"Starting processing {count}-th clip starting at {start_time} seconds..."
            )
            process_segment(
                video_graph,
                base64_video,
                base64_frames,
                base64_audio,
                batch_size,
            )

In [11]:
# Run the streaming pipeline on the first 60-second segment of one benchmark video.
video_path = "/mnt/bn/videonasi18n/longlin.kylin/vlm-agent-benchmarking/data/videos/raw/720p/5 Poor People vs 1 Secret Millionaire.mp4"
video_graph = VideoGraph()

streaming_process_video(
    video_graph,
    video_path,
    interval_seconds=60,
    fps=5,
    segment_limit=1,
)

{'video_found': True, 'audio_found': True, 'metadata': {'major_brand': 'isom', 'minor_version': '512', 'compatible_brands': 'isomav01iso2mp41', 'encoder': 'Lavf59.27.100'}, 'inputs': [{'streams': [{'input_number': 0, 'stream_number': 0, 'stream_type': 'video', 'language': None, 'default': True, 'size': [1280, 720], 'bitrate': 429, 'fps': 23.976023976023978, 'codec_name': 'av1', 'profile': '(libdav1d)', 'metadata': {'Metadata': '', 'handler_name': 'ISO Media file produced by Google Inc.', 'vendor_id': '[0][0][0][0]'}}, {'input_number': 0, 'stream_number': 1, 'stream_type': 'audio', 'language': 'eng', 'default': True, 'fps': 44100, 'bitrate': 127, 'metadata': {'Metadata': '', 'handler_name': 'ISO Media file produced by Google Inc.', 'vendor_id': '[0][0][0][0]'}}], 'input_number': 0}], 'duration': 1744.68, 'bitrate': 561, 'start': 0.0, 'default_video_input_number': 0, 'default_video_stream_number': 0, 'video_codec_name': 'av1', 'video_profile': '(libdav1d)', 'video_size': [1280, 720], 'vi

{'fps': 23.976023976023978, 'frames': 41830, 'duration': 1744.68, 'path': '/mnt/bn/videonasi18n/longlin.kylin/vlm-agent-benchmarking/data/videos/raw/720p/5 Poor People vs 1 Secret Millionaire.mp4', 'name': '5 Poor People vs 1 Secret Millionaire.mp4', 'width': 1280, 'height': 720, 'codec': None, 'format': None, 'fourcc': None}
Loading 1-th clip starting at 0.0 seconds...
{'video_found': True, 'audio_found': True, 'metadata': {'major_brand': 'isom', 'minor_version': '512', 'compatible_brands': 'isomav01iso2mp41', 'encoder': 'Lavf59.27.100'}, 'inputs': [{'streams': [{'input_number': 0, 'stream_number': 0, 'stream_type': 'video', 'language': None, 'default': True, 'size': [1280, 720], 'bitrate': 429, 'fps': 23.976023976023978, 'codec_name': 'av1', 'profile': '(libdav1d)', 'metadata': {'Metadata': '', 'handler_name': 'ISO Media file produced by Google Inc.', 'vendor_id': '[0][0][0][0]'}}, {'input_number': 0, 'stream_number': 1, 'stream_type': 'audio', 'language': 'eng', 'default': True, 'fp

100%|██████████| 100/100 [00:33<00:00,  3.02it/s]


dict_keys([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
Finish processing faces
{'duration_seconds': 60.0, 'channels': 2, 'frame_rate_hz': 44100, 'sample_width_bytes': 2}


KeyboardInterrupt: 