From 04f2fb8a04407875322c37b6bb2eb41faed4a689 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 2 May 2026 10:43:23 +0700 Subject: [PATCH 1/4] escape rename check for static schema streams --- src/event/format/json.rs | 54 +++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index b04ab09e9..cc4ca90f3 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -94,33 +94,37 @@ impl EventFormat for Event { })? }; - // Detect schema conflicts using raw inferred schema vs existing stream schema - // Pass the actual values and schema_version to check if values can be coerced to existing types - let conflicts = detect_schema_conflicts( - &raw_inferred_schema, - stream_schema, - &value_arr, - schema_version, - ); - - // If there are conflicts, rename the fields in JSON values - let value_arr = if !conflicts.is_empty() { - rename_conflicting_fields_in_json(value_arr, &conflicts) - } else { + let value_arr = if static_schema_flag { value_arr - }; + } else { + // Detect schema conflicts using raw inferred schema vs existing stream schema + // Pass the actual values and schema_version to check if values can be coerced to existing types + let conflicts = detect_schema_conflicts( + &raw_inferred_schema, + stream_schema, + &value_arr, + schema_version, + ); - // Per-record fallback: catches batches with mixed JSON types for the - // same field, which the batch-level detect_schema_conflicts misses - // because arrow's inference picks one winning type (string over bool). - // Internally short-circuits when this can't apply (single-record - // batches, or no field-name collision at the same type). - let value_arr = super::rename_per_record_type_mismatches( - value_arr, - &raw_inferred_schema, - stream_schema, - schema_version, - ); + // If there are conflicts, rename the fields in JSON values + let value_arr = if !conflicts.is_empty() { + rename_conflicting_fields_in_json(value_arr, &conflicts) + } else { + value_arr + }; + + // Per-record fallback: catches batches with mixed JSON types for the + // same field, which the batch-level detect_schema_conflicts misses + // because arrow's inference picks one winning type (string over bool). + // Internally short-circuits when this can't apply (single-record + // batches, or no field-name collision at the same type). + super::rename_per_record_type_mismatches( + value_arr, + &raw_inferred_schema, + stream_schema, + schema_version, + ) + }; // collect all the keys from all the json objects in the request body let fields = From 8838f6dd3c7571e206287323e58ebd73f9ea486f Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 2 May 2026 14:03:37 +0700 Subject: [PATCH 2/4] catch within-batch type mismatches on new fields remove time partition headers from create/update logstream --- src/event/format/mod.rs | 36 +++++++++++-------- .../http/modal/utils/logstream_utils.rs | 10 ++---- src/parseable/mod.rs | 21 ++--------- 3 files changed, 25 insertions(+), 42 deletions(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 923fc3a9a..219d55c32 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -584,21 +584,22 @@ pub fn rename_per_record_type_mismatches( existing_schema: &HashMap>, schema_version: SchemaVersion, ) -> Vec { - if values.len() <= 1 || existing_schema.is_empty() { - return values; - } - // Bail out unless at least one inferred field collides with storage at - // the same type. Without that, arrow's inference can't have hidden a - // mixed-type batch behind a matching aggregate type. - let needs_check = inferred_schema.fields().iter().any(|f| { - existing_schema - .get(f.name()) - .is_some_and(|s| s.data_type() == f.data_type()) - }); - if !needs_check { + if values.len() <= 1 { return values; } + // Lookup of inferred field types — used as the reference type when a field + // isn't yet in storage (e.g. first batch for the stream, or a new column). + // Without this, mixed-type batches for new fields slip through: arrow picks + // a single aggregate type (Utf8 wins over Bool, etc.), the batch-level + // conflict check sees nothing to compare against in empty storage, and + // records carrying the loser type later fail `fields_mismatch`. + let inferred_types: HashMap<&str, &DataType> = inferred_schema + .fields() + .iter() + .map(|f| (f.name().as_str(), f.data_type())) + .collect(); + values .into_iter() .map(|value| { @@ -611,11 +612,16 @@ pub fn rename_per_record_type_mismatches( if val.is_null() { return (key, val); } - let Some(existing_field) = existing_schema.get(&key) else { + // Prefer storage's declared type; fall back to the inferred + // type so within-batch mismatches on new fields are caught. + let target_type = existing_schema + .get(&key) + .map(|f| f.data_type()) + .or_else(|| inferred_types.get(key.as_str()).copied()); + let Some(target_type) = target_type else { return (key, val); }; - if value_compatible_with_type(&val, existing_field.data_type(), schema_version) - { + if value_compatible_with_type(&val, target_type, schema_version) { return (key, val); } let suffix = get_datatype_suffix(&datatype_for_value(&val)); diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 097e8a7c3..0a7b1b433 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -23,15 +23,14 @@ use crate::{ handlers::{ CUSTOM_PARTITION_KEY, DATASET_LABELS_KEY, DATASET_TAG_KEY, DATASET_TAGS_KEY, DatasetTag, INFER_TIMESTAMP_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, - TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, - UPDATE_STREAM_KEY, parse_dataset_labels, parse_dataset_tags, + TELEMETRY_TYPE_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY, + parse_dataset_labels, parse_dataset_tags, }, storage::StreamType, }; #[derive(Debug)] pub struct PutStreamHeaders { - pub time_partition: String, pub time_partition_limit: String, pub custom_partition: Option, pub static_schema_flag: bool, @@ -49,7 +48,6 @@ pub struct PutStreamHeaders { impl Default for PutStreamHeaders { fn default() -> Self { Self { - time_partition: String::default(), time_partition_limit: String::default(), custom_partition: None, static_schema_flag: false, @@ -80,10 +78,6 @@ impl From<&HeaderMap> for PutStreamHeaders { .get(TELEMETRY_TYPE_KEY) .and_then(|v| v.to_str().ok()); PutStreamHeaders { - time_partition: headers - .get(TIME_PARTITION_KEY) - .map_or("", |v| v.to_str().unwrap()) - .to_string(), time_partition_limit: headers .get(TIME_PARTITION_LIMIT_KEY) .map_or("", |v| v.to_str().unwrap()) diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 28fcd1e31..8d553fbc9 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -695,7 +695,6 @@ impl Parseable { tenant_id: &Option, ) -> Result { let PutStreamHeaders { - time_partition, time_partition_limit, custom_partition, static_schema_flag, @@ -763,7 +762,6 @@ impl Parseable { .update_stream( headers, stream_name, - &time_partition, static_schema_flag, &time_partition_limit, custom_partition.as_ref(), @@ -782,17 +780,10 @@ impl Parseable { validate_custom_partition(custom_partition)?; } - if !time_partition.is_empty() && custom_partition.is_some() { - return Err(StreamError::Custom { - msg: "Cannot set both time partition and custom partition".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - let schema = validate_static_schema( body, stream_name, - &time_partition, + "", custom_partition.as_ref(), static_schema_flag, )?; @@ -800,7 +791,7 @@ impl Parseable { let log_source_entry = LogSourceEntry::new(log_source, HashSet::new()); self.create_stream( stream_name.to_string(), - &time_partition, + "", time_partition_in_days, custom_partition.as_ref(), static_schema_flag, @@ -818,12 +809,10 @@ impl Parseable { Ok(headers.clone()) } - #[allow(clippy::too_many_arguments)] async fn update_stream( &self, headers: &HeaderMap, stream_name: &str, - time_partition: &str, static_schema_flag: bool, time_partition_limit: &str, custom_partition: Option<&String>, @@ -832,12 +821,6 @@ impl Parseable { if !self.streams.contains(stream_name, tenant_id) { return Err(StreamNotFound(stream_name.to_string()).into()); } - if !time_partition.is_empty() { - return Err(StreamError::Custom { - msg: "Altering the time partition of an existing stream is restricted.".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } if static_schema_flag { return Err(StreamError::Custom { msg: "Altering the schema of an existing stream is restricted.".to_string(), From 8b290af0264d0d0e8c441aa9ad95e92f5ba07fe1 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 2 May 2026 14:28:24 +0700 Subject: [PATCH 3/4] fix test --- src/event/format/mod.rs | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 219d55c32..8cfd12cb8 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -612,6 +612,13 @@ pub fn rename_per_record_type_mismatches( if val.is_null() { return (key, val); } + // Arrays and objects are validated structurally by arrow at + // decode time; value_compatible_with_type can't reliably + // judge them, so skip to avoid spurious renames of valid + // list/struct values. + if val.is_array() || val.is_object() { + return (key, val); + } // Prefer storage's declared type; fall back to the inferred // type so within-batch mismatches on new fields are caught. let target_type = existing_schema @@ -1168,26 +1175,22 @@ mod tests { } #[test] - fn rename_per_record_short_circuits_when_no_field_overlap_at_same_type() { - // No inferred field shares both name AND type with storage — - // arrow can't have absorbed a mixed-type batch, so we skip the loop. - let mut storage: HashMap> = HashMap::new(); - storage.insert( - "escaped".to_string(), - Arc::new(Field::new("escaped", DataType::Utf8, true)), - ); - // Inferred has a DIFFERENT type for the shared field — handled by - // detect_schema_conflicts as a batch-level rename, not per-record. - let inferred = Schema::new(vec![Field::new("escaped", DataType::Boolean, true)]); + fn rename_per_record_renames_against_inferred_when_field_absent_from_storage() { + // First-batch case: storage is empty, so the reference type for each + // field comes from arrow's batch-level inference. With mixed bool+string + // for `escaped`, arrow picks Utf8 (string wins), and the bool record + // must be rewritten so it routes to a typed sibling column rather than + // failing fields_mismatch later. + let storage: HashMap> = HashMap::new(); + let inferred = Schema::new(vec![Field::new("escaped", DataType::Utf8, true)]); let renamed = rename_per_record_type_mismatches( - vec![json!({"escaped": true}), json!({"escaped": false})], + vec![json!({"escaped": "true"}), json!({"escaped": false})], &inferred, &storage, SchemaVersion::V1, ); - // Per-record loop skipped; values pass through unchanged. assert!(renamed[0].as_object().unwrap().contains_key("escaped")); - assert!(renamed[1].as_object().unwrap().contains_key("escaped")); + assert!(renamed[1].as_object().unwrap().contains_key("escaped_bool")); } #[test] From 44df0d56274461b44caa5ee0e8ae3de4c19327df Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 2 May 2026 16:08:40 +0700 Subject: [PATCH 4/4] handle array/object collision --- src/event/format/mod.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 8cfd12cb8..5d8419a79 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -612,13 +612,6 @@ pub fn rename_per_record_type_mismatches( if val.is_null() { return (key, val); } - // Arrays and objects are validated structurally by arrow at - // decode time; value_compatible_with_type can't reliably - // judge them, so skip to avoid spurious renames of valid - // list/struct values. - if val.is_array() || val.is_object() { - return (key, val); - } // Prefer storage's declared type; fall back to the inferred // type so within-batch mismatches on new fields are caught. let target_type = existing_schema @@ -628,6 +621,19 @@ pub fn rename_per_record_type_mismatches( let Some(target_type) = target_type else { return (key, val); }; + // When the resolved target is a structural arrow type + // (list/struct/map), arrow validates conformance at decode + // time and value_compatible_with_type can't reliably judge + // arrays/objects — skip to avoid spurious renames of valid + // nested values. Scalars still flow through the check, so + // an array landing on e.g. a Utf8 column is still routed + // to a typed sibling rather than crashing decode. + if (val.is_array() || val.is_object()) + && (target_type.is_list() + || matches!(target_type, DataType::Struct(_) | DataType::Map(_, _))) + { + return (key, val); + } if value_compatible_with_type(&val, target_type, schema_version) { return (key, val); }