In [None]:
# Load the autoreload extension and set it to mode 2 so edited modules are re-imported automatically.
%load_ext autoreload
%autoreload 2
# Render matplotlib figures inline in the notebook output.
%matplotlib inline

## Create video from image

In [None]:
import cv2
import numpy as np
import os

# Parameters for turning one large still image into a sequence of cropped frames
image_path = 'large_image.jpg'  # source image read by the frame-generation cell
output_folder = 'frames640_2'  # directory receiving the frame_XXXX.png files
os.makedirs(output_folder, exist_ok=True)

output_video = 'panned_video_640_2.mp4'  # not referenced by the visible cells
frame_size = (640, 640)  # crop window as (width, height) in pixels
total_frames = 300  # number of frames to generate
fps = 30  # not referenced by the frame-generation loop

def curve_function(t):
    """Return the crop window's top-left corner ``(x, y)`` for progress ``t``.

    ``t`` is expected to range from 0 to 1. The x coordinate stays fixed at
    500 while y grows linearly with ``t`` up to 3000 (truncated to an int).
    """
    return 500, int(t * 3000)

In [None]:
# Load the large image
image = cv2.imread(image_path)
if image is None:
    # cv2.imread returns None (instead of raising) when the file is missing
    # or cannot be decoded; fail here with a readable message.
    raise FileNotFoundError(f"Could not read image: {image_path}")
img_height, img_width = image.shape[:2]
frame_width, frame_height = frame_size

# Generate frames: slide a frame_size window over the image following curve_function
for i in range(total_frames):
    # Progress through the sequence; starts at 0 and stays strictly below 1
    t = i / total_frames
    # Get the top-left coordinates for the current frame crop
    x, y = curve_function(t)

    # Ensure the cropping window stays within image bounds
    x = max(0, min(x, img_width - frame_width))
    y = max(0, min(y, img_height - frame_height))

    # Crop the frame from the large image (rows are y, columns are x)
    frame = image[y:y + frame_height, x:x + frame_width]

    # Save the frame under a zero-padded name so the files sort in frame order
    frame_path = os.path.join(output_folder, f'frame_{i:04d}.png')
    cv2.imwrite(frame_path, frame)
    # Report progress every 10th frame and on the last one
    if i % 10 == 0 or i == total_frames - 1:
        print(f'Saved frame {i+1}/{total_frames}')


### To build the video
```bash
ffmpeg -framerate 30 -i frames640_2/frame_%04d.png -c:v libx264 -pix_fmt yuv420p panned_video_640_2.mp4
```

## Count objects on video using solutions

In [None]:
import cv2

from ultralytics import solutions

# Open the input video and read its geometry and frame rate
cap = cv2.VideoCapture("panned_video.mp4")
assert cap.isOpened(), "Error reading video file"
w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS))

# Define region points
region_points = [(20, 400), (1080, 404), (1080, 360), (20, 360)]

# Video writer
video_writer = cv2.VideoWriter("object_counting_output2.avi", cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))

try:
    # Init Object Counter
    counter = solutions.ObjectCounter(
        show=True,
        region=region_points,
        model="urbanet11s.pt",
        conf=0.01,
        tracker="bytetrack.yaml",
        iou=0.1,
    )

    # Process video: annotate every frame with the counter and append it to the output
    while cap.isOpened():
        success, im0 = cap.read()
        if not success:
            print("Video frame is empty or video processing has been successfully completed.")
            break
        im0 = counter.count(im0)
        video_writer.write(im0)
finally:
    # Release the capture and the writer even if model loading or a frame fails,
    # so the output file is closed and the input is not left locked.
    cap.release()
    video_writer.release()


In [None]:
import cv2

from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator, colors

model = YOLO("urbanet11s.pt")  # segmentation model
names = model.model.names
cap = cv2.VideoCapture("panned_video.mp4")
# Fail early with a clear message instead of writing an empty output video
assert cap.isOpened(), "Error reading video file"
w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS))

out = cv2.VideoWriter("instance-segmentation_low_conf.avi", cv2.VideoWriter_fourcc(*"MJPG"), fps, (w, h))

try:
    while True:
        ret, im0 = cap.read()
        if not ret:
            print("Video frame is empty or video processing has been successfully completed.")
            break

        results = model.predict(im0, conf=0.03)
        # The annotator is built on im0, and im0 is what gets written below
        annotator = Annotator(im0, line_width=2)

        if results[0].masks is not None:
            clss = results[0].boxes.cls.cpu().tolist()
            masks = results[0].masks.xy
            # One outline per detected instance, coloured and labelled by class
            for mask, cls in zip(masks, clss):
                color = colors(int(cls), True)
                txt_color = annotator.get_txt_color(color)
                annotator.seg_bbox(mask=mask, mask_color=color, label=names[int(cls)], txt_color=txt_color)

        out.write(im0)
finally:
    # Always close the writer and the capture, even when inference raises
    out.release()
    cap.release()


In [None]:
from collections import defaultdict

import cv2

from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator, colors

# Per-track storage keyed by track id (not filled by this cell)
track_history = defaultdict(list)

model = YOLO("urbanet11s.pt")  # segmentation model
cap = cv2.VideoCapture("panned_video.mp4")
# Fail early with a clear message instead of writing an empty output video
assert cap.isOpened(), "Error reading video file"
w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS))

out = cv2.VideoWriter("instance-segmentation-object-tracking-lowconf2.avi", cv2.VideoWriter_fourcc(*"MJPG"), fps, (w, h))

try:
    while True:
        ret, im0 = cap.read()
        if not ret:
            print("Video frame is empty or video processing has been successfully completed.")
            break

        # The annotator is built on im0, and im0 is what gets written below
        annotator = Annotator(im0, line_width=2)

        # persist=True is passed so the tracker is kept between successive calls
        results = model.track(im0, conf=0.03, persist=True)

        # Only annotate when the frame has both track ids and masks
        if results[0].boxes.id is not None and results[0].masks is not None:
            masks = results[0].masks.xy
            track_ids = results[0].boxes.id.int().cpu().tolist()

            # One outline per tracked instance, coloured and labelled by track id
            for mask, track_id in zip(masks, track_ids):
                color = colors(int(track_id), True)
                txt_color = annotator.get_txt_color(color)
                annotator.seg_bbox(mask=mask, mask_color=color, label=str(track_id), txt_color=txt_color)

        out.write(im0)
finally:
    # Always close the writer and the capture, even when tracking raises
    out.release()
    cap.release()

## Using surfnet tracking

In [None]:
import argparse
import json
import logging
import os
import os.path as op
import warnings
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import torch
from flask import jsonify, request
from ultralytics import YOLO
from werkzeug.utils import secure_filename

from plasticorigins.tools.video_readers import IterableFrameReader
from plasticorigins.tracking.postprocess_and_count_tracks import (
    filter_tracks,
    postprocess_for_api,
)
from plasticorigins.tracking.track_video import track_video
from plasticorigins.tracking.trackers import get_tracker
from plasticorigins.tracking.utils import (
    get_detections_for_video,
    read_tracking_results,
    write_tracking_results_to_file,
)

# Root logger; later passed to generate_video_with_annotations.
logger = logging.getLogger()
# CPU torch device handle; not referenced again by the visible cells.
device = torch.device("cpu")

# Tracker selected by the "EKF" key (presumably an extended Kalman filter — confirm
# in plasticorigins); `track` passes it to track_video as the engine.
engine = get_tracker("EKF")

def run_video(config) -> Tuple[object, List, List]:
    """Track objects in a video and format the result for the API.

    Args:
        config: Settings object forwarded unchanged to ``track``.

    Returns:
        A 3-tuple ``(output_json, filtered_results, detections)``:
        the output of ``postprocess_for_api`` for the filtered tracks, the
        filtered tracks themselves, and the per-frame detections from ``track``.

    Note:
        Reads the notebook-level ``id_categories`` mapping, which must be
        defined before this function is called.
    """
    # launch the tracking
    filtered_results, detections = track(config)

    # postprocess
    output_json = postprocess_for_api(filtered_results, id_categories)

    return output_json, filtered_results, detections


def track(args: argparse.Namespace) -> Tuple[List, List]:
    """Detect objects on each frame of a video, track them and filter the tracks.

    Args:
        args: Settings object read through attribute access. The attributes used
            here are ``video_path``, ``skip_frames``, ``output_shape``,
            ``preload_frames``, ``crop``, ``arch``, ``output_dir``,
            ``downsampling_factor``, ``kappa`` and ``tau``; the object is also
            forwarded as-is to ``track_video``.

    Returns:
        ``(filtered_results, detections)``: the tracks kept by ``filter_tracks``
        and the list of per-frame ``(boxes, confidences, classes)`` tuples.

    Side effects:
        Writes ``results_unfiltered.txt`` and ``results.txt`` into
        ``args.output_dir``.

    Note:
        Relies on the notebook-level names ``model_yolo``, ``engine``,
        ``transition_variance`` and ``observation_variance``.
    """

    def detector(frame):
        """Run the YOLO model on one frame and return ``(boxes_xyxy, confs, classes)``."""
        # The channel axis is reversed before inference.
        boxes = model_yolo(frame[:, :, ::-1])[0].boxes
        # .cpu() keeps the conversion working when inference ran on a GPU;
        # calling .numpy() on a CUDA tensor raises.
        dets = boxes.xyxy.cpu().numpy()
        cls = boxes.cls.cpu().numpy()
        confs = boxes.conf.cpu().numpy()
        # Boost the scores of class 10 ("megot" in id_categories) by a factor of 10
        confs = np.where(cls == 10, confs * 10, confs)
        return dets, confs, cls

    reader = IterableFrameReader(
        video_filename=args.video_path,
        skip_frames=args.skip_frames,
        output_shape=args.output_shape,
        progress_bar=True,
        preload=args.preload_frames,
        crop=args.crop,
    )

    try:
        print("---Detecting...")
        detections = [detector(frame) for frame in reader]

        print("---Tracking...")
        display = None

        # The same reader is handed to track_video together with an iterator
        # over the detections computed above.
        results = track_video(
            reader,
            iter(detections),
            args,
            engine,
            transition_variance,
            observation_variance,
            display,
            is_yolo=args.arch == "yolo",
        )
    finally:
        # Release the underlying video handle even if detection or tracking fails
        reader.video.release()

    # store unfiltered results
    output_filename = Path(args.output_dir) / "results_unfiltered.txt"
    coord_mapping = reader.get_inv_mapping(args.downsampling_factor)
    write_tracking_results_to_file(
        results,
        coord_mapping,  # Scale the output back to original video size
        output_filename=output_filename,
    )
    print("---Filtering...")

    # read from the file
    results = read_tracking_results(output_filename)
    filtered_results = filter_tracks(results, args.kappa, args.tau)
    # store filtered results
    output_filename = Path(args.output_dir) / "results.txt"
    write_tracking_results_to_file(
        filtered_results,
        lambda x, y: (x, y),  # No scaling, already scaled!
        output_filename=output_filename,
    )
    print("---Done.")

    return filtered_results, detections


In [None]:
# Working directory holding the input video and receiving the tracking outputs
base_dir = Path("/media/charles/81d75a6a-d733-4070-ad6f-9e7fe046ffab/Programs/datasetManipulation/")

in_video = (base_dir / "panned_video.mp4").as_posix()
out_folder = (base_dir / "out5").as_posix()
# exist_ok=True replaces the separate isdir check (no check-then-create race)
# and matches how the frames folder is created earlier in the notebook.
os.makedirs(out_folder, exist_ok=True)

class DotDict(dict):
    """Dictionary whose items can also be read, set and deleted as attributes.

    Reading a missing attribute yields ``None`` (same as ``dict.get``), while
    deleting a missing attribute raises ``KeyError``.
    """

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails; falls back to the item
        return self.get(name)

    def __setattr__(self, name, value):
        self[name] = value

    def __delattr__(self, name):
        del self[name]

# Settings consumed by `track` / `run_video` through attribute access.
# Keys missing from this dict (e.g. preload_frames) read as None via DotDict.
config = DotDict(
    {
        "confidence_threshold": 0.15,
        "downsampling_factor": 1,
        "noise_covariances_path": "/media/charles/81d75a6a-d733-4070-ad6f-9e7fe046ffab/Programs/surfnet/data/tracking_parameters",
        "file_model_yolo": "urbanet11s.pt",
        "output_shape": (640, 640),
        "skip_frames": 3,  # 3
        "arch": "yolo",
        "device": "cpu",
        "detection_batch_size": 1,
        "display": 0,
        "kappa": 5,  # 7
        "tau": 3,  # 4
        "crop": True,
    }
)

# Class index -> label name, used when post-processing the tracks
id_categories = {
    0: "autre",
    1: "autre-papier-carton",
    2: "autre-plastique-fragments",
    3: "autre-polystyrene",
    4: "bouteille-en-plastique",
    5: "bouteille-en-verre",
    6: "canette",
    7: "emballage-alimentaire-papier",
    8: "emballage-alimentaire-plastique",
    9: "indefini",
    10: "megot",
    11: "sac-ordures-menageres"
}

model_yolo = YOLO("/media/charles/81d75a6a-d733-4070-ad6f-9e7fe046ffab/Programs/datasetManipulation/urbanet11s.pt")

# Noise parameters loaded from the configured folder and handed to track_video.
# The settings object in this notebook is `config`; no `config_track` is defined.
observation_variance = np.load(
    os.path.join(config.noise_covariances_path, "observation_variance.npy")
)
transition_variance = np.load(
    os.path.join(config.noise_covariances_path, "transition_variance.npy")
)

config.video_path = in_video
config.output_dir = out_folder

out_json, filtered_results, detections = run_video(config)

In [None]:
# Create video
from plasticorigins.tracking.utils import generate_video_with_annotations
# Fresh reader over the input video for rendering: no frame skipping and no preloading
reader = IterableFrameReader(
    video_filename=config.video_path,
    skip_frames=0,
    progress_bar=True,
    preload=False,
    max_frame=0,
)

def load_trash_icons(folder_path: str) -> Dict[str, np.ndarray]:
    """Load one icon image per trash label, resized to 100x60 pixels.

    Args:
        folder_path: Directory containing the icon PNG files.

    Returns:
        Mapping from label name to the resized icon, read with
        ``cv2.IMREAD_UNCHANGED``.

    Raises:
        FileNotFoundError: If an icon file is missing or cannot be decoded.
    """
    folder_path = Path(folder_path)
    # Label name -> icon file name; several labels share the same icon file.
    icon_files = {
        "autre": "chaussure.png",
        "autre-papier-carton": "emballage.png",
        "autre-plastique-fragments": "fragment.png",
        "autre-polystyrene": "mousse.png",
        "bouteille-en-plastique": "bouteille.png",
        "bouteille-en-verre": "bouteille.png",
        "canette": "briquet.png",
        "emballage-alimentaire-papier": "emballage.png",
        "emballage-alimentaire-plastique": "emballage.png",
        "indefini": "dechet.png",
        "megot": "megot.png",
        "sac-ordures-menageres": "dechet.png",
    }
    out_dict = {}

    for label, file_name in icon_files.items():
        icon_path = (folder_path / file_name).resolve()
        img = cv2.imread(icon_path.as_posix(), cv2.IMREAD_UNCHANGED)
        if img is None:
            # cv2.imread returns None on unreadable files; without this check
            # the failure would only surface as an opaque error in cv2.resize.
            raise FileNotFoundError(f"Could not read icon for '{label}': {icon_path}")
        out_dict[label] = cv2.resize(img, (100, 60), interpolation=cv2.INTER_AREA)

    return out_dict

# Icons keyed by label name, loaded from the local ./icons/ folder
labels2icons = load_trash_icons("./icons/")

# Write the annotated video next to the tracking results
annotated_video_path = Path(config.output_dir) / "video.mp4"
generate_video_with_annotations(
    reader,
    out_json,
    annotated_video_path,
    config.skip_frames,
    1,
    logger,
    gps_data=None,
    labels2icons=labels2icons,
)

In [None]:
# Analyse results
import pandas as pd
# One row per tracked observation
track_columns = ['frame', 'idx', 'x', 'y', 'Score', 'Label']
df = pd.DataFrame(filtered_results, columns=track_columns)

# Keep only the observations whose label is 2
df_filtered = df[df['Label'] == 2]

# Per track id: number of observations and mean score; the track id itself is discarded
grouped_df = (
    df_filtered
    .groupby('idx')
    .agg(Count=('idx', 'size'), Average_Score=('Score', 'mean'))
    .reset_index(drop=True)
)

# Display the result
print(grouped_df)

#### tests

In [None]:
# `detections` comes back from run_video as a list, and next() only accepts
# iterators, so wrap it in iter() to take the first frame's detections.
detections_for_frame = next(iter(detections))

In [None]:
# The settings object defined in this notebook is `config`
args = config
reader = IterableFrameReader(
    video_filename=args.video_path,
    skip_frames=args.skip_frames,
    output_shape=args.output_shape,
    progress_bar=True,
    preload=args.preload_frames,
    crop=args.crop,
)

num_frames, fps = (
    int(reader.max_num_frames / (args.skip_frames + 1)),
    reader.fps,
)

logger.info("---Detecting...")
detections = []
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")
    for frame in reader:
        # `detector` only exists inside `track`, so call the model directly.
        # The raw results are stored because the next cell reads `.orig_img`
        # from detections[0][0].
        detections.append(model_yolo(frame[:, :, ::-1]))


In [None]:
# Take the image stored on the first element of the first frame's detections.
# NOTE(review): this needs detections[0][0] to expose `.orig_img`; the `detector`
# nested in `track` returns plain arrays instead — confirm what filled `detections`.
fram_pic = detections[0][0].orig_img

In [None]:
# Show the frame with its channel axis reversed; the trailing `;` hides the repr.
# NOTE(review): `plt` is not imported in any visible cell — presumably
# `import matplotlib.pyplot as plt` is needed; confirm.
plt.imshow(fram_pic[:,:,::-1]);

In [None]:
# Re-run the API post-processing on the filtered tracks (same call as in run_video).
output_json = postprocess_for_api(filtered_results, id_categories)

In [None]:
# Display the filtered tracking results returned by run_video.
filtered_results

In [None]:
import pandas as pd
# One row per tracked observation
df = pd.DataFrame(
    filtered_results,
    columns=['frame', 'idx', 'x', 'y', 'Score', 'Label'],
)

# Keep only the observations whose label is 2
is_label_2 = df['Label'] == 2
df_filtered = df[is_label_2]

# Per track id: number of observations and mean score; the track id itself is discarded
per_track = df_filtered.groupby('idx')
grouped_df = per_track.agg(
    Count=('idx', 'size'),
    Average_Score=('Score', 'mean'),
).reset_index(drop=True)

# Display the result
print(grouped_df)


In [None]:
from plasticorigins.tracking.utils import generate_video_with_annotations

In [None]:
# The settings object defined in this notebook is `config`; using it everywhere
# also removes the dependency on `args` from an earlier scratch cell.
reader = IterableFrameReader(video_filename=config.video_path,
                            skip_frames=0,
                            progress_bar=True,
                            preload=False,
                            max_frame=0)

# Render the annotated video without icons (labels2icons=None)
generate_video_with_annotations(reader, output_json, Path(config.output_dir) / "video.mp4",
                                config.skip_frames, 1,
                                logger, gps_data=None, labels2icons=None)

In [None]:
# Display the tracking settings (the dict is named `config` in this notebook)
config