Skip to content

Commit

Permalink
change store type dyn -> static (#54)
Browse files Browse the repository at this point in the history
* change store type dyn -> static

* Any place where u can use Stores, you can also use any Store

* use match instead of if statements and add s3s and s3s+lts

---------

Co-authored-by: Muhamad Azamy <muhamad@incubaid.com>
  • Loading branch information
rawdaGastan and muhamadazmy committed May 31, 2024
1 parent 988fbf2 commit 502d0fa
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 115 deletions.
5 changes: 4 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
fn main() {
println!(
"cargo:rustc-env=GIT_VERSION={}",
git_version::git_version!(args = ["--tags", "--always", "--dirty=-modified"], fallback = "unknown")
git_version::git_version!(
args = ["--tags", "--always", "--dirty=-modified"],
fallback = "unknown"
)
);
}
2 changes: 1 addition & 1 deletion src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::fungi::{
};
use crate::store::Store;

use anyhow::{ensure, Result, Context};
use anyhow::{ensure, Context, Result};
use polyfuse::reply::FileAttr;
use polyfuse::{
op,
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ mod test {
let store1 = DirStore::new(root.join("store1")).await.unwrap();
let mut store = Router::new();

store.add(0x00, 0x7f, Box::new(store0));
store.add(0x80, 0xff, Box::new(store1));
store.add(0x00, 0x7f, store0);
store.add(0x80, 0xff, store1);

pack(writer, store, &source, false).await.unwrap();

Expand All @@ -72,8 +72,8 @@ mod test {
let store1 = DirStore::new(root.join("store1")).await.unwrap();
let mut store = Router::new();

store.add(0x00, 0x7f, Box::new(store0));
store.add(0x80, 0xff, Box::new(store1));
store.add(0x00, 0x7f, store0);
store.add(0x80, 0xff, store1);

let cache = Cache::new(root.join("cache"), store);

Expand Down
6 changes: 3 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use clap::{ArgAction, Args, Parser, Subcommand};

use rfs::cache;
use rfs::fungi;
use rfs::store::{self, Router};
use rfs::store::{self, Router, Stores};

use regex::Regex;

Expand Down Expand Up @@ -230,7 +230,7 @@ async fn fuse(opts: MountOptions) -> Result<()> {
filesystem.mount(opts.target).await
}

async fn get_router(meta: &fungi::Reader) -> Result<Router> {
async fn get_router(meta: &fungi::Reader) -> Result<Router<Stores>> {
let mut router = store::Router::new();

for route in meta.routes().await.context("failed to get store routes")? {
Expand All @@ -243,7 +243,7 @@ async fn get_router(meta: &fungi::Reader) -> Result<Router> {
Ok(router)
}

async fn parse_router(urls: &[String]) -> Result<Router> {
async fn parse_router(urls: &[String]) -> Result<Router<Stores>> {
let mut router = Router::new();
let pattern = r"^(?P<range>[0-9a-f]{2}-[0-9a-f]{2})=(?P<url>.+)$";
let re = Regex::new(pattern)?;
Expand Down
25 changes: 10 additions & 15 deletions src/store/dir.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,11 @@
use super::{Error, FactoryFuture, Result, Route, Store};
use super::{Error, Result, Route, Store};
use std::io::ErrorKind;
use std::os::unix::prelude::OsStrExt;
use std::path::PathBuf;
use tokio::fs;
use url;

const SCHEME: &str = "dir";

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let u = url::Url::parse(&url)?;
if u.scheme() != SCHEME {
return Err(Error::InvalidScheme(u.scheme().into(), SCHEME.into()));
}

Ok(Box::new(DirStore::new(u.path()).await?))
}

pub fn make(url: &str) -> FactoryFuture {
Box::pin(make_inner(url.into()))
}
pub const SCHEME: &str = "dir";

/// DirStore is a simple store that store blobs on the filesystem
/// and is mainly used for testing
Expand All @@ -29,6 +16,14 @@ pub struct DirStore {
}

impl DirStore {
pub async fn make<U: AsRef<str>>(url: &U) -> Result<DirStore> {
let u = url::Url::parse(url.as_ref())?;
if u.scheme() != SCHEME {
return Err(Error::InvalidScheme(u.scheme().into(), SCHEME.into()));
}

Ok(DirStore::new(u.path()).await?)
}
pub async fn new<P: Into<PathBuf>>(root: P) -> Result<Self> {
let root = root.into();
fs::create_dir_all(&root).await?;
Expand Down
96 changes: 46 additions & 50 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,32 @@ pub mod s3store;
pub mod zdb;

use rand::seq::SliceRandom;
use std::{collections::HashMap, pin::Pin};

pub use bs::BlockStore;
use futures::Future;

lazy_static::lazy_static! {
static ref STORES: HashMap<String, Factory> = register_stores();
}

/// register_stores is used to register the stores built in types
/// so they can be created with a url
fn register_stores() -> HashMap<String, Factory> {
let mut m: HashMap<String, Factory> = HashMap::default();
m.insert("dir".into(), dir::make);
m.insert("zdb".into(), zdb::make);
m.insert("s3".into(), s3store::make);
m.insert("s3s".into(), s3store::make);
m.insert("s3s+tls".into(), s3store::make);

m
}
pub use self::router::Router;

pub async fn make<U: AsRef<str>>(u: U) -> Result<Box<dyn Store>> {
pub async fn make<U: AsRef<str>>(u: U) -> Result<Stores> {
let parsed = url::Url::parse(u.as_ref())?;
let factory = match STORES.get(parsed.scheme()) {
None => return Err(Error::UnknownStore(parsed.scheme().into())),
Some(factory) => factory,
};

factory(u.as_ref()).await
match parsed.scheme() {
dir::SCHEME => return Ok(Stores::Dir(
dir::DirStore::make(&u)
.await
.expect("failed to make dir store"),
)),
"s3" | "s3s" | "s3s+tls" => return Ok(Stores::S3(
s3store::S3Store::make(&u)
.await
.expect(format!("failed to make {} store", parsed.scheme()).as_str()),
)),
"zdb" => return Ok(Stores::ZDB(
zdb::ZdbStore::make(&u)
.await
.expect("failed to make zdb store"),
)),
_ => return Err(Error::UnknownStore(parsed.scheme().into())),
}
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -101,28 +97,11 @@ pub trait Store: Send + Sync + 'static {
fn routes(&self) -> Vec<Route>;
}

/// The store factory works as a factory for a specific store
/// this is only needed to be able dynamically create different types
/// of stores based only on scheme of the store url.
/// the Factory returns a factory future that resolved to a Box<dyn Store>
pub type Factory = fn(u: &str) -> FactoryFuture;

/// FactoryFuture is a future that resolves to a Result<Box<dyn Store>> this
/// is returned by a factory function like above
pub type FactoryFuture = Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>>;

/// Router holds a set of shards (stores) where each store can be configured to serve
/// a range of hashes.
///
/// On get, all possible stores that is configured to serve this key are tried until the first
/// one succeed
///
/// On set, the router set the object on all matching stores, and fails if at least
/// one store fails, or if no store matches the key
pub type Router = router::Router<Box<dyn Store>>;

#[async_trait::async_trait]
impl Store for Router {
impl<S> Store for Router<S>
where
S: Store,
{
async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
if key.is_empty() {
return Err(Error::InvalidKey);
Expand All @@ -131,7 +110,7 @@ impl Store for Router {

// to make it fare we shuffle the list of matching routers randomly everytime
// before we do a get
let mut routers: Vec<&Box<dyn Store>> = self.route(key[0]).collect();
let mut routers: Vec<&S> = self.route(key[0]).collect();
routers.shuffle(&mut rand::thread_rng());
for store in routers {
match store.get(key).await {
Expand Down Expand Up @@ -182,16 +161,33 @@ impl Store for Router {
routes
}
}
pub enum Stores {
S3(s3store::S3Store),
Dir(dir::DirStore),
ZDB(zdb::ZdbStore),
}

#[async_trait::async_trait]
impl Store for Box<dyn Store> {
impl Store for Stores {
async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
self.as_ref().get(key).await
match self {
self::Stores::S3(s3_store) => s3_store.get(key).await,
self::Stores::Dir(dir_store) => dir_store.get(key).await,
self::Stores::ZDB(zdb_store) => zdb_store.get(key).await,
}
}
async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> {
self.as_ref().set(key, blob).await
match self {
self::Stores::S3(s3_store) => s3_store.set(key, blob).await,
self::Stores::Dir(dir_store) => dir_store.set(key, blob).await,
self::Stores::ZDB(zdb_store) => zdb_store.set(key, blob).await,
}
}
fn routes(&self) -> Vec<Route> {
self.as_ref().routes()
match self {
self::Stores::S3(s3_store) => s3_store.routes(),
self::Stores::Dir(dir_store) => dir_store.routes(),
self::Stores::ZDB(zdb_store) => zdb_store.routes(),
}
}
}
17 changes: 5 additions & 12 deletions src/store/s3store.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use super::{Error, Result, Route, Store};

use anyhow::Context;
use futures::Future;
use s3::{creds::Credentials, error::S3Error, Bucket, Region};
use std::pin::Pin;
use url::Url;

fn get_config<U: AsRef<str>>(u: U) -> Result<(Credentials, Region, String)> {
Expand Down Expand Up @@ -46,17 +44,8 @@ fn get_config<U: AsRef<str>>(u: U) -> Result<(Credentials, Region, String)> {
))
}

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let (cred, region, bucket_name) = get_config(&url)?;
Ok(Box::new(S3Store::new(&url, &bucket_name, region, cred)?))
}

pub fn make(url: &str) -> Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>> {
Box::pin(make_inner(url.into()))
}

#[derive(Clone)]
struct S3Store {
pub struct S3Store {
bucket: Bucket,
url: String,
// this is only here as a work around for this bug https://github.com/durch/rust-s3/issues/337
Expand All @@ -67,6 +56,10 @@ struct S3Store {
}

impl S3Store {
pub async fn make<U: AsRef<str>>(url: &U) -> Result<S3Store> {
let (cred, region, bucket_name) = get_config(url.as_ref())?;
Ok(S3Store::new(url.as_ref(), &bucket_name, region, cred)?)
}
pub fn new(url: &str, bucket_name: &str, region: Region, cred: Credentials) -> Result<Self> {
let bucket = Bucket::new(bucket_name, region, cred)
.context("failed instantiate bucket")?
Expand Down
57 changes: 28 additions & 29 deletions src/store/zdb.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Error, FactoryFuture, Result, Route, Store};
use super::{Error, Result, Route, Store};
use anyhow::Context;

use bb8_redis::{
Expand Down Expand Up @@ -77,42 +77,41 @@ fn get_connection_info<U: AsRef<str>>(u: U) -> Result<(ConnectionInfo, Option<St
))
}

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let (mut info, namespace) = get_connection_info(&url)?;

let namespace = WithNamespace {
namespace,
password: info.redis.password.take(),
};
#[derive(Clone)]
pub struct ZdbStore {
url: String,
pool: Pool<RedisConnectionManager>,
}

log::debug!("connection {:#?}", info);
log::debug!("switching namespace to: {:?}", namespace.namespace);
impl ZdbStore {
pub async fn make<U: AsRef<str>>(url: &U) -> Result<ZdbStore> {
let (mut info, namespace) = get_connection_info(url.as_ref())?;

let mgr =
RedisConnectionManager::new(info).context("failed to create redis connection manager")?;
let namespace = WithNamespace {
namespace,
password: info.redis.password.take(),
};

let pool = Pool::builder()
.max_size(20)
.connection_customizer(Box::new(namespace))
.build(mgr)
.await
.context("failed to create connection pool")?;
log::debug!("connection {:#?}", info);
log::debug!("switching namespace to: {:?}", namespace.namespace);

Ok(Box::from(ZdbStore { url, pool }))
}
let mgr = RedisConnectionManager::new(info)
.context("failed to create redis connection manager")?;

pub fn make(url: &str) -> FactoryFuture {
Box::pin(make_inner(url.into()))
}
let pool = Pool::builder()
.max_size(20)
.connection_customizer(Box::new(namespace))
.build(mgr)
.await
.context("failed to create connection pool")?;

#[derive(Clone)]
pub struct ZdbStore {
url: String,
pool: Pool<RedisConnectionManager>,
Ok(ZdbStore {
url: url.as_ref().to_string(),
pool,
})
}
}

impl ZdbStore {}

#[async_trait::async_trait]
impl Store for ZdbStore {
async fn get(&self, key: &[u8]) -> super::Result<Vec<u8>> {
Expand Down

0 comments on commit 502d0fa

Please sign in to comment.