From 743ba901b8b8c88ee6847bfd852c65aa847e565c Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 13 Apr 2026 22:20:59 +1000 Subject: [PATCH 1/7] feat: add api keys support for ingestion authentication use api keys as an alternative auth mechanism for ingestion use header `X-API-KEY` in place of basic auth middleware validates the header on ingest action key store in object store at .settings/apikeys/.json --- src/apikeys.rs | 324 ++++++++++++++++++ src/handlers/http/middleware.rs | 36 ++ src/lib.rs | 1 + src/metastore/metastore_traits.rs | 14 + .../metastores/object_store_metastore.rs | 64 +++- src/storage/mod.rs | 1 + src/storage/object_storage.rs | 14 + 7 files changed, 451 insertions(+), 3 deletions(-) create mode 100644 src/apikeys.rs diff --git a/src/apikeys.rs b/src/apikeys.rs new file mode 100644 index 000000000..11f66f6ad --- /dev/null +++ b/src/apikeys.rs @@ -0,0 +1,324 @@ +/* + * Parseable Server (C) 2022 - 2025 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +use once_cell::sync::Lazy; +use rand::Rng; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; +use ulid::Ulid; + +use crate::{ + metastore::metastore_traits::MetastoreObject, + parseable::{DEFAULT_TENANT, PARSEABLE}, + storage::object_storage::apikey_json_path, +}; + +pub static API_KEYS: Lazy = Lazy::new(|| ApiKeyStore { + keys: RwLock::new(HashMap::new()), +}); + +#[derive(Debug)] +pub struct ApiKeyStore { + pub keys: RwLock>>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ApiKey { + pub key_id: Ulid, + pub api_key: String, + pub key_name: String, + pub created_by: String, + pub created_at: DateTime, + pub modified_at: DateTime, + #[serde(default)] + pub tenant: Option, +} + +/// Request body for creating a new API key +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateApiKeyRequest { + pub key_name: String, +} + +/// Response for list keys (api_key masked to last 4 chars) +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ApiKeyListEntry { + pub key_id: Ulid, + pub api_key: String, + pub key_name: String, + pub created_by: String, + pub created_at: DateTime, + pub modified_at: DateTime, +} + +impl ApiKey { + pub fn new(key_name: String, created_by: String, tenant: Option) -> Self { + let now = Utc::now(); + Self { + key_id: Ulid::new(), + api_key: generate_uuid_v4(), + key_name, + created_by, + created_at: now, + modified_at: now, + tenant, + } + } + + pub fn to_list_entry(&self) -> ApiKeyListEntry { + let masked = if self.api_key.len() >= 4 { + let last4 = &self.api_key[self.api_key.len() - 4..]; + format!("****{last4}") + } else { + "****".to_string() + }; + ApiKeyListEntry { + key_id: self.key_id, + api_key: masked, + key_name: self.key_name.clone(), + created_by: self.created_by.clone(), + created_at: self.created_at, + modified_at: self.modified_at, + } + } +} + +impl MetastoreObject for ApiKey { + fn get_object_path(&self) -> String { + apikey_json_path(&self.key_id, &self.tenant).to_string() + } + + fn get_object_id(&self) -> String { + self.key_id.to_string() + } +} + +/// Generate a UUID v4 formatted string using rand +fn generate_uuid_v4() -> String { + let mut rng = rand::thread_rng(); + let mut bytes = [0u8; 16]; + rng.fill(&mut bytes); + // Set version 4 (bits 12-15 of time_hi_and_version) + bytes[6] = (bytes[6] & 0x0f) | 0x40; + // Set variant 1 (bits 6-7 of clock_seq_hi_and_reserved) + bytes[8] = (bytes[8] & 0x3f) | 0x80; + format!( + "{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}", + bytes[0], bytes[1], bytes[2], bytes[3], + bytes[4], bytes[5], + bytes[6], bytes[7], + bytes[8], bytes[9], + bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15] + ) +} + +impl ApiKeyStore { + /// Load API keys from object store into memory + pub async fn load(&self) -> anyhow::Result<()> { + let api_keys = PARSEABLE.metastore.get_api_keys().await?; + let mut map = self.keys.write().await; + for (tenant_id, keys) in api_keys { + let inner = keys + .into_iter() + .map(|mut k| { + k.tenant = Some(tenant_id.clone()); + (k.key_id, k) + }) + .collect(); + map.insert(tenant_id, inner); + } + Ok(()) + } + + /// Create a new API key + pub async fn create(&self, api_key: ApiKey) -> Result<(), ApiKeyError> { + let tenant = api_key.tenant.as_deref().unwrap_or(DEFAULT_TENANT); + + // Hold write lock for the entire operation to prevent TOCTOU race + // on duplicate name check + let mut map = self.keys.write().await; + if let Some(tenant_keys) = map.get(tenant) { + if tenant_keys + .values() + .any(|k| k.key_name == api_key.key_name) + { + return Err(ApiKeyError::DuplicateKeyName(api_key.key_name)); + } + } + + PARSEABLE + .metastore + .put_api_key(&api_key, &api_key.tenant) + .await?; + + map.entry(tenant.to_owned()) + .or_default() + .insert(api_key.key_id, api_key); + Ok(()) + } + + /// Delete an API key by key_id + pub async fn delete( + &self, + key_id: &Ulid, + tenant_id: &Option, + ) -> Result { + let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + + // Read the key first without removing + let api_key = { + let map = self.keys.read().await; + let tenant_keys = map + .get(tenant) + .ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string()))?; + tenant_keys + .get(key_id) + .cloned() + .ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string()))? + }; + + // Delete from storage first + PARSEABLE + .metastore + .delete_api_key(&api_key, tenant_id) + .await?; + + // Remove from memory only after successful storage deletion + { + let mut map = self.keys.write().await; + if let Some(tenant_keys) = map.get_mut(tenant) { + tenant_keys.remove(key_id); + } + } + + Ok(api_key) + } + + /// List all API keys for a tenant (returns masked entries) + pub async fn list( + &self, + tenant_id: &Option, + ) -> Result, ApiKeyError> { + let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let map = self.keys.read().await; + let entries = if let Some(tenant_keys) = map.get(tenant) { + tenant_keys.values().map(|k| k.to_list_entry()).collect() + } else { + vec![] + }; + Ok(entries) + } + + /// Get a specific API key by key_id (returns full key) + pub async fn get( + &self, + key_id: &Ulid, + tenant_id: &Option, + ) -> Result { + let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let map = self.keys.read().await; + let tenant_keys = map + .get(tenant) + .ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string()))?; + tenant_keys + .get(key_id) + .cloned() + .ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string())) + } + + /// Validate an API key for ingestion. Returns true if the key is valid. + /// For multi-tenant: checks the key belongs to the specified tenant. + /// For single-tenant: checks the key exists globally. + pub async fn validate_key( + &self, + api_key_value: &str, + tenant_id: &Option, + ) -> bool { + let map = self.keys.read().await; + if let Some(tenant_id) = tenant_id { + // Multi-tenant: check keys for the specific tenant + if let Some(tenant_keys) = map.get(tenant_id) { + return tenant_keys + .values() + .any(|k| k.api_key == api_key_value); + } + false + } else { + // Single-tenant: check keys under DEFAULT_TENANT + if let Some(tenant_keys) = map.get(DEFAULT_TENANT) { + return tenant_keys + .values() + .any(|k| k.api_key == api_key_value); + } + false + } + } + + /// Insert an API key directly into memory (used for sync from prism) + pub async fn sync_put(&self, api_key: ApiKey) { + let tenant = api_key.tenant.as_deref().unwrap_or(DEFAULT_TENANT).to_owned(); + let mut map = self.keys.write().await; + map.entry(tenant).or_default().insert(api_key.key_id, api_key); + } + + /// Remove an API key from memory (used for sync from prism) + pub async fn sync_delete(&self, key_id: &Ulid, tenant_id: &Option) { + let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let mut map = self.keys.write().await; + if let Some(tenant_keys) = map.get_mut(tenant) { + tenant_keys.remove(key_id); + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ApiKeyError { + #[error("API key not found: {0}")] + KeyNotFound(String), + + #[error("Duplicate key name: {0}")] + DuplicateKeyName(String), + + #[error("Unauthorized: {0}")] + Unauthorized(String), + + #[error("{0}")] + MetastoreError(#[from] crate::metastore::MetastoreError), + + #[error("{0}")] + AnyhowError(#[from] anyhow::Error), +} + +impl actix_web::ResponseError for ApiKeyError { + fn status_code(&self) -> actix_web::http::StatusCode { + match self { + ApiKeyError::KeyNotFound(_) => actix_web::http::StatusCode::NOT_FOUND, + ApiKeyError::DuplicateKeyName(_) => actix_web::http::StatusCode::CONFLICT, + ApiKeyError::Unauthorized(_) => actix_web::http::StatusCode::FORBIDDEN, + ApiKeyError::MetastoreError(_) | ApiKeyError::AnyhowError(_) => { + actix_web::http::StatusCode::INTERNAL_SERVER_ERROR + } + } + } +} diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 5455aa9e4..30b8adebe 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -181,6 +181,42 @@ where let mut header_error = None; let user_and_tenant_id = get_user_and_tenant(&self.action, &mut req, &mut header_error); + // Check for X-API-KEY header for ingestion + let api_key_value = if self.action.eq(&Action::Ingest) { + req.headers() + .get("x-api-key") + .and_then(|v| v.to_str().ok()) + .map(String::from) + } else { + None + }; + + // If API key auth is being used, short-circuit the normal auth flow + if let Some(api_key) = api_key_value { + let suspension = check_suspension(req.request(), self.action); + let tenant_id = req + .headers() + .get(TENANT_ID) + .and_then(|v| v.to_str().ok()) + .map(String::from); + let fut = self.service.call(req); + + return Box::pin(async move { + if let Some(err) = header_error { + return Err(err); + } + if let rbac::Response::Suspended(msg) = suspension { + return Err(ErrorBadRequest(msg)); + } + + use crate::apikeys::API_KEYS; + if API_KEYS.validate_key(&api_key, &tenant_id).await { + return fut.await; + } + Err(ErrorUnauthorized("Invalid API key")) + }); + } + let key: Result = extract_session_key(&mut req); // if action is ingestion, check if tenant is correct for basic auth user diff --git a/src/lib.rs b/src/lib.rs index 5c3704d5c..e7859dc38 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,7 @@ pub mod about; pub mod alerts; +pub mod apikeys; pub mod analytics; pub mod banner; pub mod catalog; diff --git a/src/metastore/metastore_traits.rs b/src/metastore/metastore_traits.rs index 79745ecf8..ace5eb077 100644 --- a/src/metastore/metastore_traits.rs +++ b/src/metastore/metastore_traits.rs @@ -31,6 +31,7 @@ use crate::{ alert_structs::{AlertStateEntry, MTTRHistory}, target::Target, }, + apikeys::ApiKey, catalog::manifest::Manifest, handlers::http::modal::NodeType, metastore::MetastoreError, @@ -165,6 +166,19 @@ pub trait Metastore: std::fmt::Debug + Send + Sync { tenant_id: &Option, ) -> Result<(), MetastoreError>; + /// api keys + async fn get_api_keys(&self) -> Result>, MetastoreError>; + async fn put_api_key( + &self, + obj: &dyn MetastoreObject, + tenant_id: &Option, + ) -> Result<(), MetastoreError>; + async fn delete_api_key( + &self, + obj: &dyn MetastoreObject, + tenant_id: &Option, + ) -> Result<(), MetastoreError>; + /// dashboards async fn get_dashboards(&self) -> Result>, MetastoreError>; async fn put_dashboard( diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index 904bcca24..7403db126 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -36,6 +36,7 @@ use crate::{ alert_structs::{AlertStateEntry, MTTRHistory}, target::Target, }, + apikeys::ApiKey, catalog::{manifest::Manifest, partition_path}, handlers::http::{ modal::{Metadata, NodeMetadata, NodeType}, @@ -48,9 +49,9 @@ use crate::{ option::Mode, parseable::{DEFAULT_TENANT, PARSEABLE}, storage::{ - ALERTS_ROOT_DIRECTORY, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY, - SETTINGS_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, - TARGETS_ROOT_DIRECTORY, + ALERTS_ROOT_DIRECTORY, APIKEYS_ROOT_DIRECTORY, ObjectStorage, ObjectStorageError, + PARSEABLE_ROOT_DIRECTORY, SETTINGS_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME, + STREAM_ROOT_DIRECTORY, TARGETS_ROOT_DIRECTORY, object_storage::{ alert_json_path, alert_state_json_path, filter_path, manifest_path, mttr_json_path, parseable_json_path, schema_path, stream_json_path, to_bytes, @@ -1125,6 +1126,63 @@ impl Metastore for ObjectStoreMetastore { .await?) } + /// api keys + async fn get_api_keys(&self) -> Result>, MetastoreError> { + let base_paths = PARSEABLE.list_tenants().unwrap_or_else(|| vec!["".into()]); + let mut all_keys = HashMap::new(); + for mut tenant in base_paths { + let keys_path = RelativePathBuf::from_iter([ + &tenant, + SETTINGS_ROOT_DIRECTORY, + APIKEYS_ROOT_DIRECTORY, + ]); + let keys = self + .storage + .get_objects( + Some(&keys_path), + Box::new(|file_name| file_name.ends_with(".json")), + &Some(tenant.clone()), + ) + .await? + .iter() + .filter_map(|bytes| { + serde_json::from_slice(bytes) + .inspect_err(|err| warn!("Expected compatible api key json, error = {err}")) + .ok() + }) + .collect(); + if tenant.is_empty() { + tenant.clone_from(&DEFAULT_TENANT.to_string()); + } + all_keys.insert(tenant, keys); + } + Ok(all_keys) + } + + async fn put_api_key( + &self, + obj: &dyn MetastoreObject, + tenant_id: &Option, + ) -> Result<(), MetastoreError> { + let path = obj.get_object_path(); + Ok(self + .storage + .put_object(&RelativePathBuf::from(path), to_bytes(obj), tenant_id) + .await?) + } + + async fn delete_api_key( + &self, + obj: &dyn MetastoreObject, + tenant_id: &Option, + ) -> Result<(), MetastoreError> { + let path = obj.get_object_path(); + Ok(self + .storage + .delete_object(&RelativePathBuf::from(path), tenant_id) + .await?) + } + async fn get_all_schemas( &self, stream_name: &str, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 1694e5f5b..66b6e2625 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -67,6 +67,7 @@ pub const SCHEMA_FILE_NAME: &str = ".schema"; pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts"; pub const SETTINGS_ROOT_DIRECTORY: &str = ".settings"; pub const TARGETS_ROOT_DIRECTORY: &str = ".targets"; +pub const APIKEYS_ROOT_DIRECTORY: &str = "apikeys"; pub const MANIFEST_FILE: &str = "manifest.json"; // max concurrent request allowed for datafusion object store diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 67aeac609..fb6651c76 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -58,6 +58,7 @@ use crate::option::Mode; use crate::parseable::DEFAULT_TENANT; use crate::parseable::{LogStream, PARSEABLE, Stream}; use crate::stats::FullStats; +use crate::storage::APIKEYS_ROOT_DIRECTORY; use crate::storage::SETTINGS_ROOT_DIRECTORY; use crate::storage::TARGETS_ROOT_DIRECTORY; use crate::storage::field_stats::DATASET_STATS_STREAM_NAME; @@ -1347,6 +1348,19 @@ pub fn mttr_json_path(tenant_id: &Option) -> RelativePathBuf { } } +/// Constructs the path for storing API key JSON files +/// Format: "{tenant}/.settings/apikeys/{key_id}.json" +#[inline(always)] +pub fn apikey_json_path(key_id: &Ulid, tenant_id: &Option) -> RelativePathBuf { + let root = tenant_id.as_deref().unwrap_or(""); + RelativePathBuf::from_iter([ + root, + SETTINGS_ROOT_DIRECTORY, + APIKEYS_ROOT_DIRECTORY, + &format!("{key_id}.json"), + ]) +} + #[inline(always)] pub fn manifest_path(prefix: &str) -> RelativePathBuf { let hostname = hostname::get() From cab660c11bf4e93b028b3b1c145fbddf266b94eb Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 13 Apr 2026 22:33:00 +1000 Subject: [PATCH 2/7] uuid for api key --- Cargo.lock | 1 + Cargo.toml | 1 + src/apikeys.rs | 22 +--------------------- 3 files changed, 3 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 06309f5f8..efe7805cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3780,6 +3780,7 @@ dependencies = [ "uptime_lib", "ureq", "url", + "uuid", "vergen-gitcl", "xxhash-rust", "zip", diff --git a/Cargo.toml b/Cargo.toml index 13ffb0070..2ef573bf4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,6 +158,7 @@ semver = "1.0" static-files = "0.2" thiserror = "2.0" ulid = { version = "1.0", features = ["serde"] } +uuid = { version = "1", features = ["v4"] } xxhash-rust = { version = "0.8", features = ["xxh3"] } futures-core = "0.3.31" tempfile = "3.20.0" diff --git a/src/apikeys.rs b/src/apikeys.rs index 11f66f6ad..e2fe42ba2 100644 --- a/src/apikeys.rs +++ b/src/apikeys.rs @@ -20,7 +20,6 @@ use std::collections::HashMap; use chrono::{DateTime, Utc}; use once_cell::sync::Lazy; -use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; use ulid::Ulid; @@ -77,7 +76,7 @@ impl ApiKey { let now = Utc::now(); Self { key_id: Ulid::new(), - api_key: generate_uuid_v4(), + api_key: uuid::Uuid::new_v4().to_string(), key_name, created_by, created_at: now, @@ -114,25 +113,6 @@ impl MetastoreObject for ApiKey { } } -/// Generate a UUID v4 formatted string using rand -fn generate_uuid_v4() -> String { - let mut rng = rand::thread_rng(); - let mut bytes = [0u8; 16]; - rng.fill(&mut bytes); - // Set version 4 (bits 12-15 of time_hi_and_version) - bytes[6] = (bytes[6] & 0x0f) | 0x40; - // Set variant 1 (bits 6-7 of clock_seq_hi_and_reserved) - bytes[8] = (bytes[8] & 0x3f) | 0x80; - format!( - "{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}", - bytes[0], bytes[1], bytes[2], bytes[3], - bytes[4], bytes[5], - bytes[6], bytes[7], - bytes[8], bytes[9], - bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15] - ) -} - impl ApiKeyStore { /// Load API keys from object store into memory pub async fn load(&self) -> anyhow::Result<()> { From 203aeaab60d44c4f202011a551bab59a531bf748 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 13 Apr 2026 22:38:26 +1000 Subject: [PATCH 3/7] clippy and fmt fixes --- src/apikeys.rs | 35 +++++++++++++++-------------------- src/lib.rs | 2 +- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/src/apikeys.rs b/src/apikeys.rs index e2fe42ba2..2994f623b 100644 --- a/src/apikeys.rs +++ b/src/apikeys.rs @@ -138,13 +138,10 @@ impl ApiKeyStore { // Hold write lock for the entire operation to prevent TOCTOU race // on duplicate name check let mut map = self.keys.write().await; - if let Some(tenant_keys) = map.get(tenant) { - if tenant_keys - .values() - .any(|k| k.key_name == api_key.key_name) - { - return Err(ApiKeyError::DuplicateKeyName(api_key.key_name)); - } + if let Some(tenant_keys) = map.get(tenant) + && tenant_keys.values().any(|k| k.key_name == api_key.key_name) + { + return Err(ApiKeyError::DuplicateKeyName(api_key.key_name)); } PARSEABLE @@ -230,26 +227,18 @@ impl ApiKeyStore { /// Validate an API key for ingestion. Returns true if the key is valid. /// For multi-tenant: checks the key belongs to the specified tenant. /// For single-tenant: checks the key exists globally. - pub async fn validate_key( - &self, - api_key_value: &str, - tenant_id: &Option, - ) -> bool { + pub async fn validate_key(&self, api_key_value: &str, tenant_id: &Option) -> bool { let map = self.keys.read().await; if let Some(tenant_id) = tenant_id { // Multi-tenant: check keys for the specific tenant if let Some(tenant_keys) = map.get(tenant_id) { - return tenant_keys - .values() - .any(|k| k.api_key == api_key_value); + return tenant_keys.values().any(|k| k.api_key == api_key_value); } false } else { // Single-tenant: check keys under DEFAULT_TENANT if let Some(tenant_keys) = map.get(DEFAULT_TENANT) { - return tenant_keys - .values() - .any(|k| k.api_key == api_key_value); + return tenant_keys.values().any(|k| k.api_key == api_key_value); } false } @@ -257,9 +246,15 @@ impl ApiKeyStore { /// Insert an API key directly into memory (used for sync from prism) pub async fn sync_put(&self, api_key: ApiKey) { - let tenant = api_key.tenant.as_deref().unwrap_or(DEFAULT_TENANT).to_owned(); + let tenant = api_key + .tenant + .as_deref() + .unwrap_or(DEFAULT_TENANT) + .to_owned(); let mut map = self.keys.write().await; - map.entry(tenant).or_default().insert(api_key.key_id, api_key); + map.entry(tenant) + .or_default() + .insert(api_key.key_id, api_key); } /// Remove an API key from memory (used for sync from prism) diff --git a/src/lib.rs b/src/lib.rs index e7859dc38..16e97e2ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,8 +18,8 @@ pub mod about; pub mod alerts; -pub mod apikeys; pub mod analytics; +pub mod apikeys; pub mod banner; pub mod catalog; mod cli; From b15e706e55037c1c3d31beab202a24fae7099d0e Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 14 Apr 2026 00:35:35 +1000 Subject: [PATCH 4/7] deepsource and coderabbit review comments --- src/apikeys.rs | 6 ++- src/handlers/http/middleware.rs | 82 +++++++++++++++++---------------- 2 files changed, 47 insertions(+), 41 deletions(-) diff --git a/src/apikeys.rs b/src/apikeys.rs index 2994f623b..387774827 100644 --- a/src/apikeys.rs +++ b/src/apikeys.rs @@ -122,7 +122,11 @@ impl ApiKeyStore { let inner = keys .into_iter() .map(|mut k| { - k.tenant = Some(tenant_id.clone()); + k.tenant = if tenant_id == DEFAULT_TENANT { + None + } else { + Some(tenant_id.clone()) + }; (k.key_id, k) }) .collect(); diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 30b8adebe..6425e1698 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -147,52 +147,17 @@ where forward_ready!(service); fn call(&self, mut req: ServiceRequest) -> Self::Future { - /*Below section is added to extract the Authorization and X-P-Stream headers from x-amz-firehose-common-attributes custom header - when request is made from Kinesis Firehose. - For requests made from other clients, no change. - - ## Section start */ - if self.action.eq(&Action::Ingest) - && let Some(kinesis_common_attributes) = - req.request().headers().get(KINESIS_COMMON_ATTRIBUTES_KEY) - && let Ok(attribute_value) = kinesis_common_attributes.to_str() - && let Ok(message) = serde_json::from_str::(attribute_value) - && let Ok(auth_value) = - header::HeaderValue::from_str(&message.common_attributes.authorization) - && let Ok(stream_name_key) = - header::HeaderValue::from_str(&message.common_attributes.x_p_stream) - { - req.headers_mut() - .insert(HeaderName::from_static(AUTHORIZATION_KEY), auth_value); - req.headers_mut().insert( - HeaderName::from_static(STREAM_NAME_HEADER_KEY), - stream_name_key, - ); - req.headers_mut().insert( - HeaderName::from_static(LOG_SOURCE_KEY), - header::HeaderValue::from_static(LOG_SOURCE_KINESIS), - ); + // Extract Kinesis Firehose headers if applicable + if self.action.eq(&Action::Ingest) { + extract_kinesis_headers(&mut req); } - /* ## Section end */ - // if action is Ingest and multi-tenancy is on, then request MUST have tenant id - // else check for the presence of tenant id using other details // an optional error to track the presence of CORRECT tenant header in case of ingestion let mut header_error = None; let user_and_tenant_id = get_user_and_tenant(&self.action, &mut req, &mut header_error); - // Check for X-API-KEY header for ingestion - let api_key_value = if self.action.eq(&Action::Ingest) { - req.headers() - .get("x-api-key") - .and_then(|v| v.to_str().ok()) - .map(String::from) - } else { - None - }; - - // If API key auth is being used, short-circuit the normal auth flow - if let Some(api_key) = api_key_value { + // If X-API-KEY header is present for ingestion, short-circuit normal auth + if let Some(api_key) = extract_api_key(&req, &self.action) { let suspension = check_suspension(req.request(), self.action); let tenant_id = req .headers() @@ -273,6 +238,43 @@ where } } +/// Extract Kinesis Firehose headers (Authorization, X-P-Stream) from +/// the x-amz-firehose-common-attributes custom header when present. +fn extract_kinesis_headers(req: &mut ServiceRequest) { + if let Some(kinesis_common_attributes) = + req.request().headers().get(KINESIS_COMMON_ATTRIBUTES_KEY) + && let Ok(attribute_value) = kinesis_common_attributes.to_str() + && let Ok(message) = serde_json::from_str::(attribute_value) + && let Ok(auth_value) = + header::HeaderValue::from_str(&message.common_attributes.authorization) + && let Ok(stream_name_key) = + header::HeaderValue::from_str(&message.common_attributes.x_p_stream) + { + req.headers_mut() + .insert(HeaderName::from_static(AUTHORIZATION_KEY), auth_value); + req.headers_mut().insert( + HeaderName::from_static(STREAM_NAME_HEADER_KEY), + stream_name_key, + ); + req.headers_mut().insert( + HeaderName::from_static(LOG_SOURCE_KEY), + header::HeaderValue::from_static(LOG_SOURCE_KINESIS), + ); + } +} + +/// Extract X-API-KEY header value if present and action is Ingest. +fn extract_api_key(req: &ServiceRequest, action: &Action) -> Option { + if action.eq(&Action::Ingest) { + req.headers() + .get("x-api-key") + .and_then(|v| v.to_str().ok()) + .map(String::from) + } else { + None + } +} + #[inline] fn get_user_and_tenant( action: &Action, From 63b62c8c540bc0b71fc509cc7c12fd9164103d44 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 22 Apr 2026 19:54:02 +0700 Subject: [PATCH 5/7] key type to accomodate query via api key --- src/apikeys.rs | 56 ++++++++++++++----- src/handlers/http/middleware.rs | 99 ++++++++++++++++++++++++++------- 2 files changed, 119 insertions(+), 36 deletions(-) diff --git a/src/apikeys.rs b/src/apikeys.rs index 387774827..1479a65b9 100644 --- a/src/apikeys.rs +++ b/src/apikeys.rs @@ -39,12 +39,24 @@ pub struct ApiKeyStore { pub keys: RwLock>>, } +/// Type of API key, determining how it can be used. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum KeyType { + /// Used as a substitute for basic auth on ingestion endpoints + Ingestion, + /// Used as a substitute for basic auth on query endpoints (global query access) + Query, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ApiKey { pub key_id: Ulid, pub api_key: String, pub key_name: String, + #[serde(default = "default_key_type")] + pub key_type: KeyType, pub created_by: String, pub created_at: DateTime, pub modified_at: DateTime, @@ -52,11 +64,17 @@ pub struct ApiKey { pub tenant: Option, } +fn default_key_type() -> KeyType { + KeyType::Ingestion +} + /// Request body for creating a new API key #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct CreateApiKeyRequest { pub key_name: String, + #[serde(default = "default_key_type")] + pub key_type: KeyType, } /// Response for list keys (api_key masked to last 4 chars) @@ -66,18 +84,25 @@ pub struct ApiKeyListEntry { pub key_id: Ulid, pub api_key: String, pub key_name: String, + pub key_type: KeyType, pub created_by: String, pub created_at: DateTime, pub modified_at: DateTime, } impl ApiKey { - pub fn new(key_name: String, created_by: String, tenant: Option) -> Self { + pub fn new( + key_name: String, + key_type: KeyType, + created_by: String, + tenant: Option, + ) -> Self { let now = Utc::now(); Self { key_id: Ulid::new(), api_key: uuid::Uuid::new_v4().to_string(), key_name, + key_type, created_by, created_at: now, modified_at: now, @@ -96,6 +121,7 @@ impl ApiKey { key_id: self.key_id, api_key: masked, key_name: self.key_name.clone(), + key_type: self.key_type, created_by: self.created_by.clone(), created_at: self.created_at, modified_at: self.modified_at, @@ -228,24 +254,24 @@ impl ApiKeyStore { .ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string())) } - /// Validate an API key for ingestion. Returns true if the key is valid. + /// Validate an API key against a required key type. Returns true if the + /// key is valid AND its type matches the required type. /// For multi-tenant: checks the key belongs to the specified tenant. /// For single-tenant: checks the key exists globally. - pub async fn validate_key(&self, api_key_value: &str, tenant_id: &Option) -> bool { + pub async fn validate_key( + &self, + api_key_value: &str, + tenant_id: &Option, + required_type: KeyType, + ) -> bool { let map = self.keys.read().await; - if let Some(tenant_id) = tenant_id { - // Multi-tenant: check keys for the specific tenant - if let Some(tenant_keys) = map.get(tenant_id) { - return tenant_keys.values().any(|k| k.api_key == api_key_value); - } - false - } else { - // Single-tenant: check keys under DEFAULT_TENANT - if let Some(tenant_keys) = map.get(DEFAULT_TENANT) { - return tenant_keys.values().any(|k| k.api_key == api_key_value); - } - false + let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + if let Some(tenant_keys) = map.get(tenant) { + return tenant_keys + .values() + .any(|k| k.api_key == api_key_value && k.key_type == required_type); } + false } /// Insert an API key directly into memory (used for sync from prism) diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 6425e1698..7799f21e8 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -156,29 +156,69 @@ where let mut header_error = None; let user_and_tenant_id = get_user_and_tenant(&self.action, &mut req, &mut header_error); - // If X-API-KEY header is present for ingestion, short-circuit normal auth - if let Some(api_key) = extract_api_key(&req, &self.action) { + // If X-API-KEY header is present and the action supports API key auth, + // short-circuit the normal auth flow. + if let Some(api_key) = extract_api_key(&req) + && let Some(required_type) = api_key_type_for_action(&self.action) + { + struct SessionCleanupGuard(Option); + impl Drop for SessionCleanupGuard { + fn drop(&mut self) { + if let Some(sid) = self.0 { + mut_sessions().remove_session(&SessionKey::SessionId(sid)); + } + } + } + let suspension = check_suspension(req.request(), self.action); let tenant_id = req .headers() .get(TENANT_ID) .and_then(|v| v.to_str().ok()) .map(String::from); + + // For Query keys, set up a short-lived session with global reader + // permissions so the query handler's per-stream auth passes. + let query_session_id = if required_type == crate::apikeys::KeyType::Query { + let session_id = Ulid::new(); + let session_key = SessionKey::SessionId(session_id); + mut_sessions().track_new( + format!("api-key:{session_id}"), + session_key.clone(), + Utc::now() + TimeDelta::minutes(5), + query_api_key_permissions(), + &tenant_id, + ); + req.extensions_mut().insert(session_key); + Some(session_id) + } else { + None + }; + let fut = self.service.call(req); return Box::pin(async move { - if let Some(err) = header_error { - return Err(err); - } - if let rbac::Response::Suspended(msg) = suspension { - return Err(ErrorBadRequest(msg)); - } + let _guard = SessionCleanupGuard(query_session_id); + let result: Result, Error> = async { + if let Some(err) = header_error { + return Err(err); + } + if let rbac::Response::Suspended(msg) = suspension { + return Err(ErrorBadRequest(msg)); + } - use crate::apikeys::API_KEYS; - if API_KEYS.validate_key(&api_key, &tenant_id).await { - return fut.await; + use crate::apikeys::API_KEYS; + if API_KEYS + .validate_key(&api_key, &tenant_id, required_type) + .await + { + return fut.await; + } + Err(ErrorUnauthorized("Invalid API key")) } - Err(ErrorUnauthorized("Invalid API key")) + .await; + + result }); } @@ -263,18 +303,35 @@ fn extract_kinesis_headers(req: &mut ServiceRequest) { } } -/// Extract X-API-KEY header value if present and action is Ingest. -fn extract_api_key(req: &ServiceRequest, action: &Action) -> Option { - if action.eq(&Action::Ingest) { - req.headers() - .get("x-api-key") - .and_then(|v| v.to_str().ok()) - .map(String::from) - } else { - None +/// Extract X-API-KEY header value if present (independent of action). +fn extract_api_key(req: &ServiceRequest) -> Option { + req.headers() + .get("x-api-key") + .and_then(|v| v.to_str().ok()) + .map(String::from) +} + +/// Map an Action to the KeyType required for API key auth on that action. +/// Returns None if the action doesn't support API key auth. +fn api_key_type_for_action(action: &Action) -> Option { + use crate::apikeys::KeyType; + match action { + Action::Ingest => Some(KeyType::Ingestion), + Action::Query => Some(KeyType::Query), + _ => None, } } +/// Build the set of permissions granted to a Query API key session. +/// Equivalent to the Reader privilege with no resource restriction +/// (global query access across all streams in the tenant). +fn query_api_key_permissions() -> Vec { + crate::rbac::role::RoleBuilder::from(&crate::rbac::role::model::DefaultPrivilege::Reader { + resource: None, + }) + .build() +} + #[inline] fn get_user_and_tenant( action: &Action, From 72286a27493b263792e3f0b9373af6e245ad50ea Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 22 Apr 2026 21:30:12 +0700 Subject: [PATCH 6/7] fix comments --- src/apikeys.rs | 35 ++++++++++++++++++++++----------- src/handlers/http/middleware.rs | 35 +++++++++++++++++++++------------ 2 files changed, 46 insertions(+), 24 deletions(-) diff --git a/src/apikeys.rs b/src/apikeys.rs index 1479a65b9..38da68ef1 100644 --- a/src/apikeys.rs +++ b/src/apikeys.rs @@ -164,24 +164,37 @@ impl ApiKeyStore { /// Create a new API key pub async fn create(&self, api_key: ApiKey) -> Result<(), ApiKeyError> { let tenant = api_key.tenant.as_deref().unwrap_or(DEFAULT_TENANT); + let key_id = api_key.key_id; - // Hold write lock for the entire operation to prevent TOCTOU race - // on duplicate name check - let mut map = self.keys.write().await; - if let Some(tenant_keys) = map.get(tenant) - && tenant_keys.values().any(|k| k.key_name == api_key.key_name) + // Check duplicate name and reserve the slot under the write lock, + // then drop the lock before the async metastore call so we don't + // hold a global lock across an await. { - return Err(ApiKeyError::DuplicateKeyName(api_key.key_name)); + let mut map = self.keys.write().await; + if let Some(tenant_keys) = map.get(tenant) + && tenant_keys.values().any(|k| k.key_name == api_key.key_name) + { + return Err(ApiKeyError::DuplicateKeyName(api_key.key_name)); + } + map.entry(tenant.to_owned()) + .or_default() + .insert(key_id, api_key.clone()); } - PARSEABLE + // Persist to storage without holding the lock. On failure, remove + // the reservation so stale entries don't linger in memory. + if let Err(e) = PARSEABLE .metastore .put_api_key(&api_key, &api_key.tenant) - .await?; + .await + { + let mut map = self.keys.write().await; + if let Some(tenant_keys) = map.get_mut(tenant) { + tenant_keys.remove(&key_id); + } + return Err(e.into()); + } - map.entry(tenant.to_owned()) - .or_default() - .insert(api_key.key_id, api_key); Ok(()) } diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 7799f21e8..412a17e50 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -152,6 +152,15 @@ where extract_kinesis_headers(&mut req); } + // Capture the incoming tenant header value before `get_user_and_tenant` + // potentially mutates it, so the API-key branch below sees the original + // client-supplied value. + let tenant_id_before = req + .headers() + .get(TENANT_ID) + .and_then(|v| v.to_str().ok()) + .map(String::from); + // an optional error to track the presence of CORRECT tenant header in case of ingestion let mut header_error = None; let user_and_tenant_id = get_user_and_tenant(&self.action, &mut req, &mut header_error); @@ -170,12 +179,8 @@ where } } - let suspension = check_suspension(req.request(), self.action); - let tenant_id = req - .headers() - .get(TENANT_ID) - .and_then(|v| v.to_str().ok()) - .map(String::from); + let tenant_id = tenant_id_before; + let suspension = check_suspension_for_tenant(tenant_id.as_deref(), self.action); // For Query keys, set up a short-lived session with global reader // permissions so the query handler's per-stream auth passes. @@ -485,14 +490,18 @@ pub async fn refresh_token( #[inline(always)] pub fn check_suspension(req: &HttpRequest, action: Action) -> rbac::Response { - if let Some(tenant) = req.headers().get(TENANT_ID) - && let Ok(tenant) = tenant.to_str() + let tenant = req.headers().get(TENANT_ID).and_then(|v| v.to_str().ok()); + check_suspension_for_tenant(tenant, action) +} + +/// Variant of check_suspension that takes the tenant id directly. +/// Useful when the caller has already captured the tenant from the request +/// (e.g., before another function may have mutated the TENANT_ID header). +pub fn check_suspension_for_tenant(tenant: Option<&str>, action: Action) -> rbac::Response { + if let Some(tenant) = tenant + && let Ok(Some(suspension)) = TENANT_METADATA.is_action_suspended(tenant, &action) { - if let Ok(Some(suspension)) = TENANT_METADATA.is_action_suspended(tenant, &action) { - return rbac::Response::Suspended(suspension); - } else { - // tenant does not exist - } + return rbac::Response::Suspended(suspension); } rbac::Response::Authorized } From aa93cca62f9a253ac3b9d4a760f976054f518bf8 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 22 Apr 2026 22:11:31 +0700 Subject: [PATCH 7/7] middleware session is registered only after validation succeeds --- src/handlers/http/middleware.rs | 69 +++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 29 deletions(-) diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 412a17e50..ef23bcecb 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -32,6 +32,7 @@ use once_cell::sync::OnceCell; use ulid::Ulid; use crate::{ + apikeys::API_KEYS, handlers::{ AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, STREAM_NAME_HEADER_KEY, TENANT_ID, @@ -182,19 +183,16 @@ where let tenant_id = tenant_id_before; let suspension = check_suspension_for_tenant(tenant_id.as_deref(), self.action); - // For Query keys, set up a short-lived session with global reader - // permissions so the query handler's per-stream auth passes. + // For Query keys we pre-allocate a session id and stash the + // SessionKey in req.extensions so the downstream handler can find + // it via extract_session_key_from_req. The actual session + // registration (track_new) is deferred until after the API key + // is validated, so failed requests don't mutate the shared + // sessions map. let query_session_id = if required_type == crate::apikeys::KeyType::Query { let session_id = Ulid::new(); - let session_key = SessionKey::SessionId(session_id); - mut_sessions().track_new( - format!("api-key:{session_id}"), - session_key.clone(), - Utc::now() + TimeDelta::minutes(5), - query_api_key_permissions(), - &tenant_id, - ); - req.extensions_mut().insert(session_key); + req.extensions_mut() + .insert(SessionKey::SessionId(session_id)); Some(session_id) } else { None @@ -203,27 +201,40 @@ where let fut = self.service.call(req); return Box::pin(async move { - let _guard = SessionCleanupGuard(query_session_id); - let result: Result, Error> = async { - if let Some(err) = header_error { - return Err(err); - } - if let rbac::Response::Suspended(msg) = suspension { - return Err(ErrorBadRequest(msg)); - } + // Guard starts with None — only set to Some(session_id) once + // we've actually called track_new, so invalid/failed requests + // don't trigger a spurious remove_session. + let mut guard = SessionCleanupGuard(None); - use crate::apikeys::API_KEYS; - if API_KEYS - .validate_key(&api_key, &tenant_id, required_type) - .await - { - return fut.await; - } - Err(ErrorUnauthorized("Invalid API key")) + if let Some(err) = header_error { + return Err(err); + } + if let rbac::Response::Suspended(msg) = suspension { + return Err(ErrorBadRequest(msg)); + } + + if !API_KEYS + .validate_key(&api_key, &tenant_id, required_type) + .await + { + return Err(ErrorUnauthorized("Invalid API key")); + } + + // Key validated — register the session in the shared + // sessions map. Arm the cleanup guard so the session is + // removed when this request completes. + if let Some(session_id) = query_session_id { + mut_sessions().track_new( + format!("api-key:{session_id}"), + SessionKey::SessionId(session_id), + Utc::now() + TimeDelta::minutes(5), + query_api_key_permissions(), + &tenant_id, + ); + guard.0 = Some(session_id); } - .await; - result + fut.await }); }