Skip to content

Commit

Permalink
Merge pull request #6 from hntd187/delete_term
Browse files Browse the repository at this point in the history
Delete term functionality
  • Loading branch information
hntd187 committed Sep 13, 2018
2 parents b3188b6 + 2ecbb23 commit 1f0b227
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 47 deletions.
184 changes: 139 additions & 45 deletions src/handlers/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use super::super::{Error, Result};
use super::*;

use futures::{future, Future, Stream};
use std::collections::HashMap;
use std::fs;
use std::io::Result as IOResult;
use std::panic::RefUnwindSafe;
use std::sync::RwLock;

use hyper::Method;
use tantivy::schema::*;
use tantivy::{Document, Index};

Expand All @@ -23,6 +25,11 @@ pub struct IndexDoc {
fields: Vec<FieldValues>,
}

#[derive(Deserialize)]
pub struct DeleteDoc {
terms: HashMap<String, String>,
}

#[derive(Deserialize, Debug)]
#[serde(untagged)]
pub enum FieldValues {
Expand All @@ -36,6 +43,11 @@ pub struct IndexHandler {
catalog: Arc<RwLock<IndexCatalog>>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct DocsAffected {
docs_affected: u32,
}

impl RefUnwindSafe for IndexHandler {}

impl IndexHandler {
Expand All @@ -48,6 +60,44 @@ impl IndexHandler {
}
}

fn delete_request(self, mut state: State, index_path: IndexPath) -> Box<HandlerFuture> {
if self.catalog.read().unwrap().exists(&index_path.index) {
let f = Body::take_from(&mut state).concat2().then(move |body| match body {
Ok(b) => {
let t: DeleteDoc = match serde_json::from_slice(&b) {
Ok(v) => v,
Err(e) => return handle_error(state, &Error::IOError(e.to_string())),
};
let docs_affected: u32;
{
let index_lock = self.catalog.read().unwrap();
let index = index_lock.get_index(&index_path.index).unwrap();
let index_schema = index.schema();
let mut index_writer = index.writer(SETTINGS.writer_memory).unwrap();

for (field, value) in t.terms {
let f = match index_schema.get_field(&field) {
Some(v) => v,
None => return handle_error(state, &Error::UnknownIndexField(field)),
};
let term = Term::from_field_text(f, &value);
index_writer.delete_term(term);
}
index_writer.commit().unwrap();
docs_affected = index.load_metas().unwrap().segments.iter().map(|seg| seg.num_deleted_docs()).sum();
}
let affected = to_json(DocsAffected { docs_affected }, true);
let resp = create_response(&state, StatusCode::Ok, affected);
future::ok((state, resp))
}
Err(ref e) => handle_error(state, e),
});
Box::new(f)
} else {
Box::new(handle_error(state, &Error::UnknownIndex(index_path.index)))
}
}

fn add_to_document(schema: &Schema, field: FieldValues, doc: &mut Document) -> Result<()> {
match field {
FieldValues::StrField { field, value } => add_field!(add_text, schema, doc, field, &value),
Expand All @@ -61,56 +111,60 @@ impl Handler for IndexHandler {
fn handle(mut self, mut state: State) -> Box<HandlerFuture> {
let url_index = IndexPath::try_take_from(&mut state);
match url_index {
Some(ui) => {
if self.catalog.read().unwrap().exists(&ui.index) {
let f = Body::take_from(&mut state).concat2().then(move |body| match body {
Ok(b) => {
let t: IndexDoc = serde_json::from_slice(&b).unwrap();
info!("{:?}", t);
{
let index_lock = self.catalog.read().unwrap();
let index = index_lock.get_index(&ui.index).unwrap();
let index_schema = index.schema();
let mut index_writer = index.writer(SETTINGS.writer_memory).unwrap();
let mut doc = Document::new();
for field in t.fields {
match IndexHandler::add_to_document(&index_schema, field, &mut doc) {
Ok(_) => {}
Err(ref e) => return handle_error(state, e),
Some(ui) => match *Method::borrow_from(&state) {
Method::Delete => self.delete_request(state, ui),
Method::Put => {
if self.catalog.read().unwrap().exists(&ui.index) {
let f = Body::take_from(&mut state).concat2().then(move |body| match body {
Ok(b) => {
let t: IndexDoc = serde_json::from_slice(&b).unwrap();
info!("{:?}", t);
{
let index_lock = self.catalog.read().unwrap();
let index = index_lock.get_index(&ui.index).unwrap();
let index_schema = index.schema();
let mut index_writer = index.writer(SETTINGS.writer_memory).unwrap();
let mut doc = Document::new();
for field in t.fields {
match IndexHandler::add_to_document(&index_schema, field, &mut doc) {
Ok(_) => {}
Err(ref e) => return handle_error(state, e),
}
}
index_writer.add_document(doc);
index_writer.commit().unwrap();
}
index_writer.add_document(doc);
index_writer.commit().unwrap();
let resp = create_response(&state, StatusCode::Created, None);
future::ok((state, resp))
}
let resp = create_response(&state, StatusCode::Created, None);
future::ok((state, resp))
}
Err(ref e) => handle_error(state, e),
});
Box::new(f)
} else {
let f = Body::take_from(&mut state).concat2().then(move |body| match body {
Ok(b) => {
let schema: Schema = match serde_json::from_slice(&b) {
Ok(v) => v,
Err(ref e) => return handle_error(state, e),
};
let mut index_path = self.catalog.read().unwrap().base_path().clone();
index_path.push(&ui.index);
if !index_path.exists() {
fs::create_dir(&index_path).unwrap()
}
let new_index = Index::create_in_dir(index_path, schema).unwrap();
self.add_index(ui.index, new_index);
Err(ref e) => handle_error(state, e),
});
Box::new(f)
} else {
let f = Body::take_from(&mut state).concat2().then(move |body| match body {
Ok(b) => {
let schema: Schema = match serde_json::from_slice(&b) {
Ok(v) => v,
Err(ref e) => return handle_error(state, e),
};
let mut index_path = self.catalog.read().unwrap().base_path().clone();
index_path.push(&ui.index);
if !index_path.exists() {
fs::create_dir(&index_path).unwrap()
}
let new_index = Index::create_in_dir(index_path, schema).unwrap();
self.add_index(ui.index, new_index);

let resp = create_response(&state, StatusCode::Created, None);
future::ok((state, resp))
}
Err(ref e) => handle_error(state, e),
});
Box::new(f)
let resp = create_response(&state, StatusCode::Created, None);
future::ok((state, resp))
}
Err(ref e) => handle_error(state, e),
});
Box::new(f)
}
}
}
_ => unreachable!(),
},
None => Box::new(handle_error(state, &Error::UnknownIndex("No valid index in path".to_string()))),
}
}
Expand All @@ -121,6 +175,7 @@ new_handler!(IndexHandler);
#[cfg(test)]
mod tests {
use super::*;
use hyper::header::ContentType;
use index::tests::*;

#[test]
Expand Down Expand Up @@ -205,4 +260,43 @@ mod tests {

assert_eq!(response.status(), StatusCode::Created);
}

#[test]
fn test_doc_delete() {
let idx = create_test_index();
let catalog = IndexCatalog::with_index("test_index".to_string(), idx).unwrap();
let test_server = create_test_client(&Arc::new(RwLock::new(catalog)));

let body = r#"{ "terms": {"test_text": "document"} }"#;

let response = test_server
.delete("http://localhost/test_index")
.with_body(body)
.with_header(ContentType(mime::APPLICATION_JSON))
.perform()
.unwrap();

assert_eq!(response.status(), StatusCode::Ok);

let docs: DocsAffected = serde_json::from_slice(&response.read_body().unwrap()).unwrap();
assert_eq!(docs.docs_affected, 3);
}

#[test]
fn test_bad_json() {
let idx = create_test_index();
let catalog = IndexCatalog::with_index("test_index".to_string(), idx).unwrap();
let test_server = create_test_client(&Arc::new(RwLock::new(catalog)));

let body = r#"{ "test_text": "document" }"#;

let response = test_server
.delete("http://localhost/test_index")
.with_body(body)
.with_header(ContentType(mime::APPLICATION_JSON))
.perform()
.unwrap();

assert_eq!(response.status(), StatusCode::BadRequest);
}
}
4 changes: 2 additions & 2 deletions src/router.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use gotham::router::builder::*;
use gotham::router::Router;
use hyper::{Get, Post, Put};
use hyper::{Delete, Get, Post, Put};

use handlers::*;
use index::IndexCatalog;
Expand All @@ -27,7 +27,7 @@ pub fn router_with_catalog(catalog: &Arc<RwLock<IndexCatalog>>) -> Router {
build_simple_router(|route| {
route.get("/").to_new_handler(root_handler);
router_builder!(route, vec![Post, Get], "/:index", search_handler);
router_builder!(route, vec![Put], "/:index", index_handler);
router_builder!(route, vec![Put, Delete], "/:index", index_handler);
router_builder!(route, vec![Post], "/:index/_bulk", bulk_handler);
router_builder!(route, vec![Get], "/:index/_summary", summary_handler);
})
Expand Down

0 comments on commit 1f0b227

Please sign in to comment.