Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 38 additions & 26 deletions josh-proxy/src/bin/josh-proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#[macro_use]
extern crate lazy_static;

use josh_proxy::{FetchError, MetaConfig, RepoConfig, RepoUpdate};
use josh_proxy::{FetchError, MetaConfig, RemoteAuth, RepoConfig, RepoUpdate};
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use tracing_opentelemetry::OpenTelemetrySpanExt;
Expand Down Expand Up @@ -102,12 +102,11 @@ async fn fetch_upstream(
service: Arc<JoshProxyService>,
upstream_protocol: UpstreamProtocol,
upstream_repo: String,
auth: &josh_proxy::auth::Handle,
remote_auth: &RemoteAuth,
remote_url: String,
headref: &str,
force: bool,
) -> Result<(), FetchError> {
let auth = auth.clone();
let key = remote_url.clone();

if upstream_protocol == UpstreamProtocol::Ssh {
Expand Down Expand Up @@ -173,23 +172,23 @@ async fn fetch_upstream(

let span = tracing::span!(tracing::Level::TRACE, "fetch worker");
let us = upstream_repo.clone();
let a = auth.clone();
let ru = remote_url.clone();
let permit = service.fetch_permits.acquire().await;
let task_remote_auth = remote_auth.clone();
let fetch_result = tokio::task::spawn_blocking(move || {
let _span_guard = span.enter();
josh_proxy::fetch_refs_from_url(&br_path, &us, &ru, &refs_to_fetch, &a)
josh_proxy::fetch_refs_from_url(&br_path, &us, &ru, &refs_to_fetch, &task_remote_auth)
})
.await?;

let us = upstream_repo.clone();
let s = tracing::span!(tracing::Level::TRACE, "get_head worker");
let br_path = service.repo_path.join("mirror");
let ru = remote_url.clone();
let a = auth.clone();
let task_remote_auth = remote_auth.clone();
let hres = tokio::task::spawn_blocking(move || {
let _e = s.enter();
josh_proxy::get_head(&br_path, &ru, &a)
josh_proxy::get_head(&br_path, &ru, &task_remote_auth)
})
.await?;

Expand All @@ -199,8 +198,8 @@ async fn fetch_upstream(

std::mem::drop(permit);

match fetch_result {
Ok(_) => {
match (fetch_result, remote_auth) {
(Ok(_), RemoteAuth::Http { auth }) => {
fetch_timers.write()?.insert(key, std::time::Instant::now());

let (auth_user, _) = auth.parse().map_err(FetchError::from_josh_error)?;
Expand All @@ -209,12 +208,13 @@ async fn fetch_upstream(
service
.poll
.lock()?
.insert((upstream_repo, auth, remote_url));
.insert((upstream_repo, auth.clone(), remote_url));
}

Ok(())
}
Err(_) => fetch_result,
(Ok(_), _) => Ok(()),
(Err(e), _) => Err(e),
}
}

Expand Down Expand Up @@ -444,7 +444,7 @@ async fn query_meta_repo(
meta_repo: &str,
upstream_protocol: UpstreamProtocol,
upstream_repo: &str,
auth: &josh_proxy::auth::Handle,
remote_auth: &RemoteAuth,
) -> josh::JoshResult<josh_proxy::MetaConfig> {
let upstream = serv
.upstream
Expand All @@ -455,7 +455,7 @@ async fn query_meta_repo(
serv.clone(),
upstream_protocol,
meta_repo.to_owned(),
&auth,
&remote_auth,
remote_url.to_owned(),
&"HEAD",
false,
Expand Down Expand Up @@ -510,26 +510,35 @@ async fn query_meta_repo(

async fn make_meta_config(
serv: Arc<JoshProxyService>,
auth: Option<&josh_proxy::auth::Handle>,
remote_auth: Option<&RemoteAuth>,
upstream_protocol: UpstreamProtocol,
parsed_url: &FilteredRepoUrl,
) -> josh::JoshResult<MetaConfig> {
let meta_repo = std::env::var("JOSH_META_REPO");
let auth_token = std::env::var("JOSH_META_AUTH_TOKEN");

match (auth, meta_repo) {
match (remote_auth, meta_repo) {
(None, _) | (_, Err(_)) => Ok(MetaConfig {
config: RepoConfig {
repo: parsed_url.upstream_repo.clone(),
..Default::default()
},
..Default::default()
}),
(Some(auth), Ok(meta_repo)) => {
let auth = if let Ok(token) = auth_token {
josh_proxy::auth::add_auth(&token)?
} else {
auth.clone()
(Some(remote_auth), Ok(meta_repo)) => {
let auth = match remote_auth {
RemoteAuth::Ssh { auth_socket } => RemoteAuth::Ssh {
auth_socket: auth_socket.clone(),
},
RemoteAuth::Http { auth } => {
let auth = if let Ok(token) = auth_token {
josh_proxy::auth::add_auth(&token)?
} else {
auth.clone()
};

RemoteAuth::Http { auth }
}
};

query_meta_repo(
Expand Down Expand Up @@ -847,9 +856,10 @@ async fn call_service(
}
};

let remote_auth = RemoteAuth::Http { auth: auth.clone() };
let meta = make_meta_config(
serv.clone(),
Some(&auth),
Some(&remote_auth),
UpstreamProtocol::Http,
&parsed_url,
)
Expand Down Expand Up @@ -928,7 +938,7 @@ async fn call_service(
serv.clone(),
UpstreamProtocol::Http,
meta.config.repo.to_owned(),
&auth,
&remote_auth,
remote_url.to_owned(),
&headref,
false,
Expand Down Expand Up @@ -989,7 +999,7 @@ async fn call_service(
let repo_update = josh_proxy::RepoUpdate {
refs: HashMap::new(),
remote_url: remote_url.clone(),
auth,
remote_auth,
port: serv.port.clone(),
filter_spec: josh::filter::spec(filter),
base_ns: josh::to_ns(&meta.config.repo),
Expand Down Expand Up @@ -1236,11 +1246,12 @@ async fn run_polling(serv: Arc<JoshProxyService>) -> josh::JoshResult<()> {
let polls = serv.poll.lock()?.clone();

for (upstream_repo, auth, url) in polls {
let remote_auth = RemoteAuth::Http { auth };
let fetch_result = fetch_upstream(
serv.clone(),
UpstreamProtocol::Http,
upstream_repo.clone(),
&auth,
&remote_auth,
url.clone(),
"",
true,
Expand Down Expand Up @@ -1374,6 +1385,7 @@ async fn serve_graphql(
false,
));

let remote_auth = RemoteAuth::Http { auth };
let res = {
// First attempt to serve GraphQL query. If we can serve it
// that means all requested revisions were specified by SHA and we could find
Expand All @@ -1389,7 +1401,7 @@ async fn serve_graphql(
serv.clone(),
UpstreamProtocol::Http,
upstream_repo.to_owned(),
&auth,
&remote_auth,
remote_url.to_owned(),
&"HEAD",
false,
Expand Down Expand Up @@ -1452,7 +1464,7 @@ async fn serve_graphql(
*oid,
&reference,
&remote_url,
&auth,
&remote_auth,
&temp_ns.name(),
"META_PUSH",
false,
Expand Down
Loading