Skip to content

Commit

Permalink
Tag S3 objects
Browse files Browse the repository at this point in the history
  • Loading branch information
petuhovskiy committed Mar 19, 2024
1 parent e9a8aec commit c40b754
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
10 changes: 10 additions & 0 deletions libs/remote_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,16 @@ impl GenericRemoteStorage {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageMetadata(HashMap<String, String>);

impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
fn from(arr: [(&str, &str); N]) -> Self {
let map: HashMap<String, String> = arr
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
Self(map)
}
}

/// External backup storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemoteStorageConfig {
Expand Down
32 changes: 29 additions & 3 deletions safekeeper/src/wal_backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::Duration;
use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::{GenericRemoteStorage, RemotePath};
use remote_storage::{GenericRemoteStorage, RemotePath, StorageMetadata};
use tokio::fs::File;

use tokio::select;
Expand Down Expand Up @@ -500,7 +500,27 @@ fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
res
}

pub async fn backup_object(
async fn backup_object(
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
) -> Result<()> {
let storage = get_configured_remote_storage();

let file = File::open(&source_file)
.await
.with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;

let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);

let cancel = CancellationToken::new();

storage
.upload_storage_object(file, size, target_file, &cancel)
.await
}

pub(crate) async fn backup_partial_segment(
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
Expand All @@ -519,7 +539,13 @@ pub async fn backup_object(
let cancel = CancellationToken::new();

storage
.upload_storage_object(file, size, target_file, &cancel)
.upload(
file,
size,
target_file,
Some(StorageMetadata::from([("sk_type", "partial_segment")])),
&cancel,
)
.await
}

Expand Down
2 changes: 1 addition & 1 deletion safekeeper/src/wal_backup_partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl PartialBackup {
let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?;

// Upload first `backup_bytes` bytes of the segment to the remote storage.
wal_backup::backup_object(&local_path, &remote_path, backup_bytes).await?;
wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);

// We uploaded the segment, now let's verify that the data is still actual.
Expand Down

0 comments on commit c40b754

Please sign in to comment.