Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

linera-views: add local variants of store traits #1843

Merged
merged 1 commit into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions linera-sdk/src/views/system_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

//! Functions and types to interface with the system API available to application views.

use async_trait::async_trait;
use linera_base::ensure;
use linera_views::{
batch::{Batch, WriteOperation},
Expand Down Expand Up @@ -40,7 +39,6 @@ impl AppStateStore {
}
}

#[async_trait]
impl ReadableKeyValueStore<ViewError> for AppStateStore {
// The AppStateStore of the system_api does not have limits
// on the size of its values.
Expand Down Expand Up @@ -101,7 +99,6 @@ impl ReadableKeyValueStore<ViewError> for AppStateStore {
}
}

#[async_trait]
impl WritableKeyValueStore<ViewError> for AppStateStore {
const MAX_VALUE_SIZE: usize = usize::MAX;

Expand Down Expand Up @@ -135,7 +132,6 @@ impl WritableKeyValueStore<ViewError> for AppStateStore {
}
}

#[async_trait]
impl KeyValueStore for AppStateStore {
type Error = ViewError;
}
Expand Down
1 change: 0 additions & 1 deletion linera-storage-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ path = "src/server.rs"
[dependencies]
anyhow.workspace = true
async-lock.workspace = true
async-trait.workspace = true
bcs.workspace = true
clap.workspace = true
linera-base.workspace = true
Expand Down
7 changes: 0 additions & 7 deletions linera-storage-service/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use std::{mem, sync::Arc};

use async_lock::{RwLock, RwLockWriteGuard, Semaphore, SemaphoreGuard};
use async_trait::async_trait;
use linera_base::ensure;
#[cfg(with_testing)]
use linera_views::test_utils::generate_test_namespace;
Expand Down Expand Up @@ -58,7 +57,6 @@ pub struct ServiceStoreClient {
namespace: Vec<u8>,
}

#[async_trait]
impl ReadableKeyValueStore<ServiceContextError> for ServiceStoreClient {
const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
type Keys = Vec<Vec<u8>>;
Expand Down Expand Up @@ -199,14 +197,10 @@ impl ReadableKeyValueStore<ServiceContextError> for ServiceStoreClient {
}
}

#[async_trait]
impl WritableKeyValueStore<ServiceContextError> for ServiceStoreClient {
const MAX_VALUE_SIZE: usize = usize::MAX;

async fn write_batch(&self, batch: Batch, _base_key: &[u8]) -> Result<(), ServiceContextError> {
use linera_views::batch::WriteOperation;

use crate::client::Operation;
let mut statements = Vec::new();
let mut chunk_size = 0;
for operation in batch.operations {
Expand Down Expand Up @@ -345,7 +339,6 @@ impl ServiceStoreClient {
}
}

#[async_trait]
impl AdminKeyValueStore for ServiceStoreClient {
type Error = ServiceContextError;
type Config = ServiceStoreConfig;
Expand Down
1 change: 1 addition & 0 deletions linera-views/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ tempfile.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "sync"] }
tracing.workspace = true
trait-variant.workspace = true

[dev-dependencies]
linera-views = { path = ".", features = ["test"] }
Expand Down
4 changes: 2 additions & 2 deletions linera-views/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,8 @@ impl Batch {
/// Certain databases (e.g. DynamoDB) do not support the deletion by prefix.
/// Thus we need to access the databases in order to replace a `DeletePrefix`
/// by a vector of the keys to be removed.
#[async_trait]
pub trait DeletePrefixExpander {
#[trait_variant::make(DeletePrefixExpander: Send)]
pub trait LocalDeletePrefixExpander {
/// The error type that can happen when expanding the key_prefix.
type Error: Debug;

Expand Down
90 changes: 60 additions & 30 deletions linera-views/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ pub trait KeyValueIterable<Error> {
}

/// Low-level, asynchronous read key-value operations. Useful for storage APIs not based on views.
#[async_trait]
pub trait ReadableKeyValueStore<E> {
#[trait_variant::make(ReadableKeyValueStore: Send)]
pub trait LocalReadableKeyValueStore<E> {
/// The maximal size of keys that can be stored.
const MAX_KEY_SIZE: usize;

Expand Down Expand Up @@ -324,33 +324,44 @@ pub trait ReadableKeyValueStore<E> {
/// Finds the `(key,value)` pairs matching the prefix. The prefix is not included in the returned keys.
async fn find_key_values_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::KeyValues, E>;

// We can't use `async fn` here in the below implementations due to
// https://github.com/rust-lang/impl-trait-utils/issues/17, but once that bug is fixed
// we can revert them to `async fn` syntax, which is neater.

/// Reads a single `key` and deserializes the result if present.
async fn read_value<V: DeserializeOwned>(&self, key: &[u8]) -> Result<Option<V>, E>
fn read_value<V: DeserializeOwned>(
&self,
key: &[u8],
) -> impl Future<Output = Result<Option<V>, E>>
where
Self: Sync,
E: From<bcs::Error>,
Comment on lines +332 to 338
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use fn() -> impl Future instead of async fn() here because default implementations of async fn confuse trait_variant, but it seems fine with default implementations of -> impl Trait fns.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to add a comment in the code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be okay since if someone tries to change this it will fail to compile.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and this applies to several places, so I'd have to copy-paste the comment several times in order to make sure the reader caught it)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Mathieu Baudet meant adding a TODO(#XXX) that corresponds to considering the resolution of that bug. There are several pending issues here like the confusion of the trait and having a mention could help in dealing with that. The idea is that if something needs you 10 minutes to figure out then a comment is worthwhile.

Copy link
Contributor Author

@Twey Twey Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't add an issue because we're currently blocked on the upstream bug so there's not really much for us TODO — but I've added a comment above the default implementations with a link to the bug to save future readers the time of figuring it out :)

{
from_bytes_opt(&self.read_value_bytes(key).await?)
async { from_bytes_opt(&self.read_value_bytes(key).await?) }
}

/// Reads multiple `keys` and deserializes the results if present.
async fn read_multi_values<V: DeserializeOwned + Send>(
fn read_multi_values<V: DeserializeOwned + Send>(
&self,
keys: Vec<Vec<u8>>,
) -> Result<Vec<Option<V>>, E>
) -> impl Future<Output = Result<Vec<Option<V>>, E>>
where
Self: Sync,
E: From<bcs::Error>,
{
let mut values = Vec::with_capacity(keys.len());
for entry in self.read_multi_values_bytes(keys).await? {
values.push(from_bytes_opt(&entry)?);
async {
let mut values = Vec::with_capacity(keys.len());
for entry in self.read_multi_values_bytes(keys).await? {
values.push(from_bytes_opt(&entry)?);
}
Ok(values)
}
Ok(values)
}
}

/// Low-level, asynchronous write key-value operations. Useful for storage APIs not based on views.
#[async_trait]
pub trait WritableKeyValueStore<E> {
#[trait_variant::make(WritableKeyValueStore: Send)]
pub trait LocalWritableKeyValueStore<E> {
/// The maximal size of values that can be stored.
const MAX_VALUE_SIZE: usize;

Expand All @@ -363,10 +374,10 @@ pub trait WritableKeyValueStore<E> {
}

/// Low-level trait for the administration of stores and their namespaces.
#[async_trait]
pub trait AdminKeyValueStore: Sized {
#[trait_variant::make(AdminKeyValueStore: Send)]
pub trait LocalAdminKeyValueStore: Sized {
/// The error type returned by the store's methods.
type Error: Send;
type Error;

/// The configuration needed to interact with a new store.
type Config: Send + Sync;
Expand All @@ -378,11 +389,14 @@ pub trait AdminKeyValueStore: Sized {
async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error>;

/// Deletes all the existing namespaces.
async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
for namespace in Self::list_all(config).await? {
Self::delete(config, &namespace).await?;
fn delete_all(config: &Self::Config) -> impl Future<Output = Result<(), Self::Error>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that really absolutely needed?
The async fn syntax is familiar and working well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the comment here — it's because trait-variant gets confused with default implementations of async fns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I would also prefer to keep it as async fn if we could! I think we should undo this change as soon as trait-variant supports default-implemented async fns.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pertinent trait-variant issue: rust-lang/impl-trait-utils#17

async {
let namespaces = Self::list_all(config).await?;
for namespace in namespaces {
Self::delete(config, &namespace).await?;
}
Ok(())
}
Ok(())
}

/// Tests if a given namespace exists.
Expand All @@ -395,26 +409,30 @@ pub trait AdminKeyValueStore: Sized {
async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;

/// Initializes a storage if missing and provides it.
async fn maybe_create_and_connect(
fn maybe_create_and_connect(
config: &Self::Config,
namespace: &str,
) -> Result<Self, Self::Error> {
if !Self::exists(config, namespace).await? {
Self::create(config, namespace).await?;
) -> impl Future<Output = Result<Self, Self::Error>> {
async {
if !Self::exists(config, namespace).await? {
Self::create(config, namespace).await?;
}
Self::connect(config, namespace).await
}
Self::connect(config, namespace).await
}

/// Creates a new storage. Overwrites it if this namespace already exists.
async fn recreate_and_connect(
fn recreate_and_connect(
config: &Self::Config,
namespace: &str,
) -> Result<Self, Self::Error> {
if Self::exists(config, namespace).await? {
Self::delete(config, namespace).await?;
) -> impl Future<Output = Result<Self, Self::Error>> {
async {
if Self::exists(config, namespace).await? {
Self::delete(config, namespace).await?;
}
Self::create(config, namespace).await?;
Self::connect(config, namespace).await
}
Self::create(config, namespace).await?;
Self::connect(config, namespace).await
}
}

Expand All @@ -426,6 +444,18 @@ pub trait KeyValueStore:
type Error: Debug;
}

/// Low-level, asynchronous write and read key-value operations, without a `Send` bound. Useful for storage APIs not based on views.
pub trait LocalKeyValueStore:
LocalReadableKeyValueStore<Self::Error> + LocalWritableKeyValueStore<Self::Error>
{
/// The error type.
type Error: Debug;
}

impl<S: KeyValueStore> LocalKeyValueStore for S {
type Error = <Self as KeyValueStore>::Error;
}

#[doc(hidden)]
/// Iterates keys by reference in a vector of keys.
/// Inspired by https://depth-first.com/articles/2020/06/22/returning-rust-iterators/
Expand Down
12 changes: 4 additions & 8 deletions linera-views/src/dynamo_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ pub struct DynamoDbStoreConfig {
pub common_config: CommonStoreConfig,
}

#[async_trait]
impl AdminKeyValueStore for DynamoDbStoreInternal {
type Error = DynamoDbContextError;
type Config = DynamoDbStoreConfig;
Expand Down Expand Up @@ -785,7 +784,6 @@ impl KeyValueIterable<DynamoDbContextError> for DynamoDbKeyValues {
}
}

#[async_trait]
impl ReadableKeyValueStore<DynamoDbContextError> for DynamoDbStoreInternal {
const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
type Keys = DynamoDbKeys;
Expand Down Expand Up @@ -818,8 +816,10 @@ impl ReadableKeyValueStore<DynamoDbContextError> for DynamoDbStoreInternal {
let handle = self.read_value_bytes_general(key_db);
handles.push(handle);
}
let result = join_all(handles).await;
Ok(result.into_iter().collect::<Result<_, _>>()?)
join_all(handles)
.await
.into_iter()
.collect::<Result<_, _>>()
}

async fn find_keys_by_prefix(
Expand Down Expand Up @@ -890,7 +890,6 @@ pub struct DynamoDbStore {
store: LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>,
}

#[async_trait]
impl ReadableKeyValueStore<DynamoDbContextError> for DynamoDbStore {
const MAX_KEY_SIZE: usize = MAX_KEY_SIZE - 4;
type Keys = Vec<Vec<u8>>;
Expand Down Expand Up @@ -930,7 +929,6 @@ impl ReadableKeyValueStore<DynamoDbContextError> for DynamoDbStore {
}
}

#[async_trait]
impl WritableKeyValueStore<DynamoDbContextError> for DynamoDbStore {
const MAX_VALUE_SIZE: usize = DynamoDbStoreInternal::MAX_VALUE_SIZE;

Expand All @@ -943,12 +941,10 @@ impl WritableKeyValueStore<DynamoDbContextError> for DynamoDbStore {
}
}

#[async_trait]
impl KeyValueStore for DynamoDbStore {
type Error = DynamoDbContextError;
}

#[async_trait]
impl AdminKeyValueStore for DynamoDbStore {
type Error = DynamoDbContextError;
type Config = DynamoDbStoreConfig;
Expand Down
4 changes: 0 additions & 4 deletions linera-views/src/journaling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ pub struct JournalingKeyValueStore<K> {
pub store: K,
}

#[async_trait]
impl<K> DeletePrefixExpander for &JournalingKeyValueStore<K>
where
K: DirectKeyValueStore + Send + Sync,
Expand All @@ -117,7 +116,6 @@ where
}
}

#[async_trait]
impl<K> ReadableKeyValueStore<K::Error> for JournalingKeyValueStore<K>
where
K: DirectKeyValueStore + Send + Sync,
Expand Down Expand Up @@ -161,7 +159,6 @@ where
}
}

#[async_trait]
impl<K> AdminKeyValueStore for JournalingKeyValueStore<K>
where
K: AdminKeyValueStore + Send + Sync,
Expand Down Expand Up @@ -195,7 +192,6 @@ where
}
}

#[async_trait]
impl<K> WritableKeyValueStore<K::Error> for JournalingKeyValueStore<K>
where
K: DirectKeyValueStore + Send + Sync,
Expand Down
Loading
Loading