Skip to content

Commit

Permalink
Merge pull request #132 from pipeless-ai/sequential_execution
Browse files Browse the repository at this point in the history
feat(hook): Support sequential sorted execution
  • Loading branch information
miguelaeh committed Feb 2, 2024
2 parents 7a2e718 + 26eb8a6 commit 85aed5f
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 15 deletions.
6 changes: 6 additions & 0 deletions examples/object-tracking/init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from norfair import Tracker

def init():
return {
"tracker": Tracker(distance_function="euclidean", distance_threshold=50)
}
23 changes: 23 additions & 0 deletions examples/object-tracking/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# make stateful

from norfair import Detection, draw_points
import numpy as np

def hook(frame_data, context):
tracker = context['tracker']
frame = frame_data['modified']
bboxes, scores, labels = frame_data['user_data'].values()
norfair_detections = yolo_to_norfair(bboxes, scores)
tracked_objects = tracker.update(detections=norfair_detections)
draw_points(frame, drawables=tracked_objects)
frame_data['modified'] = frame

def yolo_to_norfair(bboxes, scores):
norfair_detections = []
for i, bbox in enumerate(bboxes):
box_corners = [[bbox[0], bbox[1]], [bbox[2], bbox[3]]]
box_corners = np.array(box_corners)
corners_scores = np.array([scores[i], scores[i]])
norfair_detections.append(Detection(points=box_corners, scores=corners_scores))

return norfair_detections
9 changes: 9 additions & 0 deletions examples/yolo/post-process.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ def hook(frame_data, _):
for box in bboxes:
x1, y1, x2, y2, score, class_number = box
box_label(frame, [x1, y1, x2, y2], yolo_classes[int(class_number)], score, (255, 0, 255))

bboxes = bboxes.tolist()
# Add the predictions to the frame user_data in order to recover it frm other stages
frame_data['user_data'] = {
"bboxes": [bbox[:4] for bbox in bboxes],
"scores": [bbox[4] for bbox in bboxes],
"labels": [yolo_classes[int(bbox[5])] for bbox in bboxes]
}

frame_data['modified'] = frame

# Classes defined in the YOLO model to obtain the predicted class label
Expand Down
2 changes: 1 addition & 1 deletion pipeless/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pipeless/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipeless-ai"
version = "1.8.0"
version = "1.9.0"
edition = "2021"
authors = ["Miguel A. Cabrera Minagorri"]
description = "An open-source computer vision framework to build and deploy applications in minutes"
Expand Down
31 changes: 24 additions & 7 deletions pipeless/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ pub struct RgbFrame {
inference_output: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
pipeline_id: uuid::Uuid,
user_data: UserData,
frame_number: u64,
}
impl RgbFrame {
pub fn new(
original: ndarray::Array3<u8>,
width: usize, height: usize,
pts: gst::ClockTime, dts: gst::ClockTime, duration: gst::ClockTime,
fps: u8, input_ts: f64,
pipeline_id: uuid::Uuid,
pipeline_id: uuid::Uuid, frame_number: u64
) -> Self {
let modified = original.to_owned();
RgbFrame {
Expand All @@ -49,6 +50,7 @@ impl RgbFrame {
inference_output: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[0])),
pipeline_id,
user_data: UserData::Empty,
frame_number,
}
}

Expand All @@ -62,7 +64,7 @@ impl RgbFrame {
inference_input: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
inference_output: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
pipeline_id: &str,
user_data: UserData,
user_data: UserData, frame_number: u64,
) -> Self {
RgbFrame {
uuid: uuid::Uuid::from_str(uuid).unwrap(),
Expand All @@ -74,7 +76,8 @@ impl RgbFrame {
fps, input_ts,
inference_input, inference_output,
pipeline_id: uuid::Uuid::from_str(pipeline_id).unwrap(),
user_data: user_data
user_data: user_data,
frame_number,
}
}

Expand Down Expand Up @@ -128,15 +131,18 @@ impl RgbFrame {
pub fn set_inference_output(&mut self, output_data: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>) {
self.inference_output = output_data;
}
pub fn get_pipeline_id(&self) -> uuid::Uuid {
self.pipeline_id
pub fn get_pipeline_id(&self) -> &uuid::Uuid {
&self.pipeline_id
}
pub fn set_pipeline_id(&mut self, pipeline_id: &str) {
self.pipeline_id = uuid::Uuid::from_str(pipeline_id).unwrap();
}
pub fn get_user_data(&self) -> &UserData {
&self.user_data
}
pub fn get_frame_number(&self) -> &u64 {
&self.frame_number
}
}

pub enum Frame {
Expand All @@ -148,12 +154,13 @@ impl Frame {
width: usize, height: usize,
pts: gst::ClockTime, dts: gst::ClockTime, duration: gst::ClockTime,
fps: u8, input_ts: f64,
pipeline_id: uuid::Uuid
pipeline_id: uuid::Uuid, frame_number: u64,
) -> Self {
let rgb = RgbFrame::new(
original, width, height,
pts, dts, duration,
fps, input_ts, pipeline_id
fps, input_ts,
pipeline_id, frame_number
);
Self::RgbFrame(rgb)
}
Expand Down Expand Up @@ -188,4 +195,14 @@ impl Frame {
Frame::RgbFrame(frame) => { frame.set_inference_output(output_data); },
}
}
pub fn get_pipeline_id(&self) -> &uuid::Uuid {
match self {
Frame::RgbFrame(frame) => frame.get_pipeline_id(),
}
}
pub fn get_frame_number(&self) -> &u64 {
match self {
Frame::RgbFrame(frame) => frame.get_frame_number(),
}
}
}
6 changes: 5 additions & 1 deletion pipeless/src/input/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ fn on_new_sample(
pipeless_pipeline_id: uuid::Uuid,
appsink: &gst_app::AppSink,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
frame_number: &mut u64,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let sample = appsink.pull_sample().map_err(|_err| {
error!("Sample is None");
Expand Down Expand Up @@ -116,11 +117,12 @@ fn on_new_sample(
gst::FlowError::Error
})?;

*frame_number += 1;
let frame = pipeless::data::Frame::new_rgb(
ndframe, width, height,
pts, dts, duration,
fps as u8, frame_input_instant,
pipeless_pipeline_id
pipeless_pipeline_id, *frame_number,
);
// The event takes ownership of the frame
pipeless::events::publish_new_frame_change_event_sync(
Expand Down Expand Up @@ -413,11 +415,13 @@ fn create_gst_pipeline(
.new_sample(
{
let pipeless_bus_sender = pipeless_bus_sender.clone();
let mut frame_number: u64 = 0; // Used to set the frame number
move |appsink: &gst_app::AppSink| {
on_new_sample(
pipeless_pipeline_id,
appsink,
&pipeless_bus_sender,
&mut frame_number,
)
}
}).build();
Expand Down
41 changes: 37 additions & 4 deletions pipeless/src/stages/hook.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{sync::Arc, fmt};
use std::{collections::HashMap, fmt, sync::Arc};
use log::error;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock};
use uuid::Uuid;

use crate as pipeless;

Expand Down Expand Up @@ -46,6 +47,9 @@ impl StatelessHook {
pub struct StatefulHook {
h_type: HookType,
h_body: Arc<Mutex<dyn HookTrait>>,
// Stateful hooks would not necesarily require to process sequentially, however, in almost all cases preserving a state in CV are related to sorted processing, such as tracking.
// Maps the stream uuid to the last frame processed for the stream. Used for stateful sequential sorted processing.
last_frame_map: Arc<RwLock<HashMap<Uuid, u64>>>,
}
impl StatefulHook {
fn get_hook_type(&self) -> HookType {
Expand All @@ -54,6 +58,21 @@ impl StatefulHook {
fn get_hook_body(&self) -> Arc<Mutex<dyn HookTrait>> {
self.h_body.clone()
}
async fn last_processed_frame_number(&self, pipeline_id: &Uuid) -> u64 {
let read_guard = self.last_frame_map.read().await;
match read_guard.get(pipeline_id) {
Some(n) => *n,
None => 0,
}
}
async fn increment_last_processed(&self, pipeline_id: &Uuid) {
let mut write_guard = self.last_frame_map.write().await;
let last = match write_guard.get(pipeline_id) {
Some(n) => *n,
None => 0,
};
write_guard.insert(*pipeline_id, last + 1);
}
}

/// Pipeless hooks are the minimal executable unit
Expand Down Expand Up @@ -85,6 +104,7 @@ impl Hook {
let hook = StatefulHook {
h_type: hook_type,
h_body: hook_body,
last_frame_map: Arc::new(RwLock::new(HashMap::new())),
};
Self::StatefulHook(hook)
}
Expand Down Expand Up @@ -122,15 +142,28 @@ impl Hook {
}
},
Hook::StatefulHook(hook) => {
let frame_pipeline_id = frame.get_pipeline_id().clone();
// Wait until it's this frame's turn to be processed
{
let prev_frame_number = frame.get_frame_number() - 1;
let mut last_processed = hook.last_processed_frame_number(&frame_pipeline_id).await;
while last_processed != prev_frame_number {
last_processed = hook.last_processed_frame_number(&frame_pipeline_id).await;
tokio::task::yield_now().await;
}
}

// NOTE: the following is sub-optimal. We can't use rayon with async code and
// for stateful hooks we need to lock the mutex before running.
let worker_res = tokio::task::spawn_blocking({
let stage_context = stage_context.clone();
let hook = hook.clone();
|| async move {
move || async move{
let h_body = hook.get_hook_body();
let locked_hook = h_body.lock().await;
locked_hook.exec_hook(frame, &stage_context)
let res = locked_hook.exec_hook(frame, &stage_context);
hook.increment_last_processed(&frame_pipeline_id).await;
res
}
}).await;
match worker_res {
Expand Down
4 changes: 3 additions & 1 deletion pipeless/src/stages/languages/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl IntoPy<Py<PyAny>> for RgbFrame {
dict.set_item("inference_output", self.get_inference_output().to_pyarray(py)).unwrap();
dict.set_item("pipeline_id", self.get_pipeline_id().to_string()).unwrap();
dict.set_item("user_data", self.get_user_data()).unwrap();
dict.set_item("frame_number", self.get_frame_number()).unwrap();
dict.into()
}
}
Expand Down Expand Up @@ -84,12 +85,13 @@ impl<'source> FromPyObject<'source> for RgbFrame {
let inference_output =inference_output_ndarray;
let pipeline_id = ob.get_item("pipeline_id").unwrap().extract()?;
let user_data = ob.get_item("user_data").unwrap().extract()?;
let frame_number = ob.get_item("frame_number").unwrap().extract()?;

let frame = RgbFrame::from_values(
uuid, original, modified, width, height,
pts, dts, duration, fps, input_ts,
inference_input, inference_output,
pipeline_id, user_data
pipeline_id, user_data, frame_number
);

Ok(frame)
Expand Down

0 comments on commit 85aed5f

Please sign in to comment.