Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Kanal test #16

Open
wants to merge 1 commit into
base: feature/summa-fixed
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ rust-version = "1.62"

[dependencies]
futures = "0.3.25"
oneshot = "0.1.5"
base64 = "0.21.0"
byteorder = "1.4.3"
crc32fast = "1.3.2"
Expand All @@ -38,6 +37,7 @@ fs2 = { version = "0.4.3", optional = true }
levenshtein_automata = "0.2.1"
uuid = { version = "1.0.0", features = ["v4", "serde"] }
crossbeam-channel = "0.5.4"
kanal = "0.1.0-pre8"
rust-stemmers = "1.2.0"
downcast-rs = "1.2.0"
bitpacking = { version = "0.8.4", default-features = false, features = ["bitpacker4x"] }
Expand Down
2 changes: 1 addition & 1 deletion src/core/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Executor {
let args: Vec<A> = args.collect();
let num_fruits = args.len();
let fruit_receiver = {
let (fruit_sender, fruit_receiver) = crossbeam_channel::unbounded();
let (fruit_sender, fruit_receiver) = kanal::unbounded();
pool.scope(|scope| {
for (idx, arg) in args.into_iter().enumerate() {
// We name references for f and fruit_sender_ref because we do not
Expand Down
4 changes: 2 additions & 2 deletions src/core/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ mod tests {
.try_into()?;
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64))?;
let (sender, receiver) = crossbeam_channel::unbounded();
let (sender, receiver) = kanal::unbounded();
let _handle = index.directory_mut().watch(WatchCallback::new(move || {
let _ = sender.send(());
}));
Expand Down Expand Up @@ -973,7 +973,7 @@ mod tests {
reader: &IndexReader,
) -> crate::Result<()> {
let mut reader_index = reader.index();
let (sender, receiver) = crossbeam_channel::unbounded();
let (sender, receiver) = kanal::unbounded();
let _watch_handle = reader_index
.directory_mut()
.watch(WatchCallback::new(move || {
Expand Down
2 changes: 1 addition & 1 deletion src/directory/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
fn underlying_directory(&self) -> Option<&dyn Directory> {
None
}
fn real_directory(&self) -> &dyn Directory ;
fn real_directory(&self) -> &dyn Directory;
}

/// DirectoryClone
Expand Down
4 changes: 2 additions & 2 deletions src/directory/file_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ mod tests {
let tmp_file = tmp_dir.path().join("watched.txt");

let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam_channel::unbounded();
let (tx, rx) = kanal::unbounded();
let timeout = Duration::from_millis(100);

let watcher = FileWatcher::new(&tmp_file);
Expand Down Expand Up @@ -157,7 +157,7 @@ mod tests {
let tmp_file = tmp_dir.path().join("watched.txt");

let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam_channel::unbounded();
let (tx, rx) = kanal::unbounded();
let timeout = Duration::from_millis(100);

let watcher = FileWatcher::new(&tmp_file);
Expand Down
2 changes: 1 addition & 1 deletion src/directory/mmap_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ impl Directory for MmapDirectory {
fn as_any(&self) -> &dyn Any {
self
}
fn real_directory(&self) -> &dyn Directory {
fn real_directory(&self) -> &dyn Directory {
self
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/directory/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ fn test_directory_delete(directory: &dyn Directory) -> crate::Result<()> {

fn test_watch(directory: &dyn Directory) {
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam_channel::unbounded();
let (tx, rx) = kanal::unbounded();
let timeout = Duration::from_millis(500);

let handle = directory
Expand Down Expand Up @@ -242,7 +242,7 @@ fn test_lock_blocking(directory: &dyn Directory) {
assert!(lock_a_res.is_ok());
let in_thread = Arc::new(AtomicBool::default());
let in_thread_clone = in_thread.clone();
let (sender, receiver) = oneshot::channel();
let (sender, receiver) = kanal::oneshot();
std::thread::spawn(move || {
//< lock_a_res is sent to the thread.
in_thread_clone.store(true, SeqCst);
Expand All @@ -260,7 +260,7 @@ fn test_lock_blocking(directory: &dyn Directory) {
assert!(lock_a_res.is_err());
}
let directory_clone = directory.box_clone();
let (sender2, receiver2) = oneshot::channel();
let (sender2, receiver2) = kanal::oneshot();
let join_handle = std::thread::spawn(move || {
assert!(sender2.send(()).is_ok());
let lock_a_res = directory_clone.acquire_lock(&Lock {
Expand Down
46 changes: 7 additions & 39 deletions src/future_result.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;

use crate::TantivyError;

/// `FutureResult` is a handle that makes it possible to wait for the completion
Expand All @@ -21,7 +17,7 @@ pub struct FutureResult<T> {
enum Inner<T> {
FailedBeforeStart(Option<TantivyError>),
InProgress {
receiver: oneshot::Receiver<crate::Result<T>>,
receiver: kanal::OneshotReceiver<crate::Result<T>>,
error_msg_if_failure: &'static str,
},
}
Expand All @@ -37,8 +33,8 @@ impl<T> From<TantivyError> for FutureResult<T> {
impl<T> FutureResult<T> {
pub(crate) fn create(
error_msg_if_failure: &'static str,
) -> (Self, oneshot::Sender<crate::Result<T>>) {
let (sender, receiver) = oneshot::channel();
) -> (Self, kanal::OneshotSender<crate::Result<T>>) {
let (sender, receiver) = kanal::oneshot();
let inner: Inner<T> = Inner::InProgress {
receiver,
error_msg_if_failure,
Expand All @@ -64,43 +60,15 @@ impl<T> FutureResult<T> {
}
}

impl<T> Future for FutureResult<T> {
type Output = crate::Result<T>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
unsafe {
match &mut Pin::get_unchecked_mut(self).inner {
Inner::FailedBeforeStart(err) => Poll::Ready(Err(err.take().unwrap())),
Inner::InProgress {
receiver,
error_msg_if_failure,
} => match Future::poll(Pin::new_unchecked(receiver), cx) {
Poll::Ready(oneshot_res) => {
let res = oneshot_res.unwrap_or_else(|_| {
Err(crate::TantivyError::SystemError(
error_msg_if_failure.to_string(),
))
});
Poll::Ready(res)
}
Poll::Pending => Poll::Pending,
},
}
}
}
}

#[cfg(test)]
mod tests {
use futures::executor::block_on;

use super::FutureResult;
use crate::TantivyError;

#[test]
fn test_scheduled_result_failed_to_schedule() {
let scheduled_result: FutureResult<()> = FutureResult::from(TantivyError::Poisoned);
let res = block_on(scheduled_result);
let res = scheduled_result.wait();
assert!(matches!(res, Err(TantivyError::Poisoned)));
}

Expand All @@ -109,22 +77,22 @@ mod tests {
fn test_scheduled_result_error() {
let (scheduled_result, tx): (FutureResult<()>, _) = FutureResult::create("failed");
drop(tx);
let res = block_on(scheduled_result);
let res = scheduled_result.wait();
assert!(matches!(res, Err(TantivyError::SystemError(_))));
}

#[test]
fn test_scheduled_result_sent_success() {
let (scheduled_result, tx): (FutureResult<u64>, _) = FutureResult::create("failed");
tx.send(Ok(2u64)).unwrap();
assert_eq!(block_on(scheduled_result).unwrap(), 2u64);
assert_eq!(scheduled_result.wait().unwrap(), 2u64);
}

#[test]
fn test_scheduled_result_sent_error() {
let (scheduled_result, tx): (FutureResult<u64>, _) = FutureResult::create("failed");
tx.send(Err(TantivyError::Poisoned)).unwrap();
let res = block_on(scheduled_result);
let res = scheduled_result.wait();
assert!(matches!(res, Err(TantivyError::Poisoned)));
}
}
8 changes: 4 additions & 4 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl IndexWriter {
return Err(TantivyError::InvalidArgument(err_msg));
}
let (document_sender, document_receiver): (AddBatchSender, AddBatchReceiver) =
crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
kanal::bounded(PIPELINE_MAX_SIZE_IN_DOCS);

let delete_queue = DeleteQueue::new();

Expand Down Expand Up @@ -334,7 +334,7 @@ impl IndexWriter {
}

fn drop_sender(&mut self) {
let (sender, _receiver) = crossbeam_channel::bounded(1);
let (sender, _receiver) = kanal::bounded(1);
self.operation_sender = sender;
}

Expand Down Expand Up @@ -559,7 +559,7 @@ impl IndexWriter {
/// Returns the former segment_ready channel.
fn recreate_document_channel(&mut self) {
let (document_sender, document_receiver): (AddBatchSender, AddBatchReceiver) =
crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
kanal::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
self.operation_sender = document_sender;
self.index_writer_status = IndexWriterStatus::from(document_receiver);
}
Expand Down Expand Up @@ -842,7 +842,7 @@ impl IndexWriter {
return Err(TantivyError::InvalidArgument(err_msg));
}
let (document_sender, document_receiver): (AddBatchSender, AddBatchReceiver) =
crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
kanal::bounded(PIPELINE_MAX_SIZE_IN_DOCS);

let delete_queue = DeleteQueue::new();

Expand Down
2 changes: 1 addition & 1 deletion src/indexer/index_writer_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl Drop for IndexWriterBomb {
mod tests {
use std::mem;

use crossbeam_channel as channel;
use kanal as channel;

use super::IndexWriterStatus;

Expand Down
2 changes: 1 addition & 1 deletion src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod sorted_doc_id_column;
mod sorted_doc_id_multivalue_column;
mod stamper;

use crossbeam_channel as channel;
use kanal as channel;
use smallvec::SmallVec;

pub use self::index_writer::IndexWriter;
Expand Down