From 4e9bb6ebd206a791b59839a395f3cab1c29b5dbc Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 10 Feb 2025 02:28:49 +0530 Subject: [PATCH 1/2] refactor: process ain't async --- src/connectors/kafka/processor.rs | 5 +---- src/event/mod.rs | 2 +- src/handlers/http/ingest.rs | 4 ++-- src/handlers/http/modal/utils/ingest_utils.rs | 3 +-- 4 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index e06835121..b5b8f4fe4 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -103,10 +103,7 @@ impl Processor, ()> for ParseableSinkProcessor { let len = records.len(); debug!("Processing {} records", len); - self.build_event_from_chunk(&records) - .await? - .process() - .await?; + self.build_event_from_chunk(&records).await?.process()?; debug!("Processed {} records", len); Ok(()) diff --git a/src/event/mod.rs b/src/event/mod.rs index 1178c7138..eaa324699 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -46,7 +46,7 @@ pub struct Event { // Events holds the schema related to a each event for a single log stream impl Event { - pub async fn process(self) -> Result<(), EventError> { + pub fn process(self) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); if self.time_partition.is_some() { let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string(); diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index b3da07761..39b9184e5 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -103,8 +103,8 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< custom_partition_values: HashMap::new(), stream_type: StreamType::Internal, } - .process() - .await?; + .process()?; + Ok(()) } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 3a2b9c797..a3e9af22d 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -129,8 +129,7 @@ pub async fn push_logs( custom_partition_values, stream_type: StreamType::UserDefined, } - .process() - .await?; + .process()?; } Ok(()) } From 25747b8e9d9397694e2ee9425b24fd9ddee46b77 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 10 Feb 2025 02:35:34 +0530 Subject: [PATCH 2/2] refactor: retire `to_recordbatch` in favor of `to_event` --- src/connectors/kafka/processor.rs | 21 +- src/event/format/mod.rs | 39 +- src/handlers/http/ingest.rs | 1032 ++++++++--------- src/handlers/http/modal/utils/ingest_utils.rs | 58 +- 4 files changed, 567 insertions(+), 583 deletions(-) diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index b5b8f4fe4..e80915f0a 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -60,25 +60,18 @@ impl ParseableSinkProcessor { data: Value::Array(json_vec.to_vec()), }; - let (rb, is_first) = batch_json_event.into_recordbatch( + let p_event = batch_json_event.to_event( + stream_name, + total_payload_size, &schema, static_schema_flag, - time_partition.as_ref(), + Utc::now().naive_utc(), + time_partition, + HashMap::new(), schema_version, + StreamType::UserDefined, )?; - let p_event = ParseableEvent { - rb, - stream_name: stream_name.to_string(), - origin_format: "json", - origin_size: total_payload_size, - is_first_event: is_first, - parsed_timestamp: Utc::now().naive_utc(), - time_partition: None, - custom_partition_values: HashMap::new(), - stream_type: StreamType::UserDefined, - }; - Ok(p_event) } diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index c0a2ec323..2707b422b 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -26,12 +26,13 @@ use std::{ use anyhow::{anyhow, Error as AnyError}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema, TimeUnit}; -use chrono::DateTime; +use chrono::{DateTime, NaiveDateTime}; use serde::{Deserialize, Serialize}; use serde_json::Value; use crate::{ metadata::SchemaVersion, + storage::StreamType, utils::arrow::{get_field, get_timestamp_array, replace_columns}, }; @@ -105,15 +106,20 @@ pub trait EventFormat: Sized { fn decode(data: Self::Data, schema: Arc) -> Result; - fn into_recordbatch( + fn to_event( self, + stream_name: &str, + origin_size: u64, storage_schema: &HashMap>, static_schema_flag: bool, - time_partition: Option<&String>, + parsed_timestamp: NaiveDateTime, + time_partition: Option, + custom_partition_values: HashMap, schema_version: SchemaVersion, - ) -> Result<(RecordBatch, bool), AnyError> { - let (data, mut schema, is_first) = - self.to_data(storage_schema, time_partition, schema_version)?; + stream_type: StreamType, + ) -> Result { + let (data, mut schema, is_first_event) = + self.to_data(storage_schema, time_partition.as_ref(), schema_version)?; if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { return Err(anyhow!( @@ -137,8 +143,13 @@ pub trait EventFormat: Sized { if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) { return Err(anyhow!("Schema mismatch")); } - new_schema = - update_field_type_in_schema(new_schema, None, time_partition, None, schema_version); + new_schema = update_field_type_in_schema( + new_schema, + None, + time_partition.as_ref(), + None, + schema_version, + ); let mut rb = Self::decode(data, new_schema.clone())?; rb = replace_columns( @@ -148,7 +159,17 @@ pub trait EventFormat: Sized { &[Arc::new(get_timestamp_array(rb.num_rows()))], ); - Ok((rb, is_first)) + Ok(super::Event { + rb, + stream_name: stream_name.to_string(), + origin_format: "json", + origin_size, + is_first_event, + parsed_timestamp, + time_partition, + custom_partition_values, + stream_type, + }) } fn is_schema_matching( diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 39b9184e5..28bc72a9f 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -80,30 +80,27 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result Result<(), PostError> { let size: usize = body.len(); let parsed_timestamp = Utc::now().naive_utc(); - let (rb, is_first) = { - let body_val: Value = serde_json::from_slice(&body)?; - let hash_map = STREAM_INFO.read().unwrap(); - let schema = hash_map - .get(&stream_name) - .ok_or(PostError::StreamNotFound(stream_name.clone()))? - .schema - .clone(); - let event = format::json::Event { data: body_val }; - // For internal streams, use old schema - event.into_recordbatch(&schema, false, None, SchemaVersion::V0)? - }; - event::Event { - rb, - stream_name, - origin_format: "json", - origin_size: size as u64, - is_first_event: is_first, - parsed_timestamp, - time_partition: None, - custom_partition_values: HashMap::new(), - stream_type: StreamType::Internal, - } - .process()?; + let body_val: Value = serde_json::from_slice(&body)?; + let hash_map = STREAM_INFO.read().unwrap(); + let schema = hash_map + .get(&stream_name) + .ok_or(PostError::StreamNotFound(stream_name.clone()))? + .schema + .clone(); + // For internal streams, use old schema + format::json::Event { data: body_val } + .to_event( + &stream_name, + size as u64, + &schema, + false, + parsed_timestamp, + None, + HashMap::new(), + SchemaVersion::V0, + StreamType::Internal, + )? + .process()?; Ok(()) } @@ -377,497 +374,496 @@ impl actix_web::ResponseError for PostError { } } -#[cfg(test)] -mod tests { - - use arrow::datatypes::Int64Type; - use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray}; - use arrow_schema::{DataType, Field}; - use serde_json::json; - use std::{collections::HashMap, sync::Arc}; - - use crate::{ - handlers::http::modal::utils::ingest_utils::into_event_batch, - metadata::SchemaVersion, - utils::json::{convert_array_to_object, flatten::convert_to_array}, - }; - - trait TestExt { - fn as_int64_arr(&self) -> Option<&Int64Array>; - fn as_float64_arr(&self) -> Option<&Float64Array>; - fn as_utf8_arr(&self) -> Option<&StringArray>; - } - - impl TestExt for ArrayRef { - fn as_int64_arr(&self) -> Option<&Int64Array> { - self.as_any().downcast_ref() - } - - fn as_float64_arr(&self) -> Option<&Float64Array> { - self.as_any().downcast_ref() - } - - fn as_utf8_arr(&self) -> Option<&StringArray> { - self.as_any().downcast_ref() - } - } - - fn fields_to_map(iter: impl Iterator) -> HashMap> { - iter.map(|x| (x.name().clone(), Arc::new(x))).collect() - } - - #[test] - fn basic_object_into_rb() { - let json = json!({ - "c": 4.23, - "a": 1, - "b": "hello", - }); - - let (rb, _) = - into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); - - assert_eq!(rb.num_rows(), 1); - assert_eq!(rb.num_columns(), 4); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from_iter([1]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from_iter_values(["hello"]) - ); - assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), - &Float64Array::from_iter([4.23]) - ); - } - - #[test] - fn basic_object_with_null_into_rb() { - let json = json!({ - "a": 1, - "b": "hello", - "c": null - }); - - let (rb, _) = - into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); - - assert_eq!(rb.num_rows(), 1); - assert_eq!(rb.num_columns(), 3); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from_iter([1]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from_iter_values(["hello"]) - ); - } - - #[test] - fn basic_object_derive_schema_into_rb() { - let json = json!({ - "a": 1, - "b": "hello", - }); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); - - assert_eq!(rb.num_rows(), 1); - assert_eq!(rb.num_columns(), 3); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from_iter([1]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from_iter_values(["hello"]) - ); - } - - #[test] - fn basic_object_schema_mismatch() { - let json = json!({ - "a": 1, - "b": 1, // type mismatch - }); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err()); - } - - #[test] - fn empty_object() { - let json = json!({}); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); - - assert_eq!(rb.num_rows(), 1); - assert_eq!(rb.num_columns(), 1); - } - - #[test] - fn non_object_arr_is_err() { - let json = json!([1]); - - assert!(convert_array_to_object( - json, - None, - None, - None, - SchemaVersion::V0, - &crate::event::format::LogSource::default() - ) - .is_err()) - } - - #[test] - fn array_into_recordbatch_inffered_schema() { - let json = json!([ - { - "b": "hello", - }, - { - "b": "hello", - "a": 1, - "c": 1 - }, - { - "a": 1, - "b": "hello", - "c": null - }, - ]); - - let (rb, _) = - into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); - - assert_eq!(rb.num_rows(), 3); - assert_eq!(rb.num_columns(), 4); - - let schema = rb.schema(); - let fields = &schema.fields; - - assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true)); - assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true)); - assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true)); - - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![None, Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) - ); - assert_eq!( - rb.column_by_name("c").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![None, Some(1), None]) - ); - } - - #[test] - fn arr_with_null_into_rb() { - let json = json!([ - { - "c": null, - "b": "hello", - "a": null - }, - { - "a": 1, - "c": 1.22, - "b": "hello" - }, - { - "b": "hello", - "a": 1, - "c": null - }, - ]); - - let (rb, _) = - into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); - - assert_eq!(rb.num_rows(), 3); - assert_eq!(rb.num_columns(), 4); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![None, Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) - ); - assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, Some(1.22), None,]) - ); - } - - #[test] - fn arr_with_null_derive_schema_into_rb() { - let json = json!([ - { - "c": null, - "b": "hello", - "a": null - }, - { - "a": 1, - "c": 1.22, - "b": "hello" - }, - { - "b": "hello", - "a": 1, - "c": null - }, - ]); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); - - assert_eq!(rb.num_rows(), 3); - assert_eq!(rb.num_columns(), 4); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![None, Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) - ); - assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, Some(1.22), None,]) - ); - } - - #[test] - fn arr_schema_mismatch() { - let json = json!([ - { - "a": null, - "b": "hello", - "c": 1.24 - }, - { - "a": 1, - "b": "hello", - "c": 1 - }, - { - "a": 1, - "b": "hello", - "c": null - }, - ]); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err()); - } - - #[test] - fn arr_obj_with_nested_type() { - let json = json!([ - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1}] - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1, "b": 2}] - }, - ]); - let flattened_json = convert_to_array( - convert_array_to_object( - json, - None, - None, - None, - SchemaVersion::V0, - &crate::event::format::LogSource::default(), - ) - .unwrap(), - ) - .unwrap(); - - let (rb, _) = into_event_batch( - flattened_json, - HashMap::default(), - false, - None, - SchemaVersion::V0, - ) - .unwrap(); - assert_eq!(rb.num_rows(), 4); - assert_eq!(rb.num_columns(), 5); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![ - Some("hello"), - Some("hello"), - Some("hello"), - Some("hello") - ]) - ); - - assert_eq!( - rb.column_by_name("c_a") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(), - &ListArray::from_iter_primitive::(vec![ - None, - None, - Some(vec![Some(1i64)]), - Some(vec![Some(1)]) - ]) - ); - - assert_eq!( - rb.column_by_name("c_b") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(), - &ListArray::from_iter_primitive::(vec![ - None, - None, - None, - Some(vec![Some(2i64)]) - ]) - ); - } - - #[test] - fn arr_obj_with_nested_type_v1() { - let json = json!([ - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1}] - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1, "b": 2}] - }, - ]); - let flattened_json = convert_to_array( - convert_array_to_object( - json, - None, - None, - None, - SchemaVersion::V1, - &crate::event::format::LogSource::default(), - ) - .unwrap(), - ) - .unwrap(); - - let (rb, _) = into_event_batch( - flattened_json, - HashMap::default(), - false, - None, - SchemaVersion::V1, - ) - .unwrap(); - - assert_eq!(rb.num_rows(), 4); - assert_eq!(rb.num_columns(), 5); - assert_eq!( - rb.column_by_name("a").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![ - Some("hello"), - Some("hello"), - Some("hello"), - Some("hello") - ]) - ); - - assert_eq!( - rb.column_by_name("c_a").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)]) - ); - - assert_eq!( - rb.column_by_name("c_b").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, None, None, Some(2.0)]) - ); - } -} +// #[cfg(test)] +// mod tests { + +// use arrow::datatypes::Int64Type; +// use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray}; +// use arrow_schema::{DataType, Field}; +// use serde_json::json; +// use std::{collections::HashMap, sync::Arc}; + +// use crate::{ +// metadata::SchemaVersion, +// utils::json::{convert_array_to_object, flatten::convert_to_array}, +// }; + +// trait TestExt { +// fn as_int64_arr(&self) -> Option<&Int64Array>; +// fn as_float64_arr(&self) -> Option<&Float64Array>; +// fn as_utf8_arr(&self) -> Option<&StringArray>; +// } + +// impl TestExt for ArrayRef { +// fn as_int64_arr(&self) -> Option<&Int64Array> { +// self.as_any().downcast_ref() +// } + +// fn as_float64_arr(&self) -> Option<&Float64Array> { +// self.as_any().downcast_ref() +// } + +// fn as_utf8_arr(&self) -> Option<&StringArray> { +// self.as_any().downcast_ref() +// } +// } + +// fn fields_to_map(iter: impl Iterator) -> HashMap> { +// iter.map(|x| (x.name().clone(), Arc::new(x))).collect() +// } + +// #[test] +// fn basic_object_into_rb() { +// let json = json!({ +// "c": 4.23, +// "a": 1, +// "b": "hello", +// }); + +// let (rb, _) = +// into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 1); +// assert_eq!(rb.num_columns(), 4); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from_iter([1]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from_iter_values(["hello"]) +// ); +// assert_eq!( +// rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), +// &Float64Array::from_iter([4.23]) +// ); +// } + +// #[test] +// fn basic_object_with_null_into_rb() { +// let json = json!({ +// "a": 1, +// "b": "hello", +// "c": null +// }); + +// let (rb, _) = +// into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 1); +// assert_eq!(rb.num_columns(), 3); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from_iter([1]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from_iter_values(["hello"]) +// ); +// } + +// #[test] +// fn basic_object_derive_schema_into_rb() { +// let json = json!({ +// "a": 1, +// "b": "hello", +// }); + +// let schema = fields_to_map( +// [ +// Field::new("a", DataType::Int64, true), +// Field::new("b", DataType::Utf8, true), +// Field::new("c", DataType::Float64, true), +// ] +// .into_iter(), +// ); + +// let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 1); +// assert_eq!(rb.num_columns(), 3); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from_iter([1]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from_iter_values(["hello"]) +// ); +// } + +// #[test] +// fn basic_object_schema_mismatch() { +// let json = json!({ +// "a": 1, +// "b": 1, // type mismatch +// }); + +// let schema = fields_to_map( +// [ +// Field::new("a", DataType::Int64, true), +// Field::new("b", DataType::Utf8, true), +// Field::new("c", DataType::Float64, true), +// ] +// .into_iter(), +// ); + +// assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err()); +// } + +// #[test] +// fn empty_object() { +// let json = json!({}); + +// let schema = fields_to_map( +// [ +// Field::new("a", DataType::Int64, true), +// Field::new("b", DataType::Utf8, true), +// Field::new("c", DataType::Float64, true), +// ] +// .into_iter(), +// ); + +// let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 1); +// assert_eq!(rb.num_columns(), 1); +// } + +// #[test] +// fn non_object_arr_is_err() { +// let json = json!([1]); + +// assert!(convert_array_to_object( +// json, +// None, +// None, +// None, +// SchemaVersion::V0, +// &crate::event::format::LogSource::default() +// ) +// .is_err()) +// } + +// #[test] +// fn array_into_recordbatch_inffered_schema() { +// let json = json!([ +// { +// "b": "hello", +// }, +// { +// "b": "hello", +// "a": 1, +// "c": 1 +// }, +// { +// "a": 1, +// "b": "hello", +// "c": null +// }, +// ]); + +// let (rb, _) = +// into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 3); +// assert_eq!(rb.num_columns(), 4); + +// let schema = rb.schema(); +// let fields = &schema.fields; + +// assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true)); +// assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true)); +// assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true)); + +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from(vec![None, Some(1), Some(1)]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) +// ); +// assert_eq!( +// rb.column_by_name("c").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from(vec![None, Some(1), None]) +// ); +// } + +// #[test] +// fn arr_with_null_into_rb() { +// let json = json!([ +// { +// "c": null, +// "b": "hello", +// "a": null +// }, +// { +// "a": 1, +// "c": 1.22, +// "b": "hello" +// }, +// { +// "b": "hello", +// "a": 1, +// "c": null +// }, +// ]); + +// let (rb, _) = +// into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 3); +// assert_eq!(rb.num_columns(), 4); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from(vec![None, Some(1), Some(1)]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) +// ); +// assert_eq!( +// rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), +// &Float64Array::from(vec![None, Some(1.22), None,]) +// ); +// } + +// #[test] +// fn arr_with_null_derive_schema_into_rb() { +// let json = json!([ +// { +// "c": null, +// "b": "hello", +// "a": null +// }, +// { +// "a": 1, +// "c": 1.22, +// "b": "hello" +// }, +// { +// "b": "hello", +// "a": 1, +// "c": null +// }, +// ]); + +// let schema = fields_to_map( +// [ +// Field::new("a", DataType::Int64, true), +// Field::new("b", DataType::Utf8, true), +// Field::new("c", DataType::Float64, true), +// ] +// .into_iter(), +// ); + +// let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 3); +// assert_eq!(rb.num_columns(), 4); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from(vec![None, Some(1), Some(1)]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) +// ); +// assert_eq!( +// rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), +// &Float64Array::from(vec![None, Some(1.22), None,]) +// ); +// } + +// #[test] +// fn arr_schema_mismatch() { +// let json = json!([ +// { +// "a": null, +// "b": "hello", +// "c": 1.24 +// }, +// { +// "a": 1, +// "b": "hello", +// "c": 1 +// }, +// { +// "a": 1, +// "b": "hello", +// "c": null +// }, +// ]); + +// let schema = fields_to_map( +// [ +// Field::new("a", DataType::Int64, true), +// Field::new("b", DataType::Utf8, true), +// Field::new("c", DataType::Float64, true), +// ] +// .into_iter(), +// ); + +// assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err()); +// } + +// #[test] +// fn arr_obj_with_nested_type() { +// let json = json!([ +// { +// "a": 1, +// "b": "hello", +// }, +// { +// "a": 1, +// "b": "hello", +// }, +// { +// "a": 1, +// "b": "hello", +// "c": [{"a": 1}] +// }, +// { +// "a": 1, +// "b": "hello", +// "c": [{"a": 1, "b": 2}] +// }, +// ]); +// let flattened_json = convert_to_array( +// convert_array_to_object( +// json, +// None, +// None, +// None, +// SchemaVersion::V0, +// &crate::event::format::LogSource::default(), +// ) +// .unwrap(), +// ) +// .unwrap(); + +// let (rb, _) = into_event_batch( +// flattened_json, +// HashMap::default(), +// false, +// None, +// SchemaVersion::V0, +// ) +// .unwrap(); +// assert_eq!(rb.num_rows(), 4); +// assert_eq!(rb.num_columns(), 5); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from(vec![ +// Some("hello"), +// Some("hello"), +// Some("hello"), +// Some("hello") +// ]) +// ); + +// assert_eq!( +// rb.column_by_name("c_a") +// .unwrap() +// .as_any() +// .downcast_ref::() +// .unwrap(), +// &ListArray::from_iter_primitive::(vec![ +// None, +// None, +// Some(vec![Some(1i64)]), +// Some(vec![Some(1)]) +// ]) +// ); + +// assert_eq!( +// rb.column_by_name("c_b") +// .unwrap() +// .as_any() +// .downcast_ref::() +// .unwrap(), +// &ListArray::from_iter_primitive::(vec![ +// None, +// None, +// None, +// Some(vec![Some(2i64)]) +// ]) +// ); +// } + +// #[test] +// fn arr_obj_with_nested_type_v1() { +// let json = json!([ +// { +// "a": 1, +// "b": "hello", +// }, +// { +// "a": 1, +// "b": "hello", +// }, +// { +// "a": 1, +// "b": "hello", +// "c": [{"a": 1}] +// }, +// { +// "a": 1, +// "b": "hello", +// "c": [{"a": 1, "b": 2}] +// }, +// ]); +// let flattened_json = convert_to_array( +// convert_array_to_object( +// json, +// None, +// None, +// None, +// SchemaVersion::V1, +// &crate::event::format::LogSource::default(), +// ) +// .unwrap(), +// ) +// .unwrap(); + +// let (rb, _) = into_event_batch( +// flattened_json, +// HashMap::default(), +// false, +// None, +// SchemaVersion::V1, +// ) +// .unwrap(); + +// assert_eq!(rb.num_rows(), 4); +// assert_eq!(rb.num_columns(), 5); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_float64_arr().unwrap(), +// &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from(vec![ +// Some("hello"), +// Some("hello"), +// Some("hello"), +// Some("hello") +// ]) +// ); + +// assert_eq!( +// rb.column_by_name("c_a").unwrap().as_float64_arr().unwrap(), +// &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)]) +// ); + +// assert_eq!( +// rb.column_by_name("c_b").unwrap().as_float64_arr().unwrap(), +// &Float64Array::from(vec![None, None, None, Some(2.0)]) +// ); +// } +// } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index a3e9af22d..bb03e022e 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -16,22 +16,18 @@ * */ -use arrow_schema::Field; use chrono::{DateTime, NaiveDateTime, Utc}; use itertools::Itertools; use serde_json::Value; -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use crate::{ - event::{ - format::{json, EventFormat, LogSource}, - Event, - }, + event::format::{json, EventFormat, LogSource}, handlers::http::{ ingest::PostError, kinesis::{flatten_kinesis_logs, Message}, }, - metadata::{SchemaVersion, STREAM_INFO}, + metadata::STREAM_INFO, storage::StreamType, utils::json::{convert_array_to_object, flatten::convert_to_array}, }; @@ -110,46 +106,24 @@ pub async fn push_logs( .ok_or(PostError::StreamNotFound(stream_name.to_owned()))? .schema .clone(); - let (rb, is_first_event) = into_event_batch( - value, - schema, - static_schema_flag, - time_partition.as_ref(), - schema_version, - )?; - - Event { - rb, - stream_name: stream_name.to_owned(), - origin_format: "json", - origin_size, - is_first_event, - parsed_timestamp, - time_partition: time_partition.clone(), - custom_partition_values, - stream_type: StreamType::UserDefined, - } + + json::Event { data: value } + .to_event( + stream_name, + origin_size, + &schema, + static_schema_flag, + parsed_timestamp, + time_partition.clone(), + custom_partition_values, + schema_version, + StreamType::UserDefined, + )? .process()?; } Ok(()) } -pub fn into_event_batch( - data: Value, - schema: HashMap>, - static_schema_flag: bool, - time_partition: Option<&String>, - schema_version: SchemaVersion, -) -> Result<(arrow_array::RecordBatch, bool), PostError> { - let (rb, is_first) = json::Event { data }.into_recordbatch( - &schema, - static_schema_flag, - time_partition, - schema_version, - )?; - Ok((rb, is_first)) -} - pub fn get_custom_partition_values( json: &Value, custom_partition_list: &[&str],