Skip to content

Commit

Permalink
Reenable parallelization of packet parsing in packet tests. (#600)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Apr 6, 2022
1 parent d64e898 commit 37f3645
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 61 deletions.
12 changes: 1 addition & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions shotover-proxy/Cargo.toml
Expand Up @@ -75,14 +75,13 @@ csv = "1.1.6"
strum_macros = "0.24"

[dev-dependencies]
rayon = "1.5.1"
criterion = { git = "https://github.com/shotover/criterion.rs", branch = "version-0.4", version = "0.3", features = ["async_tokio", "html_reports"] }
redis = { version = "0.21.0", features = ["tokio-comp", "cluster"] }
pcap = "0.9.0"
pktparse = { version = "0.7.0", features = ["serde"] }
tls-parser = "0.11.0"
threadpool = "1.0"
tokio-io-timeout = "1.1.1"
num_cpus = "1.0"
serial_test = "0.6.0"
cassandra-cpp = "1.1.0"
test-helpers = { path = "../test-helpers" }
Expand Down
50 changes: 12 additions & 38 deletions shotover-proxy/tests/codec/util/packet_capture.rs
@@ -1,11 +1,9 @@
use crate::codec::util::packet_parse::{PacketHeader, PacketParse, ParsedPacket};

use crate::codec::util::packet_parse::{OwnedPcapPacket, PacketHeader, PacketParse, ParsedPacket};
use anyhow::Result;
use pcap::{Active, Capture, Device};
use rayon::prelude::*;
use std::net::IpAddr;
use std::path::Path;
use std::sync::{Arc, Mutex};
use threadpool::ThreadPool;

#[derive(Default)]
pub struct PacketCapture {
Expand Down Expand Up @@ -45,15 +43,9 @@ impl PacketCapture {
self.print_headers();

while let Ok(packet) = cap_handle.next() {
let data = packet.data.to_owned();
let len = packet.header.len;
let ts = format!(
"{}.{:06}",
&packet.header.ts.tv_sec, &packet.header.ts.tv_usec
);

let packet = OwnedPcapPacket::from(packet);
let packet_parse = PacketParse::default();
let parsed_packet = packet_parse.parse_packet(data, len, ts);
let parsed_packet = packet_parse.parse_packet(&packet);
match parsed_packet {
Ok(parsed_packet) => {
self.print_packet(&parsed_packet);
Expand Down Expand Up @@ -135,41 +127,23 @@ impl PacketCapture {
file_name: &Path,
filter: Option<String>,
) -> Vec<Result<ParsedPacket, String>> {
// TODO: Fix flakiness from out-of-order futures.
// let pool = ThreadPool::new(num_cpus::get() * 2);
let pool = ThreadPool::new(1);
let mut cap_handle = Capture::from_file(file_name).unwrap();
let packets = Arc::new(Mutex::new(Vec::new()));

if let Some(filter) = filter {
cap_handle
.filter(&filter, false)
.expect("Filters invalid, please check the documentation.");
}

while let Ok(packet) = cap_handle.next() {
let data = packet.data.to_owned();
let len = packet.header.len;
let ts = format!(
"{}.{:06}",
&packet.header.ts.tv_sec, &packet.header.ts.tv_usec
);
let packets: Vec<_> =
std::iter::from_fn(move || cap_handle.next().ok().map(OwnedPcapPacket::from)).collect();

let packets = packets.clone();

pool.execute(move || {
packets
.par_iter()
.map(|packet| {
let packet_parse = PacketParse::default();
let parsed_packet = packet_parse.parse_packet(data, len, ts);

packets.lock().unwrap().push(parsed_packet);
});
}

pool.join();

Arc::try_unwrap(packets)
.expect("more refs remaining")
.into_inner()
.unwrap()
packet_parse.parse_packet(packet)
})
.collect()
}
}
33 changes: 23 additions & 10 deletions shotover-proxy/tests/codec/util/packet_parse.rs
@@ -1,3 +1,4 @@
use pcap::{Packet as PcapPacket, PacketHeader as PcapPacketHeader};
use pktparse::arp::ArpPacket;
use pktparse::ethernet::{EtherType, EthernetFrame};
use pktparse::ip::IPProtocol;
Expand All @@ -6,10 +7,23 @@ use pktparse::ipv6::IPv6Header;
use pktparse::tcp::TcpHeader;
use pktparse::udp::UdpHeader;
use pktparse::*;
use serde::Deserialize;
use std::string::ToString;
use tls_parser::TlsMessage;

use serde::Deserialize;
pub struct OwnedPcapPacket {
header: PcapPacketHeader,
data: Vec<u8>,
}

impl OwnedPcapPacket {
pub fn from(packet: PcapPacket) -> OwnedPcapPacket {
OwnedPcapPacket {
header: *packet.header,
data: packet.data.to_owned(),
}
}
}

#[derive(Default)]
pub struct PacketParse {}
Expand Down Expand Up @@ -62,15 +76,14 @@ impl PacketParse {
PacketParse {}
}

pub fn parse_packet(
&self,
data: Vec<u8>,
len: u32,
ts: String,
) -> Result<ParsedPacket, String> {
let mut parsed_packet = self.parse_link_layer(&data)?;
parsed_packet.len = len;
parsed_packet.timestamp = ts;
pub fn parse_packet(&self, packet: &OwnedPcapPacket) -> Result<ParsedPacket, String> {
let mut parsed_packet = self.parse_link_layer(&packet.data)?;
let timestamp = format!(
"{}.{:06}",
&packet.header.ts.tv_sec, &packet.header.ts.tv_usec
);
parsed_packet.len = packet.header.len;
parsed_packet.timestamp = timestamp;
Ok(parsed_packet)
}

Expand Down

0 comments on commit 37f3645

Please sign in to comment.