Skip to content

Commit

Permalink
Working implementation of gossipsub + indexing (only triggered via grpc)
Browse files Browse the repository at this point in the history
  • Loading branch information
GamePad64 committed Nov 10, 2021
1 parent 36f101c commit e1f1c94
Show file tree
Hide file tree
Showing 18 changed files with 388 additions and 1,293 deletions.
1,194 changes: 53 additions & 1,141 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions src/daemon-rs/Cargo.toml
Expand Up @@ -11,17 +11,17 @@ serde = { version = "1", features = ["derive"] }
#simple_logger = "1.13.0"
directories = "4.0.1"
tokio = { version = "1.13.0", features = ["rt", "macros", "rt-multi-thread", "fs", "sync", "net", "signal"] }
notify = "5.0.0-pre.13"
igd = { version = "0.12.0", features = ["aio"] }
#igd = { version = "0.12.0", features = ["aio"] }
futures = "0.3.17"
bytes = "1.1.0"
serde_json = "1"
tonic = "0.6.1"
prost = "0.9.0"
tonic-reflection = "0.3.0"
hex = "0.4.3"
env_logger = "0.9.0"
libp2p = "0.40.0"
libp2p = { version = "0.40.0", features = ["tcp-tokio", "gossipsub", "mplex", "yamux", "noise", "mdns"], default-features = false }
built = "0.5"

[build-dependencies]
tonic-build = "0.6.0"
prost-build = "0.9.0"
1 change: 1 addition & 0 deletions src/daemon-rs/build.rs
Expand Up @@ -8,5 +8,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.file_descriptor_set_path(descriptor_path)
.format(true)
.compile(&["src/proto/controller.proto"], &["src/proto/"])?;
prost_build::compile_protos(&["src/proto/protocol.proto"], &["src/proto/"]).unwrap();
Ok(())
}
68 changes: 68 additions & 0 deletions src/daemon-rs/src/bucket/manager.rs
@@ -0,0 +1,68 @@
use crate::bucket::collection::BucketCollection;
use crate::bucket::{Bucket, BucketEvent};
use crate::settings::BucketConfig;
use log::trace;
use std::sync::{Arc, RwLock};
use tokio::sync::{broadcast, mpsc};

#[derive(Clone, Debug)]
pub enum BucketManagerEvent {
BucketAdded(Arc<Bucket>),
BucketRemoved(Arc<Bucket>),
BucketEvent {
bucket: Arc<Bucket>,
event: BucketEvent,
},
}

pub struct BucketManager {
buckets: Arc<RwLock<BucketCollection>>,
event_sender: broadcast::Sender<BucketManagerEvent>,
}

impl BucketManager {
pub fn new() -> Self {
let (event_sender, _) = broadcast::channel(128);
BucketManager {
buckets: Arc::new(RwLock::new(BucketCollection::new())),
event_sender,
}
}

pub async fn add_bucket(&self, config: BucketConfig) {
let mut lock = self.buckets.write().unwrap();

let (event_sender_bucket, mut event_receiver_bucket) = mpsc::channel(128);

let bucket = Bucket::new(config, event_sender_bucket).await;
let bucket_arc = (*lock).add_bucket(bucket);
bucket_arc.initialize().await;
trace!("Sending event for BucketAdded");

let _ = self
.event_sender
.send(BucketManagerEvent::BucketAdded(bucket_arc.clone()));

let event_sender_local = self.event_sender.clone();

tokio::spawn(async move {
loop {
tokio::select! {
event = event_receiver_bucket.recv() => {
if let Some(event) = event {
event_sender_local.send(BucketManagerEvent::BucketEvent {bucket: bucket_arc.clone(), event});
} else { break; }
}
}
}
});
}

pub fn get_event_channel(&self) -> broadcast::Receiver<BucketManagerEvent> {
self.event_sender.subscribe()
}

pub fn get_bucket_byid(&self, bucket_id: &[u8]) -> Option<Arc<Bucket>> {
self.buckets.read().unwrap().get_bucket_byid(bucket_id)
}
}
68 changes: 34 additions & 34 deletions src/daemon-rs/src/bucket/mod.rs
@@ -1,29 +1,38 @@
use std::borrow::Cow;
use std::fmt;
use std::fmt::Formatter;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::sync::Arc;

use collection::BucketCollection;
use librevault_util::index::Index;
use librevault_util::index::{Index, SignedMeta};
use librevault_util::secret::Secret;

mod collection;
mod watcher;
pub mod manager;

use crate::settings::BucketConfig;
use hex::ToHex;
// use librevault_util::index::proto::Meta;
use librevault_util::indexer::{make_meta, sign_meta};
use log::debug;
use tokio::sync::mpsc;

#[derive(Clone, Debug)]
pub enum BucketEvent {
MetaAdded { signed_meta: SignedMeta },
}

pub struct Bucket {
secret: Secret,
root: PathBuf,

index: Arc<Index>,
system_dir: PathBuf,

event_sender: mpsc::Sender<BucketEvent>,
}

impl Bucket {
async fn new(config: BucketConfig) -> Self {
let bucket_id_hex: String = config.secret.get_id().encode_hex();
debug!("Creating bucket: {}", bucket_id_hex);
async fn new(config: BucketConfig, event_sender: mpsc::Sender<BucketEvent>) -> Self {
debug!("Creating bucket: {}", config.secret.get_id_hex());

let system_dir = config.path.join(".librevault");

Expand All @@ -34,15 +43,19 @@ impl Bucket {
secret: config.secret,
root: config.path,
index,
system_dir,
event_sender,
}
}

fn get_id(&self) -> Vec<u8> {
pub fn get_id(&self) -> Vec<u8> {
self.secret.get_id()
}

async fn launch_bucket(&self) {
pub fn get_id_hex(&self) -> String {
self.secret.get_id_hex()
}

async fn initialize(&self) {
let _ = tokio::task::block_in_place(move || self.index.migrate()); // TODO: block somewhere else

// loop {
Expand All @@ -55,31 +68,18 @@ impl Bucket {
if let Ok(meta) = meta {
debug!("Meta: {:?}", &meta);
let signed_meta = sign_meta(&meta, &self.secret);
self.index.put_meta(signed_meta, true).unwrap();
}
}
}

pub struct BucketManager {
buckets: Arc<RwLock<BucketCollection>>,
}
self.index.put_meta(&signed_meta, true).unwrap();

impl BucketManager {
pub fn new() -> Self {
BucketManager {
buckets: Arc::new(RwLock::new(BucketCollection::new())),
self.event_sender
.send(BucketEvent::MetaAdded { signed_meta })
.await
.expect("Channel must be open");
}
}
}

pub async fn add_bucket(&self, config: BucketConfig) {
let mut lock = self.buckets.write().unwrap();

let bucket = Bucket::new(config).await;
let bucket_arc = (*lock).add_bucket(bucket);
bucket_arc.launch_bucket().await;
}

pub fn get_bucket_byid(&self, bucket_id: &[u8]) -> Option<Arc<Bucket>> {
self.buckets.read().unwrap().get_bucket_byid(bucket_id)
impl fmt::Debug for Bucket {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "(Bucket: id={}, loc={:?})", self.get_id_hex(), self.root)
}
}
32 changes: 0 additions & 32 deletions src/daemon-rs/src/bucket/watcher.rs

This file was deleted.

51 changes: 0 additions & 51 deletions src/daemon-rs/src/discover.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/daemon-rs/src/grpc.rs
Expand Up @@ -2,7 +2,7 @@ use std::path::Path;
use std::sync::Arc;
use tonic::{transport::Server, Request, Response, Status};

use crate::bucket::BucketManager;
use crate::bucket::manager::BucketManager;
use crate::settings::ConfigManager;
use log::debug;
use proto::controller_server::{Controller, ControllerServer};
Expand Down
22 changes: 12 additions & 10 deletions src/daemon-rs/src/main.rs
Expand Up @@ -4,14 +4,13 @@ use directories::ProjectDirs;

use log::{debug, info};

use crate::p2p::run_server;
use crate::settings::ConfigManager;
use bucket::BucketManager;
use librevault_util::nodekey::nodekey_write_new;
use bucket::manager::BucketManager;

mod bucket;
mod discover;
mod grpc;
mod p2p_server;
mod p2p;
mod settings;

#[tokio::main]
Expand All @@ -36,19 +35,22 @@ async fn main() {
std::fs::create_dir_all(&config_dir).expect("Could not create config directory");

let settings = Arc::new(ConfigManager::new(config_dir).unwrap());

let buckets = Arc::new(BucketManager::new());

tokio::spawn(grpc::run_grpc(buckets.clone(), settings.clone()));
tokio::spawn(run_server(
buckets.clone(),
settings.clone(),
buckets.get_event_channel(),
));

// Add buckets only after passing to components
for bucket_config in &settings.config().buckets {
buckets.add_bucket(bucket_config.clone()).await;
}

nodekey_write_new(config_dir.join("key.pem").to_str().unwrap());
tokio::spawn(grpc::run_grpc(buckets.clone(), settings.clone()));

let _ = tokio::signal::ctrl_c().await;

// discover_mcast().await;

// let gateway = search_gateway(SearchOptions::default()).await.unwrap();
// info!("Detected IGD gateway: {:?}", gateway);
}

0 comments on commit e1f1c94

Please sign in to comment.