Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,23 @@ use quickwit_index_config::IndexConfig;
use quickwit_proto::{
LeafSearchResponse, SearchRequest, SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_storage::{BundleStorage, MemorySizedCache, OwnedBytes, Storage};
use quickwit_storage::{
wrap_storage_with_long_term_cache, BundleStorage, MemorySizedCache, OwnedBytes, Storage,
};
use tantivy::collector::Collector;
use tantivy::query::Query;
use tantivy::{Index, ReloadPolicy, Searcher, Term};
use tokio::task::spawn_blocking;
use tracing::*;

/// Capacity, in bytes, given to the global split footer cache (500 MB, decimal units).
const SPLIT_FOOTER_CACHE_CAPACITY: usize = 500_000_000;

use crate::collector::{make_collector_for_split, make_merge_collector, GenericQuickwitCollector};
use crate::SearchError;

/// Returns the split footer cache shared by the whole process.
///
/// The cache lives in a function-local `OnceCell`: it is built by the first call and
/// the same instance is handed back by every later call. Its capacity is expressed
/// in bytes.
fn global_split_footer_cache() -> &'static MemorySizedCache<String> {
static INSTANCE: OnceCell<MemorySizedCache<String>> = OnceCell::new();
INSTANCE.get_or_init(|| MemorySizedCache::with_capacity_in_bytes(500_000_000))
INSTANCE.get_or_init(|| MemorySizedCache::with_capacity_in_bytes(SPLIT_FOOTER_CACHE_CAPACITY))
}

async fn get_split_footer_from_cache_or_fetch(
Expand Down Expand Up @@ -97,8 +101,9 @@ pub(crate) async fn open_index(

let hotcache_bytes = footer_data.split_off(footer_data.len() - hotcache_num_bytes);

let bundle = BundleStorage::new(index_storage, split_file, &footer_data)?;
let directory = StorageDirectory::new(Arc::new(bundle));
let bundle_storage = BundleStorage::new(index_storage, split_file, &footer_data)?;
let bundle_storage_with_cache = wrap_storage_with_long_term_cache(Arc::new(bundle_storage));
let directory = StorageDirectory::new(bundle_storage_with_cache);
let caching_directory = CachingDirectory::new_with_unlimited_capacity(Arc::new(directory));
let hot_directory = HotDirectory::open(caching_directory, hotcache_bytes)?;
let index = Index::open(hot_directory)?;
Expand Down
8 changes: 1 addition & 7 deletions quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ mod error;
mod grpc;
mod grpc_adapter;
mod http_handler;
mod quickwit_cache;
mod rest;

use std::io::Write;
use std::net::SocketAddr;
use std::sync::Arc;

use quickwit_cache::QuickwitCache;
use quickwit_cluster::cluster::{read_or_create_host_key, Cluster};
use quickwit_cluster::service::ClusterServiceImpl;
use quickwit_metastore::MetastoreUriResolver;
Expand All @@ -40,7 +38,6 @@ use quickwit_search::{
};
use quickwit_storage::{
LocalFileStorageFactory, RegionProvider, S3CompatibleObjectStorageFactory, StorageUriResolver,
StorageWithCacheFactory,
};
use quickwit_telemetry::payload::{ServeEvent, TelemetryEvent};
use termcolor::{self, Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
Expand Down Expand Up @@ -84,10 +81,7 @@ fn display_help_message(
/// - s3+localstack://
/// - file:// uris.
fn storage_uri_resolver() -> StorageUriResolver {
let s3_storage = StorageWithCacheFactory::new(
Arc::new(S3CompatibleObjectStorageFactory::default()),
Arc::new(QuickwitCache::default()),
);
let s3_storage = S3CompatibleObjectStorageFactory::default();
StorageUriResolver::builder()
.register(LocalFileStorageFactory::default())
.register(s3_storage)
Expand Down
24 changes: 22 additions & 2 deletions quickwit-storage/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,37 @@

mod in_ram_slice_cache;
mod memory_sized_cache;
mod quickwit_cache;
mod storage_with_cache;

use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use async_trait::async_trait;
use once_cell::sync::OnceCell;

pub use self::in_ram_slice_cache::SliceCache;
pub use self::memory_sized_cache::MemorySizedCache;
pub use self::storage_with_cache::StorageWithCacheFactory;
use crate::OwnedBytes;
use crate::cache::quickwit_cache::QuickwitCache;
use crate::cache::storage_with_cache::StorageWithCache;
use crate::{OwnedBytes, Storage};

/// Returns `storage` wrapped in a `StorageWithCache` whose cache is shared by the
/// whole process.
///
/// The shared cache is a `QuickwitCache` built with its default configuration the
/// first time this function is called; every later call reuses that same instance.
///
/// FIXME The current approach is quite horrible in that:
/// - it uses a global
/// - it relies on the idea that all of the files we attempt to cache
/// have universally unique names. It happens to be true today, but this might be very
/// error prone in the future.
pub fn wrap_storage_with_long_term_cache(storage: Arc<dyn Storage>) -> Arc<dyn Storage> {
    static SHARED_CACHE: OnceCell<Arc<dyn Cache>> = OnceCell::new();
    let shared_cache = SHARED_CACHE.get_or_init(|| Arc::new(QuickwitCache::default()));
    Arc::new(StorageWithCache {
        storage,
        cache: Arc::clone(shared_cache),
    })
}

/// The `Cache` trait is the abstraction used to describe the caching logic
/// used in front of a storage. See `StorageWithCache`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,16 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;

use async_trait::async_trait;
use quickwit_common::HOTCACHE_FILENAME;
use quickwit_storage::{Cache, OwnedBytes, SliceCache};

/// Byte range `0..usize::MAX`.
/// NOTE(review): presumably stands for "the whole file" when no narrower slice is
/// requested — confirm against its uses.
const FULL_SLICE: Range<usize> = 0..usize::MAX;
use crate::{Cache, OwnedBytes, SliceCache};

/// Hotcache cache capacity is hardcoded to 500 MB.
/// Once the capacity is reached, an LRU strategy is used.
const HOTCACHE_CACHE_CAPACITY: usize = 500_000_000;
/// Byte range `0..usize::MAX`.
/// NOTE(review): presumably stands for "the whole file" when no narrower slice is
/// requested — confirm against its uses.
const FULL_SLICE: Range<usize> = 0..usize::MAX;

/// Fast field cache capacity is hardcoded to 3 GB.
/// Once the capacity is reached, an LRU strategy is used.
const FAST_CACHE_CAPACITY: usize = 3_000_000_000;

/// Cache made of a list of routes, each pairing a static string with the cache it
/// selects.
///
/// NOTE(review): the routing logic is outside this view; the `".fast"` route registered
/// by `Default` suggests the string is matched against the end of the file path —
/// confirm.
pub struct QuickwitCache {
pub(crate) struct QuickwitCache {
router: Vec<(&'static str, Arc<dyn Cache>)>,
}

Expand All @@ -48,10 +44,6 @@ impl From<Vec<(&'static str, Arc<dyn Cache>)>> for QuickwitCache {
impl Default for QuickwitCache {
fn default() -> Self {
let mut quickwit_cache = QuickwitCache::empty();
quickwit_cache.add_route(
HOTCACHE_FILENAME,
Arc::new(SimpleCache::with_capacity_in_bytes(HOTCACHE_CACHE_CAPACITY)),
);
quickwit_cache.add_route(
".fast",
Arc::new(SimpleCache::with_capacity_in_bytes(FAST_CACHE_CAPACITY)),
Expand Down Expand Up @@ -156,9 +148,8 @@ mod tests {
use std::path::Path;
use std::sync::Arc;

use quickwit_storage::{Cache, MockCache, OwnedBytes};

use super::QuickwitCache;
use crate::{Cache, MockCache, OwnedBytes};

#[tokio::test]
async fn test_quickwit_cache_get_all() {
Expand Down
40 changes: 4 additions & 36 deletions quickwit-storage/src/cache/storage_with_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use std::sync::Arc;

use async_trait::async_trait;

use crate::{Cache, OwnedBytes, Storage, StorageFactory, StorageResult};
use crate::{Cache, OwnedBytes, Storage, StorageResult};

/// Storage wrapper holding an underlying `storage` together with the `cache` placed in
/// front of it.
///
/// Use with care, StorageWithCache is read-only.
struct StorageWithCache {
storage: Arc<dyn Storage>,
cache: Arc<dyn Cache>,
pub(crate) struct StorageWithCache {
pub storage: Arc<dyn Storage>,
pub cache: Arc<dyn Cache>,
}

#[async_trait]
Expand Down Expand Up @@ -84,38 +84,6 @@ impl Storage for StorageWithCache {
}
}

/// Storage factory decorator: every storage it resolves is wrapped with a cache.
///
/// A single cache instance is shared by all of the storages the factory produces.
pub struct StorageWithCacheFactory {
    storage_factory: Arc<dyn StorageFactory>,
    cache: Arc<dyn Cache>,
}

impl StorageWithCacheFactory {
    /// Builds a factory that delegates resolution to `storage_factory` and attaches
    /// `cache` to the storages it resolves.
    pub fn new(storage_factory: Arc<dyn StorageFactory>, cache: Arc<dyn Cache>) -> Self {
        Self {
            storage_factory,
            cache,
        }
    }
}

impl StorageFactory for StorageWithCacheFactory {
    /// Reports the protocol of the wrapped factory; adding a cache does not change it.
    fn protocol(&self) -> String {
        self.storage_factory.protocol()
    }

    /// Resolves `uri` through the wrapped factory and puts the shared cache in front of
    /// the resulting storage. A resolution error is propagated to the caller.
    fn resolve(&self, uri: &str) -> crate::StorageResult<Arc<dyn Storage>> {
        let inner = self.storage_factory.resolve(uri)?;
        let cache = Arc::clone(&self.cache);
        let cached: Arc<dyn Storage> = Arc::new(StorageWithCache {
            storage: inner,
            cache,
        });
        Ok(cached)
    }
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion quickwit-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub use self::storage_resolver::{
};
#[cfg(feature = "testsuite")]
pub use self::tests::storage_test_suite;
pub use crate::cache::{Cache, MemorySizedCache, SliceCache, StorageWithCacheFactory};
pub use crate::cache::{wrap_storage_with_long_term_cache, Cache, MemorySizedCache, SliceCache};
pub use crate::error::{StorageError, StorageErrorKind, StorageResolverError, StorageResult};

#[cfg(any(test, feature = "testsuite"))]
Expand Down