Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fetcher): support socks5 proxy #15

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rsync-fetcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "chron
tap = "1.0"
tempfile = "3.3"
tokio = { version = "1.25", features = ["full"] }
tokio-socks = "0.5"
tokio-util = { version = "0.7", features = ["compat"] }
tracing = "0.1"
unix_mode = "0.1"
Expand Down
57 changes: 53 additions & 4 deletions rsync-fetcher/src/rsync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use eyre::{Context, ContextCompat, Result};
use eyre::{bail, Context, ContextCompat, Result};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_socks::tcp::Socks5Stream;
use tracing::info;
use url::Url;

use crate::rsync::downloader::Downloader;
Expand Down Expand Up @@ -36,19 +38,66 @@ pub struct TaskBuilders {
pub progress: ProgressDisplay,
}

async fn connect_with_proxy(target: &str) -> Result<TcpStream> {
let proxy = std::env::var("SOCKS5_PROXY")
.ok()
.and_then(|s| (!s.is_empty()).then_some(s))
.or_else(|| {
std::env::var("socks5_proxy")
.ok()
.and_then(|s| (!s.is_empty()).then_some(s))
});

if let Some(proxy) = proxy {
let proxy = Url::parse(&proxy).context("invalid proxy URL")?;
if proxy.scheme().to_lowercase() != "socks5" {
bail!("unsupported proxy scheme: {}", proxy.scheme());
}
let proxy_addr = proxy.host_str().context("missing proxy host")?;
let proxy_port = proxy.port().unwrap_or(1080);
let proxy_username = proxy.username();
let proxy_password = proxy.password().unwrap_or_default();

let stream = if proxy_username.is_empty() {
info!("connecting to {} via SOCKS5 proxy {}", target, proxy);
Socks5Stream::connect((proxy_addr, proxy_port), target)
.await
.context("proxy or rsync server refused connection. Are they running?")?
} else {
info!(
"connecting to {} via SOCKS5 proxy {} as {}",
target, proxy, proxy_username
);
Socks5Stream::connect_with_password(
(proxy_addr, proxy_port),
target,
proxy_username,
proxy_password,
)
.await
.context("proxy or rsync server refused connection. Are they running?")?
};

Ok(stream.into_inner())
} else {
TcpStream::connect(target)
.await
.context("rsync server refused connection. Is it running?")
}
}

pub async fn start_handshake(url: &Url) -> Result<HandshakeConn> {
let port = url.port().unwrap_or(873);
let path = url.path().trim_start_matches('/');
let auth = Auth::from_url_and_env(url);
let module = path.split('/').next().context("empty remote path")?;

let stream = TcpStream::connect(format!(
let stream = connect_with_proxy(&format!(
"{}:{}",
url.host_str().context("missing remote host")?,
port
))
.await
.context("rsync server refused connection. Is it running?")?;
.await?;

let mut handshake = HandshakeConn::new(stream);
handshake.start_inband_exchange(module, path, auth).await?;
Expand Down
Loading