# Video Synopsis using YOLO11 for object segmentation and tracking and Genetic Algorithm for object tube sorting

### Check GPU availability

In [1]:
!nvidia-smi

Failed to initialize NVML: Unknown Error


### Install dependencies

In [2]:
!pip install ultralytics
!pip install supervision
!pip install python-opencv

[31mERROR: Could not find a version that satisfies the requirement python-opencv (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for python-opencv[0m[31m
[0m

### Import dependencies

In [3]:
import supervision as sv
from ultralytics import YOLO
import tensorflow as tf
import numpy as np
from math import inf
import random
from os import getcwd
import cv2
import matplotlib.pyplot as plt

### Load input video details

In [4]:
HOME = getcwd()
INPUT_VIDEO_PATH = f"{HOME}/inputs/kabukicho1.mp4"
OUTPUT_VIDEO_PATH = f"{HOME}/output.mp4"
video_info = sv.VideoInfo.from_video_path(INPUT_VIDEO_PATH)
print(video_info)

VideoInfo(width=640, height=360, fps=29, total_frames=1691)


## YOLO11 segmentation and tracking

In [5]:
model = YOLO("yolo11x-seg.pt")
model.fuse()
seg_track_results = model.track(INPUT_VIDEO_PATH, conf=0.7, iou=0.5, show=False, stream=True)

YOLO11l-seg summary (fused): 491 layers, 27,646,272 parameters, 0 gradients, 142.2 GFLOPs


### Calculate the width and height of output tensor from YOLO11 model

In [6]:
max_stride = 32
video_info.height = ((video_info.height + max_stride - 1) // max_stride) * max_stride
video_info.width = ((video_info.width + max_stride - 1) // max_stride) * max_stride

### Combine YOLO11 outputs to a single tensor

In [7]:
segmented_frames_tensor = list()
for res_frame in seg_track_results:
    if res_frame.boxes.id == None:
        segmented_frames_tensor.append(tf.zeros([video_info.height, video_info.width]))
        continue
    frame = res_frame.masks.data[0]*res_frame.boxes.id[0]
    for i in range(1, len(res_frame.boxes.id)):
        frame = frame + res_frame.masks.data[i]*res_frame.boxes.id[i]
    segmented_frames_tensor.append(frame.cpu())
segmented_frames_tensor = tf.stack(segmented_frames_tensor, axis = 0)
segmented_frames_tensor = tf.cast(segmented_frames_tensor, tf.int32)


video 1/1 (frame 1/1691) /home/jovyan/work/1_VideoSynopsis/inputs/traffic2.mp4: 384x640 4 cars, 371.8ms
video 1/1 (frame 2/1691) /home/jovyan/work/1_VideoSynopsis/inputs/traffic2.mp4: 384x640 4 cars, 282.0ms
video 1/1 (frame 3/1691) /home/jovyan/work/1_VideoSynopsis/inputs/traffic2.mp4: 384x640 5 cars, 281.5ms
video 1/1 (frame 4/1691) /home/jovyan/work/1_VideoSynopsis/inputs/traffic2.mp4: 384x640 5 cars, 293.8ms
video 1/1 (frame 5/1691) /home/jovyan/work/1_VideoSynopsis/inputs/traffic2.mp4: 384x640 5 cars, 295.7ms
video 1/1 (frame 6/1691) /home/jovyan/work/1_VideoSynopsis/inputs/traffic2.mp4: 384x640 4 cars, 382.9ms
video 1/1 (frame 7/1691) /home/jovyan/work/1_VideoSynopsis/inputs/traffic2.mp4: 384x640 5 cars, 413.4ms
video 1/1 (frame 8/1691) /home/jovyan/work/1_VideoSynopsis/inputs/traffic2.mp4: 384x640 4 cars, 328.8ms
video 1/1 (frame 9/1691) /home/jovyan/work/1_VideoSynopsis/inputs/traffic2.mp4: 384x640 4 cars, 287.0ms
video 1/1 (frame 10/1691) /home/jovyan/work/1_VideoSynopsis/inp

### Splitting and grouping of objects

In [8]:
# SPLIT OBJECT TUBES HERE

### Identify unique objects detected

In [9]:
object_ids, _ = tf.unique(tf.reshape(segmented_frames_tensor, [-1]))
object_ids = object_ids.numpy().tolist()
object_ids.remove(0)

### Define object tubes

In [10]:
object_tubes = dict()
for oid in object_ids: # oid: object id
    indices = tf.where(segmented_frames_tensor == oid)
    min_indices, max_indices = tf.reduce_min(indices, axis=0), tf.reduce_max(indices, axis=0) # Get min and max indices for each dimension
    sub_tensor = segmented_frames_tensor[min_indices[0]:max_indices[0]+1, min_indices[1]:max_indices[1]+1, min_indices[2]:max_indices[2]+1] # Slice the tensor
    mask = (sub_tensor == oid)# Create a mask for the sub-tensor
    sub_tensor = tf.multiply(sub_tensor, tf.cast(mask, tf.int32))
    object_tubes[oid] = {
        'sub_tensor': sub_tensor,
        'start_frame': int(min_indices[0]),
        'start_coord': (int(min_indices[1]), int(min_indices[2])),
        '2d_dim': (sub_tensor.shape[1], sub_tensor.shape[2]),
        'length': sub_tensor.shape[0]
    }

## Generic Algorithm for Object Tube Sorting

In [11]:
def fitness(chromosome, object_tubes=object_tubes, segmented_frames_tensor=segmented_frames_tensor):
    processing_array = np.zeros_like(segmented_frames_tensor.numpy(), dtype=np.int32)
    video_length = 0
    for oid in chromosome:
        obj_tube_sub_array = object_tubes[oid]['sub_tensor'].numpy()
        start_coord = object_tubes[oid]['start_coord']
        frame_count = obj_tube_sub_array.shape[0]
        object_tube_mask = (obj_tube_sub_array != 0).astype(np.int32)
        for f in range(0, processing_array.shape[0] - frame_count, video_info.fps):
            corner1 = (f, start_coord[0], start_coord[1])
            corner2 = tuple(corner1[i] + obj_tube_sub_array.shape[i] for i in range(3))
            sub_array = processing_array[corner1[0]:corner2[0], corner1[1]:corner2[1], corner1[2]:corner2[2]]
            if np.sum(sub_array * object_tube_mask) == 0:  # No overlap
                processing_array[corner1[0]:corner2[0], corner1[1]:corner2[1], corner1[2]:corner2[2]] += obj_tube_sub_array
                video_length = max(video_length, f + frame_count)
                break
        else:
            print("\nObject placement failed for oid:", oid)
            return np.inf
    print(video_length, end = "\t")
    return video_length

def crossover(chromosome1, chromosome2, mutation_prob=0.05):
    if len(chromosome1) != len(chromosome2):
        raise ValueError("Chromosomes must have the same length")
    n = len(chromosome1)
    offspring1 = [None] * n
    offspring2 = [None] * n
    visited = [False] * n
    def find_cycle(start):
        cycle = []
        current = start
        while not visited[current]:
            cycle.append(current)
            visited[current] = True
            current = chromosome1.index(chromosome2[current])
        return cycle
    cycle_number = 0
    for i in range(n):
        if not visited[i]:
            cycle = find_cycle(i)
            if cycle_number % 2 == 0:
                for index in cycle:
                    offspring1[index] = chromosome1[index]
                    offspring2[index] = chromosome2[index]
            else:
                for index in cycle:
                    offspring1[index] = chromosome2[index]
                    offspring2[index] = chromosome1[index]
            cycle_number += 1
    def apply_mutation(offspring):
        for _ in range(len(offspring)):
            if random.random() < mutation_prob:
                idx1, idx2 = random.sample(range(n), 2)
                offspring[idx1], offspring[idx2] = offspring[idx2], offspring[idx1]
    apply_mutation(offspring1)
    apply_mutation(offspring2)
    return tuple(offspring1), tuple(offspring2)

def selection(chromosomes, selection_size):
    chromosomes = dict(sorted(chromosomes.items(), reverse=False, key=lambda item: item[1])[:selection_size])

def evolution(chromosomes, population_size, selection_size):
    chromosomes_list = chromosomes.keys()
    new_chromosomes = set()
    for _ in range((population_size-selection_size+1)//2):
        c1, c2 = crossover(random.choice(list(chromosomes_list)), random.choice(list(chromosomes_list)))
        if not (c1 in chromosomes):
            new_chromosomes.add(c1)
        if not (c2 in chromosomes):
            new_chromosomes.add(c2)
    for c in list(new_chromosomes):
        if c in chromosomes:
            pass
        else:
            chromosomes[c] = fitness(c)
    selection(chromosomes, selection_size)

In [12]:
chromosomes = dict()    
current_generation = 0

In [13]:
POPULATION_SIZE = 30
SELECTION_SIZE = 10

### Generate initial population

In [14]:
print("Generation 0")
new_chromosomes = list()
for _ in range(POPULATION_SIZE):
    new_chromosomes.append(tuple(np.random.permutation(object_ids)))
for c in new_chromosomes:
    chromosomes[c] = fitness(c)
selection(chromosomes, SELECTION_SIZE)
print()
print("Best fitness:", min(chromosomes.values()))
print("-----")

Generation 0
1442	1457	1486	1388	1318	1362	1411	1345	1366	1461	1519	1404	1449	1362	1440	1399	1504	1492	1370	1316	1479	1506	1404	1478	1433	1469	1341	1393	1425	1520	
Best fitness: 1316
-----


### Perform evolution

In [16]:
for _ in range(20):
    print("Generation", current_generation+1)
    evolution(chromosomes, POPULATION_SIZE, SELECTION_SIZE)
    current_generation+=1
    print()
    print("Best fitness:", min(chromosomes.values()))
    print("-----")

Generation 2
1395	1440	1399	1417	1457	1421	1364	1353	1391	1440	1376	1479	1478	1471	1404	1395	1504	1433	1392	1382	
Best fitness: 1316
-----
Generation 3
1519	1388	1402	1424	1462	1362	1405	1324	1341	1382	1440	1559	1569	1403	1372	1341	1507	1491	1431	1432	
Best fitness: 1316
-----
Generation 4
1417	1450	1462	1362	1382	1453	1422	1417	1433	1341	1451	1457	1428	1425	1399	1403	1404	1388	1449	1420	
Best fitness: 1316
-----
Generation 5
1421	1509	1504	1276	1419	1402	1549	1462	1382	1446	1367	1434	1402	1457	1450	1331	1411	1369	1335	1422	
Best fitness: 1276
-----
Generation 6
1392	1369	1433	1374	1457	1404	1414	1451	1422	1433	1449	1341	1457	1370	1451	1505	1504	1449	1411	1378	
Best fitness: 1276
-----
Generation 7
1376	1370	1403	1401	1388	1428	1506	1475	1337	1478	1378	1382	1428	1287	1461	1382	1345	1440	1374	1388	
Best fitness: 1276
-----
Generation 8
1289	1491	1453	1449	1341	1402	1403	1399	1475	1403	1391	1446	1433	1475	1428	1469	1393	1317	1471	1393	
Best fitness: 1276
-----
Generation 9
1498	1448	1431

### Take the best chromosome

In [87]:
best_chromosome, op_frame_count = sorted(chromosomes.items(), reverse=False, key=lambda item: item[1])[0]
print(best_chromosome)
print(op_frame_count)

(63, 7, 97, 128, 131, 54, 46, 106, 141, 74, 45, 48, 19, 13, 16, 51, 113, 90, 112, 53, 98, 114, 92, 30, 77, 22, 56, 120, 61, 9, 110, 133, 101, 67, 132, 66, 145, 147, 23, 225, 39, 144, 37, 86, 36, 44, 107, 104, 103, 5, 130, 127, 199, 3, 88, 290, 58, 126, 124, 43, 89, 27, 105, 96, 84, 134, 136, 64, 52, 95, 32, 85, 185, 140, 24, 17, 8, 111, 72, 4, 15, 11, 80, 268, 79, 139, 47, 119, 281, 29, 122, 59, 31, 108, 118, 42, 26, 93, 57, 259, 1, 277, 129, 81, 41, 135, 76, 68, 21, 142, 40, 55, 71, 137, 2, 14, 146, 87, 102)
1231


## Generate Synopsis Video

In [100]:
def get_starting_frames(chromosome, object_tubes=object_tubes, segmented_frames_tensor=segmented_frames_tensor):
    processing_array = np.zeros_like(segmented_frames_tensor.numpy(), dtype=np.int32)
    starting_frames = dict()
    for oid in chromosome:
        obj_tube_sub_array = object_tubes[oid]['sub_tensor'].numpy()
        start_coord = object_tubes[oid]['start_coord']
        frame_count = obj_tube_sub_array.shape[0]
        object_tube_mask = (obj_tube_sub_array != 0).astype(np.int32)
        for f in range(0, processing_array.shape[0] - frame_count, video_info.fps):
            corner1 = (f, start_coord[0], start_coord[1])
            corner2 = tuple(corner1[i] + obj_tube_sub_array.shape[i] for i in range(3))
            sub_array = processing_array[
                corner1[0]:corner2[0], 
                corner1[1]:corner2[1], 
                corner1[2]:corner2[2]
            ]
            if np.sum(sub_array * object_tube_mask) == 0:  # No overlap
                processing_array[
                    corner1[0]:corner2[0], 
                    corner1[1]:corner2[1], 
                    corner1[2]:corner2[2]
                ] += obj_tube_sub_array
                starting_frames[oid] = f
                break
        else:
            print("\nObject placement failed for oid:", oid)
            return None
    return starting_frames

def load_and_pad_video(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError("Could not open video file.")
    frames = []
    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        height, width, channels = frame.shape
        pad_height = (32 - height % 32) % 32
        pad_width = (32 - width % 32) % 32
        top_pad = pad_height // 2
        bottom_pad = pad_height - top_pad
        left_pad = pad_width // 2
        right_pad = pad_width - left_pad
        padded_frame = np.pad(
            frame, 
            ((top_pad, bottom_pad), (left_pad, right_pad), (0, 0)),
            mode='constant', 
            constant_values=0
        )
        frames.append(padded_frame)
        frame_count += 1
    video_array = np.array(frames)
    cap.release()
    return video_array

def extract_background(input_video_array, segmented_frames_tensor):
    f, h, w = segmented_frames_tensor.shape
    mask = tf.cast(segmented_frames_tensor == 0, tf.float32)
    sum_pixels = np.zeros((h, w, 3))
    count_valid_frames = np.zeros((h, w))
    for frame_idx in range(f):
        valid_pixels = mask[frame_idx].numpy()
        sum_pixels += input_video_array[frame_idx] * valid_pixels[:, :, np.newaxis]    
        count_valid_frames += valid_pixels
    epsilon = 1e-8
    average_pixels = sum_pixels / (count_valid_frames[:, :, np.newaxis] + epsilon)
    average_pixels_normalized = np.clip(average_pixels, 0, 255).astype(np.uint8)
    return average_pixels_normalized

In [101]:
starting_frames = get_starting_frames(best_chromosome)
synopsis_video_array = np.zeros((op_frame_count, segmented_frames_tensor.shape[1], segmented_frames_tensor.shape[2], 3), dtype=np.uint8)
input_video_array = load_and_pad_video(INPUT_VIDEO_PATH)
background_frame = extract_background(input_video_array, segmented_frames_tensor)
synopsis_video_array[:] = background_frame

(1231, 384, 640, 3)
(1231, 384, 640, 3)


In [102]:
processing_array = np.zeros_like(segmented_frames_tensor.numpy(), dtype=np.int32)
for oid in best_chromosome:
    obj_tube_sub_array = object_tubes[oid]['sub_tensor'].numpy()
    start_coord = object_tubes[oid]['start_coord']
    frame_count = obj_tube_sub_array.shape[0]
    object_tube_mask = (obj_tube_sub_array != 0).astype(np.uint8)
    object_tube_mask_neg = (obj_tube_sub_array == 0).astype(np.uint8)
    object_tube_mask_neg = np.repeat(object_tube_mask_neg[:, :, :, np.newaxis], 3, axis=3)
    for f in range(0, processing_array.shape[0] - frame_count, video_info.fps):
        corner1 = (f, start_coord[0], start_coord[1])
        corner2 = tuple(corner1[i] + obj_tube_sub_array.shape[i] for i in range(3))
        sub_array = processing_array[corner1[0]:corner2[0], corner1[1]:corner2[1], corner1[2]:corner2[2]]
        if np.sum(sub_array * object_tube_mask) == 0:  # No overlap
            processing_array[corner1[0]:corner2[0], corner1[1]:corner2[1], corner1[2]:corner2[2]] += obj_tube_sub_array
            object_tube_mask = np.repeat(object_tube_mask[:, :, :, np.newaxis], 3, axis=3)
            f1 = object_tubes[oid]['start_frame']
            f2 = f1 + object_tubes[oid]['length']
            synopsis_video_array[corner1[0]:corner2[0], corner1[1]:corner2[1], corner1[2]:corner2[2]] *= object_tube_mask_neg
            synopsis_video_array[corner1[0]:corner2[0], corner1[1]:corner2[1], corner1[2]:corner2[2]] += object_tube_mask * input_video_array[f1:f2, corner1[1]:corner2[1], corner1[2]:corner2[2]]
            break
    else:
        print("\nObject placement failed for oid:", oid)


In [103]:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for .mp4
out = cv2.VideoWriter('output_video.mp4', fourcc, video_info.fps, (video_info.width, video_info.height))

for frame in synopsis_video_array:
    out.write(frame)

out.release()