From f7cf18f7ec272a156b22a58f2aca480c0fbf8f87 Mon Sep 17 00:00:00 2001 From: WangLe1321 Date: Wed, 27 Jul 2022 16:17:11 +0800 Subject: [PATCH] cherry pick #13107 to release-6.2 Signed-off-by: ti-srebot --- Cargo.lock | 1 + components/backup-stream/Cargo.toml | 1 + components/backup-stream/src/router.rs | 62 ++++++++++++++++++++++++-- 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 289a2d5c17e..5ee093d7278 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -459,6 +459,7 @@ dependencies = [ "fail", "file_system", "futures 0.3.15", + "futures-io", "grpcio", "hex 0.4.2", "kvproto", diff --git a/components/backup-stream/Cargo.toml b/components/backup-stream/Cargo.toml index 9e8049e0ec0..e2b23ccf5db 100644 --- a/components/backup-stream/Cargo.toml +++ b/components/backup-stream/Cargo.toml @@ -35,6 +35,7 @@ external_storage_export = { path = "../external_storage/export", default-feature fail = { version = "0.5", optional = true } file_system = { path = "../file_system" } futures = "0.3" +futures-io = "0.3" grpcio = { version = "0.10", default-features = false, features = ["openssl-vendored", "protobuf-codec"] } hex = "0.4" diff --git a/components/backup-stream/src/router.rs b/components/backup-stream/src/router.rs index 3e29592a9f4..b236cefde77 100644 --- a/components/backup-stream/src/router.rs +++ b/components/backup-stream/src/router.rs @@ -972,8 +972,7 @@ impl StreamTaskInfo { let stat = reader.metadata().await?; let reader = UnpinReader(Box::new(limiter.limit(reader.compat()))); let filepath = &data_file.storage_path; - // Once we cannot get the stat of the file, use 4K I/O. - let est_len = stat.len().max(4096); + let est_len = stat.len(); let ret = storage.write(filepath, reader, est_len).await; match ret { @@ -1370,13 +1369,17 @@ struct TaskRange { #[cfg(test)] mod tests { - use std::{ffi::OsStr, time::Duration}; + use std::{ffi::OsStr, marker::Unpin, time::Duration}; + use external_storage::NoopStorage; + use futures::AsyncReadExt; + use futures_io::AsyncRead; use kvproto::brpb::{Local, Noop, StorageBackend, StreamBackupTaskInfo}; use tikv_util::{ codec::number::NumberEncoder, worker::{dummy_scheduler, ReceiverWrapper}, }; + use tokio::{fs::File, sync::Mutex}; use txn_types::{Write, WriteType}; use super::*; @@ -2070,4 +2073,57 @@ mod tests { assert_eq!(ts, global_checkpoint); Ok(()) } + + struct MockCheckContentStorage { + s: NoopStorage, + } + + #[async_trait::async_trait] + impl ExternalStorage for MockCheckContentStorage { + fn name(&self) -> &'static str { + self.s.name() + } + + fn url(&self) -> io::Result { + self.s.url() + } + + async fn write( + &self, + _name: &str, + mut reader: UnpinReader, + content_length: u64, + ) -> io::Result<()> { + let mut data = Vec::new(); + reader.0.read_to_end(&mut data).await?; + let data_len: u64 = data.len() as _; + + if data_len == content_length { + Ok(()) + } else { + Err(io::Error::new( + io::ErrorKind::Other, + "the length of content in reader is not equal with content_length", + )) + } + } + + fn read(&self, name: &str) -> Box { + self.s.read(name) + } + } + + #[tokio::test] + async fn test_est_len_in_flush() -> Result<()> { + let noop_s = NoopStorage::default(); + let ms = MockCheckContentStorage { s: noop_s }; + let file_path = std::env::temp_dir().join(format!("{}", uuid::Uuid::new_v4())); + let mut f = File::create(file_path.clone()).await?; + f.write_all("test-data".as_bytes()).await?; + + let data_file = DataFile::new(file_path).await.unwrap(); + let result = StreamTaskInfo::flush_log_file_to(Arc::new(ms), &Mutex::new(data_file)).await; + assert_eq!(result.is_ok(), true); + Ok(()) + } }