From 06450c8ffa25766f22225b06dcce16b3d6b9f1d7 Mon Sep 17 00:00:00 2001 From: Alexander Galibey <48586936+galibey@users.noreply.github.com> Date: Thu, 11 Apr 2024 14:06:41 +0700 Subject: [PATCH] feat: disallow deletion system topic unless forced (#3942) --- crates/fluvio-cli/src/client/topic/delete.rs | 72 ++++++++++++++++--- crates/fluvio-protocol/src/link/error_code.rs | 5 ++ crates/fluvio-sc-schema/src/objects/delete.rs | 12 +++- .../src/services/public_api/delete.rs | 3 +- .../src/services/public_api/topic/delete.rs | 17 +++-- crates/fluvio/src/admin.rs | 32 +++++++++ .../system-topic-basic.bats | 66 +++++++++++++++++ 7 files changed, 193 insertions(+), 14 deletions(-) create mode 100644 tests/cli/fluvio_smoke_tests/system-topic-basic.bats diff --git a/crates/fluvio-cli/src/client/topic/delete.rs b/crates/fluvio-cli/src/client/topic/delete.rs index 037b0c4010..501d9fae3a 100644 --- a/crates/fluvio-cli/src/client/topic/delete.rs +++ b/crates/fluvio-cli/src/client/topic/delete.rs @@ -4,6 +4,10 @@ //! CLI tree to generate Delete Topics //! +use std::io::Read; + +use fluvio_protocol::link::ErrorCode; +use fluvio_sc_schema::ApiError; use tracing::debug; use clap::Parser; use anyhow::Result; @@ -21,6 +25,12 @@ pub struct DeleteTopicOpt { /// One or more name(s) of the topic(s) to be deleted #[arg(value_name = "name", required = true)] names: Vec, + /// Delete system topic(s) + #[arg(short, long, required = false)] + system: bool, + /// Skip deletion confirmation + #[arg(short, long, required = false)] + force: bool, } impl DeleteTopicOpt { @@ -29,15 +39,36 @@ impl DeleteTopicOpt { let mut err_happened = false; for name in self.names.iter() { debug!(name, "deleting topic"); - if let Err(error) = admin.delete::(name).await { - err_happened = true; - if self.continue_on_error { - println!("topic \"{name}\" delete failed with: {error}"); - } else { - return Err(error); + match admin.delete::(name).await { + Err(error) if self.system && is_system_spec_error(&error) => { + if self.force || user_confirms(name) { + if let Err(err) = admin.force_delete::(name).await { + err_happened = true; + if self.continue_on_error { + println!("system topic \"{name}\" delete failed with: {err}"); + } else { + return Err(error); + } + } else { + println!("system topic \"{name}\" deleted"); + } + } else { + println!("Aborted"); + break; + } + } + Err(error) => { + err_happened = true; + if self.continue_on_error { + println!("topic \"{name}\" delete failed with: {error}"); + } else { + return Err(error); + } + } + + Ok(_) => { + println!("topic \"{name}\" deleted"); } - } else { - println!("topic \"{name}\" deleted"); } } if err_happened { @@ -50,3 +81,28 @@ impl DeleteTopicOpt { } } } + +fn is_system_spec_error(error: &anyhow::Error) -> bool { + matches!( + error.root_cause().downcast_ref::(), + Some(ApiError::Code( + ErrorCode::SystemSpecDeletionAttempt { + kind: _kind, + name: _name, + }, + None + )) + ) +} + +fn user_confirms(name: &str) -> bool { + println!("You are trying to delete a system topic '{name}'. It can affect the functioning of the cluster. + \nAre you sure you want to proceed? (y/n)"); + char::from( + std::io::stdin() + .bytes() + .next() + .and_then(|b| b.ok()) + .unwrap_or_default(), + ) == 'y' +} diff --git a/crates/fluvio-protocol/src/link/error_code.rs b/crates/fluvio-protocol/src/link/error_code.rs index f967585d4c..7e45e7c8f5 100644 --- a/crates/fluvio-protocol/src/link/error_code.rs +++ b/crates/fluvio-protocol/src/link/error_code.rs @@ -215,6 +215,11 @@ pub enum ErrorCode { #[fluvio(tag = 11002)] #[error("the remote already exists")] RemoteAlreadyExists, + + // Specs + #[fluvio(tag = 12001)] + #[error("system {kind} '{name}' can only be deleted forcibly")] + SystemSpecDeletionAttempt { kind: String, name: String }, } impl ErrorCode { diff --git a/crates/fluvio-sc-schema/src/objects/delete.rs b/crates/fluvio-sc-schema/src/objects/delete.rs index a12f11a84c..10d79c023d 100644 --- a/crates/fluvio-sc-schema/src/objects/delete.rs +++ b/crates/fluvio-sc-schema/src/objects/delete.rs @@ -19,6 +19,8 @@ use super::{COMMON_VERSION, TypeBuffer}; #[derive(Debug, Default, Encoder, Decoder)] pub struct DeleteRequest { key: S::DeleteKey, + #[fluvio(min_version = 13)] + force: bool, } impl DeleteRequest @@ -26,12 +28,20 @@ where S: DeletableAdminSpec, { pub fn new(key: S::DeleteKey) -> Self { - Self { key } + Self { key, force: false } + } + + pub fn with(key: S::DeleteKey, force: bool) -> Self { + Self { key, force } } pub fn key(self) -> S::DeleteKey { self.key } + + pub fn is_force(&self) -> bool { + self.force + } } // This can be auto generated by enum derive later diff --git a/crates/fluvio-sc/src/services/public_api/delete.rs b/crates/fluvio-sc/src/services/public_api/delete.rs index a4e44f0bd6..e6a20e7e27 100644 --- a/crates/fluvio-sc/src/services/public_api/delete.rs +++ b/crates/fluvio-sc/src/services/public_api/delete.rs @@ -34,7 +34,8 @@ pub async fn handle_delete_request( debug!(?del_req, "del request"); let status = if let Some(req) = del_req.downcast()? as Option> { - super::topic::handle_delete_topic(req.key(), auth_ctx).await? + let force = req.is_force(); + super::topic::handle_delete_topic(req.key(), force, auth_ctx).await? } else if let Some(req) = del_req.downcast()? as Option> { super::spu::handle_un_register_custom_spu_request(req.key(), auth_ctx).await? } else if let Some(req) = del_req.downcast()? as Option> { diff --git a/crates/fluvio-sc/src/services/public_api/topic/delete.rs b/crates/fluvio-sc/src/services/public_api/topic/delete.rs index 6888a68b6c..2e699c28f7 100644 --- a/crates/fluvio-sc/src/services/public_api/topic/delete.rs +++ b/crates/fluvio-sc/src/services/public_api/topic/delete.rs @@ -4,7 +4,7 @@ //! Delete topic request handler. Lookup topic in local metadata, grab its K8 context //! and send K8 a delete message. //! -use fluvio_stream_model::core::MetadataItem; +use fluvio_stream_model::core::{MetadataItem, Spec}; use tracing::{info, trace, instrument}; use std::io::{Error, ErrorKind}; @@ -20,6 +20,7 @@ use crate::services::auth::AuthServiceContext; #[instrument(skip(topic_name, auth_ctx))] pub async fn handle_delete_topic( topic_name: String, + force: bool, auth_ctx: &AuthServiceContext, ) -> Result { info!(%topic_name, "Deleting topic"); @@ -41,15 +42,23 @@ pub async fn handle_delete_topic( return Err(Error::new(ErrorKind::Interrupted, "authorization io error")); } - let status = if auth_ctx + let status = if let Some(spec) = auth_ctx .global_ctx .topics() .store() .value(&topic_name) .await - .is_some() { - if let Err(err) = auth_ctx + if !force && spec.spec().is_system() { + Status::new( + topic_name.clone(), + ErrorCode::SystemSpecDeletionAttempt { + kind: TopicSpec::LABEL.to_lowercase(), + name: topic_name, + }, + None, + ) + } else if let Err(err) = auth_ctx .global_ctx .topics() .delete(topic_name.clone()) diff --git a/crates/fluvio/src/admin.rs b/crates/fluvio/src/admin.rs index 451ed6c68e..4a26c3c8a2 100644 --- a/crates/fluvio/src/admin.rs +++ b/crates/fluvio/src/admin.rs @@ -214,6 +214,38 @@ impl FluvioAdmin { Ok(()) } + /// Forcibly delete object by key + /// key is dependent on spec, most are string but some allow multiple types. + /// + /// This method allows to delete objects marked as 'system'. + /// + /// For example, to delete a system topic: + /// + /// ```edition2021 + /// use fluvio::Fluvio; + /// use fluvio::metadata::topic::TopicSpec; + /// + /// async fn delete_system_topic(name: String) -> anyhow::Result<()> { + /// let fluvio = Fluvio::connect().await?; + /// let admin = fluvio.admin().await; + /// admin.force_delete::(name).await?; + /// Ok(()) + /// } + /// ``` + #[instrument(skip(self, key))] + pub async fn force_delete(&self, key: impl Into) -> Result<()> + where + S: DeletableAdminSpec + Sync + Send, + { + let delete_request: DeleteRequest = DeleteRequest::with(key.into(), true); + debug!("sending force delete request: {:#?}", delete_request); + + self.send_receive_admin::(delete_request) + .await? + .as_result()?; + Ok(()) + } + /// return all instance of this spec #[instrument(skip(self))] pub async fn all(&self) -> Result>> diff --git a/tests/cli/fluvio_smoke_tests/system-topic-basic.bats b/tests/cli/fluvio_smoke_tests/system-topic-basic.bats new file mode 100644 index 0000000000..7d4dd3eef3 --- /dev/null +++ b/tests/cli/fluvio_smoke_tests/system-topic-basic.bats @@ -0,0 +1,66 @@ +#!/usr/bin/env bats + +TEST_HELPER_DIR="$BATS_TEST_DIRNAME/../test_helper" +export TEST_HELPER_DIR + +load "$TEST_HELPER_DIR"/tools_check.bash +load "$TEST_HELPER_DIR"/fluvio_dev.bash +load "$TEST_HELPER_DIR"/bats-support/load.bash +load "$TEST_HELPER_DIR"/bats-assert/load.bash + + +@test "System topic (Consumer Offsets) exist" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on fluvio cli stable version" + fi + if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on cluster stable version" + fi + end_time=$((SECONDS + 65)) + while [ $SECONDS -lt $end_time ]; do + SYSTEM_TOPIC_NAME="$($FLUVIO_BIN topic list --system -O json | jq '.[0].name' | tr -d '"')" + if [ -z "$SYSTEM_TOPIC_NAME" ]; then + debug_msg "$SYSTEM_TOPIC_NAME" + sleep 1 + else + debug_msg "System topic $SYSTEM_TOPIC_NAME found" + break + fi + done + assert [ ! -z "$SYSTEM_TOPIC_NAME" ] +} + +# System topic deletion - Negative test +@test "System topic deletion is not allowed by default" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on fluvio cli stable version" + fi + if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on cluster stable version" + fi + SYSTEM_TOPIC_NAME="$($FLUVIO_BIN topic list --system -O json | jq '.[0].name' | tr -d '"')" + run timeout 15s "$FLUVIO_BIN" topic delete "$SYSTEM_TOPIC_NAME" + + debug_msg "status: $status" + debug_msg "output: ${lines[0]}" + assert_failure + assert_output --partial "system topic '$SYSTEM_TOPIC_NAME' can only be deleted forcibly" +} + +# System topic deletion - Positive test +@test "System topic deletion is allowed if forced" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on fluvio cli stable version" + fi + if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on cluster stable version" + fi + SYSTEM_TOPIC_NAME="$($FLUVIO_BIN topic list --system -O json | jq '.[0].name' | tr -d '"')" + run timeout 15s "$FLUVIO_BIN" topic delete "$SYSTEM_TOPIC_NAME" --system --force + + debug_msg "status: $status" + debug_msg "output: ${lines[0]}" + assert_success + assert_output --partial "system topic \"$SYSTEM_TOPIC_NAME\" deleted" +} +