Skip to content

Commit

Permalink
Merge pull request #90 from pipeless-ai/bottleneck
Browse files Browse the repository at this point in the history
Fix bottleneck in path executor
  • Loading branch information
miguelaeh committed Nov 25, 2023
2 parents c31c857 + 84895f7 commit 39661cd
Show file tree
Hide file tree
Showing 12 changed files with 273 additions and 97 deletions.
35 changes: 34 additions & 1 deletion pipeless/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pipeless/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipeless-ai"
version = "1.1.3"
version = "1.1.4"
edition = "2021"
authors = ["Miguel A. Cabrera Minagorri"]
description = "An open-source computer vision framework to build and deploy applications in minutes"
Expand Down Expand Up @@ -42,6 +42,8 @@ openssl = { version = "0.10", features = ["vendored"] }
json_to_table = "0.6.0"
sled = "0.34.7"
lazy_static = "1.4.0"
rayon = "1.8.0"
num_cpus = "1.16.0"

[dependencies.uuid]
version = "1.4.1"
Expand Down
2 changes: 1 addition & 1 deletion pipeless/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl Bus {
let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);

tokio::select! {
_ = self.receiver.for_each_concurrent(limit, move |event| func(event, tx.clone())) => error!("This should not be reached!"),
_ = self.receiver.for_each_concurrent(limit, |event| func(event, tx.clone())) => error!("This should not be reached!"),
_ = rx.recv() => info!("Stream loop stopped"),
};
}
Expand Down
8 changes: 3 additions & 5 deletions pipeless/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl Manager {
frames_path,
)?));

Ok(Self {pipeline, dispatcher_sender })
Ok(Self { pipeline, dispatcher_sender })
}

// Start takes ownership of self because we have to access the bus,
Expand All @@ -170,7 +170,7 @@ impl Manager {
let rw_pipeline = rw_pipeline.clone();
let dispatcher_sender = dispatcher_sender.clone();
let pipeless_bus_sender = event_bus.get_sender();
let concurrent_limit = 10;
let concurrent_limit = num_cpus::get() * 2; // NOTE: Making benchmarks we found this is a good value
let frame_path_executor_arc = frame_path_executor_arc.clone();
event_bus.process_events(concurrent_limit,
move |event, end_signal| {
Expand All @@ -187,12 +187,10 @@ impl Manager {
let read_guard = rw_pipeline.read().await;
frame_path = read_guard.get_frames_path();
}

let out_frame_opt;
{
let frame_path_executor = frame_path_executor_arc.read().await;
// Execute the stage, it will execute
out_frame_opt = frame_path_executor.execute_path(frame, frame_path);
out_frame_opt = frame_path_executor.execute_path(frame, frame_path).await;
}

if let Some(out_frame) = out_frame_opt {
Expand Down
168 changes: 137 additions & 31 deletions pipeless/src/stages/hook.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,156 @@
use std::{sync::Arc, fmt};
use log::error;
use tokio::sync::Mutex;

use crate as pipeless;

pub trait HookTrait {
/// Phase of a stage at which a hook is executed.
///
/// The `Display` implementation renders the snake_case name of the phase
/// (`pre_process`, `process` or `post_process`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HookType {
    PreProcess,
    Process,
    PostProcess,
}
impl fmt::Display for HookType {
    /// Writes the snake_case name of the hook type.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            HookType::PreProcess => "pre_process",
            HookType::Process => "process",
            HookType::PostProcess => "post_process",
        };
        f.write_str(name)
    }
}

/// Behaviour that every hook implementation has to provide.
///
/// `Send + Sync` are required because hook bodies are executed on worker threads:
/// `Hook::exec_hook` hands them to `rayon::spawn` (stateless hooks) or to
/// `tokio::task::spawn_blocking` (stateful hooks).
pub trait HookTrait: Send + Sync {
/// Runs the hook on `frame`, with read access to the context of the stage,
/// and returns the resulting frame, or `None` if the hook yields no frame.
fn exec_hook(
&self,
frame: pipeless::data::Frame,
stage_context: &pipeless::stages::stage::Context
) -> Option<pipeless::data::Frame>;
}

/// Hook definitions per language. All of them have to implement the HookTrait
pub enum HookDef {
PythonHook(pipeless::stages::languages::python::PythonHook),
RustHook(pipeless::stages::languages::rust::RustHook),
InferenceHook(pipeless::stages::inference::hook::InferenceHook), // Hook that runs inference on a model
}
impl HookDef {
pub fn exec_hook(
&self,
frame: pipeless::data::Frame,
stage_context: &pipeless::stages::stage::Context
) -> Option<pipeless::data::Frame> {
let frame = match self {
HookDef::PythonHook(hook) => hook.exec_hook(frame, stage_context),
HookDef::RustHook(hook) => hook.exec_hook(frame, stage_context),
HookDef::InferenceHook(hook) => hook.exec_hook(frame, stage_context),
};

frame
/// Hook whose body is shared between workers through a plain `Arc`, without locking.
/// Cloning only bumps the reference count of the body.
#[derive(Clone)]
pub struct StatelessHook {
    h_type: HookType,
    h_body: Arc<dyn HookTrait>,
}
impl StatelessHook {
    /// Returns the phase this hook is registered for.
    fn get_hook_type(&self) -> HookType {
        let Self { h_type, .. } = self;
        *h_type
    }
    /// Returns a new handle to the shared hook body.
    fn get_hook_body(&self) -> Arc<dyn HookTrait> {
        Arc::clone(&self.h_body)
    }
}
/// Hook whose body sits behind a `Mutex`, so it has to be locked before it is executed.
/// Cloning only bumps the reference count; every clone shares the same body and lock.
#[derive(Clone)]
pub struct StatefulHook {
    h_type: HookType,
    h_body: Arc<Mutex<dyn HookTrait>>,
}
impl StatefulHook {
    /// Returns the phase this hook is registered for.
    fn get_hook_type(&self) -> HookType {
        let Self { h_type, .. } = self;
        *h_type
    }
    /// Returns a new handle to the shared, mutex-protected hook body.
    fn get_hook_body(&self) -> Arc<Mutex<dyn HookTrait>> {
        Arc::clone(&self.h_body)
    }
}

/// Pipeless hooks are the minimal executable unit. Hooks are stateless.
/// Pipeless hooks are the minimal executable unit
/// Stateless hooks can be used by many threads at the same time without issues; they could even be cloned,
/// since they usually contain, for example, simple Python modules.
/// Stateful hooks cannot be cloned; thus, the hook has to be locked while it runs to avoid corrupting its state.
/// The execution of a stage that contains stateful hooks is slower than one based on just stateless hooks.
/// Stateful hooks introduce a bottleneck: while a stateful hook is being executed for a frame, the rest of the
/// frames wait for the lock to be released. In the stateless case, however, we can safely access the
/// content of the hook from many frames at the same time. It is up to the user to choose when to use each one.
/// Note: stateless hooks use `Arc`, while stateful hooks use `Arc<Mutex>`.
#[derive(Clone)] // Cloning will not duplicate data since we are using Arc
pub enum Hook {
PreProcessHook(HookDef),
ProcessHook(HookDef),
PostProcessHook(HookDef),
StatelessHook(StatelessHook),
StatefulHook(StatefulHook),
}
// NOTE(review): `HookTrait` requires `Send + Sync`, so `Arc<dyn HookTrait>` and
// `Arc<Mutex<dyn HookTrait>>` look automatically `Send + Sync`. If so, these manual
// impls are redundant and only hide future thread-safety errors from the compiler.
// Confirm and remove them.
unsafe impl std::marker::Sync for Hook {}
unsafe impl std::marker::Send for Hook {}
impl Hook {
/// Unpack the Hook to return the contained HookDef no matter the variant
pub fn get_hook_def(&self) -> &HookDef {
/// Wraps `hook_body` into a stateless hook of the given type.
/// The body is shared through the `Arc` and executed without any locking.
pub fn new_stateless(hook_type: HookType, hook_body: Arc<dyn HookTrait>) -> Self {
    Self::StatelessHook(StatelessHook {
        h_type: hook_type,
        h_body: hook_body,
    })
}
/// Wraps `hook_body` into a stateful hook of the given type.
/// The body stays behind the provided mutex, which is locked for every execution.
pub fn new_stateful(hook_type: HookType, hook_body: Arc<Mutex<dyn HookTrait>>) -> Self {
    Self::StatefulHook(StatefulHook {
        h_type: hook_type,
        h_body: hook_body,
    })
}

pub async fn exec_hook(
&self,
frame: pipeless::data::Frame,
stage_context: Arc<pipeless::stages::stage::Context>,
) -> std::option::Option<pipeless::data::Frame> {
match self {
Hook::PreProcessHook(def)
| Hook::ProcessHook(def)
| Hook::PostProcessHook(def) => {
def
Hook::StatelessHook(hook) => {
// Offload the hook execution which is usually a CPU bounded (and intensive) task
// out of the tokio thread pool. We use rayon because it uses a pool equal to the number of cores,
// which allows to process the optimal number of frames at once.
let (send, recv) = tokio::sync::oneshot::channel();
rayon::spawn({
let stage_context = stage_context.clone();
let hook = hook.clone();
move || {
let f = hook.get_hook_body().exec_hook(frame, &stage_context);
// Send the result back to Tokio.
let _ = send.send(f);
}
});
// Wait for the rayon task.
let worker_res = recv.await;
match worker_res {
Ok(f) => {
return f;
},
Err(err) => {
error!("Error pulling results from rayon worker: {}", err);
return None;
}
}
},
Hook::StatefulHook(hook) => {
// NOTE: the following is sub-optimal. We can't use rayon with async code and
// for stateful hooks we need to lock the mutex before running.
let worker_res = tokio::task::spawn_blocking({
let stage_context = stage_context.clone();
let hook = hook.clone();
|| async move {
let h_body = hook.get_hook_body();
let locked_hook = h_body.lock().await;
locked_hook.exec_hook(frame, &stage_context)
}
}).await;
match worker_res {
Ok(f) => f.await,
Err(err) => {
error!("Error getting result from the tokio worker: {}", err);
None
}
}
},
}
}
}

/// Returns the type of the wrapped hook, whichever variant it is.
pub fn get_hook_type(&self) -> HookType {
    match self {
        Hook::StatelessHook(stateless) => stateless.get_hook_type(),
        Hook::StatefulHook(stateful) => stateful.get_hook_type(),
    }
}
}
19 changes: 12 additions & 7 deletions pipeless/src/stages/inference/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,21 @@ use crate as pipeless;
use crate::stages::hook::HookTrait;
use super::{runtime::InferenceRuntime, session::{InferenceSession, SessionParams}};

/// Pipeless hooks are stateless, expect for the inference hook, which
/// maintains the inference session.
/// Since the hook is associated to a stage, it will last as long as the stage
/// Inference hooks maintain the inference session.
/// When created as stateless hooks, the inference session will be duplicated to every worker.
/// When using a model that maintains internal state, a stateful hook should be used.
/// Since the hook is associated with a stage, the inference session will last as long as the stage.
/// To use different sessions per stream, the stage should be duplicated. Creating a symlink
/// in the project folder is enough to split the inference session.
pub struct InferenceHook {
session: InferenceSession,
}
impl InferenceHook {
pub fn new(runtime: &InferenceRuntime, session_params: SessionParams, model_uri: &str) -> Self {
pub fn new(
runtime: &InferenceRuntime,
session_params: SessionParams,
model_uri: &str
) -> Self {
let session = match runtime {
InferenceRuntime::Onnx => {
let onnx_session_result = pipeless::stages::inference::onnx::OnnxSession::new(model_uri, session_params);
Expand All @@ -24,9 +31,7 @@ impl InferenceHook {
),
};

Self {
session,
}
Self { session }
}
}
impl HookTrait for InferenceHook {
Expand Down
1 change: 0 additions & 1 deletion pipeless/src/stages/inference/onnx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ impl OnnxSessionParams {
}
}
}

pub struct OnnxSession {
session: ort::Session,
}
Expand Down
4 changes: 2 additions & 2 deletions pipeless/src/stages/languages/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use log::{error, warn};
use pyo3::prelude::*;
use numpy;

use crate::{data::{RgbFrame, Frame}, stages::{hook::HookTrait, stage::ContextTrait}, stages::stage::Context, kvs::store};
use crate::{data::{RgbFrame, Frame}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store};

/// Allows a Frame to be converted from Rust to Python
impl IntoPy<Py<PyAny>> for Frame {
Expand Down Expand Up @@ -143,7 +143,7 @@ pub struct PythonHook {
module: Py<pyo3::types::PyModule>,
}
impl PythonHook {
pub fn new(stage_name: &str, hook_type: &str, py_code: &str) -> Self {
pub fn new(hook_type: HookType, stage_name: &str,py_code: &str) -> Self {
// The wrapper removes the need for the user to return a frame from each hook
// Also, injects the set and get functions for the KV store namespacing the keys
// to avoid conflicts between streams in the format stage_name:pipeline_id:user_provided_key
Expand Down
6 changes: 2 additions & 4 deletions pipeless/src/stages/languages/rust.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use log::error;

use crate::{stages::{hook::HookTrait, stage::{Context, ContextTrait}}, data::Frame};

pub struct RustStageContext {
Expand All @@ -10,9 +9,8 @@ impl ContextTrait<RustStageContext> for RustStageContext {
unimplemented!();
}
}
pub struct RustHook {
// TODO
}

pub struct RustHook {}
impl HookTrait for RustHook {
fn exec_hook(&self, frame: Frame, _stage_context: &Context) -> Option<Frame> {
let frame = frame;
Expand Down
Loading

0 comments on commit 39661cd

Please sign in to comment.