In [None]:
import os
import glob
import json
import cv2
import numpy as np
import torch
from ultralytics.models.sam import SAM2VideoPredictor
from ultralytics import SAM

In [1]:
def mask_to_coco_segmentation(mask_tensor, threshold=0.5):
    """Convert one predicted mask into COCO-style polygons plus its pixel area.

    Args:
        mask_tensor: Object exposing ``.cpu().numpy()`` (e.g. a torch tensor)
            that yields a 2-D array of mask scores.
        threshold: Values strictly greater than this count as foreground.

    Returns:
        tuple: ``(segmentation, area)`` where ``segmentation`` is a list of flat
        ``[x0, y0, x1, y1, ...]`` coordinate lists, one per external contour,
        and ``area`` is the number of foreground pixels as a float.
    """
    # Binarise to a 0/255 uint8 image, which is the input handed to findContours.
    binary = (mask_tensor.cpu().numpy() > threshold).astype(np.uint8) * 255
    outlines, _hierarchy = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    flattened = [outline.flatten().tolist() for outline in outlines]
    # A polygon needs at least 3 points, i.e. 6 flat coordinates.
    polygons = [coords for coords in flattened if len(coords) >= 6]

    pixel_area = float(np.sum(binary > 0))
    return polygons, pixel_area

def get_frame_at_index(video_path, frame_number):
    """Read a single frame from a video by its frame position.

    Args:
        video_path: Path of the video handed to ``cv2.VideoCapture``.
        frame_number: Frame position to seek to before reading.

    Returns:
        The frame returned by ``read()``, or ``None`` (after printing a
        message) when the read fails or yields no frame.
    """
    reader = cv2.VideoCapture(video_path)
    reader.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
    ok, image = reader.read()
    # Release immediately: the capture is only needed for this one read.
    reader.release()

    if ok and image is not None:
        return image

    print(f"Unable to extract frame {frame_number} from {video_path}.")
    return None

def extract_boxes_from_track(video_path, track, frame_id=0):
    """Collect the boxes of every track row recorded for one frame.

    Args:
        video_path: Not used by this function; kept in the signature for callers.
        track: Iterable of tracklets, each an iterable of rows whose element 0
            is a frame id and whose elements 1-4 hold the box values.
        frame_id: Frame id to select rows for.

    Returns:
        list: ``row[1:5]`` for each matching row, in tracklet order.
        NOTE(review): other code in this notebook reads these four values as
        (xmin, ymin, xmax, ymax) — confirm against the dataset description.
    """
    return [
        row[1:5]
        for tracklet in track
        for row in tracklet
        if row[0] == frame_id
    ]

def boxes_to_centers_nested(boxes):
    """Turn boxes into one positive centre-point prompt per box.

    Each box is unpacked into four values; the arithmetic treats the third and
    fourth as the opposite corner (x2, y2), so the centre is the midpoint
    between (x1, y1) and (x2, y2).

    Args:
        boxes: Iterable of 4-element boxes.

    Returns:
        tuple: ``(centers, labels)`` where ``centers`` holds ``[[cx, cy]]`` per
        box and ``labels`` holds ``[1]`` per box.
    """
    centers = [
        [[x1 + (x2 - x1) / 2, y1 + (y2 - y1) / 2]]
        for x1, y1, x2, y2 in boxes
    ]
    labels = [[1] for _ in centers]
    return centers, labels

SAM2VideoPredictor có track

In [None]:
# Clear CUDA cache
torch.cuda.empty_cache()

video_path = r"/content/drive/MyDrive/pig_farming/2019-11-22--11_20_15/000001/color.mp4"
parent_folder_name = os.path.basename(os.path.dirname(video_path))
# NOTE(review): allow_pickle=True can execute code embedded in the file; only load trusted track files.
track = np.load(r"/content/drive/MyDrive/pig_farming/2019-11-22--11_20_15/000001/behaviour_15.npy", allow_pickle=True)

# Boxes recorded for frame 0 are the only prompts given to the tracker.
boxes = extract_boxes_from_track(video_path, track, frame_id=0)
if len(boxes) == 0:
    # Without any prompt there is nothing for the video predictor to track.
    raise ValueError(f"No boxes found for frame 0 in the track data; cannot prompt SAM2 for {video_path}.")

# One positive centre point per box.
centers_nested, labels_nested = boxes_to_centers_nested(boxes)

# Model config
# vid_stride=15 makes the predictor process every 15th frame of the video.
vid_stride = 15
overrides = dict(conf=0.25, task="segment", mode="predict", imgsz=1024,
                 model="sam2_t.pt", stream_buffer=False, vid_stride=vid_stride)
predictor = SAM2VideoPredictor(overrides=overrides)

# Run inference lazily: stream=True yields one result at a time instead of
# keeping the results (and their frames/masks) for the whole video in memory.
results = predictor(source=video_path, points=centers_nested, labels=labels_nested, stream=True)

# Output layout: output/<clip folder>/{raw,processed,annotations}
base_output = "output"  # Main output directory
video_output_folder = os.path.join(base_output, parent_folder_name)
os.makedirs(video_output_folder, exist_ok=True)

# Sub-folders for raw images, processed (annotated) images, and annotations.
raw_folder = os.path.join(video_output_folder, "raw")
processed_folder = os.path.join(video_output_folder, "processed")
annotation_folder = os.path.join(video_output_folder, "annotations")
os.makedirs(raw_folder, exist_ok=True)
os.makedirs(processed_folder, exist_ok=True)
os.makedirs(annotation_folder, exist_ok=True)

# --- Process and Save Results ---
new_index = 0        # index of the processed (strided) frame; also used as COCO image id
ann_global_id = 1    # annotation ids are unique across all frames of this run

for result in results:
    # e.g. "000001_frame_0000.jpg"
    file_base = f"{parent_folder_name}_frame_{new_index:04d}.jpg"

    # Save the processed image (as rendered by result.save).
    processed_image_path = os.path.join(processed_folder, file_base)
    result.save(filename=processed_image_path)

    # Save the raw image: prefer the frame carried by the result, otherwise
    # re-read it from the video.
    raw_image = getattr(result, "orig_img", None)
    if raw_image is None:
        # The original frame index = new_index * vid_stride
        raw_image = get_frame_at_index(video_path, new_index * vid_stride)
    raw_image_path = os.path.join(raw_folder, file_base)
    if raw_image is not None:
        cv2.imwrite(raw_image_path, raw_image)
    else:
        print(f"Failed to save raw image for frame {new_index}")

    # Build COCO-format annotations
    annotations = []
    if result.masks is not None:
        mask_tensor = result.masks.data  # Tensor of shape (N, H, W)
        # Use a dedicated name so the prompt `boxes` above are not overwritten.
        if result.boxes is not None:
            result_boxes = result.boxes.xyxy.cpu().numpy()  # Each box: [x1, y1, x2, y2]
        else:
            result_boxes = [None] * len(mask_tensor)

        for mask, box in zip(mask_tensor, result_boxes):
            segmentation, area = mask_to_coco_segmentation(mask)
            if box is not None:
                # COCO boxes are [x, y, width, height].
                x1, y1, x2, y2 = box
                bbox = [float(x1), float(y1), float(x2 - x1), float(y2 - y1)]
            else:
                bbox = []

            annotation = {
                "id": ann_global_id,
                "image_id": new_index,
                "category_id": 1,
                "segmentation": segmentation,
                "area": area,
                "bbox": bbox,
                "iscrowd": 0
            }
            annotations.append(annotation)
            ann_global_id += 1

    # Build COCO JSON structure (one file per frame)
    coco_output = {
        "images": [{"id": new_index, "file_name": file_base}],
        "annotations": annotations,
        "categories": [{"id": 1, "name": "pig"}]
    }

    # Save annotation JSON in the annotation folder.
    json_filename = f"{parent_folder_name}_frame_{new_index:04d}.json"
    json_path = os.path.join(annotation_folder, json_filename)
    with open(json_path, "w") as f:
        json.dump(coco_output, f, indent=2)

    print(f"Saved frame {new_index} of {parent_folder_name}:")
    print(f"  Raw image: {raw_image_path}")
    print(f"  Processed image: {processed_image_path}")
    print(f"  Annotations: {json_path}")

    new_index += 1


Không Track dùng box sau khi lọc frame để input model

In [None]:
# NOTE(review): allow_pickle=True can execute code embedded in the file; only load trusted track files.
track = np.load(r"results_dataset/2019-11-22--11_20_15/000001/behaviour_15.npy", allow_pickle=True)

# Behaviour code -> name lookup. Not needed for segmentation; kept for
# labelling/inspection of track rows.
# NOTE(review): codes are presumably stored per track row (column 13 in this
# dataset's layout) — verify before relying on it.
map_behaviour = {
    0: "unknown",
    1: "not moving",
    2: "moving",
    3: "running",
    4: "eating",
    5: "drinking",
    6: "playing",
    101: "standing",
    102: "lying"
}

video_path = r"2019-11-22--11_20_15/000001/color.mp4"

# Create the output directory: output/<video file name without extension>
video_basename = os.path.splitext(os.path.basename(video_path))[0]
output_dir = os.path.join("output", video_basename)
os.makedirs(output_dir, exist_ok=True)

# Load SAM model from ultralytics
model = SAM("sam2.1_b.pt")
model.info()

ann_global_id = 1   # annotation ids are unique across all frames of this run
frame_index = 0     # index of the frame currently read from the video

# No video file is written and no display window is opened by this cell, so
# there is no VideoWriter and no cv2.waitKey/destroyAllWindows here (the GUI
# calls are unavailable on headless OpenCV builds).
capture = cv2.VideoCapture(video_path)
try:
    while capture.isOpened():
        ret, frame = capture.read()
        if not ret:
            break

        # Boxes recorded in the track data for this exact frame.
        boxes = extract_boxes_from_track(video_path, track, frame_id=frame_index)

        image_filename = os.path.join(output_dir, f"{video_basename}_frame_{frame_index}.jpg")
        json_filename = os.path.join(output_dir, f"{video_basename}_frame_{frame_index}.json")
        annotations = []

        if len(boxes) > 0:
            # Run SAM inference prompted with this frame's boxes.
            results = model(source=frame, bboxes=boxes, imgsz=1024)

            for result in results:
                # Save the image rendered from the SAM result.
                result.save(filename=image_filename)

                # Build COCO-format annotations.
                if result.masks is not None:
                    mask_tensor = result.masks.data  # shape: (N, H, W)
                    if result.boxes is not None:
                        boxes_from_sam = result.boxes.xyxy.cpu().numpy()  # [x1, y1, x2, y2]
                    else:
                        boxes_from_sam = [None] * len(mask_tensor)

                    for mask, box in zip(mask_tensor, boxes_from_sam):
                        segmentation, area = mask_to_coco_segmentation(mask)
                        if box is not None:
                            # COCO boxes are [x, y, width, height].
                            x1, y1, x2, y2 = box
                            coco_bbox = [float(x1), float(y1), float(x2 - x1), float(y2 - y1)]
                        else:
                            coco_bbox = []

                        annotation = {
                            "id": ann_global_id,
                            "image_id": frame_index,
                            "category_id": 1,         # pig
                            "segmentation": segmentation,
                            "area": area,
                            "bbox": coco_bbox,
                            "iscrowd": 0
                        }
                        annotations.append(annotation)
                        ann_global_id += 1
        else:
            # No box prompt for this frame: skip the model call and keep the
            # per-frame outputs complete with the plain frame and an empty
            # annotation list.
            cv2.imwrite(image_filename, frame)

        # COCO output for this frame.
        coco_output = {
            "images": [{"id": frame_index, "file_name": os.path.basename(image_filename)}],
            "annotations": annotations,
            "categories": [{"id": 1, "name": "pig"}]
        }
        with open(json_filename, "w") as f:
            json.dump(coco_output, f, indent=2)

        if len(boxes) > 0:
            print(f"Saved SAM prediction for frame {frame_index} as '{image_filename}' and annotations in '{json_filename}'.")
        else:
            print(f"No boxes for frame {frame_index}; saved unannotated frame as '{image_filename}' and empty annotations in '{json_filename}'.")

        frame_index += 1
finally:
    # Always free the video handle, even if inference or saving fails.
    capture.release()


Tạo video từ processed image

In [3]:
import cv2
import os
import re

def compress_images_to_video(image_folder, output_video, fps=10, image_extension='.jpg'):
    """Assemble the images of a folder into an MP4 video, ordered by frame number.

    Files are selected by extension (case-insensitive) and ordered by the
    number following ``_frame_`` in their name; names without that pattern
    sort first. The first image defines the video size, and any later image
    of a different size is resized to match, because the writer is opened
    with a single fixed frame size.

    Args:
        image_folder: Directory containing the frame images.
        output_video: Path of the video file to create.
        fps: Frame rate passed to the video writer.
        image_extension: File extension to select, e.g. ``'.jpg'``.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    # Get list of image files with the given extension (".JPG" matches ".jpg").
    suffix = image_extension.lower()
    image_files = [f for f in os.listdir(image_folder) if f.lower().endswith(suffix)]
    if not image_files:
        print("No images found in the folder.")
        return

    def extract_frame_number(filename):
        """Return the integer after '_frame_' in the name, or -1 if absent."""
        match = re.search(r'_frame_(\d+)', filename)
        return int(match.group(1)) if match else -1

    # Sort by frame number; the file name breaks ties so the order does not
    # depend on the arbitrary order returned by os.listdir.
    image_files.sort(key=lambda name: (extract_frame_number(name), name))

    # Read the first image to get video dimensions.
    first_image_path = os.path.join(image_folder, image_files[0])
    first_frame = cv2.imread(first_image_path)
    if first_frame is None:
        print(f"Failed to read image: {first_image_path}")
        return
    height, width = first_frame.shape[:2]

    # Define the video codec and create VideoWriter.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video, fourcc, fps, (width, height))
    if not video_writer.isOpened():
        # Codec or path problem: report it instead of writing into a dead writer.
        video_writer.release()
        print(f"Failed to open video writer for: {output_video}")
        return

    try:
        # Write each image into the video.
        for image_file in image_files:
            image_path = os.path.join(image_folder, image_file)
            frame = cv2.imread(image_path)
            if frame is None:
                print(f"Skipping {image_path} as it could not be read.")
                continue
            if frame.shape[:2] != (height, width):
                # The writer expects the size it was opened with.
                frame = cv2.resize(frame, (width, height))
            video_writer.write(frame)
    finally:
        # Finalise the file even if reading/writing a frame raises.
        video_writer.release()

    print(f"Video saved as: {output_video}")

# Example usage: stitch the processed frames of one clip into a video.
image_folder = r"/content/output/000001/processed"  # Source folder; frames are named like 005_frame_0.jpg, 005_frame_1.jpg, ...
output_video = r"2019-11-22--11_20_15_000001.mp4"  # Destination video file
fps = 10  # Frame rate of the generated video; adjust as needed

compress_images_to_video(image_folder=image_folder, output_video=output_video, fps=fps)


Video saved as: 2019-11-22--11_20_15_000001.mp4
