Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Use Attributes API to reduce the amount of ObjectStore instances #8517

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 36 additions & 58 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::prefix::PrefixStore;
use object_store::{ClientOptions, ObjectStore, Result};
use reqwest::header::CACHE_CONTROL;
use reqwest::header::{HeaderMap, HeaderValue};
use object_store::{Attribute, Attributes, ClientOptions, ObjectStore, Result};
use secrecy::{ExposeSecret, SecretString};
use std::fs;
use std::path::PathBuf;
Expand Down Expand Up @@ -110,14 +108,8 @@ impl StorageConfig {

pub struct Storage {
cdn_prefix: Option<String>,

store: Box<dyn ObjectStore>,
crate_upload_store: Box<dyn ObjectStore>,
readme_upload_store: Box<dyn ObjectStore>,
db_dump_upload_store: Arc<dyn ObjectStore>,

index_store: Box<dyn ObjectStore>,
index_upload_store: Box<dyn ObjectStore>,
store: Arc<dyn ObjectStore>,
index_store: Arc<dyn ObjectStore>,
}

impl Storage {
Expand All @@ -130,37 +122,24 @@ impl Storage {

match &config.backend {
StorageBackend::S3 { default, index } => {
let options = ClientOptions::default();
let store = build_s3(default, options);

let options = client_options(CONTENT_TYPE_CRATE, CACHE_CONTROL_IMMUTABLE);
let crate_upload_store = build_s3(default, options);

let options = client_options(CONTENT_TYPE_README, CACHE_CONTROL_README);
let readme_upload_store = build_s3(default, options);
let options = ClientOptions::default()
// The `BufWriter::new()` API currently does not allow
// specifying any file attributes, so we need to set the
// content type here instead for the database dump upload.
.with_content_type_for_suffix("gz", CONTENT_TYPE_DB_DUMP);

let options =
ClientOptions::default().with_default_content_type(CONTENT_TYPE_DB_DUMP);
let db_dump_upload_store = build_s3(default, options);

let options = ClientOptions::default();
let index_store = build_s3(index, options);
let store = build_s3(default, options);

let options = client_options(CONTENT_TYPE_INDEX, CACHE_CONTROL_INDEX);
let index_upload_store = build_s3(index, options);
let index_store = build_s3(index, Default::default());

if cdn_prefix.is_none() {
panic!("Missing S3_CDN environment variable");
}

Self {
store: Box::new(store),
crate_upload_store: Box::new(crate_upload_store),
readme_upload_store: Box::new(readme_upload_store),
db_dump_upload_store: Arc::new(db_dump_upload_store),
cdn_prefix,
index_store: Box::new(index_store),
index_upload_store: Box::new(index_upload_store),
store: Arc::new(store),
index_store: Arc::new(index_store),
}
}

Expand All @@ -185,13 +164,9 @@ impl Storage {
let index_store: Arc<dyn ObjectStore> = Arc::new(local_index);

Self {
store: Box::new(store.clone()),
crate_upload_store: Box::new(store.clone()),
readme_upload_store: Box::new(store.clone()),
db_dump_upload_store: store,
cdn_prefix,
index_store: Box::new(index_store.clone()),
index_upload_store: Box::new(index_store),
store,
index_store,
}
}

Expand All @@ -200,13 +175,9 @@ impl Storage {
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

Self {
store: Box::new(store.clone()),
crate_upload_store: Box::new(store.clone()),
readme_upload_store: Box::new(store.clone()),
db_dump_upload_store: store.clone(),
cdn_prefix,
index_store: Box::new(PrefixStore::new(store.clone(), "index")),
index_upload_store: Box::new(PrefixStore::new(store, "index")),
store: store.clone(),
index_store: Arc::new(PrefixStore::new(store, "index")),
}
}
}
Expand Down Expand Up @@ -253,22 +224,38 @@ impl Storage {
#[instrument(skip(self, bytes))]
pub async fn upload_crate_file(&self, name: &str, version: &str, bytes: Bytes) -> Result<()> {
let path = crate_file_path(name, version);
self.crate_upload_store.put(&path, bytes.into()).await?;
let attributes = Attributes::from_iter([
(Attribute::ContentType, CONTENT_TYPE_CRATE),
(Attribute::CacheControl, CACHE_CONTROL_IMMUTABLE),
]);
let opts = attributes.into();
self.store.put_opts(&path, bytes.into(), opts).await?;
Ok(())
}

#[instrument(skip(self, bytes))]
pub async fn upload_readme(&self, name: &str, version: &str, bytes: Bytes) -> Result<()> {
let path = readme_path(name, version);
self.readme_upload_store.put(&path, bytes.into()).await?;
let attributes = Attributes::from_iter([
(Attribute::ContentType, CONTENT_TYPE_README),
(Attribute::CacheControl, CACHE_CONTROL_README),
]);
let opts = attributes.into();
self.store.put_opts(&path, bytes.into(), opts).await?;
Ok(())
}

#[instrument(skip(self, content))]
pub async fn sync_index(&self, name: &str, content: Option<String>) -> Result<()> {
let path = crates_io_index::Repository::relative_index_file_for_url(name).into();
if let Some(content) = content {
self.index_upload_store.put(&path, content.into()).await?;
let attributes = Attributes::from_iter([
(Attribute::ContentType, CONTENT_TYPE_INDEX),
(Attribute::CacheControl, CACHE_CONTROL_INDEX),
]);
let payload = content.into();
let opts = attributes.into();
self.index_store.put_opts(&path, payload, opts).await?;
} else {
self.index_store.delete(&path).await?;
}
Expand All @@ -278,7 +265,7 @@ impl Storage {

#[instrument(skip(self))]
pub async fn upload_db_dump(&self, target: &str, local_path: &StdPath) -> anyhow::Result<()> {
let store = self.db_dump_upload_store.clone();
let store = self.store.clone();

// Open the local tarball file
let mut local_file = File::open(local_path).await?;
Expand Down Expand Up @@ -318,15 +305,6 @@ impl Storage {
}
}

fn client_options(content_type: &str, cache_control: &'static str) -> ClientOptions {
let mut headers = HeaderMap::new();
headers.insert(CACHE_CONTROL, HeaderValue::from_static(cache_control));

ClientOptions::default()
.with_default_content_type(content_type)
.with_default_headers(headers)
}

fn build_s3(config: &S3Config, client_options: ClientOptions) -> AmazonS3 {
AmazonS3Builder::new()
.with_region(config.region.as_deref().unwrap_or(DEFAULT_REGION))
Expand Down