Skip to content

Commit

Permalink
active filters in CRD
Browse files Browse the repository at this point in the history
  • Loading branch information
t4lz committed Feb 20, 2024
1 parent af2b48b commit a876c4c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 7 deletions.
2 changes: 1 addition & 1 deletion mirrord/config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod copy_target;
pub mod env;
pub mod fs;
pub mod network;
mod split_queues;
pub mod split_queues;

/// Controls mirrord features.
///
Expand Down
11 changes: 9 additions & 2 deletions mirrord/config/src/feature/split_queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize};

use crate::config::{ConfigContext, FromMirrordConfig, MirrordConfig};

pub type QueueId = String;

/// ```json
/// {
/// "feature": {
Expand All @@ -29,7 +31,7 @@ use crate::config::{ConfigContext, FromMirrordConfig, MirrordConfig};
/// }
/// ```
#[derive(Clone, Debug, Eq, PartialEq, JsonSchema, Deserialize, Default)]
pub struct SplitQueuesConfig(Option<HashMap<String, QueueFilter>>);
pub struct SplitQueuesConfig(Option<HashMap<QueueId, QueueFilter>>);

impl SplitQueuesConfig {
pub fn is_set(&self) -> bool {
Expand Down Expand Up @@ -66,12 +68,17 @@ impl FromMirrordConfig for SplitQueuesConfig {
type Generator = Self;
}

pub type MessageAttributeName = String;
pub type AttributeValuePattern = String;

pub type SqsMessageFilter = HashMap<MessageAttributeName, AttributeValuePattern>;

/// More queue types might be added in the future.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, JsonSchema)]
#[serde(tag = "queue_type", content = "message_filter")]
enum QueueFilter {
#[serde(rename = "SQS")]
Sqs(HashMap<String, String>),
Sqs(SqsMessageFilter),
}

impl CollectAnalytics for &SplitQueuesConfig {
Expand Down
22 changes: 18 additions & 4 deletions mirrord/operator/src/crd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use std::{

use chrono::NaiveDate;
use kube::CustomResource;
use mirrord_config::target::{Target, TargetConfig};
pub use mirrord_config::feature::split_queues::QueueId;
use mirrord_config::{
feature::split_queues::SqsMessageFilter,
target::{Target, TargetConfig},
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -350,13 +354,25 @@ pub enum QueueNameSource {
EnvVar(String),
}

pub type OutputQueueName = String;

/// The details of a queue that should be split.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)]
#[serde(tag = "queueType")]
pub enum SplitQueue {
/// Amazon SQS
#[serde(rename = "SQS")]
Sqs(QueueNameSource),
#[serde(rename_all = "camelCase")] // queue_name_source -> queueNameSource in yaml.
Sqs {
/// Where the application gets the queue name from. Will be used to read messages from that
/// queue and distribute them to the output queues. When running with mirrord and splitting
/// this queue, applications will get a modified name from that source.
queue_name_source: QueueNameSource,

/// Active filters registered for this queue, by output queue name.
#[schemars(skip_deserializing)] // Prevent from users to include it in yaml file.
active_filters: Option<HashMap<OutputQueueName, SqsMessageFilter>>,
},
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)]
Expand All @@ -372,8 +388,6 @@ pub struct QueueSplitterStatus {
pub active: bool,
}

pub type QueueId = String;

/// Defines a Custom Resource that holds a central configuration for splitting a queue. mirrord
/// users specify a splitter by name in their configuration. mirrord then starts splitting according
/// to the spec and the user's filter.
Expand Down

0 comments on commit a876c4c

Please sign in to comment.