From f9d9296f15bd092f414bb3c8d1eb96808ac5f3a0 Mon Sep 17 00:00:00 2001 From: Lethe Lee Date: Fri, 16 Jan 2026 13:22:46 +0000 Subject: [PATCH] fix: issue #135 --- rattan-core/src/cells/external.rs | 10 +++++----- rattan-core/src/radix/mod.rs | 17 ++++++++++++++--- rattan-log/src/log_entry/entry/flow_entry.rs | 6 +++--- rattan-log/src/logger/mmap.rs | 11 +++++------ rattan-log/src/logger/mod.rs | 4 ++-- rattan-log/src/logger/pcap.rs | 8 +++----- rattan-log/src/logger/reader.rs | 9 ++++++++- rattan-log/src/logger/writer.rs | 4 ++-- 8 files changed, 42 insertions(+), 27 deletions(-) diff --git a/rattan-core/src/cells/external.rs b/rattan-core/src/cells/external.rs index 85a4d5c3..6536a817 100644 --- a/rattan-core/src/cells/external.rs +++ b/rattan-core/src/cells/external.rs @@ -193,7 +193,7 @@ impl FlowMap { } } - fn get_id(&self, desc: FlowDesc, log_tx: &UnboundedSender, base_ts: i64) -> u32 { + fn get_id(&self, desc: FlowDesc, log_tx: &UnboundedSender) -> u32 { { let map = self.map.read(); if let Some(meta) = map.get(&desc) { @@ -204,7 +204,7 @@ impl FlowMap { let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); map.insert(desc.clone(), id); - let op = RattanLogOp::Flow(id, base_ts, desc); + let op = RattanLogOp::Flow(id, BASE_TS.1, desc); if log_tx.send(op).is_err() { cnt_log_op_error(); } @@ -285,7 +285,7 @@ where // Avoid doing so when packet log is not enabled. if let (Some(&log_mode), Some(log_tx)) = (packet_log_mode, log_tx.as_ref()) { if let Some(desc) = packet.flow_desc() { - let id = flow_map.get_id(desc, log_tx, base_ts); + let id = flow_map.get_id(desc, log_tx); packet.set_flow_id(id); } log_packet(log_tx, &packet, PktAction::Recv, base_ts, log_mode); @@ -435,7 +435,7 @@ fn log_packet( // tracing::debug!(target: "veth::egress::packet_log", "At {} veth {} recv pkt len {} desc {}", ts, id, p.length(), p.desc()); } -fn get_clock_ns() -> i64 { +pub fn get_clock_ns() -> i64 { nix::time::clock_gettime(nix::time::ClockId::CLOCK_MONOTONIC) .map(|ts| ts.tv_sec() * 1_000_000_000 + ts.tv_nsec()) .unwrap_or(0) @@ -494,7 +494,7 @@ where let driver = D::bind_cell(cell.clone())?; let dev_senders = driver.iter().map(|d| d.sender()).collect(); let log_tx = LOGGING_TX.get().cloned(); - let base_ts = *BASE_TS.get_or_init(get_clock_ns); + let base_ts = BASE_TS.0; Ok(Self { _cell: cell, ingress: Arc::new(VirtualEthernetIngress::new( diff --git a/rattan-core/src/radix/mod.rs b/rattan-core/src/radix/mod.rs index 5cf5abeb..413ade14 100644 --- a/rattan-core/src/radix/mod.rs +++ b/rattan-core/src/radix/mod.rs @@ -2,10 +2,11 @@ use std::{ net::IpAddr, sync::{mpsc, Arc}, thread, + time::{SystemTime, UNIX_EPOCH}, }; use backon::{BlockingRetryable, ExponentialBuilder}; -use once_cell::sync::OnceCell; +use once_cell::sync::{Lazy, OnceCell}; use rattan_log::{file_logging_thread, RattanLogOp, LOGGING_TX}; use tokio::runtime::Runtime; use tokio_util::sync::CancellationToken; @@ -17,7 +18,7 @@ use tracing::{debug, error, info, span, warn, Level}; use crate::{ cells::{ - external::{VirtualEthernet, VirtualEthernetId}, + external::{get_clock_ns, VirtualEthernet, VirtualEthernetId}, Cell, Packet, }, config::{CellBuildConfig, RattanConfig}, @@ -34,7 +35,17 @@ use crate::{control::http::HttpControlEndpoint, error::HttpServerError}; use std::net::{Ipv4Addr, SocketAddr}; pub static INSTANCE_ID: OnceCell = OnceCell::new(); -pub static BASE_TS: OnceCell = OnceCell::new(); + +pub static BASE_TS: Lazy<(i64, u64)> = Lazy::new(|| { + // Internal use + let machine_time = get_clock_ns(); + // Used as base timestamp in Packet Logs. + let unix_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_micros(); + (machine_time, unix_time as u64) +}); #[derive(Clone, Copy, Debug, clap::ValueEnum, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] diff --git a/rattan-log/src/log_entry/entry/flow_entry.rs b/rattan-log/src/log_entry/entry/flow_entry.rs index 2e1f10d1..1a98b712 100644 --- a/rattan-log/src/log_entry/entry/flow_entry.rs +++ b/rattan-log/src/log_entry/entry/flow_entry.rs @@ -79,7 +79,7 @@ pub struct TCPFlow { pub dst_ip: u32, pub src_port: u16, pub dst_port: u16, - pub base_ts: i64, + pub base_ts: u64, pub _reserved: u32, pub options: TCPOption, } @@ -158,9 +158,9 @@ impl FlowEntryVariant { } } -impl From<(u32, i64, FlowDesc)> for FlowEntryVariant { +impl From<(u32, u64, FlowDesc)> for FlowEntryVariant { // (flow_id, base_ts, flow_desc) - fn from(value: (u32, i64, FlowDesc)) -> Self { + fn from(value: (u32, u64, FlowDesc)) -> Self { let (flow_id, base_ts, flow_desc) = value; let mut entryheader = FlowEntryHeader::default(); entryheader.set_length(32); diff --git a/rattan-log/src/logger/mmap.rs b/rattan-log/src/logger/mmap.rs index 08ed5a11..95e89527 100644 --- a/rattan-log/src/logger/mmap.rs +++ b/rattan-log/src/logger/mmap.rs @@ -193,12 +193,11 @@ where self.current_chunk_offset + current_chunk_len, current_chunk_len ); - debug_assert_eq!( - self.prologue_len, - self.writer - .chunk - .write_at(&header, self.current_chunk_offset) - ); + + self.writer + .chunk + .write_at(&header, self.current_chunk_offset); + if is_final { return; } diff --git a/rattan-log/src/logger/mod.rs b/rattan-log/src/logger/mod.rs index 92b9ac13..74a0c3f1 100644 --- a/rattan-log/src/logger/mod.rs +++ b/rattan-log/src/logger/mod.rs @@ -56,8 +56,8 @@ pub enum RattanLogOp { /// its recorded position (represented as a byte range `[offset, offset + len)`). /// We use this information (offset and len), along with the flow_id, to construct the raw log entry. RawEntry(u32, RawLogEntry, Vec), - /// Represents a flow consists of (flow_id, base_time_ns, flow_desc). - Flow(u32, i64, FlowDesc), + /// Represents a flow consists of (flow_id, base_time_us, flow_desc). + Flow(u32, u64, FlowDesc), /// End of Log End, } diff --git a/rattan-log/src/logger/pcap.rs b/rattan-log/src/logger/pcap.rs index ba1766ed..ca988268 100644 --- a/rattan-log/src/logger/pcap.rs +++ b/rattan-log/src/logger/pcap.rs @@ -147,7 +147,7 @@ impl PacketWriter { let enhanced_packet = EnhancedPacketBlock { interface_id: 0, timestamp: Duration::from_micros( - tcp_entry.general_pkt_entry.ts as u64 + flow_desc.base_ts as u64 / 1000, + tcp_entry.general_pkt_entry.ts as u64 + flow_desc.base_ts, ), original_len: tcp_entry.general_pkt_entry.pkt_length as u32, data: Cow::from(data), @@ -165,13 +165,11 @@ impl PacketWriter { &mut self, raw_log_entry: &RawLogEntry, packet: Vec, - base_ts: i64, + base_ts: u64, ) -> Result<()> { let enhanced_packet = EnhancedPacketBlock { interface_id: 0, - timestamp: Duration::from_micros( - raw_log_entry.general_pkt_entry.ts as u64 + base_ts as u64 / 1000, - ), + timestamp: Duration::from_micros(raw_log_entry.general_pkt_entry.ts as u64 + base_ts), original_len: raw_log_entry.general_pkt_entry.pkt_length as u32, data: Cow::from(packet), options: vec![], diff --git a/rattan-log/src/logger/reader.rs b/rattan-log/src/logger/reader.rs index 6ae4afb2..9d79696c 100644 --- a/rattan-log/src/logger/reader.rs +++ b/rattan-log/src/logger/reader.rs @@ -26,7 +26,7 @@ type FlowIndex = u16; struct ParseContext { pub flows: HashMap, pub flow_index: HashMap, - pub base_ts: i64, + pub base_ts: u64, } impl ParseContext { @@ -221,6 +221,13 @@ pub fn convert_log_to_pcapng( continue; }; + if raw_header[0] != 0x45 { + dbg!(offset); + dbg!(pointer.get_offset()); + dbg!(chunk_offset); + panic!("Invalid IPv4 Header"); + } + let (mut packet_header, mock_l3) = match raw_entry.general_pkt_entry.header.get_type().try_into() { Ok(GeneralPacketType::RawIP) => { diff --git a/rattan-log/src/logger/writer.rs b/rattan-log/src/logger/writer.rs index d578c0b5..4b62b96e 100644 --- a/rattan-log/src/logger/writer.rs +++ b/rattan-log/src/logger/writer.rs @@ -149,9 +149,9 @@ where } } -fn build_chunk_prologue(data_length: usize, time_offset: &Option) -> Vec { +fn build_chunk_prologue(data_length: usize, chunk_offset: &Option) -> Vec { let header = new_log_entry_chunk_prologue( - time_offset.unwrap_or_default(), + chunk_offset.unwrap_or_default(), LOGICAL_CHUNK_SIZE_1M, data_length, );