Skip to content
Merged

Fixes #122

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/badges/tests.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"schemaVersion": 1,
"label": "tests",
"message": "0/1 passed, 1 failed",
"message": "2203/2204 passed, 1 failed",
"color": "red"
}
}
14 changes: 8 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,20 +184,22 @@ jobs:
cat test_output.txt

# Parse the summary line from nextest output
# Expected format: "Summary [ 15.234s] 2345 tests run: 2344 passed, 1 failed, 0 skipped"
SUMMARY=$(grep -E "Summary \[.*\] [0-9]+ tests run:" test_output.txt || echo "")
# Expected format: " Summary [ 427.076s] 2204 tests run: 2203 passed, 1 failed, 90 skipped"
# Note: nextest adds leading spaces before "Summary"
SUMMARY=$(grep -E "^\s*Summary \[.*\] [0-9]+ tests run:" test_output.txt || echo "")

if [ -z "$SUMMARY" ]; then
echo "Error: Could not find nextest summary line"
cat test_output.txt | tail -20
PASSED=0
FAILED=1
TOTAL=1
else
echo "Found summary: $SUMMARY"
# Extract numbers using sed (more portable than grep -oP)
PASSED=$(echo "$SUMMARY" | sed -n 's/.*\([0-9]\+\) passed.*/\1/p')
FAILED=$(echo "$SUMMARY" | sed -n 's/.*\([0-9]\+\) failed.*/\1/p')
TOTAL=$(echo "$SUMMARY" | sed -n 's/.*\([0-9]\+\) tests run.*/\1/p')
# Extract numbers using grep with basic regex (portable)
TOTAL=$(echo "$SUMMARY" | grep -o "[0-9]\+ tests run" | grep -o "[0-9]\+")
PASSED=$(echo "$SUMMARY" | grep -o "[0-9]\+ passed" | grep -o "[0-9]\+")
FAILED=$(echo "$SUMMARY" | grep -o "[0-9]\+ failed" | grep -o "[0-9]\+")

# Default to 0 if extraction failed
PASSED=${PASSED:-0}
Expand Down
4 changes: 4 additions & 0 deletions Notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,10 @@ after that we can add a button in ui for checking storage health

154) Add a test which checks all commands that belong only to admin/dba users, running them as a normal user, and make sure they all fail with a proper error message. Do the same for service-consumer commands, so that whenever we change anything we always verify these commands fail for normal users.

155) Add a table in which I can view all topic messages

156) Make sure topic eviction also works when using the clear-topic command




Expand Down
11 changes: 9 additions & 2 deletions backend/crates/kalamdb-core/src/jobs/executors/topic_cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,22 @@ impl JobExecutor for TopicCleanupExecutor {
}

async fn execute(&self, ctx: &JobContext<Self::Params>) -> Result<JobDecision, KalamDbError> {
ctx.log_info("Starting topic cleanup operation");
// No local work needed for topic cleanup
Ok(JobDecision::Completed {
message: Some("Topic cleanup has no local work".to_string()),
})
}

async fn execute_leader(&self, ctx: &JobContext<Self::Params>) -> Result<JobDecision, KalamDbError> {
ctx.log_info("Starting topic cleanup operation (leader)");

// Parameters already validated in JobContext
let params = ctx.params();
let topic_id = &params.topic_id;
let topic_name = &params.topic_name;

ctx.log_info(&format!(
"Cleaning up dropped topic '{}' ({})",
"Cleaning up topic '{}' ({})",
topic_name, topic_id
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ impl JobType {
pub fn has_leader_actions(&self) -> bool {
matches!(
self,
JobType::Flush | // Parquet upload + manifest update
JobType::Cleanup | // Delete external Parquet + metadata
JobType::Backup | // External storage upload
JobType::Restore | // External storage download
JobType::JobCleanup | // Raft-replicated job table cleanup
JobType::UserCleanup // Cascade via Raft
JobType::Flush | // Parquet upload + manifest update
JobType::Cleanup | // Delete external Parquet + metadata
JobType::Backup | // External storage upload
JobType::Restore | // External storage download
JobType::JobCleanup | // Raft-replicated job table cleanup
JobType::UserCleanup | // Cascade via Raft
JobType::TopicCleanup // Delete topic messages + offsets
)
}

Expand Down
32 changes: 7 additions & 25 deletions cli/src/session/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::error::{CLIError, Result};
use crate::parser::Command;
use colored::Colorize;
use kalam_link::SubscriptionConfig;
use std::sync::Arc;
use std::time::Instant;

impl CLISession {
Expand Down Expand Up @@ -410,23 +409,11 @@ impl CLISession {
let start_time = Instant::now();
let mut total_consumed = 0_usize;
let mut last_offset = 0_u64;
let stop_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
let stop_flag_clone = stop_flag.clone();
let mut error_count = 0;

// Install Ctrl+C handler
tokio::spawn(async move {
signal::ctrl_c().await.ok();
stop_flag_clone.store(true, std::sync::atomic::Ordering::Relaxed);
});

// Poll loop
'consume_loop: loop {
// Check stop flag
if stop_flag.load(std::sync::atomic::Ordering::Relaxed) {
break;
}

// Poll loop - library handles long polling (30s default)
// Ctrl+C will interrupt the process naturally
loop {
// Check timeout
if let Some(timeout_seconds) = timeout {
if start_time.elapsed().as_secs() >= timeout_seconds {
Expand All @@ -443,8 +430,8 @@ impl CLISession {
}
}

// Poll with short timeout to remain responsive
let records = match consumer.poll_with_timeout(Duration::from_secs(2)).await {
// Poll with long polling (30s) - library handles the HTTP request timeout
let records = match consumer.poll().await {
Ok(records) => {
error_count = 0; // Reset error count on success
records
Expand Down Expand Up @@ -483,7 +470,7 @@ impl CLISession {
"{}",
"❌ Too many consecutive errors. Exiting.".red().bold()
);
break 'consume_loop;
break;
}

sleep(Duration::from_secs(1)).await;
Expand Down Expand Up @@ -523,14 +510,9 @@ impl CLISession {
// Check limit after each record
if let Some(limit_val) = limit {
if total_consumed >= limit_val {
break 'consume_loop;
break;
}
}

// Check stop flag
if stop_flag.load(std::sync::atomic::Ordering::Relaxed) {
break 'consume_loop;
}
}
}

Expand Down