Merged
115 changes: 85 additions & 30 deletions src/domain_fronter.rs
@@ -57,7 +57,9 @@ pub enum FronterError {
}

type PooledStream = TlsStream<TcpStream>;
const POOL_TTL_SECS: u64 = 45;
const POOL_TTL_SECS: u64 = 60;
const POOL_MIN: usize = 8;
const POOL_REFILL_INTERVAL_SECS: u64 = 5;
const POOL_MAX: usize = 80;
const REQUEST_TIMEOUT_SECS: u64 = 25;
const RANGE_PARALLEL_CHUNK_BYTES: u64 = 256 * 1024;
@@ -644,33 +646,31 @@ impl DomainFronter {
Ok(tls)
}

/// Open `n` outbound TLS connections in parallel and park them in the
/// pool so the first few user requests don't pay the handshake cost.
/// Errors are logged but not returned — best-effort.
/// Open `n` outbound TLS connections sequentially (500 ms apart) and
/// park them in the pool. Staggered so we don't burst N TLS handshakes
/// at the Google edge simultaneously, and each connection gets an 8 s
/// expiry offset so they roll off gradually instead of all hitting
/// POOL_TTL_SECS at once.
pub async fn warm(self: &Arc<Self>, n: usize) {
let mut set = tokio::task::JoinSet::new();
for _ in 0..n {
let me = self.clone();
set.spawn(async move {
match me.open().await {
Ok(s) => Some(PoolEntry {
let mut warmed = 0usize;
for i in 0..n {
if i > 0 {
tokio::time::sleep(Duration::from_millis(500)).await;
}
match self.open().await {
Ok(s) => {
let entry = PoolEntry {
stream: s,
created: Instant::now(),
}),
Err(e) => {
tracing::debug!("pool warm: open failed: {}", e);
None
created: Instant::now() - Duration::from_secs(8 * i as u64),
};
let mut pool = self.pool.lock().await;
if pool.len() < POOL_MAX {
pool.push(entry);
warmed += 1;
}
}
});
}
let mut warmed = 0;
while let Some(res) = set.join_next().await {
if let Ok(Some(entry)) = res {
let mut pool = self.pool.lock().await;
if pool.len() < POOL_MAX {
pool.push(entry);
warmed += 1;
Err(e) => {
tracing::debug!("pool warm: open failed: {}", e);
}
}
}
@@ -679,6 +679,56 @@ impl DomainFronter {
}
}

/// Background loop that keeps at least `POOL_MIN` valid connections
/// ready. A connection only counts toward the minimum if it has at
/// least 20 s of TTL remaining — nearly-expired entries don't help.
/// Checks every `POOL_REFILL_INTERVAL_SECS`, evicts expired entries,
/// and opens replacements one at a time so there's no burst.
pub async fn run_pool_refill(self: Arc<Self>) {
const MIN_REMAINING_SECS: u64 = 20;
loop {
tokio::time::sleep(Duration::from_secs(POOL_REFILL_INTERVAL_SECS)).await;

// Evict expired entries first.
{
let mut pool = self.pool.lock().await;
pool.retain(|e| e.created.elapsed().as_secs() < POOL_TTL_SECS);
}

// Count only connections with enough life left.
// Refill one at a time to avoid bursting TLS handshakes.
loop {
let healthy = {
let pool = self.pool.lock().await;
pool.iter()
.filter(|e| {
let age = e.created.elapsed().as_secs();
age + MIN_REMAINING_SECS < POOL_TTL_SECS
})
.count()
};
if healthy >= POOL_MIN {
break;
}
match self.open().await {
Ok(s) => {
let mut pool = self.pool.lock().await;
if pool.len() < POOL_MAX {
pool.push(PoolEntry {
stream: s,
created: Instant::now(),
});
}
}
Err(e) => {
tracing::debug!("pool refill: open failed: {}", e);
break;
}
}
}
}
}

/// Keep the Apps Script container warm with a periodic HEAD ping.
///
/// `acquire()` keeps the *TCP/TLS pool* warm but does nothing for the
@@ -721,12 +771,17 @@ impl DomainFronter {
async fn acquire(&self) -> Result<PoolEntry, FronterError> {
{
let mut pool = self.pool.lock().await;
while let Some(entry) = pool.pop() {
if entry.created.elapsed().as_secs() < POOL_TTL_SECS {
return Ok(entry);
}
// expired — drop it
drop(entry);
// Evict expired, then hand out the freshest (most remaining TTL).
pool.retain(|e| e.created.elapsed().as_secs() < POOL_TTL_SECS);
if !pool.is_empty() {
// Freshest = smallest elapsed time. swap_remove is O(1).
let freshest = pool
.iter()
.enumerate()
.min_by_key(|(_, e)| e.created.elapsed())
.map(|(i, _)| i)
.unwrap();
return Ok(pool.swap_remove(freshest));
}
}
let stream = self.open().await?;
11 changes: 11 additions & 0 deletions src/proxy_server.rs
@@ -589,6 +589,16 @@ impl ProxyServer {
tokio::spawn(async move { std::future::pending::<()>().await })
};

// Background pool refill: keeps at least POOL_MIN ready TLS
        // connections so acquire() rarely has to pay a cold handshake.
let refill_task = if let Some(refill_fronter) = self.fronter.clone() {
tokio::spawn(async move {
refill_fronter.run_pool_refill().await;
})
} else {
tokio::spawn(async move { std::future::pending::<()>().await })
};

let stats_task = if let Some(stats_fronter) = self.fronter.clone() {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(std::time::Duration::from_secs(60));
@@ -697,6 +707,7 @@ impl ProxyServer {
tracing::info!("Shutdown signal received, stopping listeners");
stats_task.abort();
keepalive_task.abort();
refill_task.abort();
http_task.abort();
socks_task.abort();
}
21 changes: 10 additions & 11 deletions src/tunnel_client.rs
@@ -60,17 +60,16 @@ const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50);
/// step for more ops. Resets on every arrival, up to max from the first
/// op. Overridable via config `coalesce_step_ms` / `coalesce_max_ms`.
///
/// 10 ms is enough to catch ops that arrive in the same event-loop tick
/// (e.g. a browser opening 6 parallel connections) without adding
/// perceptible latency to downloads where the tunnel-node reply — not
/// coalescing — is the real bottleneck. When both sides *do* have data
/// in flight (uploads, bursty page loads), the adaptive reset still
/// packs batches efficiently: each arriving op resets the step timer, so
/// a rapid burst naturally coalesces up to `DEFAULT_COALESCE_MAX_MS`
/// without an explicit upload/download distinction. The net effect is
/// "don't wait when there's nothing to wait for; batch aggressively when
/// there is."
const DEFAULT_COALESCE_STEP_MS: u64 = 10;
/// 200 ms balances latency against batching efficiency. The dominant
/// bottleneck is the Apps Script round-trip (~1.5 s), so the extra
/// 200 ms wait is negligible to the user but lets significantly more
/// ops land in each batch — a page load that would fire 10 separate
/// 1-op batches at 10 ms now packs 3–5 ops per batch, cutting the
/// number of round-trips to roughly a third. On idle sessions the step
/// timer fires once with nothing queued (no cost); under load each
/// arriving op resets the timer, so rapid bursts still coalesce up to
/// `DEFAULT_COALESCE_MAX_MS` naturally.
const DEFAULT_COALESCE_STEP_MS: u64 = 200;
const DEFAULT_COALESCE_MAX_MS: u64 = 1000;
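
For context on this constant change: the coalescing loop itself is not shown in this diff, so here is a minimal, self-contained sketch of the adaptive step/max timer the comment above describes. The `Op` type, the `coalesce` function, and the channel wiring are hypothetical illustrations rather than the actual tunnel_client.rs code; only the behavior (a step timer that resets on every arriving op, capped by a hard max window measured from the first op in the batch) mirrors what the comment documents.

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{sleep_until, Instant};

// Hypothetical stand-in for whatever op type tunnel_client.rs batches.
#[derive(Debug)]
struct Op(u32);

/// Collect ops from `rx` into batches. A batch is flushed when `step` has
/// passed since the most recent arrival (the step timer resets on every op),
/// or when `max` has passed since the first op in the batch, whichever
/// comes first.
async fn coalesce(mut rx: mpsc::Receiver<Op>, step: Duration, max: Duration) {
    while let Some(first) = rx.recv().await {
        let mut batch = vec![first];
        let hard_deadline = Instant::now() + max;
        loop {
            // Flush at `step` after the latest op, but never later than
            // `max` after the first op.
            let deadline = (Instant::now() + step).min(hard_deadline);
            tokio::select! {
                _ = sleep_until(deadline) => break, // quiet long enough: flush
                maybe_op = rx.recv() => match maybe_op {
                    Some(op) => batch.push(op), // arrival resets the step timer
                    None => break,              // sender closed: flush what we have
                },
            }
        }
        // A real implementation would serialize `batch` into one tunnel-node
        // request here; this sketch just reports the batch size.
        println!("flushing batch of {} ops", batch.len());
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(64);
    // Values mirror the new defaults: 200 ms step, 1000 ms max.
    tokio::spawn(coalesce(rx, Duration::from_millis(200), Duration::from_millis(1000)));
    // Simulate a bursty page load: six ops arriving 20 ms apart. With a
    // 200 ms step they land in a single batch instead of six.
    for i in 0..6u32 {
        tx.send(Op(i)).await.unwrap();
        tokio::time::sleep(Duration::from_millis(20)).await;
    }
    // Let the coalescer flush before the program exits.
    tokio::time::sleep(Duration::from_millis(500)).await;
}

This is only an illustration of the timing behavior; the real tunnel_client.rs additionally honors the `coalesce_step_ms` / `coalesce_max_ms` config overrides mentioned in the comment.
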

/// Structured error code the tunnel-node returns when it doesn't know the