Skip to content

Commit

Permalink
Splitter CRD
Browse files Browse the repository at this point in the history
Add new features enum and field for operator

The old enum cannot be extended as it would break old clients.
So we define a new extensible enum and a new field for it.
We make all of those fields private so that users of that struct
only access them via a getter that handles objects deserialized
from either old or new versions.

use copy target when splitting
  • Loading branch information
t4lz committed May 6, 2024
1 parent 9dfe77f commit b25cffa
Show file tree
Hide file tree
Showing 17 changed files with 1,306 additions and 157 deletions.
365 changes: 363 additions & 2 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion mirrord/cli/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ impl From<OperatorApiError> for CliError {
feature,
operator_version,
} => Self::FeatureNotSupportedInOperatorError {
feature,
feature: feature.to_string(),
operator_version,
},
OperatorApiError::CreateApiError(e) => Self::KubernetesApiFailed(e),
Expand Down
101 changes: 51 additions & 50 deletions mirrord/cli/src/operator/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use mirrord_operator::{
client::{session_api, OperatorApiError, OperatorOperation},
crd::{MirrordOperatorCrd, SessionCrd, OPERATOR_STATUS_NAME},
};
use mirrord_operator::crd::NewOperatorFeature;
use mirrord_progress::{Progress, ProgressTracker};

use super::get_status_api;
Expand Down Expand Up @@ -90,58 +91,58 @@ impl SessionCommandHandler {
.await
.map(|either| either.right()),
}
.map_err(|kube_fail| match kube_fail {
// The random `reason` we get when the operator returns from a "missing route".
kube::Error::Api(ErrorResponse { code, reason, .. })
.map_err(|kube_fail| match kube_fail {
// The random `reason` we get when the operator returns from a "missing route".
kube::Error::Api(ErrorResponse { code, reason, .. })
if code == 404 && reason.contains("parse") =>
{
OperatorApiError::UnsupportedFeature {
feature: "session management".to_string(),
operator_version,
{
OperatorApiError::UnsupportedFeature {
feature: NewOperatorFeature::SessionManagement,
operator_version,
}
}
// Something actually went wrong.
other => OperatorApiError::KubeError {
error: other,
operation: OperatorOperation::SessionManagement,
},
})
// Finish the progress report here if we have an error response.
.inspect_err(|fail| {
sub_progress.failure(Some(&fail.to_string()));
progress.failure(Some("Session management operation failed!"));
})?
// The kube api interaction was successful, but we might still fail the operation
// itself, so let's check the `Status` and report.
.map(|status| {
if status.is_failure() {
sub_progress.failure(Some(&format!(
"`{command}` failed due to `{}` with code `{}`!",
status.message, status.code
)));
progress.failure(Some("Session operation failed!"));

Err(OperatorApiError::StatusFailure {
operation: command.to_string(),
status: Box::new(status),
})
} else {
sub_progress.success(Some(&format!(
"`{command}` finished successfully with `{}` with code `{}`.",
status.message, status.code
)));
progress.success(Some("Session operation is completed."));

Ok(())
}
}
// Something actually went wrong.
other => OperatorApiError::KubeError {
error: other,
operation: OperatorOperation::SessionManagement,
},
})
// Finish the progress report here if we have an error response.
.inspect_err(|fail| {
sub_progress.failure(Some(&fail.to_string()));
progress.failure(Some("Session management operation failed!"));
})?
// The kube api interaction was successful, but we might still fail the operation
// itself, so let's check the `Status` and report.
.map(|status| {
if status.is_failure() {
sub_progress.failure(Some(&format!(
"`{command}` failed due to `{}` with code `{}`!",
status.message, status.code
)));
progress.failure(Some("Session operation failed!"));

Err(OperatorApiError::StatusFailure {
operation: command.to_string(),
status: Box::new(status),
})
} else {
sub_progress.success(Some(&format!(
"`{command}` finished successfully with `{}` with code `{}`.",
status.message, status.code
)));
progress.success(Some("Session operation is completed."));

Ok(())
}
})
.transpose()?
// We might've gotten a `SessionCrd` instead of a `Status` (we have a `Left(T)`),
// meaning that the operation has started, but it might not be finished yet.
.unwrap_or_else(|| {
sub_progress.success(Some(&format!("No issues found when executing `{command}`, but the operation status could not be determined at this time.")));
progress.success(Some(&format!("`{command}` is done, but the operation might be pending.")));
});
})
.transpose()?
// We might've gotten a `SessionCrd` instead of a `Status` (we have a `Left(T)`),
// meaning that the operation has started, but it might not be finished yet.
.unwrap_or_else(|| {
sub_progress.success(Some(&format!("No issues found when executing `{command}`, but the operation status could not be determined at this time.")));
progress.success(Some(&format!("`{command}` is done, but the operation might be pending.")));
});

Ok(())
}
Expand Down
10 changes: 10 additions & 0 deletions mirrord/config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use mirrord_config_derive::MirrordConfig;
use schemars::JsonSchema;

use self::{copy_target::CopyTargetConfig, env::EnvConfig, fs::FsConfig, network::NetworkConfig};
use crate::feature::split_queues::SplitQueuesConfig;

pub mod copy_target;
pub mod env;
pub mod fs;
pub mod network;
pub mod split_queues;

/// Controls mirrord features.
///
Expand Down Expand Up @@ -87,6 +89,13 @@ pub struct FeatureConfig {
/// (`targetless` mode).
#[config(nested)]
pub copy_target: CopyTargetConfig,

/// ## feature.split_queues {#feature-split_queues}
///
/// Define filters to split queues by, and make your local application consume only messages
/// that match those filters.
#[config(nested)]
pub split_queues: SplitQueuesConfig,
}

impl CollectAnalytics for &FeatureConfig {
Expand All @@ -95,5 +104,6 @@ impl CollectAnalytics for &FeatureConfig {
analytics.add("fs", &self.fs);
analytics.add("network", &self.network);
analytics.add("copy_target", &self.copy_target);
analytics.add("split_queues", &self.split_queues);
}
}
94 changes: 94 additions & 0 deletions mirrord/config/src/feature/split_queues.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::collections::HashMap;

use mirrord_analytics::{Analytics, CollectAnalytics};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::config::{ConfigContext, FromMirrordConfig, MirrordConfig};

pub type QueueId = String;

/// ```json
/// {
/// "feature": {
/// "split_queues": {
/// "first-queue": {
/// "queue_type": "SQS",
/// "message_filter": {
/// "wows": "so wows",
/// "coolz": "^very .*"
/// }
/// },
/// "second-queue": {
/// "queue_type": "SomeFutureQueueType",
/// "message_filter": {
/// "wows": "so wows",
/// "coolz": "^very .*"
/// }
/// },
/// }
/// }
/// }
/// ```
#[derive(Clone, Debug, Eq, PartialEq, JsonSchema, Deserialize, Default)]
pub struct SplitQueuesConfig(Option<HashMap<QueueId, QueueFilter>>);

impl SplitQueuesConfig {
pub fn is_set(&self) -> bool {
self.0.is_some()
}

/// Out of the whole queue splitting config, get only the sqs queues.
pub fn get_sqs_filter(&self) -> Option<HashMap<String, HashMap<String, String>>> {
self.0.as_ref().map(|queue_id2queue_filter| {
queue_id2queue_filter
.iter()
.filter_map(|(queue_id, queue_filter)| match queue_filter {
QueueFilter::Sqs(filter_mapping) => {
Some((queue_id.clone(), filter_mapping.clone()))
}
})
.collect()
})
}
}

impl MirrordConfig for SplitQueuesConfig {
type Generated = Self;

fn generate_config(
self,
_context: &mut ConfigContext,
) -> crate::config::Result<Self::Generated> {
Ok(self)
}
}

impl FromMirrordConfig for SplitQueuesConfig {
type Generator = Self;
}

pub type MessageAttributeName = String;
pub type AttributeValuePattern = String;

pub type SqsMessageFilter = HashMap<MessageAttributeName, AttributeValuePattern>;

/// More queue types might be added in the future.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, JsonSchema)]
#[serde(tag = "queue_type", content = "message_filter")]
pub enum QueueFilter {
#[serde(rename = "SQS")]
Sqs(SqsMessageFilter),
}

impl CollectAnalytics for &SplitQueuesConfig {
fn collect_analytics(&self, analytics: &mut Analytics) {
analytics.add(
"queue_count",
self.0
.as_ref()
.map(|mapping| mapping.len())
.unwrap_or_default(),
)
}
}
1 change: 1 addition & 0 deletions mirrord/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ mod tests {
})),
})),
copy_target: None,
split_queues: None,
}),
connect_tcp: None,
operator: None,
Expand Down
10 changes: 10 additions & 0 deletions mirrord/config/src/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,16 @@ impl Target {
}
}
}

/// Get the target type - "pod", "deployment", "rollout" or "targetless"
pub fn get_target_type(&self) -> &str {
match self {
Target::Targetless => "targetless",
Target::Pod(pod) => pod.target_type(),
Target::Deployment(dep) => dep.target_type(),
Target::Rollout(roll) => roll.target_type(),
}
}
}

trait TargetDisplay {
Expand Down
4 changes: 2 additions & 2 deletions mirrord/kube/src/api/kubernetes/rollout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use k8s_openapi::{
apimachinery::pkg::apis::meta::v1::ObjectMeta, ListableResource, Metadata,
NamespaceResourceScope, Resource,
};
use serde::Deserialize;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Rollout {
metadata: ObjectMeta,
pub spec: serde_json::Value,
Expand Down
40 changes: 20 additions & 20 deletions mirrord/operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,29 @@ publish.workspace = true
edition.workspace = true

[features]
default = []
default = ["crd"]
license-fetch = ["dep:reqwest"]
client = [
"crd",
"dep:base64",
"dep:bincode",
"dep:http",
"dep:futures",
"dep:mirrord-analytics",
"dep:mirrord-auth",
"dep:mirrord-kube",
"dep:mirrord-progress",
"dep:mirrord-protocol",
"dep:rand",
"dep:tokio-tungstenite",
"dep:tracing",
"crd",
"dep:base64",
"dep:bincode",
"dep:http",
"dep:futures",
"dep:mirrord-analytics",
"dep:mirrord-auth",
"dep:mirrord-kube",
"dep:mirrord-progress",
"dep:mirrord-protocol",
"dep:rand",
"dep:tokio-tungstenite",
"dep:tracing",
]
crd = [
"dep:k8s-openapi",
"dep:kube",
"dep:mirrord-config",
"dep:tokio",
"dep:serde_json"
"dep:k8s-openapi",
"dep:kube",
"dep:mirrord-config",
"dep:tokio",
"dep:serde_json"
]
setup = ["crd", "dep:serde_yaml"]

Expand All @@ -52,7 +52,7 @@ mirrord-protocol = { path = "../protocol", optional = true }
async-trait = "0.1"
actix-codec = { workspace = true, optional = true }
base64 = { version = "0.21", optional = true }
bincode = { version = "2.0.0-rc.2", features = ["serde"], optional = true }
bincode = { version = "2.0.0-rc.2", features = ["serde"], optional = true }
bytes = { workspace = true, optional = true }
chrono = { version = "0.4", features = ["clock", "serde"] }
http = { version = "0.2", optional = true }
Expand Down

0 comments on commit b25cffa

Please sign in to comment.