From c8ebdb096df2d058c21a5b8d7c7e708d0ce53113 Mon Sep 17 00:00:00 2001 From: Robert Ying Date: Sun, 27 Jun 2021 10:11:21 -0700 Subject: [PATCH] move storage to separate crate --- Cargo.lock | 12 ++- Cargo.toml | 1 + backend/Cargo.toml | 2 +- backend/src/main.rs | 1 - storage/Cargo.toml | 12 +++ .../src/hash_map_storage.rs | 78 +----------------- storage/src/lib.rs | 5 ++ storage/src/storage.rs | 80 +++++++++++++++++++ 8 files changed, 111 insertions(+), 80 deletions(-) create mode 100644 storage/Cargo.toml rename backend/src/storage.rs => storage/src/hash_map_storage.rs (71%) create mode 100644 storage/src/lib.rs create mode 100644 storage/src/storage.rs diff --git a/Cargo.lock b/Cargo.lock index a9320a09..ba9e14f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1023,7 +1023,6 @@ name = "shengji" version = "0.1.1-alpha.0" dependencies = [ "anyhow", - "async-trait", "futures", "lazy_static", "serde", @@ -1035,6 +1034,7 @@ dependencies = [ "slog-bunyan", "slog-term", "static_dir", + "storage", "tokio", "vergen", "warp", @@ -1187,6 +1187,16 @@ dependencies = [ "warp", ] +[[package]] +name = "storage" +version = "0.1.0" +dependencies = [ + "async-trait", + "serde", + "slog", + "tokio", +] + [[package]] name = "syn" version = "1.0.73" diff --git a/Cargo.toml b/Cargo.toml index c72ad09e..afa00e39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "core", + "storage", "backend", "backend/backend-types", "frontend/shengji-wasm" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index a40d6fc7..fd1e89b6 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -13,7 +13,6 @@ dynamic = ["slog-term"] [dependencies] anyhow = "1.0" -async-trait = "0.1" shengji-types = { path = "./backend-types" } futures = { version = "0.3" } lazy_static = "1.4.0" @@ -25,6 +24,7 @@ slog-async = "2.5" slog-bunyan = "2.2" slog-term = { version = "2.5", optional = true } static_dir = "0.2" +storage = { path = "../storage" } tokio = { version = "1.7", features = ["macros", "rt-multi-thread", "fs", "time", "sync", "io-util"] } warp = "0.3" zstd = "0.5" diff --git a/backend/src/main.rs b/backend/src/main.rs index f36842ca..c3679f3d 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -20,7 +20,6 @@ use warp::Filter; use shengji_core::{game_state, interactive, settings, types}; use shengji_types::{GameMessage, ZSTD_ZSTD_DICT}; -mod storage; use storage::{HashMapStorage, State, Storage}; /// Our global unique user id counter. diff --git a/storage/Cargo.toml b/storage/Cargo.toml new file mode 100644 index 00000000..e26e8adf --- /dev/null +++ b/storage/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "storage" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +serde = "1.0" +slog = "2.5" +tokio = { version = "1.7", features = ["sync"] } \ No newline at end of file diff --git a/backend/src/storage.rs b/storage/src/hash_map_storage.rs similarity index 71% rename from backend/src/storage.rs rename to storage/src/hash_map_storage.rs index a2ef020f..65626134 100644 --- a/backend/src/storage.rs +++ b/storage/src/hash_map_storage.rs @@ -4,86 +4,10 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; -use serde::{de::DeserializeOwned, Serialize}; use slog::{debug, info, Logger}; use tokio::sync::{mpsc, Mutex}; -pub trait State: Serialize + DeserializeOwned + Clone + Send { - /// Messages that can be sent by operations applied to the state. - type Message: Serialize + DeserializeOwned + Clone + Send; - - /// The key that this state corresponds to. - fn key(&self) -> &[u8]; - fn version(&self) -> u64; - - /// The version of the state. Changes to state require changes in the - /// version. The default version number must be zero. - fn new_from_key(key: Vec) -> Self; -} - -#[async_trait] -pub trait Storage: Clone + Send { - /// Put the state into storage, overwriting any existing value. - async fn put(self, state: S) -> Result<(), E>; - - /// Put the state into storage. If the version on the server doesn't match - /// the expected version, return an error. - async fn put_cas(self, expected_version: u64, state: S) -> Result<(), E>; - - /// Get the state corresponding to the key from storage. If it doesn't - /// exist, a new state will be instantiated with a default version. - async fn get(self, key: Vec) -> Result; - - /// Execute the provided operation based off of the version of the state. - /// - /// If the operation succeeds, the returned messages will be published to - /// the key and the corresponding state will be stored if its version - /// differs from the already-stored version. - /// - /// If the operation fails, state will not be changed, and the error will be - /// returned. - /// - /// This operation may also fail if the stored state's `version` field - /// differs from the one which is fetched at the beginning of the operation - /// -- it has compare-and-set semantics. - async fn execute_operation_with_messages( - self, - key: Vec, - operation: F, - ) -> Result - where - E2: From, - F: FnOnce(S) -> Result<(S, Vec), E2> + Send + 'static; - - /// Subscribe to messages about a given key. The `subscriber_id` is expected - /// to be unique across all subscribers. - async fn subscribe( - self, - key: Vec, - subscriber_id: usize, - ) -> Result, E>; - /// Publish to all subscribers for a given key. - async fn publish(self, key: Vec, message: S::Message) -> Result<(), E>; - /// Publish a message to a single subscriber, identified by subscriber id. - async fn publish_to_single_subscriber( - self, - key: Vec, - subscriber_id: usize, - message: S::Message, - ) -> Result<(), E>; - /// Unsubscribe a given subscriber and remove it from tracking. - async fn unsubscribe(self, key: Vec, subscriber_id: usize); - - /// This should be called on a regular basis to ensure that we don't leave - /// stale state in the storage layer. - async fn prune(self); - /// Count the number of active subscriptions and active states. - async fn stats(self) -> Result<(usize, usize), E>; - /// Get all of the keys stored in this storage backend. - async fn get_all_keys(self) -> Result>, E>; - /// Get the number of states that have been newly created. - async fn get_states_created(self) -> Result; -} +use crate::storage::{State, Storage}; #[allow(clippy::type_complexity)] pub struct HashMapStorage { diff --git a/storage/src/lib.rs b/storage/src/lib.rs new file mode 100644 index 00000000..a6eac9fb --- /dev/null +++ b/storage/src/lib.rs @@ -0,0 +1,5 @@ +mod hash_map_storage; +mod storage; + +pub use hash_map_storage::HashMapStorage; +pub use storage::{State, Storage}; diff --git a/storage/src/storage.rs b/storage/src/storage.rs new file mode 100644 index 00000000..5352ec92 --- /dev/null +++ b/storage/src/storage.rs @@ -0,0 +1,80 @@ +use async_trait::async_trait; +use serde::{de::DeserializeOwned, Serialize}; +use tokio::sync::mpsc; + +pub trait State: Serialize + DeserializeOwned + Clone + Send { + /// Messages that can be sent by operations applied to the state. + type Message: Serialize + DeserializeOwned + Clone + Send; + + /// The key that this state corresponds to. + fn key(&self) -> &[u8]; + fn version(&self) -> u64; + + /// The version of the state. Changes to state require changes in the + /// version. The default version number must be zero. + fn new_from_key(key: Vec) -> Self; +} + +#[async_trait] +pub trait Storage: Clone + Send { + /// Put the state into storage, overwriting any existing value. + async fn put(self, state: S) -> Result<(), E>; + + /// Put the state into storage. If the version on the server doesn't match + /// the expected version, return an error. + async fn put_cas(self, expected_version: u64, state: S) -> Result<(), E>; + + /// Get the state corresponding to the key from storage. If it doesn't + /// exist, a new state will be instantiated with a default version. + async fn get(self, key: Vec) -> Result; + + /// Execute the provided operation based off of the version of the state. + /// + /// If the operation succeeds, the returned messages will be published to + /// the key and the corresponding state will be stored if its version + /// differs from the already-stored version. + /// + /// If the operation fails, state will not be changed, and the error will be + /// returned. + /// + /// This operation may also fail if the stored state's `version` field + /// differs from the one which is fetched at the beginning of the operation + /// -- it has compare-and-set semantics. + async fn execute_operation_with_messages( + self, + key: Vec, + operation: F, + ) -> Result + where + E2: From, + F: FnOnce(S) -> Result<(S, Vec), E2> + Send + 'static; + + /// Subscribe to messages about a given key. The `subscriber_id` is expected + /// to be unique across all subscribers. + async fn subscribe( + self, + key: Vec, + subscriber_id: usize, + ) -> Result, E>; + /// Publish to all subscribers for a given key. + async fn publish(self, key: Vec, message: S::Message) -> Result<(), E>; + /// Publish a message to a single subscriber, identified by subscriber id. + async fn publish_to_single_subscriber( + self, + key: Vec, + subscriber_id: usize, + message: S::Message, + ) -> Result<(), E>; + /// Unsubscribe a given subscriber and remove it from tracking. + async fn unsubscribe(self, key: Vec, subscriber_id: usize); + + /// This should be called on a regular basis to ensure that we don't leave + /// stale state in the storage layer. + async fn prune(self); + /// Count the number of active subscriptions and active states. + async fn stats(self) -> Result<(usize, usize), E>; + /// Get all of the keys stored in this storage backend. + async fn get_all_keys(self) -> Result>, E>; + /// Get the number of states that have been newly created. + async fn get_states_created(self) -> Result; +}