Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
Ok(new_schema)
}

/// unused for now, might need it later
#[allow(unused)]
pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> {
// send the query request to the ingester
let mut res = vec![];
Expand Down
23 changes: 3 additions & 20 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ use crate::storage::object_storage::commit_schema_to_storage;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;

use super::send_query_request_to_ingester;

/// Query Request through http endpoint.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -85,21 +83,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon

let mut query = into_query(&query_request, &session_state).await?;

// ? run this code only if the query start time and now is less than 1 minute + margin
let mmem = if CONFIG.parseable.mode == Mode::Query {
// create a new query to send to the ingesters
if let Some(que) = transform_query_for_ingester(&query_request) {
let vals = send_query_request_to_ingester(&que)
.await
.map_err(|err| QueryError::Custom(err.to_string()))?;
Some(vals)
} else {
None
}
} else {
None
};

let creds = extract_session_key_from_req(&req).expect("expects basic auth");
let permissions = Users.get_permissions(&creds);

Expand Down Expand Up @@ -147,7 +130,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http(mmem);
.to_http();

if let Some(table) = table_name {
let time = time.elapsed().as_secs_f64();
Expand Down Expand Up @@ -229,6 +212,8 @@ async fn into_query(
})
}

/// unused for now, might need it in the future
#[allow(unused)]
fn transform_query_for_ingester(query: &Query) -> Option<Query> {
if query.query.is_empty() {
return None;
Expand Down Expand Up @@ -288,8 +273,6 @@ pub enum QueryError {
Datafusion(#[from] DataFusionError),
#[error("Execution Error: {0}")]
Execute(#[from] ExecuteError),
#[error("Error: {0}")]
Custom(String),
#[error("ObjectStorage Error: {0}")]
ObjectStorage(#[from] ObjectStorageError),
#[error("Evern Error: {0}")]
Expand Down
2 changes: 2 additions & 0 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
.unwrap()
}

/// unused for now might need it later
#[allow(unused)]
pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {
if objects.is_empty() {
return objects;
Expand Down
12 changes: 2 additions & 10 deletions server/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};

use crate::query::flatten_objects_for_count;

pub struct QueryResponse {
pub records: Vec<RecordBatch>,
pub fields: Vec<String>,
Expand All @@ -32,7 +30,7 @@ pub struct QueryResponse {
}

impl QueryResponse {
pub fn to_http(&self, imem: Option<Vec<Value>>) -> impl Responder {
pub fn to_http(&self) -> impl Responder {
log::info!("{}", "Returning query results");
let records: Vec<&RecordBatch> = self.records.iter().collect();
let mut json_records = record_batches_to_json_rows(&records).unwrap();
Expand All @@ -45,13 +43,7 @@ impl QueryResponse {
}
}
}
let mut values = json_records.into_iter().map(Value::Object).collect_vec();

if let Some(mut imem) = imem {
values.append(&mut imem);
}

let values = flatten_objects_for_count(values);
let values = json_records.into_iter().map(Value::Object).collect_vec();

let response = if self.with_fields {
json!({
Expand Down
27 changes: 23 additions & 4 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,29 @@ pub trait ObjectStorage: Sync + 'static {
&self,
stream_name: &str,
) -> Result<Schema, ObjectStorageError> {
let schema_path =
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
let schema_map = self.get_object(&schema_path).await?;
Ok(serde_json::from_slice(&schema_map)?)
// try get my schema
// if fails get the base schema
// put the schema to storage??
let schema_path = schema_path(stream_name);
let byte_data = match self.get_object(&schema_path).await {
Ok(bytes) => bytes,
Err(err) => {
log::info!("{:?}", err);
// base schema path
let schema_path = RelativePathBuf::from_iter([
stream_name,
STREAM_ROOT_DIRECTORY,
SCHEMA_FILE_NAME,
]);
let data = self.get_object(&schema_path).await?;
// schema was not found in store, so it needs to be placed
self.put_schema(stream_name, &serde_json::from_slice(&data).unwrap())
.await?;

data
}
};
Ok(serde_json::from_slice(&byte_data)?)
}

async fn get_schema(&self, stream_name: &str) -> Result<Schema, ObjectStorageError> {
Expand Down