Skip to content

Commit

Permalink
linera-views: add local variants of store traits (#1843)
Browse files Browse the repository at this point in the history
  • Loading branch information
Twey committed Apr 2, 2024
1 parent eb80076 commit 85502c9
Show file tree
Hide file tree
Showing 18 changed files with 154 additions and 184 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions linera-sdk/src/views/system_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

//! Functions and types to interface with the system API available to application views.

use async_trait::async_trait;
use linera_base::ensure;
use linera_views::{
batch::{Batch, WriteOperation},
Expand Down Expand Up @@ -40,7 +39,6 @@ impl AppStateStore {
}
}

#[async_trait]
impl ReadableKeyValueStore<ViewError> for AppStateStore {
// The AppStateStore of the system_api does not have limits
// on the size of its values.
Expand Down Expand Up @@ -101,7 +99,6 @@ impl ReadableKeyValueStore<ViewError> for AppStateStore {
}
}

#[async_trait]
impl WritableKeyValueStore<ViewError> for AppStateStore {
const MAX_VALUE_SIZE: usize = usize::MAX;

Expand Down Expand Up @@ -135,7 +132,6 @@ impl WritableKeyValueStore<ViewError> for AppStateStore {
}
}

#[async_trait]
impl KeyValueStore for AppStateStore {
type Error = ViewError;
}
Expand Down
1 change: 0 additions & 1 deletion linera-storage-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ path = "src/server.rs"
[dependencies]
anyhow.workspace = true
async-lock.workspace = true
async-trait.workspace = true
bcs.workspace = true
clap.workspace = true
linera-base.workspace = true
Expand Down
7 changes: 0 additions & 7 deletions linera-storage-service/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use std::{mem, sync::Arc};

use async_lock::{RwLock, RwLockWriteGuard, Semaphore, SemaphoreGuard};
use async_trait::async_trait;
use linera_base::ensure;
#[cfg(with_testing)]
use linera_views::test_utils::generate_test_namespace;
Expand Down Expand Up @@ -58,7 +57,6 @@ pub struct ServiceStoreClient {
namespace: Vec<u8>,
}

#[async_trait]
impl ReadableKeyValueStore<ServiceContextError> for ServiceStoreClient {
const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
type Keys = Vec<Vec<u8>>;
Expand Down Expand Up @@ -199,14 +197,10 @@ impl ReadableKeyValueStore<ServiceContextError> for ServiceStoreClient {
}
}

#[async_trait]
impl WritableKeyValueStore<ServiceContextError> for ServiceStoreClient {
const MAX_VALUE_SIZE: usize = usize::MAX;

async fn write_batch(&self, batch: Batch, _base_key: &[u8]) -> Result<(), ServiceContextError> {
use linera_views::batch::WriteOperation;

use crate::client::Operation;
let mut statements = Vec::new();
let mut chunk_size = 0;
for operation in batch.operations {
Expand Down Expand Up @@ -345,7 +339,6 @@ impl ServiceStoreClient {
}
}

#[async_trait]
impl AdminKeyValueStore for ServiceStoreClient {
type Error = ServiceContextError;
type Config = ServiceStoreConfig;
Expand Down
1 change: 1 addition & 0 deletions linera-views/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ tempfile.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "sync"] }
tracing.workspace = true
trait-variant.workspace = true

[dev-dependencies]
linera-views = { path = ".", features = ["test"] }
Expand Down
4 changes: 2 additions & 2 deletions linera-views/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,8 @@ impl Batch {
/// Certain databases (e.g. DynamoDB) do not support the deletion by prefix.
/// Thus we need to access the databases in order to replace a `DeletePrefix`
/// by a vector of the keys to be removed.
#[async_trait]
pub trait DeletePrefixExpander {
#[trait_variant::make(DeletePrefixExpander: Send)]
pub trait LocalDeletePrefixExpander {
/// The error type that can happen when expanding the key_prefix.
type Error: Debug;

Expand Down
90 changes: 60 additions & 30 deletions linera-views/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ pub trait KeyValueIterable<Error> {
}

/// Low-level, asynchronous read key-value operations. Useful for storage APIs not based on views.
#[async_trait]
pub trait ReadableKeyValueStore<E> {
#[trait_variant::make(ReadableKeyValueStore: Send)]
pub trait LocalReadableKeyValueStore<E> {
/// The maximal size of keys that can be stored.
const MAX_KEY_SIZE: usize;

Expand Down Expand Up @@ -324,33 +324,44 @@ pub trait ReadableKeyValueStore<E> {
/// Finds the `(key,value)` pairs matching the prefix. The prefix is not included in the returned keys.
async fn find_key_values_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::KeyValues, E>;

// We can't use `async fn` here in the below implementations due to
// https://github.com/rust-lang/impl-trait-utils/issues/17, but once that bug is fixed
// we can revert them to `async fn` syntax, which is neater.

/// Reads a single `key` and deserializes the result if present.
async fn read_value<V: DeserializeOwned>(&self, key: &[u8]) -> Result<Option<V>, E>
fn read_value<V: DeserializeOwned>(
&self,
key: &[u8],
) -> impl Future<Output = Result<Option<V>, E>>
where
Self: Sync,
E: From<bcs::Error>,
{
from_bytes_opt(&self.read_value_bytes(key).await?)
async { from_bytes_opt(&self.read_value_bytes(key).await?) }
}

/// Reads multiple `keys` and deserializes the results if present.
async fn read_multi_values<V: DeserializeOwned + Send>(
fn read_multi_values<V: DeserializeOwned + Send>(
&self,
keys: Vec<Vec<u8>>,
) -> Result<Vec<Option<V>>, E>
) -> impl Future<Output = Result<Vec<Option<V>>, E>>
where
Self: Sync,
E: From<bcs::Error>,
{
let mut values = Vec::with_capacity(keys.len());
for entry in self.read_multi_values_bytes(keys).await? {
values.push(from_bytes_opt(&entry)?);
async {
let mut values = Vec::with_capacity(keys.len());
for entry in self.read_multi_values_bytes(keys).await? {
values.push(from_bytes_opt(&entry)?);
}
Ok(values)
}
Ok(values)
}
}

/// Low-level, asynchronous write key-value operations. Useful for storage APIs not based on views.
#[async_trait]
pub trait WritableKeyValueStore<E> {
#[trait_variant::make(WritableKeyValueStore: Send)]
pub trait LocalWritableKeyValueStore<E> {
/// The maximal size of values that can be stored.
const MAX_VALUE_SIZE: usize;

Expand All @@ -363,10 +374,10 @@ pub trait WritableKeyValueStore<E> {
}

/// Low-level trait for the administration of stores and their namespaces.
#[async_trait]
pub trait AdminKeyValueStore: Sized {
#[trait_variant::make(AdminKeyValueStore: Send)]
pub trait LocalAdminKeyValueStore: Sized {
/// The error type returned by the store's methods.
type Error: Send;
type Error;

/// The configuration needed to interact with a new store.
type Config: Send + Sync;
Expand All @@ -378,11 +389,14 @@ pub trait AdminKeyValueStore: Sized {
async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error>;

/// Deletes all the existing namespaces.
async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
for namespace in Self::list_all(config).await? {
Self::delete(config, &namespace).await?;
fn delete_all(config: &Self::Config) -> impl Future<Output = Result<(), Self::Error>> {
async {
let namespaces = Self::list_all(config).await?;
for namespace in namespaces {
Self::delete(config, &namespace).await?;
}
Ok(())
}
Ok(())
}

/// Tests if a given namespace exists.
Expand All @@ -395,26 +409,30 @@ pub trait AdminKeyValueStore: Sized {
async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;

/// Initializes a storage if missing and provides it.
async fn maybe_create_and_connect(
fn maybe_create_and_connect(
config: &Self::Config,
namespace: &str,
) -> Result<Self, Self::Error> {
if !Self::exists(config, namespace).await? {
Self::create(config, namespace).await?;
) -> impl Future<Output = Result<Self, Self::Error>> {
async {
if !Self::exists(config, namespace).await? {
Self::create(config, namespace).await?;
}
Self::connect(config, namespace).await
}
Self::connect(config, namespace).await
}

/// Creates a new storage. Overwrites it if this namespace already exists.
async fn recreate_and_connect(
fn recreate_and_connect(
config: &Self::Config,
namespace: &str,
) -> Result<Self, Self::Error> {
if Self::exists(config, namespace).await? {
Self::delete(config, namespace).await?;
) -> impl Future<Output = Result<Self, Self::Error>> {
async {
if Self::exists(config, namespace).await? {
Self::delete(config, namespace).await?;
}
Self::create(config, namespace).await?;
Self::connect(config, namespace).await
}
Self::create(config, namespace).await?;
Self::connect(config, namespace).await
}
}

Expand All @@ -426,6 +444,18 @@ pub trait KeyValueStore:
type Error: Debug;
}

/// Low-level, asynchronous write and read key-value operations, without a `Send` bound. Useful for storage APIs not based on views.
pub trait LocalKeyValueStore:
LocalReadableKeyValueStore<Self::Error> + LocalWritableKeyValueStore<Self::Error>
{
/// The error type.
type Error: Debug;
}

impl<S: KeyValueStore> LocalKeyValueStore for S {
type Error = <Self as KeyValueStore>::Error;
}

#[doc(hidden)]
/// Iterates keys by reference in a vector of keys.
/// Inspired by https://depth-first.com/articles/2020/06/22/returning-rust-iterators/
Expand Down
12 changes: 4 additions & 8 deletions linera-views/src/dynamo_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ pub struct DynamoDbStoreConfig {
pub common_config: CommonStoreConfig,
}

#[async_trait]
impl AdminKeyValueStore for DynamoDbStoreInternal {
type Error = DynamoDbContextError;
type Config = DynamoDbStoreConfig;
Expand Down Expand Up @@ -785,7 +784,6 @@ impl KeyValueIterable<DynamoDbContextError> for DynamoDbKeyValues {
}
}

#[async_trait]
impl ReadableKeyValueStore<DynamoDbContextError> for DynamoDbStoreInternal {
const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
type Keys = DynamoDbKeys;
Expand Down Expand Up @@ -818,8 +816,10 @@ impl ReadableKeyValueStore<DynamoDbContextError> for DynamoDbStoreInternal {
let handle = self.read_value_bytes_general(key_db);
handles.push(handle);
}
let result = join_all(handles).await;
Ok(result.into_iter().collect::<Result<_, _>>()?)
join_all(handles)
.await
.into_iter()
.collect::<Result<_, _>>()
}

async fn find_keys_by_prefix(
Expand Down Expand Up @@ -890,7 +890,6 @@ pub struct DynamoDbStore {
store: LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>,
}

#[async_trait]
impl ReadableKeyValueStore<DynamoDbContextError> for DynamoDbStore {
const MAX_KEY_SIZE: usize = MAX_KEY_SIZE - 4;
type Keys = Vec<Vec<u8>>;
Expand Down Expand Up @@ -930,7 +929,6 @@ impl ReadableKeyValueStore<DynamoDbContextError> for DynamoDbStore {
}
}

#[async_trait]
impl WritableKeyValueStore<DynamoDbContextError> for DynamoDbStore {
const MAX_VALUE_SIZE: usize = DynamoDbStoreInternal::MAX_VALUE_SIZE;

Expand All @@ -943,12 +941,10 @@ impl WritableKeyValueStore<DynamoDbContextError> for DynamoDbStore {
}
}

#[async_trait]
impl KeyValueStore for DynamoDbStore {
type Error = DynamoDbContextError;
}

#[async_trait]
impl AdminKeyValueStore for DynamoDbStore {
type Error = DynamoDbContextError;
type Config = DynamoDbStoreConfig;
Expand Down
4 changes: 0 additions & 4 deletions linera-views/src/journaling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ pub struct JournalingKeyValueStore<K> {
pub store: K,
}

#[async_trait]
impl<K> DeletePrefixExpander for &JournalingKeyValueStore<K>
where
K: DirectKeyValueStore + Send + Sync,
Expand All @@ -117,7 +116,6 @@ where
}
}

#[async_trait]
impl<K> ReadableKeyValueStore<K::Error> for JournalingKeyValueStore<K>
where
K: DirectKeyValueStore + Send + Sync,
Expand Down Expand Up @@ -161,7 +159,6 @@ where
}
}

#[async_trait]
impl<K> AdminKeyValueStore for JournalingKeyValueStore<K>
where
K: AdminKeyValueStore + Send + Sync,
Expand Down Expand Up @@ -195,7 +192,6 @@ where
}
}

#[async_trait]
impl<K> WritableKeyValueStore<K::Error> for JournalingKeyValueStore<K>
where
K: DirectKeyValueStore + Send + Sync,
Expand Down
Loading

0 comments on commit 85502c9

Please sign in to comment.