Skip to content

Commit

Permalink
rust 2018, and tower-grcp start for placement driver
Browse files Browse the repository at this point in the history
  • Loading branch information
hntd187 committed Dec 4, 2018
1 parent 54fdaf9 commit 9ddc57d
Show file tree
Hide file tree
Showing 12 changed files with 448 additions and 77 deletions.
279 changes: 253 additions & 26 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 15 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,28 @@ authors = ["Stephen Carman <shcarman@gmail.com>"]
description = "A full text search engine based on Tantivy"
repository = "https://github.com/toshi-search/Toshi"
license = "MIT"
edition = "2018"

[[bin]]
name = "toshi"
path = "src/bin/main.rs"

[[bin]]
name = "toshi-placement"

[lib]
path = "src/lib.rs"

[build-dependencies]
tower-grpc-build = { git = "https://github.com/tower-rs/tower-grpc" }

[dependencies]
gotham = { git = "https://github.com/gotham-rs/gotham", rev = "9e60d8752fd5bb569f4156e60ee1919198702273"}
gotham_derive = { git = "https://github.com/gotham-rs/gotham", rev = "9e60d8752fd5bb569f4156e60ee1919198702273"}
gotham = { git = "https://github.com/gotham-rs/gotham", rev = "9e60d8752fd5bb569f4156e60ee1919198702273" }
gotham_derive = { git = "https://github.com/gotham-rs/gotham", rev = "9e60d8752fd5bb569f4156e60ee1919198702273" }
tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" }
tower-h2 = { git = "https://github.com/tower-rs/tower-h2" }
bytes = "^0.4"
prost = "^0.4"
prost-derive = "^0.4"
hyper = "^0.12"
mime = "^0.3"
serde = "^1.0"
Expand All @@ -30,7 +41,7 @@ log = "^0.4"
pretty_env_logger = "^0.2"
failure = "^0.1"
crossbeam = "^0.5"
clap = "^2.32"
clap = { version = "^2.32", features = ["color"] }
num_cpus = "^1.0"
systemstat = { git = "https://github.com/toshi-search/systemstat", branch = "master" }
uuid = { version = "0.7", features = ["v4"] }
Expand Down
9 changes: 9 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
extern crate tower_grpc_build;

fn main() {
tower_grpc_build::Config::new()
.enable_server(true)
.enable_client(true)
.build(&["proto/placement.proto"], &["proto/"])
.unwrap_or_else(|e| panic!("Compilation failed :( {}", e));
}
4 changes: 2 additions & 2 deletions docs/api.raml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ types:
options?: Options
document: Document

/_version:
/:
displayName: Get Version
description: Returns the current version of Toshi running.
get:
Expand All @@ -29,7 +29,7 @@ types:
application/json: |
{
"name": "Toshi Search",
"version": "Toshi Search, Version: 0.1.0"
"version": "Toshi Search, Version: 0.1.1"
}
/{index}:
displayName: Index Operations
Expand Down
27 changes: 27 additions & 0 deletions proto/placement.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.toshi.placement";
option java_outer_classname = "ToshiPlacement";

package placement;

service Placement {
rpc getPlacement (PlacementRequest) returns (PlacementReply) {
}
}

enum IndexKind {
SHARD = 0;
REPLICA = 1;
}

message PlacementRequest {
string index = 1;
IndexKind kind = 2;
}

message PlacementReply {
string host = 1;
IndexKind kind = 2;
}
14 changes: 0 additions & 14 deletions proto/wal.capnp

This file was deleted.

58 changes: 58 additions & 0 deletions src/bin/toshi-placement.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use clap::{crate_authors, crate_description, crate_version, App, AppSettings, Arg, ArgMatches};
use log::info;

use toshi::cluster::Place;
use toshi::settings::HEADER;

fn main() {
let settings = settings();
let host = settings.value_of("host").unwrap();
let port = settings.value_of("port").unwrap();
std::env::set_var("RUST_LOG", settings.value_of("level").unwrap());
pretty_env_logger::init();

println!("{}", HEADER);
info!("Starting Toshi Placement Service...");
let addr = format!("{}:{}", host, port).parse().unwrap();
let service = Place::get_service(addr);

tokio::run(service);
}

fn settings<'a>() -> ArgMatches<'a> {
App::new("Toshi Placement Driver")
.version(crate_version!())
.about(crate_description!())
.author(crate_authors!())
.arg(
Arg::with_name("host")
.short("h")
.long("host")
.takes_value(true)
.default_value("127.0.0.1"),
).arg(
Arg::with_name("port")
.short("p")
.long("port")
.takes_value(true)
.default_value("8081"),
).arg(
Arg::with_name("level")
.short("l")
.long("level")
.takes_value(true)
.default_value("info"),
).arg(
Arg::with_name("consul-host")
.short("C")
.long("consul-host")
.takes_value(true)
.default_value("localhost"),
).arg(
Arg::with_name("consul-port")
.short("P")
.long("consul-port")
.takes_value(true)
.default_value("8500"),
).get_matches()
}
12 changes: 0 additions & 12 deletions src/bin/main.rs → src/bin/toshi.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,3 @@
extern crate clap;
extern crate futures;
extern crate gotham;
extern crate hyper;
extern crate log;
extern crate num_cpus;
extern crate pretty_env_logger;
extern crate systemstat;
extern crate tokio;
extern crate toshi;
extern crate uuid;

use clap::{crate_authors, crate_description, crate_version, App, Arg, ArgMatches};
use futures::{future, sync::oneshot, Future, Stream};
use log::{error, info};
Expand Down
10 changes: 10 additions & 0 deletions src/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
//! Contains code related to clustering

pub mod placement {
#[cfg(target_family = "unix")]
include!(concat!(env!("OUT_DIR"), "/placement.rs"));

#[cfg(target_family = "windows")]
include!(concat!(env!("OUT_DIR"), "\\placement.rs"));
}

pub mod consul_interface;
pub mod node;
pub mod placement_server;
pub mod shard;

pub use self::consul_interface::ConsulInterface;
pub use self::node::*;
pub use self::placement_server::Place;

#[derive(Debug, Fail, Serialize, Deserialize)]
pub enum ClusterError {
Expand Down
72 changes: 72 additions & 0 deletions src/cluster/placement_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::net::SocketAddr;

use futures::{future, Future, Stream};
use log::{error, info};

use tokio::executor::DefaultExecutor;
use tokio::net::TcpListener;

use tower_grpc::{Error, Request, Response};
use tower_h2::Server;

use super::placement::{server, IndexKind, PlacementReply, PlacementRequest};

#[derive(Clone, Debug)]
pub struct Place;

impl server::Placement for Place {
type GetPlacementFuture = future::FutureResult<Response<PlacementReply>, Error>;

fn get_placement(&mut self, request: Request<PlacementRequest>) -> Self::GetPlacementFuture {
info!("Request = {:?}", request);
let response = Response::new(PlacementReply {
host: "localhost:90189271876281".into(),
kind: IndexKind::Shard.into(),
});

future::ok(response)
}
}

impl Place {
pub fn get_service(addr: SocketAddr) -> impl Future<Item = (), Error = ()> {
let service = server::PlacementServer::new(Place);
let executor = DefaultExecutor::current();
let mut h2 = Server::new(service, Default::default(), executor);

info!("Binding on port: {:?}", addr);
let bind = TcpListener::bind(&addr).expect(&format!("Failed to bind to host: {:?}", addr));

info!("Bound to: {:?}", &bind.local_addr().unwrap());
bind.incoming()
.for_each(move |sock| {
let req = h2.serve(sock).map_err(|err| error!("h2 error: {:?}", err));
tokio::spawn(req);
Ok(())
}).map_err(|err| error!("Server Error: {:?}", err))
}
}

#[cfg(test)]
mod tests {
use super::*;
use tokio::net::TcpStream;
use tower_h2::client::Connect;

pub struct Conn(SocketAddr);

#[test]
fn client_test() {
let addr: SocketAddr = "127.0.0.1:8081".parse().unwrap();
let mut server = Place::get_service(addr.clone());

let req = PlacementRequest {
index: "test".into(),
kind: IndexKind::Shard.into(),
};
let tcp_stream = Box::new(TcpStream::connect(&addr).and_then(|tcp| tcp.set_nodelay(true).map(move |_| tcp)));

let mut c = Connect::new(tcp_stream, Default::default(), DefaultExecutor::current());
}

}
19 changes: 0 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,6 @@
#![warn(unused_extern_crates)]
extern crate clap;
extern crate config;
extern crate crossbeam;
#[macro_use]
extern crate failure;
extern crate futures;
extern crate gotham;
#[macro_use]
extern crate gotham_derive;
extern crate hyper;
extern crate log;
extern crate mime;
extern crate num_cpus;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate systemstat;
extern crate tantivy;
extern crate tokio;
extern crate uuid;

use failure::Fail;
use gotham::handler::{HandlerError, IntoHandlerError};
Expand Down
2 changes: 2 additions & 0 deletions src/query/aggregate/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ pub struct SummaryDoc {
value: u64,
}

#[allow(dead_code)]
pub struct SumCollector<'a> {
field: String,
collector: TopCollector,
searcher: &'a Searcher,
}

impl<'a> SumCollector<'a> {
#[allow(dead_code)]
pub fn new(field: String, searcher: &'a Searcher, collector: TopCollector) -> Self {
Self {
field,
Expand Down

0 comments on commit 9ddc57d

Please sign in to comment.