use separate thread to compress block store #1389
Conversation
Force-pushed from b4d43c2 to 4320c72
Codecov Report
@@ Coverage Diff @@
## main #1389 +/- ##
==========================================
+ Coverage 94.29% 94.30% +0.01%
==========================================
Files 236 236
Lines 43418 43471 +53
==========================================
+ Hits 40942 40997 +55
+ Misses 2476 2474 -2
Force-pushed from 4320c72 to 56f9fc9
src/store/writer.rs (Outdated)
) -> io::Result<StoreWriter> {
    let thread_builder = thread::Builder::new().name("docstore compressor thread".to_string());

    // Data channel to send fs writes, to write only from current thread
interesting... why do we want to write only from the current thread?
Even though in tantivy we create a separate file, which is fine for a separate thread to write into, the Directory trait itself doesn't explicitly require thread-safe writes. TerminatingWrite is not Send, so the current contract is that the writers stay on the same thread.
It's unlikely to be an issue, but it could be that two threads write next to each other in the same page or cache line. So depending on the write target and the buffer synchronization between the threads, they could overwrite each other's data. Since Rust doesn't cover race conditions outside its memory model, e.g. files, I'm extra careful there.
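To illustrate the contract being described, here is a minimal sketch with a simplified stand-in for the real trait (tantivy's actual TerminatingWrite has more to it): because the trait is not declared `Send`, a boxed writer cannot be handed to a spawned thread, and the compiler enforces that writes stay where they are.

```rust
use std::io::{self, Write};
use std::rc::Rc;

// Simplified stand-in for tantivy's TerminatingWrite, for illustration only.
// The point is that it is not declared `TerminatingWrite: Send`.
trait TerminatingWrite: Write {
    fn terminate(self: Box<Self>) -> io::Result<()>;
}

// A hypothetical implementation holding a non-Send field (Rc) is perfectly
// legal under the current contract.
struct NonSendWriter {
    _shared: Rc<Vec<u8>>,
}

impl Write for NonSendWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl TerminatingWrite for NonSendWriter {
    fn terminate(self: Box<Self>) -> io::Result<()> {
        Ok(())
    }
}

fn offload(writer: Box<dyn TerminatingWrite>) {
    // This would not compile: `Box<dyn TerminatingWrite>` is not `Send`,
    // so the writer cannot be moved into a spawned thread:
    // std::thread::spawn(move || { let _ = writer; });
    let _ = writer; // under the current contract, writes stay on this thread
}
```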
That makes sense. Can we make TerminatingWrite: Send and simplify the code though?
Ah, so you just offload compression to a different thread; writing is still done in the same place. Maybe it is clearer that way, let me keep reading.
A bit of analysis... We write the docstore, and threading can help with two things. The current implementation will not help with B, because the IO still happens in the same thread as before. 2a and 2b are not as important because they do not starve us on resources, and they do not impact time to search (finishing a merge quickly by using more cores is not very important). They could help resource management on Quickwit, however: if every task clearly takes one full thread, we can more easily rely on our own scheduling and size our task thread pool by the number of cores. I think we want to at least do the IO on the thread that does the compression.
Please test on a dataset that triggers merges (wikipedia is fine), and rely on the sstable dict.
See comments on the Conversation tab.
#1389
I tested the merge operation on hdfs 14GB (2.4GB index size, 4 segments) and wikipedia 8GB (5.7GB index size, 5 segments) with sstable. It seems to be marginally faster. Merge throughput is ~50MB/s on hdfs and ~75MB/s on wikipedia (measured on index size, not input size), which is slower than most disks. On indexing, no noticeable speedup was observed. I think the impact is too small to justify changing the API (adding Send to TerminatingWrite), although another upside would be that the code would be simpler.
@PSeitz Thanks for investigating! That makes sense. We don't flush or anything, so the writes just push the data to the OS buffer, and the actual write to disk is done asynchronously by the OS, provided our throughput does not beat the hardware. Anyway, can you move the write to a different thread, if only to simplify the code?
}

/// Flushes current uncompressed block and sends to compressor.
fn send_current_block_to_compressor(&mut self) -> io::Result<()> {
I think we can return the SendError directly here. See discussion at the call site.
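For illustration, a hedged sketch of what surfacing that error could look like, assuming a std::sync::mpsc sender and a hypothetical send_block helper (the real channel type and names may differ). A send only fails once the compressor thread has hung up, so the caller can also treat this as the signal to join the thread and harvest its real error:

```rust
use std::io;
use std::sync::mpsc::Sender;

// Hypothetical helper, not the PR's actual code: forward an uncompressed
// block to the compressor thread, converting a SendError into io::Error.
fn send_block(sender: &Sender<Vec<u8>>, block: Vec<u8>) -> io::Result<()> {
    sender.send(block).map_err(|_| {
        // The channel is only closed if the compressor thread terminated,
        // so the underlying cause lives on that thread.
        io::Error::new(io::ErrorKind::Other, "compressor thread is gone")
    })
}
```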
let start_shift = self.writer.written_bytes() as usize;

pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
    // We flush the current block first before stacking
    self.send_current_block_to_compressor()?;
Both errors are actually errors on the compressing thread. If there is an error, could we join the companion thread and return:
- its io::Error if it has one
- a custom io::Error if it panicked.
The code that joins and harvests the error could be factored out into an independent method.
That's a good idea. I don't like the error handling here currently, it's not deterministic which error is returned... but to join the thread we need to consume self. We could swap it with another handle or put it in an Option; I don't really like either of those.
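A sketch of the Option variant discussed here, with hypothetical field and method names: the handle is take()n out of an Option so the helper can work from &mut self, returning the thread's own io::Error if it produced one, or a custom io::Error if it panicked.

```rust
use std::io;
use std::thread::JoinHandle;

struct StoreWriter {
    // Hypothetical: the handle lives in an Option so the error-harvesting
    // helper can take() it from &mut self instead of consuming self.
    compressor_thread_handle: Option<JoinHandle<io::Result<()>>>,
}

impl StoreWriter {
    /// Joins the compressor thread and surfaces its outcome: the thread's
    /// own io::Error if it returned one, or a custom io::Error on panic.
    fn harvest_compressor_error(&mut self) -> io::Result<()> {
        match self.compressor_thread_handle.take() {
            Some(handle) => match handle.join() {
                Ok(result) => result,
                Err(_) => Err(io::Error::new(
                    io::ErrorKind::Other,
                    "compressor thread panicked",
                )),
            },
            // Already joined earlier; nothing left to report.
            None => Ok(()),
        }
    }
}
```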
self.send_current_block_to_compressor()?;
drop(self.compressor_sender);

self.compressor_thread_handle
Same as above.
Approved, but please have a look at the change suggestion and the error handling suggestion. The latter is very optional, or can be done later.
Use a separate thread to compress the block store for increased indexing performance. This allows using slower compressors with a higher compression ratio, with little or no performance impact (given enough cores). A separate thread is spawned to compress the docstore, which handles single blocks and stacking from other docstores. The spawned compressor thread does not write; instead it sends back the compressed data. This is done in order to avoid multithreaded writes to the same file.
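A minimal sketch of this design, assuming std::sync::mpsc channels and a stubbed-out compressor (names and types are illustrative, not the PR's actual code): uncompressed blocks flow to the compressor thread, compressed bytes flow back, and the file write itself stays on the current thread.

```rust
use std::io::{self, Write};
use std::sync::mpsc;
use std::thread;

// Stand-in for real block compression (e.g. zstd); illustrative only.
fn compress(block: &[u8]) -> Vec<u8> {
    block.to_vec()
}

fn run() -> io::Result<()> {
    // Channel carrying uncompressed blocks to the compressor thread...
    let (block_tx, block_rx) = mpsc::channel::<Vec<u8>>();
    // ...and a channel carrying compressed blocks back, so that the
    // (non-Send) writer never leaves the current thread.
    let (compressed_tx, compressed_rx) = mpsc::channel::<Vec<u8>>();

    let handle = thread::Builder::new()
        .name("docstore compressor thread".to_string())
        .spawn(move || {
            for block in block_rx {
                // Sending fails only if the receiving side hung up.
                if compressed_tx.send(compress(&block)).is_err() {
                    break;
                }
            }
        })?;

    let mut file = Vec::new(); // stand-in for the non-Send writer

    block_tx.send(vec![1, 2, 3]).ok();
    drop(block_tx); // close the channel so the compressor thread finishes

    for compressed in compressed_rx {
        file.write_all(&compressed)?; // write from the current thread only
    }
    handle
        .join()
        .map_err(|_| io::Error::new(io::ErrorKind::Other, "compressor thread panicked"))?;
    Ok(())
}
```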
Co-authored-by: Paul Masurel <paul@quickwit.io>
Force-pushed from d06d9c0 to 9baefbe
Small benchmark 1GB hdfs, zstd level 8