From 5d819be50e9a42ab0044fd185905c93ecce6bff3 Mon Sep 17 00:00:00 2001 From: Fletcher Haynes Date: Sun, 18 Nov 2018 20:33:15 -0800 Subject: [PATCH] Started on shard data structures --- src/cluster/consul_interface.rs | 18 ++++ src/cluster/mod.rs | 9 ++ src/cluster/shard.rs | 143 ++++++++++++++++++++++++++++++++ src/handle.rs | 11 ++- src/index.rs | 8 +- src/lib.rs | 1 + 6 files changed, 184 insertions(+), 6 deletions(-) create mode 100644 src/cluster/shard.rs diff --git a/src/cluster/consul_interface.rs b/src/cluster/consul_interface.rs index 0eabc35c..eb98f853 100644 --- a/src/cluster/consul_interface.rs +++ b/src/cluster/consul_interface.rs @@ -5,7 +5,9 @@ use log::error; use hyper::body::Body; use hyper::rt::Future; use hyper::{Client, Request}; +use serde_json; +use cluster::shard::Shard; use cluster::ClusterError; static CONSUL_PREFIX: &'static str = "services/toshi/"; @@ -69,6 +71,18 @@ impl ConsulInterface { client.request(req).map(|_| ()).map_err(|_| ClusterError::FailedRegisteringNode) } + /// Registers a shard with the Consul cluster + pub fn register_shard(&mut self, shard: &T) -> impl Future { + let uri = self.base_consul_url() + &self.cluster_name() + "/" + &shard.shard_id().to_hyphenated_ref().to_string() + "/"; + let client = Client::new(); + let json_body = serde_json::to_string(&shard).unwrap(); + let req = self.put_request_with_body(&uri, json_body); + client.request(req).map(|_| ()).map_err(|e| { + error!("Error registering shard: {:?}", e); + std::process::exit(1); + }) + } + fn base_consul_url(&self) -> String { self.scheme.clone() + "://" + &self.address + ":" + &self.port + "/v1/kv/" + CONSUL_PREFIX } @@ -77,6 +91,10 @@ impl ConsulInterface { Request::builder().method("PUT").uri(uri).body(Body::empty()).unwrap() } + fn put_request_with_body(&self, uri: &str, payload: String) -> Request { + Request::builder().method("PUT").uri(uri).body(Body::from(payload)).unwrap() + } + fn cluster_name(&self) -> String { self.cluster_name.clone().unwrap() } diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs index 14db763a..eb7766a2 100644 --- a/src/cluster/mod.rs +++ b/src/cluster/mod.rs @@ -2,6 +2,7 @@ pub mod consul_interface; pub mod node; +pub mod shard; pub use self::consul_interface::ConsulInterface; pub use self::node::*; @@ -32,6 +33,14 @@ pub enum ClusterError { FailedGettingCPUMetadata(String), #[fail(display = "Unable to read content as UTF-8")] UnableToReadUTF8, + #[fail(display = "Unable to create PrimaryShard: {}", _0)] + FailedCreatingPrimaryShard(String), + #[fail(display = "Unable to create ReplicaShard: {}", _0)] + FailedCreatingReplicaShard(String), + #[fail(display = "Unable to get index name: {}", _0)] + UnableToGetIndexName(String), + #[fail(display = "Unable to get index handle")] + UnableToGetIndexHandle(), } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/cluster/shard.rs b/src/cluster/shard.rs new file mode 100644 index 00000000..e9be0d18 --- /dev/null +++ b/src/cluster/shard.rs @@ -0,0 +1,143 @@ +use cluster::ClusterError; +use handle::IndexHandle; +use settings::Settings; +use tantivy::Index; +use uuid::Uuid; + +/// Trait implemented by both Primary and Replica Shards +pub trait Shard { + fn shard_id(&self) -> Uuid; + fn primary_shard_id(&self) -> Option; + fn is_primary(&self) -> bool; + fn index_name(&self) -> Result; +} + +/// A PrimaryShard is a writable partition of an Index +#[derive(Serialize, Deserialize)] +pub struct PrimaryShard { + shard_id: Uuid, + #[serde(skip_serializing, skip_deserializing)] + index_handle: Option, +} + +/// A ReplicaShard is a copy of a specific PrimaryShard that is read-only +#[derive(Serialize, Deserialize)] +pub struct ReplicaShard { + shard_id: Uuid, + primary_shard_id: Uuid, + #[serde(skip_serializing, skip_deserializing)] + index_handle: Option, +} + +impl PrimaryShard { + /// Creates and returns a new PrimaryShard with a random ID + pub fn new() -> PrimaryShard { + PrimaryShard { + shard_id: Uuid::new_v4(), + index_handle: None, + } + } + + /// Adds an IndexHandle to a PrimaryShard + pub fn with_index(mut self, index: Index, name: String) -> Result { + let settings = Settings::default(); + match IndexHandle::new(index, settings, &name) { + Ok(lh) => { + self.index_handle = Some(lh); + Ok(self) + } + Err(e) => Err(ClusterError::FailedCreatingPrimaryShard(e.to_string())), + } + } +} + +impl Shard for PrimaryShard { + /// Returns the UUID for this shard + fn shard_id(&self) -> Uuid { + self.shard_id.clone() + } + + /// Since this is not a Replica Shard, return None + fn primary_shard_id(&self) -> Option { + None + } + + /// Simple function to check if a shard is a Primary + fn is_primary(&self) -> bool { + true + } + + /// Returns the name from the underlying IndexHandle + fn index_name(&self) -> Result { + match self.index_handle { + Some(ref handle) => Ok(handle.name()), + None => Err(ClusterError::UnableToGetIndexHandle()), + } + } +} +impl ReplicaShard { + /// Creates and returns a new ReplicaShard that will be a read-only copy of a PrimaryShard + pub fn new(primary_shard_id: Uuid) -> ReplicaShard { + ReplicaShard { + primary_shard_id: primary_shard_id, + shard_id: Uuid::new_v4(), + index_handle: None, + } + } + + /// Adds an IndexHandle to a ReplicaShard + pub fn with_index(mut self, index: Index, name: String) -> Result { + let settings = Settings::default(); + match IndexHandle::new(index, settings, &name) { + Ok(lh) => { + self.index_handle = Some(lh); + Ok(self) + } + Err(e) => Err(ClusterError::FailedCreatingPrimaryShard(e.to_string())), + } + } +} + +impl Shard for ReplicaShard { + /// Returns the UUID for this shard + fn shard_id(&self) -> Uuid { + self.shard_id.clone() + } + + /// Since this is a replica shard, returns the ID of the shard it is + /// a replica of + fn primary_shard_id(&self) -> Option { + Some(self.primary_shard_id.clone()) + } + + /// Simple function to check if it is a primary shard + fn is_primary(&self) -> bool { + false + } + + /// Returns the name of the underlying Index + fn index_name(&self) -> Result { + match self.index_handle { + Some(ref handle) => Ok(handle.name()), + None => Err(ClusterError::UnableToGetIndexHandle()), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_create_primary_shard() { + let test_shard = PrimaryShard::new(); + assert!(test_shard.is_primary()); + } + + #[test] + fn test_create_replica_shard() { + let test_primary_shard = PrimaryShard::new(); + let test_replica_shard = ReplicaShard::new(test_primary_shard.shard_id()); + assert!(!test_replica_shard.is_primary()); + } +} diff --git a/src/handle.rs b/src/handle.rs index 621228b0..7ea44f82 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -9,10 +9,11 @@ pub struct IndexHandle { writer: Arc>, current_opstamp: AtomicUsize, settings: Settings, + name: String, } impl IndexHandle { - pub fn new(index: Index, settings: Settings) -> Result { + pub fn new(index: Index, settings: Settings, name: &str) -> Result { let i = index.writer(settings.writer_memory)?; i.set_merge_policy(settings.get_merge_policy()); let current_opstamp = AtomicUsize::new(0); @@ -22,6 +23,7 @@ impl IndexHandle { writer, current_opstamp, settings, + name: name.into(), }) } @@ -29,8 +31,13 @@ impl IndexHandle { &self.index } + /// Returns the name of the Index + pub fn name(&self) -> String { + self.name.clone() + } + pub fn recreate_writer(self) -> Result { - IndexHandle::new(self.index, self.settings.clone()) + IndexHandle::new(self.index, self.settings.clone(), &self.name) } pub fn get_writer(&self) -> Arc> { diff --git a/src/index.rs b/src/index.rs index 8400c215..59ae3936 100644 --- a/src/index.rs +++ b/src/index.rs @@ -74,8 +74,8 @@ impl IndexCatalog { #[allow(dead_code)] pub fn with_index(name: String, index: Index) -> Result { let mut map = HashMap::new(); - let new_index = - IndexHandle::new(index, Settings::default()).unwrap_or_else(|_| panic!("Unable to open index: {} because it's locked", name)); + let new_index = IndexHandle::new(index, Settings::default(), &name) + .unwrap_or_else(|_| panic!("Unable to open index: {} because it's locked", name)); map.insert(name, new_index); Ok(IndexCatalog { settings: Settings::default(), @@ -96,8 +96,8 @@ impl IndexCatalog { } pub fn add_index(&mut self, name: String, index: Index) { - let handle = - IndexHandle::new(index, self.settings.clone()).unwrap_or_else(|_| panic!("Unable to open index: {} because it's locked", name)); + let handle = IndexHandle::new(index, self.settings.clone(), &name) + .unwrap_or_else(|_| panic!("Unable to open index: {} because it's locked", name)); self.collection.entry(name).or_insert(handle); } diff --git a/src/lib.rs b/src/lib.rs index d543f35e..56c9b140 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ extern crate serde_json; extern crate systemstat; extern crate tantivy; extern crate tokio; +extern crate uuid; use log::*; use tantivy::query::QueryParserError;