Skip to content

Commit

Permalink
Started on shard data structures
Browse files Browse the repository at this point in the history
  • Loading branch information
fhaynes committed Nov 25, 2018
1 parent fa10207 commit 5d819be
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 6 deletions.
18 changes: 18 additions & 0 deletions src/cluster/consul_interface.rs
Expand Up @@ -5,7 +5,9 @@ use log::error;
use hyper::body::Body;
use hyper::rt::Future;
use hyper::{Client, Request};
use serde_json;

use cluster::shard::Shard;
use cluster::ClusterError;

static CONSUL_PREFIX: &'static str = "services/toshi/";
Expand Down Expand Up @@ -69,6 +71,18 @@ impl ConsulInterface {
client.request(req).map(|_| ()).map_err(|_| ClusterError::FailedRegisteringNode)
}

/// Registers a shard with the Consul cluster
pub fn register_shard<T: Shard + serde::Serialize>(&mut self, shard: &T) -> impl Future<Item = (), Error = ()> {
let uri = self.base_consul_url() + &self.cluster_name() + "/" + &shard.shard_id().to_hyphenated_ref().to_string() + "/";
let client = Client::new();
let json_body = serde_json::to_string(&shard).unwrap();
let req = self.put_request_with_body(&uri, json_body);
client.request(req).map(|_| ()).map_err(|e| {
error!("Error registering shard: {:?}", e);
std::process::exit(1);
})
}

fn base_consul_url(&self) -> String {
self.scheme.clone() + "://" + &self.address + ":" + &self.port + "/v1/kv/" + CONSUL_PREFIX
}
Expand All @@ -77,6 +91,10 @@ impl ConsulInterface {
Request::builder().method("PUT").uri(uri).body(Body::empty()).unwrap()
}

fn put_request_with_body(&self, uri: &str, payload: String) -> Request<Body> {
Request::builder().method("PUT").uri(uri).body(Body::from(payload)).unwrap()
}

fn cluster_name(&self) -> String {
self.cluster_name.clone().unwrap()
}
Expand Down
9 changes: 9 additions & 0 deletions src/cluster/mod.rs
Expand Up @@ -2,6 +2,7 @@

pub mod consul_interface;
pub mod node;
pub mod shard;

pub use self::consul_interface::ConsulInterface;
pub use self::node::*;
Expand Down Expand Up @@ -32,6 +33,14 @@ pub enum ClusterError {
FailedGettingCPUMetadata(String),
#[fail(display = "Unable to read content as UTF-8")]
UnableToReadUTF8,
#[fail(display = "Unable to create PrimaryShard: {}", _0)]
FailedCreatingPrimaryShard(String),
#[fail(display = "Unable to create ReplicaShard: {}", _0)]
FailedCreatingReplicaShard(String),
#[fail(display = "Unable to get index name: {}", _0)]
UnableToGetIndexName(String),
#[fail(display = "Unable to get index handle")]
UnableToGetIndexHandle(),
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
143 changes: 143 additions & 0 deletions src/cluster/shard.rs
@@ -0,0 +1,143 @@
use cluster::ClusterError;
use handle::IndexHandle;
use settings::Settings;
use tantivy::Index;
use uuid::Uuid;

/// Trait implemented by both Primary and Replica Shards
pub trait Shard {
fn shard_id(&self) -> Uuid;
fn primary_shard_id(&self) -> Option<Uuid>;
fn is_primary(&self) -> bool;
fn index_name(&self) -> Result<String, ClusterError>;
}

/// A PrimaryShard is a writable partition of an Index
#[derive(Serialize, Deserialize)]
pub struct PrimaryShard {
shard_id: Uuid,
#[serde(skip_serializing, skip_deserializing)]
index_handle: Option<IndexHandle>,
}

/// A ReplicaShard is a copy of a specific PrimaryShard that is read-only
#[derive(Serialize, Deserialize)]
pub struct ReplicaShard {
shard_id: Uuid,
primary_shard_id: Uuid,
#[serde(skip_serializing, skip_deserializing)]
index_handle: Option<IndexHandle>,
}

impl PrimaryShard {
/// Creates and returns a new PrimaryShard with a random ID
pub fn new() -> PrimaryShard {
PrimaryShard {
shard_id: Uuid::new_v4(),
index_handle: None,
}
}

/// Adds an IndexHandle to a PrimaryShard
pub fn with_index(mut self, index: Index, name: String) -> Result<PrimaryShard, ClusterError> {
let settings = Settings::default();
match IndexHandle::new(index, settings, &name) {
Ok(lh) => {
self.index_handle = Some(lh);
Ok(self)
}
Err(e) => Err(ClusterError::FailedCreatingPrimaryShard(e.to_string())),
}
}
}

impl Shard for PrimaryShard {
/// Returns the UUID for this shard
fn shard_id(&self) -> Uuid {
self.shard_id.clone()
}

/// Since this is not a Replica Shard, return None
fn primary_shard_id(&self) -> Option<Uuid> {
None
}

/// Simple function to check if a shard is a Primary
fn is_primary(&self) -> bool {
true
}

/// Returns the name from the underlying IndexHandle
fn index_name(&self) -> Result<String, ClusterError> {
match self.index_handle {
Some(ref handle) => Ok(handle.name()),
None => Err(ClusterError::UnableToGetIndexHandle()),
}
}
}
impl ReplicaShard {
/// Creates and returns a new ReplicaShard that will be a read-only copy of a PrimaryShard
pub fn new(primary_shard_id: Uuid) -> ReplicaShard {
ReplicaShard {
primary_shard_id: primary_shard_id,
shard_id: Uuid::new_v4(),
index_handle: None,
}
}

/// Adds an IndexHandle to a ReplicaShard
pub fn with_index(mut self, index: Index, name: String) -> Result<ReplicaShard, ClusterError> {
let settings = Settings::default();
match IndexHandle::new(index, settings, &name) {
Ok(lh) => {
self.index_handle = Some(lh);
Ok(self)
}
Err(e) => Err(ClusterError::FailedCreatingPrimaryShard(e.to_string())),
}
}
}

impl Shard for ReplicaShard {
/// Returns the UUID for this shard
fn shard_id(&self) -> Uuid {
self.shard_id.clone()
}

/// Since this is a replica shard, returns the ID of the shard it is
/// a replica of
fn primary_shard_id(&self) -> Option<Uuid> {
Some(self.primary_shard_id.clone())
}

/// Simple function to check if it is a primary shard
fn is_primary(&self) -> bool {
false
}

/// Returns the name of the underlying Index
fn index_name(&self) -> Result<String, ClusterError> {
match self.index_handle {
Some(ref handle) => Ok(handle.name()),
None => Err(ClusterError::UnableToGetIndexHandle()),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_create_primary_shard() {
let test_shard = PrimaryShard::new();
assert!(test_shard.is_primary());
}

#[test]
fn test_create_replica_shard() {
let test_primary_shard = PrimaryShard::new();
let test_replica_shard = ReplicaShard::new(test_primary_shard.shard_id());
assert!(!test_replica_shard.is_primary());
}
}
11 changes: 9 additions & 2 deletions src/handle.rs
Expand Up @@ -9,10 +9,11 @@ pub struct IndexHandle {
writer: Arc<Mutex<IndexWriter>>,
current_opstamp: AtomicUsize,
settings: Settings,
name: String,
}

impl IndexHandle {
pub fn new(index: Index, settings: Settings) -> Result<Self> {
pub fn new(index: Index, settings: Settings, name: &str) -> Result<Self> {
let i = index.writer(settings.writer_memory)?;
i.set_merge_policy(settings.get_merge_policy());
let current_opstamp = AtomicUsize::new(0);
Expand All @@ -22,15 +23,21 @@ impl IndexHandle {
writer,
current_opstamp,
settings,
name: name.into(),
})
}

pub fn get_index(&self) -> &Index {
&self.index
}

/// Returns the name of the Index
pub fn name(&self) -> String {
self.name.clone()
}

pub fn recreate_writer(self) -> Result<Self> {
IndexHandle::new(self.index, self.settings.clone())
IndexHandle::new(self.index, self.settings.clone(), &self.name)
}

pub fn get_writer(&self) -> Arc<Mutex<IndexWriter>> {
Expand Down
8 changes: 4 additions & 4 deletions src/index.rs
Expand Up @@ -74,8 +74,8 @@ impl IndexCatalog {
#[allow(dead_code)]
pub fn with_index(name: String, index: Index) -> Result<Self> {
let mut map = HashMap::new();
let new_index =
IndexHandle::new(index, Settings::default()).unwrap_or_else(|_| panic!("Unable to open index: {} because it's locked", name));
let new_index = IndexHandle::new(index, Settings::default(), &name)
.unwrap_or_else(|_| panic!("Unable to open index: {} because it's locked", name));
map.insert(name, new_index);
Ok(IndexCatalog {
settings: Settings::default(),
Expand All @@ -96,8 +96,8 @@ impl IndexCatalog {
}

pub fn add_index(&mut self, name: String, index: Index) {
let handle =
IndexHandle::new(index, self.settings.clone()).unwrap_or_else(|_| panic!("Unable to open index: {} because it's locked", name));
let handle = IndexHandle::new(index, self.settings.clone(), &name)
.unwrap_or_else(|_| panic!("Unable to open index: {} because it's locked", name));
self.collection.entry(name).or_insert(handle);
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Expand Up @@ -20,6 +20,7 @@ extern crate serde_json;
extern crate systemstat;
extern crate tantivy;
extern crate tokio;
extern crate uuid;

use log::*;
use tantivy::query::QueryParserError;
Expand Down

0 comments on commit 5d819be

Please sign in to comment.