Skip to content

Commit

Permalink
Merge pull request #78 from pipeless-ai/kv_store
Browse files Browse the repository at this point in the history
feat(kv_store): Add KV store
  • Loading branch information
miguelaeh committed Nov 17, 2023
2 parents 4d140a6 + 79653ea commit 7c698c9
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 18 deletions.
101 changes: 97 additions & 4 deletions pipeless/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pipeless/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipeless-ai"
version = "1.0.2"
version = "1.1.0"
edition = "2021"
authors = ["Miguel A. Cabrera Minagorri"]
description = "An open-source computer vision framework to build and deploy applications in minutes"
Expand Down Expand Up @@ -40,6 +40,8 @@ clap = { version = "4.4.7", features = ["derive"] }
reqwest = { version = "0.11.22", features = ["blocking", "json"] }
openssl = { version = "0.10", features = ["vendored"] }
json_to_table = "0.6.0"
sled = "0.34.7"
lazy_static = "1.4.0"

[dependencies.uuid]
version = "1.4.1"
Expand Down
16 changes: 14 additions & 2 deletions pipeless/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ pub struct RgbFrame {
input_ts: std::time::Instant, // to measure processing performance
inference_input: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
inference_output: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
pipeline_id: uuid::Uuid,
}
impl RgbFrame {
pub fn new(
original: ndarray::Array3<u8>,
width: usize, height: usize,
pts: gst::ClockTime, dts: gst::ClockTime, duration: gst::ClockTime,
fps: u8, input_ts: std::time::Instant,
pipeline_id: uuid::Uuid,
) -> Self {
let modified = original.to_owned();
RgbFrame {
Expand All @@ -33,6 +35,7 @@ impl RgbFrame {
input_ts,
inference_input: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[1])),
inference_output: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[1])),
pipeline_id,
}
}

Expand All @@ -45,6 +48,7 @@ impl RgbFrame {
fps: u8, input_ts: u64,
inference_input: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
inference_output: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
pipeline_id: &str,
) -> Self {
RgbFrame {
uuid: uuid::Uuid::from_str(uuid).unwrap(),
Expand All @@ -55,7 +59,8 @@ impl RgbFrame {
duration: gst::ClockTime::from_mseconds(duration),
fps,
input_ts: std::time::Instant::now() - std::time::Duration::from_millis(input_ts),
inference_input, inference_output
inference_input, inference_output,
pipeline_id: uuid::Uuid::from_str(pipeline_id).unwrap(),
}
}

Expand Down Expand Up @@ -118,6 +123,12 @@ impl RgbFrame {
pub fn set_inference_output(&mut self, output_data: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>) {
self.inference_output = output_data;
}
pub fn get_pipeline_id(&self) -> uuid::Uuid {
self.pipeline_id
}
pub fn set_pipeline_id(&mut self, pipeline_id: &str) {
self.pipeline_id = uuid::Uuid::from_str(pipeline_id).unwrap();
}
}

pub enum Frame {
Expand All @@ -129,11 +140,12 @@ impl Frame {
width: usize, height: usize,
pts: gst::ClockTime, dts: gst::ClockTime, duration: gst::ClockTime,
fps: u8, input_ts: std::time::Instant,
pipeline_id: uuid::Uuid
) -> Self {
let rgb = RgbFrame::new(
original, width, height,
pts, dts, duration,
fps, input_ts
fps, input_ts, pipeline_id
);
Self::RgbFrame(rgb)
}
Expand Down
10 changes: 7 additions & 3 deletions pipeless/src/input/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ impl StreamDef {
}

fn on_new_sample(
pipeless_pipeline_id: uuid::Uuid,
appsink: &gst_app::AppSink,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
Expand Down Expand Up @@ -97,7 +98,8 @@ fn on_new_sample(
let frame = pipeless::data::Frame::new_rgb(
ndframe, width, height,
pts, dts, duration,
fps as u8, frame_input_instant
fps as u8, frame_input_instant,
pipeless_pipeline_id
);
// The event takes ownership of the frame
pipeless::events::publish_new_frame_change_event_sync(
Expand Down Expand Up @@ -300,6 +302,7 @@ fn on_bus_message(
}

fn create_gst_pipeline(
pipeless_pipeline_id: uuid::Uuid,
input_uri: &str,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
) -> gst::Pipeline {
Expand All @@ -322,8 +325,9 @@ fn create_gst_pipeline(
let pipeless_bus_sender = pipeless_bus_sender.clone();
move |appsink: &gst_app::AppSink| {
on_new_sample(
pipeless_pipeline_id,
appsink,
&pipeless_bus_sender
&pipeless_bus_sender,
)
}
}).build();
Expand All @@ -350,7 +354,7 @@ impl Pipeline {
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
) -> Self {
let input_uri = stream.get_video().get_uri();
let gst_pipeline = create_gst_pipeline(input_uri, pipeless_bus_sender);
let gst_pipeline = create_gst_pipeline(id, input_uri, pipeless_bus_sender);
let pipeline = Pipeline {
id,
stream,
Expand Down
1 change: 1 addition & 0 deletions pipeless/src/kvs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod store;
61 changes: 61 additions & 0 deletions pipeless/src/kvs/store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use log::error;
use sled;
use lazy_static::lazy_static;

// We assume the type implementing StoreInterface can be send thread safely
pub trait StoreInterface: Sync {
fn get(&self, key: &str) -> String;
fn set(&self, key: &str, value: &str);
}

struct LocalStore {
backend: sled::Db,
}
impl LocalStore {
fn new() -> Self {
let db_path = "/tmp/.pipeless_kv_store";
let db = sled::open(db_path).expect("Failed to open KV store");
Self { backend: db }
}
}
impl StoreInterface for LocalStore {
/// Insert a KV pair, logs an error if it fails, but do not stop the program.
/// Trying to get the key will return an empty string.
fn set(&self, key: &str, value: &str) {
if let Err(err) = self.backend.insert(key, value) {
error!("Error inserting key {} with value {} in local KV store. Error: {}", key, value, err);
}
}

/// Returns the value or an empty string
fn get(&self, key: &str) -> String {
match self.backend.get(key) {
Ok(v) => {
match v.as_deref() {
Some(v) => std::str::from_utf8(v).unwrap().into(),
None => String::from("")
}
},
Err(err) => {
error!("Error getting value for key {} from local KV store. Error: {}", key, err);
String::from("")
}
}
}
}

// TODO: setup Redis or any other distributed solution.
// Important: Note that any type implementing StoreInterface must be thread safe
struct DistributedStore {}
impl DistributedStore {
fn new() -> Self { unimplemented!() }
}
impl StoreInterface for DistributedStore {
fn get(&self, key: &str) -> String { unimplemented!() }
fn set(&self, key: &str, value: &str) { unimplemented!() }
}

lazy_static! {
// TODO: Add support for distributed store do not hardcode the local one
pub static ref KV_STORE: Box<dyn StoreInterface> = Box::new(LocalStore::new());
}
3 changes: 2 additions & 1 deletion pipeless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ pub mod events;
pub mod data;
pub mod dispatcher;
pub mod stages;
pub mod cli;
pub mod cli;
pub mod kvs;
Loading

0 comments on commit 7c698c9

Please sign in to comment.