Skip to content

Commit

Permalink
Untested updates for modern Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
jedisct1 committed Aug 23, 2018
1 parent 774634e commit 4329c03
Show file tree
Hide file tree
Showing 25 changed files with 753 additions and 723 deletions.
878 changes: 455 additions & 423 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 1 addition & 6 deletions Cargo.toml
Expand Up @@ -6,7 +6,6 @@ build = "build.rs"

[features]
capnp-recompile = ["capnpc"]
cargo-clippy = ["clippy"]
coroutines = ["coio"]

default = ["kafka"]
Expand All @@ -27,12 +26,8 @@ serde = "~0.8"
serde_json = "~0.8"
toml = "~0.2"

[dependencies.clippy]
git = "https://github.com/Manishearth/rust-clippy"
optional = true

[dependencies.coio]
git = "https://github.com/zonyitoo/coio-rs"
git = "https://github.com/jedisct1/coio-rs"
optional = true

[dependencies.kafka]
Expand Down
11 changes: 0 additions & 11 deletions rustfmt.toml

This file was deleted.

2 changes: 1 addition & 1 deletion src/flowgger/config.rs
@@ -1,6 +1,6 @@
use std::fs::File;
use std::io::{Error, ErrorKind};
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
use std::path::Path;
use toml;

Expand Down
2 changes: 1 addition & 1 deletion src/flowgger/decoder/gelf_decoder.rs
Expand Up @@ -30,7 +30,7 @@ impl Decoder for GelfDecoder {
Err(Syntax(ErrorCode::InvalidUnicodeCodePoint, ..)) => {
de::from_str(&line.replace('\n', r"\n"))
}
x @ _ => x,
x => x,
};
let obj: Value = obj.or(Err("Invalid GELF input, unable to parse as a JSON object"))?;
let obj = obj.as_object().ok_or("Empty GELF input")?;
Expand Down
32 changes: 20 additions & 12 deletions src/flowgger/decoder/ltsv_decoder.rs
Expand Up @@ -132,9 +132,11 @@ impl Decoder for LTSVDecoder {
};
(
final_name,
SDValue::Bool(value
.parse::<bool>()
.or(Err("Type error; boolean was expected"))?),
SDValue::Bool(
value
.parse::<bool>()
.or(Err("Type error; boolean was expected"))?,
),
)
}
Some(&SDValueType::F64) => {
Expand All @@ -146,9 +148,11 @@ impl Decoder for LTSVDecoder {
};
(
final_name,
SDValue::F64(value
.parse::<f64>()
.or(Err("Type error; f64 was expected"))?),
SDValue::F64(
value
.parse::<f64>()
.or(Err("Type error; f64 was expected"))?,
),
)
}
Some(&SDValueType::I64) => {
Expand All @@ -160,9 +164,11 @@ impl Decoder for LTSVDecoder {
};
(
final_name,
SDValue::I64(value
.parse::<i64>()
.or(Err("Type error; i64 was expected"))?),
SDValue::I64(
value
.parse::<i64>()
.or(Err("Type error; i64 was expected"))?,
),
)
}
Some(&SDValueType::U64) => {
Expand All @@ -174,9 +180,11 @@ impl Decoder for LTSVDecoder {
};
(
final_name,
SDValue::U64(value
.parse::<u64>()
.or(Err("Type error; u64 was expected"))?),
SDValue::U64(
value
.parse::<u64>()
.or(Err("Type error; u64 was expected"))?,
),
)
}
}
Expand Down
9 changes: 4 additions & 5 deletions src/flowgger/decoder/rfc5424_decoder.rs
Expand Up @@ -184,11 +184,10 @@ fn parse_data(line: &str) -> Result<(Option<StructuredData>, Option<String>), &'
in_value = false;
let value = unescape_sd_value(&sd[value_start..i]);
let pair = (
"_".to_owned()
+ name.expect(
"Name in structured data contains an invalid UTF-8 \
sequence",
),
"_".to_owned() + name.expect(
"Name in structured data contains an invalid UTF-8 \
sequence",
),
SDValue::String(value),
);
sd_res.pairs.push(pair);
Expand Down
16 changes: 8 additions & 8 deletions src/flowgger/encoder/capnp_encoder.rs
Expand Up @@ -25,8 +25,7 @@ impl CapnpEncoder {
.expect("output.capnp_extra values must be strings")
.to_owned(),
)
})
.collect(),
}).collect(),
};
CapnpEncoder { extra: extra }
}
Expand Down Expand Up @@ -75,12 +74,13 @@ fn build_record<T: Allocator>(
root.set_full_msg(&full_msg);
}
if let Some(sd) = record.sd {
sd.sd_id
.as_ref()
.and_then(|sd_id| Some(root.set_sd_id(sd_id)));
let mut pairs = root.borrow().init_pairs(sd.pairs.len() as u32);
sd.sd_id.as_ref().and_then(|sd_id| {
root.set_sd_id(sd_id);
Some(())
});
let mut pairs = root.reborrow().init_pairs(sd.pairs.len() as u32);
for (i, (name, value)) in sd.pairs.into_iter().enumerate() {
let mut pair = pairs.borrow().get(i as u32);
let mut pair = pairs.reborrow().get(i as u32);
pair.set_key(&name);
let mut v = pair.init_value();
match value {
Expand All @@ -96,7 +96,7 @@ fn build_record<T: Allocator>(
if !extra.is_empty() {
let mut pairs = root.init_extra(extra.len() as u32);
for (i, &(ref name, ref value)) in extra.into_iter().enumerate() {
let mut pair = pairs.borrow().get((i) as u32);
let mut pair = pairs.reborrow().get((i) as u32);
pair.set_key(name);
let mut v = pair.init_value();
v.set_string(value)
Expand Down
6 changes: 2 additions & 4 deletions src/flowgger/encoder/gelf_encoder.rs
Expand Up @@ -25,8 +25,7 @@ impl GelfEncoder {
.expect("output.gelf_extra values must be strings")
.to_owned(),
)
})
.collect(),
}).collect(),
};
GelfEncoder { extra: extra }
}
Expand All @@ -40,8 +39,7 @@ impl Encoder for GelfEncoder {
.insert(
"short_message".to_owned(),
Value::String(record.msg.unwrap_or_else(|| "-".to_owned())),
)
.insert("timestamp".to_owned(), Value::F64(record.ts));
).insert("timestamp".to_owned(), Value::F64(record.ts));
if let Some(severity) = record.severity {
map = map.insert("level".to_owned(), Value::U64(u64::from(severity)));
}
Expand Down
3 changes: 1 addition & 2 deletions src/flowgger/encoder/ltsv_encoder.rs
Expand Up @@ -22,8 +22,7 @@ impl LTSVEncoder {
.expect("output.ltsv_extra values must be strings")
.to_owned(),
)
})
.collect(),
}).collect(),
};
LTSVEncoder { extra: extra }
}
Expand Down
6 changes: 2 additions & 4 deletions src/flowgger/input/redis_input.rs
Expand Up @@ -40,14 +40,12 @@ impl RedisInput {
.map_or(DEFAULT_CONNECT, |x| {
x.as_str()
.expect("input.redis_connect must be an ip:port string")
})
.to_owned();
}).to_owned();
let queue_key = config
.lookup("input.redis_queue_key")
.map_or(DEFAULT_QUEUE_KEY, |x| {
x.as_str().expect("input.redis_queue_key must be a string")
})
.to_owned();
}).to_owned();
let threads = config
.lookup("input.redis_threads")
.map_or(DEFAULT_THREADS, |x| {
Expand Down
3 changes: 1 addition & 2 deletions src/flowgger/input/stdin_input.rs
Expand Up @@ -24,8 +24,7 @@ impl StdinInput {
.map_or(DEFAULT_FRAMING, |x| {
x.as_str()
.expect(r#"input.framing must be a string set to "line", "nul" or "syslen""#)
})
.to_owned();
}).to_owned();
let stdin_config = StdinConfig { framing: framing };
StdinInput {
stdin_config: stdin_config,
Expand Down
6 changes: 2 additions & 4 deletions src/flowgger/input/tcp/mod.rs
Expand Up @@ -38,8 +38,7 @@ pub fn config_parse(config: &Config) -> (TcpConfig, String, u64) {
.lookup("input.listen")
.map_or(DEFAULT_LISTEN, |x| {
x.as_str().expect("input.listen must be an ip:port string")
})
.to_owned();
}).to_owned();
let threads = get_default_threads(config);
let timeout = config.lookup("input.timeout").map_or(DEFAULT_TIMEOUT, |x| {
x.as_integer()
Expand All @@ -57,8 +56,7 @@ pub fn config_parse(config: &Config) -> (TcpConfig, String, u64) {
.map_or(framing, |x| {
x.as_str()
.expect(r#"input.framing must be a string set to "line", "nul" or "syslen""#)
})
.to_owned();
}).to_owned();
let tcp_config = TcpConfig {
framing: framing,
threads: threads,
Expand Down
5 changes: 2 additions & 3 deletions src/flowgger/input/tcp/tcpco_input.rs
@@ -1,6 +1,6 @@
use super::*;
use coio::Scheduler;
use coio::net::{TcpListener, TcpStream};
use coio::Scheduler;
use flowgger::config::Config;
use flowgger::decoder::Decoder;
use flowgger::encoder::Encoder;
Expand Down Expand Up @@ -51,8 +51,7 @@ impl Input for TcpCoInput {
Err(_) => {}
}
}
})
.unwrap();
}).unwrap();
}
}

Expand Down
26 changes: 10 additions & 16 deletions src/flowgger/input/tls/mod.rs
Expand Up @@ -42,9 +42,9 @@ pub struct TlsConfig {
fn set_fs(ctx: &mut SslContextBuilder) {
let p = BigNum::from_hex_str("87A8E61DB4B6663CFFBBD19C651959998CEEF608660DD0F25D2CEED4435E3B00E00DF8F1D61957D4FAF7DF4561B2AA3016C3D91134096FAA3BF4296D830E9A7C209E0C6497517ABD5A8A9D306BCF67ED91F9E6725B4758C022E0B1EF4275BF7B6C5BFC11D45F9088B941F54EB1E59BB8BC39A0BF12307F5C4FDB70C581B23F76B63ACAE1CAA6B7902D52526735488A0EF13C6D9A51BFA4AB3AD8347796524D8EF6A167B5A41825D967E144E5140564251CCACB83E6B486F6B3CA3F7971506026C0B857F689962856DED4010ABD0BE621C3A3960A54E710C375F26375D7014103A4B54330C198AF126116D2276E11715F693877FAD7EF09CADB094AE91E1A1597").unwrap();
let g = BigNum::from_hex_str("3FB32C9B73134D0B2E77506660EDBD484CA7B18F21EF205407F4793A1A0BA12510DBC15077BE463FFF4FED4AAC0BB555BE3A6C1B0C6B47B1BC3773BF7E8C6F62901228F8C28CBB18A55AE31341000A650196F931C77A57F2DDF463E5E9EC144B777DE62AAAB8A8628AC376D282D6ED3864E67982428EBC831D14348F6F2F9193B5045AF2767164E1DFC967C1FB3F2E55A4BD1BFFE83B9C80D052B985D182EA0ADB2A3B7313D3FE14C8484B1E052588B9B7D2BBD2DF016199ECD06E1557CD0915B3353BBB64E0EC377FD028370DF92B52C7891428CDC67EB6184B523D1DB246C32F63078490F00EF8D647D148D47954515E2327CFEF98C582664B4C0F6CC41659").unwrap();
let q = BigNum::from_hex_str(
"8CF83642A709A097B447997640129DA299B1A47D1EB3750BA308B0FE64F5FBD3",
).unwrap();
let q =
BigNum::from_hex_str("8CF83642A709A097B447997640129DA299B1A47D1EB3750BA308B0FE64F5FBD3")
.unwrap();
let dh = Dh::from_params(p, g, q).unwrap();
ctx.set_tmp_dh(&dh).unwrap();
}
Expand All @@ -69,39 +69,34 @@ pub fn config_parse(config: &Config) -> (TlsConfig, String, u64) {
.lookup("input.listen")
.map_or(DEFAULT_LISTEN, |x| {
x.as_str().expect("input.listen must be an ip:port string")
})
.to_owned();
}).to_owned();
let threads = get_default_threads(config);
let cert = config
.lookup("input.tls_cert")
.map_or(DEFAULT_CERT, |x| {
x.as_str()
.expect("input.tls_cert must be a path to a .pem file")
})
.to_owned();
}).to_owned();
let key = config
.lookup("input.tls_key")
.map_or(DEFAULT_KEY, |x| {
x.as_str()
.expect("input.tls_key must be a path to a .pem file")
})
.to_owned();
}).to_owned();
let ciphers = config
.lookup("input.tls_ciphers")
.map_or(DEFAULT_CIPHERS, |x| {
x.as_str()
.expect("input.tls_ciphers must be a string with a cipher suite")
})
.to_owned();
}).to_owned();

let tls_modern = match config
.lookup("input.tls_compatibility_level")
.map_or(DEFAULT_TLS_COMPATIBILITY_LEVEL, |x| {
x.as_str().expect(
"input.tls_compatibility_level must be a string with the comptibility level",
)
})
.to_lowercase()
}).to_lowercase()
.as_ref()
{
"default" | "any" | "intermediate" => false,
Expand Down Expand Up @@ -141,15 +136,14 @@ pub fn config_parse(config: &Config) -> (TlsConfig, String, u64) {
.map_or(framing, |x| {
x.as_str()
.expect(r#"input.framing must be a string set to "line", "nul" or "syslen""#)
})
.to_owned();
}).to_owned();
let mut acceptor_builder = (if tls_modern {
SslAcceptorBuilder::mozilla_modern_raw(SslMethod::tls())
} else {
SslAcceptorBuilder::mozilla_intermediate_raw(SslMethod::tls())
}).unwrap();
{
let mut ctx = acceptor_builder.builder_mut();
let mut ctx = &mut acceptor_builder;
if let Some(ca_file) = ca_file {
ctx.set_ca_file(&ca_file)
.expect("Unable to read the trusted CA file");
Expand Down
8 changes: 3 additions & 5 deletions src/flowgger/input/tls/tlsco_input.rs
@@ -1,11 +1,10 @@
use super::*;
use coio::Scheduler;
use coio::net::{TcpListener, TcpStream};
use coio::Scheduler;
use flowgger::config::Config;
use flowgger::decoder::Decoder;
use flowgger::encoder::Encoder;
use flowgger::splitter::{CapnpSplitter, LineSplitter, NulSplitter, Splitter, SyslenSplitter};
use openssl::ssl::*;
use std::io::{stderr, BufReader, Write};
use std::net::SocketAddr;
use std::sync::mpsc::SyncSender;
Expand Down Expand Up @@ -52,8 +51,7 @@ impl Input for TlsCoInput {
Err(_) => {}
}
}
})
.unwrap();
}).unwrap();
}
}

Expand All @@ -67,7 +65,7 @@ fn handle_client(
if let Ok(peer_addr) = client.peer_addr() {
println!("Connection over TLS<coroutines> from [{}]", peer_addr);
}
let sslclient = match SslStream::accept(&*tls_config.arc_ctx, client) {
let sslclient = match tls_config.acceptor.accept(client) {
Err(_) => {
let _ = writeln!(stderr(), "SSL handshake aborted by the client");
return;
Expand Down

0 comments on commit 4329c03

Please sign in to comment.