Skip to content

Commit

Permalink
feat(object-store): Allow caching object store objects locally (#2153)
Browse files Browse the repository at this point in the history
## What ❔

Implements an optional caching layer for the object store that persists
objects fetched from an external store (e.g., a GCS one) to a
configurable local directory.

## Why ❔

Speeds up repeated snapshot recovery without sacrificing generality
(i.e., without having to switch to a file-backed store and download all
snapshot data manually).

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
slowli committed Jun 7, 2024
1 parent 253cc83 commit 6c6e65c
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 18 deletions.
9 changes: 9 additions & 0 deletions core/lib/config/src/configs/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ pub struct ObjectStoreConfig {
pub mode: ObjectStoreMode,
#[serde(default = "ObjectStoreConfig::default_max_retries")]
pub max_retries: u16,
/// Path to local directory that will be used to mirror store objects locally. If not specified, no mirroring will be used.
/// The directory layout is identical to [`ObjectStoreMode::FileBacked`].
///
/// Mirroring is primarily useful for local development and testing; it might not provide substantial performance benefits
/// if the Internet connection used by the app is fast enough.
///
/// **Important.** Mirroring logic assumes that objects in the underlying store are immutable. If this is not the case,
/// the mirrored objects may become stale.
pub local_mirror_path: Option<String>,
}

impl ObjectStoreConfig {
Expand Down
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ impl Distribution<configs::ObjectStoreConfig> for EncodeDist {
configs::ObjectStoreConfig {
mode: self.sample(rng),
max_retries: self.sample(rng),
local_mirror_path: self.sample(rng),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/lib/env_config/src/fri_prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mod tests {
gcs_credential_file_path: "/path/to/credentials.json".to_owned(),
},
max_retries: 5,
local_mirror_path: None,
}),
availability_check_interval_in_secs: Some(1_800),
}
Expand All @@ -65,7 +66,6 @@ mod tests {
OBJECT_STORE_MODE="GCSWithCredentialFile"
OBJECT_STORE_GCS_CREDENTIAL_FILE_PATH="/path/to/credentials.json"
OBJECT_STORE_MAX_RETRIES="5"
"#;
lock.set_env(config);

Expand Down
3 changes: 3 additions & 0 deletions core/lib/env_config/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ mod tests {
gcs_credential_file_path: "/path/to/credentials.json".to_owned(),
},
max_retries: 5,
local_mirror_path: Some("/var/cache".to_owned()),
}
}

Expand All @@ -67,6 +68,7 @@ mod tests {
OBJECT_STORE_MODE="GCSWithCredentialFile"
OBJECT_STORE_GCS_CREDENTIAL_FILE_PATH="/path/to/credentials.json"
OBJECT_STORE_MAX_RETRIES="5"
OBJECT_STORE_LOCAL_MIRROR_PATH="/var/cache"
"#;
lock.set_env(config);
let actual = ObjectStoreConfig::from_env().unwrap();
Expand Down Expand Up @@ -117,6 +119,7 @@ mod tests {
PROVER_OBJECT_STORE_MODE="GCSWithCredentialFile"
PROVER_OBJECT_STORE_GCS_CREDENTIAL_FILE_PATH="/path/to/credentials.json"
PROVER_OBJECT_STORE_MAX_RETRIES="5"
PROVER_OBJECT_STORE_LOCAL_MIRROR_PATH="/var/cache"
"#;
lock.set_env(config);
let actual = ProverObjectStoreConfig::from_env().unwrap().0;
Expand Down
46 changes: 29 additions & 17 deletions core/lib/object_store/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use zksync_config::configs::object_store::{ObjectStoreConfig, ObjectStoreMode};
use crate::{
file::FileBackedObjectStore,
gcs::{GoogleCloudStore, GoogleCloudStoreAuthMode},
mirror::MirroringObjectStore,
raw::{ObjectStore, ObjectStoreError},
retries::StoreWithRetries,
};
Expand Down Expand Up @@ -54,25 +55,22 @@ impl ObjectStoreFactory {
async fn create_from_config(
config: &ObjectStoreConfig,
) -> Result<Arc<dyn ObjectStore>, ObjectStoreError> {
tracing::trace!("Initializing object store with configuration {config:?}");
match &config.mode {
ObjectStoreMode::GCS { bucket_base_url } => {
tracing::trace!(
"Initialized GoogleCloudStorage Object store without credential file"
);
let store = StoreWithRetries::try_new(config.max_retries, || {
GoogleCloudStore::new(
GoogleCloudStoreAuthMode::Authenticated,
bucket_base_url.clone(),
)
})
.await?;
Ok(Arc::new(store))
Self::wrap_mirroring(store, config.local_mirror_path.as_ref()).await
}
ObjectStoreMode::GCSWithCredentialFile {
bucket_base_url,
gcs_credential_file_path,
} => {
tracing::trace!("Initialized GoogleCloudStorage Object store with credential file");
let store = StoreWithRetries::try_new(config.max_retries, || {
GoogleCloudStore::new(
GoogleCloudStoreAuthMode::AuthenticatedWithCredentialFile(
Expand All @@ -82,29 +80,43 @@ impl ObjectStoreFactory {
)
})
.await?;
Ok(Arc::new(store))
}
ObjectStoreMode::FileBacked {
file_backed_base_path,
} => {
tracing::trace!("Initialized FileBacked Object store");
let store = StoreWithRetries::try_new(config.max_retries, || {
FileBackedObjectStore::new(file_backed_base_path.clone())
})
.await?;
Ok(Arc::new(store))
Self::wrap_mirroring(store, config.local_mirror_path.as_ref()).await
}
ObjectStoreMode::GCSAnonymousReadOnly { bucket_base_url } => {
tracing::trace!("Initialized GoogleCloudStoragePublicReadOnly store");
let store = StoreWithRetries::try_new(config.max_retries, || {
GoogleCloudStore::new(
GoogleCloudStoreAuthMode::Anonymous,
bucket_base_url.clone(),
)
})
.await?;
Self::wrap_mirroring(store, config.local_mirror_path.as_ref()).await
}

ObjectStoreMode::FileBacked {
file_backed_base_path,
} => {
let store = StoreWithRetries::try_new(config.max_retries, || {
FileBackedObjectStore::new(file_backed_base_path.clone())
})
.await?;

if let Some(mirror_path) = &config.local_mirror_path {
tracing::warn!("Mirroring doesn't make sense with file-backed object store; ignoring mirror path `{mirror_path}`");
}
Ok(Arc::new(store))
}
}
}

/// Optionally layers a local mirror on top of `store`.
///
/// When `mirror_path` is provided, `store` is wrapped into a [`MirroringObjectStore`] rooted at
/// that path; otherwise `store` is returned as is. In both cases the result is type-erased.
///
/// # Errors
///
/// Propagates the error returned by [`MirroringObjectStore::new`].
async fn wrap_mirroring(
    store: impl ObjectStore,
    mirror_path: Option<&String>,
) -> Result<Arc<dyn ObjectStore>, ObjectStoreError> {
    match mirror_path {
        Some(path) => {
            let mirrored = MirroringObjectStore::new(store, path.clone()).await?;
            Ok(Arc::new(mirrored))
        }
        None => Ok(Arc::new(store)),
    }
}
}
1 change: 1 addition & 0 deletions core/lib/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod factory;
mod file;
mod gcs;
mod metrics;
mod mirror;
mod mock;
mod objects;
mod raw;
Expand Down
150 changes: 150 additions & 0 deletions core/lib/object_store/src/mirror.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//! Mirroring object store.

use async_trait::async_trait;

use crate::{file::FileBackedObjectStore, raw::ObjectStore, Bucket, ObjectStoreError};

/// Object store wrapper that mirrors objects of an underlying store `S` in a local
/// file-backed store.
///
/// Reads consult the mirror first and fall back to the underlying store; writes and removals
/// are applied to the underlying store first and then replicated to the mirror on a
/// best-effort basis.
#[derive(Debug)]
pub(crate) struct MirroringObjectStore<S> {
/// Underlying store; its errors are the ones propagated to the caller.
inner: S,
/// Local file-backed store holding mirrored copies of objects.
mirror_store: FileBackedObjectStore,
}

impl<S: ObjectStore> MirroringObjectStore<S> {
    /// Wraps `inner` so that its objects are mirrored in a file-backed store rooted at
    /// `mirror_path`.
    ///
    /// # Errors
    ///
    /// Returns an error if the file-backed mirror store cannot be created.
    pub async fn new(inner: S, mirror_path: String) -> Result<Self, ObjectStoreError> {
        tracing::info!("Initializing mirroring for store {inner:?} at `{mirror_path}`");
        FileBackedObjectStore::new(mirror_path)
            .await
            .map(|mirror_store| Self {
                inner,
                mirror_store,
            })
    }
}

#[async_trait]
impl<S: ObjectStore> ObjectStore for MirroringObjectStore<S> {
    /// Returns the object stored under `key` in `bucket`, preferring the local mirror.
    ///
    /// If the mirror cannot serve the object, it is fetched from the underlying store and then
    /// written to the mirror on a best-effort basis: a mirroring failure is logged and does not
    /// fail the call.
    ///
    /// # Errors
    ///
    /// Propagates errors returned by the underlying store; mirror errors are only logged.
    #[tracing::instrument(skip(self))]
    async fn get_raw(&self, bucket: Bucket, key: &str) -> Result<Vec<u8>, ObjectStoreError> {
        match self.mirror_store.get_raw(bucket, key).await {
            Ok(object) => {
                tracing::trace!("obtained object from mirror");
                return Ok(object);
            }
            // A missing key is the ordinary cache-miss case and does not deserve a warning.
            Err(ObjectStoreError::KeyNotFound(_)) => {}
            Err(err) => {
                tracing::warn!(
                    "unexpected error calling local mirror store: {:#}",
                    anyhow::Error::from(err)
                );
            }
        }

        let object = self.inner.get_raw(bucket, key).await?;
        tracing::trace!("obtained object from underlying store");
        // The clone is required: one copy goes to the mirror, the other is returned.
        if let Err(err) = self.mirror_store.put_raw(bucket, key, object.clone()).await {
            tracing::warn!("failed mirroring object: {:#}", anyhow::Error::from(err));
        } else {
            tracing::trace!("mirrored object");
        }
        Ok(object)
    }

    /// Stores `value` under `key` in `bucket` in the underlying store, then mirrors it locally.
    ///
    /// # Errors
    ///
    /// Propagates errors returned by the underlying store; a mirroring failure is only logged.
    #[tracing::instrument(skip(self, value), fields(value.len = value.len()))]
    async fn put_raw(
        &self,
        bucket: Bucket,
        key: &str,
        value: Vec<u8>,
    ) -> Result<(), ObjectStoreError> {
        self.inner.put_raw(bucket, key, value.clone()).await?;
        // Only put the value into the mirror once it has been put in the underlying store
        if let Err(err) = self.mirror_store.put_raw(bucket, key, value).await {
            tracing::warn!("failed mirroring object: {:#}", anyhow::Error::from(err));
        } else {
            tracing::trace!("mirrored object");
        }
        Ok(())
    }

    /// Removes the object under `key` in `bucket` from the underlying store, then from the mirror.
    ///
    /// # Errors
    ///
    /// Propagates errors returned by the underlying store; mirror errors are only logged.
    #[tracing::instrument(skip(self))]
    async fn remove_raw(&self, bucket: Bucket, key: &str) -> Result<(), ObjectStoreError> {
        self.inner.remove_raw(bucket, key).await?;
        // Only remove the value from the mirror once it has been removed in the underlying store
        match self.mirror_store.remove_raw(bucket, key).await {
            Ok(()) => tracing::trace!("removed object from mirror"),
            // The object may never have been mirrored (mirroring is best-effort and lazy),
            // so its absence is expected; treat it like a cache miss in `get_raw`.
            Err(ObjectStoreError::KeyNotFound(_)) => {
                tracing::trace!("object was not present in mirror");
            }
            Err(err) => {
                tracing::warn!(
                    "failed removing object from mirror: {:#}",
                    anyhow::Error::from(err)
                );
            }
        }
        Ok(())
    }

    /// Returns the storage prefix reported by the underlying store for `bucket`.
    fn storage_prefix_raw(&self, bucket: Bucket) -> String {
        self.inner.storage_prefix_raw(bucket)
    }
}

#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use tempfile::TempDir;

    use super::*;
    use crate::MockObjectStore;

    /// Checks that objects read from or written to the mirroring store end up in the mirror,
    /// and that a key missing everywhere is reported as `KeyNotFound`.
    #[tokio::test]
    async fn mirroring_basics() {
        // Keep the `TempDir` guard alive for the whole test so that the directory is deleted
        // when the guard is dropped; `into_path()` would persist it and leak it on every run.
        let dir = TempDir::new().unwrap();
        let path = dir.path().to_str().unwrap().to_owned();

        let mock_store = MockObjectStore::default();
        mock_store
            .put_raw(Bucket::StorageSnapshot, "test", vec![1, 2, 3])
            .await
            .unwrap();
        let mirroring_store = MirroringObjectStore::new(mock_store, path).await.unwrap();

        let object = mirroring_store
            .get_raw(Bucket::StorageSnapshot, "test")
            .await
            .unwrap();
        assert_eq!(object, [1, 2, 3]);
        // Check that the object got mirrored.
        let object_in_mirror = mirroring_store
            .mirror_store
            .get_raw(Bucket::StorageSnapshot, "test")
            .await
            .unwrap();
        assert_eq!(object_in_mirror, [1, 2, 3]);
        // A repeated read must return the same data.
        let object = mirroring_store
            .get_raw(Bucket::StorageSnapshot, "test")
            .await
            .unwrap();
        assert_eq!(object, [1, 2, 3]);

        let err = mirroring_store
            .get_raw(Bucket::StorageSnapshot, "missing")
            .await
            .unwrap_err();
        assert_matches!(err, ObjectStoreError::KeyNotFound(_));

        mirroring_store
            .put_raw(Bucket::StorageSnapshot, "other", vec![3, 2, 1])
            .await
            .unwrap();
        // Check that the object got mirrored.
        let object_in_mirror = mirroring_store
            .mirror_store
            .get_raw(Bucket::StorageSnapshot, "other")
            .await
            .unwrap();
        assert_eq!(object_in_mirror, [3, 2, 1]);
        let object = mirroring_store
            .get_raw(Bucket::StorageSnapshot, "other")
            .await
            .unwrap();
        assert_eq!(object, [3, 2, 1]);
    }
}
2 changes: 2 additions & 0 deletions core/lib/protobuf_config/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl ProtoRepr for proto::ObjectStore {
max_retries: required(&self.max_retries)
.and_then(|x| Ok((*x).try_into()?))
.context("max_retries")?,
local_mirror_path: self.local_mirror_path.clone(),
})
}

Expand Down Expand Up @@ -80,6 +81,7 @@ impl ProtoRepr for proto::ObjectStore {
Self {
mode: Some(mode),
max_retries: Some(this.max_retries.into()),
local_mirror_path: this.local_mirror_path.clone(),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ message ObjectStore {
FileBacked file_backed = 4;
}
optional uint32 max_retries = 5; // required
optional string local_mirror_path = 6; // optional; fs path
}
1 change: 1 addition & 0 deletions prover/prover_fri/tests/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ async fn prover_and_assert_base_layer(
file_backed_base_path: "./tests/data/".to_owned(),
},
max_retries: 5,
local_mirror_path: None,
};
let object_store = ObjectStoreFactory::new(object_store_config)
.create_store()
Expand Down
2 changes: 2 additions & 0 deletions prover/witness_generator/tests/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn test_leaf_witness_gen() {
file_backed_base_path: "./tests/data/leaf/".to_owned(),
},
max_retries: 5,
local_mirror_path: None,
};
let object_store = ObjectStoreFactory::new(object_store_config)
.create_store()
Expand Down Expand Up @@ -71,6 +72,7 @@ async fn test_node_witness_gen() {
file_backed_base_path: "./tests/data/node/".to_owned(),
},
max_retries: 5,
local_mirror_path: None,
};
let object_store = ObjectStoreFactory::new(object_store_config)
.create_store()
Expand Down

0 comments on commit 6c6e65c

Please sign in to comment.