external_storage: fix GCS download error, support GCS endpoints, and refactoring (#7734) #7962

Closed
309 changes: 309 additions & 0 deletions components/external_storage/src/gcs.rs
@@ -0,0 +1,309 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use super::{
util::{block_on_external_io, error_stream, AsyncReadAsSyncStreamOfBytes},
ExternalStorage,
};

use std::{
convert::TryInto,
fmt::Display,
io::{Error, ErrorKind, Read, Result},
sync::Arc,
};

use bytes::Bytes;
use futures_util::{
future::{FutureExt, TryFutureExt},
io::AsyncRead,
stream::TryStreamExt,
};
use kvproto::backup::Gcs as Config;
use reqwest::{Body, Client};
use tame_gcs::{
common::{PredefinedAcl, StorageClass},
objects::{InsertObjectOptional, Metadata, Object},
types::{BucketName, ObjectId},
};
use tame_oauth::gcp::{ServiceAccountAccess, ServiceAccountInfo, TokenOrRequest};

const HARDCODED_ENDPOINTS: &[&str] = &[
"https://www.googleapis.com/upload/storage/v1",
"https://www.googleapis.com/storage/v1",
];

/// GCS-compatible storage.
#[derive(Clone)]
pub struct GCSStorage {
config: Config,
svc_access: Arc<ServiceAccountAccess>,
client: Client,
}

trait ResultExt {
type Ok;

// Maps the error of this result into an `std::io::Error` with the `Other`
// error kind.
fn or_io_error<D: Display>(self, msg: D) -> Result<Self::Ok>;

// Maps the error of this result into an `std::io::Error` with the
// `InvalidInput` error kind.
fn or_invalid_input<D: Display>(self, msg: D) -> Result<Self::Ok>;
}

impl<T, E: Display> ResultExt for std::result::Result<T, E> {
type Ok = T;
fn or_io_error<D: Display>(self, msg: D) -> Result<T> {
self.map_err(|e| Error::new(ErrorKind::Other, format!("{}: {}", msg, e)))
}
fn or_invalid_input<D: Display>(self, msg: D) -> Result<T> {
self.map_err(|e| Error::new(ErrorKind::InvalidInput, format!("{}: {}", msg, e)))
}
}

impl GCSStorage {
/// Create a new GCS storage for the given config.
pub fn new(config: &Config) -> Result<GCSStorage> {
if config.bucket.is_empty() {
return Err(Error::new(ErrorKind::InvalidInput, "missing bucket name"));
}
if config.credentials_blob.is_empty() {
return Err(Error::new(ErrorKind::InvalidInput, "missing credentials"));
}
let svc_info = ServiceAccountInfo::deserialize(&config.credentials_blob)
.or_invalid_input("invalid credentials_blob")?;
let svc_access =
ServiceAccountAccess::new(svc_info).or_invalid_input("invalid credentials_blob")?;
let client = Client::builder()
.build()
.or_io_error("unable to create reqwest client")?;
Ok(GCSStorage {
config: config.clone(),
svc_access: Arc::new(svc_access),
client,
})
}

fn maybe_prefix_key(&self, key: &str) -> String {
if !self.config.prefix.is_empty() {
return format!("{}/{}", self.config.prefix, key);
}
key.to_owned()
}

fn convert_request<R: 'static>(&self, req: http::Request<R>) -> Result<reqwest::Request>
where
R: AsyncRead + Send + Unpin,
{
let uri = req.uri().to_string();
self.client
.request(req.method().clone(), &uri)
.headers(req.headers().clone())
.body(Body::wrap_stream(AsyncReadAsSyncStreamOfBytes::new(
req.into_body(),
)))
.build()
.or_io_error("failed to build request")
}

async fn convert_response(
&self,
tag: &str,
res: reqwest::Response,
) -> Result<http::Response<Bytes>> {
let mut builder = http::Response::builder()
.status(res.status())
.version(res.version());
for (key, value) in res.headers().iter() {
builder = builder.header(key, value);
}
// convert_response is only used to read the access token.
let content = res
.bytes()
.await
.or_io_error(format_args!("failed to read {} response", tag))?;
builder
.body(content)
.or_io_error(format_args!("failed to build {} response body", tag))
}

async fn set_auth(&self, req: &mut reqwest::Request, scope: tame_gcs::Scopes) -> Result<()> {
let token_or_request = self
.svc_access
.get_token(&[scope])
.or_io_error("failed to get token")?;
let token = match token_or_request {
TokenOrRequest::Token(token) => token,
TokenOrRequest::Request {
request,
scope_hash,
..
} => {
let res = self
.client
.execute(request.into())
.await
.or_io_error("request GCS access token failed")?;
let response = self.convert_response("GCS access token", res).await?;

self.svc_access
.parse_token_response(scope_hash, response)
.or_io_error("failed to parse GCS token response")?
}
};
req.headers_mut().insert(
http::header::AUTHORIZATION,
token
.try_into()
.or_io_error("failed to set GCS auth token")?,
);

Ok(())
}

async fn make_request(
&self,
mut req: reqwest::Request,
scope: tame_gcs::Scopes,
) -> Result<reqwest::Response> {
// Replace the hard-coded GCS endpoint with the custom one, if configured.
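// For example (values are illustrative only): with endpoint = "http://127.0.0.1:4443",
// a request to "https://www.googleapis.com/storage/v1/b/bkt/o/key" is rewritten to
// "http://127.0.0.1:4443/b/bkt/o/key" — the hard-coded prefix is stripped and the
// custom endpoint, trimmed of any trailing '/', is prepended to the remaining path.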
let endpoint = self.config.get_endpoint();
if !endpoint.is_empty() {
let url = req.url().as_str();
for hardcoded in HARDCODED_ENDPOINTS {
if url.starts_with(hardcoded) {
*req.url_mut() = reqwest::Url::parse(
&[endpoint.trim_end_matches('/'), &url[hardcoded.len()..]].concat(),
)
.or_invalid_input("invalid custom GCS endpoint")?;
break;
}
}
}

self.set_auth(&mut req, scope).await?;
let response = self
.client
.execute(req)
.await
.or_io_error("make GCS request failed")?;
if !response.status().is_success() {
let status = response.status();
let text = response.text().await.or_io_error(format_args!(
"GCS request failed and the error message could not be read, status: {}",
status
))?;
return Err(Error::new(
ErrorKind::Other,
format!("request failed. status: {}, text: {}", status, text),
));
}

Ok(response)
}

fn error_to_async_read<E>(kind: ErrorKind, e: E) -> Box<dyn AsyncRead + Unpin>
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
Box::new(error_stream(Error::new(kind, e)).into_async_read())
}
}

impl ExternalStorage for GCSStorage {
fn write(
&self,
name: &str,
reader: Box<dyn AsyncRead + Send + Unpin>,
content_length: u64,
) -> Result<()> {
use std::convert::TryFrom;

let key = self.maybe_prefix_key(name);
debug!("save file to GCS storage"; "key" => %key);
let bucket = BucketName::try_from(self.config.bucket.clone()).map_err(|e| {
Error::new(
ErrorKind::InvalidInput,
format!("invalid bucket {}: {}", self.config.bucket, e),
)
})?;
let storage_class: Option<StorageClass> = if self.config.storage_class.is_empty() {
None
} else {
Some(
serde_json::from_str(&self.config.storage_class).or_invalid_input(format_args!(
"invalid storage_class {}",
self.config.storage_class
))?,
)
};
// Convert manually since PredefinedAcl doesn't implement Deserialize.
let predefined_acl = match self.config.predefined_acl.as_ref() {
"" => None,
"authenticatedRead" => Some(PredefinedAcl::AuthenticatedRead),
"bucketOwnerFullControl" => Some(PredefinedAcl::BucketOwnerFullControl),
"bucketOwnerRead" => Some(PredefinedAcl::BucketOwnerRead),
"private" => Some(PredefinedAcl::Private),
"projectPrivate" => Some(PredefinedAcl::ProjectPrivate),
"publicRead" => Some(PredefinedAcl::PublicRead),
_ => {
return Err(Error::new(
ErrorKind::InvalidInput,
format!("invalid predefined_acl {}", self.config.predefined_acl),
));
}
};
let metadata = Metadata {
name: Some(key),
storage_class,
..Default::default()
};
let optional = Some(InsertObjectOptional {
predefined_acl,
..Default::default()
});
let req = Object::insert_multipart(&bucket, reader, content_length, &metadata, optional)
.or_io_error("failed to create GCS insert request")?;
block_on_external_io(
self.make_request(self.convert_request(req)?, tame_gcs::Scopes::ReadWrite),
)?;
Ok(())
}

fn read(&self, name: &str) -> Box<dyn AsyncRead + Unpin + '_> {
let bucket = self.config.bucket.clone();
let name = self.maybe_prefix_key(name);
debug!("read file from GCS storage"; "key" => %name);
let oid = match ObjectId::new(bucket, name) {
Ok(oid) => oid,
Err(e) => return GCSStorage::error_to_async_read(ErrorKind::InvalidInput, e),
};
let request = match Object::download(&oid, None /*optional*/) {
Ok(request) => request,
Err(e) => return GCSStorage::error_to_async_read(ErrorKind::Other, e),
};
// The body is actually an std::io::Empty; read_to_end only converts it into a
// Vec<u8>, which can easily be turned into a reqwest::Body.
let (parts, mut body) = request.into_parts();
let mut body_content = vec![];
if let Err(e) = body.read_to_end(&mut body_content) {
return GCSStorage::error_to_async_read(ErrorKind::Other, e);
}

Box::new(
self.make_request(
http::Request::from_parts(parts, body_content).into(),
tame_gcs::Scopes::ReadOnly,
)
.boxed() // this `.boxed()` pins the future.
.map_ok(|response| {
response.bytes_stream().map_err(|e| {
Error::new(ErrorKind::Other, format!("failed to download from GCS: {}", e))
})
})
.try_flatten_stream()
.into_async_read(),
)
}
}
8 changes: 8 additions & 0 deletions components/external_storage/src/util.rs
@@ -55,6 +55,14 @@ pub fn error_stream(e: io::Error) -> impl Stream<Item = io::Result<Bytes>> + Unpin
}

/// Runs a future on the current thread involving external storage.
///
/// # Caveat
///
/// This function must never be nested. The future passed to
/// `block_on_external_io` must not itself call `block_on_external_io` again,
/// otherwise the executor's state may be disrupted.
///
/// This means the future must only use async functions.
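///
/// A minimal sketch of the rule (the helper names below are hypothetical and
/// only illustrate the nesting constraint):
///
/// ```ignore
/// // OK: the future only awaits async functions.
/// let bytes = block_on_external_io(storage.read_to_vec("backup.sst"));
///
/// // NOT OK: the inner call blocks inside the outer executor.
/// block_on_external_io(async {
///     block_on_external_io(read_object("backup.sst"));
/// });
/// ```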
// FIXME: get rid of this function, so that futures_executor::block_on is sufficient.
pub fn block_on_external_io<F: Future>(f: F) -> F::Output {
// we need a Tokio runtime rather than futures_executor::block_on because
2 changes: 1 addition & 1 deletion src/import/errors.rs
@@ -135,7 +135,7 @@ quick_error! {
}
CannotReadExternalStorage(url: String, name: String, err: IoError) {
cause(err)
display("Cannot read {}/{}", url, name)
display("Cannot read {}/{}: {}", url, name, err)
}
WrongKeyPrefix(what: &'static str, key: Vec<u8>, prefix: Vec<u8>) {
display("\
22 changes: 18 additions & 4 deletions src/import/sst_importer.rs
@@ -3,7 +3,11 @@
use std::borrow::Cow;
use std::fmt;
use std::fs::{self, File, OpenOptions};
<<<<<<< HEAD:src/import/sst_importer.rs
use std::io::Write as _;
=======
use std::io::{self, Write};
>>>>>>> 1bdab7b... external_storage: fix GCS download error, support GCS endpoints, and refactoring (#7734):components/sst_importer/src/sst_importer.rs
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::time::Instant;
@@ -115,11 +119,11 @@ impl SSTImporter {
);
match self.do_download(meta, backend, name, rewrite_rule, speed_limiter, sst_writer) {
Ok(r) => {
info!("download"; "meta" => ?meta, "range" => ?r);
info!("download"; "meta" => ?meta, "name" => name, "range" => ?r);
Ok(r)
}
Err(e) => {
error!("download failed"; "meta" => ?meta, "err" => %e);
error!("download failed"; "meta" => ?meta, "name" => name, "err" => %e);
Err(e)
}
}
@@ -151,8 +155,18 @@ impl SSTImporter {
Error::CannotReadExternalStorage(url.to_string(), name.to_owned(), e)
})?;
if meta.length != 0 && meta.length != file_length {
let reason = format!("length {}, expect {}", file_length, meta.length);
return Err(Error::FileCorrupted(path.temp, reason));
let reason = format!(
"downloaded size {}, expected {}, local path {}",
file_length,
meta.length,
path.temp.display()
);
let reason = io::Error::new(io::ErrorKind::InvalidData, reason);
return Err(Error::CannotReadExternalStorage(
url.to_string(),
name.to_owned(),
reason,
));
}
IMPORTER_DOWNLOAD_BYTES.observe(file_length as _);
file_writer.into_inner().sync_data()?;