Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions josh-proxy/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ impl std::fmt::Debug for Handle {
}

impl Handle {
// Returns a pair: (username, password)
pub fn parse(&self) -> josh::JoshResult<(String, String)> {
let line = josh::some_or!(
AUTH.lock()
Expand Down
94 changes: 53 additions & 41 deletions josh-proxy/src/bin/josh-proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#[macro_use]
extern crate lazy_static;

use josh_proxy::{MetaConfig, RepoConfig, RepoUpdate};
use josh_proxy::{FetchError, MetaConfig, RepoConfig, RepoUpdate};
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use tracing_opentelemetry::OpenTelemetrySpanExt;
Expand Down Expand Up @@ -77,7 +77,7 @@ async fn fetch_upstream(
remote_url: String,
headref: &str,
force: bool,
) -> josh::JoshResult<bool> {
) -> Result<(), FetchError> {
let auth = auth.clone();
let key = remote_url.clone();

Expand Down Expand Up @@ -118,7 +118,7 @@ async fn fetch_upstream(
tracing::trace!("fetch_cached_ok {:?}", fetch_cached_ok);

if fetch_cached_ok && headref.is_empty() {
return Ok(true);
return Ok(());
}

if fetch_cached_ok && !headref.is_empty() {
Expand All @@ -128,27 +128,28 @@ async fn fetch_upstream(
"refs/josh/upstream/{}/",
&josh::to_ns(&upstream_repo),
)),
)?;
)
.map_err(FetchError::from_josh_error)?;
let id = transaction
.repo()
.refname_to_id(&transaction.refname(headref));
tracing::trace!("refname_to_id: {:?}", id);
if id.is_ok() {
return Ok(true);
return Ok(());
}
}

let fetch_timers = service.fetch_timers.clone();
let heads_map = service.heads_map.clone();
let br_path = service.repo_path.join("mirror");

let s = tracing::span!(tracing::Level::TRACE, "fetch worker");
let span = tracing::span!(tracing::Level::TRACE, "fetch worker");
let us = upstream_repo.clone();
let a = auth.clone();
let ru = remote_url.clone();
let permit = service.fetch_permits.acquire().await;
let res = tokio::task::spawn_blocking(move || {
let _e = s.enter();
let fetch_result = tokio::task::spawn_blocking(move || {
let _span_guard = span.enter();
josh_proxy::fetch_refs_from_url(&br_path, &us, &ru, &refs_to_fetch, &a)
})
.await?;
Expand All @@ -170,20 +171,24 @@ async fn fetch_upstream(

std::mem::drop(permit);

if let Ok(res) = res {
if res {
match fetch_result {
Ok(_) => {
fetch_timers.write()?.insert(key, std::time::Instant::now());

if ARGS.get_one::<String>("poll").map(|v| v.as_str()) == Some(&auth.parse()?.0) {
let poll_user = ARGS.get_one::<String>("poll");
let (auth_user, _) = auth.parse().map_err(FetchError::from_josh_error)?;

if matches!(poll_user, Some(user) if auth_user == user.as_str()) {
service
.poll
.lock()?
.insert((upstream_repo, auth, remote_url));
}

Ok(())
}
return Ok(res);
Err(_) => fetch_result,
}
res
}

async fn static_paths(
Expand Down Expand Up @@ -422,8 +427,9 @@ async fn query_meta_repo(
.in_current_span()
.await
{
Ok(true) => {}
_ => return Err(josh::josh_error("meta fetch failed")),
Ok(_) => {}
Err(FetchError::AuthRequired) => return Err(josh_error("meta fetch: auth failed")),
Err(FetchError::Other(e)) => return Err(josh_error(&format!("meta fetch failed: {}", e))),
}

let transaction = josh::cache::Transaction::open(
Expand Down Expand Up @@ -865,20 +871,19 @@ async fn call_service(
.in_current_span()
.await
{
Ok(res) => {
if !res {
let builder = Response::builder()
.header(
hyper::header::WWW_AUTHENTICATE,
"Basic realm=User Visible Realm",
)
.status(hyper::StatusCode::UNAUTHORIZED);
return Ok(builder.body(hyper::Body::empty())?);
}
Ok(_) => {}
Err(FetchError::AuthRequired) => {
let builder = Response::builder()
.header(
hyper::header::WWW_AUTHENTICATE,
"Basic realm=User Visible Realm",
)
.status(hyper::StatusCode::UNAUTHORIZED);
return Ok(builder.body(hyper::Body::empty())?);
}
Err(res) => {
Err(FetchError::Other(e)) => {
let builder = Response::builder().status(hyper::StatusCode::INTERNAL_SERVER_ERROR);
return Ok(builder.body(hyper::Body::from(res.0))?);
return Ok(builder.body(hyper::Body::from(e.0))?);
}
}

Expand Down Expand Up @@ -1175,7 +1180,7 @@ async fn run_polling(serv: Arc<JoshProxyService>) -> josh::JoshResult<()> {
let polls = serv.poll.lock()?.clone();

for (upstream_repo, auth, url) in polls {
fetch_upstream(
let fetch_result = fetch_upstream(
serv.clone(),
upstream_repo.clone(),
&auth,
Expand All @@ -1184,7 +1189,15 @@ async fn run_polling(serv: Arc<JoshProxyService>) -> josh::JoshResult<()> {
true,
)
.in_current_span()
.await?;
.await;

match fetch_result {
Ok(()) => {}
Err(FetchError::Other(e)) => return Err(e),
Err(FetchError::AuthRequired) => {
return Err(josh_error("auth: access denied while polling"))
}
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
}
Expand Down Expand Up @@ -1383,21 +1396,20 @@ async fn serve_graphql(
.in_current_span()
.await
{
Ok(res) => {
if !res {
let builder = Response::builder()
.header(
hyper::header::WWW_AUTHENTICATE,
"Basic realm=User Visible Realm",
)
.status(hyper::StatusCode::UNAUTHORIZED);
return Ok(builder.body(hyper::Body::empty())?);
}
Ok(_) => {}
Err(FetchError::AuthRequired) => {
let builder = Response::builder()
.header(
hyper::header::WWW_AUTHENTICATE,
"Basic realm=User Visible Realm",
)
.status(hyper::StatusCode::UNAUTHORIZED);
return Ok(builder.body(hyper::Body::empty())?);
}
Err(res) => {
Err(FetchError::Other(e)) => {
let builder =
Response::builder().status(hyper::StatusCode::INTERNAL_SERVER_ERROR);
return Ok(builder.body(hyper::Body::from(res.0))?);
return Ok(builder.body(hyper::Body::from(e.0))?);
}
};

Expand Down
39 changes: 33 additions & 6 deletions josh-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod juniper_hyper;
#[macro_use]
extern crate lazy_static;

use josh::JoshError;
use std::path::PathBuf;

#[derive(PartialEq)]
Expand Down Expand Up @@ -546,13 +547,33 @@ pub fn get_head(
Ok(head)
}

pub enum FetchError {
AuthRequired,
Other(JoshError),
}

impl<T> From<T> for FetchError
where
T: std::error::Error,
{
fn from(e: T) -> Self {
FetchError::Other(JoshError::from(e))
}
}

impl FetchError {
pub fn from_josh_error(e: JoshError) -> Self {
FetchError::Other(e)
}
}

pub fn fetch_refs_from_url(
path: &std::path::Path,
upstream_repo: &str,
url: &str,
refs_prefixes: &[String],
auth: &auth::Handle,
) -> josh::JoshResult<bool> {
) -> Result<(), FetchError> {
let specs: Vec<_> = refs_prefixes
.iter()
.map(|r| {
Expand All @@ -572,25 +593,31 @@ pub fn fetch_refs_from_url(
let cmd = format!("git fetch --prune --no-tags {} {}", &url, &specs.join(" "));
tracing::info!("fetch_refs_from_url {:?} {:?} {:?}", cmd, path, "");

let (username, password) = auth.parse()?;
let (username, password) = auth.parse().map_err(FetchError::from_josh_error)?;
let (_stdout, stderr, _) = shell.command_env(
&cmd,
&[],
&[("GIT_PASSWORD", &password), ("GIT_USER", &username)],
);
tracing::debug!("fetch_refs_from_url done {:?} {:?} {:?}", cmd, path, stderr);
if stderr.contains("fatal: Authentication failed") {
return Ok(false);
return Err(FetchError::AuthRequired);
}
if stderr.contains("fatal:") {
tracing::error!("{:?}", stderr);
return Err(josh::josh_error(&format!("git error: {:?}", stderr)));
return Err(FetchError::Other(josh::josh_error(&format!(
"git error: {:?}",
stderr
))));
}
if stderr.contains("error:") {
tracing::error!("{:?}", stderr);
return Err(josh::josh_error(&format!("git error: {:?}", stderr)));
return Err(FetchError::Other(
josh::josh_error(&format!("git error: {:?}", stderr)).into(),
));
}
Ok(true)

Ok(())
}

pub struct TmpGitNamespace {
Expand Down