Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ members = [

[workspace.package]
authors = ["Sift Software Engineers <engineering@siftstack.com>"]
version = "0.7.0-rc.5"
version = "0.7.0-rc.6"
edition = "2024"
categories = ["aerospace", "science::robotics"]
homepage = "https://github.com/sift-stack/sift"
Expand All @@ -27,11 +27,11 @@ chrono = { version = "0.4.39", default-features = false, features = ["clock"] }
pbjson-types = "^0.7"
tonic = { version = "^0.12", features = ["gzip"] }

sift_connect = { version = "0.7.0-rc.5", path = "rust/crates/sift_connect" }
sift_rs = { version = "0.7.0-rc.5", path = "rust/crates/sift_rs" }
sift_error = { version = "0.7.0-rc.5", path = "rust/crates/sift_error" }
sift_stream = { version = "0.7.0-rc.5", path = "rust/crates/sift_stream" }
sift_pbfs = { version = "0.7.0-rc.5", path = "rust/crates/sift_pbfs" }
sift_connect = { version = "0.7.0-rc.6", path = "rust/crates/sift_connect" }
sift_rs = { version = "0.7.0-rc.6", path = "rust/crates/sift_rs" }
sift_error = { version = "0.7.0-rc.6", path = "rust/crates/sift_error" }
sift_stream = { version = "0.7.0-rc.6", path = "rust/crates/sift_stream" }
sift_pbfs = { version = "0.7.0-rc.6", path = "rust/crates/sift_pbfs" }

sift_stream_bindings = { version = "0.1.0", path = "rust/crates/sift_stream_bindings" }

Expand Down
10 changes: 10 additions & 0 deletions rust/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ All notable changes to this project will be documented in this file.

This project adheres to [Semantic Versioning](http://semver.org/).

## [v0.7.0-rc.6] - November 24, 2025
### What's New
#### SiftStream APIs to Utilize `FlowDescriptor` and `FlowBuilder`
Two new APIs have been added to allow use of the `FlowDescriptor` and `FlowBuilder` structs added
previously.

### Full Changelog
- [Get FlowDescriptor, send_requests_nonblocking]()


## [v0.7.0-rc.5] - November 24, 2025
### What's New
#### SiftStream FlowDescriptors and FlowBuilders
Expand Down
44 changes: 44 additions & 0 deletions rust/crates/sift_stream/src/stream/mode/ingestion_config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::super::{SiftStream, SiftStreamMode, channel::ChannelValue, time::TimeValue};
use crate::{
FlowDescriptor,
metrics::SiftStreamMetrics,
stream::{
run::{RunSelector, load_run_by_form, load_run_by_id},
Expand Down Expand Up @@ -213,6 +214,27 @@ impl SiftStream<IngestionConfigMode> {
Ok(())
}

/// This method offers a way to send data in a manner that's identical to the raw
/// [`gRPC service`] for ingestion-config based streaming. Users are expected to handle
/// channel value ordering as well as empty values correctly.
///
/// ### Important
///
/// Note if using this interface, you should use [FlowBuilder::request] to ensure proper
/// building of the request.
///
/// [`gRPC service`]: https://github.com/sift-stack/sift/blob/main/protos/sift/ingest/v1/ingest.proto#L11
pub fn send_requests_nonblocking<I>(&mut self, requests: I) -> Result<()>
where
I: IntoIterator<Item = IngestWithConfigDataStreamRequest>,
{
for req in requests {
self.metrics.messages_received.increment();
self.send_impl(req)?;
}
Ok(())
}

/// Concerned with sending the actual ingest request to [DataStream] which will then write it
/// to the gRPC stream. If backups are enabled, the request will be backed up as well.
fn send_impl(&mut self, request: IngestWithConfigDataStreamRequest) -> Result<()> {
Expand Down Expand Up @@ -368,6 +390,28 @@ impl SiftStream<IngestionConfigMode> {
.collect()
}

/// Get the flow descriptor for a given flow name.
pub fn get_flow_descriptor(&self, flow_name: &str) -> Result<FlowDescriptor<String>> {
let Some(flow) = self.mode.flows_by_name.get(flow_name) else {
return Err(Error::new_msg(
ErrorKind::NotFoundError,
format!("flow '{}' not found", flow_name),
));
};

if flow.is_empty() {
return Err(Error::new_msg(
ErrorKind::NotFoundError,
format!("flow '{}' not found", flow_name),
));
}

FlowDescriptor::try_from((
self.mode.ingestion_config.ingestion_config_id.clone(),
&flow[0],
))
}

/// Attach a run to the stream. Any data provided through [SiftStream::send] after return
/// of this function will be associated with the run.
pub async fn attach_run(&mut self, run_selector: RunSelector) -> Result<()> {
Expand Down
Loading