Skip to content

Commit

Permalink
Get range headers when checking the status of the upload (#258)
Browse files Browse the repository at this point in the history
* Get range headers when checking the status of the upload

* Update reqwest and reqwest middleware

* Fix resumable status tests

* Fix doc test
  • Loading branch information
praveenperera committed May 15, 2024
1 parent 83ae2b5 commit 3ec403e
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 57 deletions.
90 changes: 45 additions & 45 deletions storage/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,68 +1,68 @@
[package]
name = "google-cloud-storage"
version = "0.18.0"
edition = "2021"
authors = ["yoshidan <naohiro.y@gmail.com>"]
repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/storage"
description = "Google Cloud Platform storage client library."
documentation = "https://docs.rs/google-cloud-storage/latest/google_cloud_storage/"
edition = "2021"
keywords = ["gcp", "gcs", "storage", "googleapis", "google-cloud-rust"]
license = "MIT"
name = "google-cloud-storage"
readme = "README.md"
description = "Google Cloud Platform storage client library."
documentation = "https://docs.rs/google-cloud-storage/latest/google_cloud_storage/"
repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/storage"
version = "0.18.0"

[dependencies]
google-cloud-token = { version = "0.1.1", path = "../foundation/token" }
pkcs8 = { version = "0.10", features = ["pem"] }
thiserror = "1.0"
time = { version = "0.3", features = [
"std",
"macros",
"formatting",
"parsing",
"serde",
] }
anyhow = "1.0"
async-stream = "0.3"
async-trait = "0.1"
base64 = "0.21"
bytes = "1.5"
futures-util = "0.3"
google-cloud-token = {version = "0.1.1", path = "../foundation/token"}
hex = "0.4"
once_cell = "1.18"
percent-encoding = "2.3"
pkcs8 = {version = "0.10", features = ["pem"]}
regex = "1.9"
sha2 = "0.10"
reqwest = {version = "0.12", features = [
"json",
"stream",
"multipart",
], default-features = false}
reqwest-middleware = {version = "0.3", features = ["json", "multipart"]}
ring = "0.17"
tokio = { version = "1.32", features = ["macros"] }
async-stream = "0.3"
once_cell = "1.18"
hex = "0.4"
url = "2.4"
tracing = "0.1"
reqwest = { version = "0.12.4", features = [
"json",
"stream",
"multipart",
], default-features = false }
reqwest-middleware = { version = "0.3", features = ["json", "multipart"] }
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde = {version = "1.0", features = ["derive"]}
serde_json = "1.0"
percent-encoding = "2.3"
futures-util = "0.3"
bytes = "1.5"
async-trait = "0.1"
sha2 = "0.10"
thiserror = "1.0"
time = {version = "0.3", features = [
"std",
"macros",
"formatting",
"parsing",
"serde",
]}
tokio = {version = "1.32", features = ["macros"]}
tracing = "0.1"
url = "2.4"

google-cloud-metadata = { optional = true, version = "0.5", path = "../foundation/metadata" }
google-cloud-auth = { optional = true, version = "0.15", path = "../foundation/auth", default-features = false }
google-cloud-auth = {optional = true, version = "0.15", path = "../foundation/auth", default-features = false}
google-cloud-metadata = {optional = true, version = "0.5", path = "../foundation/metadata"}

[dev-dependencies]
tokio = { version = "1.32", features = ["rt-multi-thread"] }
serial_test = "0.9"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
ctor = "0.1.26"
tokio-util = { version = "0.7", features = ["codec"] }
google-cloud-auth = { path = "../foundation/auth", default-features = false }
google-cloud-auth = {path = "../foundation/auth", default-features = false}
reqwest-retry = "0.5.0"
retry-policies = "0.3.0"
serial_test = "0.9"
tokio = {version = "1.32", features = ["rt-multi-thread"]}
tokio-util = {version = "0.7", features = ["codec"]}
tracing-subscriber = {version = "0.3.17", features = ["env-filter"]}

[features]
auth = ["google-cloud-auth", "google-cloud-metadata"]
default = ["default-tls", "auth"]
default-tls = ["reqwest/default-tls", "google-cloud-auth?/default-tls"]
rustls-tls = ["reqwest/rustls-tls", "google-cloud-auth?/rustls-tls"]
external-account = ["google-cloud-auth?/external-account"]
hickory-dns = ["reqwest/hickory-dns", "google-cloud-auth?/hickory-dns"]
rustls-tls = ["reqwest/rustls-tls", "google-cloud-auth?/rustls-tls"]
trace = []
auth = ["google-cloud-auth", "google-cloud-metadata"]
external-account = ["google-cloud-auth?/external-account"]
4 changes: 4 additions & 0 deletions storage/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ pub enum Error {
/// An error from a token source.
#[error("token source failed: {0}")]
TokenSource(Box<dyn std::error::Error + Send + Sync>),

/// Invalid Range error
#[error("invalid range header, received: {0}")]
InvalidRangeHeader(String),
}

impl From<reqwest_middleware::Error> for Error {
Expand Down
48 changes: 42 additions & 6 deletions storage/src/http/resumable_upload_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt;

use reqwest::header::{CONTENT_LENGTH, CONTENT_RANGE};
use reqwest::header::{CONTENT_LENGTH, CONTENT_RANGE, RANGE};
use reqwest::{Body, Response};
use reqwest_middleware::ClientWithMiddleware as Client;

Expand All @@ -20,7 +20,14 @@ pub enum ChunkError {
#[allow(clippy::large_enum_variant)]
pub enum UploadStatus {
Ok(Object),
ResumeIncomplete,
NotStarted,
ResumeIncomplete(UploadedRange),
}

#[derive(PartialEq, Debug)]
pub struct UploadedRange {
pub first_byte: u64,
pub last_byte: u64,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -140,11 +147,40 @@ impl ResumableUploadClient {
}

async fn map_resume_response(response: Response) -> Result<UploadStatus, Error> {
if response.status() == 308 {
Ok(UploadStatus::ResumeIncomplete)
} else {
if response.status() != 308 {
let response = check_response_status(response).await?;
Ok(UploadStatus::Ok(response.json::<Object>().await?))
return Ok(UploadStatus::Ok(response.json::<Object>().await?));
}

let range = response.headers().get(RANGE);

if range.is_none() {
return Ok(UploadStatus::NotStarted);
}

let range = range
.unwrap()
.to_str()
.map_err(|error| Error::InvalidRangeHeader(error.to_string()))?;

let range = range
.split('=')
.nth(1)
.ok_or_else(|| Error::InvalidRangeHeader(range.to_string()))?;

let start_end: Vec<&str> = range.split('-').collect();
let first_byte = start_end
.first()
.unwrap_or(&"0")
.parse::<u64>()
.map_err(|_| Error::InvalidRangeHeader(range.to_string()))?;

let last_byte = start_end
.get(1)
.ok_or_else(|| Error::InvalidRangeHeader(range.to_string()))?
.parse::<u64>()
.map_err(|_| Error::InvalidRangeHeader(range.to_string()))?;

Ok(UploadStatus::ResumeIncomplete(UploadedRange { first_byte, last_byte }))
}
}
45 changes: 39 additions & 6 deletions storage/src/http/storage_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,13 @@ impl StorageClient {
/// // The chunk size should be multiple of 256KiB, unless it's the last chunk that completes the upload.
/// let chunk1 = ChunkSize::new(0, chunk1_size - 1, total_size.clone());
/// let status1 = uploader.upload_multiple_chunk(chunk1_data.clone(), &chunk1).await.unwrap();
/// assert_eq!(status1, UploadStatus::ResumeIncomplete);
/// assert_eq!(
/// status1,
/// UploadStatus::ResumeIncomplete(UploadedRange {
/// first_byte: 0,
/// last_byte: chunk1_data.len() as u64 - 1
/// })
/// );
///
/// let chunk2 = ChunkSize::new(chunk1_size, chunk1_size + chunk2_size - 1, total_size.clone());
/// let status2 = uploader.upload_multiple_chunk(chunk2_data.clone(), &chunk2).await.unwrap();
Expand Down Expand Up @@ -1396,7 +1402,7 @@ pub(crate) mod test {
use crate::http::objects::rewrite::RewriteObjectRequest;
use crate::http::objects::upload::{Media, UploadObjectRequest, UploadType};
use crate::http::objects::{Object, SourceObjects};
use crate::http::resumable_upload_client::{ChunkSize, UploadStatus};
use crate::http::resumable_upload_client::{ChunkSize, UploadStatus, UploadedRange};
use crate::http::storage_client::{StorageClient, SCOPES};

#[ctor::ctor]
Expand Down Expand Up @@ -2184,11 +2190,24 @@ pub(crate) mod test {
.upload_multiple_chunk(chunk1_data.clone(), &chunk1)
.await
.unwrap();
assert_eq!(status1, UploadStatus::ResumeIncomplete);

assert_eq!(
status1,
UploadStatus::ResumeIncomplete(UploadedRange {
first_byte: 0,
last_byte: chunk1_data.len() as u64 - 1
})
);

tracing::info!("check status chunk1");
let status_check = uploader.status(total_size).await.unwrap();
assert_eq!(status_check, UploadStatus::ResumeIncomplete);
assert_eq!(
status_check,
UploadStatus::ResumeIncomplete(UploadedRange {
first_byte: 0,
last_byte: chunk1_data.len() as u64 - 1
})
);

let chunk2 = ChunkSize::new(
chunk1_data.len() as u64,
Expand Down Expand Up @@ -2287,14 +2306,28 @@ pub(crate) mod test {
.upload_multiple_chunk(chunk1_data.clone(), &chunk1)
.await
.unwrap();
assert_eq!(status1, UploadStatus::ResumeIncomplete);

assert_eq!(
status1,
UploadStatus::ResumeIncomplete(UploadedRange {
first_byte: 0,
last_byte: chunk1_data.len() as u64 - 1
})
);

tracing::info!("upload chunk1 resume {:?}", chunk1);
let status1 = uploader
.upload_multiple_chunk(chunk1_data.clone(), &chunk1)
.await
.unwrap();
assert_eq!(status1, UploadStatus::ResumeIncomplete);

assert_eq!(
status1,
UploadStatus::ResumeIncomplete(UploadedRange {
first_byte: 0,
last_byte: chunk1_data.len() as u64 - 1
})
);

// total size is required for final chunk.
let remaining = chunk1_data.len() as u64 + chunk2_data.len() as u64;
Expand Down

0 comments on commit 3ec403e

Please sign in to comment.