23 changes: 2 additions & 21 deletions src/handlers/http/ingest.rs
@@ -18,7 +18,6 @@

 use super::logstream::error::{CreateStreamError, StreamError};
 use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
-use super::otel;
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
 use crate::event::{
@@ -27,7 +26,7 @@ use crate::event::{
     format::{self, EventFormat},
 };
 use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
-use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
+use crate::handlers::STREAM_NAME_HEADER_KEY;
 use crate::localcache::CacheError;
 use crate::metadata::error::stream_info::MetadataError;
 use crate::metadata::STREAM_INFO;
@@ -115,25 +114,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
     {
         let stream_name = stream_name.to_str().unwrap().to_owned();
         create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
-
-        //flatten logs
-        if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
-        {
-            let log_source: String = log_source.to_str().unwrap().to_owned();
-            if log_source == LOG_SOURCE_OTEL {
-                let mut json = otel::flatten_otel_logs(&body);
-                for record in json.iter_mut() {
-                    let body: Bytes = serde_json::to_vec(record).unwrap().into();
-                    push_logs(stream_name.to_string(), req.clone(), body).await?;
-                }
-            } else {
-                return Err(PostError::CustomError("Unknown log source".to_string()));
-            }
-        } else {
-            return Err(PostError::CustomError(
-                "log source key header is missing".to_string(),
-            ));
-        }
+        push_logs(stream_name.to_string(), req.clone(), body).await?;
     } else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
     }
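With the log-source check removed, the OTEL ingest handler only requires the stream-name header: it creates the stream if needed and then forwards the request body to push_logs unchanged. As a rough client-side illustration of that contract (a sketch only: the /v1/logs mount path, the x-p-stream header name, the port, and the use of the reqwest crate with its blocking feature are assumptions for demonstration, not taken from this diff):

// Hypothetical client sketch -- endpoint path, header name, and port are assumptions.
use reqwest::blocking::Client;

fn main() -> Result<(), reqwest::Error> {
    // Placeholder OTLP/JSON payload; after this change it is pushed to the stream unmodified.
    let body = r#"{"resourceLogs": []}"#;

    let resp = Client::new()
        .post("http://localhost:8000/v1/logs") // assumed mount point for ingest_otel_logs
        .header("x-p-stream", "otel_demo") // assumed value of STREAM_NAME_HEADER_KEY
        .header("Content-Type", "application/json")
        .body(body)
        .send()?;

    println!("ingest status: {}", resp.status());
    Ok(())
}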
1 change: 0 additions & 1 deletion src/handlers/http/mod.rs
@@ -36,7 +36,6 @@ pub mod logstream;
 pub mod middleware;
 pub mod modal;
 pub mod oidc;
-mod otel;
 pub mod query;
 pub mod rbac;
 pub mod role;
36 changes: 13 additions & 23 deletions src/handlers/http/modal/utils/ingest_utils.rs
@@ -16,10 +16,7 @@
  *
  */

-use std::{
-    collections::{BTreeMap, HashMap},
-    sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};

 use actix_web::HttpRequest;
 use arrow_schema::Field;
@@ -33,8 +30,8 @@ use crate::{
         format::{self, EventFormat},
     },
     handlers::{
-        http::{ingest::PostError, kinesis, otel},
-        LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
+        http::{ingest::PostError, kinesis},
+        LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR,
     },
     metadata::STREAM_INFO,
     storage::StreamType,
@@ -46,26 +43,19 @@ pub async fn flatten_and_push_logs(
     body: Bytes,
     stream_name: String,
 ) -> Result<(), PostError> {
-    //flatten logs
-    if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
-        let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
-        let log_source: String = log_source.to_str().unwrap().to_owned();
-        match log_source.as_str() {
-            LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
-            LOG_SOURCE_OTEL => {
-                json = otel::flatten_otel_logs(&body);
-            }
-            _ => {
-                log::warn!("Unknown log source: {}", log_source);
-                push_logs(stream_name.to_string(), req.clone(), body).await?;
-            }
-        }
-        for record in json.iter_mut() {
+    let log_source = req
+        .headers()
+        .get(LOG_SOURCE_KEY)
+        .map(|header| header.to_str().unwrap_or_default())
+        .unwrap_or_default();
+    if log_source == LOG_SOURCE_KINESIS {
+        let json = kinesis::flatten_kinesis_logs(&body);
+        for record in json.iter() {
             let body: Bytes = serde_json::to_vec(record).unwrap().into();
-            push_logs(stream_name.to_string(), req.clone(), body).await?;
+            push_logs(stream_name.clone(), req.clone(), body.clone()).await?;
         }
     } else {
-        push_logs(stream_name.to_string(), req, body).await?;
+        push_logs(stream_name, req, body).await?;
     }
     Ok(())
 }
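The rewritten flatten_and_push_logs reads the log-source header defensively: a missing or non-UTF-8 header collapses to an empty string, so anything other than the Kinesis source takes the plain push_logs branch instead of returning an error. A self-contained sketch of that extraction idiom using actix-web's TestRequest (actix-web is already a dependency here; the literal "x-p-log-source" and "kinesis" values stand in for LOG_SOURCE_KEY and LOG_SOURCE_KINESIS and are assumptions for illustration):

// Sketch of the header-extraction idiom introduced above, outside the crate's own types.
use actix_web::test::TestRequest;

fn extract_log_source(req: &actix_web::HttpRequest) -> String {
    req.headers()
        .get("x-p-log-source") // assumed value of LOG_SOURCE_KEY
        .map(|header| header.to_str().unwrap_or_default())
        .unwrap_or_default()
        .to_owned()
}

fn main() {
    // Header present: the value comes through as-is.
    let kinesis_req = TestRequest::default()
        .insert_header(("x-p-log-source", "kinesis"))
        .to_http_request();
    assert_eq!(extract_log_source(&kinesis_req), "kinesis");

    // Header absent: the chain yields "" instead of panicking, so the
    // caller would fall into the plain push_logs branch.
    let bare_req = TestRequest::default().to_http_request();
    assert_eq!(extract_log_source(&bare_req), "");

    println!("header-extraction sketch passed");
}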