Skip to content

Commit

Permalink
Merge branch 'release-5.3' into release-5.3-3364702e1903
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed Jun 14, 2022
2 parents f198359 + 89311ca commit 85d4261
Show file tree
Hide file tree
Showing 34 changed files with 723 additions and 69 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 41 additions & 27 deletions components/backup/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,23 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
self.config_manager.clone()
}

fn get_config(&self) -> BackendConfig {
BackendConfig {
s3_multi_part_size: self.config_manager.0.read().unwrap().s3_multi_part_size.0 as usize,
hdfs_config: HdfsConfig {
hadoop_home: self.config_manager.0.read().unwrap().hadoop.home.clone(),
linux_user: self
.config_manager
.0
.read()
.unwrap()
.hadoop
.linux_user
.clone(),
},
}
}

fn spawn_backup_worker(
&self,
prs: Arc<Mutex<Progress<R>>>,
Expand All @@ -702,19 +719,21 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
let concurrency_manager = self.concurrency_manager.clone();
let batch_size = self.config_manager.0.read().unwrap().batch_size;
let sst_max_size = self.config_manager.0.read().unwrap().sst_max_size.0;
let config = BackendConfig {
hdfs_config: HdfsConfig {
hadoop_home: self.config_manager.0.read().unwrap().hadoop.home.clone(),
linux_user: self
.config_manager
.0
.read()
.unwrap()
.hadoop
.linux_user
.clone(),
},

// Check if we can open external storage.
let backend = match create_storage(&request.backend, self.get_config()) {
Ok(backend) => backend,
Err(err) => {
error_unknown!(?err; "backup create storage failed");
let mut response = BackupResponse::default();
response.set_error(crate::Error::Io(err).into());
if let Err(err) = tx.unbounded_send(response) {
error_unknown!(?err; "backup failed to send response");
}
return;
}
};
let backend = Arc::<dyn ExternalStorage>::from(backend);

let limit = self.softlimit.limit();
self.pool.borrow_mut().spawn(move || {
Expand All @@ -724,23 +743,9 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
tikv_alloc::remove_thread_memory_accessor();
});

// Check if we can open external storage.
let backend = match create_storage(&request.backend, config) {
Ok(backend) => backend,
Err(err) => {
error_unknown!(?err; "backup create storage failed");
let mut response = BackupResponse::default();
response.set_error(crate::Error::Io(err).into());
if let Err(err) = tx.unbounded_send(response) {
error_unknown!(?err; "backup failed to send response");
}
return;
}
};

let storage = LimitedStorage {
limiter: request.limiter,
storage: Arc::new(backend),
storage: backend.clone(),
};

loop {
Expand Down Expand Up @@ -1149,6 +1154,15 @@ pub mod tests {
assert_eq!(counter.load(Ordering::SeqCst), 0xffff);
}

#[test]
fn test_s3_config() {
let (_tmp, endpoint) = new_endpoint();
assert_eq!(
endpoint.config_manager.0.read().unwrap().s3_multi_part_size,
ReadableSize::mb(5)
);
}

#[test]
fn test_seek_range() {
let (_tmp, endpoint) = new_endpoint();
Expand Down
2 changes: 2 additions & 0 deletions components/cloud/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ rusoto_core = "0.46.0"
thiserror = "1.0"
tikv_util = { path = "../tikv_util", default-features = false }
url = "2.0"
lazy_static = "1.3"
prometheus = { version = "0.12", features = ["nightly"] }

[dev-dependencies]
fail = "0.4"
4 changes: 3 additions & 1 deletion components/cloud/aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ rusoto_core = "0.46.0"
rusoto_credential = "0.46.0"
rusoto_kms = { version = "0.46.0", features = ["serialize_structs"] }
rusoto_sts = "0.46.0"
rusoto_s3 = "0.46.0"
rusoto_s3 = { version = "0.46.0", features = ["serialize_structs"] }
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
# better to not use slog-global, but pass in the logger
tokio = { version = "1.5", features = ["time"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" }
tikv_util = { path = "../../tikv_util", default-features = false }
url = "2.0"
lazy_static = "1.3"
prometheus = { version = "0.12", features = ["nightly"] }

[dev-dependencies]
futures = "0.3"
Expand Down
Loading

0 comments on commit 85d4261

Please sign in to comment.