Skip to content

Commit

Permalink
Make Storage::Transaction a GAT
Browse files Browse the repository at this point in the history
By making the Storage::Transaction a GAT, we can now create
Transaction implementations which don't need to own their data,
because the Transaction can borrow from Storage.

This fixes #737.
  • Loading branch information
tillrohrmann committed Aug 26, 2023
1 parent bee15c6 commit 355f35f
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 33 deletions.
10 changes: 7 additions & 3 deletions src/storage_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ pub mod status_table;
pub mod timer_table;

pub trait Storage {
type TransactionType: Transaction;
type TransactionType<'a>: Transaction
where
Self: 'a;

fn transaction(&self) -> Self::TransactionType;
fn transaction(&self) -> Self::TransactionType<'_>;
}

pub trait Transaction:
Expand All @@ -60,5 +62,7 @@ pub trait Transaction:
+ timer_table::TimerTable
+ Send
{
fn commit(self) -> GetFuture<'static, ()>;
fn commit<'a>(self) -> GetFuture<'a, ()>
where
Self: 'a;
}
2 changes: 1 addition & 1 deletion src/storage_rocksdb/src/deduplication_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ define_table_key!(
)
);

impl DeduplicationTable for RocksDBTransaction {
impl<'a> DeduplicationTable for RocksDBTransaction<'a> {
fn get_sequence_number(
&mut self,
partition_id: PartitionId,
Expand Down
2 changes: 1 addition & 1 deletion src/storage_rocksdb/src/fsm_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ define_table_key!(
PartitionStateMachineKey(partition_id: PartitionId, state_id: u64)
);

impl FsmTable for RocksDBTransaction {
impl<'a> FsmTable for RocksDBTransaction<'a> {
fn get(&mut self, partition_id: PartitionId, state_id: u64) -> GetFuture<Option<Bytes>> {
let key = PartitionStateMachineKey::default()
.partition_id(partition_id)
Expand Down
2 changes: 1 addition & 1 deletion src/storage_rocksdb/src/inbox_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ define_table_key!(
)
);

impl InboxTable for RocksDBTransaction {
impl<'a> InboxTable for RocksDBTransaction<'a> {
fn put_invocation(
&mut self,
service_id: &ServiceId,
Expand Down
2 changes: 1 addition & 1 deletion src/storage_rocksdb/src/journal_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn write_journal_entry_key(service_id: &ServiceId, journal_index: u32) -> Journa
.journal_index(journal_index)
}

impl JournalTable for RocksDBTransaction {
impl<'a> JournalTable for RocksDBTransaction<'a> {
fn put_journal_entry(
&mut self,
service_id: &ServiceId,
Expand Down
25 changes: 14 additions & 11 deletions src/storage_rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,25 +420,25 @@ impl RocksDBStorage {
pub fn transaction(&self) -> RocksDBTransaction {
RocksDBTransaction {
write_batch: Default::default(),
storage: Clone::clone(self),
storage: self,
key_buffer: Default::default(),
value_buffer: Default::default(),
}
}
}

impl Storage for RocksDBStorage {
type TransactionType = RocksDBTransaction;
type TransactionType<'a> = RocksDBTransaction<'a>;

#[allow(clippy::needless_lifetimes)]
fn transaction(&self) -> Self::TransactionType {
fn transaction(&self) -> Self::TransactionType<'_> {
RocksDBStorage::transaction(self)
}
}

pub struct RocksDBTransaction {
pub struct RocksDBTransaction<'a> {
write_batch: Option<WriteBatch>,
storage: RocksDBStorage,
storage: &'a RocksDBStorage,
key_buffer: BytesMut,
value_buffer: BytesMut,
}
Expand Down Expand Up @@ -479,7 +479,7 @@ impl<T: Send + 'static> Stream for BackgroundScanStream<T> {
}
}

impl RocksDBTransaction {
impl<'a> RocksDBTransaction<'a> {
pub fn get_blocking<K, F, R>(&mut self, key: K, f: F) -> GetFuture<'static, R>
where
K: TableKey + Send + 'static,
Expand All @@ -490,7 +490,7 @@ impl RocksDBTransaction {
key.serialize_to(&mut buf);
let buf = buf.split();

let db = Clone::clone(&self.storage);
let db = Clone::clone(self.storage);
tokio::task::spawn_blocking(move || match db.get(K::table(), &buf) {
Ok(value) => {
let slice = value.as_ref().map(|v| v.as_ref());
Expand All @@ -512,7 +512,7 @@ impl RocksDBTransaction {
F: FnOnce(Option<(&[u8], &[u8])>) -> Result<R> + Send + 'static,
R: Send + 'static,
{
let db = Clone::clone(&self.storage);
let db = Clone::clone(self.storage);
let background_task = move || {
let iterator = db.iterator_from(scan);
f(iterator.item())
Expand All @@ -537,7 +537,7 @@ impl RocksDBTransaction {
R: Send + 'static,
{
let (tx, rx) = tokio::sync::mpsc::channel::<Result<R>>(256);
let db = Clone::clone(&self.storage);
let db = Clone::clone(self.storage);

let background_task = move || {
let mut iterator = db.iterator_from(scan);
Expand Down Expand Up @@ -618,8 +618,11 @@ impl RocksDBTransaction {
}
}

impl Transaction for RocksDBTransaction {
fn commit(self) -> GetFuture<'static, ()> {
impl<'a> Transaction for RocksDBTransaction<'a> {
fn commit<'b>(self) -> GetFuture<'b, ()>
where
Self: 'b,
{
if self.write_batch.is_none() {
return ok(()).boxed();
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage_rocksdb/src/outbox_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ define_table_key!(
OutboxKey(partition_id: PartitionId, message_index: u64)
);

impl OutboxTable for RocksDBTransaction {
impl<'a> OutboxTable for RocksDBTransaction<'a> {
fn add_message(
&mut self,
partition_id: PartitionId,
Expand Down
2 changes: 1 addition & 1 deletion src/storage_rocksdb/src/state_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn user_state_key_from_slice(key: &[u8]) -> Result<Bytes> {
Ok(key)
}

impl StateTable for RocksDBTransaction {
impl<'a> StateTable for RocksDBTransaction<'a> {
fn put_user_state(
&mut self,
service_id: &ServiceId,
Expand Down
2 changes: 1 addition & 1 deletion src/storage_rocksdb/src/status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn status_key_from_bytes(mut bytes: Bytes) -> crate::Result<ServiceId> {
))
}

impl StatusTable for RocksDBTransaction {
impl<'a> StatusTable for RocksDBTransaction<'a> {
fn put_invocation_status(
&mut self,
service_id: &ServiceId,
Expand Down
2 changes: 1 addition & 1 deletion src/storage_rocksdb/src/timer_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn timer_key_from_key_slice(slice: &[u8]) -> Result<TimerKey> {
Ok(timer_key)
}

impl TimerTable for RocksDBTransaction {
impl<'a> TimerTable for RocksDBTransaction<'a> {
fn add_timer(&mut self, partition_id: PartitionId, key: &TimerKey, timer: Timer) -> PutFuture {
let key = write_timer_key(partition_id, key);
let value = ProtoValue(storage::v1::Timer::from(timer));
Expand Down
4 changes: 3 additions & 1 deletion src/worker/src/partition/effects/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ impl CommitError {

pub(crate) trait Committable {
// TODO: Replace with async trait or proper future
fn commit(self) -> BoxFuture<'static, Result<(), CommitError>>;
fn commit<'a>(self) -> BoxFuture<'a, Result<(), CommitError>>
where
Self: 'a;
}

#[must_use = "Don't forget to commit the interpretation result"]
Expand Down
4 changes: 2 additions & 2 deletions src/worker/src/partition/storage/invoker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<Storage> InvokerStorageReader<Storage> {

impl<Storage> restate_invoker_api::JournalReader for InvokerStorageReader<Storage>
where
Storage: restate_storage_api::Storage,
for<'a> Storage: restate_storage_api::Storage + 'a,
{
type JournalStream = stream::Iter<IntoIter<PlainRawEntry>>;
type Error = InvokerStorageReaderError;
Expand Down Expand Up @@ -88,7 +88,7 @@ where

impl<Storage> restate_invoker_api::StateReader for InvokerStorageReader<Storage>
where
Storage: restate_storage_api::Storage,
for<'a> Storage: restate_storage_api::Storage + 'a,
{
type StateIter = IntoIter<(Bytes, Bytes)>;
type Error = InvokerStorageReaderError;
Expand Down
20 changes: 12 additions & 8 deletions src/worker/src/partition/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ where
}
}

pub(super) fn create_transaction(&self) -> Transaction<Storage::TransactionType> {
pub(super) fn create_transaction(&self) -> Transaction<Storage::TransactionType<'_>> {
Transaction::new(
self.partition_id,
self.partition_key_range.clone(),
Expand Down Expand Up @@ -112,9 +112,10 @@ where
}
}

pub(super) fn commit(
self,
) -> BoxFuture<'static, Result<(), restate_storage_api::StorageError>> {
pub(super) fn commit<'a>(self) -> BoxFuture<'a, Result<(), restate_storage_api::StorageError>>
where
Self: 'a,
{
self.inner.commit()
}

Expand Down Expand Up @@ -560,16 +561,19 @@ mod fsm_variable {

impl<TransactionType> Committable for Transaction<TransactionType>
where
TransactionType: restate_storage_api::Transaction + 'static,
TransactionType: restate_storage_api::Transaction,
{
fn commit(self) -> BoxFuture<'static, Result<(), CommitError>> {
fn commit<'a>(self) -> BoxFuture<'a, Result<(), CommitError>>
where
Self: 'a,
{
async { self.inner.commit().await.map_err(CommitError::with_source) }.boxed()
}
}

impl<Storage> OutboxReader for PartitionStorage<Storage>
where
Storage: restate_storage_api::Storage + 'static,
for<'a> Storage: restate_storage_api::Storage + 'a,
{
fn get_next_message(
&self,
Expand Down Expand Up @@ -599,7 +603,7 @@ where

impl<Storage> TimerReader<TimerValue> for PartitionStorage<Storage>
where
Storage: restate_storage_api::Storage + Send + Sync,
for<'a> Storage: restate_storage_api::Storage + Send + Sync + 'a,
{
type TimerStream<'a> = BoxStream<'a, TimerValue> where Self: 'a;

Expand Down

0 comments on commit 355f35f

Please sign in to comment.