Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use anyhow::{Context, Result, anyhow};
use bytes::{Buf, Bytes, BytesMut};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tracing::{Instrument, info, warn};

use crate::client::{
constants::{
CONNECT_RESPONSE, DOWNLOAD_CONNECT_TIMEOUT, EARLY_READ_WINDOW,
CONNECT_RESPONSE, DOWNLOAD_CONNECT_TIMEOUT, EARLY_READ_WINDOW, MASTER_RESUME_WINDOW_SECS,
PROXY_AUTH_REQUIRED_RESPONSE, PROXY_REQUEST_PARSE_TIMEOUT,
},
handshake::{self, try_pq_connect},
Expand Down Expand Up @@ -119,7 +118,7 @@ async fn handle_pq_proxy(
{
let mut master_guard = state.initial_master.lock().await;
if let Some((session_id, master, created)) = master_guard.as_ref() {
if created.elapsed() < Duration::from_secs(1200 - 30) {
if crate::now_secs().saturating_sub(*created) < MASTER_RESUME_WINDOW_SECS {
let (session_id, master) = (session_id.clone(), **master);
drop(master_guard);
match try_pq_connect(
Expand Down Expand Up @@ -158,7 +157,7 @@ async fn handle_pq_proxy(
{
let master_guard = state.initial_master.lock().await;
if let Some((session_id, master, created)) = master_guard.as_ref()
&& created.elapsed() < Duration::from_secs(1200 - 30)
&& crate::now_secs().saturating_sub(*created) < MASTER_RESUME_WINDOW_SECS
{
let (session_id, master) = (session_id.clone(), **master);
drop(master_guard);
Expand Down
2 changes: 2 additions & 0 deletions src/client/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@ pub const UPLOAD_CONCURRENCY: usize = 128;

pub const DECODE_BUF_CAPACITY: usize = 16 * 1024 + 2396;

pub const MASTER_RESUME_WINDOW_SECS: u64 = 1170;

pub const PADDING_POOL: &[u8] = b"padding=XXXXXXXXXXXXXXXXXXXXXXXXXX";
pub const MIN_PADDING: usize = 16;
2 changes: 1 addition & 1 deletion src/client/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ pub async fn full_handshake(
*lock = Some((
session_id.clone(),
Zeroizing::new(*master),
std::time::Instant::now(),
crate::now_secs(),
));
}

Expand Down
3 changes: 1 addition & 2 deletions src/client/state.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{Mutex, OnceCell};
use zeroize::Zeroizing;

use crate::bypass::BypassRules;
use crate::shaper::TrafficConfig;

pub type InitialMasterEntry = (String, Zeroizing<[u8; 32]>, Instant);
pub type InitialMasterEntry = (String, Zeroizing<[u8; 32]>, u64);

pub struct ManualResolver {
pub target_addr: String,
Expand Down
10 changes: 5 additions & 5 deletions src/dns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn deserialize_upstream<'de, D: serde::Deserializer<'de>>(d: D) -> Result<Socket
#[derive(Clone)]
struct CacheEntry {
ips: Vec<IpAddr>,
created_at: Instant,
created_at: u64,
ttl: Duration,
is_refreshing: Arc<AtomicBool>,
}
Expand Down Expand Up @@ -376,14 +376,14 @@ impl DnsClient {
let key = (domain.to_string(), rtype.to_int(), ecs);

if let Some(entry) = self.cache.get(&key).await {
let elapsed = entry.created_at.elapsed();
let elapsed = crate::now_secs().saturating_sub(entry.created_at);

if elapsed < entry.ttl {
if elapsed < entry.ttl.as_secs() {
debug!("cache hit: {} {:?}, ips: {:?}", domain, rtype, entry.ips);
return Ok(entry.ips);
}

if elapsed < entry.ttl + Duration::from_secs(self.config.options.swr_ttl) {
if elapsed < entry.ttl.as_secs() + self.config.options.swr_ttl {
if entry
.is_refreshing
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
Expand Down Expand Up @@ -433,7 +433,7 @@ impl DnsClient {
.map_err(Arc::new)?;
let entry = CacheEntry {
ips: ips.clone(),
created_at: Instant::now(),
created_at: crate::now_secs(),
ttl,
is_refreshing: Arc::new(false.into()),
};
Expand Down
9 changes: 9 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
use std::{sync::LazyLock, time::Instant};

pub static START: LazyLock<Instant> = LazyLock::new(Instant::now);

#[inline(always)]
pub fn now_secs() -> u64 {
START.elapsed().as_secs()
}

pub mod bypass;
pub mod client;
pub mod config;
Expand Down
12 changes: 2 additions & 10 deletions src/server/constants.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
sync::LazyLock,
time::{Duration, Instant},
};
use std::time::Duration;

pub const MAX_UPLOAD_BODY_SIZE: usize = 1024 * 1024;

Expand All @@ -21,9 +18,4 @@ pub const MASTER_EXPIRY: Duration = Duration::from_secs(1200);

pub const PADDING_POOL: &[u8] = b"padding=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";

pub static START: LazyLock<Instant> = LazyLock::new(Instant::now);

#[inline(always)]
pub fn now_secs() -> u64 {
START.elapsed().as_secs()
}
pub use crate::now_secs;
4 changes: 2 additions & 2 deletions src/server/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ async fn handle_fresh_handshake(
let session_id = uuid::Uuid::new_v4().to_string();
state.master_store.insert(
session_id.clone(),
(user.clone(), master, std::time::Instant::now()),
(user.clone(), master, crate::now_secs()),
);

info!(session_id = %session_id, user = %user, "handshake: master key derived");
Expand Down Expand Up @@ -342,7 +342,7 @@ async fn handle_pq_download(
master.copy_from_slice(&**master_z);
let username = username.clone();
let created = *created;
if created.elapsed() > MASTER_EXPIRY {
if crate::now_secs().saturating_sub(created) > MASTER_EXPIRY.as_secs() {
drop(entry);
state.master_store.remove(session_id);
return Err(ServerError::precondition_required("master key expired"));
Expand Down
4 changes: 2 additions & 2 deletions src/server/janitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use dashmap::{DashMap, DashSet};
use std::sync::Arc;
use tracing::warn;

use crate::server::constants::{JANITOR_INTERVAL, MASTER_EXPIRY, NONCE_CLEANUP_INTERVAL};
use crate::server::constants::{JANITOR_INTERVAL, MASTER_EXPIRY, NONCE_CLEANUP_INTERVAL, now_secs};
use crate::server::state::UploadStream;

pub async fn stream_janitor(streams: Arc<DashMap<String, Arc<UploadStream>>>) {
Expand Down Expand Up @@ -34,7 +34,7 @@ pub async fn master_and_nonce_janitor(
loop {
interval.tick().await;
master_store.retain(|session_id, (_, _master, created)| {
if created.elapsed() >= MASTER_EXPIRY {
if now_secs().saturating_sub(*created) >= MASTER_EXPIRY.as_secs() {
used_nonces.remove(session_id);
false
} else {
Expand Down
3 changes: 1 addition & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ use std::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Instant,
};
use tower::ServiceBuilder;
use tower_http::trace::TraceLayer;
use tracing::info;
use zeroize::Zeroizing;

pub type MasterStoreEntry = (String, Zeroizing<[u8; 32]>, Instant);
pub type MasterStoreEntry = (String, Zeroizing<[u8; 32]>, u64);

pub static NEXT_STREAM_ID: AtomicU64 = AtomicU64::new(1);

Expand Down
Loading