Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clustering #145

Merged
merged 14 commits into from
Feb 15, 2019
521 changes: 249 additions & 272 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 4 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ tower-util = { git = "https://github.com/tower-rs/tower" }
tower-http = { git = "https://github.com/tower-rs/tower-http" }
tower-consul = { git = "https://github.com/LucioFranco/tower-consul" }
tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
futures-watch = { git = "https://github.com/carllerche/better-future" }
tower-web = "^0.3"
http = "^0.1"
h2 = "0.1.16"
h2 = "^0.1"
taken = "0.1.1"
flate2 = "^1.0"
futures-watch = { git = "https://github.com/carllerche/better-future" }
chashmap = "^2.2"
bytes = "^0.4"
prost = "^0.4"
prost-derive = "^0.4"
Expand All @@ -53,8 +52,9 @@ config = "^0.9"
log = "^0.4"
pretty_env_logger = "^0.3"
failure = "^0.1"
crossbeam = "^0.7"
num_cpus = "^1.10"
crossbeam = "^0.7"
hashbrown = "0.1.8"
serde = { version = "^1.0", features = ["derive"] }
clap = { version = "^2.32", features = ["color"] }
uuid = { version = "^0.7", features = ["v4"] }
Expand All @@ -67,14 +67,3 @@ debug-assertions = false
lto = true
rpath = false
codegen-units = 1

#[patch.crates-io]
#tower-service = { git = "https://github.com/tower-rs/tower" }
#tower-direct-service = { git = "https://github.com/tower-rs/tower" }
#tower-buffer = { git = "https://github.com/tower-rs/tower" }
#tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" }
#tower-h2 = { git = "https://github.com/tower-rs/tower-h2" }
#tower-util = { git = "https://github.com/tower-rs/tower" }
#tower-http = { git = "https://github.com/tower-rs/tower-http" }
#tower-consul = { git = "https://github.com/LucioFranco/tower-consul" }
#tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
19 changes: 19 additions & 0 deletions config/config-rpc.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Toshi node configuration for a secondary RPC node.

# Address and port this node binds to.
host = "127.0.0.1"
port = 8081
# Directory where index data is stored.
path = "data/"
# Writer heap size in bytes — presumably the tantivy writer buffer; TODO confirm units.
writer_memory = 200000000
log_level = "debug"
json_parsing_threads = 4
bulk_buffer_size = 10000
# Interval between automatic commits — presumably seconds; verify against the settings parser.
auto_commit_duration = 10
# Clustering is enabled but this node is NOT the master (contrast config.toml).
enable_clustering = true
master = false
# Known cluster peers.
nodes = [
"127.0.0.1:8081"
]

# Tantivy log merge policy tuning.
[merge_policy]
kind = "log"
min_merge_size = 8
min_layer_size = 10_000
level_log_size = 0.75
7 changes: 3 additions & 4 deletions config/config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
host = "127.0.0.1"
port = 8080
path = "data/"
path = "data2/"
writer_memory = 200000000
log_level = "info"
json_parsing_threads = 4
Expand All @@ -9,12 +9,11 @@ auto_commit_duration = 10
enable_clustering = true
master = true
nodes = [
"127.0.0.1:8081",
"127.0.0.1:8082"
"127.0.0.1:8081"
]

[merge_policy]
kind = "log"
min_merge_size = 8
min_layer_size = 10_000
level_log_size = 0.75
level_log_size = 0.75
35 changes: 17 additions & 18 deletions src/bin/toshi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,26 +162,25 @@ fn run(catalog: Arc<RwLock<IndexCatalog>>, settings: &Settings) -> impl Future<I
println!("{}", HEADER);

if settings.enable_clustering {
let settings = settings.clone();
let place_addr = settings.place_addr.clone();
let consul_addr = settings.consul_addr.clone();
let cluster_name = settings.cluster_name.clone();

let run = future::lazy(move || connect_to_consul(&settings)).and_then(move |_| {
tokio::spawn(commit_watcher);

let consul = Consul::builder()
.with_cluster_name(cluster_name)
.with_address(consul_addr)
.build()
.expect("Could not build Consul client.");

let place_addr = place_addr.parse().expect("Placement address must be a valid SocketAddr");
tokio::spawn(cluster::run(place_addr, consul).map_err(|e| error!("Error with running cluster: {}", e)));

// This is commented out for now in order to test the manual cluster reporting; the next step is
// to turn this back on and get the information from Consul instead.

// let consul = Consul::builder()
// .with_cluster_name(cluster_name)
// .with_address(consul_addr)
// .build()
// .expect("Could not build Consul client.");
//
// let place_addr = place_addr.parse().expect("Placement address must be a valid SocketAddr");
// tokio::spawn(cluster::run(place_addr, consul).map_err(|e| error!("Error with running cluster: {}", e)));
//
// router_with_catalog(&bind, &catalog)
//});
let run = commit_watcher.and_then(move |_| {
let update = catalog.read().unwrap().update_remote_indexes();
tokio::spawn(update);
router_with_catalog(&bind, &catalog)
});

future::Either::A(run)
} else {
let run = commit_watcher.and_then(move |_| router_with_catalog(&bind, &catalog));
Expand Down
60 changes: 43 additions & 17 deletions src/cluster/remote_handle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::hash::{Hash, Hasher};

use log::info;
use tokio::prelude::*;
use tower_grpc::Request as TowerRequest;
Expand All @@ -13,19 +15,37 @@ use crate::query::Request;
/// the remote host and full filling the request via rpc, we need to figure out a better way
/// (tower-buffer) on how to keep these clients.

#[derive(Clone)]
pub struct RemoteIndex {
name: String,
remote: RpcClient,
remotes: Vec<RpcClient>,
}

/// Remote indexes are identified solely by name: two handles to the same
/// logical index compare equal even if their client lists differ.
impl PartialEq for RemoteIndex {
    fn eq(&self, other: &RemoteIndex) -> bool {
        // Compare the Strings directly; the original's `*other.name` deref
        // was needless (String == str only works by accident of impls).
        self.name == other.name
    }
}

impl Eq for RemoteIndex {}

/// Hash must agree with Eq, so hash only the name.
impl Hash for RemoteIndex {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Delegate to str's Hash impl instead of a raw byte write; str::hash
        // is length-delimited, matching how a String key would hash in a map.
        self.name.hash(state)
    }
}

impl RemoteIndex {
    /// Create a handle for `name` backed initially by a single remote
    /// client; further clients can be appended to `remotes` later.
    pub fn new(name: String, remote: RpcClient) -> Self {
        Self {
            name,
            remotes: vec![remote],
        }
    }
}

impl IndexHandle for RemoteIndex {
type SearchResponse = Box<Future<Item = SearchReply, Error = RPCError> + Send>;
type SearchResponse = Box<Future<Item = Vec<SearchReply>, Error = RPCError> + Send>;
type DeleteResponse = Box<Future<Item = ResultReply, Error = RPCError> + Send>;
type AddResponse = Box<Future<Item = ResultReply, Error = RPCError> + Send>;

Expand All @@ -39,25 +59,31 @@ impl IndexHandle for RemoteIndex {

fn search_index(&self, search: Request) -> Self::SearchResponse {
let name = self.name.clone();
let mut client = self.remote.clone();
let bytes = serde_json::to_vec(&search).unwrap();
let req = TowerRequest::new(SearchRequest { index: name, query: bytes });
let fut = client
.search_index(req)
.map(|res| {
info!("RESPONSE = {:?}", res);
res.into_inner()
})
.map_err(|e| {
info!("{:?}", e);
e.into()
let clients = self.remotes.clone();
info!("REQ = {:?}", search);
let fut = clients.into_iter().map(move |mut client| {
let bytes = serde_json::to_vec(&search).unwrap();
let req = TowerRequest::new(SearchRequest {
index: name.clone(),
query: bytes,
});
client
.search_index(req)
.map(|res| {
info!("RESPONSE = {:?}", res);
res.into_inner()
})
.map_err(|e| {
info!("ERR = {:?}", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prob want this to be an error! 😄

e.into()
})
});

Box::new(fut)
Box::new(future::join_all(fut))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually has a concrete type so we don't have to box here but I have some ideas around this and its not a big deal right now but down the road we want to remove all these boxes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you'd want the return type to be a JoinAll ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, that would remove the need for a Box call. though may be harder than that if we use the combinators, though I wouldnt worry about this now. I have some ideas how we can improve this in the future.

}

fn add_document(&self, _: AddDocument) -> Self::AddResponse {
    // Writes need a placement strategy (which node owns which document)
    // before this can be implemented.
    unimplemented!("All of the mutating calls should probably have some strategy to balance how they distribute documents and knowing where things are")
}

fn delete_term(&self, _: DeleteDoc) -> Self::DeleteResponse {
Expand Down
106 changes: 66 additions & 40 deletions src/cluster/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::{Arc, RwLock};

use futures::{future, future::Future, stream::Stream};
use log::{error, info};
use tantivy::schema::Schema;
use tokio::net::{TcpListener, TcpStream};
use tokio_executor::DefaultExecutor;
use tower_buffer::Buffer;
Expand All @@ -15,8 +16,7 @@ use tower_util::MakeService;

use crate::cluster::cluster_rpc::server;
use crate::cluster::cluster_rpc::*;
use crate::cluster::GrpcConn;
use crate::cluster::RPCError;
use crate::cluster::{GrpcConn, RPCError};
use crate::handle::IndexHandle;
use crate::index::IndexCatalog;
use crate::query;
Expand Down Expand Up @@ -68,7 +68,7 @@ impl RpcServer {
.map(|c| {
let uri = uri;
let connection = Builder::new().uri(uri).build(c).unwrap();
let buffer = match Buffer::new(connection, 0) {
let buffer = match Buffer::new(connection, 128) {
Ok(b) => b,
_ => panic!("asdf"),
};
Expand All @@ -84,73 +84,99 @@ impl RpcServer {
/// Wrap an optional status `result` and the serialized documents `doc`
/// into a `SearchReply` message.
pub fn create_search_reply(result: Option<ResultReply>, doc: Vec<u8>) -> SearchReply {
    SearchReply { doc, result }
}

pub fn error_response<T>(code: Code, msg: String) -> future::FutureResult<Response<T>, Error> {
let status = Status::with_code_and_message(code, msg);
future::failed(Error::Grpc(status))
}
}

impl server::IndexService for RpcServer {
type ListIndexesFuture = future::FutureResult<Response<ListReply>, Error>;
type PlaceIndexFuture = future::FutureResult<Response<ResultReply>, Error>;
type PlaceDocumentFuture = Box<Future<Item = Response<ResultReply>, Error = Error> + Send>;
type PlaceReplicaFuture = Box<Future<Item = Response<ResultReply>, Error = Error> + Send>;
type PlaceDocumentFuture = future::FutureResult<Response<ResultReply>, Error>;
type PlaceReplicaFuture = future::FutureResult<Response<ResultReply>, Error>;
type SearchIndexFuture = future::FutureResult<Response<SearchReply>, Error>;

fn place_index(&mut self, _request: Request<PlaceRequest>) -> Self::PlaceIndexFuture {
unimplemented!()
}

fn list_indexes(&mut self, _: Request<ListRequest>) -> Self::ListIndexesFuture {
if let Ok(ref mut cat) = self.catalog.read() {
if let Ok(ref cat) = self.catalog.read() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this call block? if so we should probably wrap this call in a blocking https://docs.rs/tokio-threadpool/0.1.11/tokio_threadpool/fn.blocking.html

let indexes = cat.get_collection();
let lists: Vec<String> = indexes.into_iter().map(|t| t.0.to_string()).collect();
let lists: Vec<String> = indexes.into_iter().map(|(t, _)| t.to_string()).collect();
let resp = Response::new(ListReply { indexes: lists });
future::finished(resp)
} else {
let status = Status::with_code_and_message(Code::NotFound, "Could not get lock on index catalog".into());
let err = Error::Grpc(status);
future::failed(err)
Self::error_response(Code::NotFound, "Could not get lock on index catalog".into())
}
}

fn place_document(&mut self, _request: Request<DocumentRequest>) -> Self::PlaceDocumentFuture {
unimplemented!()
}

fn place_replica(&mut self, _request: Request<ReplicaRequest>) -> Self::PlaceReplicaFuture {
unimplemented!()
}

fn search_index(&mut self, request: Request<SearchRequest>) -> Self::SearchIndexFuture {
let inner = request.into_inner();
if let Ok(ref mut cat) = self.catalog.read() {
let index = cat.get_index(&inner.index).unwrap();
let query: query::Request = serde_json::from_slice(&inner.query).unwrap();
match index.search_index(query) {
Ok(query_results) => {
let query_bytes: Vec<u8> = serde_json::to_vec(&query_results).unwrap();
let result = Some(RpcServer::create_result(0, "".into()));
let resp = Response::new(RpcServer::create_search_reply(result, query_bytes));
future::finished(resp)
if let Ok(ref cat) = self.catalog.read() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

if let Ok(index) = cat.get_index(&inner.index) {
let query: query::Request = serde_json::from_slice(&inner.query).unwrap();
info!("QUERY = {:?}", query);
match index.search_index(query) {
Ok(query_results) => {
let query_bytes: Vec<u8> = serde_json::to_vec(&query_results).unwrap();
let result = Some(RpcServer::create_result(0, "".into()));
future::finished(Response::new(RpcServer::create_search_reply(result, query_bytes)))
}
Err(e) => {
let result = Some(RpcServer::create_result(1, e.to_string()));
future::finished(Response::new(RpcServer::create_search_reply(result, vec![])))
}
}
Err(e) => {
let result = Some(RpcServer::create_result(1, e.to_string()));
let resp = Response::new(RpcServer::create_search_reply(result, vec![]));
future::finished(resp)
} else {
Self::error_response(Code::NotFound, format!("Index: {} not found", inner.index))
}
} else {
Self::error_response(
Code::NotFound,
format!("Could not obtain lock on catalog for index: {}", inner.index),
)
}
}

fn place_index(&mut self, request: Request<PlaceRequest>) -> Self::PlaceIndexFuture {
let PlaceRequest { index, schema } = request.into_inner();
if let Ok(ref mut cat) = self.catalog.write() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

if let Ok(schema) = serde_json::from_slice::<Schema>(&schema) {
let ip = cat.base_path().clone();
if let Ok(new_index) = IndexCatalog::create_from_managed(ip, &index.clone(), schema) {
if let Ok(_) = cat.add_index(index.clone(), new_index) {
let result = RpcServer::create_result(0, "".into());
future::finished(Response::new(result))
} else {
Self::error_response(Code::Internal, format!("Insert: {} failed", index.clone()))
}
} else {
Self::error_response(Code::Internal, format!("Could not create index: {}", index.clone()))
}
} else {
Self::error_response(Code::NotFound, "Invalid schema in request".into())
}
} else {
let status = Status::with_code_and_message(Code::NotFound, format!("Index: {} not found", inner.index));
let err = Error::Grpc(status);
future::failed(err)
Self::error_response(Code::NotFound, format!("Cannot obtain lock on catalog for index: {}", index))
}
}

// TODO: document placement needs a distribution strategy (which node owns
// which document) before this can be implemented.
fn place_document(&mut self, _request: Request<DocumentRequest>) -> Self::PlaceDocumentFuture {
unimplemented!()
}

// TODO: replica placement is not yet implemented.
fn place_replica(&mut self, _request: Request<ReplicaRequest>) -> Self::PlaceReplicaFuture {
unimplemented!()
}
}

#[cfg(test)]
mod tests {
use crate::index::tests::create_test_catalog;
use crate::query::Query;
use futures::future::Future;
use http::Uri;

use crate::index::tests::create_test_catalog;
use crate::query::Query;

use super::*;

#[test]
Expand Down
Loading