diff --git a/Cargo.lock b/Cargo.lock index 8d93187f..f960d74c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -53,6 +53,11 @@ name = "approx" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "ascii" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "atty" version = "0.2.3" @@ -191,6 +196,7 @@ dependencies = [ "libc 0.2.34 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "lua 0.0.11 (git+https://github.com/blt/rust-lua53.git)", + "mio 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", "openssl-probe 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "quantiles 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -202,7 +208,9 @@ dependencies = [ "serde 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "tiny_http 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -242,6 +250,15 @@ dependencies = [ "libc 0.2.34 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "chrono" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.38 
(registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "chrono" version = "0.4.0" @@ -252,6 +269,11 @@ dependencies = [ "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "chunked_transfer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "clamp" version = "0.1.0" @@ -357,7 +379,7 @@ dependencies = [ "serde 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -379,7 +401,7 @@ dependencies = [ "reqwest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -434,6 +456,63 @@ dependencies = [ "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "encoding" +version = "0.2.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "encoding-index-japanese 1.20141219.5 (registry+https://github.com/rust-lang/crates.io-index)", + "encoding-index-korean 1.20141219.5 (registry+https://github.com/rust-lang/crates.io-index)", + "encoding-index-simpchinese 1.20141219.5 (registry+https://github.com/rust-lang/crates.io-index)", + "encoding-index-singlebyte 1.20141219.5 
(registry+https://github.com/rust-lang/crates.io-index)", + "encoding-index-tradchinese 1.20141219.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "encoding-index-japanese" +version = "1.20141219.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "encoding_index_tests 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "encoding-index-korean" +version = "1.20141219.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "encoding_index_tests 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "encoding-index-simpchinese" +version = "1.20141219.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "encoding_index_tests 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "encoding-index-singlebyte" +version = "1.20141219.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "encoding_index_tests 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "encoding-index-tradchinese" +version = "1.20141219.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "encoding_index_tests 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "encoding_index_tests" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "env_logger" version = "0.3.5" @@ -622,7 +701,7 @@ dependencies = [ "percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "relay 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.11 
(registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -647,7 +726,7 @@ dependencies = [ "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.9 (registry+https://github.com/rust-lang/crates.io-index)", "native-tls 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tls 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1146,7 +1225,7 @@ dependencies = [ "serde 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", "serde_urlencoded 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tls 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1196,7 +1275,7 @@ dependencies = [ "hyper-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "regex 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] 
[[package]] @@ -1488,9 +1567,23 @@ dependencies = [ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tiny_http" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "ascii 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", + "chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)", + "chunked_transfer 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "encoding 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)", + "url 0.2.38 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tokio-core" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1525,7 +1618,7 @@ dependencies = [ "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1545,7 +1638,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "native-tls 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.11 
(registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1624,6 +1717,16 @@ name = "untrusted" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "url" +version = "0.2.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "matches 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)", + "uuid 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "url" version = "1.6.0" @@ -1644,6 +1747,15 @@ name = "utf8-ranges" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "uuid" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "uuid" version = "0.5.1" @@ -1708,6 +1820,7 @@ dependencies = [ "checksum ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6b3568b48b7cefa6b8ce125f9bb4989e52fbcc29ebea88df04cc7c5f12f70455" "checksum antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" "checksum approx 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "08abcc3b4e9339e33a3d0a5ed15d84a687350c05689d825e0f6655eef9e76a94" +"checksum ascii 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3ae7d751998c189c1d4468cf0a39bb2eae052a9c58d50ebb3b9591ee3813ad50" "checksum atty 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "21e50800ec991574876040fff8ee46b136a53e985286fbe6a3bdfe6421b78860" "checksum backtrace 0.3.4 
(registry+https://github.com/rust-lang/crates.io-index)" = "8709cc7ec06f6f0ae6c2c7e12f6ed41540781f72b488d83734978295ceae182e" "checksum backtrace-sys 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "44585761d6161b0f57afc49482ab6bd067e4edef48c12a152c237eb0203f7661" @@ -1728,7 +1841,9 @@ dependencies = [ "checksum cgmath 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)" = "87f025a17ad3f30d49015c787903976d5f9cd6115ece1eb7f4d6ffe06b8c4080" "checksum chan 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)" = "f93bfe971116428a9066c1c3c69a09ae3ef69432f8418be28ab50f96783e6a50" "checksum chan-signal 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f1f1e11f6e1c14c9e805a87c622cb8fcb636283b3119a2150af390cc6702d7fe" +"checksum chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)" = "9213f7cd7c27e95c2b57c49f0e69b1ea65b27138da84a170133fd21b07659c00" "checksum chrono 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7c20ebe0b2b08b0aeddba49c609fe7957ba2e33449882cb186a180bc60682fa9" +"checksum chunked_transfer 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "498d20a7aaf62625b9bf26e637cf7736417cde1d0c99f1d04d1170229a85cf87" "checksum clamp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b0113a8ae379a061c89a71d57a809439f5ce550b6a76063ab5ba2b1cb180971f" "checksum clap 2.29.0 (registry+https://github.com/rust-lang/crates.io-index)" = "110d43e343eb29f4f51c1db31beb879d546db27998577e5715270a54bcf41d3f" "checksum coco 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c06169f5beb7e31c7c67ebf5540b8b472d23e3eade3b2ec7d1f5b504a85f91bd" @@ -1747,6 +1862,13 @@ dependencies = [ "checksum elastic_types 0.20.6 (registry+https://github.com/rust-lang/crates.io-index)" = "1fa6399965bb591298757055ed1c94556b38116e48761a311327a57f993623b5" "checksum elastic_types_derive 0.20.6 (registry+https://github.com/rust-lang/crates.io-index)" = 
"7a7b9243ec6b2596eb663d61806f8d67431211e7a1e62f7ad84671a31678418f" "checksum elastic_types_derive_internals 0.20.6 (registry+https://github.com/rust-lang/crates.io-index)" = "4d9a2ecdadedfe80b681736be7ae8fe4ad6dc384fd576adf0a519e26c57cd053" +"checksum encoding 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "6b0d943856b990d12d3b55b359144ff341533e516d94098b1d3fc1ac666d36ec" +"checksum encoding-index-japanese 1.20141219.5 (registry+https://github.com/rust-lang/crates.io-index)" = "04e8b2ff42e9a05335dbf8b5c6f7567e5591d0d916ccef4e0b1710d32a0d0c91" +"checksum encoding-index-korean 1.20141219.5 (registry+https://github.com/rust-lang/crates.io-index)" = "4dc33fb8e6bcba213fe2f14275f0963fd16f0a02c878e3095ecfdf5bee529d81" +"checksum encoding-index-simpchinese 1.20141219.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d87a7194909b9118fc707194baa434a4e3b0fb6a5a757c73c3adb07aa25031f7" +"checksum encoding-index-singlebyte 1.20141219.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3351d5acffb224af9ca265f435b859c7c01537c0849754d3db3fdf2bfe2ae84a" +"checksum encoding-index-tradchinese 1.20141219.5 (registry+https://github.com/rust-lang/crates.io-index)" = "fd0e20d5688ce3cab59eb3ef3a2083a5c77bf496cb798dc6fcdb75f323890c18" +"checksum encoding_index_tests 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a246d82be1c9d791c5dfde9a2bd045fc3cbba3fa2b11ad558f27d01712f00569" "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" "checksum error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ff511d5dc435d703f4971bc399647c9bc38e20cb41452e3b9feb4765419ed3f3" "checksum fern 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1f0297637852905c7ce4cefd8d8a6719f6e3cfb22e2e9f0aa0650f4804a6f360" @@ -1869,7 +1991,8 @@ dependencies = [ "checksum thread_local 0.2.7 
(registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5" "checksum thread_local 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "279ef31c19ededf577bfd12dfae728040a21f635b06a24cd670ff510edd38963" "checksum time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "d5d788d3aa77bc0ef3e9621256885555368b47bd495c13dd2e7413c89f845520" -"checksum tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "c843a027f7c1df5f81e7734a0df3f67bf329411781ebf36393ce67beef6071e3" +"checksum tiny_http 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)" = "016f040cfc9b5be610de3619eaaa57017fa0b0b678187327bde75fc146e2a41f" +"checksum tokio-core 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "c87c27560184212c9dc45cd8f38623f37918248aad5b58fb65303b5d07a98c6e" "checksum tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "514aae203178929dbf03318ad7c683126672d4d96eccb77b29603d33c9e25743" "checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389" "checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162" @@ -1886,9 +2009,11 @@ dependencies = [ "checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc" "checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" "checksum untrusted 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f392d7819dbe58833e26872f5f6f0d68b7bbbe90fc3667e98731c4a15ad9a7ae" +"checksum url 0.2.38 (registry+https://github.com/rust-lang/crates.io-index)" = 
"cbaa8377a162d88e7d15db0cf110c8523453edcbc5bc66d2b6fffccffa34a068" "checksum url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fa35e768d4daf1d85733418a49fb42e10d7f633e394fccab4ab7aba897053fe2" "checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f" "checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122" +"checksum uuid 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "78c590b5bd79ed10aad8fb75f078a59d8db445af6c743e55c4a53227fc01c13f" "checksum uuid 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bcc7e3b898aa6f6c08e5295b6c89258d1331e9ac578cc992fb818759951bdc22" "checksum vcpkg 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9e0a7d8bed3178a8fb112199d466eeca9ed09a14ba8ad67718179b4fd5487d0b" "checksum vec_map 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "887b5b631c2ad01628bbbaa7dd4c869f80d3186688f8d0b6f58774fbe324988c" diff --git a/Cargo.toml b/Cargo.toml index e9e5507a..07de87ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,10 +39,13 @@ seahash = "3.0" serde = { version = "1.0", features = ["rc"] } serde_derive = "1.0" serde_json = "1.0" +slab = "0.4" toml = "0.4" url = "1.6" uuid = {version = "0.5", features = ["v4"]} -chan-signal = "0.3" +chan-signal = "0.3.1" +mio = "0.6.11" +tiny_http = "0.5.8" [dev-dependencies] tempdir = "0.3" diff --git a/src/bin/cernan.rs b/src/bin/cernan.rs index c3f33f9e..34f18771 100644 --- a/src/bin/cernan.rs +++ b/src/bin/cernan.rs @@ -5,17 +5,19 @@ extern crate chan_signal; extern crate chrono; extern crate fern; extern crate hopper; +extern crate mio; + #[macro_use] extern crate log; extern crate openssl_probe; +use cernan::constants; use cernan::filter::{DelayFilterConfig, Filter, FlushBoundaryFilterConfig, ProgrammableFilterConfig}; use 
cernan::metric; use cernan::sink::Sink; use cernan::source::Source; use cernan::util; -use chan_signal::Signal; use chrono::Utc; use std::collections::{HashMap, HashSet}; use std::mem; @@ -23,6 +25,18 @@ use std::process; use std::str; use std::thread; +#[derive(Debug)] +struct SourceWorker { + thread: std::thread::JoinHandle<()>, + readiness: mio::SetReadiness, +} + +#[derive(Debug)] +struct SinkWorker { + thread: std::thread::JoinHandle<()>, + sender: hopper::Sender, +} + fn populate_forwards( sends: &mut util::Channel, mut top_level_forwards: Option<&mut HashSet>, @@ -49,6 +63,26 @@ fn populate_forwards( } } +fn join_all(workers: HashMap) { + for (_worker_id, worker) in workers { + worker.thread.join().expect("Failed to join worker"); + } +} + +fn broadcast_shutdown(workers: &HashMap) { + let mut source_channels = Vec::new(); + for (id, worker) in workers.iter() { + println!("Signaling shutdown to {:?}", id); + source_channels.push(worker.sender.clone()); + } + + if !source_channels.is_empty() { + cernan::util::send(&mut source_channels, cernan::metric::Event::Shutdown); + } + + drop(source_channels); +} + macro_rules! cfg_conf { ($config:ident) => { $config.config_path.clone().expect("[INTERNAL ERROR] no config_path") @@ -69,7 +103,8 @@ fn main() { _ => log::LogLevelFilter::Trace, }; - let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]); + let signal = + chan_signal::notify(&[chan_signal::Signal::INT, chan_signal::Signal::TERM]); fern::Dispatch::new() .format(|out, message, record| { @@ -88,7 +123,20 @@ fn main() { .expect("could not set up logging"); info!("cernan - {}", args.version); - let mut joins = Vec::new(); + + // We track the various child threads in order to support graceful shutdown. + // There are currently two paths used to communicate shutdown: + // + // 1) A semaphore is used to signal shutdown to sources. As generates of + // events, sources are shutdown first to ensure at least once processing. 
+ // + // 2) metrics::Event::Shutdown is sent to Hopper channels for Filters and Sinks + // after all sources have shutdown. Shutdown events serve to bookend queued + // events, once read it is safe for these workers to flush any pending writes + // and shutdown. + let mut sources: HashMap = HashMap::new(); + let mut sinks: HashMap = HashMap::new(); + let mut filters: HashMap = HashMap::new(); let mut senders: HashMap> = HashMap::new(); let mut receivers: HashMap> = HashMap::new(); @@ -237,6 +285,7 @@ fn main() { } } // SOURCES + // if let Some(ref configs) = args.native_server_config { for (config_path, config) in configs { config_topology.insert(config_path.clone(), config.forwards.clone()); @@ -259,7 +308,8 @@ fn main() { } if let Some(ref configs) = args.files { for config in configs { - config_topology.insert(cfg_conf!(config), config.forwards.clone()); + let config_path = cfg_conf!(config); + config_topology.insert(config_path.clone(), config.forwards.clone()); } } @@ -283,76 +333,146 @@ fn main() { // if let Some(config) = mem::replace(&mut args.null, None) { let recv = receivers.remove(&config.config_path).unwrap(); - joins.push(thread::spawn(move || { - cernan::sink::Null::new(&config).run(recv); - })); + sinks.insert( + config.config_path.clone(), + SinkWorker { + sender: senders + .get(&config.config_path.clone()) + .expect("Oops") + .clone(), + thread: thread::spawn(move || { + cernan::sink::Null::new(&config).run(recv); + }), + }, + ); } if let Some(config) = mem::replace(&mut args.console, None) { let recv = receivers .remove(&config.config_path.clone().unwrap()) .unwrap(); - joins.push(thread::spawn(move || { - cernan::sink::Console::new(&config).run(recv); - })); + sinks.insert( + config.config_path.clone().unwrap(), + SinkWorker { + sender: senders + .get(&config.config_path.clone().unwrap()) + .expect("Oops") + .clone(), + thread: thread::spawn(move || { + cernan::sink::Console::new(&config).run(recv); + }), + }, + ); } if let Some(config) = 
mem::replace(&mut args.wavefront, None) { let recv = receivers .remove(&config.config_path.clone().unwrap()) .unwrap(); - joins.push(thread::spawn( - move || match cernan::sink::Wavefront::new(config) { - Ok(mut w) => { - w.run(recv); - } - Err(e) => { - error!("Configuration error for Wavefront: {}", e); - process::exit(1); - } + sinks.insert( + config.config_path.clone().unwrap(), + SinkWorker { + sender: senders + .get(&config.config_path.clone().unwrap()) + .expect("Oops") + .clone(), + thread: thread::spawn(move || { + match cernan::sink::Wavefront::new(config) { + Ok(mut w) => { + w.run(recv); + } + Err(e) => { + error!("Configuration error for Wavefront: {}", e); + process::exit(1); + } + } + }), }, - )); + ); } if let Some(config) = mem::replace(&mut args.prometheus, None) { let recv = receivers .remove(&config.config_path.clone().unwrap()) .unwrap(); - joins.push(thread::spawn(move || { - cernan::sink::Prometheus::new(&config).run(recv); - })); + sinks.insert( + config.config_path.clone().unwrap(), + SinkWorker { + sender: senders + .get(&config.config_path.clone().unwrap()) + .expect("Oops") + .clone(), + thread: thread::spawn(move || { + cernan::sink::Prometheus::new(&config).run(recv); + }), + }, + ); } if let Some(config) = mem::replace(&mut args.influxdb, None) { let recv = receivers .remove(&config.config_path.clone().unwrap()) .unwrap(); - joins.push(thread::spawn(move || { - cernan::sink::InfluxDB::new(&config).run(recv); - })); + sinks.insert( + config.config_path.clone().unwrap(), + SinkWorker { + sender: senders + .get(&config.config_path.clone().unwrap()) + .expect("Oops") + .clone(), + thread: thread::spawn(move || { + cernan::sink::InfluxDB::new(&config).run(recv); + }), + }, + ); } if let Some(config) = mem::replace(&mut args.native_sink_config, None) { let recv = receivers .remove(&config.config_path.clone().unwrap()) .unwrap(); - joins.push(thread::spawn(move || { - cernan::sink::Native::new(config).run(recv); - })); + sinks.insert( + 
config.config_path.clone().unwrap(), + SinkWorker { + sender: senders + .get(&config.config_path.clone().unwrap()) + .expect("Oops") + .clone(), + thread: thread::spawn(move || { + cernan::sink::Native::new(config).run(recv); + }), + }, + ); } - if let Some(config) = mem::replace(&mut args.elasticsearch, None) { let recv = receivers .remove(&config.config_path.clone().unwrap()) .unwrap(); - joins.push(thread::spawn(move || { - cernan::sink::Elasticsearch::new(config).run(recv); - })); + sinks.insert( + config.config_path.clone().unwrap(), + SinkWorker { + sender: senders + .get(&config.config_path.clone().unwrap()) + .expect("Oops") + .clone(), + thread: thread::spawn(move || { + cernan::sink::Elasticsearch::new(config).run(recv); + }), + }, + ); } - if let Some(cfgs) = mem::replace(&mut args.firehosen, None) { for config in cfgs { let recv = receivers .remove(&config.config_path.clone().unwrap()) .unwrap(); - joins.push(thread::spawn(move || { - cernan::sink::Firehose::new(config).run(recv); - })); + sinks.insert( + config.config_path.clone().unwrap(), + SinkWorker { + sender: senders + .get(&config.config_path.clone().unwrap()) + .expect("Oops") + .clone(), + thread: thread::spawn(move || { + cernan::sink::Firehose::new(config).run(recv); + }), + }, + ); } } @@ -375,9 +495,19 @@ fn main() { .expect("[INTERNAL ERROR] no config_path"), &senders, ); - joins.push(thread::spawn(move || { - cernan::filter::ProgrammableFilter::new(c).run(recv, downstream_sends); - })); + filters.insert( + config.config_path.clone().unwrap(), + SinkWorker { + sender: senders + .get(&config.config_path.clone().unwrap()) + .expect("Oops") + .clone(), + thread: thread::spawn(move || { + cernan::filter::ProgrammableFilter::new(c) + .run(recv, downstream_sends); + }), + }, + ); } }); @@ -398,9 +528,19 @@ fn main() { .expect("[INTERNAL ERROR] no config_path"), &senders, ); - joins.push(thread::spawn(move || { - cernan::filter::DelayFilter::new(&c).run(recv, downstream_sends); - })); + 
filters.insert( + config.config_path.clone().unwrap(), + SinkWorker { + sender: senders + .get(&config.config_path.clone().unwrap()) + .expect("Oops") + .clone(), + thread: thread::spawn(move || { + cernan::filter::DelayFilter::new(&c) + .run(recv, downstream_sends); + }), + }, + ); } }); @@ -421,18 +561,29 @@ fn main() { .expect("[INTERNAL ERROR] no config_path"), &senders, ); - joins.push(thread::spawn(move || { - cernan::filter::FlushBoundaryFilter::new(&c) - .run(recv, downstream_sends); - })); + filters.insert( + config.config_path.clone().unwrap(), + SinkWorker { + sender: senders + .get(&config.config_path.clone().unwrap()) + .expect("Oops") + .clone(), + thread: thread::spawn(move || { + cernan::filter::FlushBoundaryFilter::new(&c) + .run(recv, downstream_sends); + }), + }, + ); } }); // SOURCES // mem::replace(&mut args.native_server_config, None).map(|cfg_map| { - for (_, config) in cfg_map { + for (config_path, config) in cfg_map { let mut native_server_send = Vec::new(); + let poll = mio::Poll::new().unwrap(); + let (registration, readiness) = mio::Registration::new2(); populate_forwards( &mut native_server_send, Some(&mut flush_sends), @@ -440,13 +591,28 @@ fn main() { &cfg_conf!(config), &senders, ); - joins.push(thread::spawn(move || { - cernan::source::NativeServer::new(native_server_send, config).run(); - })) + sources.insert( + config_path.clone(), + SourceWorker { + readiness: readiness, + thread: thread::spawn(move || { + poll.register( + ®istration, + constants::SYSTEM, + mio::Ready::readable(), + mio::PollOpt::edge(), + ).expect("Poll failed to return a result!"); + cernan::source::NativeServer::new(native_server_send, config) + .run(poll); + }), + }, + ); } }); let internal_config = mem::replace(&mut args.internal, Default::default()); + let poll = mio::Poll::new().unwrap(); + let (registration, readiness) = mio::Registration::new2(); let mut internal_send = Vec::new(); populate_forwards( &mut internal_send, @@ -455,13 +621,28 @@ fn main() { 
&cfg_conf!(internal_config), &senders, ); - joins.push(thread::spawn(move || { - cernan::source::Internal::new(internal_send, internal_config).run(); - })); + sources.insert( + internal_config.config_path.clone().unwrap(), + SourceWorker { + readiness: readiness, + thread: thread::spawn(move || { + poll.register( + ®istration, + constants::SYSTEM, + mio::Ready::readable(), + mio::PollOpt::edge(), + ).expect("Poll failed to return a result!"); + cernan::source::Internal::new(internal_send, internal_config) + .run(poll); + }), + }, + ); mem::replace(&mut args.statsds, None).map(|cfg_map| { - for (_, config) in cfg_map { + for (config_path, config) in cfg_map { let mut statsd_sends = Vec::new(); + let poll = mio::Poll::new().unwrap(); + let (registration, readiness) = mio::Registration::new2(); populate_forwards( &mut statsd_sends, Some(&mut flush_sends), @@ -469,15 +650,29 @@ fn main() { &cfg_conf!(config), &senders, ); - joins.push(thread::spawn(move || { - cernan::source::Statsd::new(statsd_sends, config).run(); - })); + sources.insert( + config_path.clone(), + SourceWorker { + readiness: readiness.clone(), + thread: thread::spawn(move || { + poll.register( + ®istration, + constants::SYSTEM, + mio::Ready::readable(), + mio::PollOpt::edge(), + ).expect("Poll failed to return a result!"); + cernan::source::Statsd::new(statsd_sends, config).run(poll); + }), + }, + ); } }); mem::replace(&mut args.graphites, None).map(|cfg_map| { - for (_, config) in cfg_map { + for (config_path, config) in cfg_map { let mut graphite_sends = Vec::new(); + let poll = mio::Poll::new().unwrap(); + let (registration, readiness) = mio::Registration::new2(); populate_forwards( &mut graphite_sends, Some(&mut flush_sends), @@ -485,15 +680,30 @@ fn main() { &cfg_conf!(config), &senders, ); - joins.push(thread::spawn(move || { - cernan::source::Graphite::new(graphite_sends, config).run(); - })); + sources.insert( + config_path.clone(), + SourceWorker { + readiness: readiness, + thread: 
thread::spawn(move || { + poll.register( + ®istration, + constants::SYSTEM, + mio::Ready::readable(), + mio::PollOpt::edge(), + ).expect("Poll failed to return a result!"); + cernan::source::Graphite::new(graphite_sends, config) + .run(poll); + }), + }, + ); } }); mem::replace(&mut args.files, None).map(|cfg| { for config in cfg { let mut fp_sends = Vec::new(); + let poll = mio::Poll::new().unwrap(); + let (registration, readiness) = mio::Registration::new2(); populate_forwards( &mut fp_sends, Some(&mut flush_sends), @@ -501,16 +711,29 @@ fn main() { &cfg_conf!(config), &senders, ); - joins.push(thread::spawn(move || { - cernan::source::FileServer::new(fp_sends, config).run(); - })); + sources.insert( + cfg_conf!(config).clone(), + SourceWorker { + readiness: readiness, + thread: thread::spawn(move || { + poll.register( + ®istration, + constants::SYSTEM, + mio::Ready::readable(), + mio::PollOpt::edge(), + ).expect("Poll failed to return a result!"); + cernan::source::FileServer::new(fp_sends, config).run(poll); + }), + }, + ); } }); // BACKGROUND // - joins.push(thread::spawn(move || { + thread::spawn(move || { let mut flush_channels = Vec::new(); + let poll = mio::Poll::new().unwrap(); for destination in &flush_sends { match senders.get(destination) { Some(snd) => { @@ -527,21 +750,31 @@ fn main() { } drop(flush_sends); drop(senders); - cernan::source::FlushTimer::new(flush_channels).run(); - })); + cernan::source::FlushTimer::new(flush_channels).run(poll); + }); - joins.push(thread::spawn(move || { + thread::spawn(move || { cernan::time::update_time(); - })); + }); drop(args); drop(config_topology); drop(receivers); - // TODO - Enable IPC - // TODO - Make use of IPC to: - // * Monitor child thread application-layer health. - // * Communicate graceful shutdown to source/sink/filter child threads on - // signal. signal.recv().unwrap(); + + // First, we shut down source to quiesce event generation. 
+ for (id, source_worker) in sources { + println!("Signaling shutdown to {:?}", id); + source_worker + .readiness + .set_readiness(mio::Ready::readable()) + .expect("Oops!"); + source_worker.thread.join().expect("Failed during join!"); + } + + broadcast_shutdown(&filters); + broadcast_shutdown(&sinks); + join_all(sinks); + join_all(filters); } diff --git a/src/constants.rs b/src/constants.rs new file mode 100644 index 00000000..0071f6b9 --- /dev/null +++ b/src/constants.rs @@ -0,0 +1,10 @@ +//! Library level constants + +extern crate mio; + +/// MIO token used to distinguish system events +/// from other event sources. +/// +/// Note - It is assumed that sources will not hold +/// more than 2048 addressable streams, 0 indexed. +pub const SYSTEM: mio::Token = mio::Token(2048); diff --git a/src/filter/delay_filter.rs b/src/filter/delay_filter.rs index 26616b94..8ba1221a 100644 --- a/src/filter/delay_filter.rs +++ b/src/filter/delay_filter.rs @@ -80,6 +80,9 @@ impl filter::Filter for DelayFilter { metric::Event::TimerFlush(f) => { res.push(metric::Event::TimerFlush(f)); } + metric::Event::Shutdown => { + res.push(metric::Event::Shutdown); + } } Ok(()) } diff --git a/src/filter/programmable_filter.rs b/src/filter/programmable_filter.rs index d90f453d..73aa10e7 100644 --- a/src/filter/programmable_filter.rs +++ b/src/filter/programmable_filter.rs @@ -638,6 +638,10 @@ impl filter::Filter for ProgrammableFilter { } Ok(()) } + metric::Event::Shutdown => { + res.push(metric::Event::Shutdown); + Ok(()) + } } } } diff --git a/src/http.rs b/src/http.rs new file mode 100644 index 00000000..637464fd --- /dev/null +++ b/src/http.rs @@ -0,0 +1,87 @@ +//! Tiny, unassuming HTTP Server + +extern crate tiny_http; + +use std; +use thread; + +/// HTTP request. Alias of `tiny_http::Request`. +pub type Request = tiny_http::Request; +/// HTTP response. Alias of `tiny_http::Response`. +pub type Response<'a> = tiny_http::Response<&'a [u8]>; +/// HTTP header. Alias of `tiny_http::Header`. 
+pub type Header = tiny_http::Header; +/// HTTP header field. Alias of `tiny_http::HeaderField`. +pub type HeaderField = tiny_http::HeaderField; +/// HTTP status code. Alias of `tiny_http::StatusCode`. +pub type StatusCode = tiny_http::StatusCode; + +/// Simple single threaded HTTP request handler. +pub trait Handler: Sync + Send { + /// Handler for a single HTTP request. + fn handle(&self, request: Request) -> (); +} + +/// Single threaded HTTP server. +pub struct Server { + /// Thread handle for the operating HTTP server. + thread: thread::ThreadHandle, +} + +fn http_server( + poller: &thread::Poll, + tiny_http_server: &tiny_http::Server, + handler: &H, +) -> () +where + H: Handler, +{ + loop { + let mut events = thread::Events::with_capacity(1024); + match poller.poll(&mut events, Some(std::time::Duration::from_millis(5))) { + Ok(num_events) if num_events > 0 => { + break; + } + + Ok(_) => match tiny_http_server + .recv_timeout(std::time::Duration::from_millis(1000)) + { + Ok(maybe_a_request) => if let Some(request) = maybe_a_request { + handler.handle(request); + }, + + Err(e) => { + panic!(format!("Failed during recv_timeout {:?}", e)); + } + }, + + Err(e) => { + panic!(format!("Failed during poll {:?}", e)); + } + }; + } +} + +/// Single threaded HTTP server implementation. +impl Server { + /// Create and start an HTTP server on the given host and port. + pub fn new(host_port: String, handler: H) -> Self { + Server { + thread: thread::spawn(move |poller| { + let tiny_http_server = tiny_http::Server::http(host_port).unwrap(); + http_server(&poller, &tiny_http_server, &handler) + }), + } + } +} + +/// Graceful shutdown support for Server. 
+impl thread::Stoppable for Server { + fn join(self) { + self.thread.join(); + } + + fn shutdown(self) { + self.thread.shutdown(); + } +} diff --git a/src/lib.rs b/src/lib.rs index 8c10086c..ef240187 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,7 @@ extern crate hopper; extern crate hyper; extern crate libc; extern crate lua; +extern crate mio; extern crate protobuf; extern crate quantiles; extern crate regex; @@ -36,6 +37,7 @@ extern crate rusoto_firehose; extern crate seahash; #[macro_use] extern crate serde_json; +extern crate slab; extern crate toml; extern crate url; extern crate uuid; @@ -60,4 +62,7 @@ pub mod time; pub mod source; pub mod filter; pub mod util; +pub mod constants; +pub mod thread; pub mod protocols; +pub mod http; diff --git a/src/metric/event.rs b/src/metric/event.rs index 693f5ca7..eac4c50c 100644 --- a/src/metric/event.rs +++ b/src/metric/event.rs @@ -18,6 +18,10 @@ pub enum Event { /// flushes made in this cernan's run. See `source::Flush` for the origin of /// these pulses in cernan operation. TimerFlush(u64), + /// Shutdown event which marks the location in the queue after which no + /// more events will appear. It is expected that after receiving this + /// marker the given source will exit cleanly. 
+ Shutdown, } impl Event { @@ -48,7 +52,7 @@ impl Event { None => None, } } - Event::TimerFlush(_) => None, + Event::TimerFlush(_) | Event::Shutdown => None, } } } diff --git a/src/sink/console.rs b/src/sink/console.rs index 1ad4c52f..fff1e8d7 100644 --- a/src/sink/console.rs +++ b/src/sink/console.rs @@ -199,6 +199,10 @@ impl Sink for Console { self.aggrs.reset(); } + + fn shutdown(mut self) -> () { + self.flush(); + } } #[inline] diff --git a/src/sink/elasticsearch.rs b/src/sink/elasticsearch.rs index 23b85649..9ef611af 100644 --- a/src/sink/elasticsearch.rs +++ b/src/sink/elasticsearch.rs @@ -174,6 +174,7 @@ impl Sink for Elasticsearch { Some(self.flush_interval) } + #[allow(cyclomatic_complexity)] fn flush(&mut self) { if self.buffer.is_empty() { return; @@ -337,6 +338,10 @@ impl Sink for Elasticsearch { } } + fn shutdown(mut self) -> () { + self.flush(); + } + fn deliver(&mut self, _: sync::Arc>) -> () { // nothing, intentionally } diff --git a/src/sink/firehose.rs b/src/sink/firehose.rs index 68cb1a38..d6f56f7a 100644 --- a/src/sink/firehose.rs +++ b/src/sink/firehose.rs @@ -295,6 +295,10 @@ impl Sink for Firehose { self.buffer.clear(); } + fn shutdown(mut self) -> () { + self.flush(); + } + fn deliver(&mut self, _: sync::Arc>) -> () { // nothing, intentionally } diff --git a/src/sink/influxdb.rs b/src/sink/influxdb.rs index 01504939..c38ad1c6 100644 --- a/src/sink/influxdb.rs +++ b/src/sink/influxdb.rs @@ -264,6 +264,10 @@ impl Sink for InfluxDB { self.aggrs.clear(); } + fn shutdown(mut self) -> () { + self.flush(); + } + fn deliver(&mut self, mut point: sync::Arc>) -> () { self.aggrs .push(sync::Arc::make_mut(&mut point).take().unwrap()); diff --git a/src/sink/mod.rs b/src/sink/mod.rs index 2721afd9..b8d501c7 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -50,6 +50,9 @@ pub trait Sink { /// Deliver a `LogLine` to the `Sink`. Exact behaviour varies by /// implementation. 
fn deliver_line(&mut self, line: sync::Arc>) -> (); + /// Provide a hook to shutdown a sink. This is necessary for sinks which + /// have their own long-running threads. + fn shutdown(self) -> (); /// The run-loop of the `Sink`. It's expect that few sinks will ever need to /// provide their own implementation. Please take care to obey `Valve` /// states and `flush_interval` configurations. @@ -121,6 +124,17 @@ pub trait Sink { self.deliver_line(line); break; } + Event::Shutdown => { + // Invariant - In order to ensure at least once delivery + // at the sink level, the following properties must hold: + // + // 1) Shutdown events only ever appear at the end + // of a queue. + // + // 2) The given sink synchronously flushes any + // internal memory. + return; + } }, Valve::Closed => { self.flush(); diff --git a/src/sink/native.rs b/src/sink/native.rs index c0bd7152..c37ec4ff 100644 --- a/src/sink/native.rs +++ b/src/sink/native.rs @@ -234,4 +234,8 @@ impl Sink for Native { } } } + + fn shutdown(mut self) -> () { + self.flush(); + } } diff --git a/src/sink/null.rs b/src/sink/null.rs index ddcd3977..ba9b9d6b 100644 --- a/src/sink/null.rs +++ b/src/sink/null.rs @@ -51,4 +51,8 @@ impl Sink for Null { fn flush(&mut self) { // do nothing } + + fn shutdown(mut self) -> () { + self.flush(); + } } diff --git a/src/sink/prometheus.rs b/src/sink/prometheus.rs index cd6825b4..cdb8228d 100644 --- a/src/sink/prometheus.rs +++ b/src/sink/prometheus.rs @@ -10,18 +10,14 @@ //! //! All points are retained indefinitely in their aggregation. 
-use flate2::Compression; +extern crate log; + use flate2::write::GzEncoder; -use hyper::header::{ContentEncoding, Encoding}; -use hyper::server::{Handler, Listening, Request, Response, Server}; +use http; use metric; use metric::{AggregationMethod, TagMap}; -use protobuf::Message; -use protobuf::repeated::RepeatedField; -use protocols::prometheus::*; use quantiles::histogram::Bound; use sink::{Sink, Valve}; -use std::collections::HashSet; use std::f64; use std::io; use std::io::Write; @@ -31,6 +27,7 @@ use std::sync::Arc; use std::sync::Mutex; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Instant; +use thread::Stoppable; use time; lazy_static! { @@ -42,10 +39,8 @@ lazy_static! { pub static ref PROMETHEUS_AGGR_WINDOWED_LEN: Arc = Arc::new(AtomicUsize::new(0)); /// Total remaining metrics in aggr pub static ref PROMETHEUS_AGGR_REMAINING: Arc = Arc::new(AtomicUsize::new(0)); - /// Total writes to binary - pub static ref PROMETHEUS_WRITE_BINARY: Arc = Arc::new(AtomicUsize::new(0)); - /// Total writes to text - pub static ref PROMETHEUS_WRITE_TEXT: Arc = Arc::new(AtomicUsize::new(0)); + /// Total report successes + pub static ref PROMETHEUS_REPORT_SUCCESS: Arc = Arc::new(AtomicUsize::new(0)); /// Total report errors pub static ref PROMETHEUS_REPORT_ERROR: Arc = Arc::new(AtomicUsize::new(0)); /// Sum of delays in reporting (microseconds) @@ -63,9 +58,7 @@ pub struct Prometheus { thrd_aggr: sync::Arc>>, aggrs: PrometheusAggr, age_threshold: Option, - // `http_srv` is never used but we must keep it in this struct to avoid the - // listening server being dropped - http_srv: Listening, + http_srv: http::Server, } /// The configuration for Prometheus sink @@ -97,10 +90,6 @@ impl Default for PrometheusConfig { } } -struct SenderHandler { - aggr: sync::Arc>>, -} - #[derive(Clone, Debug)] enum Accumulator { Perpetual(metric::Telemetry), @@ -386,57 +375,54 @@ impl<'a> Iterator for Iter<'a> { } } -impl Handler for SenderHandler { - fn handle(&self, req: Request, 
res: Response) { +struct PrometheusHandler { + aggr: sync::Arc>>, +} + +impl http::Handler for PrometheusHandler { + fn handle(&self, request: http::Request) -> () { match *self.aggr.lock().unwrap() { - None => { - return; - } + None => {} Some(ref aggr) => { PROMETHEUS_AGGR_REPORTABLE.store(aggr.count(), Ordering::Relaxed); PROMETHEUS_AGGR_REMAINING.store(aggr.count(), Ordering::Relaxed); - // Typed hyper::mime is challenging to use. In particular, matching - // does not seem to work like I expect and handling - // all other MIME cases in the existing enum strikes - // me as a fool's errand, on account of there may be an - // infinite number of MIMEs that'll come right on in. We'll - // just be monsters and assume if you aren't asking for protobuf you're - // asking for plaintext. - let accept: Vec<&str> = req.headers - .get_raw("accept") - .unwrap_or(&[]) - .iter() - .map(|x| str::from_utf8(x)) - .filter(|x| x.is_ok()) - .map(|x| x.unwrap()) - .collect(); - let mut accept_proto = false; - for hdr in &accept { - if hdr.contains("application/vnd.google.protobuf;") { - accept_proto = true; - break; - } - } let reportable = aggr.reportable(); let now = Instant::now(); - let (res, elapsed) = if accept_proto { - PROMETHEUS_WRITE_BINARY.fetch_add(1, Ordering::Relaxed); - let res = write_binary(reportable, res); - let elapsed = now.elapsed(); - (res, elapsed) - } else { - PROMETHEUS_WRITE_TEXT.fetch_add(1, Ordering::Relaxed); - let res = write_text(reportable, res); - let elapsed = now.elapsed(); - (res, elapsed) - }; + let mut buffer = Vec::new(); + let buffer = write_text(reportable, buffer).unwrap(); + let elapsed = now.elapsed(); let us = ((elapsed.as_secs() as f64) * 10_000.0) + (f64::from(elapsed.subsec_nanos()) / 100_000.0); PROMETHEUS_RESPONSE_DELAY_SUM .fetch_add(us as usize, Ordering::Relaxed); - if res.is_err() { - PROMETHEUS_REPORT_ERROR.fetch_add(1, Ordering::Relaxed); - } + let content_encoding = "gzip"; + let content_type = "text/plain; version=0.0.4"; + 
let headers = [ + http::Header::from_bytes(&b"Content-Type"[..], content_type) + .unwrap(), + http::Header::from_bytes( + &b"Content-Encoding"[..], + content_encoding, + ).unwrap(), + ]; + + let response = http::Response::new( + http::StatusCode::from(200), + headers.to_vec(), + &buffer[..], + Some(buffer.len()), + None, + ); + + match request.respond(response) { + Ok(_) => { + PROMETHEUS_REPORT_SUCCESS.fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + PROMETHEUS_REPORT_ERROR.fetch_add(1, Ordering::Relaxed); + warn!("Failed to send prometheus response! {:?}", e); + } + }; } } } @@ -446,153 +432,31 @@ impl Prometheus { /// Create a new prometheus sink /// /// Please see documentation on `PrometheusConfig` for more details. - pub fn new(config: &PrometheusConfig) -> Prometheus { + pub fn new(config: &PrometheusConfig) -> Self { let aggrs = PrometheusAggr::new(config.capacity_in_seconds); let thrd_aggrs = sync::Arc::new(sync::Mutex::new(None)); let srv_aggrs = sync::Arc::clone(&thrd_aggrs); - let listener = Server::http((config.host.as_str(), config.port)) - .unwrap() - .handle_threads(SenderHandler { aggr: srv_aggrs }, 1) - .unwrap(); + let host_port = + format!("{}:{}", config.host.as_str(), config.port.to_string()); Prometheus { aggrs: aggrs, thrd_aggr: thrd_aggrs, - http_srv: listener, age_threshold: config.age_threshold, + http_srv: http::Server::new( + host_port, + PrometheusHandler { aggr: srv_aggrs }, + ), } } } -fn write_binary(aggrs: Iter, mut res: Response) -> io::Result<()> { - res.headers_mut().set_raw( - "content-type", - vec![ - b"application/vnd.google.protobuf; \ - proto=io.prometheus.client.MetricFamily; encoding=delimited" - .to_vec(), - ], - ); - let mut res = res.start()?; - for value in aggrs { - let sanitized_name: String = sanitize(&value.name); - match value.kind() { - AggregationMethod::Sum => if let Some(v) = value.sum() { - let mut metric_family = MetricFamily::new(); - metric_family.set_name(sanitized_name); - let mut metric = 
Metric::new(); - let mut label_pairs = Vec::with_capacity(8); - for &(ref k, ref v) in value.tags.iter() { - let mut lp = LabelPair::new(); - lp.set_name(k.clone()); - lp.set_value(v.clone()); - label_pairs.push(lp); - } - metric.set_label(RepeatedField::from_vec(label_pairs)); - let mut counter = Counter::new(); - counter.set_value(v); - metric.set_counter(counter); - metric_family.set_field_type(MetricType::COUNTER); - metric_family.set_metric(RepeatedField::from_vec(vec![metric])); - metric_family.write_length_delimited_to_writer(res.by_ref())? - }, - AggregationMethod::Set => if let Some(v) = value.set() { - let mut metric_family = MetricFamily::new(); - metric_family.set_name(sanitized_name); - let mut metric = Metric::new(); - let mut label_pairs = Vec::with_capacity(8); - for &(ref k, ref v) in value.tags.iter() { - let mut lp = LabelPair::new(); - lp.set_name(k.clone()); - lp.set_value(v.clone()); - label_pairs.push(lp); - } - metric.set_label(RepeatedField::from_vec(label_pairs)); - let mut gauge = Gauge::new(); - gauge.set_value(v); - metric.set_gauge(gauge); - metric_family.set_field_type(MetricType::GAUGE); - metric_family.set_metric(RepeatedField::from_vec(vec![metric])); - metric_family.write_length_delimited_to_writer(res.by_ref())? 
- }, - AggregationMethod::Summarize => { - let mut metric_family = MetricFamily::new(); - metric_family.set_name(sanitized_name); - let mut metric = Metric::new(); - let mut label_pairs = Vec::with_capacity(8); - for &(ref k, ref v) in value.tags.iter() { - let mut lp = LabelPair::new(); - lp.set_name(k.clone()); - lp.set_value(v.clone()); - label_pairs.push(lp); - } - metric.set_label(RepeatedField::from_vec(label_pairs)); - let retained_count = value.count(); - let retained_sum = value.samples_sum().unwrap_or(0.0); - let mut summary = Summary::new(); - summary.set_sample_count((retained_count + value.count()) as u64); - if let Some(val) = value.samples_sum() { - summary.set_sample_sum(retained_sum + val); - } - let mut quantiles = Vec::with_capacity(9); - for q in &[0.0, 1.0, 0.25, 0.5, 0.75, 0.90, 0.95, 0.99, 0.999] { - let mut quantile = Quantile::new(); - quantile.set_quantile(*q); - quantile.set_value(value.query(*q).unwrap()); - quantiles.push(quantile); - } - summary.set_quantile(RepeatedField::from_vec(quantiles)); - metric.set_summary(summary); - metric_family.set_field_type(MetricType::SUMMARY); - metric_family.set_metric(RepeatedField::from_vec(vec![metric])); - metric_family.write_length_delimited_to_writer(res.by_ref())? 
- } - AggregationMethod::Histogram => { - let mut metric_family = MetricFamily::new(); - metric_family.set_name(sanitized_name); - let mut metric = Metric::new(); - let mut label_pairs = Vec::with_capacity(8); - for &(ref k, ref v) in value.tags.iter() { - let mut lp = LabelPair::new(); - lp.set_name(k.clone()); - lp.set_value(v.clone()); - label_pairs.push(lp); - } - metric.set_label(RepeatedField::from_vec(label_pairs)); - let mut histogram = Histogram::new(); - histogram.set_sample_count(value.count() as u64); - if let Some(val) = value.samples_sum() { - histogram.set_sample_sum(val) - } - let mut buckets = Vec::with_capacity(16); - if let Some(bin_iter) = value.bins() { - let mut cummulative: u64 = 0; - for &(bound, val) in bin_iter { - let bnd = match bound { - Bound::Finite(bnd) => bnd, - Bound::PosInf => f64::INFINITY, - }; - cummulative += val as u64; - let mut bucket = Bucket::new(); - bucket.set_cumulative_count(cummulative); - bucket.set_upper_bound(bnd); - buckets.push(bucket); - } - } - histogram.set_bucket(RepeatedField::from_vec(buckets)); - metric.set_histogram(histogram); - metric_family.set_field_type(MetricType::HISTOGRAM); - metric_family.set_metric(RepeatedField::from_vec(vec![metric])); - metric_family.write_length_delimited_to_writer(res.by_ref())? 
- } - } - } - res.end() -} - #[allow(cyclomatic_complexity)] #[inline] -fn fmt_tags(tags: &TagMap, s: &mut GzEncoder>) -> () { +fn fmt_tags(tags: &TagMap, s: &mut GzEncoder) -> () +where + W: Write, +{ if tags.is_empty() { let _ = s.write(b""); } else { @@ -618,15 +482,15 @@ fn fmt_tags(tags: &TagMap, s: &mut GzEncoder>) -> () { } } -fn write_text(aggrs: Iter, mut res: Response) -> io::Result<()> { - { - let headers = res.headers_mut(); - headers.set(ContentEncoding(vec![Encoding::Gzip])); - headers.set_raw("content-type", vec![b"text/plain; version=0.0.4".to_vec()]); - } - let mut res = res.start()?; +fn write_text(aggrs: Iter, buffer: W) -> io::Result +where + W: Write, +{ + use flate2::Compression; + use std::collections::HashSet; + let mut seen: HashSet = HashSet::new(); - let mut enc = GzEncoder::new(Vec::with_capacity(1024), Compression::Fast); + let mut enc = GzEncoder::new(buffer, Compression::Fast); for value in aggrs { let sanitized_name: String = sanitize(&value.name); match value.kind() { @@ -719,6 +583,7 @@ fn write_text(aggrs: Iter, mut res: Response) -> io::Result<()> { enc.write_all(v.as_bytes())?; } enc.write_all(b"\"} ")?; + enc.write_all(value.query(*q).unwrap().to_string().as_bytes())?; enc.write_all(b"\n")?; } @@ -741,9 +606,7 @@ fn write_text(aggrs: Iter, mut res: Response) -> io::Result<()> { } } } - let encoded = enc.finish()?; - res.write_all(&encoded)?; - res.end() + enc.finish() } /// Sanitize cernan Telemetry into prometheus' notion @@ -796,6 +659,11 @@ impl Sink for Prometheus { // nothing, intentionally } + fn shutdown(mut self) -> () { + self.flush(); + self.http_srv.shutdown(); + } + fn valve_state(&self) -> Valve { Valve::Open } diff --git a/src/sink/wavefront.rs b/src/sink/wavefront.rs index 7cf8f723..43e3b8e6 100644 --- a/src/sink/wavefront.rs +++ b/src/sink/wavefront.rs @@ -620,6 +620,10 @@ impl Sink for Wavefront { } } + fn shutdown(mut self) -> () { + self.flush(); + } + fn deliver(&mut self, mut point: sync::Arc>) -> () { let 
telem: Telemetry = sync::Arc::make_mut(&mut point).take().unwrap(); if let Some(age_threshold) = self.age_threshold { diff --git a/src/source/file/file_server.rs b/src/source/file/file_server.rs index 9f870ec3..ac8d59cc 100644 --- a/src/source/file/file_server.rs +++ b/src/source/file/file_server.rs @@ -1,20 +1,16 @@ use glob::glob; use metric; -use seahash::SeaHasher; +use mio; use source::Source; use source::file::file_watcher::FileWatcher; use source::internal::report_full_telemetry; -use std::collections::HashMap; -use std::hash::BuildHasherDefault; use std::mem; use std::path::PathBuf; use std::str; -use std::thread; +use std::time; use util; use util::send; -type HashMapFnv = HashMap>; - /// `FileServer` is a Source which cooperatively schedules reads over files, /// converting the lines of said files into `LogLine` structures. As /// `FileServer` is intended to be useful across multiple operating systems with @@ -87,11 +83,11 @@ impl FileServer { /// Specific operating systems support evented interfaces that correct this /// problem but your intrepid authors know of no generic solution. 
impl Source for FileServer { - fn run(&mut self) -> () { + fn run(&mut self, poller: mio::Poll) { let mut buffer = String::new(); - let mut fp_map: HashMapFnv = Default::default(); - let mut fp_map_alt: HashMapFnv = Default::default(); + let mut fp_map: util::HashMap = Default::default(); + let mut fp_map_alt: util::HashMap = Default::default(); let mut backoff_cap: usize = 1; let mut lines = Vec::new(); @@ -172,7 +168,16 @@ impl Source for FileServer { backoff_cap = 1; } let backoff = backoff_cap.saturating_sub(global_lines_read); - thread::sleep(::std::time::Duration::from_millis(backoff as u64)); + let mut events = mio::Events::with_capacity(1024); + match poller.poll(& mut events, Some(time::Duration::from_millis(backoff as u64))){ + Err(e) => + panic!(format!("Failed during poll {:?}", e)), + Ok(_num_events) => + // File server doesn't poll for anything other than SYSTEM events. + // As currently there are no system events other than SHUTDOWN, + // we immediately exit. + return, + } } } } diff --git a/src/source/flush.rs b/src/source/flush.rs index 036c5661..0c3a9ae7 100644 --- a/src/source/flush.rs +++ b/src/source/flush.rs @@ -1,4 +1,5 @@ use metric; +use mio; use source::Source; use std::thread::sleep; use std::time::Duration; @@ -19,7 +20,7 @@ impl FlushTimer { } impl Source for FlushTimer { - fn run(&mut self) { + fn run(&mut self, _poll: mio::Poll) { let one_second = Duration::new(1, 0); // idx will _always_ increase. 
If it's kept at u64 or greater it will // overflow long past the collapse of our industrial civilization only diff --git a/src/source/graphite.rs b/src/source/graphite.rs index 70c8377d..617af2ec 100644 --- a/src/source/graphite.rs +++ b/src/source/graphite.rs @@ -1,22 +1,25 @@ -use super::Source; +use constants; use metric; +use mio; use protocols::graphite::parse_graphite; +use source::Source; +use std; use std::io::BufReader; use std::io::prelude::*; -use std::net::{TcpListener, TcpStream}; use std::net::ToSocketAddrs; use std::str; -use std::sync::Arc; +use std::sync; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::thread; +use thread; +use thread::Stoppable; use util; use util::send; lazy_static! { - pub static ref GRAPHITE_NEW_PEER: Arc = Arc::new(AtomicUsize::new(0)); - pub static ref GRAPHITE_GOOD_PACKET: Arc = Arc::new(AtomicUsize::new(0)); - pub static ref GRAPHITE_TELEM: Arc = Arc::new(AtomicUsize::new(0)); - pub static ref GRAPHITE_BAD_PACKET: Arc = Arc::new(AtomicUsize::new(0)); + pub static ref GRAPHITE_NEW_PEER: sync::Arc = sync::Arc::new(AtomicUsize::new(0)); + pub static ref GRAPHITE_GOOD_PACKET: sync::Arc = sync::Arc::new(AtomicUsize::new(0)); + pub static ref GRAPHITE_TELEM: sync::Arc = sync::Arc::new(AtomicUsize::new(0)); + pub static ref GRAPHITE_BAD_PACKET: sync::Arc = sync::Arc::new(AtomicUsize::new(0)); } /// Graphite protocol source @@ -26,7 +29,7 @@ pub struct Graphite { chans: util::Channel, host: String, port: u16, - tags: Arc, + tags: sync::Arc, } /// Configured for the `metric::Telemetry` source. 
@@ -64,87 +67,156 @@ impl Graphite { chans: chans, host: config.host, port: config.port, - tags: Arc::new(config.tags), + tags: sync::Arc::new(config.tags), } } } -fn handle_tcp( +fn spawn_stream_handlers( chans: util::Channel, - tags: Arc, - listner: TcpListener, -) -> thread::JoinHandle<()> { - thread::spawn(move || { - for stream in listner.incoming() { - if let Ok(stream) = stream { - GRAPHITE_NEW_PEER.fetch_add(1, Ordering::Relaxed); - debug!( - "new peer at {:?} | local addr for peer {:?}", - stream.peer_addr(), - stream.local_addr() - ); - let tags = Arc::clone(&tags); - let chans = chans.clone(); - thread::spawn(move || { - handle_stream(chans, tags, stream); + tags: &sync::Arc, + listener: &mio::net::TcpListener, + stream_handlers: &mut Vec, +) -> () { + loop { + match listener.accept() { + Ok((stream, _addr)) => { + let rchans = chans.clone(); + let rtags = sync::Arc::clone(tags); + let new_stream = thread::spawn(move |poller| { + poller + .register( + &stream, + mio::Token(0), + mio::Ready::readable(), + mio::PollOpt::edge(), + ) + .unwrap(); + + handle_stream(rchans, &rtags, &poller, stream); }); + stream_handlers.push(new_stream); } - } - }) + + Err(e) => match e.kind() { + std::io::ErrorKind::WouldBlock => { + break; + } + _ => unimplemented!(), + }, + }; + } } fn handle_stream( mut chans: util::Channel, - tags: Arc, - stream: TcpStream, + tags: &sync::Arc, + poller: &mio::Poll, + stream: mio::net::TcpStream, ) { - thread::spawn(move || { - let mut line = String::new(); - let mut res = Vec::new(); - let mut line_reader = BufReader::new(stream); - let basic_metric = Arc::new(Some( - metric::Telemetry::default().overlay_tags_from_map(&tags), - )); - while let Some(len) = line_reader.read_line(&mut line).ok() { - if len > 0 { - if parse_graphite(&line, &mut res, &basic_metric) { - assert!(!res.is_empty()); - GRAPHITE_GOOD_PACKET.fetch_add(1, Ordering::Relaxed); - GRAPHITE_TELEM.fetch_add(1, Ordering::Relaxed); - for m in res.drain(..) 
{ - send(&mut chans, metric::Event::Telemetry(Arc::new(Some(m)))); + let mut line = String::new(); + let mut res = Vec::new(); + let mut line_reader = BufReader::new(stream); + let basic_metric = sync::Arc::new(Some( + metric::Telemetry::default().overlay_tags_from_map(tags), + )); + + loop { + let mut events = mio::Events::with_capacity(1024); + match poller.poll(&mut events, None) { + Err(e) => panic!(format!("Failed during poll {:?}", e)), + Ok(_num_events) => for event in events { + match event.token() { + constants::SYSTEM => return, + _stream_token => if let Ok(len) = line_reader.read_line(&mut line) + { + if len > 0 { + if parse_graphite(&line, &mut res, &basic_metric) { + assert!(!res.is_empty()); + GRAPHITE_GOOD_PACKET.fetch_add(1, Ordering::Relaxed); + GRAPHITE_TELEM.fetch_add(1, Ordering::Relaxed); + for m in res.drain(..) { + send( + &mut chans, + metric::Event::Telemetry(sync::Arc::new( + Some(m), + )), + ); + } + line.clear(); + } else { + GRAPHITE_BAD_PACKET.fetch_add(1, Ordering::Relaxed); + error!("bad packet: {:?}", line); + line.clear(); + } + } else { + break; + } + }, + } + }, + } + } +} + +fn handle_tcp( + chans: util::Channel, + tags: &sync::Arc, + listeners: util::TokenSlab, + poll: &mio::Poll, +) { + let mut stream_handlers: Vec = Vec::new(); + loop { + let mut events = mio::Events::with_capacity(1024); + match poll.poll(&mut events, None) { + Err(e) => panic!(format!("Failed during poll {:?}", e)), + Ok(_num_events) => { + for event in events { + match event.token() { + constants::SYSTEM => { + for handler in stream_handlers { + handler.shutdown(); + } + return; + } + listener_token => { + let listener = &listeners[listener_token]; + spawn_stream_handlers( + chans.clone(), // TODO: do not clone, make an Arc + tags, + listener, + &mut stream_handlers, + ); + } } - line.clear(); - } else { - GRAPHITE_BAD_PACKET.fetch_add(1, Ordering::Relaxed); - error!("bad packet: {:?}", line); - line.clear(); } - } else { - break; } } - }); + } } impl 
Source for Graphite { - fn run(&mut self) { - let mut joins = Vec::new(); - + fn run(&mut self, poll: mio::Poll) { let addrs = (self.host.as_str(), self.port).to_socket_addrs(); match addrs { Ok(ips) => { let ips: Vec<_> = ips.collect(); + let mut listeners = + util::TokenSlab::::new(); for addr in ips { - let listener = - TcpListener::bind(addr).expect("Unable to bind to TCP socket"); - let chans = self.chans.clone(); - let tags = Arc::clone(&self.tags); - info!("server started on {:?} {}", addr, self.port); - joins.push(thread::spawn(move || { - handle_tcp(chans, tags, listener) - })); + let listener = mio::net::TcpListener::bind(&addr) + .expect("Unable to bind to TCP socket"); + info!("registered listener for {:?} {}", addr, self.port); + let token = listeners.insert(listener); + poll.register( + &listeners[token], + token, + mio::Ready::readable(), + mio::PollOpt::edge(), + ).unwrap(); } + + handle_tcp(self.chans.clone(), &self.tags, listeners, &poll); } Err(e) => { info!( @@ -153,13 +225,5 @@ impl Source for Graphite { ); } } - - // TODO thread spawn trick, join on results - for jh in joins { - // TODO Having sub-threads panic will not cause a bubble-up if that - // thread is not the currently examined one. We're going to have to have - // some manner of sub-thread communication going on. - jh.join().expect("Uh oh, child thread panicked!"); - } } } diff --git a/src/source/internal.rs b/src/source/internal.rs index a9658c63..cdb62f2a 100644 --- a/src/source/internal.rs +++ b/src/source/internal.rs @@ -10,6 +10,7 @@ use std::sync; use std::sync::atomic::Ordering; use time; use util; +use mio; lazy_static! { static ref Q: Stack = Stack::new(); @@ -106,318 +107,321 @@ macro_rules! atom_non_zero_telem { /// floor. 
impl Source for Internal { #[allow(cyclomatic_complexity)] - fn run(&mut self) { + fn run(&mut self, poll: mio::Poll) { let slp = std::time::Duration::from_millis(1_000); loop { - std::thread::sleep(slp); - if !self.chans.is_empty() { - // source::graphite - atom_non_zero_telem!( - "cernan.graphite.new_peer", - source::graphite::GRAPHITE_NEW_PEER, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.graphite.packet", - source::graphite::GRAPHITE_GOOD_PACKET, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.graphite.telemetry.received", - source::graphite::GRAPHITE_TELEM, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.graphite.bad_packet", - source::graphite::GRAPHITE_BAD_PACKET, - self.tags, - self.chans - ); - // source::statsd - atom_non_zero_telem!( - "cernan.statsd.packet", - source::statsd::STATSD_GOOD_PACKET, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.statsd.bad_packet", - source::statsd::STATSD_BAD_PACKET, - self.tags, - self.chans - ); - // sink::elasticsearch - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.records.delivery", - sink::elasticsearch::ELASTIC_RECORDS_DELIVERY, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.internal.buffer_len", - sink::elasticsearch::ELASTIC_INTERNAL_BUFFER_LEN, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.records.total_delivered", - sink::elasticsearch::ELASTIC_RECORDS_TOTAL_DELIVERED, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.records.total_failed", - sink::elasticsearch::ELASTIC_RECORDS_TOTAL_FAILED, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.unknown", - sink::elasticsearch::ELASTIC_ERROR_UNKNOWN, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.bulk_action.index", - sink::elasticsearch::ELASTIC_BULK_ACTION_INDEX_ERR, - self.tags, - self.chans - 
); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.bulk_action.create", - sink::elasticsearch::ELASTIC_BULK_ACTION_CREATE_ERR, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.bulk_action.update", - sink::elasticsearch::ELASTIC_BULK_ACTION_UPDATE_ERR, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.bulk_action.delete", - sink::elasticsearch::ELASTIC_BULK_ACTION_DELETE_ERR, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.api.index_not_found", - sink::elasticsearch::ELASTIC_ERROR_API_INDEX_NOT_FOUND, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.api.parsing", - sink::elasticsearch::ELASTIC_ERROR_API_PARSING, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.api.mapper_parsing", - sink::elasticsearch::ELASTIC_ERROR_API_MAPPER_PARSING, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.api.action_request_validation", - sink::elasticsearch::ELASTIC_ERROR_API_ACTION_REQUEST_VALIDATION, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.api.action_document_missing", - sink::elasticsearch::ELASTIC_ERROR_API_DOCUMENT_MISSING, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.api.index_already_exists", - sink::elasticsearch::ELASTIC_ERROR_API_INDEX_ALREADY_EXISTS, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.api.unknown", - sink::elasticsearch::ELASTIC_ERROR_API_UNKNOWN, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.elasticsearch.error.client", - sink::elasticsearch::ELASTIC_ERROR_CLIENT, - self.tags, - self.chans - ); - // sink::wavefront - atom_non_zero_telem!( - "cernan.sinks.wavefront.aggregation.histogram", - sink::wavefront::WAVEFRONT_AGGR_HISTO, - self.tags, 
- self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.wavefront.aggregation.sum", - sink::wavefront::WAVEFRONT_AGGR_SUM, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.wavefront.aggregation.set", - sink::wavefront::WAVEFRONT_AGGR_SET, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.wavefront.aggregation.summarize", - sink::wavefront::WAVEFRONT_AGGR_SUMMARIZE, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.wavefront.aggregation.summarize.total_percentiles", - sink::wavefront::WAVEFRONT_AGGR_TOT_PERCENT, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.wavefront.delivery_attempts", - sink::wavefront::WAVEFRONT_DELIVERY_ATTEMPTS, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.wavefront.value.closed", - sink::wavefront::WAVEFRONT_VALVE_CLOSED, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.wavefront.valve.open", - sink::wavefront::WAVEFRONT_VALVE_OPEN, - self.tags, - self.chans - ); - // sink::prometheus - atom_non_zero_telem!( - "cernan.sinks.prometheus.aggregation.inside_baseball.windowed.total", - sink::prometheus::PROMETHEUS_AGGR_WINDOWED_LEN, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.prometheus.exposition.inside_baseball.delay.sum", - sink::prometheus::PROMETHEUS_RESPONSE_DELAY_SUM, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.prometheus.aggregation.inside_baseball.perpetual.total", - sink::prometheus::PROMETHEUS_AGGR_PERPETUAL_LEN, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.prometheus.aggregation.reportable", - sink::prometheus::PROMETHEUS_AGGR_REPORTABLE, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.prometheus.aggregation.remaining", - sink::prometheus::PROMETHEUS_AGGR_REMAINING, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.prometheus.write.binary", - 
sink::prometheus::PROMETHEUS_WRITE_BINARY, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.prometheus.write.text", - sink::prometheus::PROMETHEUS_WRITE_TEXT, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.prometheus.report_error", - sink::prometheus::PROMETHEUS_REPORT_ERROR, - self.tags, - self.chans - ); - // sink::influxdb - atom_non_zero_telem!( - "cernan.sinks.influxdb.delivery_attempts", - sink::influxdb::INFLUX_DELIVERY_ATTEMPTS, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.influxdb.success", - sink::influxdb::INFLUX_SUCCESS, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.influxdb.failure.client_error", - sink::influxdb::INFLUX_FAILURE_CLIENT, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.sinks.influxdb.failure.server_error", - sink::influxdb::INFLUX_FAILURE_SERVER, - self.tags, - self.chans - ); - // filter::delay_filter - atom_non_zero_telem!( - "cernan.filters.delay.telemetry.accept", - filter::delay_filter::DELAY_TELEM_ACCEPT, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.filters.delay.telemetry.reject", - filter::delay_filter::DELAY_TELEM_REJECT, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.filters.delay.log.reject", - filter::delay_filter::DELAY_LOG_REJECT, - self.tags, - self.chans - ); - atom_non_zero_telem!( - "cernan.filters.delay.log.accept", - filter::delay_filter::DELAY_LOG_ACCEPT, - self.tags, - self.chans - ); - while let Some(mut telem) = Q.pop() { + let mut events = mio::Events::with_capacity(1024); + match poll.poll(&mut events, Some(slp)) { + Err(_) => error!("Failed to poll for system events"), + // Internal source doesn't register any external evented sources. + // Any event must be a system event which, at the time of this writing, + // can only be a shutdown event. 
+ Ok(num_events) if num_events > 0 => return, + Ok(_) => { if !self.chans.is_empty() { - telem = telem.overlay_tags_from_map(&self.tags); - util::send( - &mut self.chans, - metric::Event::new_telemetry(telem), + // source::graphite + atom_non_zero_telem!( + "cernan.graphite.new_peer", + source::graphite::GRAPHITE_NEW_PEER, + self.tags, + self.chans ); + atom_non_zero_telem!( + "cernan.graphite.packet", + source::graphite::GRAPHITE_GOOD_PACKET, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.graphite.telemetry.received", + source::graphite::GRAPHITE_TELEM, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.graphite.bad_packet", + source::graphite::GRAPHITE_BAD_PACKET, + self.tags, + self.chans + ); + // source::statsd + atom_non_zero_telem!( + "cernan.statsd.packet", + source::statsd::STATSD_GOOD_PACKET, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.statsd.bad_packet", + source::statsd::STATSD_BAD_PACKET, + self.tags, + self.chans + ); + // sink::elasticsearch + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.records.delivery", + sink::elasticsearch::ELASTIC_RECORDS_DELIVERY, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.internal.buffer_len", + sink::elasticsearch::ELASTIC_INTERNAL_BUFFER_LEN, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.records.total_delivered", + sink::elasticsearch::ELASTIC_RECORDS_TOTAL_DELIVERED, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.records.total_failed", + sink::elasticsearch::ELASTIC_RECORDS_TOTAL_FAILED, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.error.unknown", + sink::elasticsearch::ELASTIC_ERROR_UNKNOWN, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.error.bulk_action.index", + sink::elasticsearch::ELASTIC_BULK_ACTION_INDEX_ERR, + self.tags, + self.chans + ); + atom_non_zero_telem!( 
+ "cernan.sinks.elasticsearch.error.bulk_action.create", + sink::elasticsearch::ELASTIC_BULK_ACTION_CREATE_ERR, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.error.bulk_action.update", + sink::elasticsearch::ELASTIC_BULK_ACTION_UPDATE_ERR, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.error.bulk_action.delete", + sink::elasticsearch::ELASTIC_BULK_ACTION_DELETE_ERR, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.error.api.index_not_found", + sink::elasticsearch::ELASTIC_ERROR_API_INDEX_NOT_FOUND, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.error.api.parsing", + sink::elasticsearch::ELASTIC_ERROR_API_PARSING, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.error.api.mapper_parsing", + sink::elasticsearch::ELASTIC_ERROR_API_MAPPER_PARSING, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.error.api.action_request_validation", + sink::elasticsearch::ELASTIC_ERROR_API_ACTION_REQUEST_VALIDATION, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.error.api.action_document_missing", + sink::elasticsearch::ELASTIC_ERROR_API_DOCUMENT_MISSING, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.error.api.index_already_exists", + sink::elasticsearch::ELASTIC_ERROR_API_INDEX_ALREADY_EXISTS, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.error.api.unknown", + sink::elasticsearch::ELASTIC_ERROR_API_UNKNOWN, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.elasticsearch.error.client", + sink::elasticsearch::ELASTIC_ERROR_CLIENT, + self.tags, + self.chans + ); + // sink::wavefront + atom_non_zero_telem!( + "cernan.sinks.wavefront.aggregation.histogram", + sink::wavefront::WAVEFRONT_AGGR_HISTO, + self.tags, + self.chans + ); + 
atom_non_zero_telem!( + "cernan.sinks.wavefront.aggregation.sum", + sink::wavefront::WAVEFRONT_AGGR_SUM, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.wavefront.aggregation.set", + sink::wavefront::WAVEFRONT_AGGR_SET, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.wavefront.aggregation.summarize", + sink::wavefront::WAVEFRONT_AGGR_SUMMARIZE, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.wavefront.aggregation.summarize.total_percentiles", + sink::wavefront::WAVEFRONT_AGGR_TOT_PERCENT, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.wavefront.delivery_attempts", + sink::wavefront::WAVEFRONT_DELIVERY_ATTEMPTS, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.wavefront.value.closed", + sink::wavefront::WAVEFRONT_VALVE_CLOSED, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.wavefront.valve.open", + sink::wavefront::WAVEFRONT_VALVE_OPEN, + self.tags, + self.chans + ); + // sink::prometheus + atom_non_zero_telem!( + "cernan.sinks.prometheus.aggregation.inside_baseball.windowed.total", + sink::prometheus::PROMETHEUS_AGGR_WINDOWED_LEN, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.prometheus.exposition.inside_baseball.delay.sum", + sink::prometheus::PROMETHEUS_RESPONSE_DELAY_SUM, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.prometheus.aggregation.inside_baseball.perpetual.total", + sink::prometheus::PROMETHEUS_AGGR_PERPETUAL_LEN, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.prometheus.aggregation.reportable", + sink::prometheus::PROMETHEUS_AGGR_REPORTABLE, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.prometheus.aggregation.remaining", + sink::prometheus::PROMETHEUS_AGGR_REMAINING, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.prometheus.report.success", + sink::prometheus::PROMETHEUS_REPORT_SUCCESS, + 
self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.prometheus.report.error", + sink::prometheus::PROMETHEUS_REPORT_ERROR, + self.tags, + self.chans + ); + // sink::influxdb + atom_non_zero_telem!( + "cernan.sinks.influxdb.delivery_attempts", + sink::influxdb::INFLUX_DELIVERY_ATTEMPTS, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.influxdb.success", + sink::influxdb::INFLUX_SUCCESS, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.influxdb.failure.client_error", + sink::influxdb::INFLUX_FAILURE_CLIENT, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.sinks.influxdb.failure.server_error", + sink::influxdb::INFLUX_FAILURE_SERVER, + self.tags, + self.chans + ); + // filter::delay_filter + atom_non_zero_telem!( + "cernan.filters.delay.telemetry.accept", + filter::delay_filter::DELAY_TELEM_ACCEPT, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.filters.delay.telemetry.reject", + filter::delay_filter::DELAY_TELEM_REJECT, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.filters.delay.log.reject", + filter::delay_filter::DELAY_LOG_REJECT, + self.tags, + self.chans + ); + atom_non_zero_telem!( + "cernan.filters.delay.log.accept", + filter::delay_filter::DELAY_LOG_ACCEPT, + self.tags, + self.chans + ); + while let Some(mut telem) = Q.pop() { + if !self.chans.is_empty() { + telem = telem.overlay_tags_from_map(&self.tags); + util::send( + &mut self.chans, + metric::Event::new_telemetry(telem), + ); + } else { + // do nothing, intentionally + } + } } else { - // do nothing, intentionally + // There are no chans available. In this case we want to drain + // and deallocate any internal telemetry that may have made it + // to Q. + while let Some(_) = Q.pop() { + // do nothing, intentionally + } } } - } else { - // There are no chans available. In this case we want to drain - // and deallocate any internal telemetry that may have made it - // to Q. 
- while let Some(_) = Q.pop() { - // do nothing, intentionally - } } } } diff --git a/src/source/mod.rs b/src/source/mod.rs index 14e6d19e..6f379835 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -3,6 +3,8 @@ //! In cernan a `Source` is a place where all `metric::Event` come from, feeding //! down into the source's forwards for further processing. Statsd is a source //! that creates `Telemetry`, `FileServer` is a source that creates `LogLine`s. +use mio; + mod file; mod flush; mod graphite; @@ -17,6 +19,7 @@ pub use self::internal::{report_full_telemetry, Internal, InternalConfig}; pub use self::native::{NativeServer, NativeServerConfig}; pub use self::statsd::{Statsd, StatsdConfig, StatsdParseConfig}; + /// cernan Source, the originator of all `metric::Event`. /// /// A cernan Source creates all `metric::Event`, doing so by listening to @@ -24,5 +27,5 @@ pub use self::statsd::{Statsd, StatsdConfig, StatsdParseConfig}; /// topology. pub trait Source { /// Run the Source, the exact mechanism here depends on the Source itself. 
- fn run(&mut self) -> (); + fn run(&mut self, _poll: mio::Poll) -> (); } diff --git a/src/source/native.rs b/src/source/native.rs index de624b6b..4adf788b 100644 --- a/src/source/native.rs +++ b/src/source/native.rs @@ -1,14 +1,18 @@ use super::Source; use byteorder::{BigEndian, ReadBytesExt}; +use constants; use hopper; use metric; +use mio; use protobuf; use protocols::native::{AggregationMethod, Payload}; use std::io; use std::io::Read; -use std::net::{TcpListener, TcpStream, ToSocketAddrs}; +use std::net::ToSocketAddrs; use std::str; -use std::thread; +use std::sync; +use thread; +use thread::Stoppable; use util; /// The native source @@ -70,121 +74,203 @@ impl NativeServer { fn handle_tcp( chans: util::Channel, - tags: metric::TagMap, - listner: TcpListener, -) -> thread::JoinHandle<()> { - thread::spawn(move || { - for stream in listner.incoming() { - if let Ok(stream) = stream { - debug!( - "new peer at {:?} | local addr for peer {:?}", - stream.peer_addr(), - stream.local_addr() - ); - let tags = tags.clone(); - let chans = chans.clone(); - thread::spawn(move || { - handle_stream(chans, tags, stream); + tags: &sync::Arc, + listeners: util::TokenSlab, + poller: &mio::Poll, +) { + let mut stream_handlers: Vec = Vec::new(); + loop { + let mut events = mio::Events::with_capacity(1024); + match poller.poll(&mut events, None) { + Err(e) => panic!(format!("Failed during poll {:?}", e)), + Ok(_num_events) => for event in events { + match event.token() { + constants::SYSTEM => { + for handler in stream_handlers { + handler.shutdown(); + } + return; + } + listener_token => { + let listener = &listeners[listener_token]; + spawn_stream_handlers( + chans.clone(), + tags, + listener, + &mut stream_handlers, + ) + } + } + }, + } + } +} + +fn spawn_stream_handlers( + chans: util::Channel, + tags: &sync::Arc, + listener: &mio::net::TcpListener, + stream_handlers: &mut Vec, +) -> () { + loop { + match listener.accept() { + Ok((stream, _addr)) => { + let rtags = 
sync::Arc::clone(tags); + let rchans = chans.clone(); + + let new_stream = thread::spawn(move |poller| { + poller + .register( + &stream, + mio::Token(0), + mio::Ready::readable(), + mio::PollOpt::edge(), + ) + .unwrap(); + + handle_stream(rchans, &rtags, &poller, stream); }); + stream_handlers.push(new_stream); } + + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock => { + break; + } + _ => unimplemented!(), + }, } - }) + } } -fn handle_stream(mut chans: util::Channel, tags: metric::TagMap, stream: TcpStream) { - thread::spawn(move || { - let mut reader = io::BufReader::new(stream); - let mut buf = Vec::with_capacity(4000); - loop { - let payload_size_in_bytes = match reader.read_u32::() { - Ok(i) => i as usize, - Err(_) => return, - }; - buf.resize(payload_size_in_bytes, 0); - if reader.read_exact(&mut buf).is_err() { - return; - } - match protobuf::parse_from_bytes::(&buf) { - // TODO we have to handle bin_bounds. We'll use samples to get - // the values of each bounds' counter. 
- Ok(mut pyld) => { - for mut point in pyld.take_points().into_iter() { - let name: String = point.take_name(); - let smpls: Vec = point.take_samples(); - let aggr_type: AggregationMethod = point.get_method(); - let mut meta = point.take_metadata(); - // FIXME #166 - let ts: i64 = (point.get_timestamp_ms() as f64 * 0.001) as i64; - - if smpls.is_empty() { - continue; - } - let mut metric = metric::Telemetry::new().name(name); - metric = metric.value(smpls[0]); - metric = match aggr_type { - AggregationMethod::SET => { - metric.kind(metric::AggregationMethod::Set) - } - AggregationMethod::SUM => { - metric.kind(metric::AggregationMethod::Sum) - } - AggregationMethod::SUMMARIZE => { - metric.kind(metric::AggregationMethod::Summarize) - } - AggregationMethod::BIN => { - metric.kind(metric::AggregationMethod::Histogram) - } - }; - metric = metric.persist(point.get_persisted()); - metric = metric.timestamp(ts); - let mut metric = metric.harden().unwrap(); // todo don't unwrap - metric = metric.overlay_tags_from_map(&tags); - for (key, value) in meta.drain() { - metric = metric.overlay_tag(key, value); - } - for smpl in &smpls[1..] 
{ - metric = metric.insert(*smpl); - } - util::send(&mut chans, metric::Event::new_telemetry(metric)); +fn handle_stream( + chans: util::Channel, + tags: &sync::Arc, + poller: &mio::Poll, + stream: mio::net::TcpStream, +) { + let mut reader = io::BufReader::new(stream); + loop { + let mut events = mio::Events::with_capacity(1024); + match poller.poll(&mut events, None) { + Err(e) => panic!(format!("Failed during poll {:?}", e)), + Ok(_num_events) => for event in events { + match event.token() { + constants::SYSTEM => return, + _stream_token => { + let rchans = chans.clone(); + handle_stream_payload(rchans, tags, &mut reader); } - for mut line in pyld.take_lines().into_iter() { - let path: String = line.take_path(); - let value: String = line.take_value(); - let mut meta = line.take_metadata(); - // FIXME #166 - let ts: i64 = (line.get_timestamp_ms() as f64 * 0.001) as i64; - - let mut logline = metric::LogLine::new(path, value); - logline = logline.time(ts); - logline = logline.overlay_tags_from_map(&tags); - for (key, value) in meta.drain() { - logline = logline.overlay_tag(key, value); - } - util::send(&mut chans, metric::Event::new_log(logline)); + } + }, + } + } +} + +fn handle_stream_payload( + mut chans: util::Channel, + tags: &sync::Arc, + reader: &mut io::BufReader, +) { + let mut buf = Vec::with_capacity(4000); + let payload_size_in_bytes = match reader.read_u32::() { + Ok(i) => i as usize, + Err(_) => return, + }; + buf.resize(payload_size_in_bytes, 0); + if reader.read_exact(&mut buf).is_err() { + return; + } + match protobuf::parse_from_bytes::(&buf) { + // TODO we have to handle bin_bounds. We'll use samples to get + // the values of each bounds' counter. 
+ Ok(mut pyld) => { + for mut point in pyld.take_points().into_iter() { + let name: String = point.take_name(); + let smpls: Vec = point.take_samples(); + let aggr_type: AggregationMethod = point.get_method(); + let mut meta = point.take_metadata(); + // FIXME #166 + let ts: i64 = (point.get_timestamp_ms() as f64 * 0.001) as i64; + + if smpls.is_empty() { + continue; + } + let mut metric = metric::Telemetry::new().name(name); + metric = metric.value(smpls[0]); + metric = match aggr_type { + AggregationMethod::SET => { + metric.kind(metric::AggregationMethod::Set) + } + AggregationMethod::SUM => { + metric.kind(metric::AggregationMethod::Sum) } + AggregationMethod::SUMMARIZE => { + metric.kind(metric::AggregationMethod::Summarize) + } + AggregationMethod::BIN => { + metric.kind(metric::AggregationMethod::Histogram) + } + }; + metric = metric.persist(point.get_persisted()); + metric = metric.timestamp(ts); + let mut metric = metric.harden().unwrap(); // todo don't unwrap + metric = metric.overlay_tags_from_map(tags); + for (key, value) in meta.drain() { + metric = metric.overlay_tag(key, value); + } + for smpl in &smpls[1..] 
{ + metric = metric.insert(*smpl); } - Err(err) => { - trace!("Unable to read payload: {:?}", err); - return; + util::send(&mut chans, metric::Event::new_telemetry(metric)); + } + for mut line in pyld.take_lines().into_iter() { + let path: String = line.take_path(); + let value: String = line.take_value(); + let mut meta = line.take_metadata(); + // FIXME #166 + let ts: i64 = (line.get_timestamp_ms() as f64 * 0.001) as i64; + + let mut logline = metric::LogLine::new(path, value); + logline = logline.time(ts); + logline = logline.overlay_tags_from_map(tags); + for (key, value) in meta.drain() { + logline = logline.overlay_tag(key, value); } + util::send(&mut chans, metric::Event::new_log(logline)); } } - }); + Err(err) => { + trace!("Unable to read payload: {:?}", err); + return; + } + } } impl Source for NativeServer { - fn run(&mut self) { + fn run(&mut self, poller: mio::Poll) { let srv: Vec<_> = (self.ip.as_str(), self.port) .to_socket_addrs() .expect("unable to make socket addr") .collect(); - let listener = TcpListener::bind(srv.first().unwrap()) + + let mut listeners: util::TokenSlab = + util::TokenSlab::new(); + let listener = mio::net::TcpListener::bind(srv.first().unwrap()) .expect("Unable to bind to TCP socket"); + let token = listeners.insert(listener); + poller + .register( + &listeners[token], + token, + mio::Ready::readable(), + mio::PollOpt::edge(), + ) + .unwrap(); + let chans = self.chans.clone(); - let tags = self.tags.clone(); + let tags = sync::Arc::new(self.tags.clone()); info!("server started on {}:{}", self.ip, self.port); - let jh = thread::spawn(move || handle_tcp(chans, tags, listener)); - - jh.join().expect("Uh oh, child thread panicked!"); + handle_tcp(chans, &tags, listeners, &poller); } } diff --git a/src/source/statsd.rs b/src/source/statsd.rs index d51ad533..f277c207 100644 --- a/src/source/statsd.rs +++ b/src/source/statsd.rs @@ -1,13 +1,14 @@ +use constants; use metric; +use mio; use protocols::statsd::parse_statsd; use 
regex::Regex; use source::Source; -use std::net::{ToSocketAddrs, UdpSocket}; +use std::net::ToSocketAddrs; use std::str; use std::sync; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::thread; use util; use util::send; @@ -106,7 +107,8 @@ fn handle_udp( mut chans: util::Channel, tags: &sync::Arc, parse_config: &sync::Arc, - socket: &UdpSocket, + conns: &util::TokenSlab, + poll: &mio::Poll, ) { let mut buf = vec![0; 16_250]; let mut metrics = Vec::new(); @@ -114,47 +116,70 @@ fn handle_udp( metric::Telemetry::default().overlay_tags_from_map(tags), )); loop { - let (len, _) = match socket.recv_from(&mut buf) { - Ok(r) => r, - Err(e) => panic!(format!("Could not read UDP socket with error {:?}", e)), - }; - match str::from_utf8(&buf[..len]) { - Ok(val) => if parse_statsd(val, &mut metrics, &basic_metric, parse_config) - { - for m in metrics.drain(..) { - send(&mut chans, metric::Event::new_telemetry(m)); + let mut events = mio::Events::with_capacity(1024); + match poll.poll(&mut events, None) { + Ok(_num_events) => for event in events { + match event.token() { + constants::SYSTEM => return, + token => { + // Get the socket to receive from: + let socket = &conns[token]; + + let (len, _) = match socket.recv_from(&mut buf) { + Ok(r) => r, + Err(e) => panic!(format!( + "Could not read UDP socket with error {:?}", + e + )), + }; + match str::from_utf8(&buf[..len]) { + Ok(val) => if parse_statsd( + val, + &mut metrics, + &basic_metric, + parse_config, + ) { + for m in metrics.drain(..) 
{ + send(&mut chans, metric::Event::new_telemetry(m)); + } + STATSD_GOOD_PACKET.fetch_add(1, Ordering::Relaxed); + } else { + STATSD_BAD_PACKET.fetch_add(1, Ordering::Relaxed); + error!("BAD PACKET: {:?}", val); + }, + Err(e) => { + error!("Payload not valid UTF-8: {:?}", e); + } + } + } } - STATSD_GOOD_PACKET.fetch_add(1, Ordering::Relaxed); - } else { - STATSD_BAD_PACKET.fetch_add(1, Ordering::Relaxed); - error!("BAD PACKET: {:?}", val); }, - Err(e) => { - error!("Payload not valid UTF-8: {:?}", e); - } + Err(e) => panic!(format!("Failed during poll {:?}", e)), } - } -} + } // loop +} // handle_udp impl Source for Statsd { - fn run(&mut self) { - let mut joins = Vec::new(); - + fn run(&mut self, poll: mio::Poll) { let addrs = (self.host.as_str(), self.port).to_socket_addrs(); match addrs { Ok(ips) => { - let ips: Vec<_> = ips.collect(); + let mut conns = util::TokenSlab::::new(); for addr in ips { - let listener = - UdpSocket::bind(addr).expect("Unable to bind to TCP socket"); - let chans = self.chans.clone(); - let tags = sync::Arc::clone(&self.tags); - let parse_config = sync::Arc::clone(&self.parse_config); - info!("server started on {:?} {}", addr, self.port); - joins.push(thread::spawn(move || { - handle_udp(chans, &tags, &parse_config, &listener) - })); + let socket = mio::net::UdpSocket::bind(&addr) + .expect("Unable to bind to UDP socket"); + let token = conns.insert(socket); + poll.register( + &conns[token], + token, + mio::Ready::readable(), + mio::PollOpt::edge(), + ).unwrap(); } + + let chans = self.chans.clone(); + info!("server started on *:{}", self.port); + handle_udp(chans, &self.tags, &self.parse_config, &conns, &poll); } Err(e) => { info!( @@ -163,12 +188,5 @@ impl Source for Statsd { ); } } - - for jh in joins { - // TODO Having sub-threads panic will not cause a bubble-up if that - // thread is not the currently examined one. We're going to have to have - // some manner of sub-thread communication going on. 
- jh.join().expect("Uh oh, child thread panicked!"); } } } diff --git a/src/thread.rs b/src/thread.rs new file mode 100644 index 00000000..8fc885eb --- /dev/null +++ b/src/thread.rs @@ -0,0 +1,72 @@ +//! Mio enabled threading library. +use constants; +use mio; +use std::thread; + +/// Event polling structure. Alias of `mio::Poll`. +pub type Poll = mio::Poll; +/// Events buffer type. Alias of `mio::Events`. +pub type Events = mio::Events; + +/// Mio enabled thread state. +pub struct ThreadHandle { + /// JoinHandle for the executing thread. + pub handle: thread::JoinHandle<()>, + + /// Readiness signal used to notify the given thread when an event is ready + /// to be consumed on the SYSTEM channel. + readiness: mio::SetReadiness, +} + +/// Trait for stoppable processes. +pub trait Stoppable { + /// Join the given process, blocking until it exits. + fn join(self) -> (); + + /// Gracefully shutdown the process, blocking until exit. + fn shutdown(self) -> (); +} + +impl Stoppable for ThreadHandle { + /// Join the given Thread, blocking until it exits. + fn join(self) { + self.handle.join().expect("Failed to join child thread!"); + } + + /// Gracefully shutdown the given Thread, blocking until it exits. + /// + /// Note - It is the responsibility of the developer to ensure + /// that thread logic polls for events occurring on the SYSTEM token. + fn shutdown(self) { + self.readiness + .set_readiness(mio::Ready::readable()) + .expect("Failed to notify child thread of shutdown!"); + self.join(); + } +} + +/// Spawns a new thread executing the provided closure. 
+pub fn spawn(f: F) -> ThreadHandle +where + F: Send + 'static + FnOnce(mio::Poll) -> (), +{ + let poller = mio::Poll::new().unwrap(); + let (registration, readiness) = mio::Registration::new2(); + + ThreadHandle { + readiness: readiness, + + handle: thread::spawn(move || { + poller + .register( + ®istration, + constants::SYSTEM, + mio::Ready::readable(), + mio::PollOpt::edge(), + ) + .expect("Failed to register system pipe"); + + f(poller); + }), + } +} diff --git a/src/util.rs b/src/util.rs index dc14fadc..2da5adc1 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,7 +1,24 @@ //! Utility module, a grab-bag of functionality - +use std::ops::Index; +use mio; +use slab; use hopper; use metric; +use seahash::SeaHasher; +use std::collections; +use std::hash; +use constants; + +/// Cernan hashmap +/// +/// In most cases where cernan needs a hashmap we've got smallish inputs as keys +/// and, more, have a smallish number of total elements (< 100k) to store in the +/// map. This hashmap is specialized to address that common use-case. +pub type HashMap = collections::HashMap< + K, + V, + hash::BuildHasherDefault, +>; /// A vector of `hopper::Sender`s. pub type Channel = Vec>; @@ -34,3 +51,45 @@ pub enum Valve { /// them up in the communication queue. Closed, } + +#[inline] +fn token_to_idx(token: &mio::Token) -> usize { + match *token { + mio::Token(idx) => idx, + } +} + +/// Wrapper around Slab +pub struct TokenSlab { + tokens : slab::Slab, +} + +impl Index for TokenSlab { + type Output = E; + + /// Returns Evented object corresponding to Token. + fn index(&self, token: mio::Token) -> &E { + &self.tokens[token_to_idx(&token)] + } +} + +/// Interface wrapping a subset of Slab such +/// that we can magically translate indices to +/// mio::tokens. +impl TokenSlab { + + /// Constructs a new TokenSlab with a capacity derived from the value + /// of constants::SYSTEM. 
+ pub fn new () -> TokenSlab { + TokenSlab { + tokens: slab::Slab::with_capacity(token_to_idx(&constants::SYSTEM)), + } + } + + /// Inserts a new Evented into the slab, returning a mio::Token + /// corresponding to the index of the newly inserted type. + pub fn insert(&mut self, thing : E) -> mio::Token { + let idx = self.tokens.insert(thing); + mio::Token::from(idx) + } +}