perf: get schema versions for the time range (#3397)
hengfeiyang committed May 2, 2024
1 parent 3ce71a1 commit 13e1442
Showing 10 changed files with 154 additions and 92 deletions.
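The core of this commit is a new optional time-range argument on infra::schema::get_versions, so callers decode only the schema versions whose start_dt falls inside the query window. A rough sketch of the changed signature, with StreamType and Schema as placeholder stand-ins for the real config/arrow types and the body elided:

// Sketch only; the real implementation is in src/infra/src/schema/mod.rs below.
pub enum StreamType { Logs, Metrics, Traces }
pub struct Schema;

pub async fn get_versions(
    org_id: &str,
    stream_name: &str,
    stream_type: StreamType,
    time_range: Option<(i64, i64)>, // (min_ts, max_ts); None means all versions
) -> Result<Vec<Schema>, anyhow::Error> {
    // filter cached (start_dt, schema) pairs by time_range, then decode
    unimplemented!()
}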
2 changes: 1 addition & 1 deletion src/handler/http/request/status/mod.rs
@@ -302,7 +302,7 @@ async fn get_stream_schema_status() -> (usize, usize, usize) {
mem_size += key.len();
for schema in val.iter() {
stream_schema_num += 1;
mem_size += schema.size();
mem_size += schema.1.size();
}
}
drop(r);
7 changes: 3 additions & 4 deletions src/infra/src/db/nats.rs
@@ -14,7 +14,6 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::{
cmp::min,
sync::{
atomic::{AtomicU8, Ordering},
Arc,
@@ -318,7 +317,7 @@ impl super::Db for NatsDb {
let value = bucket.get(&encoded_key).await?;
Ok::<(String, Option<Bytes>), Error>((key, value))
})
.buffer_unordered(min(keys_len, 10))
.buffer_unordered(CONFIG.limit.cpu_num)
.try_collect::<Vec<(String, Option<Bytes>)>>()
.await
.map_err(|e| Error::Message(e.to_string()))?;
@@ -376,7 +375,7 @@ impl super::Db for NatsDb {
let value = bucket.get(&encoded_key).await?;
Ok::<Option<Bytes>, Error>(value)
})
.buffer_unordered(min(keys_len, 10))
.buffer_unordered(CONFIG.limit.cpu_num)
.try_collect::<Vec<Option<Bytes>>>()
.await
.map_err(|e| Error::Message(e.to_string()))?;
@@ -432,7 +431,7 @@ impl super::Db for NatsDb {
let value = bucket.get(&encoded_key).await?;
Ok::<Option<(i64, Bytes)>, Error>(value.map(|value| (start_dt, value)))
})
.buffer_unordered(min(keys_len, 10))
.buffer_unordered(CONFIG.limit.cpu_num)
.try_collect::<Vec<Option<(i64, Bytes)>>>()
.await
.map_err(|e| Error::Message(e.to_string()))?;
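The three hunks above widen the lookup fan-out from a fixed min(keys_len, 10) to CONFIG.limit.cpu_num. A standalone sketch of the same buffer_unordered pattern, with fetch_value standing in for the real per-key bucket.get() call and the futures crate assumed:

use futures::{stream, StreamExt, TryStreamExt};

// Illustrative stand-in for the per-key NATS lookup.
async fn fetch_value(key: String) -> Result<(String, usize), std::io::Error> {
    let len = key.len();
    Ok((key, len))
}

// Run up to `concurrency` lookups at once and fail fast on the first error.
async fn fetch_all(
    keys: Vec<String>,
    concurrency: usize,
) -> Result<Vec<(String, usize)>, std::io::Error> {
    stream::iter(keys)
        .map(fetch_value)
        .buffer_unordered(concurrency)
        .try_collect()
        .await
}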
46 changes: 39 additions & 7 deletions src/infra/src/schema/mod.rs
@@ -20,15 +20,17 @@ use config::{
RwAHashMap, CONFIG,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use futures::{StreamExt, TryStreamExt};
use once_cell::sync::Lazy;

use crate::{
db as infra_db,
errors::{DbError, Error},
};

pub static STREAM_SCHEMAS: Lazy<RwAHashMap<String, Vec<Schema>>> = Lazy::new(Default::default);
pub static STREAM_SCHEMAS_COMPRESSED: Lazy<RwAHashMap<String, bytes::Bytes>> =
pub static STREAM_SCHEMAS: Lazy<RwAHashMap<String, Vec<(i64, Schema)>>> =
Lazy::new(Default::default);
pub static STREAM_SCHEMAS_COMPRESSED: Lazy<RwAHashMap<String, Vec<(i64, bytes::Bytes)>>> =
Lazy::new(Default::default);
pub static STREAM_SCHEMAS_LATEST: Lazy<RwAHashMap<String, Schema>> = Lazy::new(Default::default);
pub static STREAM_SCHEMAS_FIELDS: Lazy<RwAHashMap<String, (i64, Vec<String>)>> =
@@ -90,22 +92,52 @@ pub async fn get_versions(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
time_range: Option<(i64, i64)>,
) -> Result<Vec<Schema>, anyhow::Error> {
let key = mk_key(org_id, stream_type, stream_name);
let cache_key = key.strip_prefix("/schema/").unwrap();

let (min_ts, max_ts) = time_range.unwrap_or((0, 0));
if CONFIG.common.schema_cache_compress_enabled {
let r = STREAM_SCHEMAS_COMPRESSED.read().await;
if let Some(data) = r.get(cache_key) {
let schema_bytes = zstd::decode_all(data.as_ref())?;
let schemas: Vec<Vec<Schema>> = json::from_slice(&schema_bytes)?;
if let Some(versions) = r.get(cache_key) {
let versions = versions
.iter()
.filter_map(|(start_dt, data)| {
if *start_dt >= min_ts && (max_ts == 0 || *start_dt <= max_ts) {
Some(data.clone())
} else {
None
}
})
.collect::<Vec<_>>();
let schemas = futures::stream::iter(versions)
.map(|data| async move {
let de_bytes = zstd::decode_all(data.as_ref())?;
let mut schemas: Vec<Schema> = json::from_slice(&de_bytes)?;
Ok::<Option<Schema>, Error>(schemas.pop())
})
.buffer_unordered(CONFIG.limit.cpu_num)
.try_collect::<Vec<Option<Schema>>>()
.await
.map_err(|e| Error::Message(e.to_string()))?;
return Ok(schemas.into_iter().flatten().collect());
}
drop(r);
} else {
let r = STREAM_SCHEMAS.read().await;
if let Some(schemas) = r.get(cache_key) {
return Ok(schemas.clone());
if let Some(versions) = r.get(cache_key) {
let schemas = versions
.iter()
.filter_map(|(start_dt, data)| {
if *start_dt >= min_ts && (max_ts == 0 || *start_dt <= max_ts) {
Some(data.clone())
} else {
None
}
})
.collect::<Vec<_>>();
return Ok(schemas);
}
drop(r);
}
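The same start_dt filter appears in both branches above (compressed and plain cache). A standalone sketch of its semantics, plus the per-version decode, assuming arrow-schema's serde feature and serde_json in place of the project's json helper:

use arrow_schema::Schema;

// Keep versions whose start_dt falls inside [min_ts, max_ts]; max_ts == 0 means
// "no upper bound", which is the default when the caller passes no time range.
fn filter_versions<T: Clone>(versions: &[(i64, T)], min_ts: i64, max_ts: i64) -> Vec<T> {
    versions
        .iter()
        .filter(|(start_dt, _)| *start_dt >= min_ts && (max_ts == 0 || *start_dt <= max_ts))
        .map(|(_, data)| data.clone())
        .collect()
}

// Decode one compressed cache entry back into a schema; each entry holds a
// zstd-compressed JSON array containing a single version.
fn decode_version(data: &[u8]) -> anyhow::Result<Option<Schema>> {
    let raw = zstd::decode_all(data)?;
    let mut schemas: Vec<Schema> = serde_json::from_slice(&raw)?;
    Ok(schemas.pop())
}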
2 changes: 1 addition & 1 deletion src/infra/src/storage/mod.rs
@@ -92,7 +92,7 @@ pub async fn del(files: &[&str]) -> Result<(), anyhow::Error> {
.collect::<Vec<_>>();
let files_stream = futures::stream::iter(files);
files_stream
.for_each_concurrent(CONFIG.limit.query_thread_num, |file| async move {
.for_each_concurrent(CONFIG.limit.cpu_num, |file| async move {
match DEFAULT.delete(&(file.as_str().into())).await {
Ok(_) => {
log::debug!("Deleted object: {}", file);
14 changes: 10 additions & 4 deletions src/service/compact/merge.rs
@@ -472,13 +472,19 @@ async fn merge_files(
return Ok((String::from(""), FileMeta::default(), retain_file_list));
}

// get time range for these files
let min_ts = new_file_list.iter().map(|f| f.meta.min_ts).min().unwrap();
let max_ts = new_file_list.iter().map(|f| f.meta.max_ts).max().unwrap();

// convert the file to the latest version of schema
let schema_versions = infra::schema::get_versions(org_id, stream_name, stream_type).await?;
let schema_latest = schema_versions.last().unwrap();
let schema_latest = infra::schema::get(org_id, stream_name, stream_type).await?;
let schema_versions =
infra::schema::get_versions(org_id, stream_name, stream_type, Some((min_ts, max_ts)))
.await?;
let schema_latest_id = schema_versions.len() - 1;
let bloom_filter_fields =
stream::get_stream_setting_bloom_filter_fields(schema_latest).unwrap();
let full_text_search_fields = stream::get_stream_setting_fts_fields(schema_latest).unwrap();
stream::get_stream_setting_bloom_filter_fields(&schema_latest).unwrap();
let full_text_search_fields = stream::get_stream_setting_fts_fields(&schema_latest).unwrap();
if CONFIG.common.widening_schema_evolution && schema_versions.len() > 1 {
for file in &new_file_list {
// get the schema version of the file
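merge_files now derives the window from the files picked for the merge and requests only the matching schema versions, while the latest schema used for stream settings comes from infra::schema::get. A sketch of the window derivation, with FileKey and FileMeta simplified to just the fields used here:

// Simplified stand-ins for the real file-list types.
struct FileMeta { min_ts: i64, max_ts: i64 }
struct FileKey { meta: FileMeta }

// Overall time range covered by the files in this merge job; None for an empty
// list (the caller above returns early before reaching this point).
fn merge_time_range(files: &[FileKey]) -> Option<(i64, i64)> {
    let min_ts = files.iter().map(|f| f.meta.min_ts).min()?;
    let max_ts = files.iter().map(|f| f.meta.max_ts).max()?;
    Some((min_ts, max_ts))
}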
9 changes: 9 additions & 0 deletions src/service/db/mod.rs
@@ -123,3 +123,12 @@ pub(crate) async fn list_values(prefix: &str) -> Result<Vec<Bytes>> {
let db = infra_db::get_db().await;
db.list_values(prefix).await
}

#[inline]
pub(crate) async fn list_values_by_start_dt(
prefix: &str,
start_dt: Option<(i64, i64)>,
) -> Result<Vec<(i64, Bytes)>> {
let db = infra_db::get_db().await;
db.list_values_by_start_dt(prefix, start_dt).await
}
102 changes: 52 additions & 50 deletions src/service/db/schema.rs
@@ -13,7 +13,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::{io::Write, sync::Arc};
use std::sync::Arc;

use arrow_schema::{Field, Schema};
use bytes::Bytes;
@@ -310,18 +310,19 @@ pub async fn watch() -> Result<(), anyhow::Error> {
ev.key.to_string()
};
let item_key = ev_key.strip_prefix(key).unwrap();
let schema_versions = match db::list_values(&format!("{ev_key}/")).await {
Ok(val) => val,
Err(e) => {
log::error!("Error getting value: {}", e);
continue;
}
};
let schema_versions =
match db::list_values_by_start_dt(&format!("{ev_key}/"), None).await {
Ok(val) => val,
Err(e) => {
log::error!("Error getting value: {}", e);
continue;
}
};
if schema_versions.is_empty() {
continue;
}
let latest_schema: Vec<Schema> =
match json::from_slice(schema_versions.last().unwrap()) {
match json::from_slice(&schema_versions.last().unwrap().1) {
Ok(val) => val,
Err(e) => {
log::error!("Error parsing schema, key: {}, error: {}", item_key, e);
@@ -340,29 +341,30 @@
w.insert(item_key.to_string(), latest_schema.clone());
drop(w);
if CONFIG.common.schema_cache_compress_enabled {
let mut schema_bytes = Vec::with_capacity(
schema_versions.iter().map(|v| v.len()).sum::<usize>()
+ schema_versions.len()
+ 2,
);
_ = schema_bytes.write("[".as_bytes()).unwrap();
for (i, v) in schema_versions.iter().enumerate() {
if i > 0 {
_ = schema_bytes.write(",".as_bytes()).unwrap();
}
_ = schema_bytes.write(v).unwrap();
}
_ = schema_bytes.write("]".as_bytes()).unwrap();
let compressed_bytes = zstd::encode_all(schema_bytes.as_slice(), 3).unwrap();
let schema_versions = schema_versions
.into_iter()
.map(|(start_dt, data)| {
let en_data = zstd::encode_all(data.as_ref(), 3).unwrap();
(start_dt, en_data.into())
})
.collect::<Vec<_>>();
let mut w = STREAM_SCHEMAS_COMPRESSED.write().await;
w.insert(item_key.to_string(), compressed_bytes.into());
w.insert(item_key.to_string(), schema_versions);
w.shrink_to_fit();
drop(w);
} else {
let schema_versions = schema_versions
.iter()
.flat_map(|v| json::from_slice::<Vec<Schema>>(v).unwrap())
.collect::<Vec<Schema>>();
.into_iter()
.map(|(start_dt, data)| {
(
start_dt,
json::from_slice::<Vec<Schema>>(&data)
.unwrap()
.pop()
.unwrap(),
)
})
.collect::<Vec<_>>();
let mut w = STREAM_SCHEMAS.write().await;
w.insert(item_key.to_string(), schema_versions);
w.shrink_to_fit();
@@ -434,15 +436,15 @@ pub async fn watch() -> Result<(), anyhow::Error> {
pub async fn cache() -> Result<(), anyhow::Error> {
let db_key = "/schema/";
let items = db::list(db_key).await?;
let mut schemas: HashMap<String, Vec<(Bytes, i64)>> = HashMap::with_capacity(items.len());
let mut schemas: HashMap<String, Vec<(i64, Bytes)>> = HashMap::with_capacity(items.len());
for (key, val) in items {
let key = key.strip_prefix(db_key).unwrap();
let columns = key.split('/').take(4).collect::<Vec<_>>();
assert_eq!(columns.len(), 4, "BUG");
let item_key = format!("{}/{}/{}", columns[0], columns[1], columns[2]);
let start_dt: i64 = columns[3].parse().unwrap();
let entry = schemas.entry(item_key).or_insert(Vec::new());
entry.push((val, start_dt));
entry.push((start_dt, val));
}
let keys = schemas.keys().map(|k| k.to_string()).collect::<Vec<_>>();
for item_key in keys.iter() {
@@ -452,8 +454,8 @@ pub async fn cache() -> Result<(), anyhow::Error> {
if schema_versions.is_empty() {
continue;
}
schema_versions.sort_by(|a, b| a.1.cmp(&b.1));
let latest_schema: Vec<Schema> = json::from_slice(&schema_versions.last().unwrap().0)
schema_versions.sort_by(|a, b| a.0.cmp(&b.0));
let latest_schema: Vec<Schema> = json::from_slice(&schema_versions.last().unwrap().1)
.map_err(|e| {
anyhow::anyhow!("Error parsing schema, key: {}, error: {}", item_key, e)
})?;
@@ -469,31 +471,31 @@ pub async fn cache() -> Result<(), anyhow::Error> {
w.insert(item_key.to_string(), latest_schema.clone());
drop(w);
if CONFIG.common.schema_cache_compress_enabled {
let mut schema_bytes = Vec::with_capacity(
schema_versions.iter().map(|(v, _)| v.len()).sum::<usize>()
+ schema_versions.len()
+ 2,
);
_ = schema_bytes.write("[".as_bytes()).unwrap();
for (i, (v, _)) in schema_versions.iter().enumerate() {
if i > 0 {
_ = schema_bytes.write(",".as_bytes()).unwrap();
}
_ = schema_bytes.write(v).unwrap();
}
_ = schema_bytes.write("]".as_bytes()).unwrap();
let compressed_bytes = zstd::encode_all(schema_bytes.as_slice(), 3).unwrap();
let schema_versions = schema_versions
.into_iter()
.map(|(start_dt, data)| {
let en_data = zstd::encode_all(data.as_ref(), 3).unwrap();
(start_dt, en_data.into())
})
.collect::<Vec<_>>();
let mut w = STREAM_SCHEMAS_COMPRESSED.write().await;
w.insert(item_key.to_string(), compressed_bytes.into());
w.insert(item_key.to_string(), schema_versions);
drop(w);
} else {
let schema_versions = schema_versions
.iter()
.flat_map(|(v, _)| json::from_slice::<Vec<Schema>>(v).unwrap())
.collect::<Vec<Schema>>();
.into_iter()
.map(|(start_dt, data)| {
(
start_dt,
json::from_slice::<Vec<Schema>>(&data)
.unwrap()
.pop()
.unwrap(),
)
})
.collect::<Vec<_>>();
let mut w = STREAM_SCHEMAS.write().await;
w.insert(item_key.to_string(), schema_versions);
w.shrink_to_fit();
drop(w);
}
}
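Both watch() and cache() now compress each version on its own at zstd level 3 and store (start_dt, bytes) pairs, rather than concatenating every version into one JSON array and compressing the whole thing. A standalone sketch of that write path, assuming each version arrives as raw JSON bytes:

use bytes::Bytes;

// Compress each schema version independently so readers can later decode only
// the versions that overlap a query's time range.
fn compress_versions(versions: Vec<(i64, Bytes)>) -> Vec<(i64, Bytes)> {
    versions
        .into_iter()
        .map(|(start_dt, data)| {
            let en_data = zstd::encode_all(data.as_ref(), 3).expect("zstd encode");
            (start_dt, Bytes::from(en_data))
        })
        .collect()
}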
32 changes: 20 additions & 12 deletions src/service/search/grpc/storage.rs
@@ -58,17 +58,26 @@ pub async fn search(
timeout: u64,
) -> super::SearchResult {
log::info!("[trace_id {trace_id}] search->storage: enter");
let schema_latest = infra::schema::get(&sql.org_id, &sql.stream_name, stream_type)
.await
.map_err(|e| Error::ErrorCode(ErrorCodes::ServerInternalError(e.to_string())))?;
// fetch all schema versions, group files by version
let schema_versions =
match infra::schema::get_versions(&sql.org_id, &sql.stream_name, stream_type).await {
Ok(versions) => versions,
Err(err) => {
log::error!("[trace_id {trace_id}] get schema error: {}", err);
return Err(Error::ErrorCode(ErrorCodes::SearchStreamNotFound(
sql.stream_name.clone(),
)));
}
};
let schema_versions = match infra::schema::get_versions(
&sql.org_id,
&sql.stream_name,
stream_type,
sql.meta.time_range,
)
.await
{
Ok(versions) => versions,
Err(err) => {
log::error!("[trace_id {trace_id}] get schema error: {}", err);
return Err(Error::ErrorCode(ErrorCodes::SearchStreamNotFound(
sql.stream_name.clone(),
)));
}
};
log::info!(
"[trace_id {trace_id}] search->storage: stream {}/{}/{}, get schema versions num {}",
&sql.org_id,
@@ -79,10 +88,9 @@ pub async fn search(
if schema_versions.is_empty() {
return Ok((HashMap::new(), ScanStats::new()));
}
let schema_latest = schema_versions.last().unwrap();
let schema_latest_id = schema_versions.len() - 1;

let stream_settings = unwrap_stream_settings(schema_latest).unwrap_or_default();
let stream_settings = unwrap_stream_settings(&schema_latest).unwrap_or_default();
let partition_time_level =
unwrap_partition_time_level(stream_settings.partition_time_level, stream_type);
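A likely reason schema_latest is now fetched with infra::schema::get instead of taken from the version list, as this diff suggests: with a time-range filter, schema_versions.last() is only the newest version inside the window, which may not be the stream's current schema. A small sketch of that distinction, with String standing in for a schema:

// Newest version whose start_dt lies inside the window; this can differ from
// the stream's overall latest schema when the window ends in the past.
fn newest_in_window(versions: &[(i64, String)], min_ts: i64, max_ts: i64) -> Option<&String> {
    versions
        .iter()
        .filter(|(start_dt, _)| *start_dt >= min_ts && (max_ts == 0 || *start_dt <= max_ts))
        .max_by_key(|(start_dt, _)| *start_dt)
        .map(|(_, schema)| schema)
}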
