Skip to content

Commit

Permalink
Merge e5d0341 into 113a0a0
Browse files Browse the repository at this point in the history
  • Loading branch information
hntd187 committed Mar 31, 2019
2 parents 113a0a0 + e5d0341 commit fbe6f64
Show file tree
Hide file tree
Showing 17 changed files with 419 additions and 196 deletions.
322 changes: 232 additions & 90 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ futures = "^0.1"
tantivy = "^0.9"
tokio = "^0.1"
tokio-executor = "^0.1"
tokio-signal = "=0.2.7"
tokio-signal = "^0.2"
config = "^0.9"
log = "^0.4"
pretty_env_logger = "^0.3"
Expand All @@ -60,6 +60,7 @@ systemstat = { git = "https://github.com/toshi-search/systemstat" }
[dev-dependencies]
remove_dir_all = "0.5.1"
pretty_assertions = "0.6.1"
csv = "1"

[profile.release]
opt-level = 3
Expand Down
34 changes: 26 additions & 8 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,36 @@ jobs:
displayName: Run Tests
strategy:
matrix:
Linux:
vmImage: ubuntu-16.04
MacOS:
vmImage: macOS-10.13
Windows:
vmImage: vs2017-win2016
Linux-Stable:
imageName: ubuntu-16.04
rustup_toolchain: stable
MacOS-Stable:
imageName: macOS-10.13
rustup_toolchain: stable
Windows-Stable:
imageName: vs2017-win2016
rustup_toolchain: stable
Linux-Nightly:
imageName: ubuntu-16.04
rustup_toolchain: nightly
optional: true
MacOS-Nightly:
imageName: macOS-10.13
rustup_toolchain: nightly
optional: true
Windows-Nightly:
imageName: vs2017-win2016
rustup_toolchain: nightly
optional: true
pool:
vmImage: $(vmImage)
vmImage: $(imageName)
steps:
- template: ci/azure-install-deps.yml
- script: |
cargo fmt --all -- --check
cargo test
displayName: cargo test
- template: ci/azure-run-kcov.yml
- template: ci/azure-run-kcov.yml
- script: |
echo "##vso[task.complete result=Succeeded;]DONE"
condition: and(always(), in(variables['Build.DefinitionName'], "Linux-Nightly", "MacOS-Nightly", "Windows-Nightly"))
2 changes: 1 addition & 1 deletion ci/azure-install-rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ steps:
echo "##vso[task.setvariable variable=PATH;]%PATH%;%USERPROFILE%\.cargo\bin"
env:
RUSTUP_TOOLCHAIN: ${{parameters.rust_version}}
displayName: "Install rust (windows)"
displayName: "Install rust (Windows)"
condition: eq(variables['Agent.OS'], 'Windows_NT')
# All platforms.
- script: |
Expand Down
11 changes: 5 additions & 6 deletions config/config-rpc.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
host = "127.0.0.1"
host = "::1"
port = 8081
path = "data/"
writer_memory = 200000000
log_level = "debug"
log_level = "info"
json_parsing_threads = 4
bulk_buffer_size = 10000
auto_commit_duration = 10
enable_clustering = true
experimental = true

[experimental_features]
master = false
nodes = [
"127.0.0.1:8081"
]

[merge_policy]
kind = "log"
Expand Down
8 changes: 4 additions & 4 deletions config/config.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
host = "127.0.0.1"
host = "::1"
port = 8080
path = "data/"
path = "data2/"
writer_memory = 200000000
log_level = "info"
json_parsing_threads = 4
bulk_buffer_size = 10000
auto_commit_duration = 10
experimental = false
experimental = true

[experimental_features]
master = true
nodes = [
"127.0.0.1:8081"
"[::1]:8081"
]

[merge_policy]
Expand Down
4 changes: 0 additions & 4 deletions scripts/Dockerfile

This file was deleted.

4 changes: 0 additions & 4 deletions scripts/Dockerfile.builder

This file was deleted.

7 changes: 0 additions & 7 deletions scripts/Makefile

This file was deleted.

43 changes: 22 additions & 21 deletions src/bin/toshi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,10 @@ pub fn main() -> Result<(), ()> {
};

let toshi = {
// If experimental is enabled and master is false, they are a data node...
// not even going to try and become the master...
let server = if settings.experimental && settings.experimental_features.master {
future::Either::A(run_master(Arc::clone(&index_catalog), &settings))
} else {
future::Either::B(run(Arc::clone(&index_catalog), &settings))
future::Either::B(run_data(Arc::clone(&index_catalog), &settings))
};
let shutdown = shutdown::shutdown(tx);
server.select(shutdown)
Expand All @@ -72,29 +70,33 @@ pub fn main() -> Result<(), ()> {
.wait()
}

fn run_master(catalog: Arc<RwLock<IndexCatalog>>, settings: &Settings) -> impl Future<Item = (), Error = ()> {
let addr: IpAddr = settings.host.parse().expect(&format!("Invalid ip address: {}", &settings.host));
let settings = settings.clone();
let bind: SocketAddr = SocketAddr::new(addr, settings.port);

println!("{}", RPC_HEADER);
info!("I am a data node...Binding to: {}", addr);
future::lazy(move || cluster::connect_to_consul(&settings)).and_then(move |_| RpcServer::serve(bind, catalog))
}

fn run(catalog: Arc<RwLock<IndexCatalog>>, settings: &Settings) -> impl Future<Item = (), Error = ()> {
let commit_watcher = if settings.auto_commit_duration > 0 {
fn create_watcher(catalog: Arc<RwLock<IndexCatalog>>, settings: &Settings) -> impl Future<Item = (), Error = ()> {
if settings.auto_commit_duration > 0 {
let commit_watcher = IndexWatcher::new(catalog.clone(), settings.auto_commit_duration);
future::Either::A(future::lazy(move || {
commit_watcher.start();
future::ok::<(), ()>(())
}))
} else {
future::Either::B(future::ok::<(), ()>(()))
};
}
}

let addr = format!("{}:{}", &settings.host, settings.port);
let bind: SocketAddr = addr.parse().expect("Failed to parse socket address");
/// Builds the future that runs this process as a data node.
///
/// The auto-commit watcher future from `create_watcher` is run first, and the
/// RPC server is then served on `settings.host`:`settings.port`.
///
/// # Panics
///
/// Panics if `settings.host` cannot be parsed as an IP address.
fn run_data(catalog: Arc<RwLock<IndexCatalog>>, settings: &Settings) -> impl Future<Item = (), Error = ()> {
    let commit_watcher = create_watcher(Arc::clone(&catalog), settings);
    // Build the panic message lazily so nothing is formatted on the success path;
    // the text matches what `expect` would have produced ("<msg>: <error:?>").
    let addr: IpAddr = settings
        .host
        .parse()
        .unwrap_or_else(|e| panic!("Invalid ip address: {}: {:?}", settings.host, e));
    // Only the port is needed from the settings, so read it through the borrow
    // instead of cloning the whole struct.
    let bind = SocketAddr::new(addr, settings.port);

    println!("{}", RPC_HEADER);
    info!("I am a data node...Binding to: {}", addr);
    commit_watcher.and_then(move |_| RpcServer::serve(bind, catalog))
}

fn run_master(catalog: Arc<RwLock<IndexCatalog>>, settings: &Settings) -> impl Future<Item = (), Error = ()> {
let commit_watcher = create_watcher(Arc::clone(&catalog), settings);
let addr: IpAddr = settings.host.parse().expect(&format!("Invalid ip address: {}", &settings.host));
let bind: SocketAddr = SocketAddr::new(addr, settings.port);

println!("{}", HEADER);

Expand All @@ -105,10 +107,9 @@ fn run(catalog: Arc<RwLock<IndexCatalog>>, settings: &Settings) -> impl Future<I
let cluster_name = settings.experimental_features.cluster_name.clone();
let nodes = settings.experimental_features.nodes.clone();

let run = future::lazy(move || cluster::connect_to_consul(&settings)).and_then(move |_| {
tokio::spawn(commit_watcher);

let run = commit_watcher.and_then(move |_| {
if nodes.is_empty() {
tokio::spawn(cluster::connect_to_consul(&settings));
let consul = cluster::Consul::builder()
.with_cluster_name(cluster_name)
.with_address(consul_addr)
Expand Down
9 changes: 6 additions & 3 deletions src/cluster/remote_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,18 @@ impl IndexHandle for RemoteIndex {
let clients = self.remotes.clone();
info!("REQ = {:?}", search);
let fut = clients.into_iter().map(move |mut client| {
let bytes = serde_json::to_vec(&search).unwrap();
let bytes = match serde_json::to_vec(&search) {
Ok(v) => v,
Err(_) => Vec::new(),
};
let req = TowerRequest::new(SearchRequest {
index: name.clone(),
query: bytes,
});
client
.search_index(req)
.map(|res| {
info!("RESPONSE = {:?}", res);
//info!("RESPONSE = {:?}", res);
res.into_inner()
})
.map_err(|e| {
Expand All @@ -87,7 +90,7 @@ impl IndexHandle for RemoteIndex {
unimplemented!("All of the mutating calls should probably have some strategy to balance how they distribute documents and knowing where things are")
}

fn delete_term(&self, _: DeleteDoc) -> Self::DeleteResponse {
fn delete_term(&mut self, _: DeleteDoc) -> Self::DeleteResponse {
unimplemented!()
}
}
24 changes: 18 additions & 6 deletions src/cluster/rpc_server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::io::Error;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};

Expand All @@ -20,6 +21,7 @@ use crate::handle::IndexHandle;
use crate::handlers::index::AddDocument;
use crate::index::IndexCatalog;
use crate::query;
use crate::query::Query::All;

//pub type RpcClient = client::IndexService<Buffer<Connection<Body>, http::Request<Body>>>;

Expand Down Expand Up @@ -54,17 +56,16 @@ impl RpcServer {

bind.incoming()
.for_each(move |sock| {
info!("Connection from: {:?}", sock.local_addr().unwrap());
let req = h2.serve(sock).map_err(|err| error!("h2 error: {:?}", err));
tokio::spawn(req);
Ok(())
})
.map_err(|err| error!("Server Error: {:?}", err))
}

pub fn create_client(
conn: GrpcConn,
uri: http::Uri,
) -> impl Future<Item = RpcClient, Error = ConnectError<::std::io::Error>> + Send + 'static {
pub fn create_client(conn: GrpcConn, uri: http::Uri) -> impl Future<Item = RpcClient, Error = ConnectError<Error>> + Send {
info!("Creating Client to: {:?}", uri);
let mut connect = Connect::new(conn, Default::default(), DefaultExecutor::current());

connect.make_service(()).map(|c| {
Expand Down Expand Up @@ -99,11 +100,13 @@ impl server::IndexService for RpcServer {
type PlaceReplicaFuture = Box<future::FutureResult<Response<ResultReply>, Status>>;
type SearchIndexFuture = Box<future::FutureResult<Response<SearchReply>, Status>>;

fn list_indexes(&mut self, _: Request<ListRequest>) -> Self::ListIndexesFuture {
fn list_indexes(&mut self, req: Request<ListRequest>) -> Self::ListIndexesFuture {
if let Ok(ref cat) = self.catalog.read() {
info!("Request From: {:?}", req);
let indexes = cat.get_collection();
let lists: Vec<String> = indexes.into_iter().map(|(t, _)| t.to_string()).collect();
let resp = Response::new(ListReply { indexes: lists });
info!("Response: {:?}", resp);
Box::new(future::finished(resp))
} else {
Self::error_response(Code::NotFound, "Could not get lock on index catalog".into())
Expand All @@ -114,7 +117,16 @@ impl server::IndexService for RpcServer {
let inner = request.into_inner();
if let Ok(ref cat) = self.catalog.read() {
if let Ok(index) = cat.get_index(&inner.index) {
let query: query::Request = serde_json::from_slice(&inner.query).unwrap();
let query: query::Request = match serde_json::from_slice(&inner.query) {
Ok(query::Request {
query: None,
ref aggs,
limit,
}) => query::Request::new(Some(All), aggs.clone(), limit),
Ok(v) => v,
Err(e) => return Self::error_response(Code::Internal, e.to_string()),
};

info!("QUERY = {:?}", query);
match index.search_index(query) {
Ok(query_results) => {
Expand Down
27 changes: 12 additions & 15 deletions src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use log::debug;
use tantivy::collector::TopDocs;
use tantivy::query::{AllQuery, QueryParser};
use tantivy::schema::*;
use tantivy::{Document, Index, IndexWriter, ReloadPolicy, Term};
use tantivy::{Document, Index, IndexReader, IndexWriter, ReloadPolicy, Term};
use tokio::prelude::*;

use crate::handlers::index::{AddDocument, DeleteDoc, DocsAffected};
Expand All @@ -29,7 +29,7 @@ pub trait IndexHandle {
fn index_location(&self) -> IndexLocation;
fn search_index(&self, search: Request) -> Self::SearchResponse;
fn add_document(&self, doc: AddDocument) -> Self::AddResponse;
fn delete_term(&self, term: DeleteDoc) -> Self::DeleteResponse;
fn delete_term(&mut self, term: DeleteDoc) -> Self::DeleteResponse;
}

/// Index handle that operates on an Index local to the node, a remote index handle
Expand All @@ -38,7 +38,9 @@ pub trait IndexHandle {
pub struct LocalIndex {
/// The underlying tantivy index; its schema is read when searching and deleting.
index: Index,
/// Shared, mutex-guarded writer; the constructor applies the merge policy from `settings` to it.
writer: Arc<Mutex<IndexWriter>>,
/// Reader built once by the constructor with `ReloadPolicy::OnCommit`; searchers are obtained from it.
reader: IndexReader,
/// Opstamp tracker, initialised to 0 by the constructor.
current_opstamp: AtomicUsize,
/// Running total of `docs_affected` accumulated by `delete_term`; starts at 0.
deleted_docs: u64,
/// Settings this handle was constructed with.
settings: Settings,
/// Name this handle was constructed with.
name: String,
}
Expand Down Expand Up @@ -71,12 +73,7 @@ impl IndexHandle for LocalIndex {
}

fn search_index(&self, search: Request) -> Self::SearchResponse {
let searcher = self
.index
.reader_builder()
.reload_policy(ReloadPolicy::OnCommit)
.try_into()?
.searcher();
let searcher = self.reader.searcher();
let schema = self.index.schema();
let collector = TopDocs::with_limit(search.limit);
if let Some(query) = search.query {
Expand Down Expand Up @@ -145,10 +142,11 @@ impl IndexHandle for LocalIndex {
Ok(())
}

fn delete_term(&self, term: DeleteDoc) -> Self::DeleteResponse {
fn delete_term(&mut self, term: DeleteDoc) -> Self::DeleteResponse {
let index_schema = self.index.schema();
let writer_lock = self.get_writer();
let mut index_writer = writer_lock.lock()?;
let before = self.reader.searcher().num_docs();

for (field, value) in term.terms {
let f = index_schema.get_field(&field).unwrap();
Expand All @@ -161,12 +159,8 @@ impl IndexHandle for LocalIndex {
self.set_opstamp(0);
}
}
let docs_affected = self
.index
.load_metas()
.map(|meta| meta.segments.iter().map(|seg| seg.num_deleted_docs()).sum())
.unwrap_or(0);

let docs_affected = before - self.reader.searcher().num_docs();
self.deleted_docs += docs_affected;
Ok(DocsAffected { docs_affected })
}
}
Expand All @@ -177,10 +171,13 @@ impl LocalIndex {
i.set_merge_policy(settings.get_merge_policy());
let current_opstamp = AtomicUsize::new(0);
let writer = Arc::new(Mutex::new(i));
let reader = index.reader_builder().reload_policy(ReloadPolicy::OnCommit).try_into()?;
Ok(Self {
index,
reader,
writer,
current_opstamp,
deleted_docs: 0,
settings,
name: name.into(),
})
Expand Down
Loading

0 comments on commit fbe6f64

Please sign in to comment.