Skip to content

Commit

Permalink
move storage to separate crate
Browse files Browse the repository at this point in the history
  • Loading branch information
rbtying committed Jun 27, 2021
1 parent a2bd0f8 commit c8ebdb0
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 80 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -1,6 +1,7 @@
[workspace]
members = [
"core",
"storage",
"backend",
"backend/backend-types",
"frontend/shengji-wasm"
Expand Down
2 changes: 1 addition & 1 deletion backend/Cargo.toml
Expand Up @@ -13,7 +13,6 @@ dynamic = ["slog-term"]

[dependencies]
anyhow = "1.0"
async-trait = "0.1"
shengji-types = { path = "./backend-types" }
futures = { version = "0.3" }
lazy_static = "1.4.0"
Expand All @@ -25,6 +24,7 @@ slog-async = "2.5"
slog-bunyan = "2.2"
slog-term = { version = "2.5", optional = true }
static_dir = "0.2"
storage = { path = "../storage" }
tokio = { version = "1.7", features = ["macros", "rt-multi-thread", "fs", "time", "sync", "io-util"] }
warp = "0.3"
zstd = "0.5"
Expand Down
1 change: 0 additions & 1 deletion backend/src/main.rs
Expand Up @@ -20,7 +20,6 @@ use warp::Filter;
use shengji_core::{game_state, interactive, settings, types};
use shengji_types::{GameMessage, ZSTD_ZSTD_DICT};

mod storage;
use storage::{HashMapStorage, State, Storage};

/// Our global unique user id counter.
Expand Down
12 changes: 12 additions & 0 deletions storage/Cargo.toml
@@ -0,0 +1,12 @@
[package]
name = "storage"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1"
serde = "1.0"
slog = "2.5"
tokio = { version = "1.7", features = ["sync"] }
78 changes: 1 addition & 77 deletions backend/src/storage.rs → storage/src/hash_map_storage.rs
Expand Up @@ -4,86 +4,10 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use slog::{debug, info, Logger};
use tokio::sync::{mpsc, Mutex};

pub trait State: Serialize + DeserializeOwned + Clone + Send {
/// Messages that can be sent by operations applied to the state.
type Message: Serialize + DeserializeOwned + Clone + Send;

/// The key that this state corresponds to.
fn key(&self) -> &[u8];
fn version(&self) -> u64;

/// The version of the state. Changes to state require changes in the
/// version. The default version number must be zero.
fn new_from_key(key: Vec<u8>) -> Self;
}

#[async_trait]
pub trait Storage<S: State, E>: Clone + Send {
/// Put the state into storage, overwriting any existing value.
async fn put(self, state: S) -> Result<(), E>;

/// Put the state into storage. If the version on the server doesn't match
/// the expected version, return an error.
async fn put_cas(self, expected_version: u64, state: S) -> Result<(), E>;

/// Get the state corresponding to the key from storage. If it doesn't
/// exist, a new state will be instantiated with a default version.
async fn get(self, key: Vec<u8>) -> Result<S, E>;

/// Execute the provided operation based off of the version of the state.
///
/// If the operation succeeds, the returned messages will be published to
/// the key and the corresponding state will be stored if its version
/// differs from the already-stored version.
///
/// If the operation fails, state will not be changed, and the error will be
/// returned.
///
/// This operation may also fail if the stored state's `version` field
/// differs from the one which is fetched at the beginning of the operation
/// -- it has compare-and-set semantics.
async fn execute_operation_with_messages<E2, F>(
self,
key: Vec<u8>,
operation: F,
) -> Result<u64, E2>
where
E2: From<E>,
F: FnOnce(S) -> Result<(S, Vec<S::Message>), E2> + Send + 'static;

/// Subscribe to messages about a given key. The `subscriber_id` is expected
/// to be unique across all subscribers.
async fn subscribe(
self,
key: Vec<u8>,
subscriber_id: usize,
) -> Result<mpsc::UnboundedReceiver<S::Message>, E>;
/// Publish to all subscribers for a given key.
async fn publish(self, key: Vec<u8>, message: S::Message) -> Result<(), E>;
/// Publish a message to a single subscriber, identified by subscriber id.
async fn publish_to_single_subscriber(
self,
key: Vec<u8>,
subscriber_id: usize,
message: S::Message,
) -> Result<(), E>;
/// Unsubscribe a given subscriber and remove it from tracking.
async fn unsubscribe(self, key: Vec<u8>, subscriber_id: usize);

/// This should be called on a regular basis to ensure that we don't leave
/// stale state in the storage layer.
async fn prune(self);
/// Count the number of active subscriptions and active states.
async fn stats(self) -> Result<(usize, usize), E>;
/// Get all of the keys stored in this storage backend.
async fn get_all_keys(self) -> Result<Vec<Vec<u8>>, E>;
/// Get the number of states that have been newly created.
async fn get_states_created(self) -> Result<u64, E>;
}
use crate::storage::{State, Storage};

#[allow(clippy::type_complexity)]
pub struct HashMapStorage<S: State> {
Expand Down
5 changes: 5 additions & 0 deletions storage/src/lib.rs
@@ -0,0 +1,5 @@
mod hash_map_storage;
mod storage;

pub use hash_map_storage::HashMapStorage;
pub use storage::{State, Storage};
80 changes: 80 additions & 0 deletions storage/src/storage.rs
@@ -0,0 +1,80 @@
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::mpsc;

pub trait State: Serialize + DeserializeOwned + Clone + Send {
/// Messages that can be sent by operations applied to the state.
type Message: Serialize + DeserializeOwned + Clone + Send;

/// The key that this state corresponds to.
fn key(&self) -> &[u8];
fn version(&self) -> u64;

/// The version of the state. Changes to state require changes in the
/// version. The default version number must be zero.
fn new_from_key(key: Vec<u8>) -> Self;
}

#[async_trait]
pub trait Storage<S: State, E>: Clone + Send {
/// Put the state into storage, overwriting any existing value.
async fn put(self, state: S) -> Result<(), E>;

/// Put the state into storage. If the version on the server doesn't match
/// the expected version, return an error.
async fn put_cas(self, expected_version: u64, state: S) -> Result<(), E>;

/// Get the state corresponding to the key from storage. If it doesn't
/// exist, a new state will be instantiated with a default version.
async fn get(self, key: Vec<u8>) -> Result<S, E>;

/// Execute the provided operation based off of the version of the state.
///
/// If the operation succeeds, the returned messages will be published to
/// the key and the corresponding state will be stored if its version
/// differs from the already-stored version.
///
/// If the operation fails, state will not be changed, and the error will be
/// returned.
///
/// This operation may also fail if the stored state's `version` field
/// differs from the one which is fetched at the beginning of the operation
/// -- it has compare-and-set semantics.
async fn execute_operation_with_messages<E2, F>(
self,
key: Vec<u8>,
operation: F,
) -> Result<u64, E2>
where
E2: From<E>,
F: FnOnce(S) -> Result<(S, Vec<S::Message>), E2> + Send + 'static;

/// Subscribe to messages about a given key. The `subscriber_id` is expected
/// to be unique across all subscribers.
async fn subscribe(
self,
key: Vec<u8>,
subscriber_id: usize,
) -> Result<mpsc::UnboundedReceiver<S::Message>, E>;
/// Publish to all subscribers for a given key.
async fn publish(self, key: Vec<u8>, message: S::Message) -> Result<(), E>;
/// Publish a message to a single subscriber, identified by subscriber id.
async fn publish_to_single_subscriber(
self,
key: Vec<u8>,
subscriber_id: usize,
message: S::Message,
) -> Result<(), E>;
/// Unsubscribe a given subscriber and remove it from tracking.
async fn unsubscribe(self, key: Vec<u8>, subscriber_id: usize);

/// This should be called on a regular basis to ensure that we don't leave
/// stale state in the storage layer.
async fn prune(self);
/// Count the number of active subscriptions and active states.
async fn stats(self) -> Result<(usize, usize), E>;
/// Get all of the keys stored in this storage backend.
async fn get_all_keys(self) -> Result<Vec<Vec<u8>>, E>;
/// Get the number of states that have been newly created.
async fn get_states_created(self) -> Result<u64, E>;
}

0 comments on commit c8ebdb0

Please sign in to comment.