Skip to content

Commit

Permalink
fix: IntermediateWriter closes underlying writer twice (GreptimeTeam#…
Browse files Browse the repository at this point in the history
…3248)

* fix: IntermediateWriter closes underlying writer twice

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* close writer manually on error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia authored and shuiyisong committed Jan 30, 2024
1 parent 207f885 commit bb63c47
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ snafu.workspace = true

[dev-dependencies]
rand.workspace = true
tempfile.workspace = true
tokio-util.workspace = true
tokio.workspace = true
28 changes: 20 additions & 8 deletions src/index/src/inverted_index/create/sort/intermediate_rw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ impl<W: AsyncWrite + Unpin> IntermediateWriter<W> {

let value_stream = stream::iter(values.into_iter().map(Ok));
let frame_write = FramedWrite::new(&mut self.writer, encoder);
value_stream.forward(frame_write).await?;

self.writer.flush().await.context(FlushSnafu)?;
self.writer.close().await.context(CloseSnafu)
// `forward()` will flush and close the writer when the stream ends
if let Err(e) = value_stream.forward(frame_write).await {
self.writer.flush().await.context(FlushSnafu)?;
self.writer.close().await.context(CloseSnafu)?;
return Err(e);
}

Ok(())
}
}

Expand Down Expand Up @@ -85,24 +89,32 @@ impl<R: AsyncRead + Unpin + Send + 'static> IntermediateReader<R> {

#[cfg(test)]
mod tests {
use futures::io::Cursor;
use std::io::{Seek, SeekFrom};

use futures::io::{AllowStdIo, Cursor};
use tempfile::tempfile;

use super::*;
use crate::inverted_index::error::Error;

#[tokio::test]
async fn test_intermediate_read_write_basic() {
let mut buf = vec![];
let file_r = tempfile().unwrap();
let file_w = file_r.try_clone().unwrap();
let mut buf_r = AllowStdIo::new(file_r);
let buf_w = AllowStdIo::new(file_w);

let values = BTreeMap::from_iter([
(Bytes::from("a"), BitVec::from_slice(&[0b10101010])),
(Bytes::from("b"), BitVec::from_slice(&[0b01010101])),
]);

let writer = IntermediateWriter::new(&mut buf);
let writer = IntermediateWriter::new(buf_w);
writer.write_all(values.clone()).await.unwrap();
// reset the handle
buf_r.seek(SeekFrom::Start(0)).unwrap();

let reader = IntermediateReader::new(Cursor::new(buf));
let reader = IntermediateReader::new(buf_r);
let mut stream = reader.into_stream().await.unwrap();

let a = stream.next().await.unwrap().unwrap();
Expand Down

0 comments on commit bb63c47

Please sign in to comment.