Skip to content

Commit

Permalink
*: fix merge conflict
Browse files Browse the repository at this point in the history
Signed-off-by: kennytm <kennytm@gmail.com>
  • Loading branch information
kennytm committed May 29, 2020
1 parent cfffc34 commit 14a38c9
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 145 deletions.
43 changes: 4 additions & 39 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -92,6 +92,7 @@ futures-cpupool = "0.1"
futures-executor = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["io", "io-compat"] }
tikv_alloc = { path = "components/tikv_alloc", default-features = false }
tokio = { version = "0.2.13", features = ["time"] }
tokio-core = "0.1"
tokio-timer = "0.2"
tokio-executor = "0.1"
Expand Down
4 changes: 0 additions & 4 deletions components/external_storage/Cargo.toml
Expand Up @@ -9,13 +9,9 @@ futures = "0.3.1"
futures-executor = "0.3"
futures-io = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["io"] }
<<<<<<< HEAD
=======
http = "0.2.0"
kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false }
hyper = "0.13.3"
hyper-tls = "0.4.1"
>>>>>>> 3c667df... Improve robustness of Backup/Restore involving external_storage (#7917)
rand = "0.7"
rusoto_core = "0.43.0"
rusoto_s3 = "0.43.0"
Expand Down
2 changes: 2 additions & 0 deletions components/external_storage/src/lib.rs
Expand Up @@ -40,6 +40,8 @@ pub use s3::S3Storage;
mod util;
pub use util::block_on_external_io;

pub const READ_BUF_SIZE: usize = 1024 * 1024 * 2;

/// Create a new storage from the given storage backend description.
pub fn create_storage(backend: &StorageBackend) -> io::Result<Arc<dyn ExternalStorage>> {
match &backend.backend {
Expand Down
27 changes: 0 additions & 27 deletions components/external_storage/src/s3.rs
Expand Up @@ -22,8 +22,6 @@ use super::{
};
use kvproto::backup::S3 as Config;

const READ_BUF_SIZE: usize = 1024 * 1024 * 2;

/// S3 compatible storage
#[derive(Clone)]
pub struct S3Storage {
Expand Down Expand Up @@ -283,36 +281,11 @@ impl ExternalStorage for S3Storage {
) -> io::Result<()> {
let key = self.maybe_prefix_key(name);
debug!("save file to s3 storage"; "key" => %key);
<<<<<<< HEAD
let get_var = |s: &String| {
if s.is_empty() {
None
} else {
Some(s.clone())
}
};
let req = PutObjectRequest {
key,
bucket: self.config.bucket.clone(),
body: Some(ByteStream::new(
AsyncReadAsSyncStreamOfBytes::with_capacity(reader, READ_BUF_SIZE),
)),
content_length: Some(content_length as i64),
acl: get_var(&self.config.acl),
server_side_encryption: get_var(&self.config.sse),
storage_class: get_var(&self.config.storage_class),
..Default::default()
};
block_on_external_io(self.client.put_object(req))
.map(|_| ())
.map_err(|e| Error::new(ErrorKind::Other, format!("failed to put object {}", e)))
=======

let uploader = S3Uploader::new(&self.client, &self.config, key);
block_on_external_io(uploader.run(&mut *reader, content_length)).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("failed to put object {}", e))
})
>>>>>>> 3c667df... Improve robustness of Backup/Restore involving external_storage (#7917)
}

fn read(&self, name: &str) -> Box<dyn AsyncRead + Unpin + '_> {
Expand Down
13 changes: 0 additions & 13 deletions components/external_storage/src/util.rs
Expand Up @@ -2,12 +2,8 @@

use bytes::Bytes;
use futures::stream::{self, Stream};
<<<<<<< HEAD
use futures_io::AsyncRead;
=======
use futures_util::io::AsyncRead;
use rand::{thread_rng, Rng};
>>>>>>> 3c667df... Improve robustness of Backup/Restore involving external_storage (#7917)
use std::{
future::Future,
io, iter,
Expand All @@ -31,15 +27,6 @@ pub struct AsyncReadAsSyncStreamOfBytes<R> {
buf: Vec<u8>,
}

impl<R> AsyncReadAsSyncStreamOfBytes<R> {
pub fn with_capacity(reader: R, capacity: usize) -> Self {
Self {
reader: Mutex::new(reader),
buf: vec![0; capacity],
}
}
}

impl<R: AsyncRead + Unpin> Stream for AsyncReadAsSyncStreamOfBytes<R> {
type Item = io::Result<Bytes>;

Expand Down
64 changes: 7 additions & 57 deletions src/import/sst_importer.rs
Expand Up @@ -3,45 +3,24 @@
use std::borrow::Cow;
use std::fmt;
use std::fs::{self, File, OpenOptions};
<<<<<<< HEAD:src/import/sst_importer.rs
use std::io::Write as _;
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::time::Instant;
=======
use std::io::{self, Write};
use std::io::{self, Write as _};
use std::marker::Unpin;
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
>>>>>>> 3c667df... Improve robustness of Backup/Restore involving external_storage (#7917):components/sst_importer/src/sst_importer.rs

use futures_util::io::{AsyncRead, AsyncReadExt};
use kvproto::backup::StorageBackend;
use kvproto::import_sstpb::*;
<<<<<<< HEAD:src/import/sst_importer.rs
use tokio::time::timeout;
use uuid::Uuid;

use crate::storage::mvcc::Write;
use crate::storage::Key;
use engine::rocks::util::{get_cf_handle, prepare_sst_for_ingestion, validate_sst_for_ingestion};
use engine::rocks::{IngestExternalFileOptions, SeekKey, SstReader, SstWriter, DB};
use engine::CF_WRITE;
use external_storage::{block_on_external_io, create_storage, url_of_backend};
use futures_util::io::{copy, AllowStdIo};
=======
use tokio::time::timeout;
use uuid::{Builder as UuidBuilder, Uuid};

use encryption::DataKeyManager;
use engine_rocks::{encryption::get_env, RocksSstReader};
use engine_traits::{
EncryptionKeyManager, IngestExternalFileOptions, Iterator, KvEngine, SeekKey, SstExt,
SstReader, SstWriter, CF_DEFAULT, CF_WRITE,
};
use external_storage::{block_on_external_io, create_storage, url_of_backend, READ_BUF_SIZE};
>>>>>>> 3c667df... Improve robustness of Backup/Restore involving external_storage (#7917):components/sst_importer/src/sst_importer.rs
use tikv_util::time::Limiter;

use super::metrics::*;
Expand Down Expand Up @@ -148,12 +127,9 @@ impl SSTImporter {
}
}

<<<<<<< HEAD:src/import/sst_importer.rs
fn do_download(
=======
async fn read_external_storage_into_file(
input: &mut (dyn AsyncRead + Unpin),
output: &mut dyn Write,
output: &mut File,
speed_limiter: &Limiter,
expected_length: u64,
min_read_speed: usize,
Expand Down Expand Up @@ -193,8 +169,7 @@ impl SSTImporter {
Ok(())
}

fn do_download<E: KvEngine>(
>>>>>>> 3c667df... Improve robustness of Backup/Restore involving external_storage (#7917):components/sst_importer/src/sst_importer.rs
fn do_download(
&self,
meta: &SSTMeta,
backend: &StorageBackend,
Expand All @@ -208,32 +183,11 @@ impl SSTImporter {
let url = url_of_backend(backend);

{
<<<<<<< HEAD:src/import/sst_importer.rs
let mut file_writer = AllowStdIo::new(File::create(&path.temp)?);
let file_length =
block_on_external_io(copy(ext_reader, &mut file_writer)).map_err(|e| {
Error::CannotReadExternalStorage(url.to_string(), name.to_owned(), e)
})?;
if meta.length != 0 && meta.length != file_length {
let reason = format!("length {}, expect {}", file_length, meta.length);
return Err(Error::FileCorrupted(path.temp, reason));
}
IMPORTER_DOWNLOAD_BYTES.observe(file_length as _);
file_writer.into_inner().sync_data()?;
=======
// prepare to download the file from the external_storage
let ext_storage = create_storage(backend)?;
let mut ext_reader = ext_storage.read(name);

let mut plain_file;
let mut encrypted_file;
let file_writer: &mut dyn Write = if let Some(key_manager) = &self.key_manager {
encrypted_file = key_manager.create_file(&path.temp)?;
&mut encrypted_file
} else {
plain_file = File::create(&path.temp)?;
&mut plain_file
};
let mut file_writer = File::create(&path.temp)?;

// the minimum speed of reading data, in bytes/second.
// if reading speed is slower than this rate, we will stop with
Expand All @@ -243,7 +197,7 @@ impl SSTImporter {

block_on_external_io(Self::read_external_storage_into_file(
&mut ext_reader,
file_writer,
&mut file_writer,
&speed_limiter,
meta.length,
MINIMUM_READ_SPEED,
Expand All @@ -257,11 +211,7 @@ impl SSTImporter {
)
})?;

OpenOptions::new()
.append(true)
.open(&path.temp)?
.sync_data()?;
>>>>>>> 3c667df... Improve robustness of Backup/Restore involving external_storage (#7917):components/sst_importer/src/sst_importer.rs
file_writer.sync_data()?;
}

// now validate the SST file.
Expand Down
6 changes: 1 addition & 5 deletions src/import/sst_service.rs
Expand Up @@ -173,15 +173,11 @@ impl<Router: RaftStoreRouter> ImportSst for ImportSSTService<Router> {
.unwrap();

ctx.spawn(self.threads.spawn_fn(move || {
<<<<<<< HEAD
let res = importer.download(
=======
// FIXME: download() should be an async fn, to allow BR to cancel
// a download task.
// Unfortunately, this currently can't happen because the S3Storage
// is not Send + Sync. See the documentation of S3Storage for reason.
let res = importer.download::<RocksEngine>(
>>>>>>> 3c667df... Improve robustness of Backup/Restore involving external_storage (#7917)
let res = importer.download(
req.get_sst(),
req.get_storage_backend(),
req.get_name(),
Expand Down

0 comments on commit 14a38c9

Please sign in to comment.