Skip to content

Commit

Permalink
Jq rs/aes support (#9)
Browse files Browse the repository at this point in the history
Aes support for transport traffic towards Mles server.
  • Loading branch information
jq-rs committed Feb 13, 2020
1 parent 54c7f6b commit 5f9d2fa
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 7 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "arki-server"
version = "0.4.1"
version = "0.5.0"
authors = ["jq-rs"]
edition = "2018"

Expand All @@ -13,3 +13,8 @@ tokio-io = "0.1"
mles-utils = "1.1.0"
bytes = "0.4"
x509-parser = "0.6"
blake2 = "0.8"
aes-ctr = "0.3"
aes = "0.3"
block-modes = "0.3"
base64 = "0.11"
124 changes: 118 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,25 @@ use std::io::{self};
use tokio_io::codec::{Encoder as TokioEncoder, Decoder as TokioDecoder};
use core::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::Mutex;

use std::str::FromStr;

use blake2::{Blake2s, Digest};
use aes::block_cipher_trait::generic_array::GenericArray;
use block_modes::{BlockMode, Ecb};
use block_modes::block_padding::ZeroPadding;
use aes::Aes128;
type Aes128Ecb = Ecb<Aes128, ZeroPadding>;

use base64::{encode as b64encode, decode as b64decode};

use aes_ctr::Aes128Ctr;
use aes_ctr::stream_cipher::{
NewStreamCipher, SyncStreamCipher
};


use tokio::timer::Interval;
use std::time::Duration;
use mles_utils::*;
Expand Down Expand Up @@ -92,7 +108,6 @@ fn main() {
tokio::run(server);
});
}

let www_root_inner = www_root_dir.clone();
let (tx, rx) = oneshot::channel();
{
Expand Down Expand Up @@ -322,6 +337,8 @@ fn time_to_expiration<P: AsRef<std::path::Path>>(p: P) -> Option<std::time::Dura
.time_to_expiration()
}



fn run_websocket_proxy(websocket: warp::ws::WebSocket) -> impl Future<Item = (), Error = ()> + Send + 'static {
let raddr = SRV_ADDR.parse().unwrap();

Expand All @@ -338,6 +355,9 @@ fn run_websocket_proxy(websocket: warp::ws::WebSocket) -> impl Future<Item = (),
let ping_cntr = Arc::new(AtomicUsize::new(0));
let pong_cntr = Arc::new(AtomicUsize::new(0));

let aeschannel = Arc::new(Mutex::new(Vec::new()));
let aesecb = Arc::new(Mutex::new(Vec::new()));

let mut cid: Option<u32> = None;
let mut key: Option<u64> = None;
let mut keys = Vec::new();
Expand All @@ -347,6 +367,8 @@ fn run_websocket_proxy(websocket: warp::ws::WebSocket) -> impl Future<Item = (),
let (mut mles_tx, mles_rx) = unbounded();
let (mut combined_tx, combined_rx) = unbounded();

let aeschannel_inner = aeschannel.clone();
let aesecb_inner = aesecb.clone();
let tcp = TcpStream::connect(&raddr);
let client = tcp
.and_then(move |stream| {
Expand Down Expand Up @@ -377,22 +399,78 @@ fn run_websocket_proxy(websocket: warp::ws::WebSocket) -> impl Future<Item = (),

let (tcp_sink, tcp_stream) = Bytes.framed(stream).split();


let mles_rx = mles_rx.map_err(|_| panic!()); // errors not possible on rx XXX
let mles_rx = mles_rx.and_then(move |buf: Vec<_>| {
if buf.is_empty() {
//println!("Empty buffer!");
return Err(Error::new(ErrorKind::BrokenPipe, "broken pipe"));
}
let decoded_message = Msg::decode(buf.as_slice());
if None == key {
let mut hasher = Blake2s::new();
hasher.input(decoded_message.get_channel());
let mut aeschannel_locked = aeschannel_inner.lock().unwrap();
let mut vec: Vec<u8> = hasher.result().as_slice().to_vec();
vec.truncate(16);
*aeschannel_locked = vec;
let mut hasher_ecb = Blake2s::new();
hasher_ecb.input(decoded_message.get_channel());
let mut hasher_ecb_final = Blake2s::new();
hasher_ecb_final.input(hasher_ecb.result().as_slice());
let mut aesecb_locked = aesecb_inner.lock().unwrap();
let mut vec: Vec<u8> = hasher_ecb_final.result().as_slice().to_vec();
vec.truncate(16);
*aesecb_locked = vec;

let uid: Vec<u8> = b64decode(decoded_message.get_uid()).unwrap();
let channel: Vec<u8> = b64decode(decoded_message.get_channel()).unwrap();

let cipher = Aes128Ecb::new_var(&*aesecb_locked, Default::default()).unwrap();
let uid = cipher.encrypt_vec(&uid);
let cipher = Aes128Ecb::new_var(&*aesecb_locked, Default::default()).unwrap();
let channel = cipher.encrypt_vec(&channel);

//create hash for verification
let decoded_message = Msg::decode(buf.as_slice());
keys.push(decoded_message.get_uid().to_string());
keys.push(decoded_message.get_channel().to_string());
keys.push(b64encode(&uid));
keys.push(b64encode(&channel));
key = Some(MsgHdr::do_hash(&keys));
cid = Some(MsgHdr::select_cid(key.unwrap()));
cid_val = cid.unwrap();
println!("Adding TLS client with cid {:x}", cid.unwrap());
}
let aeschannel_locked = aeschannel_inner.lock().unwrap();
let aeskey = GenericArray::from_slice(&*aeschannel_locked);
let aesecb_locked = aesecb_inner.lock().unwrap();
let aesecbkey = &*aesecb_locked;

let uid: Vec<u8> = b64decode(decoded_message.get_uid()).unwrap();
let channel: Vec<u8> = b64decode(decoded_message.get_channel()).unwrap();

let cipher = Aes128Ecb::new_var(&aesecbkey, Default::default()).unwrap();
let uid = cipher.encrypt_vec(&uid);
let cipher = Aes128Ecb::new_var(&aesecbkey, Default::default()).unwrap();
let channel = cipher.encrypt_vec(&channel);

let mut msg: Vec<u8> = decoded_message.get_message().clone();
let mut aesnonce = Vec::new();
aesnonce.extend_from_slice(&msg[0..8]);
aesnonce.extend_from_slice(&msg[0..8]);
let nonce = GenericArray::from_slice(&aesnonce);

// create cipher instance
let mut cipher = Aes128Ctr::new(&aeskey, &nonce);
// apply keystream (encrypt)
cipher.apply_keystream(&mut msg[8..]);

let decoded_message = decoded_message.set_uid(b64encode(&uid));
let decoded_message = decoded_message.set_channel(b64encode(&channel));
let decoded_message = decoded_message.set_message(msg);

let buf = decoded_message.encode();

//decoded_message.set_msg(msg);

let msghdr = MsgHdr::new(buf.len() as u32, cid.unwrap(), key.unwrap());
let mut msgv = msghdr.encode();
msgv.extend(buf);
Expand Down Expand Up @@ -461,8 +539,42 @@ fn run_websocket_proxy(websocket: warp::ws::WebSocket) -> impl Future<Item = (),
Ok(())
});

let aeschannel_inner = aeschannel.clone();
let aesecb_inner = aesecb.clone();
let tcp_to_ws_writer = ws_rx.for_each(move |msg: Vec<_>| {
let msg = Message::binary(msg);
let decoded_message = Msg::decode(&msg);

let aeschannel_locked = aeschannel_inner.lock().unwrap();
let aeskey = GenericArray::from_slice(&*aeschannel_locked);
let aesecb_locked = aesecb_inner.lock().unwrap();
let aesecbkey = &*aesecb_locked;

let uid: Vec<u8> = b64decode(decoded_message.get_uid()).unwrap();
let channel: Vec<u8> = b64decode(decoded_message.get_channel()).unwrap();

let cipher = Aes128Ecb::new_var(&aesecbkey, Default::default()).unwrap();
let uid = cipher.decrypt_vec(&uid).unwrap();
let cipher = Aes128Ecb::new_var(&aesecbkey, Default::default()).unwrap();
let channel = cipher.decrypt_vec(&channel).unwrap();

let mut msg: Vec<u8> = decoded_message.get_message().clone();
let mut aesnonce = Vec::new();
aesnonce.extend_from_slice(&msg[0..8]);
aesnonce.extend_from_slice(&msg[0..8]);
let nonce = GenericArray::from_slice(&aesnonce);

// create cipher instance
let mut cipher = Aes128Ctr::new(&aeskey, &nonce);
// apply keystream (decrypt)
cipher.apply_keystream(&mut msg[8..]);

let decoded_message = decoded_message.set_uid(b64encode(&uid));
let decoded_message = decoded_message.set_channel(b64encode(&channel));
let decoded_message = decoded_message.set_message(msg);

let buf = decoded_message.encode();

let msg = Message::binary(buf);
let _ = combined_tx
.start_send(msg)
.map_err(|err| Error::new(ErrorKind::Other, err));
Expand All @@ -472,7 +584,7 @@ fn run_websocket_proxy(websocket: warp::ws::WebSocket) -> impl Future<Item = (),
Ok(())
});

let ws_writer = combined_rx.fold(sink, |mut sink, msg| {
let ws_writer = combined_rx.fold(sink, move |mut sink, msg| {
let _ = sink
.start_send(msg)
.map_err(|err| Error::new(ErrorKind::Other, err));
Expand Down

0 comments on commit 5f9d2fa

Please sign in to comment.