Skip to content

Commit

Permalink
1.0.0 and add way better deduplication
Browse files Browse the repository at this point in the history
  • Loading branch information
kognise committed Mar 23, 2022
1 parent 3c1d68d commit 30dd388
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 39 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@

pub mod error;
pub mod net;
pub mod ringbuffer;
pub mod ui;
81 changes: 53 additions & 28 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ use pnet::datalink::{
use pnet::packet::ethernet::{EtherTypes, EthernetPacket, MutableEthernetPacket};
use pnet::packet::Packet as PnetPacket;
use pnet::util::MacAddr;
use rand::Rng;
use serde::{Deserialize, Serialize};

use crate::error::ArpchatError;
use crate::ringbuffer::Ringbuffer;

const ARP_HTYPE: &[u8] = &[0x00, 0x01]; // Hardware Type (Ethernet)
const ARP_HLEN: u8 = 6; // Hardware Address Length
const ARP_OPER: &[u8] = &[0, 1]; // Operation (Request)
const PACKET_PREFIX: &[u8] = b"uwu";

// Tag, seq, and total, are each one byte, thus the `+ 3`.
const PACKET_PART_SIZE: usize = u8::MAX as usize - (PACKET_PREFIX.len() + 3);

pub const ID_SIZE: usize = 8;
pub type Id = [u8; ID_SIZE];

// Tag, seq, and total, are each one byte, thus the `+ 3`.
const PACKET_PART_SIZE: usize = u8::MAX as usize - (PACKET_PREFIX.len() + 3 + ID_SIZE);

#[derive(Default, Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
pub enum EtherType {
#[default]
Expand Down Expand Up @@ -134,15 +136,16 @@ pub struct Channel {
tx: Box<dyn DataLinkSender>,
rx: Box<dyn DataLinkReceiver>,

/// Buffer of received packet parts, keyed by the number of parts and
/// its tag. This keying method is far from foolproof but reduces
/// the chance of colliding packets just a tiny bit.
/// Buffer of received packet parts, keyed by the packet id.
///
/// Each value is the Vec of its parts, and counts as a packet when
/// every part is non-empty. There are probably several optimization
/// opportunities here, but c'mon, a naive approach is perfectly fine
/// for a program this cursed.
buffer: HashMap<(u8, u8), Vec<Vec<u8>>>,
buffer: HashMap<Id, Vec<Vec<u8>>>,

/// Recent packet buffer for deduplication.
recent: Ringbuffer<Id>,
}

impl Channel {
Expand All @@ -159,6 +162,7 @@ impl Channel {
tx,
rx,
buffer: HashMap::new(),
recent: Ringbuffer::with_capacity(16),
})
}

Expand All @@ -171,23 +175,33 @@ impl Channel {
let mut parts: Vec<&[u8]> = data.chunks(PACKET_PART_SIZE).collect();

if parts.is_empty() {
// Empty packets still need one byte of data to go through :)
// We need to send some data so empty enums go through! Not entirely
// sure *why* this is the case... pushing an empty string feels like
// it should be fine, but it doesn't work.
parts.push(b".");
}
if parts.len() - 1 > u8::MAX as usize {
return Err(ArpchatError::MsgTooLong);
}

let total = (parts.len() - 1) as u8;
let id: Id = rand::thread_rng().gen();
for (seq, part) in parts.into_iter().enumerate() {
self.send_part(packet.tag(), seq as u8, total, part)?;
self.send_part(packet.tag(), seq as u8, total, id, part)?;
}

Ok(())
}

fn send_part(&mut self, tag: u8, seq: u8, total: u8, part: &[u8]) -> Result<(), ArpchatError> {
let data = &[PACKET_PREFIX, &[tag, seq, total], part].concat();
fn send_part(
&mut self,
tag: u8,
seq: u8,
total: u8,
id: Id,
part: &[u8],
) -> Result<(), ArpchatError> {
let data = &[PACKET_PREFIX, &[tag, seq, total], &id, part].concat();

// The length of the data must fit in a u8. This should also
// guarantee that we'll be inside the MTU.
Expand Down Expand Up @@ -247,28 +261,39 @@ impl Channel {
}

if let &[tag, seq, total, ref inner @ ..] = &data[PACKET_PREFIX.len()..] {
let key = (tag, total);

if let Some(parts) = self.buffer.get_mut(&key) {
parts[seq as usize] = inner.to_vec();
} else {
let mut parts = vec![vec![]; total as usize + 1];
parts[seq as usize] = inner.to_vec();
self.buffer.insert(key, parts);
}
Ok(try {
let id: Id = inner[..ID_SIZE].try_into().ok()?;
let inner = &inner[ID_SIZE..];

// SAFETY: Guaranteed to exist because it's populated directly above.
let parts = unsafe { self.buffer.get(&key).unwrap_unchecked() };
// Skip if we already have this packet.
if self.recent.contains(&id) {
None?;
}

if parts.iter().all(|p| !p.is_empty()) {
if let Some(parts) = self.buffer.get_mut(&id) {
parts[seq as usize] = inner.to_vec();
} else {
let mut parts = vec![vec![]; total as usize + 1];
parts[seq as usize] = inner.to_vec();
self.buffer.insert(id, parts);
}

// SAFETY: Guaranteed to exist because it's populated directly above.
let parts = unsafe { self.buffer.get(&id).unwrap_unchecked() };

// Short-circuit if we don't have all the parts yet.
if !parts.iter().all(|p| !p.is_empty()) {
None?;
}

// Put the packet together.
let packet = Packet::deserialize(tag, &parts.concat());
if packet.is_some() {
self.buffer.remove(&key);
self.buffer.remove(&id);
self.recent.push(id);
}
Ok(packet)
} else {
Ok(None)
}
packet?
})
} else {
Ok(None)
}
Expand Down
25 changes: 25 additions & 0 deletions src/ringbuffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#[derive(Clone, Debug)]
pub struct Ringbuffer<T> {
data: Box<[Option<T>]>,
index: usize,
}

impl<T: Clone> Ringbuffer<T> {
pub fn with_capacity(capacity: usize) -> Self {
Self {
data: vec![None; capacity].into_boxed_slice(),
index: 0,
}
}

pub fn push(&mut self, item: T) {
self.data[self.index] = Some(item);
self.index = (self.index + 1) % self.data.len();
}
}

impl<T: PartialEq> Ringbuffer<T> {
pub fn contains(&self, item: &T) -> bool {
self.data.iter().any(|x| x.as_ref() == Some(item))
}
}
12 changes: 3 additions & 9 deletions src/ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use std::thread;

use crossbeam_channel::unbounded;
use cursive::backends::crossterm::crossterm::style::Stylize;
use cursive::traits::Nameable;
use cursive::views::{Dialog, LinearLayout, TextView};
use cursive::views::{Dialog, LinearLayout};

use self::config::CONFIG;
use self::dialog::interface::show_iface_dialog;
Expand Down Expand Up @@ -45,13 +44,8 @@ pub fn run() {
while siv.is_running() {
while let Ok(cmd) = ui_rx.try_recv() {
match cmd {
UICommand::NewMessage(username, id, msg) => {
siv.call_on_name("chat_inner", |chat_inner: &mut LinearLayout| {
chat_inner.add_child(
TextView::new(format!("[{username}] {msg}"))
.with_name(format!("{id:x?}_msg")),
);
});
UICommand::NewMessage(username, msg) => {
append_txt(&mut siv, "chat_inner", format!("[{username}] {msg}"));
}
UICommand::UpdateUsername(new_username) => {
if new_username == username {
Expand Down
2 changes: 1 addition & 1 deletion src/ui/net_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub(super) fn start_net_thread(tx: Sender<UICommand>, rx: Receiver<NetCommand>)
Some((_, username)) => username.clone(),
None => "unknown".to_string(),
};
tx.send(UICommand::NewMessage(username, id, msg)).unwrap()
tx.send(UICommand::NewMessage(username, msg)).unwrap()
}
Some(Packet::PresenceReq) => {
if state == NetThreadState::NeedsInitialPresence {
Expand Down
2 changes: 1 addition & 1 deletion src/ui/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub enum UICommand {
SendMessage(String),
SetInterface(String),
SetEtherType(EtherType),
NewMessage(String, Id, String),
NewMessage(String, String),
PresenceUpdate(Id, String, bool, UpdatePresenceKind),
RemovePresence(Id, String),
Error(ArpchatError),
Expand Down

0 comments on commit 30dd388

Please sign in to comment.