diff --git a/Cargo.lock b/Cargo.lock index fb4e4d1e6a9..33ea922f339 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -459,6 +459,7 @@ dependencies = [ "fail", "file_system", "futures 0.3.15", + "futures-io", "grpcio", "hex 0.4.2", "kvproto", diff --git a/components/backup-stream/Cargo.toml b/components/backup-stream/Cargo.toml index 9e8049e0ec0..e2b23ccf5db 100644 --- a/components/backup-stream/Cargo.toml +++ b/components/backup-stream/Cargo.toml @@ -35,6 +35,7 @@ external_storage_export = { path = "../external_storage/export", default-feature fail = { version = "0.5", optional = true } file_system = { path = "../file_system" } futures = "0.3" +futures-io = "0.3" grpcio = { version = "0.10", default-features = false, features = ["openssl-vendored", "protobuf-codec"] } hex = "0.4" diff --git a/components/backup-stream/src/router.rs b/components/backup-stream/src/router.rs index d35674bffd6..b236cefde77 100644 --- a/components/backup-stream/src/router.rs +++ b/components/backup-stream/src/router.rs @@ -1369,13 +1369,17 @@ struct TaskRange { #[cfg(test)] mod tests { - use std::{ffi::OsStr, time::Duration}; + use std::{ffi::OsStr, marker::Unpin, time::Duration}; + use external_storage::NoopStorage; + use futures::AsyncReadExt; + use futures_io::AsyncRead; use kvproto::brpb::{Local, Noop, StorageBackend, StreamBackupTaskInfo}; use tikv_util::{ codec::number::NumberEncoder, worker::{dummy_scheduler, ReceiverWrapper}, }; + use tokio::{fs::File, sync::Mutex}; use txn_types::{Write, WriteType}; use super::*; @@ -2069,4 +2073,57 @@ mod tests { assert_eq!(ts, global_checkpoint); Ok(()) } + + struct MockCheckContentStorage { + s: NoopStorage, + } + + #[async_trait::async_trait] + impl ExternalStorage for MockCheckContentStorage { + fn name(&self) -> &'static str { + self.s.name() + } + + fn url(&self) -> io::Result { + self.s.url() + } + + async fn write( + &self, + _name: &str, + mut reader: UnpinReader, + content_length: u64, + ) -> io::Result<()> { + let mut data = Vec::new(); + reader.0.read_to_end(&mut data).await?; + let data_len: u64 = data.len() as _; + + if data_len == content_length { + Ok(()) + } else { + Err(io::Error::new( + io::ErrorKind::Other, + "the length of content in reader is not equal with content_length", + )) + } + } + + fn read(&self, name: &str) -> Box { + self.s.read(name) + } + } + + #[tokio::test] + async fn test_est_len_in_flush() -> Result<()> { + let noop_s = NoopStorage::default(); + let ms = MockCheckContentStorage { s: noop_s }; + let file_path = std::env::temp_dir().join(format!("{}", uuid::Uuid::new_v4())); + let mut f = File::create(file_path.clone()).await?; + f.write_all("test-data".as_bytes()).await?; + + let data_file = DataFile::new(file_path).await.unwrap(); + let result = StreamTaskInfo::flush_log_file_to(Arc::new(ms), &Mutex::new(data_file)).await; + assert_eq!(result.is_ok(), true); + Ok(()) + } }