Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1206,11 +1206,20 @@ jobs:
shell: bash
run: |
set -euo pipefail
VERSION="${{ needs.read_version.outputs.version }}"
DOCKER_REPO="${DOCKER_REPO_INPUT:-}"
if [[ -z "$DOCKER_REPO" ]]; then
DOCKER_REPO="jamals86/kalamdb"
fi
SHORT_SHA="${GITHUB_SHA:-}"
if [[ -n "$SHORT_SHA" ]]; then
SHORT_SHA="${SHORT_SHA:0:7}"
else
SHORT_SHA="$(git rev-parse --short=7 HEAD)"
fi
DOCKER_VERSION_COMMIT_TAG="${VERSION}-h${SHORT_SHA}"
echo "docker_repo=$DOCKER_REPO" >> "$GITHUB_OUTPUT"
echo "docker_version_commit_tag=$DOCKER_VERSION_COMMIT_TAG" >> "$GITHUB_OUTPUT"

- name: Download pre-built artifacts (x86_64)
uses: actions/download-artifact@v4
Expand Down Expand Up @@ -1328,18 +1337,24 @@ jobs:
set -euo pipefail
REPO="${{ steps.vars.outputs.docker_repo }}"
VERSION="${{ needs.read_version.outputs.version }}"
VERSION_COMMIT_TAG="${{ steps.vars.outputs.docker_version_commit_tag }}"

# Create manifest for versioned tag
docker buildx imagetools create -t "${REPO}:${VERSION}" \
"${REPO}:${VERSION}-amd64" \
"${REPO}:${VERSION}-arm64"

# Create manifest for version + commit tag
docker buildx imagetools create -t "${REPO}:${VERSION_COMMIT_TAG}" \
"${REPO}:${VERSION}-amd64" \
"${REPO}:${VERSION}-arm64"

# Create manifest for latest tag
docker buildx imagetools create -t "${REPO}:latest" \
"${REPO}:${VERSION}-amd64" \
"${REPO}:${VERSION}-arm64"

echo "✅ Multi-arch manifest created for ${REPO}:${VERSION} and ${REPO}:latest"
echo "✅ Multi-arch manifest created for ${REPO}:${VERSION}, ${REPO}:${VERSION_COMMIT_TAG}, and ${REPO}:latest"

# ═══════════════════════════════════════════════════════════════════════════
# INTEGRATION TESTS - Smoke + TypeScript SDK + Dart SDK against Docker image
Expand Down
161 changes: 128 additions & 33 deletions backend/crates/kalamdb-sql/src/execute_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@ pub fn extract_inner_sql(sql: &str) -> Option<String> {
return None;
}

// Find the opening parenthesis that wraps the inner SQL
let open_paren = trimmed.find('(')?;
// Find the matching closing parenthesis (last ')' in the string)
let close_paren = trimmed.rfind(')')?;
let close_paren = find_matching_close_paren(trimmed, open_paren)?;
if close_paren <= open_paren + 1 {
return None;
}
Expand All @@ -69,6 +67,87 @@ pub fn extract_inner_sql(sql: &str) -> Option<String> {
Some(inner.to_string())
}

fn parse_single_quoted(input: &str) -> Result<(String, &str), String> {
let mut value = String::new();
let mut chars = input.char_indices().peekable();

if !input.starts_with('\'') {
return Err("Expected single-quoted string".to_string());
}

chars.next();

while let Some((idx, ch)) = chars.next() {
if ch != '\'' {
value.push(ch);
continue;
}

if matches!(chars.peek(), Some((_, '\''))) {
chars.next();
value.push('\'');
continue;
}

return Ok((value, &input[idx + 1..]));
}

Err("EXECUTE AS USER username quote was not closed".to_string())
}

fn find_matching_close_paren(input: &str, open_idx: usize) -> Option<usize> {
let mut depth = 0usize;
let mut in_single_quote = false;
let mut in_double_quote = false;
let mut chars = input.char_indices().peekable();

while let Some((idx, ch)) = chars.next() {
if idx < open_idx {
continue;
}

if in_single_quote {
if ch == '\'' {
if matches!(chars.peek(), Some((_, '\''))) {
chars.next();
} else {
in_single_quote = false;
}
}
continue;
}

if in_double_quote {
if ch == '"' {
if matches!(chars.peek(), Some((_, '"'))) {
chars.next();
} else {
in_double_quote = false;
}
}
continue;
}

match ch {
'\'' => in_single_quote = true,
'"' => in_double_quote = true,
'(' => depth += 1,
')' => {
if depth == 0 {
return None;
}
depth -= 1;
if depth == 0 {
return Some(idx);
}
},
_ => {},
}
}

None
}

/// Fully parse an `EXECUTE AS USER` envelope, extracting the username and the
/// parenthesised inner SQL.
///
Expand Down Expand Up @@ -96,17 +175,13 @@ pub fn parse_execute_as(statement: &str) -> Result<Option<ExecuteAsEnvelope>, St
let after_prefix = trimmed[EXECUTE_AS_PREFIX_LEN..].trim_start();

// --- Username extraction (quoted or bare) ---
let (username, rest) = if after_prefix.starts_with('\'') {
// Quoted: EXECUTE AS USER 'some name' (...)
let after_quote = &after_prefix[1..];
let end_quote = after_quote
.find('\'')
.ok_or_else(|| "EXECUTE AS USER username quote was not closed".to_string())?;
let uname = after_quote[..end_quote].trim();
let (username, rest): (String, &str) = if after_prefix.starts_with('\'') {
let (parsed_username, rest) = parse_single_quoted(after_prefix)?;
let uname = parsed_username.trim();
if uname.is_empty() {
return Err("EXECUTE AS USER username cannot be empty".to_string());
}
(uname, after_quote[end_quote + 1..].trim_start())
(uname.to_string(), rest.trim_start())
} else {
// Bare: EXECUTE AS USER alice (...)
// Username extends until whitespace or '('.
Expand All @@ -117,34 +192,16 @@ pub fn parse_execute_as(statement: &str) -> Result<Option<ExecuteAsEnvelope>, St
if uname.is_empty() {
return Err("EXECUTE AS USER username cannot be empty".to_string());
}
(uname, after_prefix[end..].trim_start())
(uname.to_string(), after_prefix[end..].trim_start())
};

// --- Parenthesised SQL body ---
if !rest.starts_with('(') {
return Err("EXECUTE AS USER must wrap SQL in parentheses".to_string());
}

let mut depth = 0usize;
let mut close_idx = None;
for (idx, ch) in rest.char_indices() {
match ch {
'(' => depth += 1,
')' => {
if depth == 0 {
return Err("EXECUTE AS USER contains unbalanced parentheses".to_string());
}
depth -= 1;
if depth == 0 {
close_idx = Some(idx);
break;
}
},
_ => {},
}
}

let close_idx = close_idx.ok_or_else(|| "EXECUTE AS USER missing closing ')'".to_string())?;
let close_idx = find_matching_close_paren(rest, 0)
.ok_or_else(|| "EXECUTE AS USER missing closing ')'".to_string())?;
let inner_sql = rest[1..close_idx].trim();
if inner_sql.is_empty() {
return Err("EXECUTE AS USER requires a non-empty inner SQL statement".to_string());
Expand All @@ -162,7 +219,7 @@ pub fn parse_execute_as(statement: &str) -> Result<Option<ExecuteAsEnvelope>, St
}

Ok(Some(ExecuteAsEnvelope {
username: username.to_string(),
username,
inner_sql: inner_statements[0].trim().to_string(),
}))
}
Expand Down Expand Up @@ -263,6 +320,44 @@ mod tests {
assert_eq!(result.inner_sql, "INSERT INTO default.t VALUES (1)");
}

#[test]
fn parse_escaped_quote_in_username() {
let result = parse_execute_as(
"EXECUTE AS USER 'alice''o' (INSERT INTO default.t VALUES (1))",
)
.expect("should parse")
.expect("should be an envelope");

assert_eq!(result.username, "alice'o");
assert_eq!(result.inner_sql, "INSERT INTO default.t VALUES (1)");
}

#[test]
fn parse_inner_sql_with_parenthesis_in_string_literal() {
let result = parse_execute_as(
"EXECUTE AS USER 'alice' (INSERT INTO default.t VALUES ('hello (', 'done'))",
)
.expect("should parse")
.expect("should be an envelope");

assert_eq!(result.username, "alice");
assert_eq!(
result.inner_sql,
"INSERT INTO default.t VALUES ('hello (', 'done')"
);
}

#[test]
fn extract_inner_sql_ignores_parenthesis_in_string_literal() {
let inner = extract_inner_sql(
"EXECUTE AS USER 'alice' (INSERT INTO default.t VALUES ('hello (', 'done'))",
);
assert_eq!(
inner.as_deref(),
Some("INSERT INTO default.t VALUES ('hello (', 'done')")
);
}

#[test]
fn parse_bare_no_space_before_paren() {
let result = parse_execute_as("EXECUTE AS USER alice(SELECT 1)")
Expand Down
69 changes: 26 additions & 43 deletions benchv2/src/benchmarks/subscribe_change_latency_bench.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;

use kalam_link::{ChangeEvent, SubscriptionConfig};

Expand Down Expand Up @@ -63,9 +61,6 @@ impl Benchmark for SubscribeChangeLatencyBench {
.await?;
tokio::time::sleep(Duration::from_millis(25)).await;

let change_received = Arc::new(AtomicU32::new(0));
let counter = change_received.clone();

let sub_id = format!("latency_{}", iteration);
let sql = format!("SELECT * FROM {}.change_latency", ns);
let sub_config = SubscriptionConfig::new(sub_id, sql);
Expand Down Expand Up @@ -96,32 +91,10 @@ impl Benchmark for SubscribeChangeLatencyBench {
}
}

// Spawn a listener task for change events
let listen_handle = tokio::spawn(async move {
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
break;
}
match tokio::time::timeout(remaining, sub.next()).await {
Ok(Some(Ok(event))) => {
if matches!(event, ChangeEvent::Insert { .. }) {
counter.fetch_add(1, Ordering::SeqCst);
}
},
_ => break,
}
}
// Gracefully close the subscription (sends Unsubscribe + WS Close frame)
let _ = sub.close().await;
});

// Brief yield to ensure subscription is fully registered
tokio::time::sleep(Duration::from_millis(10)).await;

let write_id = 500_000 + iteration;
let _start = Instant::now();

client
.sql_ok(&format!(
Expand All @@ -130,24 +103,34 @@ impl Benchmark for SubscribeChangeLatencyBench {
))
.await?;

// Wait for the change to arrive
let poll_deadline = tokio::time::Instant::now() + Duration::from_secs(5);
loop {
if change_received.load(Ordering::SeqCst) > 0 {
break;
}
if tokio::time::Instant::now() >= poll_deadline {
break;
let change = tokio::time::timeout(Duration::from_secs(5), async {
loop {
match sub.next().await {
Some(Ok(ChangeEvent::Insert { .. })) => break Ok::<(), String>(()),
Some(Ok(ChangeEvent::Error { message, .. })) => {
break Err(format!("Server error: {}", message));
},
Some(Ok(_)) => continue,
Some(Err(error)) => {
break Err(format!("Subscription stream error: {}", error));
},
None => {
break Err("Subscription ended before change notification arrived".to_string());
},
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await;

// Wait for the listener task to finish (bounded by its 5s deadline)
// instead of aborting, so it can close the WebSocket gracefully
let _ = tokio::time::timeout(Duration::from_secs(6), listen_handle).await;
let close_result = sub.close().await;
if let Err(error) = close_result {
return Err(format!("Failed to close subscription: {}", error));
}

if change_received.load(Ordering::SeqCst) == 0 {
return Err("Change notification not received within 5s".to_string());
match change {
Ok(Ok(())) => {},
Ok(Err(error)) => return Err(error),
Err(_) => return Err("Change notification not received within 5s".to_string()),
}

Ok(())
Expand Down
Loading
Loading