Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/crates/sift_stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ pub use stream::{
RetryPolicy, SiftStream,
builder::{IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder},
channel::{ChannelValue, Value},
flow::{ChannelIndex, FlowBuilder, FlowDescriptor, FlowDescriptorBuilder},
mode::ingestion_config::{Flow, IngestionConfigMode},
time::TimeValue,
};
Expand Down
64 changes: 33 additions & 31 deletions rust/crates/sift_stream/src/stream/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,38 @@ pub enum Value {
BitField(Vec<u8>),
}

impl Value {
pub(crate) fn pb_data_type(&self) -> ChannelDataType {
match self {
Value::Bool(_) => ChannelDataType::Bool,
Value::String(_) => ChannelDataType::String,
Value::Double(_) => ChannelDataType::Double,
Value::Float(_) => ChannelDataType::Float,
Value::Int32(_) => ChannelDataType::Int32,
Value::Int64(_) => ChannelDataType::Int64,
Value::Uint32(_) => ChannelDataType::Uint32,
Value::Uint64(_) => ChannelDataType::Uint64,
Value::Enum(_) => ChannelDataType::Enum,
Value::BitField(_) => ChannelDataType::BitField,
}
}

pub(crate) fn pb_value(&self) -> Type {
match self {
Value::Bool(val) => Type::Bool(*val),
Value::String(val) => Type::String(val.clone()),
Value::Double(val) => Type::Double(*val),
Value::Float(val) => Type::Float(*val),
Value::Int32(val) => Type::Int32(*val),
Value::Int64(val) => Type::Int64(*val),
Value::Uint32(val) => Type::Uint32(*val),
Value::Uint64(val) => Type::Uint64(*val),
Value::Enum(val) => Type::Enum(*val),
Value::BitField(val) => Type::BitField(val.clone()),
}
}
}

impl ChannelValue {
/// Creates a [ChannelValue] for a channel of name `name`.
///
Expand All @@ -43,38 +75,8 @@ impl ChannelValue {
}
}

pub(crate) fn empty_pb() -> Type {
Type::Empty(pbjson_types::Empty {})
}

pub(crate) fn pb_data_type(&self) -> i32 {
match self.value {
Value::Bool(_) => i32::from(ChannelDataType::Bool),
Value::String(_) => i32::from(ChannelDataType::String),
Value::Double(_) => i32::from(ChannelDataType::Double),
Value::Float(_) => i32::from(ChannelDataType::Float),
Value::Int32(_) => i32::from(ChannelDataType::Int32),
Value::Int64(_) => i32::from(ChannelDataType::Int64),
Value::Uint32(_) => i32::from(ChannelDataType::Uint32),
Value::Uint64(_) => i32::from(ChannelDataType::Uint64),
Value::Enum(_) => i32::from(ChannelDataType::Enum),
Value::BitField(_) => i32::from(ChannelDataType::BitField),
}
}

pub(crate) fn pb_value(&self) -> Type {
match self.value {
Value::Bool(val) => Type::Bool(val),
Value::String(ref val) => Type::String(val.clone()),
Value::Double(val) => Type::Double(val),
Value::Float(val) => Type::Float(val),
Value::Int32(val) => Type::Int32(val),
Value::Int64(val) => Type::Int64(val),
Value::Uint32(val) => Type::Uint32(val),
Value::Uint64(val) => Type::Uint64(val),
Value::Enum(val) => Type::Enum(val),
Value::BitField(ref val) => Type::BitField(val.clone()),
}
self.value.pb_value()
}
}

Expand Down
291 changes: 291 additions & 0 deletions rust/crates/sift_stream/src/stream/flow/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,297 @@
use std::collections::HashMap;
use std::hash::Hash;

use sift_error::prelude::*;
use sift_rs::common::r#type::v1::ChannelDataType;
use sift_rs::ingest::v1::IngestWithConfigDataChannelValue;
use sift_rs::ingest::v1::{
IngestWithConfigDataStreamRequest, ingest_with_config_data_channel_value::Type,
};
use sift_rs::ingestion_configs::v2::FlowConfig;

use crate::{TimeValue, Value};

/// Represents the index of a channel in a flow.
///
/// This provides a convenient and performant way to access the value at the given channel index
/// when building a new flow.
///
/// This type is only returned by the [`FlowDescriptor`] when adding a new channel to the
/// flow ensuring that the index is safe to use.
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct ChannelIndex(usize);

/// Describes the schema of a flow, providing a convenient, performant, and correct way to
/// build the flow being described.
///
/// The descriptor itself is immutable, to ensure that the flow is constructed correctly
/// since successful ingestion requires Sift and the client to agree on the schema of the flow.
///
/// While the key `K` can be arbitrary, it is recommended to use a trivial key that avoids
/// allocations, such as a `usize` or `u32`, though for convenience, a string (the channel
/// name) could also be used and will still help minimize additional string allocations.
///
/// # Example
///
/// ```rust
/// use sift_stream::{FlowDescriptor, FlowDescriptorBuilder, FlowBuilder, ChannelDataType};
///
/// let mut flow_descriptor_builder = FlowDescriptorBuilder::new("ingestion_config_id", "my_flow_name");
/// let my_channel_idx = flow_descriptor_builder.add("my_channel_key", ChannelDataType::String);
/// let my_other_channel_idx = flow_descriptor_builder.add("my_other_channel_key", ChannelDataType::Uint64);
///
/// let flow_descriptor = flow_descriptor_builder.build();
///
/// let mut flow_builder = FlowBuilder::new(&flow_descriptor);
/// flow_builder.set(my_channel_idx, "my_value".to_string());
/// flow_builder.set_with_key("my_other_channel_key", 123_u64);
/// ```
#[derive(Clone)]
pub struct FlowDescriptor<K> {
/// The name of the flow.
name: String,

/// The ID of the ingestion config that this flow belongs to.
ingestion_config_id: String,

/// The data types of the channels in the flow which will be used
/// to validate the values when building a new flow.
field_types: Vec<ChannelDataType>,

/// A mapping of arbitrary keys to the index of the channel in the flow.
///
/// Ideally the key should be a trivial key that avoids allocations, though
/// for convenience, a string (the channel name) could also be used.
index_map: HashMap<K, ChannelIndex>,
}

impl<K> FlowDescriptor<K>
where
K: Eq + Hash,
{
/// Initializes a new flow descriptor with the provided ingestion config ID and flow name.
fn new(ingestion_config_id: impl Into<String>, name: impl Into<String>) -> Self {
Self {
ingestion_config_id: ingestion_config_id.into(),
name: name.into(),
field_types: Vec::new(),
index_map: HashMap::new(),
}
}

/// Gets the type of the channel with the given key.
pub fn get<Q>(&self, key: &Q) -> Option<ChannelDataType>
where
K: core::borrow::Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
let index = self.index_map.get(key)?.0;
Some(self.field_types[index])
}

/// Gets the mapping of keys to channel indices.
pub fn mapping(&self) -> &HashMap<K, ChannelIndex> {
&self.index_map
}
}

/// Builds a [`FlowDescriptor`], which defines the schema of a flow.
///
/// The builder is mutable, to allow for the addition of channels to the flow descriptor
/// while the descriptor itself is immuatble, ensuring that the described flow will be
/// constructed correctly.
pub struct FlowDescriptorBuilder<K> {
flow_descriptor: FlowDescriptor<K>,
}

impl<K> FlowDescriptorBuilder<K>
where
K: Eq + Hash,
{
/// Initializes a new [`FlowDescriptorBuilder`] with the provided ingestion config ID and flow name.
pub fn new(ingestion_config_id: impl Into<String>, name: impl Into<String>) -> Self {
Self {
flow_descriptor: FlowDescriptor::new(ingestion_config_id, name),
}
}

/// Adds a new channel to the flow.
///
/// This returns the index of the channel in the flow. This index can then be used to
/// access the value at the given channel index when building a new flow.
pub fn add(&mut self, key: K, field_type: ChannelDataType) -> ChannelIndex {
let index = self.flow_descriptor.field_types.len();
self.flow_descriptor.field_types.push(field_type);

self.flow_descriptor
.index_map
.insert(key, ChannelIndex(index));

ChannelIndex(index)
}

/// Builds the [`FlowDescriptor`] from the builder.
pub fn build(self) -> FlowDescriptor<K> {
self.flow_descriptor
}
}

impl<S> TryFrom<(S, &'_ FlowConfig)> for FlowDescriptor<String>
where
S: ToString,
{
type Error = Error;

fn try_from((ingestion_config_id, flow_config): (S, &'_ FlowConfig)) -> Result<Self> {
let mut builder =
FlowDescriptorBuilder::new(ingestion_config_id.to_string(), flow_config.name.clone());
for channel in flow_config.channels.iter() {
let data_type = ChannelDataType::try_from(channel.data_type).map_err(|_| {
Error::new_msg(
ErrorKind::ArgumentValidationError,
format!(
"invalid data type {:?} for channel {}",
channel.data_type, channel.name
),
)
})?;

builder.add(channel.name.clone(), data_type);
}
Ok(builder.build())
}
}

impl<S> TryFrom<(S, FlowConfig)> for FlowDescriptor<String>
where
S: ToString,
{
type Error = Error;

fn try_from((ingestion_config_id, flow_config): (S, FlowConfig)) -> Result<Self> {
let mut builder =
FlowDescriptorBuilder::new(ingestion_config_id.to_string(), flow_config.name);
for channel in flow_config.channels {
let data_type = ChannelDataType::try_from(channel.data_type).map_err(|_| {
Error::new_msg(
ErrorKind::ArgumentValidationError,
format!(
"invalid data type {:?} for channel {}",
channel.data_type, channel.name
),
)
})?;

builder.add(channel.name, data_type);
}
Ok(builder.build())
}
}

/// Builder to assist in constructing a flow, utilizing the flow descriptor
/// to ensure that the flow is constructed correctly (i.e. value in the
/// correct order and the correct data type).
///
/// By using the builder and the flow descriptor, the channel names are not
/// necessary, which helps improve performance.
pub struct FlowBuilder<'a, K> {
/// The flow descriptor which defines the value schema of the flow.
flow_descriptor: &'a FlowDescriptor<K>,

/// The values of the flow, where the index of the value corresponds to
/// the index of the channel in the [`FlowDescriptor`].
values: Vec<IngestWithConfigDataChannelValue>,

/// The optional run ID of the flow.
run_id: String,
}

impl<K> FlowBuilder<'_, K> {
/// Builds an [IngestWithConfigDataStreamRequest], consuming the builder.
pub fn request(self, now: TimeValue) -> IngestWithConfigDataStreamRequest {
IngestWithConfigDataStreamRequest {
ingestion_config_id: self.flow_descriptor.ingestion_config_id.clone(),
flow: self.flow_descriptor.name.clone(),
timestamp: Some(now.0),
channel_values: self.values,
run_id: self.run_id,
..Default::default()
}
}
}

impl<'a, K> FlowBuilder<'a, K>
where
K: Eq + Hash,
{
/// Initializes a new flow builder with the provided flow descriptor.
pub fn new(flow_descriptor: &'a FlowDescriptor<K>) -> Self {
let values = vec![
IngestWithConfigDataChannelValue {
r#type: Some(Type::Empty(pbjson_types::Empty {}))
};
flow_descriptor.field_types.len()
];
Self {
flow_descriptor,
values,
run_id: String::new(),
}
}

/// Attaches a run ID to the flow.
pub fn attach_run_id(&mut self, run_id: impl Into<String>) {
Comment thread
nathan-sift marked this conversation as resolved.
self.run_id = run_id.into();
}

/// Sets the value of the channel with the given key.
pub fn set<V>(&mut self, index: ChannelIndex, value: V) -> Result<()>
where
V: Into<Value>,
{
let value = value.into();
let pb_data_type = value.pb_data_type();
let pb_value = value.pb_value();

// Since the [ChannelIndex] is only created by the [FlowDescriptor], we can safely
// assume that the index is valid and index directly into the `field_types` vector.
let expected_data_type = self.flow_descriptor.field_types[index.0];

// Validate that the value has the correct data type.
if expected_data_type != pb_data_type {
return Err(Error::new_msg(
ErrorKind::ArgumentValidationError,
format!(
"value has incorrect data type, expected {expected_data_type:?}, got {pb_data_type:?}"
),
));
}

// Update the value.
self.values[index.0].r#type = Some(pb_value);

Ok(())
}

/// Sets the value of the channel with the given key.
pub fn set_with_key<Q, V>(&mut self, key: &Q, value: V) -> Result<()>
where
K: core::borrow::Borrow<Q>,
Q: Eq + Hash + ?Sized,
V: Into<Value>,
{
// Get the index of the channel with the given key.
let Some(index) = self.flow_descriptor.index_map.get(key) else {
return Err(Error::new_msg(
ErrorKind::NotFoundError,
"provided key was not found in flow descriptor",
));
};

self.set(*index, value)
}
}

#[cfg(test)]
mod test;

Expand Down
Loading
Loading