From 1f69f3b4875ed8ebdf34ace0bd9ff1d55ed407ef Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Fri, 10 May 2024 10:57:29 +0800 Subject: [PATCH] fix test --- quickwit/quickwit-search/src/root.rs | 234 +++++++++++++++++++++++---- 1 file changed, 202 insertions(+), 32 deletions(-) diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index db763395c6..76af17da57 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -3834,9 +3834,13 @@ mod tests { let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); Ok(ServiceStream::from(vec![Ok(splits_response)])) }); - let mut mock_search_service = MockSearchService::new(); - mock_search_service.expect_leaf_search().times(2).returning( - |req: quickwit_proto::search::LeafSearchRequest| { + // We add two mock_search_service to simulate a multi node environment, where the requests + // are forwarded two node. + let mut mock_search_service1 = MockSearchService::new(); + mock_search_service1 + .expect_leaf_search() + .times(1) + .returning(|req: quickwit_proto::search::LeafSearchRequest| { let search_req = req.search_request.unwrap(); // the leaf request does not need to know about the scroll_ttl. assert_eq!(search_req.start_offset, 0u64); @@ -3849,10 +3853,11 @@ mod tests { ..(search_req.start_offset + search_req.max_hits) as usize, search_req.search_after, )) - }, - ); - mock_search_service.expect_leaf_search().times(2).returning( - |req: quickwit_proto::search::LeafSearchRequest| { + }); + mock_search_service1 + .expect_leaf_search() + .times(1) + .returning(|req: quickwit_proto::search::LeafSearchRequest| { let search_req = req.search_request.unwrap(); // the leaf request does not need to know about the scroll_ttl. assert_eq!(search_req.start_offset, 0u64); @@ -3865,10 +3870,11 @@ mod tests { ..(search_req.start_offset + search_req.max_hits) as usize, search_req.search_after, )) - }, - ); - mock_search_service.expect_leaf_search().times(2).returning( - |req: quickwit_proto::search::LeafSearchRequest| { + }); + mock_search_service1 + .expect_leaf_search() + .times(1) + .returning(|req: quickwit_proto::search::LeafSearchRequest| { let search_req = req.search_request.unwrap(); // the leaf request does not need to know about the scroll_ttl. assert_eq!(search_req.start_offset, 0u64); @@ -3881,11 +3887,64 @@ mod tests { ..(search_req.start_offset + search_req.max_hits) as usize, search_req.search_after, )) - }, - ); + }); + + let mut mock_search_service2 = MockSearchService::new(); + mock_search_service2 + .expect_leaf_search() + .times(1) + .returning(|req: quickwit_proto::search::LeafSearchRequest| { + let search_req = req.search_request.unwrap(); + // the leaf request does not need to know about the scroll_ttl. + assert_eq!(search_req.start_offset, 0u64); + assert!(search_req.scroll_ttl_secs.is_none()); + assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN); + assert!(search_req.search_after.is_none()); + Ok(create_search_resp( + &req.index_uris[0], + search_req.start_offset as usize + ..(search_req.start_offset + search_req.max_hits) as usize, + search_req.search_after, + )) + }); + mock_search_service2 + .expect_leaf_search() + .times(1) + .returning(|req: quickwit_proto::search::LeafSearchRequest| { + let search_req = req.search_request.unwrap(); + // the leaf request does not need to know about the scroll_ttl. + assert_eq!(search_req.start_offset, 0u64); + assert!(search_req.scroll_ttl_secs.is_none()); + assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN); + assert!(search_req.search_after.is_some()); + Ok(create_search_resp( + &req.index_uris[0], + search_req.start_offset as usize + ..(search_req.start_offset + search_req.max_hits) as usize, + search_req.search_after, + )) + }); + mock_search_service2 + .expect_leaf_search() + .times(1) + .returning(|req: quickwit_proto::search::LeafSearchRequest| { + let search_req = req.search_request.unwrap(); + // the leaf request does not need to know about the scroll_ttl. + assert_eq!(search_req.start_offset, 0u64); + assert!(search_req.scroll_ttl_secs.is_none()); + assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN); + assert!(search_req.search_after.is_some()); + Ok(create_search_resp( + &req.index_uris[0], + search_req.start_offset as usize + ..(search_req.start_offset + search_req.max_hits) as usize, + search_req.search_after, + )) + }); + let kv: Arc, Vec>>> = Default::default(); let kv_clone = kv.clone(); - mock_search_service + mock_search_service1 .expect_put_kv() .returning(move |put_kv_req| { kv_clone @@ -3893,10 +3952,25 @@ mod tests { .unwrap() .insert(put_kv_req.key, put_kv_req.payload); }); - mock_search_service + mock_search_service1 .expect_get_kv() .returning(move |get_kv_req| kv.read().unwrap().get(&get_kv_req.key).cloned()); - mock_search_service.expect_fetch_docs().returning( + + let kv: Arc, Vec>>> = Default::default(); + let kv_clone = kv.clone(); + mock_search_service2 + .expect_put_kv() + .returning(move |put_kv_req| { + kv_clone + .write() + .unwrap() + .insert(put_kv_req.key, put_kv_req.payload); + }); + mock_search_service2 + .expect_get_kv() + .returning(move |get_kv_req| kv.read().unwrap().get(&get_kv_req.key).cloned()); + + mock_search_service1.expect_fetch_docs().returning( |fetch_docs_req: quickwit_proto::search::FetchDocsRequest| { assert!(fetch_docs_req.partial_hits.len() <= MAX_HITS_PER_PAGE); Ok(quickwit_proto::search::FetchDocsResponse { @@ -3904,7 +3978,20 @@ mod tests { }) }, ); - let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service)]); + + mock_search_service2.expect_fetch_docs().returning( + |fetch_docs_req: quickwit_proto::search::FetchDocsRequest| { + assert!(fetch_docs_req.partial_hits.len() <= MAX_HITS_PER_PAGE); + Ok(quickwit_proto::search::FetchDocsResponse { + hits: get_doc_for_fetch_req(fetch_docs_req), + }) + }, + ); + + let searcher_pool = searcher_pool_for_test([ + ("127.0.0.1:1001", mock_search_service1), + ("127.0.0.1:1002", mock_search_service2), + ]); let search_job_placer = SearchJobPlacer::new(searcher_pool); let searcher_context = SearcherContext::for_test(); let cluster_client = ClusterClient::new(search_job_placer.clone()); @@ -4017,9 +4104,13 @@ mod tests { let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); Ok(ServiceStream::from(vec![Ok(splits_response)])) }); - let mut mock_search_service = MockSearchService::new(); - mock_search_service.expect_leaf_search().times(2).returning( - |req: quickwit_proto::search::LeafSearchRequest| { + // We add two mock_search_service to simulate a multi node environment, where the requests + // are forwarded two nodes. + let mut mock_search_service1 = MockSearchService::new(); + mock_search_service1 + .expect_leaf_search() + .times(1) + .returning(|req: quickwit_proto::search::LeafSearchRequest| { let search_req = req.search_request.unwrap(); // the leaf request does not need to know about the scroll_ttl. assert_eq!(search_req.start_offset, 0u64); @@ -4032,10 +4123,11 @@ mod tests { ..(search_req.start_offset + search_req.max_hits) as usize, search_req.search_after, )) - }, - ); - mock_search_service.expect_leaf_search().times(2).returning( - |req: quickwit_proto::search::LeafSearchRequest| { + }); + mock_search_service1 + .expect_leaf_search() + .times(1) + .returning(|req: quickwit_proto::search::LeafSearchRequest| { let search_req = req.search_request.unwrap(); // the leaf request does not need to know about the scroll_ttl. assert_eq!(search_req.start_offset, 0u64); @@ -4048,10 +4140,11 @@ mod tests { ..(search_req.start_offset + search_req.max_hits) as usize, search_req.search_after, )) - }, - ); - mock_search_service.expect_leaf_search().times(2).returning( - |req: quickwit_proto::search::LeafSearchRequest| { + }); + mock_search_service1 + .expect_leaf_search() + .times(1) + .returning(|req: quickwit_proto::search::LeafSearchRequest| { let search_req = req.search_request.unwrap(); // the leaf request does not need to know about the scroll_ttl. assert_eq!(search_req.start_offset, 0u64); @@ -4064,11 +4157,84 @@ mod tests { ..(search_req.start_offset + search_req.max_hits) as usize, search_req.search_after, )) + }); + let kv: Arc, Vec>>> = Default::default(); + let kv_clone = kv.clone(); + mock_search_service1 + .expect_put_kv() + .returning(move |put_kv_req| { + kv_clone + .write() + .unwrap() + .insert(put_kv_req.key, put_kv_req.payload); + }); + mock_search_service1 + .expect_get_kv() + .returning(move |get_kv_req| kv.read().unwrap().get(&get_kv_req.key).cloned()); + mock_search_service1.expect_fetch_docs().returning( + |fetch_docs_req: quickwit_proto::search::FetchDocsRequest| { + assert!(fetch_docs_req.partial_hits.len() <= MAX_HITS_PER_PAGE_LARGE); + Ok(quickwit_proto::search::FetchDocsResponse { + hits: get_doc_for_fetch_req(fetch_docs_req), + }) }, ); + + let mut mock_search_service2 = MockSearchService::new(); + mock_search_service2 + .expect_leaf_search() + .times(1) + .returning(|req: quickwit_proto::search::LeafSearchRequest| { + let search_req = req.search_request.unwrap(); + // the leaf request does not need to know about the scroll_ttl. + assert_eq!(search_req.start_offset, 0u64); + assert!(search_req.scroll_ttl_secs.is_none()); + assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE); + assert!(search_req.search_after.is_none()); + Ok(create_search_resp( + &req.index_uris[0], + search_req.start_offset as usize + ..(search_req.start_offset + search_req.max_hits) as usize, + search_req.search_after, + )) + }); + mock_search_service2 + .expect_leaf_search() + .times(1) + .returning(|req: quickwit_proto::search::LeafSearchRequest| { + let search_req = req.search_request.unwrap(); + // the leaf request does not need to know about the scroll_ttl. + assert_eq!(search_req.start_offset, 0u64); + assert!(search_req.scroll_ttl_secs.is_none()); + assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE); + assert!(search_req.search_after.is_some()); + Ok(create_search_resp( + &req.index_uris[0], + search_req.start_offset as usize + ..(search_req.start_offset + search_req.max_hits) as usize, + search_req.search_after, + )) + }); + mock_search_service2 + .expect_leaf_search() + .times(1) + .returning(|req: quickwit_proto::search::LeafSearchRequest| { + let search_req = req.search_request.unwrap(); + // the leaf request does not need to know about the scroll_ttl. + assert_eq!(search_req.start_offset, 0u64); + assert!(search_req.scroll_ttl_secs.is_none()); + assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE); + assert!(search_req.search_after.is_some()); + Ok(create_search_resp( + &req.index_uris[0], + search_req.start_offset as usize + ..(search_req.start_offset + search_req.max_hits) as usize, + search_req.search_after, + )) + }); let kv: Arc, Vec>>> = Default::default(); let kv_clone = kv.clone(); - mock_search_service + mock_search_service2 .expect_put_kv() .returning(move |put_kv_req| { kv_clone @@ -4076,10 +4242,10 @@ mod tests { .unwrap() .insert(put_kv_req.key, put_kv_req.payload); }); - mock_search_service + mock_search_service2 .expect_get_kv() .returning(move |get_kv_req| kv.read().unwrap().get(&get_kv_req.key).cloned()); - mock_search_service.expect_fetch_docs().returning( + mock_search_service2.expect_fetch_docs().returning( |fetch_docs_req: quickwit_proto::search::FetchDocsRequest| { assert!(fetch_docs_req.partial_hits.len() <= MAX_HITS_PER_PAGE_LARGE); Ok(quickwit_proto::search::FetchDocsResponse { @@ -4087,7 +4253,11 @@ mod tests { }) }, ); - let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service)]); + + let searcher_pool = searcher_pool_for_test([ + ("127.0.0.1:1001", mock_search_service1), + ("127.0.0.1:1002", mock_search_service2), + ]); let search_job_placer = SearchJobPlacer::new(searcher_pool); let searcher_context = SearcherContext::for_test(); let cluster_client = ClusterClient::new(search_job_placer.clone());