Skip to content

Commit

Permalink
feat: disallow deletion system topic unless forced (#3942)
Browse files Browse the repository at this point in the history
  • Loading branch information
galibey committed Apr 11, 2024
1 parent 03b766a commit 06450c8
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 14 deletions.
72 changes: 64 additions & 8 deletions crates/fluvio-cli/src/client/topic/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
//! CLI tree to generate Delete Topics
//!

use std::io::Read;

use fluvio_protocol::link::ErrorCode;
use fluvio_sc_schema::ApiError;
use tracing::debug;
use clap::Parser;
use anyhow::Result;
Expand All @@ -21,6 +25,12 @@ pub struct DeleteTopicOpt {
/// One or more name(s) of the topic(s) to be deleted
#[arg(value_name = "name", required = true)]
names: Vec<String>,
/// Delete system topic(s)
#[arg(short, long, required = false)]
system: bool,
/// Skip deletion confirmation
#[arg(short, long, required = false)]
force: bool,
}

impl DeleteTopicOpt {
Expand All @@ -29,15 +39,36 @@ impl DeleteTopicOpt {
let mut err_happened = false;
for name in self.names.iter() {
debug!(name, "deleting topic");
if let Err(error) = admin.delete::<TopicSpec>(name).await {
err_happened = true;
if self.continue_on_error {
println!("topic \"{name}\" delete failed with: {error}");
} else {
return Err(error);
match admin.delete::<TopicSpec>(name).await {
Err(error) if self.system && is_system_spec_error(&error) => {
if self.force || user_confirms(name) {
if let Err(err) = admin.force_delete::<TopicSpec>(name).await {
err_happened = true;
if self.continue_on_error {
println!("system topic \"{name}\" delete failed with: {err}");
} else {
return Err(error);
}
} else {
println!("system topic \"{name}\" deleted");
}
} else {
println!("Aborted");
break;
}
}
Err(error) => {
err_happened = true;
if self.continue_on_error {
println!("topic \"{name}\" delete failed with: {error}");
} else {
return Err(error);
}
}

Ok(_) => {
println!("topic \"{name}\" deleted");
}
} else {
println!("topic \"{name}\" deleted");
}
}
if err_happened {
Expand All @@ -50,3 +81,28 @@ impl DeleteTopicOpt {
}
}
}

fn is_system_spec_error(error: &anyhow::Error) -> bool {
matches!(
error.root_cause().downcast_ref::<ApiError>(),
Some(ApiError::Code(
ErrorCode::SystemSpecDeletionAttempt {
kind: _kind,
name: _name,
},
None
))
)
}

fn user_confirms(name: &str) -> bool {
println!("You are trying to delete a system topic '{name}'. It can affect the functioning of the cluster.
\nAre you sure you want to proceed? (y/n)");
char::from(
std::io::stdin()
.bytes()
.next()
.and_then(|b| b.ok())
.unwrap_or_default(),
) == 'y'
}
5 changes: 5 additions & 0 deletions crates/fluvio-protocol/src/link/error_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ pub enum ErrorCode {
#[fluvio(tag = 11002)]
#[error("the remote already exists")]
RemoteAlreadyExists,

// Specs
#[fluvio(tag = 12001)]
#[error("system {kind} '{name}' can only be deleted forcibly")]
SystemSpecDeletionAttempt { kind: String, name: String },
}

impl ErrorCode {
Expand Down
12 changes: 11 additions & 1 deletion crates/fluvio-sc-schema/src/objects/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,29 @@ use super::{COMMON_VERSION, TypeBuffer};
#[derive(Debug, Default, Encoder, Decoder)]
pub struct DeleteRequest<S: DeletableAdminSpec> {
key: S::DeleteKey,
#[fluvio(min_version = 13)]
force: bool,
}

impl<S> DeleteRequest<S>
where
S: DeletableAdminSpec,
{
pub fn new(key: S::DeleteKey) -> Self {
Self { key }
Self { key, force: false }
}

pub fn with(key: S::DeleteKey, force: bool) -> Self {
Self { key, force }
}

pub fn key(self) -> S::DeleteKey {
self.key
}

pub fn is_force(&self) -> bool {
self.force
}
}

// This can be auto generated by enum derive later
Expand Down
3 changes: 2 additions & 1 deletion crates/fluvio-sc/src/services/public_api/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ pub async fn handle_delete_request<AC: AuthContext, C: MetadataItem>(
debug!(?del_req, "del request");

let status = if let Some(req) = del_req.downcast()? as Option<DeleteRequest<TopicSpec>> {
super::topic::handle_delete_topic(req.key(), auth_ctx).await?
let force = req.is_force();
super::topic::handle_delete_topic(req.key(), force, auth_ctx).await?
} else if let Some(req) = del_req.downcast()? as Option<DeleteRequest<CustomSpuSpec>> {
super::spu::handle_un_register_custom_spu_request(req.key(), auth_ctx).await?
} else if let Some(req) = del_req.downcast()? as Option<DeleteRequest<SpuGroupSpec>> {
Expand Down
17 changes: 13 additions & 4 deletions crates/fluvio-sc/src/services/public_api/topic/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! Delete topic request handler. Lookup topic in local metadata, grab its K8 context
//! and send K8 a delete message.
//!
use fluvio_stream_model::core::MetadataItem;
use fluvio_stream_model::core::{MetadataItem, Spec};
use tracing::{info, trace, instrument};
use std::io::{Error, ErrorKind};

Expand All @@ -20,6 +20,7 @@ use crate::services::auth::AuthServiceContext;
#[instrument(skip(topic_name, auth_ctx))]
pub async fn handle_delete_topic<AC: AuthContext, C: MetadataItem>(
topic_name: String,
force: bool,
auth_ctx: &AuthServiceContext<AC, C>,
) -> Result<Status, Error> {
info!(%topic_name, "Deleting topic");
Expand All @@ -41,15 +42,23 @@ pub async fn handle_delete_topic<AC: AuthContext, C: MetadataItem>(
return Err(Error::new(ErrorKind::Interrupted, "authorization io error"));
}

let status = if auth_ctx
let status = if let Some(spec) = auth_ctx
.global_ctx
.topics()
.store()
.value(&topic_name)
.await
.is_some()
{
if let Err(err) = auth_ctx
if !force && spec.spec().is_system() {
Status::new(
topic_name.clone(),
ErrorCode::SystemSpecDeletionAttempt {
kind: TopicSpec::LABEL.to_lowercase(),
name: topic_name,
},
None,
)
} else if let Err(err) = auth_ctx
.global_ctx
.topics()
.delete(topic_name.clone())
Expand Down
32 changes: 32 additions & 0 deletions crates/fluvio/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,38 @@ impl FluvioAdmin {
Ok(())
}

/// Forcibly delete object by key
/// key is dependent on spec, most are string but some allow multiple types.
///
/// This method allows to delete objects marked as 'system'.
///
/// For example, to delete a system topic:
///
/// ```edition2021
/// use fluvio::Fluvio;
/// use fluvio::metadata::topic::TopicSpec;
///
/// async fn delete_system_topic(name: String) -> anyhow::Result<()> {
/// let fluvio = Fluvio::connect().await?;
/// let admin = fluvio.admin().await;
/// admin.force_delete::<TopicSpec>(name).await?;
/// Ok(())
/// }
/// ```
#[instrument(skip(self, key))]
pub async fn force_delete<S>(&self, key: impl Into<S::DeleteKey>) -> Result<()>
where
S: DeletableAdminSpec + Sync + Send,
{
let delete_request: DeleteRequest<S> = DeleteRequest::with(key.into(), true);
debug!("sending force delete request: {:#?}", delete_request);

self.send_receive_admin::<ObjectApiDeleteRequest, _>(delete_request)
.await?
.as_result()?;
Ok(())
}

/// return all instance of this spec
#[instrument(skip(self))]
pub async fn all<S>(&self) -> Result<Vec<Metadata<S>>>
Expand Down
66 changes: 66 additions & 0 deletions tests/cli/fluvio_smoke_tests/system-topic-basic.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env bats

TEST_HELPER_DIR="$BATS_TEST_DIRNAME/../test_helper"
export TEST_HELPER_DIR

load "$TEST_HELPER_DIR"/tools_check.bash
load "$TEST_HELPER_DIR"/fluvio_dev.bash
load "$TEST_HELPER_DIR"/bats-support/load.bash
load "$TEST_HELPER_DIR"/bats-assert/load.bash


@test "System topic (Consumer Offsets) exist" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi
end_time=$((SECONDS + 65))
while [ $SECONDS -lt $end_time ]; do
SYSTEM_TOPIC_NAME="$($FLUVIO_BIN topic list --system -O json | jq '.[0].name' | tr -d '"')"
if [ -z "$SYSTEM_TOPIC_NAME" ]; then
debug_msg "$SYSTEM_TOPIC_NAME"
sleep 1
else
debug_msg "System topic $SYSTEM_TOPIC_NAME found"
break
fi
done
assert [ ! -z "$SYSTEM_TOPIC_NAME" ]
}

# System topic deletion - Negative test
@test "System topic deletion is not allowed by default" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi
SYSTEM_TOPIC_NAME="$($FLUVIO_BIN topic list --system -O json | jq '.[0].name' | tr -d '"')"
run timeout 15s "$FLUVIO_BIN" topic delete "$SYSTEM_TOPIC_NAME"

debug_msg "status: $status"
debug_msg "output: ${lines[0]}"
assert_failure
assert_output --partial "system topic '$SYSTEM_TOPIC_NAME' can only be deleted forcibly"
}

# System topic deletion - Positive test
@test "System topic deletion is allowed if forced" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi
SYSTEM_TOPIC_NAME="$($FLUVIO_BIN topic list --system -O json | jq '.[0].name' | tr -d '"')"
run timeout 15s "$FLUVIO_BIN" topic delete "$SYSTEM_TOPIC_NAME" --system --force

debug_msg "status: $status"
debug_msg "output: ${lines[0]}"
assert_success
assert_output --partial "system topic \"$SYSTEM_TOPIC_NAME\" deleted"
}

0 comments on commit 06450c8

Please sign in to comment.