Skip to content

Commit

Permalink
Clustering (#145)
Browse files Browse the repository at this point in the history
* keepin going with clustering

* update cargo lock, re-enable codecov

* We almost there...

* This is rough, real rough, but it at least works on the base case of search. Time to make this look good.

* Think this is ready at least to PR a bit, next plug consul back in

* keepin going with clustering

* We almost there...

* This is rough, real rough, but it at least works on the base case of search. Time to make this look good.

* Merge conflicts from master

* rustfmt

* Cargo update
  • Loading branch information
hntd187 committed Feb 15, 2019
1 parent 847ed5f commit c6981d0
Show file tree
Hide file tree
Showing 16 changed files with 635 additions and 473 deletions.
521 changes: 249 additions & 272 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 4 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ tower-util = { git = "https://github.com/tower-rs/tower" }
tower-http = { git = "https://github.com/tower-rs/tower-http" }
tower-consul = { git = "https://github.com/LucioFranco/tower-consul" }
tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
futures-watch = { git = "https://github.com/carllerche/better-future" }
tower-web = "^0.3"
http = "^0.1"
h2 = "0.1.16"
h2 = "^0.1"
taken = "0.1.1"
flate2 = "^1.0"
futures-watch = { git = "https://github.com/carllerche/better-future" }
chashmap = "^2.2"
bytes = "^0.4"
prost = "^0.4"
prost-derive = "^0.4"
Expand All @@ -53,8 +52,9 @@ config = "^0.9"
log = "^0.4"
pretty_env_logger = "^0.3"
failure = "^0.1"
crossbeam = "^0.7"
num_cpus = "^1.10"
crossbeam = "^0.7"
hashbrown = "0.1.8"
serde = { version = "^1.0", features = ["derive"] }
clap = { version = "^2.32", features = ["color"] }
uuid = { version = "^0.7", features = ["v4"] }
Expand All @@ -67,14 +67,3 @@ debug-assertions = false
lto = true
rpath = false
codegen-units = 1

#[patch.crates-io]
#tower-service = { git = "https://github.com/tower-rs/tower" }
#tower-direct-service = { git = "https://github.com/tower-rs/tower" }
#tower-buffer = { git = "https://github.com/tower-rs/tower" }
#tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" }
#tower-h2 = { git = "https://github.com/tower-rs/tower-h2" }
#tower-util = { git = "https://github.com/tower-rs/tower" }
#tower-http = { git = "https://github.com/tower-rs/tower-http" }
#tower-consul = { git = "https://github.com/LucioFranco/tower-consul" }
#tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
19 changes: 19 additions & 0 deletions config/config-rpc.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
host = "127.0.0.1"
port = 8081
path = "data/"
writer_memory = 200000000
log_level = "debug"
json_parsing_threads = 4
bulk_buffer_size = 10000
auto_commit_duration = 10
enable_clustering = true
master = false
nodes = [
"127.0.0.1:8081"
]

[merge_policy]
kind = "log"
min_merge_size = 8
min_layer_size = 10_000
level_log_size = 0.75
7 changes: 3 additions & 4 deletions config/config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
host = "127.0.0.1"
port = 8080
path = "data/"
path = "data2/"
writer_memory = 200000000
log_level = "info"
json_parsing_threads = 4
Expand All @@ -9,12 +9,11 @@ auto_commit_duration = 10
enable_clustering = true
master = true
nodes = [
"127.0.0.1:8081",
"127.0.0.1:8082"
"127.0.0.1:8081"
]

[merge_policy]
kind = "log"
min_merge_size = 8
min_layer_size = 10_000
level_log_size = 0.75
level_log_size = 0.75
35 changes: 17 additions & 18 deletions src/bin/toshi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,26 +162,25 @@ fn run(catalog: Arc<RwLock<IndexCatalog>>, settings: &Settings) -> impl Future<I
println!("{}", HEADER);

if settings.enable_clustering {
let settings = settings.clone();
let place_addr = settings.place_addr.clone();
let consul_addr = settings.consul_addr.clone();
let cluster_name = settings.cluster_name.clone();

let run = future::lazy(move || connect_to_consul(&settings)).and_then(move |_| {
tokio::spawn(commit_watcher);

let consul = Consul::builder()
.with_cluster_name(cluster_name)
.with_address(consul_addr)
.build()
.expect("Could not build Consul client.");

let place_addr = place_addr.parse().expect("Placement address must be a valid SocketAddr");
tokio::spawn(cluster::run(place_addr, consul).map_err(|e| error!("Error with running cluster: {}", e)));

// I had this commented out for now in order to test the manual cluster reporting, next step is
// to turn this back on and get the information from consul instead.

// let consul = Consul::builder()
// .with_cluster_name(cluster_name)
// .with_address(consul_addr)
// .build()
// .expect("Could not build Consul client.");
//
// let place_addr = place_addr.parse().expect("Placement address must be a valid SocketAddr");
// tokio::spawn(cluster::run(place_addr, consul).map_err(|e| error!("Error with running cluster: {}", e)));
//
// router_with_catalog(&bind, &catalog)
//});
let run = commit_watcher.and_then(move |_| {
let update = catalog.read().unwrap().update_remote_indexes();
tokio::spawn(update);
router_with_catalog(&bind, &catalog)
});

future::Either::A(run)
} else {
let run = commit_watcher.and_then(move |_| router_with_catalog(&bind, &catalog));
Expand Down
60 changes: 43 additions & 17 deletions src/cluster/remote_handle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::hash::{Hash, Hasher};

use log::info;
use tokio::prelude::*;
use tower_grpc::Request as TowerRequest;
Expand All @@ -13,19 +15,37 @@ use crate::query::Request;
/// the remote host and full filling the request via rpc, we need to figure out a better way
/// (tower-buffer) on how to keep these clients.

#[derive(Clone)]
pub struct RemoteIndex {
name: String,
remote: RpcClient,
remotes: Vec<RpcClient>,
}

impl PartialEq for RemoteIndex {
fn eq(&self, other: &RemoteIndex) -> bool {
self.name == *other.name
}
}

impl Eq for RemoteIndex {}

impl Hash for RemoteIndex {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(self.name.as_bytes())
}
}

impl RemoteIndex {
pub fn new(name: String, remote: RpcClient) -> Self {
Self { name, remote }
Self {
name,
remotes: vec![remote],
}
}
}

impl IndexHandle for RemoteIndex {
type SearchResponse = Box<Future<Item = SearchReply, Error = RPCError> + Send>;
type SearchResponse = Box<Future<Item = Vec<SearchReply>, Error = RPCError> + Send>;
type DeleteResponse = Box<Future<Item = ResultReply, Error = RPCError> + Send>;
type AddResponse = Box<Future<Item = ResultReply, Error = RPCError> + Send>;

Expand All @@ -39,25 +59,31 @@ impl IndexHandle for RemoteIndex {

fn search_index(&self, search: Request) -> Self::SearchResponse {
let name = self.name.clone();
let mut client = self.remote.clone();
let bytes = serde_json::to_vec(&search).unwrap();
let req = TowerRequest::new(SearchRequest { index: name, query: bytes });
let fut = client
.search_index(req)
.map(|res| {
info!("RESPONSE = {:?}", res);
res.into_inner()
})
.map_err(|e| {
info!("{:?}", e);
e.into()
let clients = self.remotes.clone();
info!("REQ = {:?}", search);
let fut = clients.into_iter().map(move |mut client| {
let bytes = serde_json::to_vec(&search).unwrap();
let req = TowerRequest::new(SearchRequest {
index: name.clone(),
query: bytes,
});
client
.search_index(req)
.map(|res| {
info!("RESPONSE = {:?}", res);
res.into_inner()
})
.map_err(|e| {
info!("ERR = {:?}", e);
e.into()
})
});

Box::new(fut)
Box::new(future::join_all(fut))
}

fn add_document(&self, _: AddDocument) -> Self::AddResponse {
unimplemented!()
unimplemented!("All of the mutating calls should probably have some strategy to balance how they distribute documents and knowing where things are")
}

fn delete_term(&self, _: DeleteDoc) -> Self::DeleteResponse {
Expand Down
106 changes: 66 additions & 40 deletions src/cluster/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::{Arc, RwLock};

use futures::{future, future::Future, stream::Stream};
use log::{error, info};
use tantivy::schema::Schema;
use tokio::net::{TcpListener, TcpStream};
use tokio_executor::DefaultExecutor;
use tower_buffer::Buffer;
Expand All @@ -15,8 +16,7 @@ use tower_util::MakeService;

use crate::cluster::cluster_rpc::server;
use crate::cluster::cluster_rpc::*;
use crate::cluster::GrpcConn;
use crate::cluster::RPCError;
use crate::cluster::{GrpcConn, RPCError};
use crate::handle::IndexHandle;
use crate::index::IndexCatalog;
use crate::query;
Expand Down Expand Up @@ -68,7 +68,7 @@ impl RpcServer {
.map(|c| {
let uri = uri;
let connection = Builder::new().uri(uri).build(c).unwrap();
let buffer = match Buffer::new(connection, 0) {
let buffer = match Buffer::new(connection, 128) {
Ok(b) => b,
_ => panic!("asdf"),
};
Expand All @@ -84,73 +84,99 @@ impl RpcServer {
pub fn create_search_reply(result: Option<ResultReply>, doc: Vec<u8>) -> SearchReply {
SearchReply { result, doc }
}

pub fn error_response<T>(code: Code, msg: String) -> future::FutureResult<Response<T>, Error> {
let status = Status::with_code_and_message(code, msg);
future::failed(Error::Grpc(status))
}
}

impl server::IndexService for RpcServer {
type ListIndexesFuture = future::FutureResult<Response<ListReply>, Error>;
type PlaceIndexFuture = future::FutureResult<Response<ResultReply>, Error>;
type PlaceDocumentFuture = Box<Future<Item = Response<ResultReply>, Error = Error> + Send>;
type PlaceReplicaFuture = Box<Future<Item = Response<ResultReply>, Error = Error> + Send>;
type PlaceDocumentFuture = future::FutureResult<Response<ResultReply>, Error>;
type PlaceReplicaFuture = future::FutureResult<Response<ResultReply>, Error>;
type SearchIndexFuture = future::FutureResult<Response<SearchReply>, Error>;

fn place_index(&mut self, _request: Request<PlaceRequest>) -> Self::PlaceIndexFuture {
unimplemented!()
}

fn list_indexes(&mut self, _: Request<ListRequest>) -> Self::ListIndexesFuture {
if let Ok(ref mut cat) = self.catalog.read() {
if let Ok(ref cat) = self.catalog.read() {
let indexes = cat.get_collection();
let lists: Vec<String> = indexes.into_iter().map(|t| t.0.to_string()).collect();
let lists: Vec<String> = indexes.into_iter().map(|(t, _)| t.to_string()).collect();
let resp = Response::new(ListReply { indexes: lists });
future::finished(resp)
} else {
let status = Status::with_code_and_message(Code::NotFound, "Could not get lock on index catalog".into());
let err = Error::Grpc(status);
future::failed(err)
Self::error_response(Code::NotFound, "Could not get lock on index catalog".into())
}
}

fn place_document(&mut self, _request: Request<DocumentRequest>) -> Self::PlaceDocumentFuture {
unimplemented!()
}

fn place_replica(&mut self, _request: Request<ReplicaRequest>) -> Self::PlaceReplicaFuture {
unimplemented!()
}

fn search_index(&mut self, request: Request<SearchRequest>) -> Self::SearchIndexFuture {
let inner = request.into_inner();
if let Ok(ref mut cat) = self.catalog.read() {
let index = cat.get_index(&inner.index).unwrap();
let query: query::Request = serde_json::from_slice(&inner.query).unwrap();
match index.search_index(query) {
Ok(query_results) => {
let query_bytes: Vec<u8> = serde_json::to_vec(&query_results).unwrap();
let result = Some(RpcServer::create_result(0, "".into()));
let resp = Response::new(RpcServer::create_search_reply(result, query_bytes));
future::finished(resp)
if let Ok(ref cat) = self.catalog.read() {
if let Ok(index) = cat.get_index(&inner.index) {
let query: query::Request = serde_json::from_slice(&inner.query).unwrap();
info!("QUERY = {:?}", query);
match index.search_index(query) {
Ok(query_results) => {
let query_bytes: Vec<u8> = serde_json::to_vec(&query_results).unwrap();
let result = Some(RpcServer::create_result(0, "".into()));
future::finished(Response::new(RpcServer::create_search_reply(result, query_bytes)))
}
Err(e) => {
let result = Some(RpcServer::create_result(1, e.to_string()));
future::finished(Response::new(RpcServer::create_search_reply(result, vec![])))
}
}
Err(e) => {
let result = Some(RpcServer::create_result(1, e.to_string()));
let resp = Response::new(RpcServer::create_search_reply(result, vec![]));
future::finished(resp)
} else {
Self::error_response(Code::NotFound, format!("Index: {} not found", inner.index))
}
} else {
Self::error_response(
Code::NotFound,
format!("Could not obtain lock on catalog for index: {}", inner.index),
)
}
}

fn place_index(&mut self, request: Request<PlaceRequest>) -> Self::PlaceIndexFuture {
let PlaceRequest { index, schema } = request.into_inner();
if let Ok(ref mut cat) = self.catalog.write() {
if let Ok(schema) = serde_json::from_slice::<Schema>(&schema) {
let ip = cat.base_path().clone();
if let Ok(new_index) = IndexCatalog::create_from_managed(ip, &index.clone(), schema) {
if let Ok(_) = cat.add_index(index.clone(), new_index) {
let result = RpcServer::create_result(0, "".into());
future::finished(Response::new(result))
} else {
Self::error_response(Code::Internal, format!("Insert: {} failed", index.clone()))
}
} else {
Self::error_response(Code::Internal, format!("Could not create index: {}", index.clone()))
}
} else {
Self::error_response(Code::NotFound, "Invalid schema in request".into())
}
} else {
let status = Status::with_code_and_message(Code::NotFound, format!("Index: {} not found", inner.index));
let err = Error::Grpc(status);
future::failed(err)
Self::error_response(Code::NotFound, format!("Cannot obtain lock on catalog for index: {}", index))
}
}

fn place_document(&mut self, _request: Request<DocumentRequest>) -> Self::PlaceDocumentFuture {
unimplemented!()
}

fn place_replica(&mut self, _request: Request<ReplicaRequest>) -> Self::PlaceReplicaFuture {
unimplemented!()
}
}

#[cfg(test)]
mod tests {
use crate::index::tests::create_test_catalog;
use crate::query::Query;
use futures::future::Future;
use http::Uri;

use crate::index::tests::create_test_catalog;
use crate::query::Query;

use super::*;

#[test]
Expand Down
Loading

0 comments on commit c6981d0

Please sign in to comment.