Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed May 10, 2024
1 parent bb1e7dd commit 1f69f3b
Showing 1 changed file with 202 additions and 32 deletions.
234 changes: 202 additions & 32 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3834,9 +3834,13 @@ mod tests {
let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap();
Ok(ServiceStream::from(vec![Ok(splits_response)]))
});
let mut mock_search_service = MockSearchService::new();
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
// We add two mock_search_service to simulate a multi node environment, where the requests
// are forwarded two node.
let mut mock_search_service1 = MockSearchService::new();
mock_search_service1
.expect_leaf_search()
.times(1)
.returning(|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
Expand All @@ -3849,10 +3853,11 @@ mod tests {
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
});
mock_search_service1
.expect_leaf_search()
.times(1)
.returning(|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
Expand All @@ -3865,10 +3870,11 @@ mod tests {
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
});
mock_search_service1
.expect_leaf_search()
.times(1)
.returning(|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
Expand All @@ -3881,30 +3887,111 @@ mod tests {
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
});

let mut mock_search_service2 = MockSearchService::new();
mock_search_service2
.expect_leaf_search()
.times(1)
.returning(|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
assert!(search_req.search_after.is_none());
Ok(create_search_resp(
&req.index_uris[0],
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
});
mock_search_service2
.expect_leaf_search()
.times(1)
.returning(|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
assert!(search_req.search_after.is_some());
Ok(create_search_resp(
&req.index_uris[0],
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
});
mock_search_service2
.expect_leaf_search()
.times(1)
.returning(|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
assert!(search_req.search_after.is_some());
Ok(create_search_resp(
&req.index_uris[0],
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
});

let kv: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>> = Default::default();
let kv_clone = kv.clone();
mock_search_service
mock_search_service1
.expect_put_kv()
.returning(move |put_kv_req| {
kv_clone
.write()
.unwrap()
.insert(put_kv_req.key, put_kv_req.payload);
});
mock_search_service
mock_search_service1
.expect_get_kv()
.returning(move |get_kv_req| kv.read().unwrap().get(&get_kv_req.key).cloned());
mock_search_service.expect_fetch_docs().returning(

let kv: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>> = Default::default();
let kv_clone = kv.clone();
mock_search_service2
.expect_put_kv()
.returning(move |put_kv_req| {
kv_clone
.write()
.unwrap()
.insert(put_kv_req.key, put_kv_req.payload);
});
mock_search_service2
.expect_get_kv()
.returning(move |get_kv_req| kv.read().unwrap().get(&get_kv_req.key).cloned());

mock_search_service1.expect_fetch_docs().returning(
|fetch_docs_req: quickwit_proto::search::FetchDocsRequest| {
assert!(fetch_docs_req.partial_hits.len() <= MAX_HITS_PER_PAGE);
Ok(quickwit_proto::search::FetchDocsResponse {
hits: get_doc_for_fetch_req(fetch_docs_req),
})
},
);
let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service)]);

mock_search_service2.expect_fetch_docs().returning(
|fetch_docs_req: quickwit_proto::search::FetchDocsRequest| {
assert!(fetch_docs_req.partial_hits.len() <= MAX_HITS_PER_PAGE);
Ok(quickwit_proto::search::FetchDocsResponse {
hits: get_doc_for_fetch_req(fetch_docs_req),
})
},
);

let searcher_pool = searcher_pool_for_test([
("127.0.0.1:1001", mock_search_service1),
("127.0.0.1:1002", mock_search_service2),
]);
let search_job_placer = SearchJobPlacer::new(searcher_pool);
let searcher_context = SearcherContext::for_test();
let cluster_client = ClusterClient::new(search_job_placer.clone());
Expand Down Expand Up @@ -4017,9 +4104,13 @@ mod tests {
let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap();
Ok(ServiceStream::from(vec![Ok(splits_response)]))
});
let mut mock_search_service = MockSearchService::new();
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
// We add two mock_search_service to simulate a multi node environment, where the requests
// are forwarded two nodes.
let mut mock_search_service1 = MockSearchService::new();
mock_search_service1
.expect_leaf_search()
.times(1)
.returning(|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
Expand All @@ -4032,10 +4123,11 @@ mod tests {
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
});
mock_search_service1
.expect_leaf_search()
.times(1)
.returning(|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
Expand All @@ -4048,10 +4140,11 @@ mod tests {
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
});
mock_search_service1
.expect_leaf_search()
.times(1)
.returning(|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
Expand All @@ -4064,30 +4157,107 @@ mod tests {
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
});
let kv: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>> = Default::default();
let kv_clone = kv.clone();
mock_search_service1
.expect_put_kv()
.returning(move |put_kv_req| {
kv_clone
.write()
.unwrap()
.insert(put_kv_req.key, put_kv_req.payload);
});
mock_search_service1
.expect_get_kv()
.returning(move |get_kv_req| kv.read().unwrap().get(&get_kv_req.key).cloned());
mock_search_service1.expect_fetch_docs().returning(
|fetch_docs_req: quickwit_proto::search::FetchDocsRequest| {
assert!(fetch_docs_req.partial_hits.len() <= MAX_HITS_PER_PAGE_LARGE);
Ok(quickwit_proto::search::FetchDocsResponse {
hits: get_doc_for_fetch_req(fetch_docs_req),
})
},
);

let mut mock_search_service2 = MockSearchService::new();
mock_search_service2
.expect_leaf_search()
.times(1)
.returning(|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE);
assert!(search_req.search_after.is_none());
Ok(create_search_resp(
&req.index_uris[0],
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
});
mock_search_service2
.expect_leaf_search()
.times(1)
.returning(|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE);
assert!(search_req.search_after.is_some());
Ok(create_search_resp(
&req.index_uris[0],
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
});
mock_search_service2
.expect_leaf_search()
.times(1)
.returning(|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE);
assert!(search_req.search_after.is_some());
Ok(create_search_resp(
&req.index_uris[0],
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
});
let kv: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>> = Default::default();
let kv_clone = kv.clone();
mock_search_service
mock_search_service2
.expect_put_kv()
.returning(move |put_kv_req| {
kv_clone
.write()
.unwrap()
.insert(put_kv_req.key, put_kv_req.payload);
});
mock_search_service
mock_search_service2
.expect_get_kv()
.returning(move |get_kv_req| kv.read().unwrap().get(&get_kv_req.key).cloned());
mock_search_service.expect_fetch_docs().returning(
mock_search_service2.expect_fetch_docs().returning(
|fetch_docs_req: quickwit_proto::search::FetchDocsRequest| {
assert!(fetch_docs_req.partial_hits.len() <= MAX_HITS_PER_PAGE_LARGE);
Ok(quickwit_proto::search::FetchDocsResponse {
hits: get_doc_for_fetch_req(fetch_docs_req),
})
},
);
let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service)]);

let searcher_pool = searcher_pool_for_test([
("127.0.0.1:1001", mock_search_service1),
("127.0.0.1:1002", mock_search_service2),
]);
let search_job_placer = SearchJobPlacer::new(searcher_pool);
let searcher_context = SearcherContext::for_test();
let cluster_client = ClusterClient::new(search_job_placer.clone());
Expand Down

0 comments on commit 1f69f3b

Please sign in to comment.