Skip to content

Commit

Permalink
Merge fcd54f4 into fa10207
Browse files Browse the repository at this point in the history
  • Loading branch information
fhaynes committed Nov 26, 2018
2 parents fa10207 + fcd54f4 commit dc776a1
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 12 deletions.
71 changes: 65 additions & 6 deletions src/cluster/consul_interface.rs
Expand Up @@ -5,7 +5,9 @@ use log::error;
use hyper::body::Body;
use hyper::rt::Future;
use hyper::{Client, Request};
use serde_json;

use cluster::shard::Shard;
use cluster::ClusterError;

static CONSUL_PREFIX: &'static str = "services/toshi/";
Expand Down Expand Up @@ -54,7 +56,7 @@ impl ConsulInterface {
pub fn register_node(&mut self) -> impl Future<Item = (), Error = ClusterError> {
let uri = self.base_consul_url() + &self.cluster_name() + "/" + &self.node_id() + "/";
let client = Client::new();
let req = self.put_request(&uri);
let req = self.put_request(&uri, Body::empty());
client.request(req).map(|_| ()).map_err(|e| {
error!("Error registering node: {:?}", e);
std::process::exit(1);
Expand All @@ -65,16 +67,31 @@ impl ConsulInterface {
pub fn register_cluster(&self) -> impl Future<Item = (), Error = ClusterError> {
let uri = self.base_consul_url() + &self.cluster_name() + "/";
let client = Client::new();
let req = self.put_request(&uri);
let req = self.put_request(&uri, Body::empty());
client.request(req).map(|_| ()).map_err(|_| ClusterError::FailedRegisteringNode)
}

/// Registers a shard with the Consul cluster
pub fn register_shard<T: Shard + serde::Serialize>(&mut self, shard: &T) -> impl Future<Item = (), Error = ()> {
let uri = self.base_consul_url() + &self.cluster_name() + "/" + &shard.shard_id().to_hyphenated_ref().to_string() + "/";
let client = Client::new();
let json_body = serde_json::to_string(&shard).unwrap();
let req = self.put_request(&uri, json_body);
client.request(req).map(|_| ()).map_err(|e| {
error!("Error registering shard: {:?}", e);
std::process::exit(1);
})
}

fn base_consul_url(&self) -> String {
self.scheme.clone() + "://" + &self.address + ":" + &self.port + "/v1/kv/" + CONSUL_PREFIX
}

fn put_request(&self, uri: &str) -> Request<Body> {
Request::builder().method("PUT").uri(uri).body(Body::empty()).unwrap()
fn put_request<T>(&self, uri: &str, payload: T) -> Request<Body>
where
hyper::Body: std::convert::From<T>,
{
Request::builder().method("PUT").uri(uri).body(Body::from(payload)).unwrap()
}

fn cluster_name(&self) -> String {
Expand All @@ -92,8 +109,50 @@ impl Default for ConsulInterface {
address: "127.0.0.1".into(),
port: "8500".into(),
scheme: String::from("http"),
cluster_name: None,
node_id: None,
cluster_name: Some(String::from("kitsune")),
node_id: Some(String::from("alpha")),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use hyper::body::Payload;

#[test]
fn test_create_consul_interface() {
let consul = ConsulInterface::default();
assert_eq!(consul.base_consul_url(), "http://127.0.0.1:8500/v1/kv/services/toshi/");
}

#[test]
fn test_consul_cluster_name() {
let consul = ConsulInterface::default()
.with_cluster_name("kitsune".to_string())
.with_address("127.0.0.1".into())
.with_node_id("alpha".into())
.with_scheme("http".into())
.with_port("8500".into());
assert_eq!(consul.cluster_name(), "kitsune");
}

#[test]
fn test_consul_cluster_put() {
let consul = ConsulInterface::default();
let test_req = consul.put_request(&consul.base_consul_url(), Body::empty());
assert_eq!(test_req.body().content_length().unwrap(), 0);
}

#[test]
fn test_register_node() {
let mut consul = ConsulInterface::default();
let _ = consul.register_node();
}

#[test]
fn test_register_cluster() {
let mut consul = ConsulInterface::default();
let _ = consul.register_cluster();
}
}
9 changes: 9 additions & 0 deletions src/cluster/mod.rs
Expand Up @@ -2,6 +2,7 @@

pub mod consul_interface;
pub mod node;
pub mod shard;

pub use self::consul_interface::ConsulInterface;
pub use self::node::*;
Expand Down Expand Up @@ -32,6 +33,14 @@ pub enum ClusterError {
FailedGettingCPUMetadata(String),
#[fail(display = "Unable to read content as UTF-8")]
UnableToReadUTF8,
#[fail(display = "Unable to create PrimaryShard: {}", _0)]
FailedCreatingPrimaryShard(String),
#[fail(display = "Unable to create ReplicaShard: {}", _0)]
FailedCreatingReplicaShard(String),
#[fail(display = "Unable to get index name: {}", _0)]
UnableToGetIndexName(String),
#[fail(display = "Unable to get index handle")]
UnableToGetIndexHandle(),
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
143 changes: 143 additions & 0 deletions src/cluster/shard.rs
@@ -0,0 +1,143 @@
use cluster::ClusterError;
use handle::IndexHandle;
use settings::Settings;
use tantivy::Index;
use uuid::Uuid;

/// Trait implemented by both Primary and Replica Shards
pub trait Shard {
fn shard_id(&self) -> Uuid;
fn primary_shard_id(&self) -> Option<Uuid>;
fn is_primary(&self) -> bool;
fn index_name(&self) -> Result<String, ClusterError>;
}

/// A PrimaryShard is a writable partition of an Index
#[derive(Serialize, Deserialize)]
pub struct PrimaryShard {
shard_id: Uuid,
#[serde(skip_serializing, skip_deserializing)]
index_handle: Option<IndexHandle>,
}

/// A ReplicaShard is a copy of a specific PrimaryShard that is read-only
#[derive(Serialize, Deserialize)]
pub struct ReplicaShard {
shard_id: Uuid,
primary_shard_id: Uuid,
#[serde(skip_serializing, skip_deserializing)]
index_handle: Option<IndexHandle>,
}

impl PrimaryShard {
/// Creates and returns a new PrimaryShard with a random ID
pub fn new() -> PrimaryShard {
PrimaryShard {
shard_id: Uuid::new_v4(),
index_handle: None,
}
}

/// Adds an IndexHandle to a PrimaryShard
pub fn with_index(mut self, index: Index, name: String) -> Result<PrimaryShard, ClusterError> {
let settings = Settings::default();
match IndexHandle::new(index, settings, &name) {
Ok(lh) => {
self.index_handle = Some(lh);
Ok(self)
}
Err(e) => Err(ClusterError::FailedCreatingPrimaryShard(e.to_string())),
}
}
}

impl Shard for PrimaryShard {
/// Returns the UUID for this shard
fn shard_id(&self) -> Uuid {
self.shard_id.clone()
}

/// Since this is not a Replica Shard, return None
fn primary_shard_id(&self) -> Option<Uuid> {
None
}

/// Simple function to check if a shard is a Primary
fn is_primary(&self) -> bool {
true
}

/// Returns the name from the underlying IndexHandle
fn index_name(&self) -> Result<String, ClusterError> {
match self.index_handle {
Some(ref handle) => Ok(handle.name()),
None => Err(ClusterError::UnableToGetIndexHandle()),
}
}
}
impl ReplicaShard {
/// Creates and returns a new ReplicaShard that will be a read-only copy of a PrimaryShard
pub fn new(primary_shard_id: Uuid) -> ReplicaShard {
ReplicaShard {
primary_shard_id: primary_shard_id,
shard_id: Uuid::new_v4(),
index_handle: None,
}
}

/// Adds an IndexHandle to a ReplicaShard
pub fn with_index(mut self, index: Index, name: String) -> Result<ReplicaShard, ClusterError> {
let settings = Settings::default();
match IndexHandle::new(index, settings, &name) {
Ok(lh) => {
self.index_handle = Some(lh);
Ok(self)
}
Err(e) => Err(ClusterError::FailedCreatingPrimaryShard(e.to_string())),
}
}
}

impl Shard for ReplicaShard {
/// Returns the UUID for this shard
fn shard_id(&self) -> Uuid {
self.shard_id.clone()
}

/// Since this is a replica shard, returns the ID of the shard it is
/// a replica of
fn primary_shard_id(&self) -> Option<Uuid> {
Some(self.primary_shard_id.clone())
}

/// Simple function to check if it is a primary shard
fn is_primary(&self) -> bool {
false
}

/// Returns the name of the underlying Index
fn index_name(&self) -> Result<String, ClusterError> {
match self.index_handle {
Some(ref handle) => Ok(handle.name()),
None => Err(ClusterError::UnableToGetIndexHandle()),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_create_primary_shard() {
let test_shard = PrimaryShard::new();
assert!(test_shard.is_primary());
}

#[test]
fn test_create_replica_shard() {
let test_primary_shard = PrimaryShard::new();
let test_replica_shard = ReplicaShard::new(test_primary_shard.shard_id());
assert!(!test_replica_shard.is_primary());
}
}
11 changes: 9 additions & 2 deletions src/handle.rs
Expand Up @@ -9,10 +9,11 @@ pub struct IndexHandle {
writer: Arc<Mutex<IndexWriter>>,
current_opstamp: AtomicUsize,
settings: Settings,
name: String,
}

impl IndexHandle {
pub fn new(index: Index, settings: Settings) -> Result<Self> {
pub fn new(index: Index, settings: Settings, name: &str) -> Result<Self> {
let i = index.writer(settings.writer_memory)?;
i.set_merge_policy(settings.get_merge_policy());
let current_opstamp = AtomicUsize::new(0);
Expand All @@ -22,15 +23,21 @@ impl IndexHandle {
writer,
current_opstamp,
settings,
name: name.into(),
})
}

pub fn get_index(&self) -> &Index {
&self.index
}

/// Returns the name of the Index
pub fn name(&self) -> String {
self.name.clone()
}

pub fn recreate_writer(self) -> Result<Self> {
IndexHandle::new(self.index, self.settings.clone())
IndexHandle::new(self.index, self.settings.clone(), &self.name)
}

pub fn get_writer(&self) -> Arc<Mutex<IndexWriter>> {
Expand Down
8 changes: 4 additions & 4 deletions src/index.rs
Expand Up @@ -74,8 +74,8 @@ impl IndexCatalog {
#[allow(dead_code)]
pub fn with_index(name: String, index: Index) -> Result<Self> {
let mut map = HashMap::new();
let new_index =
IndexHandle::new(index, Settings::default()).unwrap_or_else(|_| panic!("Unable to open index: {} because it's locked", name));
let new_index = IndexHandle::new(index, Settings::default(), &name)
.unwrap_or_else(|_| panic!("Unable to open index: {} because it's locked", name));
map.insert(name, new_index);
Ok(IndexCatalog {
settings: Settings::default(),
Expand All @@ -96,8 +96,8 @@ impl IndexCatalog {
}

pub fn add_index(&mut self, name: String, index: Index) {
let handle =
IndexHandle::new(index, self.settings.clone()).unwrap_or_else(|_| panic!("Unable to open index: {} because it's locked", name));
let handle = IndexHandle::new(index, self.settings.clone(), &name)
.unwrap_or_else(|_| panic!("Unable to open index: {} because it's locked", name));
self.collection.entry(name).or_insert(handle);
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Expand Up @@ -20,6 +20,7 @@ extern crate serde_json;
extern crate systemstat;
extern crate tantivy;
extern crate tokio;
extern crate uuid;

use log::*;
use tantivy::query::QueryParserError;
Expand Down

0 comments on commit dc776a1

Please sign in to comment.