Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ backend/crates/
## Testing (MUST)

- Use `cargo nextest run` for all test executions unless explicitly told otherwise.
- For CLI e2e tests: run `cargo nextest run --features e2e-tests` **without** `--no-fail-fast`, capture output to a file, then fix failures one-by-one by running only the failing test(s). Re-run the full suite after fixes.
- For e2e test runs, do NOT pass `--no-fail-fast`. Run normally, fix the first failure, re-run until it passes, then move to the next failing issue.
- Always add `#[ntest::timeout(time)]` to every async test where `time` is the **actual observed runtime** × 1.5 (to cover slower machines).
- Example: if a test took 40s, set `#[ntest::timeout(60000)]`.
Expand Down
4 changes: 2 additions & 2 deletions backend/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ fn build_ui_if_release(repo_root: &Path) {
// Run npm run build
// IMPORTANT: use `.status()` (stream output) instead of `.output()`.
// Capturing large stdout/stderr can make builds appear "stuck".
println!("cargo:warning=Running UI build (npm run build) — this can take a few minutes...");
eprintln!("Running UI build (npm run build) — this can take a few minutes...");
let build_status = if cfg!(target_os = "windows") {
let mut cmd = Command::new("cmd");
cmd.args(["/C", "npm", "run", "build"])
Expand All @@ -219,7 +219,7 @@ fn build_ui_if_release(repo_root: &Path) {

match build_status {
Ok(status) if status.success() => {
println!("cargo:warning=UI build completed successfully");
eprintln!("UI build completed successfully");
},
Ok(status) => {
panic!("UI build failed with status: {}\n\nUI is required for release builds!", status);
Expand Down
2 changes: 1 addition & 1 deletion backend/crates/kalamdb-api/src/handlers/files/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub async fn download_file(
&table_id,
user_id.as_ref(),
&relative_path,
) {
).await {
Ok(data) => {
//TODO: Get content type from the stored file metadata
// Guess content type from file extension in file_id
Expand Down
4 changes: 2 additions & 2 deletions backend/crates/kalamdb-api/src/handlers/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ pub async fn execute_sql_v1(
user_id.as_ref(),
&mut subfolder_state,
None,
) {
).await {
Ok(refs) => refs,
Err(e) => {
let took = start_time.elapsed().as_secs_f64() * 1000.0;
Expand Down Expand Up @@ -309,7 +309,7 @@ pub async fn execute_sql_v1(
&table_id,
user_id.as_ref(),
app_context.get_ref(),
);
).await;
let took = start_time.elapsed().as_secs_f64() * 1000.0;
HttpResponse::BadRequest().json(SqlResponse::error_with_details(
ErrorCode::SqlExecutionError,
Expand Down
3 changes: 2 additions & 1 deletion backend/crates/kalamdb-api/src/handlers/sql/file_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ pub fn substitute_file_placeholders(sql: &str, file_refs: &HashMap<String, FileR
}

/// Stage files and create FileRef values
pub fn stage_and_finalize_files(
pub async fn stage_and_finalize_files(
file_service: &FileStorageService,
files: &HashMap<String, (String, Bytes, Option<String>)>,
storage_id: &StorageId,
Expand Down Expand Up @@ -324,6 +324,7 @@ pub fn stage_and_finalize_files(
subfolder_state,
shard_id,
)
.await
.map_err(|e| {
FileError::new(
ErrorCode::InternalError,
Expand Down
4 changes: 2 additions & 2 deletions backend/crates/kalamdb-api/src/handlers/sql/helpers/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use kalamdb_system::FileRef;
use std::collections::HashMap;

/// Cleanup files after SQL error
pub fn cleanup_files(
pub async fn cleanup_files(
file_refs: &HashMap<String, FileRef>,
storage_id: &StorageId,
table_type: TableType,
Expand All @@ -18,7 +18,7 @@ pub fn cleanup_files(
let file_service = app_context.file_storage_service();
for file_ref in file_refs.values() {
if let Err(err) =
file_service.delete_file(file_ref, storage_id, table_type, table_id, user_id)
file_service.delete_file(file_ref, storage_id, table_type, table_id, user_id).await
{
log::warn!(
"Failed to cleanup file {} after SQL error: {}",
Expand Down
5 changes: 5 additions & 0 deletions backend/crates/kalamdb-configs/src/config/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,11 @@ impl ManifestCacheSettings {
pub fn ttl_seconds(&self) -> i64 {
(self.eviction_ttl_days * 24 * 60 * 60) as i64
}

/// Get TTL in milliseconds (converts eviction_ttl_days to milliseconds)
pub fn ttl_millis(&self) -> i64 {
self.ttl_seconds() * 1000
}
}

impl Default for ManifestCacheSettings {
Expand Down
50 changes: 25 additions & 25 deletions backend/crates/kalamdb-core/src/applier/executor/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ impl DmlExecutor {
// Try UserTableProvider first, then StreamTableProvider
if let Some(provider) = provider_arc.as_any().downcast_ref::<UserTableProvider>() {
let row_ids = provider
.insert_batch(user_id, rows.to_vec())
.insert_batch(user_id, rows.to_vec()).await
.map_err(|e| ApplierError::Execution(format!("Failed to insert batch: {}", e)))?;
log::debug!("DmlExecutor: Inserted {} rows into {}", row_ids.len(), table_id);
Ok(row_ids.len())
} else if let Some(provider) = provider_arc.as_any().downcast_ref::<StreamTableProvider>() {
let row_ids = provider.insert_batch(user_id, rows.to_vec()).map_err(|e| {
let row_ids = provider.insert_batch(user_id, rows.to_vec()).await.map_err(|e| {
ApplierError::Execution(format!("Failed to insert stream batch: {}", e))
})?;
log::debug!("DmlExecutor: Inserted {} stream rows into {}", row_ids.len(), table_id);
Expand Down Expand Up @@ -104,7 +104,7 @@ impl DmlExecutor {
.ok_or_else(|| ApplierError::not_found("Table provider", table_id))?;

if let Some(provider) = provider_arc.as_any().downcast_ref::<UserTableProvider>() {
let prior_row = match find_row_by_pk(provider, Some(user_id), pk_value) {
let prior_row = match find_row_by_pk(provider, Some(user_id), pk_value).await {
Ok(Some((_key, row))) => Some(row.fields),
Ok(None) => None,
Err(err) => {
Expand All @@ -127,19 +127,19 @@ impl DmlExecutor {
)
});

let updated = self.update_user_provider(provider, user_id, pk_value, update_row.clone())?;
let updated = self.update_user_provider(provider, user_id, pk_value, update_row.clone()).await?;
if updated > 0 {
delete_file_refs_best_effort(
self.app_context.as_ref(),
table_id,
TableType::User,
Some(user_id),
&replaced_refs,
);
).await;
}
Ok(updated)
} else if let Some(provider) = provider_arc.as_any().downcast_ref::<StreamTableProvider>() {
self.update_stream_provider(provider, user_id, pk_value, update_row.clone())
self.update_stream_provider(provider, user_id, pk_value, update_row.clone()).await
} else {
Err(ApplierError::Execution(format!(
"Provider type mismatch for user table {}",
Expand Down Expand Up @@ -171,7 +171,7 @@ impl DmlExecutor {
if let Some(provider) = provider_arc.as_any().downcast_ref::<UserTableProvider>() {
let mut deleted_count = 0;
for pk_value in pk_values {
let file_refs = match find_row_by_pk(provider, Some(user_id), pk_value) {
let file_refs = match find_row_by_pk(provider, Some(user_id), pk_value).await {
Ok(Some((_key, row))) => collect_file_refs_from_row(
self.app_context.as_ref(),
table_id,
Expand All @@ -190,7 +190,7 @@ impl DmlExecutor {
};

if provider
.delete_by_id_field(user_id, pk_value)
.delete_by_id_field(user_id, pk_value).await
.map_err(|e| ApplierError::Execution(format!("Failed to delete row: {}", e)))?
{
deleted_count += 1;
Expand All @@ -200,7 +200,7 @@ impl DmlExecutor {
TableType::User,
Some(user_id),
&file_refs,
);
).await;
}
}
log::debug!("DmlExecutor: Deleted {} rows from {}", deleted_count, table_id);
Expand All @@ -209,7 +209,7 @@ impl DmlExecutor {
let mut deleted_count = 0;
for pk_value in pk_values {
if provider
.delete_by_id_field(user_id, pk_value)
.delete_by_id_field(user_id, pk_value).await
.map_err(|e| ApplierError::Execution(format!("Failed to delete row: {}", e)))?
{
deleted_count += 1;
Expand Down Expand Up @@ -247,7 +247,7 @@ impl DmlExecutor {
if let Some(provider) = provider_arc.as_any().downcast_ref::<SharedTableProvider>() {
let system_user = UserId::from("system");
let row_ids = provider
.insert_batch(&system_user, rows.to_vec())
.insert_batch(&system_user, rows.to_vec()).await
.map_err(|e| ApplierError::Execution(format!("Failed to insert batch: {}", e)))?;
log::debug!("DmlExecutor: Inserted {} shared rows into {}", row_ids.len(), table_id);
Ok(row_ids.len())
Expand Down Expand Up @@ -283,7 +283,7 @@ impl DmlExecutor {
let system_user = UserId::from("system");
let update_row = updates[0].clone();

let prior_row = match find_row_by_pk(provider, None, pk_value) {
let prior_row = match find_row_by_pk(provider, None, pk_value).await {
Ok(Some((_key, row))) => Some(row.fields),
Ok(None) => None,
Err(err) => {
Expand All @@ -307,7 +307,7 @@ impl DmlExecutor {
});

provider
.update_by_id_field(&system_user, pk_value, update_row)
.update_by_id_field(&system_user, pk_value, update_row).await
.map_err(|e| ApplierError::Execution(format!("Failed to update row: {}", e)))?;

delete_file_refs_best_effort(
Expand All @@ -316,7 +316,7 @@ impl DmlExecutor {
TableType::Shared,
None,
&replaced_refs,
);
).await;

log::debug!("DmlExecutor: Updated 1 shared row in {} (pk={})", table_id, pk_value);
Ok(1)
Expand Down Expand Up @@ -352,7 +352,7 @@ impl DmlExecutor {
let mut deleted_count = 0;

for pk_value in pk_values {
let file_refs = match find_row_by_pk(provider, None, pk_value) {
let file_refs = match find_row_by_pk(provider, None, pk_value).await {
Ok(Some((_key, row))) => collect_file_refs_from_row(
self.app_context.as_ref(),
table_id,
Expand All @@ -371,7 +371,7 @@ impl DmlExecutor {
};

if provider
.delete_by_id_field(&system_user, pk_value)
.delete_by_id_field(&system_user, pk_value).await
.map_err(|e| ApplierError::Execution(format!("Failed to delete row: {}", e)))?
{
deleted_count += 1;
Expand All @@ -381,7 +381,7 @@ impl DmlExecutor {
TableType::Shared,
None,
&file_refs,
);
).await;
}
}

Expand All @@ -400,22 +400,22 @@ impl DmlExecutor {
// =========================================================================

/// Update with fallback for UserTableProvider
fn update_user_provider(
async fn update_user_provider(
&self,
provider: &UserTableProvider,
user_id: &UserId,
pk_value: &str,
updates: Row,
) -> Result<usize, ApplierError> {
match provider.update_by_id_field(user_id, pk_value, updates.clone()) {
match provider.update_by_id_field(user_id, pk_value, updates.clone()).await {
Ok(_) => Ok(1),
Err(kalamdb_tables::TableError::NotFound(_)) => {
if let Some(key) =
provider.find_row_key_by_id_field(user_id, pk_value).map_err(|e| {
provider.find_row_key_by_id_field(user_id, pk_value).await.map_err(|e| {
ApplierError::Execution(format!("Failed to find row key: {}", e))
})?
{
provider.update(user_id, &key, updates).map_err(|e| {
provider.update(user_id, &key, updates).await.map_err(|e| {
ApplierError::Execution(format!("Failed to update row: {}", e))
})?;
Ok(1)
Expand All @@ -428,22 +428,22 @@ impl DmlExecutor {
}

/// Update with fallback for StreamTableProvider
fn update_stream_provider(
async fn update_stream_provider(
&self,
provider: &StreamTableProvider,
user_id: &UserId,
pk_value: &str,
updates: Row,
) -> Result<usize, ApplierError> {
match provider.update_by_id_field(user_id, pk_value, updates.clone()) {
match provider.update_by_id_field(user_id, pk_value, updates.clone()).await {
Ok(_) => Ok(1),
Err(kalamdb_tables::TableError::NotFound(_)) => {
if let Some(key) =
provider.find_row_key_by_id_field(user_id, pk_value).map_err(|e| {
provider.find_row_key_by_id_field(user_id, pk_value).await.map_err(|e| {
ApplierError::Execution(format!("Failed to find row key: {}", e))
})?
{
provider.update(user_id, &key, updates).map_err(|e| {
provider.update(user_id, &key, updates).await.map_err(|e| {
ApplierError::Execution(format!("Failed to update row: {}", e))
})?;
Ok(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ pub fn collect_replaced_file_refs_for_update(
refs
}

pub fn delete_file_refs_best_effort(
pub async fn delete_file_refs_best_effort(
app_context: &AppContext,
table_id: &TableId,
table_type: TableType,
Expand All @@ -170,7 +170,7 @@ pub fn delete_file_refs_best_effort(
};

let file_service = app_context.file_storage_service();
let results = file_service.delete_files(file_refs, &storage_id, table_type, table_id, user_id);
let results = file_service.delete_files(file_refs, &storage_id, table_type, table_id, user_id).await;

for (file_ref, result) in file_refs.iter().zip(results.into_iter()) {
if let Err(err) = result {
Expand Down
1 change: 1 addition & 0 deletions backend/crates/kalamdb-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ impl From<kalamdb_tables::TableError> for KalamDbError {
TableError::Arrow(e) => KalamDbError::Other(format!("Arrow error: {}", e)),
TableError::Filestore(msg) => KalamDbError::Other(format!("Filestore error: {}", msg)),
TableError::SchemaError(msg) => KalamDbError::SchemaError(msg),
TableError::NotLeader { leader_addr } => KalamDbError::NotLeader { leader_addr },
TableError::Other(msg) => KalamDbError::Other(msg),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,11 @@ mod tests {
let user = UserId::new("user-ttl");
provider
.insert(&user, json_to_row(&json!({"event_id": "evt1", "payload": "hello"})).unwrap())
.await
.expect("insert evt1");
provider
.insert(&user, json_to_row(&json!({"event_id": "evt2", "payload": "world"})).unwrap())
.await
.expect("insert evt2");

// Wait for TTL to make them eligible for eviction
Expand Down Expand Up @@ -468,6 +470,7 @@ mod tests {
harness
.provider
.insert(&user, json_to_row(&json!({"event_id": "evt1", "payload": "fresh"})).unwrap())
.await
.expect("insert fresh row");

let params = StreamEvictionParams {
Expand All @@ -493,6 +496,7 @@ mod tests {
harness
.provider
.insert(&user, json_to_row(&json!({"event_id": "evt1", "payload": "expired"})).unwrap())
.await
.expect("insert expired row");

sleep(Duration::from_millis(1200)).await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl JobExecutor for TopicCleanupExecutor {
"TopicCleanupExecutor"
}

async fn execute(&self, ctx: &JobContext<Self::Params>) -> Result<JobDecision, KalamDbError> {
async fn execute(&self, _ctx: &JobContext<Self::Params>) -> Result<JobDecision, KalamDbError> {
// No local work needed for topic cleanup
Ok(JobDecision::Completed {
message: Some("Topic cleanup has no local work".to_string()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ use crate::live::manager::ConnectionsManager;
use crate::live::helpers::filter_eval::parse_where_clause;
use crate::live::helpers::initial_data::{InitialDataFetcher, InitialDataOptions, InitialDataResult};
use crate::live::models::{
ChangeNotification, RegistryStats, SharedConnectionState, SubscriptionResult,
RegistryStats, SharedConnectionState, SubscriptionResult,
};
use crate::live::notification::NotificationService;
use crate::live::helpers::query_parser::QueryParser;
use crate::live::subscription::SubscriptionService;
use crate::sql::executor::SqlExecutor;
Expand Down
Loading