Skip to content

Commit

Permalink
chore: add full auth into http client (#5857)
Browse files Browse the repository at this point in the history
### Description

This PR stores `team_id` and `team_slug` in the http cache so they no
longer need to get plumbed through to any task cache method that might
hit the api. This does limit each `HttpCache` instance to use with a
single team, but we already were doing this with the
`ArtifactSignatureAuthenticator` if remote cache signature was enabled.
This is still more flexible than Go where each api client instance is
configured for a specific team.

The PR can be reviewed by-commit, to break up the moving of `team_id`
and `team_slug`

### Testing Instructions

`rustc`


Closes TURBO-1290

---------

Co-authored-by: Chris Olszewski <Chris Olszewski>
  • Loading branch information
chris-olszewski committed Sep 5, 2023
1 parent 14ad05a commit 0d899ad
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 45 deletions.
32 changes: 14 additions & 18 deletions crates/turborepo-cache/src/async_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,12 @@ impl AsyncCache {
&self,
anchor: &AbsoluteSystemPath,
key: &str,
team_id: &str,
team_slug: Option<&str>,
) -> Result<(CacheResponse, Vec<AnchoredSystemPathBuf>), CacheError> {
self.real_cache.fetch(anchor, key, team_id, team_slug).await
self.real_cache.fetch(anchor, key).await
}

pub async fn exists(
&mut self,
key: &str,
team_id: &str,
team_slug: Option<&str>,
) -> Result<CacheResponse, CacheError> {
self.real_cache.exists(key, team_id, team_slug).await
pub async fn exists(&mut self, key: &str) -> Result<CacheResponse, CacheError> {
self.real_cache.exists(key).await
}

// Used for testing to ensure that the workers resolve
Expand Down Expand Up @@ -140,11 +133,12 @@ mod tests {
let api_auth = Some(APIAuth {
team_id: "my-team-id".to_string(),
token: "my-token".to_string(),
team_slug: None,
});
let mut async_cache = AsyncCache::new(&opts, &repo_root_path, api_client, api_auth)?;

// Ensure that the cache is empty
let response = async_cache.exists(&hash, "my-team-id", None).await;
let response = async_cache.exists(&hash).await;

assert_matches!(response, Err(CacheError::CacheMiss));

Expand All @@ -171,7 +165,7 @@ mod tests {
// Confirm that fs cache file does *not* exist
assert!(!fs_cache_path.exists());

let response = async_cache.exists(&hash, "my-team-id", None).await?;
let response = async_cache.exists(&hash).await?;

// Confirm that we fetch from remote cache and not local.
assert_eq!(
Expand Down Expand Up @@ -209,11 +203,12 @@ mod tests {
let api_auth = Some(APIAuth {
team_id: "my-team-id".to_string(),
token: "my-token".to_string(),
team_slug: None,
});
let mut async_cache = AsyncCache::new(&opts, &repo_root_path, api_client, api_auth)?;

// Ensure that the cache is empty
let response = async_cache.exists(&hash, "my-team-id", None).await;
let response = async_cache.exists(&hash).await;

assert_matches!(response, Err(CacheError::CacheMiss));

Expand All @@ -240,7 +235,7 @@ mod tests {
// Confirm that fs cache file exists
assert!(fs_cache_path.exists());

let response = async_cache.exists(&hash, "my-team-id", None).await?;
let response = async_cache.exists(&hash).await?;

// Confirm that we fetch from local cache first.
assert_eq!(
Expand All @@ -254,7 +249,7 @@ mod tests {
// Remove fs cache file
fs_cache_path.remove_file()?;

let response = async_cache.exists(&hash, "my-team-id", None).await;
let response = async_cache.exists(&hash).await;

// Confirm that we get a cache miss
assert_matches!(response, Err(CacheError::CacheMiss));
Expand Down Expand Up @@ -284,11 +279,12 @@ mod tests {
let api_auth = Some(APIAuth {
team_id: "my-team-id".to_string(),
token: "my-token".to_string(),
team_slug: None,
});
let mut async_cache = AsyncCache::new(&opts, &repo_root_path, api_client, api_auth)?;

// Ensure that the cache is empty
let response = async_cache.exists(&hash, "my-team-id", None).await;
let response = async_cache.exists(&hash).await;

assert_matches!(response, Err(CacheError::CacheMiss));

Expand All @@ -315,7 +311,7 @@ mod tests {
// Confirm that fs cache file exists
assert!(fs_cache_path.exists());

let response = async_cache.exists(&hash, "my-team-id", None).await?;
let response = async_cache.exists(&hash).await?;

// Confirm that we fetch from local cache first.
assert_eq!(
Expand All @@ -329,7 +325,7 @@ mod tests {
// Remove fs cache file
fs_cache_path.remove_file()?;

let response = async_cache.exists(&hash, "my-team-id", None).await?;
let response = async_cache.exists(&hash).await?;

// Confirm that we still can fetch from remote cache
assert_eq!(
Expand Down
30 changes: 17 additions & 13 deletions crates/turborepo-cache/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ pub struct HTTPCache {
signer_verifier: Option<ArtifactSignatureAuthenticator>,
repo_root: AbsoluteSystemPathBuf,
token: String,
team_id: String,
team_slug: Option<String>,
}

pub struct APIAuth {
pub team_id: String,
pub token: String,
pub team_slug: Option<String>,
}

impl HTTPCache {
Expand All @@ -40,12 +43,19 @@ impl HTTPCache {
} else {
None
};
let APIAuth {
team_id,
token,
team_slug,
} = api_auth;

HTTPCache {
client,
signer_verifier,
repo_root,
token: api_auth.token,
token,
team_id,
team_slug,
}
}

Expand Down Expand Up @@ -86,15 +96,10 @@ impl HTTPCache {
Ok(())
}

pub async fn exists(
&self,
hash: &str,
team_id: &str,
team_slug: Option<&str>,
) -> Result<CacheResponse, CacheError> {
pub async fn exists(&self, hash: &str) -> Result<CacheResponse, CacheError> {
let response = self
.client
.artifact_exists(hash, &self.token, team_id, team_slug)
.artifact_exists(hash, &self.token, &self.team_id, self.team_slug.as_deref())
.await?;

let duration = Self::get_duration_from_response(&response)?;
Expand Down Expand Up @@ -122,12 +127,10 @@ impl HTTPCache {
pub async fn fetch(
&self,
hash: &str,
team_id: &str,
team_slug: Option<&str>,
) -> Result<(CacheResponse, Vec<AnchoredSystemPathBuf>), CacheError> {
let response = self
.client
.fetch_artifact(hash, &self.token, team_id, team_slug)
.fetch_artifact(hash, &self.token, &self.team_id, self.team_slug.as_deref())
.await?;

let duration = Self::get_duration_from_response(&response)?;
Expand Down Expand Up @@ -232,6 +235,7 @@ mod test {
let api_auth = APIAuth {
team_id: "my-team".to_string(),
token: "my-token".to_string(),
team_slug: None,
};

let cache = HTTPCache::new(api_client, &opts, repo_root_path.to_owned(), api_auth);
Expand All @@ -241,12 +245,12 @@ mod test {
.put(&repo_root_path, hash, &anchored_files, duration)
.await?;

let cache_response = cache.exists(hash, "", None).await?;
let cache_response = cache.exists(hash).await?;

assert_eq!(cache_response.time_saved, duration);
assert_eq!(cache_response.source, CacheSource::Remote);

let (cache_response, received_files) = cache.fetch(hash, "", None).await?;
let (cache_response, received_files) = cache.fetch(hash).await?;
assert_eq!(cache_response.time_saved, duration);

for (test_file, received_file) in files.iter().zip(received_files) {
Expand Down
13 changes: 3 additions & 10 deletions crates/turborepo-cache/src/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ impl CacheMultiplexer {
&self,
anchor: &AbsoluteSystemPath,
key: &str,
team_id: &str,
team_slug: Option<&str>,
) -> Result<(CacheResponse, Vec<AnchoredSystemPathBuf>), CacheError> {
if let Some(fs) = &self.fs {
if let Ok(cache_response) = fs.fetch(anchor, key) {
Expand All @@ -110,7 +108,7 @@ impl CacheMultiplexer {
}

if let Some(http) = self.get_http_cache() {
if let Ok((cache_response, files)) = http.fetch(key, team_id, team_slug).await {
if let Ok((cache_response, files)) = http.fetch(key).await {
// Store this into fs cache. We can ignore errors here because we know
// we have previously successfully stored in HTTP cache, and so the overall
// result is a success at fetching. Storing in lower-priority caches is an
Expand All @@ -126,12 +124,7 @@ impl CacheMultiplexer {
Err(CacheError::CacheMiss)
}

pub async fn exists(
&self,
key: &str,
team_id: &str,
team_slug: Option<&str>,
) -> Result<CacheResponse, CacheError> {
pub async fn exists(&self, key: &str) -> Result<CacheResponse, CacheError> {
if let Some(fs) = &self.fs {
match fs.exists(key) {
Ok(cache_response) => {
Expand All @@ -142,7 +135,7 @@ impl CacheMultiplexer {
}

if let Some(http) = self.get_http_cache() {
match http.exists(key, team_id, team_slug).await {
match http.exists(key).await {
Ok(cache_response) => {
return Ok(cache_response);
}
Expand Down
4 changes: 1 addition & 3 deletions crates/turborepo-lib/src/run/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ impl TaskCache {

pub async fn restore_outputs(
&mut self,
team_id: &str,
team_slug: Option<&str>,
prefixed_ui: &mut PrefixedUI<impl Write>,
) -> Result<CacheResponse, anyhow::Error> {
if self.caching_disabled || self.run_cache.reads_disabled {
Expand Down Expand Up @@ -219,7 +217,7 @@ impl TaskCache {
let (cache_status, restored_files) = self
.run_cache
.cache
.fetch(&self.run_cache.repo_root, &self.hash, team_id, team_slug)
.fetch(&self.run_cache.repo_root, &self.hash)
.await
.map_err(|err| {
if matches!(err, CacheError::CacheMiss) {
Expand Down
5 changes: 4 additions & 1 deletion crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,16 @@ impl Run {
vec![],
)?;

let team_id = self.base.repo_config()?.team_id();
let repo_config = self.base.repo_config()?;
let team_id = repo_config.team_id();
let team_slug = repo_config.team_slug();

let token = self.base.user_config()?.token();

let api_auth = team_id.zip(token).map(|(team_id, token)| APIAuth {
team_id: team_id.to_string(),
token: token.to_string(),
team_slug: team_slug.map(|s| s.to_string()),
});

let async_cache = AsyncCache::new(
Expand Down

0 comments on commit 0d899ad

Please sign in to comment.