Skip to content

Commit

Permalink
cherry pick tikv#13107 to release-6.2
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
WangLe1321 authored and ti-srebot committed Jul 27, 2022
1 parent 1cb47d2 commit f7cf18f
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/backup-stream/Cargo.toml
Expand Up @@ -35,6 +35,7 @@ external_storage_export = { path = "../external_storage/export", default-feature
fail = { version = "0.5", optional = true }
file_system = { path = "../file_system" }
futures = "0.3"
futures-io = "0.3"

grpcio = { version = "0.10", default-features = false, features = ["openssl-vendored", "protobuf-codec"] }
hex = "0.4"
Expand Down
62 changes: 59 additions & 3 deletions components/backup-stream/src/router.rs
Expand Up @@ -972,8 +972,7 @@ impl StreamTaskInfo {
let stat = reader.metadata().await?;
let reader = UnpinReader(Box::new(limiter.limit(reader.compat())));
let filepath = &data_file.storage_path;
// Once we cannot get the stat of the file, use 4K I/O.
let est_len = stat.len().max(4096);
let est_len = stat.len();

let ret = storage.write(filepath, reader, est_len).await;
match ret {
Expand Down Expand Up @@ -1370,13 +1369,17 @@ struct TaskRange {

#[cfg(test)]
mod tests {
use std::{ffi::OsStr, time::Duration};
use std::{ffi::OsStr, marker::Unpin, time::Duration};

use external_storage::NoopStorage;
use futures::AsyncReadExt;
use futures_io::AsyncRead;
use kvproto::brpb::{Local, Noop, StorageBackend, StreamBackupTaskInfo};
use tikv_util::{
codec::number::NumberEncoder,
worker::{dummy_scheduler, ReceiverWrapper},
};
use tokio::{fs::File, sync::Mutex};
use txn_types::{Write, WriteType};

use super::*;
Expand Down Expand Up @@ -2070,4 +2073,57 @@ mod tests {
assert_eq!(ts, global_checkpoint);
Ok(())
}

struct MockCheckContentStorage {
s: NoopStorage,
}

#[async_trait::async_trait]
impl ExternalStorage for MockCheckContentStorage {
fn name(&self) -> &'static str {
self.s.name()
}

fn url(&self) -> io::Result<url::Url> {
self.s.url()
}

async fn write(
&self,
_name: &str,
mut reader: UnpinReader,
content_length: u64,
) -> io::Result<()> {
let mut data = Vec::new();
reader.0.read_to_end(&mut data).await?;
let data_len: u64 = data.len() as _;

if data_len == content_length {
Ok(())
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"the length of content in reader is not equal with content_length",
))
}
}

fn read(&self, name: &str) -> Box<dyn AsyncRead + Unpin + '_> {
self.s.read(name)
}
}

#[tokio::test]
async fn test_est_len_in_flush() -> Result<()> {
let noop_s = NoopStorage::default();
let ms = MockCheckContentStorage { s: noop_s };
let file_path = std::env::temp_dir().join(format!("{}", uuid::Uuid::new_v4()));
let mut f = File::create(file_path.clone()).await?;
f.write_all("test-data".as_bytes()).await?;

let data_file = DataFile::new(file_path).await.unwrap();
let result = StreamTaskInfo::flush_log_file_to(Arc::new(ms), &Mutex::new(data_file)).await;
assert_eq!(result.is_ok(), true);
Ok(())
}
}

0 comments on commit f7cf18f

Please sign in to comment.