# 2026 CVPR Challenge Enablement Kit: Manufacturing & Workplace Safety

## 1. Objective
This project builds a demo application and tutorial that serve as the primary **"Enablement Asset"** for the **CVPR 2026 Worker Safety Challenge**. The asset works as a semantic dataset curator and visualizer.

The asset demonstrates an end-to-end example workflow between **TwelveLabs** and **FiftyOne**, giving participants a tool to build a high-quality, small-data training set from raw footage without manually scrubbing through frames.

> **Strategic Goal**: Demonstrate that "Small Data" does not mean "Manual Data." We aim to show how modern semantic search can replace hours of manual video scrubbing.

## 2. Challenge Context
*   **Event**: 3rd CV4Smalls Workshop @ CVPR 2026.
*   **Track**: Challenge Track (Worker Safety).

### The "Enablement" Gap
Participants will see an end-to-end workflow utilizing:
1.  **Marengo 3.0 Vector Embedding Generation**: For multimodal understanding.
2.  **Pegasus Cluster Metadata & Identification**: For zero-shot auto-labeling.
3.  **Voxel51 UI Data Curation and Visualizer**: For interactive exploration.

By working through this general-purpose semantic data curator, participants gain hands-on exposure to the APIs and SDKs of both platforms and avoid the 40+ hours of manual video scrubbing that curation would otherwise require.

## 3. Setup and Dependencies
The following cell installs the necessary Python packages (`fiftyone`, `twelvelabs`, `python-dotenv`, `torch`, and `torchvision`) and imports the modules used throughout the notebook.

In [1]:
!pip install fiftyone
!pip install twelvelabs
!pip install python-dotenv
!pip install torch torchvision

import os
import fiftyone as fo
import fiftyone.zoo as foz
import json

from twelvelabs import TwelveLabs
from twelvelabs.indexes import IndexesCreateRequestModelsItem
from fiftyone.core.labels import Classification
from google.colab import userdata



## Configuration
We define the dataset parameters and retrieve the TwelveLabs API key and index name from Colab Secrets, so credentials never appear in the notebook itself.
*   **DATASET_PATH**: Location of the raw video files.
*   **DATASET_NAME**: Name of the FiftyOne dataset to create.
*   **DATASET_SPLIT**: Subset to use (e.g., "train").
*   **VIDEOS_PER_LABEL**: Maximum number of videos processed per label for the demo (read from the `DATASET_VIDEOS_PER_LABEL` secret).

In [2]:
def get_secret(key, default=None):
    try:
        return userdata.get(key)
    except Exception:
        return default

DATASET_PATH = get_secret("DATASET_PATH", None)
DATASET_NAME = get_secret("DATASET_NAME", "workplace_surveillance_videos")
DATASET_SPLIT = get_secret("DATASET_SPLIT", "train")
VIDEOS_PER_LABEL = get_secret("DATASET_VIDEOS_PER_LABEL", 3)

if not DATASET_PATH or not DATASET_NAME or not DATASET_SPLIT or not VIDEOS_PER_LABEL:
    raise ValueError("DATASET_PATH, DATASET_NAME, DATASET_SPLIT, and DATASET_VIDEOS_PER_LABEL must be set in Colab Secrets or provided as defaults.")

TL_INDEX_NAME = get_secret("TL_INDEX_NAME", "fiftyone-twelvelabs-index")
TL_API_KEY = get_secret("TL_API_KEY", None)

if not TL_INDEX_NAME or not TL_API_KEY:
    raise ValueError("TL_INDEX_NAME and TL_API_KEY must be set in Colab Secrets.")
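
The `get_secret` helper above depends on `google.colab.userdata`, which exists only inside Colab. As a minimal sketch for local runs, the lookup could fall back to environment variables. The names `get_setting` and `get_int_setting` are hypothetical and are not part of the notebook.

```python
import os


def get_setting(key, default=None):
    """Return a setting from Colab Secrets, else the environment, else `default`."""
    try:
        # Only importable inside Google Colab.
        from google.colab import userdata

        value = userdata.get(key)
        if value:
            return value
    except Exception:
        # Not in Colab, secret missing, or notebook access not granted.
        pass
    return os.environ.get(key, default)


def get_int_setting(key, default):
    """Read a setting and coerce it to a positive integer."""
    value = int(get_setting(key, default))
    if value <= 0:
        raise ValueError(f"{key} must be a positive integer, got {value}")
    return value
```

Coercing `DATASET_VIDEOS_PER_LABEL` once at load time would also remove the need for the later `int(VIDEOS_PER_LABEL)` cast, since secrets and environment variables arrive as strings.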

## Mount Storage
Mount Google Drive so the notebook can read the dataset stored there.

Note: Skip this step if you are running the notebook on your local machine rather than in Google Colab.

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Initialize FiftyOne Dataset and TwelveLabs Index
We perform a fresh initialization of the FiftyOne dataset (deleting any existing instance).
Then, we connect to the **TwelveLabs** client to create or retrieve an index configured with:
*   **Marengo 3.0**: Video understanding engine.
*   **Pegasus 1.2**: Generative video-to-text engine.

In [4]:
# Delete pre-existing dataset if it exists from previous runs.
if fo.dataset_exists(DATASET_NAME):
    fo.delete_dataset(DATASET_NAME)

dataset = fo.Dataset(DATASET_NAME)

# Create or retrieve TwelveLabs index
twelvelabs_client = TwelveLabs(api_key=TL_API_KEY)

def get_twelvelabs_index(index_name: str) -> str:
    """
    Returns the ID of the TwelveLabs index with the given name.
    If the index does not exist, it creates a new index with the given name.
    """
    # Reuse an existing index when one with the requested name is found.
    indexes = twelvelabs_client.indexes.list()
    for index in indexes:
        if index.index_name == index_name:
            print("Found index with name {} with ID {}".format(index_name, index.id))
            return index.id
    # Otherwise create an index with both the embedding and the generative model.
    index = twelvelabs_client.indexes.create(
        index_name=index_name,
        models=[
            IndexesCreateRequestModelsItem(
                model_name="marengo3.0", model_options=["visual", "audio"]
            ),
            IndexesCreateRequestModelsItem(
                model_name="pegasus1.2", model_options=["visual", "audio"]
            ),
        ]
    )
    print("Created index with name {} with ID {}".format(index_name, index.id))
    return index.id

index_id = get_twelvelabs_index(TL_INDEX_NAME)

Found index with name workplace_surveillance_videos with ID 696191913d753c022b3f73dd


## Video Ingestion and Indexing
This step handles the raw data ingestion:
1.  **Traversal**: Iterates through the dataset folders on Google Drive.
2.  **Filtering**: Skips videos shorter than 4 seconds (as they may lack sufficient context).
3.  **Indexing**: Uploads the video to TwelveLabs to generate embeddings.
4.  **Tracking**: Waits for the task to complete and stores the mapped Video ID.

Note: Later cells do not depend on `video_ids`. They read every video already stored in your TwelveLabs index, so this cell only needs to run when new videos must be indexed.

In [None]:
import cv2

# Only run this cell if videos have not been indexed already.

print("Loading videos from dataset in {} with {} split".format(DATASET_PATH, DATASET_SPLIT))
print("Drawing {} videos per label".format(VIDEOS_PER_LABEL))

video_ids = dict()

for split in os.listdir(DATASET_PATH):
    if split == DATASET_SPLIT:  # exact match; `in` would also accept substrings such as "tr"
        split_dir = os.path.join(DATASET_PATH, split)
        for label_folder in os.listdir(split_dir):
            folder_path = os.path.join(split_dir, label_folder)
            print("Reading {} from {}".format(label_folder, folder_path))
            video_count = 0
            for video_filename in os.listdir(folder_path):
                if video_count >= int(VIDEOS_PER_LABEL):
                    break
                video_path = os.path.join(folder_path, video_filename)

                # Verify video duration
                try:
                    cap = cv2.VideoCapture(video_path)
                    fps = cap.get(cv2.CAP_PROP_FPS)
                    frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
                    duration = frame_count / fps if fps > 0 else 0
                    cap.release()

                    if duration < 4.0:
                        print(f"Skipping {video_filename}: Duration {duration:.2f}s is less than 4s.")
                        continue
                except Exception as e:
                    print(f"Warning: Could not check duration for {video_filename}. Error: {e}")

                try:
                    with open(video_path, "rb") as f:
                        video_bytes = f.read()

                        task = twelvelabs_client.tasks.create(
                            index_id=index_id,
                            video_file=video_bytes,
                            user_metadata=json.dumps({
                                "local_video_file_path": video_path
                            })
                        )

                        print("Created task for {} with ID {}".format(video_path, task.id))

                        wait_task = twelvelabs_client.tasks.wait_for_done(task_id=task.id)

                        if wait_task.status != "ready":
                            raise Exception("Task {} failed with status {}".format(task.id, wait_task.status))

                        retrieve_task = twelvelabs_client.tasks.retrieve(task_id=task.id)

                        video_ids[video_filename] = retrieve_task.video_id

                        print("Video successfully indexed with ID {}".format(retrieve_task.video_id))

                        video_count += 1
                except Exception as e:
                    print(f"Failed to index {video_filename}: {e}")

print("Video IDs: {}".format(video_ids))

Loading videos from dataset in /content/drive/MyDrive/dataset with train split
Drawing 3 videos per label
Reading 1_unauthorized_intervention from /content/drive/MyDrive/dataset/train/1_unauthorized_intervention
Created task for /content/drive/MyDrive/dataset/train/1_unauthorized_intervention/1_tr8.mp4 with ID 69666ee65859cae89d9ddc89
Video successfully indexed with ID 69666ee65859cae89d9ddc89
Created task for /content/drive/MyDrive/dataset/train/1_unauthorized_intervention/1_tr6.mp4 with ID 69666f03058486b3c415eb15
Video successfully indexed with ID 69666f03058486b3c415eb15
Created task for /content/drive/MyDrive/dataset/train/1_unauthorized_intervention/1_tr9.mp4 with ID 69666f1e5a754a2657b9289f
Video successfully indexed with ID 69666f1e5a754a2657b9289f
Reading 0_safe_walkway_violation from /content/drive/MyDrive/dataset/train/0_safe_walkway_violation
Created task for /content/drive/MyDrive/dataset/train/0_safe_walkway_violation/0_tr5.mp4 with ID 69666f3b058486b3c415eb25
Video succe
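
The duration filter in the cell above can be expressed as two small pure functions, which makes the 4-second rule easy to test without opening a video. This is a sketch: `video_duration_seconds` and `should_index` are hypothetical names, and in the notebook the two inputs would come from `cv2.CAP_PROP_FRAME_COUNT` and `cv2.CAP_PROP_FPS`.

```python
MIN_DURATION_SECONDS = 4.0  # threshold used in the ingestion cell


def video_duration_seconds(frame_count, fps):
    """Duration from frame count and frame rate; 0.0 when the metadata is unusable."""
    if fps is None or frame_count is None or fps <= 0 or frame_count <= 0:
        return 0.0
    return frame_count / fps


def should_index(frame_count, fps, min_duration=MIN_DURATION_SECONDS):
    """True when the clip is at least `min_duration` seconds long."""
    return video_duration_seconds(frame_count, fps) >= min_duration


# 150 frames at 30 fps is 5 s; 90 frames at 30 fps is 3 s.
print(should_index(150, 30.0))  # True
print(should_index(90, 30.0))   # False
print(should_index(300, 0.0))   # False (unreadable frame rate)
```

As in the original cell, a clip whose frame rate cannot be read is treated as zero-length and skipped.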

## Fetch Embeddings and Populate Dataset
Once indexed, we retrieve the **visual embeddings** (vectors) from TwelveLabs for each video, keeping the embedding of the first segment as the video's representation.
We then create FiftyOne samples containing the file path and the corresponding video ID, populating the dataset for visualization.

In [5]:
def fetch_video_ids(index_id: str):

    """

    Fetch videos from specified index_id and yield video file path

    """

    # Fetch video IDs from index.
    response = twelvelabs_client.indexes.videos.list(
        index_id=index_id
    )

    for video in response:
        video_id = video.id
        video_info = twelvelabs_client.indexes.videos.retrieve(
            index_id=index_id,
            video_id=video_id,
            embedding_option=["visual"]
        )
        video_file_path = video_info.user_metadata.get('local_video_file_path', None)
        video_embedding = video_info.embedding.video_embedding.segments[0].float_ # Get raw video embedding float.

        yield video_file_path, video_id, video_embedding

# Clear existing samples to avoid duplicates/stale data
dataset.delete_samples(dataset)

# List videos in index and fetch embeddings.
embeddings = []
for video_file_path, video_id, video_embedding in fetch_video_ids(index_id):
    embeddings.append(video_embedding)

    # Create sample with all fields populated at init
    sample = fo.Sample(
        filepath=video_file_path,
        video_id=video_id,
    )

    dataset.add_sample(sample)
    # No need for sample.save() here as add_sample persists it.

    print(video_file_path, video_id)
    print("Video {} has embeddings {}".format(video_id, video_embedding[:5])) # Print first 5 dims for brevity
    print("Added sample to dataset.")
    print('-------------------------------------------------------------')

/content/drive/MyDrive/dataset/train/7_safe_carrying/7_tr9.mp4 696671ef5a754a2657b929f1
Video 696671ef5a754a2657b929f1 has embeddings [-0.033203125, 0.015014648, 0.05102539, 0.063964844, -0.046875]
Added sample to dataset.
-------------------------------------------------------------
/content/drive/MyDrive/dataset/train/7_safe_carrying/7_tr5.mp4 696671d9a2518c39db4b84c4
Video 696671d9a2518c39db4b84c4 has embeddings [-0.033447266, 0.018798828, 0.048095703, 0.0625, -0.04711914]
Added sample to dataset.
-------------------------------------------------------------
/content/drive/MyDrive/dataset/train/7_safe_carrying/7_tr10.mp4 696671bc5a754a2657b929c6
Video 696671bc5a754a2657b929c6 has embeddings [-0.035888672, 0.023071289, 0.0546875, 0.060791016, -0.037841797]
Added sample to dataset.
-------------------------------------------------------------
/content/drive/MyDrive/dataset/train/5_authorized_intervention/5_tr4.mp4 696671a6684c0432bbdb96b9
Video 696671a6684c0432bbdb96b9 has embeddings 
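
The cell above keeps only `segments[0]`, so a clip that spans several segments is represented by its first segment alone. One possible alternative, shown here purely as an assumption and not as the notebook's method, is to mean-pool all segment vectors and normalize the result. The helpers `mean_pool` and `l2_normalize` are hypothetical; in the notebook each segment's vector would be read from its `float_` attribute.

```python
import math


def mean_pool(segment_embeddings):
    """Average equal-length segment vectors into a single video-level vector."""
    if not segment_embeddings:
        raise ValueError("expected at least one segment embedding")
    dim = len(segment_embeddings[0])
    if any(len(vec) != dim for vec in segment_embeddings):
        raise ValueError("all segment embeddings must have the same length")
    count = len(segment_embeddings)
    return [sum(vec[i] for vec in segment_embeddings) / count for i in range(dim)]


def l2_normalize(vector):
    """Scale a vector to unit length; an all-zero vector is returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vector))
    return list(vector) if norm == 0 else [x / norm for x in vector]


segments = [[1.0, 0.0], [0.0, 1.0]]  # two toy 2-D segment vectors
pooled = mean_pool(segments)         # [0.5, 0.5]
unit = l2_normalize(pooled)          # unit-length version of the pooled vector
```

For the short clips in this demo the first segment may already cover the whole video, in which case pooling changes nothing.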

## Semantic Clustering and Auto-Labeling
To achieve "Small Data" curation without manual effort:
1.  **KMeans Clustering**: We cluster the video embeddings into 8 distinct groups based on semantic similarity.
2.  **Pegasus Generation**: For each cluster, we use the **TwelveLabs Pegasus 1.2** model to generate a descriptive label (e.g., "Unsafe_Walking_Path", "Inappropriate_PPE").
3.  **Annotation**: These labels are applied to all samples in the cluster.

In [6]:
from sklearn.cluster import KMeans

def generate_label(video_id: str) -> str:
  result = twelvelabs_client.analyze(
      video_id=video_id,
      prompt="Generate a single label, either a single word or a phrase (with _ separating words), to represent the video and its respective cluster of similar videos. This dataset relates to workplace safety violations and good practices, so please identify the exact violation or good practice in the video.",
      temperature=0.2,
  )
  return result.data

labels = dict()

num_clusters = 8
kmeans = KMeans(n_clusters=num_clusters, random_state=0)
cluster_labels = kmeans.fit_predict(embeddings)

# Delete the 'cluster' field if it exists to reset its type schema.
# This ensures we can save string labels even if it was previously an int field.
if "cluster" in dataset.get_field_schema():
    dataset.delete_sample_field("cluster")

for sample, label in zip(dataset, cluster_labels):
  if label not in labels:
    print(f"No label found for {label}, generating new one using TwelveLabs Pegasus 1.2.")
    labels[label] = generate_label(sample.video_id)
    print(f"Label found: {labels[label]}")
  sample["cluster"] = labels[label]
  sample.save()

No label found for 4, generating new one using TwelveLabs Pegasus 1.2.
Label found: Inadequate_Workplace_Safety_Measures
No label found for 7, generating new one using TwelveLabs Pegasus 1.2.
Label found: Machine_Operation_Safety_Compliance
No label found for 5, generating new one using TwelveLabs Pegasus 1.2.
Label found: Machine_Operation_Without_Proper_Safety_Measures
No label found for 0, generating new one using TwelveLabs Pegasus 1.2.
Label found: Inappropriate_footwear
No label found for 3, generating new one using TwelveLabs Pegasus 1.2.
Label found: Inappropriate_Use_of_Personal_Protective_Equipment
No label found for 1, generating new one using TwelveLabs Pegasus 1.2.
Label found: Inadequate_Workplace_Safety_No_Personal_Protective_Equipment
No label found for 6, generating new one using TwelveLabs Pegasus 1.2.
Label found: forklift_operation_safety
No label found for 2, generating new one using TwelveLabs Pegasus 1.2.
Label found: Machine_Operation_Without_Safety_Guards
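
The loop above labels each cluster from whichever sample it encounters first. A variation worth considering is to send Pegasus the member closest to the cluster's mean embedding, on the assumption that it is more typical of the group. The sketch below uses plain Python lists, and `representative_indices` is a hypothetical helper that is not part of the notebook.

```python
def centroid(vectors):
    """Component-wise mean of a non-empty list of equal-length vectors."""
    dim = len(vectors[0])
    return [sum(v[i] for v in vectors) / len(vectors) for i in range(dim)]


def squared_distance(a, b):
    """Squared Euclidean distance between two vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def representative_indices(embeddings, cluster_ids):
    """Map each cluster ID to the index of its member closest to the cluster mean."""
    members = {}
    for index, cluster_id in enumerate(cluster_ids):
        members.setdefault(int(cluster_id), []).append(index)

    representatives = {}
    for cluster_id, indices in members.items():
        center = centroid([embeddings[i] for i in indices])
        representatives[cluster_id] = min(
            indices, key=lambda i: squared_distance(embeddings[i], center)
        )
    return representatives


toy_embeddings = [[0.0, 0.0], [1.0, 0.0], [0.4, 0.0], [10.0, 10.0]]
toy_clusters = [0, 0, 0, 1]
print(representative_indices(toy_embeddings, toy_clusters))  # {0: 2, 1: 3}
```

In the notebook, the returned indices would select which sample's `video_id` is passed to `generate_label` for each cluster.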


## Visualization
We use **FiftyOne Brain** to compute a 2D visualization (UMAP) of the embeddings.
Finally, we launch the **FiftyOne App**, allowing you to explore the clusters, view the auto-generated labels, and analyze the dataset interactively.

In [None]:
# Create visualization on FiftyOne
import fiftyone.brain as fob

results = fob.compute_visualization(
  dataset,
  embeddings=embeddings,
  num_dims=2,
  brain_key="image_embeddings",
  verbose=True,
  seed=51,
)

session = fo.launch_app(dataset, auto=False, port=5151)
session.show()

Generating visualization...



UMAP(n_jobs=1, random_state=51, verbose=True)
Tue Jan 13 17:13:21 2026 Construct fuzzy simplicial set
Tue Jan 13 17:13:21 2026 Finding Nearest Neighbors
Tue Jan 13 17:13:21 2026 Finished Nearest Neighbor Search
Tue Jan 13 17:13:21 2026 Construct embedding


	completed  0  /  500 epochs
	completed  50  /  500 epochs
	completed  100  /  500 epochs
	completed  150  /  500 epochs
	completed  200  /  500 epochs
	completed  250  /  500 epochs
	completed  300  /  500 epochs
	completed  350  /  500 epochs
	completed  400  /  500 epochs
	completed  450  /  500 epochs
Tue Jan 13 17:13:21 2026 Finished embedding
Session launched. Run `session.show()` to open the App in a cell output.




## Export Dataset
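Finally, we package the embeddings, generated labels, and metadata into a PyTorch `Dataset` object and save it with `torch.save`. The resulting `.pt` file can be downloaded and used to train a lightweight classifier (e.g., an MLP) on top of the frozen embeddings, fulfilling the "Small Data" learning objective. Because `torch.save` pickles the whole object, the `WorkerSafetyDataset` class definition must be available wherever the file is loaded, and PyTorch 2.6 and later require `torch.load(..., weights_only=False)` for such objects.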

In [7]:
import torch
from torch.utils.data import Dataset
from google.colab import files
import numpy as np

class WorkerSafetyDataset(Dataset):
    """
    A generic PyTorch Dataset for the Worker Safety Challenge.
    Items returned:
        - embedding (torch.Tensor): Visual embedding of the video.
        - label_idx (torch.Tensor): Integer label index (cluster ID).
        - label_str (str): Semantic string description of the label.
        - video_id (str): TwelveLabs Video ID.
    """
    def __init__(self, embeddings, labels, label_map, video_ids):
        # Convert embeddings list to tensor
        self.embeddings = torch.tensor(embeddings, dtype=torch.float32)
        # Convert labels array to tensor
        self.labels = torch.tensor(labels, dtype=torch.long)
        self.label_map = label_map
        self.video_ids = video_ids

    def __len__(self):
        return len(self.embeddings)

    def __getitem__(self, idx):
        label_idx = self.labels[idx].item()
        # Handle numpy int types in dictionary lookups
        label_str = self.label_map.get(label_idx, self.label_map.get(np.int32(label_idx), "Unknown"))

        return {
            "embedding": self.embeddings[idx],
            "label_idx": self.labels[idx],
            "label_str": label_str,
            "video_id": self.video_ids[idx]
        }

# ensure synchronization between list of embeddings and dataset samples
video_ids_ordered = [s.video_id for s in dataset]

# Instantiate the dataset
train_dataset = WorkerSafetyDataset(
    embeddings=embeddings,
    labels=cluster_labels,
    label_map=labels,
    video_ids=video_ids_ordered
)

# Save to disk
save_path = "worker_safety_dataset.pt"
print(f"Saving dataset with {len(train_dataset)} samples to {save_path}...")
torch.save(train_dataset, save_path)

# Trigger download
print("Downloading dataset...")
try:
    files.download(save_path)
except Exception as e:
    print(f"Automatic download failed. You can manually download '{save_path}' from the file browser.")
    print(f"Error: {e}")

Saving dataset with 24 samples to worker_safety_dataset.pt...
Downloading dataset...
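
To illustrate what a lightweight classifier on frozen embeddings can look like, here is a dependency-free baseline. Note that it swaps the MLP mentioned above for a nearest-centroid classifier with cosine similarity, chosen only because it fits in a few lines. The functions `fit_centroids` and `predict` are hypothetical and the training data is a toy example.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def fit_centroids(embeddings, label_ids):
    """Return {label_id: mean embedding} computed from the training pairs."""
    sums, counts = {}, {}
    for vector, label_id in zip(embeddings, label_ids):
        label_id = int(label_id)
        if label_id not in sums:
            sums[label_id] = [0.0] * len(vector)
            counts[label_id] = 0
        sums[label_id] = [s + x for s, x in zip(sums[label_id], vector)]
        counts[label_id] += 1
    return {k: [s / counts[k] for s in total] for k, total in sums.items()}


def predict(centroids, vector):
    """Return the label whose centroid is most similar to `vector`."""
    return max(centroids, key=lambda k: cosine_similarity(centroids[k], vector))


train_x = [[1.0, 0.1], [0.9, 0.0], [0.0, 1.0], [0.1, 0.8]]
train_y = [0, 0, 1, 1]
model = fit_centroids(train_x, train_y)
print(predict(model, [0.8, 0.2]))  # 0
print(predict(model, [0.2, 0.9]))  # 1
```

With the exported dataset, the same functions could be fed `train_dataset.embeddings.tolist()` and `train_dataset.labels.tolist()`. Keep in mind that the labels are cluster assignments, so any classifier trained on them reproduces the clustering rather than human-verified classes.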

