From 4633d83f0b3854523ffda80e66f7055fda64cee1 Mon Sep 17 00:00:00 2001 From: AlyHKafoury Date: Thu, 24 Oct 2019 00:54:21 +0200 Subject: [PATCH 01/12] layout of rabbitmq sink Signed-off-by: AlyHKafoury --- Cargo.lock | 205 ++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/sinks/mod.rs | 1 + src/sinks/rabbitmq.rs | 149 ++++++++++++++++++++++++++++++ 4 files changed, 356 insertions(+) create mode 100644 src/sinks/rabbitmq.rs diff --git a/Cargo.lock b/Cargo.lock index f979586f9e11f..36a1252a0271d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,60 @@ dependencies = [ "num-traits 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "amq-protocol" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "amq-protocol-codegen 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "amq-protocol-tcp 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "amq-protocol-types 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "amq-protocol-uri 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "cookie-factory 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "nom 5.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "amq-protocol-codegen" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "amq-protocol-types 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "handlebars 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "amq-protocol-tcp" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "amq-protocol-uri 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "tcp-stream 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "amq-protocol-types" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cookie-factory 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "nom 5.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "amq-protocol-uri" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ansi_term" version = "0.11.0" @@ -300,6 +354,11 @@ dependencies = [ "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "cookie-factory" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "cookie_store" version = "0.7.0" @@ -971,6 +1030,21 @@ dependencies = [ "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "handlebars" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "hashbrown 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "pest 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "pest_derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "walkdir 2.2.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "hashbrown" version = "0.1.8" @@ -980,6 +1054,14 @@ dependencies = [ "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hashbrown" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "hdrhistogram" version = "6.1.1" @@ -1214,6 +1296,16 @@ dependencies = [ "unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "idna" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "indexmap" version = "1.0.2" @@ -1310,6 +1402,30 @@ dependencies = [ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "lapin" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "amq-protocol 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "amq-protocol-codegen 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "lapin-futures" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", + "lapin 0.28.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "lazy_static" version = "0.2.11" @@ -1432,6 +1548,11 @@ dependencies = [ "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "maplit" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "matchers" version = "0.0.1" @@ -1892,6 +2013,45 @@ name = "percent-encoding" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "pest" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "ucd-trie 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "pest_derive" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "pest 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "pest_generator 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "pest_generator" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "pest 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "pest_meta 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "pest_meta" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "maplit 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "pest 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "sha-1 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "petgraph" version = "0.4.13" @@ -3088,6 +3248,16 @@ dependencies = [ "xattr 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tcp-stream" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tempdir" version = "0.3.7" @@ -3745,6 +3915,11 @@ dependencies = [ "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ucd-trie" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "ucd-util" version = "0.1.3" @@ -3830,6 +4005,16 @@ dependencies = [ "percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "url" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "utf8-ranges" version = "1.0.2" @@ -3894,6 +4079,7 @@ dependencies = [ "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "jemallocator 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "journald 0.1.0", + "lapin-futures 0.28.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "leveldb 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4102,6 +4288,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum adler32 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7e522997b529f05601e05166c07ed17789691f562762c7f3b987263d2dedee5c" "checksum aho-corasick 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e6f484ae0c99fec2e858eb6134949117399f222608d84cadb3f58c1f97c2364c" "checksum alga 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2cc836ad7a40dc9d8049574e2a29979f5dc77deeea4d7ebcd29773452f0e9694" +"checksum amq-protocol 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9555895e88d803ed36f2b2eb93aea0999fb1a2d14b6ecdead999516f321f8320" +"checksum amq-protocol-codegen 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c61982c3314daee58c7fc3e071406160b4fefd30159e1c6015d9f880316b52f7" +"checksum amq-protocol-tcp 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "11b5b4746400bd566541c1bf04bdd7295a534cab562da8a4946f2772a4d0dde8" +"checksum amq-protocol-types 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "dd44e5827bf094402bbe79cd4e496e7f18a8745e31e16ad7b176d2e2c8ef0485" +"checksum amq-protocol-uri 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d44f5e8628cb65410b02e68fdc1baff70a011d27091ef1ee5eb9faddff2e220d" "checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" "checksum antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" "checksum approx 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f0e60b75072ecd4168020818c0107f2857bb6c4e64252d8d3983f6263b40a5c3" @@ -4137,6 +4328,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" "checksum cookie 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5" +"checksum cookie-factory 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5f2b81f8a9b8f5ffa2ed91a82c53392837eab151baf682acf8322c909b6d4fe9" "checksum cookie_store 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46750b3f362965f197996c4448e4a0935e791bf7d6631bfce9ee0af3d24c919c" "checksum core-foundation 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "25b9e03f145fd4f2bf705e07b900cd41fc636598fe5dc452fd0db1441c3f496d" "checksum core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e7ca8a5221364ef15ce201e8ed2f609fc312682a8f4e0e3d4aa5879764e0fa3b" @@ -4208,7 +4400,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum glob 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" "checksum grok 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6e1f7ecdcd1bb90bea7aebeaea4d5d408cecd72c8616f0e58e6129adf4811220" "checksum h2 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "876d91114d78abbde2e1910e3b2d9d0fd1d89b769e20816dfb68d77992cf4158" +"checksum handlebars 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "91ef1ac30f2eaaa2b835fce73c57091cb6b9fc62b7eef285efbf980b0f20001b" "checksum hashbrown 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3bae29b6653b3412c2e71e9d486db9f9df5d701941d86683005efb9f2d28e3da" +"checksum hashbrown 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e1de41fb8dba9714efd92241565cdff73f78508c95697dd56787d3cba27e2353" "checksum hdrhistogram 6.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9b4a1f8e87caaefdf781ca2c5bf6f2228fb88963d5010e8dc589bbdbaa4a423a" "checksum headers 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dc6e2e51d356081258ef05ff4c648138b5d3fe64b7300aaad3b820554a2b7fb6" "checksum headers-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "51ae5b0b5417559ee1d2733b21d33b0868ae9e406bd32eb1a51d613f66ed472a" @@ -4229,6 +4423,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum hyper-tls 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3a800d6aa50af4b5850b2b0f659625ce9504df908e9733b635720483be26174f" "checksum hyperlocal 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d063d6d5658623c6ef16f452e11437c0e7e23a6d327470573fe78892dafbc4fb" "checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" +"checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9" "checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d" "checksum inventory 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "21df85981fe094480bc2267723d3dc0fd1ae0d1f136affc659b7398be615d922" "checksum inventory-impl 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8a877ae8bce77402d5e9ed870730939e097aad827b2a932b361958fa9d6e75aa" @@ -4239,6 +4434,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum jemalloc-sys 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7bef0d4ce37578dfd80b466e3d8324bd9de788e249f1accebb0c472ea4b52bdc" "checksum jemallocator 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c2b69163a3cf2d0fffcd4e1b57921bc6d8fb97ec27f2aeef00562abdaf4ffe2a" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +"checksum lapin 0.28.1 (registry+https://github.com/rust-lang/crates.io-index)" = "29958a8b737705e04d7d526e6b6a5c9a7582472ef02f17aa2ba14068072b8b83" +"checksum lapin-futures 0.28.1 (registry+https://github.com/rust-lang/crates.io-index)" = "68f71b9c4d1f0ce2c9b39e852b60881974292fb5cdd440ab89bce7830aba1382" "checksum lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "76f033c7ad61445c5b347c7382dd1237847eb1bce590fe50365dcb33d546be73" "checksum lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bc5729f27f159ddd61f4df6228e827e86643d4d3e7c32183cb30a1c08f604a14" "checksum leveldb 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8438a36a31c982ac399c4477d7e3c62cc7a6bf91bb6f42837b7e1033359fcbad" @@ -4254,6 +4451,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum lock_api 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "62ebf1391f6acad60e5c8b43706dde4582df75c06698ab44511d15016bc2442c" "checksum lock_api 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f8912e782533a93a167888781b836336a6ca5da6175c05944c86cf28c31104dc" "checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" +"checksum maplit 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" "checksum matchers 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" "checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" "checksum matrixmultiply 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dcfed72d871629daa12b25af198f110e8095d7650f5f4c61c5bac28364604f9b" @@ -4306,6 +4504,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum pdqselect 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4ec91767ecc0a0bbe558ce8c9da33c068066c57ecc8bb8477ef8c1ad3ef77c27" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" "checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +"checksum pest 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e4fb201c5c22a55d8b24fef95f78be52738e5e1361129be1b5e862ecdb6894a" +"checksum pest_derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "833d1ae558dc601e9a60366421196a8d94bc0ac980476d0b67e1d0988d72b2d0" +"checksum pest_generator 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7b9fcf299b5712d06ee128a556c94709aaa04512c4dffb8ead07c5c998447fc0" +"checksum pest_meta 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "df43fd99896fd72c485fe47542c7b500e4ac1e8700bf995544d1317a60ded547" "checksum petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3659d1ee90221741f65dd128d9998311b0e40c5d3c23a62445938214abce4f" "checksum phf 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b3da44b85f8e8dfaec21adae67f95d93244b2ecf6ad2a692320598dcc8e6dd18" "checksum phf_codegen 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e" @@ -4431,6 +4633,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum synstructure 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "02353edf96d6e4dc81aea2d8490a7e9db177bf8acb0e951c24940bf866cb313f" "checksum syslog_rfc5424 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d1f5c427514a0cd2a4a7e28d307672e56ddd4af03c6685e8b8e7f148a8b77e7b" "checksum tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)" = "b3196bfbffbba3e57481b6ea32249fbaf590396a52505a2615adbb79d9d826d3" +"checksum tcp-stream 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "73f7fc1356d9f5141f4a091ec2fddc152fd4f09c0cc9bddfa8146e0cff0cbd13" "checksum tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" "checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" "checksum termcolor 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4096add70612622289f2fdcdbd5086dc81c1e2675e6ae58d6c4f62a16c6d7f2f" @@ -4487,6 +4690,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "612d636f949607bdf9b123b4a6f6d966dedf3ff669f7f045890d3a4a73948169" "checksum typetag 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7573a67ebbc8696e879d902067bc278671b7502abd8604b7336d87efd47e5858" "checksum typetag-impl 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ef49605e1070ec194bd9d2c360a975d47ba164460de183cbc53aa1be8c4db6dc" +"checksum ucd-trie 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8f00ed7be0c1ff1e24f46c3d2af4859f7e863672ba3a6e92e7cff702bf9f06c2" "checksum ucd-util 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "535c204ee4d8434478593480b8f86ab45ec9aae0e83c568ca81abf0fd0e88f86" "checksum unicase 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7f4765f83163b74f957c797ad9253caf97f103fb064d3999aea9568d09fc8a33" "checksum unicase 2.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a84e5511b2a947f3ae965dcb29b13b7b1691b6e7332cf5dbc1744138d5acb7f6" @@ -4500,6 +4704,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" "checksum untrusted 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "55cd1f4b4e96b46aeb8d4855db4a7a9bd96eeeb5c6a1ab54593328761642ce2f" "checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" +"checksum url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75b414f6c464c879d7f9babf951f23bc3743fb7313c081b2e6ca719067ea9d61" "checksum utf8-ranges 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "796f7e48bef87609f7ade7e06495a87d5cd06c7866e6a5cbfceffc558a243737" "checksum uuid 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e1436e58182935dcd9ce0add9ea0b558e8a87befe01c1a301e6020aeb0876363" "checksum uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)" = "90dbc611eb48397705a6b0f6e917da23ae517e4d127123d2cf7674206627d32a" diff --git a/Cargo.toml b/Cargo.toml index b96aee7b3fc63..1707af904fceb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -127,6 +127,7 @@ base64 = "0.10.1" shiplift = { git = "https://github.com/LucioFranco/shiplift", branch = "timber" } owning_ref = "0.4.0" listenfd = "0.3.3" +lapin-futures = "0.28.0" [build-dependencies] prost-build = "0.4.0" diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index fdac93a9ddb15..6876e1cb4fd7e 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -15,6 +15,7 @@ pub mod http; #[cfg(feature = "rdkafka")] pub mod kafka; pub mod prometheus; +pub mod rabbitmq; pub mod splunk_hec; pub mod statsd; pub mod tcp; diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs new file mode 100644 index 0000000000000..49a57d7c9507e --- /dev/null +++ b/src/sinks/rabbitmq.rs @@ -0,0 +1,149 @@ +use crate::{ + buffers::Acker, + event::{self, Event}, + topology::config::{DataType, SinkConfig}, + Error, +}; +use futures::{ + future::{self, poll_fn, IntoFuture}, + stream::FuturesUnordered, + Async, AsyncSink, Future, Poll, Sink, StartSend, Stream, +}; +use lapin::options::{BasicPublishOptions, QueueDeclareOptions}; +use lapin::types::FieldTable; +use lapin::{BasicProperties, Client, ConnectionProperties}; +use lapin_futures as lapin; +use log::info; +use serde::{Deserialize, Serialize}; +use std::{thread, time::Duration}; +use string_cache::DefaultAtom as Atom; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct RabbitMQSinkConfig { + addr: String, + encoding: Encoding, + queue_name: String, +} + +#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] +#[serde(rename_all = "snake_case")] +pub enum Encoding { + Text, + Json, +} + +pub struct RabbitMQSink { + acker: Acker, + channel: lapin_futures::Channel, + encoding: Encoding, + queue_name: String, +} + +impl RabbitMQSink { + fn new( + config: RabbitMQSinkConfig, + channel: lapin_futures::Channel, + acker: Acker, + ) -> crate::Result { + Ok(RabbitMQSink { + acker, + channel, + encoding: config.encoding, + queue_name: config.queue_name, + }) + } +} + +#[typetag::serde(name = "rabbitmq")] +impl SinkConfig for RabbitMQSinkConfig { + fn build(&self, acker: Acker) -> crate::Result<(super::RouterSink, super::Healthcheck)> { + let channel = Client::connect(&self.addr, ConnectionProperties::default()) + .and_then(|client| client.create_channel()) + .wait() + .unwrap(); + channel + .queue_declare( + &self.queue_name, + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .wait() + .unwrap(); + let sink = RabbitMQSink::new(self.clone(), channel.clone(), acker)?; + let hc = healthcheck(self.clone()); + Ok((Box::new(sink), hc)) + } + + fn input_type(&self) -> DataType { + DataType::Log + } +} + +impl Sink for RabbitMQSink { + type SinkItem = Event; + type SinkError = (); + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + self + .channel + .basic_publish( + "", + "hello", + b"hello from tokio".to_vec(), + BasicPublishOptions::default(), + BasicProperties::default(), + ) + .wait() + .unwrap(); + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + Ok(Async::Ready(())) + } +} + +fn healthcheck(config: RabbitMQSinkConfig) -> super::Healthcheck { + let check = poll_fn(move || Ok(Async::Ready(()))); + + Box::new(check) +} + +fn encode_event( + event: &Event, + key_field: &Option, + encoding: &Encoding, +) -> (Vec, Vec) { + let key = key_field + .as_ref() + .and_then(|f| event.as_log().get(f)) + .map(|v| v.as_bytes().to_vec()) + .unwrap_or(Vec::new()); + + let body = match encoding { + &Encoding::Json => serde_json::to_vec(&event.as_log().clone().unflatten()).unwrap(), + &Encoding::Text => event + .as_log() + .get(&event::MESSAGE) + .map(|v| v.as_bytes().to_vec()) + .unwrap_or(Vec::new()), + }; + + (key, body) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn simple_test() { + let config = RabbitMQSinkConfig { + addr: String::from("amqp://127.0.0.1:5672/%2f"), + encoding: Encoding::Text, + queue_name: String::from("hello"), + }; + let acker = Acker::Null; + let mut rabbit = config.build(acker).unwrap(); + } +} From 8446bbbc7af1957ce1ee06f1268164f5955d1fd7 Mon Sep 17 00:00:00 2001 From: AlyHKafoury Date: Thu, 24 Oct 2019 04:00:22 +0200 Subject: [PATCH 02/12] layout of send and poll Signed-off-by: AlyHKafoury --- src/sinks/rabbitmq.rs | 65 ++++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs index 49a57d7c9507e..05668af76cedb 100644 --- a/src/sinks/rabbitmq.rs +++ b/src/sinks/rabbitmq.rs @@ -1,6 +1,7 @@ use crate::{ buffers::Acker, event::{self, Event}, + sinks::util::MetadataFuture, topology::config::{DataType, SinkConfig}, Error, }; @@ -13,10 +14,9 @@ use lapin::options::{BasicPublishOptions, QueueDeclareOptions}; use lapin::types::FieldTable; use lapin::{BasicProperties, Client, ConnectionProperties}; use lapin_futures as lapin; -use log::info; +use lapin_futures::ConfirmationFuture; use serde::{Deserialize, Serialize}; use std::{thread, time::Duration}; -use string_cache::DefaultAtom as Atom; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct RabbitMQSinkConfig { @@ -36,6 +36,7 @@ pub struct RabbitMQSink { acker: Acker, channel: lapin_futures::Channel, encoding: Encoding, + in_flight: FuturesUnordered, ()>>, queue_name: String, } @@ -49,6 +50,7 @@ impl RabbitMQSink { acker, channel, encoding: config.encoding, + in_flight: FuturesUnordered::new(), queue_name: config.queue_name, }) } @@ -59,16 +61,14 @@ impl SinkConfig for RabbitMQSinkConfig { fn build(&self, acker: Acker) -> crate::Result<(super::RouterSink, super::Healthcheck)> { let channel = Client::connect(&self.addr, ConnectionProperties::default()) .and_then(|client| client.create_channel()) - .wait() - .unwrap(); + .wait()?; channel .queue_declare( &self.queue_name, QueueDeclareOptions::default(), FieldTable::default(), ) - .wait() - .unwrap(); + .wait()?; let sink = RabbitMQSink::new(self.clone(), channel.clone(), acker)?; let hc = healthcheck(self.clone()); Ok((Box::new(sink), hc)) @@ -84,22 +84,35 @@ impl Sink for RabbitMQSink { type SinkError = (); fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - self - .channel - .basic_publish( - "", - "hello", - b"hello from tokio".to_vec(), - BasicPublishOptions::default(), - BasicProperties::default(), - ) - .wait() - .unwrap(); + let payload = encode_event(&item, &self.encoding); + let future = self.channel.basic_publish( + "", + &self.queue_name, + payload, + BasicPublishOptions::default(), + BasicProperties::default(), + ); + self.in_flight.push(future.join(future::ok(()))); Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - Ok(Async::Ready(())) + loop { + match self.in_flight.poll() { + // nothing ready yet + Ok(Async::NotReady) => return Ok(Async::NotReady), + + // nothing in flight + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + + // request finished, check for success + Ok(Async::Ready(Some(((), _)))) => { + trace!("published message to rabbitmq"); + } + + Err(e) => error!("publishing message failed: {}", e), + } + } } } @@ -109,18 +122,8 @@ fn healthcheck(config: RabbitMQSinkConfig) -> super::Healthcheck { Box::new(check) } -fn encode_event( - event: &Event, - key_field: &Option, - encoding: &Encoding, -) -> (Vec, Vec) { - let key = key_field - .as_ref() - .and_then(|f| event.as_log().get(f)) - .map(|v| v.as_bytes().to_vec()) - .unwrap_or(Vec::new()); - - let body = match encoding { +fn encode_event(event: &Event, encoding: &Encoding) -> Vec { + let payload = match encoding { &Encoding::Json => serde_json::to_vec(&event.as_log().clone().unflatten()).unwrap(), &Encoding::Text => event .as_log() @@ -129,7 +132,7 @@ fn encode_event( .unwrap_or(Vec::new()), }; - (key, body) + payload } #[cfg(test)] From e68a72a7d8c5564fe60760f5d3452c9aab93ec3c Mon Sep 17 00:00:00 2001 From: AlyHKafoury Date: Thu, 24 Oct 2019 04:16:56 +0200 Subject: [PATCH 03/12] move channel creation to sink new Signed-off-by: AlyHKafoury --- src/sinks/rabbitmq.rs | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs index 05668af76cedb..f0bece37393ff 100644 --- a/src/sinks/rabbitmq.rs +++ b/src/sinks/rabbitmq.rs @@ -41,11 +41,17 @@ pub struct RabbitMQSink { } impl RabbitMQSink { - fn new( - config: RabbitMQSinkConfig, - channel: lapin_futures::Channel, - acker: Acker, - ) -> crate::Result { + fn new(config: RabbitMQSinkConfig, acker: Acker) -> crate::Result { + let channel = Client::connect(&config.addr, ConnectionProperties::default()) + .and_then(|client| client.create_channel()) + .wait()?; + channel + .queue_declare( + &config.queue_name, + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .wait()?; Ok(RabbitMQSink { acker, channel, @@ -59,17 +65,7 @@ impl RabbitMQSink { #[typetag::serde(name = "rabbitmq")] impl SinkConfig for RabbitMQSinkConfig { fn build(&self, acker: Acker) -> crate::Result<(super::RouterSink, super::Healthcheck)> { - let channel = Client::connect(&self.addr, ConnectionProperties::default()) - .and_then(|client| client.create_channel()) - .wait()?; - channel - .queue_declare( - &self.queue_name, - QueueDeclareOptions::default(), - FieldTable::default(), - ) - .wait()?; - let sink = RabbitMQSink::new(self.clone(), channel.clone(), acker)?; + let sink = RabbitMQSink::new(self.clone(), acker)?; let hc = healthcheck(self.clone()); Ok((Box::new(sink), hc)) } From bba568a99f3880d9236e1891a6fdaa99338060d2 Mon Sep 17 00:00:00 2001 From: AlyHKafoury Date: Sat, 26 Oct 2019 04:26:43 +0200 Subject: [PATCH 04/12] move all lapin configs to sink config Signed-off-by: AlyHKafoury --- src/sinks/rabbitmq.rs | 104 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 91 insertions(+), 13 deletions(-) diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs index f0bece37393ff..ece75e2d5ed68 100644 --- a/src/sinks/rabbitmq.rs +++ b/src/sinks/rabbitmq.rs @@ -3,26 +3,98 @@ use crate::{ event::{self, Event}, sinks::util::MetadataFuture, topology::config::{DataType, SinkConfig}, - Error, }; use futures::{ - future::{self, poll_fn, IntoFuture}, + future::{self, poll_fn}, stream::FuturesUnordered, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream, }; -use lapin::options::{BasicPublishOptions, QueueDeclareOptions}; -use lapin::types::FieldTable; -use lapin::{BasicProperties, Client, ConnectionProperties}; -use lapin_futures as lapin; -use lapin_futures::ConfirmationFuture; +use lapin_futures::{ + auth::SASLMechanism, + options::{BasicPublishOptions, QueueDeclareOptions}, + types::FieldTable, + BasicProperties, Client, ConfirmationFuture, ConnectionProperties, +}; use serde::{Deserialize, Serialize}; -use std::{thread, time::Duration}; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum SASLMechanismDef { + AMQPlain, + External, + Plain, + RabbitCrDemo, +} + +impl SASLMechanismDef { + pub fn to_sasl_mechanism(&self) -> SASLMechanism { + match &self { + SASLMechanismDef::AMQPlain => SASLMechanism::AMQPlain, + SASLMechanismDef::External => SASLMechanism::External, + SASLMechanismDef::Plain => SASLMechanism::Plain, + SASLMechanismDef::RabbitCrDemo => SASLMechanism::RabbitCrDemo, + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct ConnectionPropertiesDef { + pub mechanism: SASLMechanismDef, + pub locale: String, + pub client_properties: FieldTable, + pub max_executor_threads: usize, +} + +impl Default for ConnectionPropertiesDef { + fn default() -> ConnectionPropertiesDef { + ConnectionPropertiesDef { + mechanism: SASLMechanismDef::Plain, + locale: "en_US".into(), + client_properties: FieldTable::default(), + max_executor_threads: 1, + } + } +} + +#[derive(Serialize, Deserialize, Default, Clone, Debug)] +pub struct QueueDeclareOptionsDef { + pub passive: bool, + pub durable: bool, + pub exclusive: bool, + pub auto_delete: bool, + pub nowait: bool, +} #[derive(Serialize, Deserialize, Clone, Debug)] pub struct RabbitMQSinkConfig { addr: String, + connection_properties: ConnectionPropertiesDef, encoding: Encoding, + exchange: String, + field_table: FieldTable, queue_name: String, + queue_declare_options: QueueDeclareOptionsDef, +} + +impl RabbitMQSinkConfig { + pub fn connection_properties(&self) -> ConnectionProperties { + ConnectionProperties { + mechanism: self.connection_properties.mechanism.to_sasl_mechanism(), + locale: self.connection_properties.locale.clone(), + client_properties: self.connection_properties.client_properties.clone(), + executor: None, + max_executor_threads: self.connection_properties.max_executor_threads, + } + } + + pub fn queue_declare_options(&self) -> QueueDeclareOptions { + QueueDeclareOptions { + passive: self.queue_declare_options.passive, + durable: self.queue_declare_options.durable, + exclusive: self.queue_declare_options.exclusive, + auto_delete: self.queue_declare_options.auto_delete, + nowait: self.queue_declare_options.nowait, + } + } } #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] @@ -36,26 +108,28 @@ pub struct RabbitMQSink { acker: Acker, channel: lapin_futures::Channel, encoding: Encoding, + exchange: String, in_flight: FuturesUnordered, ()>>, queue_name: String, } impl RabbitMQSink { fn new(config: RabbitMQSinkConfig, acker: Acker) -> crate::Result { - let channel = Client::connect(&config.addr, ConnectionProperties::default()) + let channel = Client::connect(&config.addr, config.connection_properties()) .and_then(|client| client.create_channel()) .wait()?; channel .queue_declare( &config.queue_name, - QueueDeclareOptions::default(), - FieldTable::default(), + config.queue_declare_options(), + config.field_table, ) .wait()?; Ok(RabbitMQSink { acker, channel, encoding: config.encoding, + exchange: config.exchange, in_flight: FuturesUnordered::new(), queue_name: config.queue_name, }) @@ -82,7 +156,7 @@ impl Sink for RabbitMQSink { fn start_send(&mut self, item: Self::SinkItem) -> StartSend { let payload = encode_event(&item, &self.encoding); let future = self.channel.basic_publish( - "", + &self.exchange, &self.queue_name, payload, BasicPublishOptions::default(), @@ -139,10 +213,14 @@ mod tests { fn simple_test() { let config = RabbitMQSinkConfig { addr: String::from("amqp://127.0.0.1:5672/%2f"), + connection_properties: ConnectionPropertiesDef::default(), encoding: Encoding::Text, + exchange: String::from(""), + field_table: FieldTable::default(), queue_name: String::from("hello"), + queue_declare_options: QueueDeclareOptionsDef::default(), }; let acker = Acker::Null; - let mut rabbit = config.build(acker).unwrap(); + let rabbit = config.build(acker).unwrap(); } } From 6f8ae4a7f688adaa0cc70638e97f15d55421f575 Mon Sep 17 00:00:00 2001 From: AlyHKafoury Date: Sat, 26 Oct 2019 05:31:49 +0200 Subject: [PATCH 05/12] exposed basic publish options in sink config Signed-off-by: AlyHKafoury --- src/sinks/rabbitmq.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs index ece75e2d5ed68..a3018ffdbcfc0 100644 --- a/src/sinks/rabbitmq.rs +++ b/src/sinks/rabbitmq.rs @@ -64,9 +64,16 @@ pub struct QueueDeclareOptionsDef { pub nowait: bool, } +#[derive(Serialize, Deserialize, Default, Clone, Debug)] +pub struct BasicPublishOptionsDef { + pub mandatory: bool, + pub immediate: bool, +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct RabbitMQSinkConfig { addr: String, + basic_publish_options: BasicPublishOptionsDef, connection_properties: ConnectionPropertiesDef, encoding: Encoding, exchange: String, @@ -95,6 +102,13 @@ impl RabbitMQSinkConfig { nowait: self.queue_declare_options.nowait, } } + + pub fn basic_publish_options(&self) -> BasicPublishOptions { + BasicPublishOptions { + immediate: self.basic_publish_options.mandatory, + mandatory: self.basic_publish_options.mandatory, + } + } } #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] @@ -106,6 +120,7 @@ pub enum Encoding { pub struct RabbitMQSink { acker: Acker, + basic_publish_options: BasicPublishOptions, channel: lapin_futures::Channel, encoding: Encoding, exchange: String, @@ -122,11 +137,12 @@ impl RabbitMQSink { .queue_declare( &config.queue_name, config.queue_declare_options(), - config.field_table, + config.field_table.clone(), ) .wait()?; Ok(RabbitMQSink { acker, + basic_publish_options: config.basic_publish_options(), channel, encoding: config.encoding, exchange: config.exchange, @@ -159,7 +175,7 @@ impl Sink for RabbitMQSink { &self.exchange, &self.queue_name, payload, - BasicPublishOptions::default(), + self.basic_publish_options.clone(), BasicProperties::default(), ); self.in_flight.push(future.join(future::ok(()))); @@ -213,6 +229,7 @@ mod tests { fn simple_test() { let config = RabbitMQSinkConfig { addr: String::from("amqp://127.0.0.1:5672/%2f"), + basic_publish_options: BasicPublishOptionsDef::default(), connection_properties: ConnectionPropertiesDef::default(), encoding: Encoding::Text, exchange: String::from(""), From cc889c63a47dd858faf9507c6ec94ec913e3642d Mon Sep 17 00:00:00 2001 From: AlyHKafoury Date: Sat, 26 Oct 2019 19:13:19 +0200 Subject: [PATCH 06/12] acking the sent message Signed-off-by: AlyHKafoury --- src/sinks/rabbitmq.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs index a3018ffdbcfc0..2e874e728ced8 100644 --- a/src/sinks/rabbitmq.rs +++ b/src/sinks/rabbitmq.rs @@ -124,7 +124,8 @@ pub struct RabbitMQSink { channel: lapin_futures::Channel, encoding: Encoding, exchange: String, - in_flight: FuturesUnordered, ()>>, + in_flight: FuturesUnordered, usize>>, + seqno: usize, queue_name: String, } @@ -147,6 +148,7 @@ impl RabbitMQSink { encoding: config.encoding, exchange: config.exchange, in_flight: FuturesUnordered::new(), + seqno: 0, queue_name: config.queue_name, }) } @@ -178,7 +180,8 @@ impl Sink for RabbitMQSink { self.basic_publish_options.clone(), BasicProperties::default(), ); - self.in_flight.push(future.join(future::ok(()))); + self.in_flight.push(future.join(future::ok(self.seqno))); + self.seqno += 1; Ok(AsyncSink::Ready) } @@ -192,7 +195,8 @@ impl Sink for RabbitMQSink { Ok(Async::Ready(None)) => return Ok(Async::Ready(())), // request finished, check for success - Ok(Async::Ready(Some(((), _)))) => { + Ok(Async::Ready(Some(((), seqno)))) => { + self.acker.ack(seqno); trace!("published message to rabbitmq"); } From d5da3553bb43c294a81d29a15cbb044083ca68ac Mon Sep 17 00:00:00 2001 From: AlyHKafoury Date: Sat, 26 Oct 2019 19:55:13 +0200 Subject: [PATCH 07/12] add health check Signed-off-by: AlyHKafoury --- src/sinks/rabbitmq.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs index 2e874e728ced8..a0699d5d03da0 100644 --- a/src/sinks/rabbitmq.rs +++ b/src/sinks/rabbitmq.rs @@ -5,7 +5,7 @@ use crate::{ topology::config::{DataType, SinkConfig}, }; use futures::{ - future::{self, poll_fn}, + future::{self, poll_fn, IntoFuture}, stream::FuturesUnordered, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream, }; @@ -207,7 +207,15 @@ impl Sink for RabbitMQSink { } fn healthcheck(config: RabbitMQSinkConfig) -> super::Healthcheck { - let check = poll_fn(move || Ok(Async::Ready(()))); + let check = poll_fn(move || { + tokio_threadpool::blocking(|| { + Client::connect(&config.addr, config.connection_properties()) + .map(|_| ()) + .map_err(|err| err.into()) + }) + }) + .map_err(|err| err.into()) + .and_then(|result| result.into_future()); Box::new(check) } @@ -242,6 +250,6 @@ mod tests { queue_declare_options: QueueDeclareOptionsDef::default(), }; let acker = Acker::Null; - let rabbit = config.build(acker).unwrap(); + let _rabbit = config.build(acker).unwrap(); } } From e69894679be4075dd1b0b2f01e5e5b7ce5a59f1d Mon Sep 17 00:00:00 2001 From: AlyHKafoury Date: Sun, 27 Oct 2019 04:24:52 +0200 Subject: [PATCH 08/12] simple unit tests Signed-off-by: AlyHKafoury --- src/sinks/rabbitmq.rs | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs index a0699d5d03da0..5c8f9a19df3a6 100644 --- a/src/sinks/rabbitmq.rs +++ b/src/sinks/rabbitmq.rs @@ -236,9 +236,31 @@ fn encode_event(event: &Event, encoding: &Encoding) -> Vec { #[cfg(test)] mod tests { use super::*; + use std::collections::HashMap; #[test] - fn simple_test() { + fn rabbitmq_encode_event_text() { + let message = "hello world".to_string(); + let bytes = encode_event(&message.clone().into(), &Encoding::Text); + assert_eq!(&bytes[..], message.as_bytes()); + } + + #[test] + fn rabbitmq_encode_event_json() { + let message = "hello world".to_string(); + let event = Event::from(message.clone()); + let bytes = encode_event(&event, &Encoding::Json); + let map: HashMap = serde_json::from_slice(&bytes[..]).unwrap(); + assert_eq!(map[&event::MESSAGE.to_string()], message); + } +} + +#[cfg(feature = "rabbitmq-integration-tests")] +#[cfg(test)] +mod integration_test { + use super::*; + #[test] + fn create_connection() { let config = RabbitMQSinkConfig { addr: String::from("amqp://127.0.0.1:5672/%2f"), basic_publish_options: BasicPublishOptionsDef::default(), From fa58841a6fbd6ab3237e87f67b5b5b662f463cc1 Mon Sep 17 00:00:00 2001 From: AlyHKafoury Date: Sun, 27 Oct 2019 23:06:35 +0200 Subject: [PATCH 09/12] adding rabbitmq integration test and feature Signed-off-by: AlyHKafoury --- Cargo.toml | 1 + src/sinks/rabbitmq.rs | 66 +++++++++++++++++++++++++++++++++++++++---- 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1707af904fceb..e04cda65945b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -169,6 +169,7 @@ kinesis-integration-tests = [] s3-integration-tests = [] splunk-integration-tests = [] docker-integration-tests = [] +rabbitmq-integration-tests = [] [[bench]] name = "bench" diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs index 5c8f9a19df3a6..09abf4822400b 100644 --- a/src/sinks/rabbitmq.rs +++ b/src/sinks/rabbitmq.rs @@ -259,19 +259,75 @@ mod tests { #[cfg(test)] mod integration_test { use super::*; + use crate::test_util::{block_on, random_lines_with_stream, random_string}; + use lapin_futures::options::BasicConsumeOptions; + use std::{collections::HashSet, iter::FromIterator}; + #[test] - fn create_connection() { + fn publish_messages() { + let queue_name = format!("test-{}", random_string(10)); + let addr = String::from("amqp://127.0.0.1:5672/%2f"); let config = RabbitMQSinkConfig { - addr: String::from("amqp://127.0.0.1:5672/%2f"), + addr: addr.clone(), basic_publish_options: BasicPublishOptionsDef::default(), connection_properties: ConnectionPropertiesDef::default(), encoding: Encoding::Text, exchange: String::from(""), field_table: FieldTable::default(), - queue_name: String::from("hello"), + queue_name: queue_name.clone(), queue_declare_options: QueueDeclareOptionsDef::default(), }; - let acker = Acker::Null; - let _rabbit = config.build(acker).unwrap(); + // publish messages to test rabbit queue + let (acker, ack_counter) = Acker::new_for_testing(); + let rabbit = RabbitMQSink::new(config, acker).unwrap(); + let number_of_events = 1000; + let (input, events) = random_lines_with_stream(100, number_of_events); + let pump = rabbit.send_all(events); + block_on(pump).unwrap(); + let mut messages: HashSet = HashSet::from_iter(input); + + // create consumer to check the existence of the previously pushed messages + let channel = Client::connect(&addr, ConnectionProperties::default()) + .and_then(|client| client.create_channel()) + .wait() + .unwrap(); + let consumer_name = format!("consumer-{}", random_string(5)); + let consumer = channel + .queue_declare( + &queue_name, + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .and_then(|queue| { + channel.basic_consume( + &queue, + &consumer_name, + BasicConsumeOptions::default(), + FieldTable::default(), + ) + }) + .wait() + .unwrap(); + // check that all messages exist in rabbitmq + let mut counter = 0; + for item in consumer.wait() { + match item { + Ok(message) => { + let string_message = String::from_utf8_lossy(&message.data); + messages.remove(&string_message[..]); + channel.basic_ack(message.delivery_tag, false); + } + Err(e) => error!("failed to run rabbitmq test: {}", e), + } + counter += 1; + if counter == number_of_events { + break; + } + } + assert_eq!(messages.len(), 0); + assert_eq!( + ack_counter.load(std::sync::atomic::Ordering::Relaxed), + number_of_events + ); } } From 0e38a74f97e6d959127a2b8d79fb7465f2b58e61 Mon Sep 17 00:00:00 2001 From: AlyHKafoury Date: Sun, 27 Oct 2019 23:16:08 +0200 Subject: [PATCH 10/12] fix rustfmt Signed-off-by: AlyHKafoury --- src/sinks/rabbitmq.rs | 505 +++++++++++++++++++++--------------------- 1 file changed, 253 insertions(+), 252 deletions(-) diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs index 09abf4822400b..d110d6a0cb2c8 100644 --- a/src/sinks/rabbitmq.rs +++ b/src/sinks/rabbitmq.rs @@ -1,333 +1,334 @@ use crate::{ - buffers::Acker, - event::{self, Event}, - sinks::util::MetadataFuture, - topology::config::{DataType, SinkConfig}, + buffers::Acker, + event::{self, Event}, + sinks::util::MetadataFuture, + topology::config::{DataType, SinkConfig}, }; use futures::{ - future::{self, poll_fn, IntoFuture}, - stream::FuturesUnordered, - Async, AsyncSink, Future, Poll, Sink, StartSend, Stream, + future::{self, poll_fn, IntoFuture}, + stream::FuturesUnordered, + Async, AsyncSink, Future, Poll, Sink, StartSend, Stream, }; use lapin_futures::{ - auth::SASLMechanism, - options::{BasicPublishOptions, QueueDeclareOptions}, - types::FieldTable, - BasicProperties, Client, ConfirmationFuture, ConnectionProperties, + auth::SASLMechanism, + options::{BasicPublishOptions, QueueDeclareOptions}, + types::FieldTable, + BasicProperties, Client, ConfirmationFuture, ConnectionProperties, }; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Clone, Debug)] pub enum SASLMechanismDef { - AMQPlain, - External, - Plain, - RabbitCrDemo, + AMQPlain, + External, + Plain, + RabbitCrDemo, } impl SASLMechanismDef { - pub fn to_sasl_mechanism(&self) -> SASLMechanism { - match &self { - SASLMechanismDef::AMQPlain => SASLMechanism::AMQPlain, - SASLMechanismDef::External => SASLMechanism::External, - SASLMechanismDef::Plain => SASLMechanism::Plain, - SASLMechanismDef::RabbitCrDemo => SASLMechanism::RabbitCrDemo, + pub fn to_sasl_mechanism(&self) -> SASLMechanism { + match &self { + SASLMechanismDef::AMQPlain => SASLMechanism::AMQPlain, + SASLMechanismDef::External => SASLMechanism::External, + SASLMechanismDef::Plain => SASLMechanism::Plain, + SASLMechanismDef::RabbitCrDemo => SASLMechanism::RabbitCrDemo, + } } - } } #[derive(Serialize, Deserialize, Clone, Debug)] pub struct ConnectionPropertiesDef { - pub mechanism: SASLMechanismDef, - pub locale: String, - pub client_properties: FieldTable, - pub max_executor_threads: usize, + pub mechanism: SASLMechanismDef, + pub locale: String, + pub client_properties: FieldTable, + pub max_executor_threads: usize, } impl Default for ConnectionPropertiesDef { - fn default() -> ConnectionPropertiesDef { - ConnectionPropertiesDef { - mechanism: SASLMechanismDef::Plain, - locale: "en_US".into(), - client_properties: FieldTable::default(), - max_executor_threads: 1, + fn default() -> ConnectionPropertiesDef { + ConnectionPropertiesDef { + mechanism: SASLMechanismDef::Plain, + locale: "en_US".into(), + client_properties: FieldTable::default(), + max_executor_threads: 1, + } } - } } #[derive(Serialize, Deserialize, Default, Clone, Debug)] pub struct QueueDeclareOptionsDef { - pub passive: bool, - pub durable: bool, - pub exclusive: bool, - pub auto_delete: bool, - pub nowait: bool, + pub passive: bool, + pub durable: bool, + pub exclusive: bool, + pub auto_delete: bool, + pub nowait: bool, } #[derive(Serialize, Deserialize, Default, Clone, Debug)] pub struct BasicPublishOptionsDef { - pub mandatory: bool, - pub immediate: bool, + pub mandatory: bool, + pub immediate: bool, } #[derive(Serialize, Deserialize, Clone, Debug)] pub struct RabbitMQSinkConfig { - addr: String, - basic_publish_options: BasicPublishOptionsDef, - connection_properties: ConnectionPropertiesDef, - encoding: Encoding, - exchange: String, - field_table: FieldTable, - queue_name: String, - queue_declare_options: QueueDeclareOptionsDef, + addr: String, + basic_publish_options: BasicPublishOptionsDef, + connection_properties: ConnectionPropertiesDef, + encoding: Encoding, + exchange: String, + field_table: FieldTable, + queue_name: String, + queue_declare_options: QueueDeclareOptionsDef, } impl RabbitMQSinkConfig { - pub fn connection_properties(&self) -> ConnectionProperties { - ConnectionProperties { - mechanism: self.connection_properties.mechanism.to_sasl_mechanism(), - locale: self.connection_properties.locale.clone(), - client_properties: self.connection_properties.client_properties.clone(), - executor: None, - max_executor_threads: self.connection_properties.max_executor_threads, + pub fn connection_properties(&self) -> ConnectionProperties { + ConnectionProperties { + mechanism: self.connection_properties.mechanism.to_sasl_mechanism(), + locale: self.connection_properties.locale.clone(), + client_properties: self.connection_properties.client_properties.clone(), + executor: None, + max_executor_threads: self.connection_properties.max_executor_threads, + } } - } - - pub fn queue_declare_options(&self) -> QueueDeclareOptions { - QueueDeclareOptions { - passive: self.queue_declare_options.passive, - durable: self.queue_declare_options.durable, - exclusive: self.queue_declare_options.exclusive, - auto_delete: self.queue_declare_options.auto_delete, - nowait: self.queue_declare_options.nowait, + + pub fn queue_declare_options(&self) -> QueueDeclareOptions { + QueueDeclareOptions { + passive: self.queue_declare_options.passive, + durable: self.queue_declare_options.durable, + exclusive: self.queue_declare_options.exclusive, + auto_delete: self.queue_declare_options.auto_delete, + nowait: self.queue_declare_options.nowait, + } } - } - pub fn basic_publish_options(&self) -> BasicPublishOptions { - BasicPublishOptions { - immediate: self.basic_publish_options.mandatory, - mandatory: self.basic_publish_options.mandatory, + pub fn basic_publish_options(&self) -> BasicPublishOptions { + BasicPublishOptions { + immediate: self.basic_publish_options.mandatory, + mandatory: self.basic_publish_options.mandatory, + } } - } } #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] #[serde(rename_all = "snake_case")] pub enum Encoding { - Text, - Json, + Text, + Json, } pub struct RabbitMQSink { - acker: Acker, - basic_publish_options: BasicPublishOptions, - channel: lapin_futures::Channel, - encoding: Encoding, - exchange: String, - in_flight: FuturesUnordered, usize>>, - seqno: usize, - queue_name: String, + acker: Acker, + basic_publish_options: BasicPublishOptions, + channel: lapin_futures::Channel, + encoding: Encoding, + exchange: String, + in_flight: FuturesUnordered, usize>>, + seqno: usize, + queue_name: String, } impl RabbitMQSink { - fn new(config: RabbitMQSinkConfig, acker: Acker) -> crate::Result { - let channel = Client::connect(&config.addr, config.connection_properties()) - .and_then(|client| client.create_channel()) - .wait()?; - channel - .queue_declare( - &config.queue_name, - config.queue_declare_options(), - config.field_table.clone(), - ) - .wait()?; - Ok(RabbitMQSink { - acker, - basic_publish_options: config.basic_publish_options(), - channel, - encoding: config.encoding, - exchange: config.exchange, - in_flight: FuturesUnordered::new(), - seqno: 0, - queue_name: config.queue_name, - }) - } + fn new(config: RabbitMQSinkConfig, acker: Acker) -> crate::Result { + let channel = Client::connect(&config.addr, config.connection_properties()) + .and_then(|client| client.create_channel()) + .wait()?; + channel + .queue_declare( + &config.queue_name, + config.queue_declare_options(), + config.field_table.clone(), + ) + .wait()?; + Ok(RabbitMQSink { + acker, + basic_publish_options: config.basic_publish_options(), + channel, + encoding: config.encoding, + exchange: config.exchange, + in_flight: FuturesUnordered::new(), + seqno: 0, + queue_name: config.queue_name, + }) + } } #[typetag::serde(name = "rabbitmq")] impl SinkConfig for RabbitMQSinkConfig { - fn build(&self, acker: Acker) -> crate::Result<(super::RouterSink, super::Healthcheck)> { - let sink = RabbitMQSink::new(self.clone(), acker)?; - let hc = healthcheck(self.clone()); - Ok((Box::new(sink), hc)) - } - - fn input_type(&self) -> DataType { - DataType::Log - } + fn build(&self, acker: Acker) -> crate::Result<(super::RouterSink, super::Healthcheck)> { + let sink = RabbitMQSink::new(self.clone(), acker)?; + let hc = healthcheck(self.clone()); + Ok((Box::new(sink), hc)) + } + + fn input_type(&self) -> DataType { + DataType::Log + } } impl Sink for RabbitMQSink { - type SinkItem = Event; - type SinkError = (); - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - let payload = encode_event(&item, &self.encoding); - let future = self.channel.basic_publish( - &self.exchange, - &self.queue_name, - payload, - self.basic_publish_options.clone(), - BasicProperties::default(), - ); - self.in_flight.push(future.join(future::ok(self.seqno))); - self.seqno += 1; - Ok(AsyncSink::Ready) - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - loop { - match self.in_flight.poll() { - // nothing ready yet - Ok(Async::NotReady) => return Ok(Async::NotReady), - - // nothing in flight - Ok(Async::Ready(None)) => return Ok(Async::Ready(())), - - // request finished, check for success - Ok(Async::Ready(Some(((), seqno)))) => { - self.acker.ack(seqno); - trace!("published message to rabbitmq"); - } + type SinkItem = Event; + type SinkError = (); + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + let payload = encode_event(&item, &self.encoding); + let future = self.channel.basic_publish( + &self.exchange, + &self.queue_name, + payload, + self.basic_publish_options.clone(), + BasicProperties::default(), + ); + self.in_flight.push(future.join(future::ok(self.seqno))); + self.seqno += 1; + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + loop { + match self.in_flight.poll() { + // nothing ready yet + Ok(Async::NotReady) => return Ok(Async::NotReady), + + // nothing in flight + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + + // request finished, check for success + Ok(Async::Ready(Some(((), seqno)))) => { + self.acker.ack(seqno); + trace!("published message to rabbitmq"); + } - Err(e) => error!("publishing message failed: {}", e), - } + Err(e) => error!("publishing message failed: {}", e), + } + } } - } } fn healthcheck(config: RabbitMQSinkConfig) -> super::Healthcheck { - let check = poll_fn(move || { - tokio_threadpool::blocking(|| { - Client::connect(&config.addr, config.connection_properties()) - .map(|_| ()) - .map_err(|err| err.into()) + let check = poll_fn(move || { + tokio_threadpool::blocking(|| { + Client::connect(&config.addr, config.connection_properties()) + .map(|_| ()) + .map_err(|err| err.into()) + }) }) - }) - .map_err(|err| err.into()) - .and_then(|result| result.into_future()); + .map_err(|err| err.into()) + .and_then(|result| result.into_future()); - Box::new(check) + Box::new(check) } fn encode_event(event: &Event, encoding: &Encoding) -> Vec { - let payload = match encoding { - &Encoding::Json => serde_json::to_vec(&event.as_log().clone().unflatten()).unwrap(), - &Encoding::Text => event - .as_log() - .get(&event::MESSAGE) - .map(|v| v.as_bytes().to_vec()) - .unwrap_or(Vec::new()), - }; - - payload + let payload = match encoding { + &Encoding::Json => serde_json::to_vec(&event.as_log().clone().unflatten()).unwrap(), + &Encoding::Text => event + .as_log() + .get(&event::MESSAGE) + .map(|v| v.as_bytes().to_vec()) + .unwrap_or(Vec::new()), + }; + + payload } #[cfg(test)] mod tests { - use super::*; - use std::collections::HashMap; - - #[test] - fn rabbitmq_encode_event_text() { - let message = "hello world".to_string(); - let bytes = encode_event(&message.clone().into(), &Encoding::Text); - assert_eq!(&bytes[..], message.as_bytes()); - } - - #[test] - fn rabbitmq_encode_event_json() { - let message = "hello world".to_string(); - let event = Event::from(message.clone()); - let bytes = encode_event(&event, &Encoding::Json); - let map: HashMap = serde_json::from_slice(&bytes[..]).unwrap(); - assert_eq!(map[&event::MESSAGE.to_string()], message); - } + use super::*; + use std::collections::HashMap; + + #[test] + fn rabbitmq_encode_event_text() { + let message = "hello world".to_string(); + let bytes = encode_event(&message.clone().into(), &Encoding::Text); + assert_eq!(&bytes[..], message.as_bytes()); + } + + #[test] + fn rabbitmq_encode_event_json() { + let message = "hello world".to_string(); + let event = Event::from(message.clone()); + let bytes = encode_event(&event, &Encoding::Json); + let map: HashMap = serde_json::from_slice(&bytes[..]).unwrap(); + assert_eq!(map[&event::MESSAGE.to_string()], message); + } } #[cfg(feature = "rabbitmq-integration-tests")] #[cfg(test)] mod integration_test { - use super::*; - use crate::test_util::{block_on, random_lines_with_stream, random_string}; - use lapin_futures::options::BasicConsumeOptions; - use std::{collections::HashSet, iter::FromIterator}; - - #[test] - fn publish_messages() { - let queue_name = format!("test-{}", random_string(10)); - let addr = String::from("amqp://127.0.0.1:5672/%2f"); - let config = RabbitMQSinkConfig { - addr: addr.clone(), - basic_publish_options: BasicPublishOptionsDef::default(), - connection_properties: ConnectionPropertiesDef::default(), - encoding: Encoding::Text, - exchange: String::from(""), - field_table: FieldTable::default(), - queue_name: queue_name.clone(), - queue_declare_options: QueueDeclareOptionsDef::default(), - }; - // publish messages to test rabbit queue - let (acker, ack_counter) = Acker::new_for_testing(); - let rabbit = RabbitMQSink::new(config, acker).unwrap(); - let number_of_events = 1000; - let (input, events) = random_lines_with_stream(100, number_of_events); - let pump = rabbit.send_all(events); - block_on(pump).unwrap(); - let mut messages: HashSet = HashSet::from_iter(input); - - // create consumer to check the existence of the previously pushed messages - let channel = Client::connect(&addr, ConnectionProperties::default()) - .and_then(|client| client.create_channel()) - .wait() - .unwrap(); - let consumer_name = format!("consumer-{}", random_string(5)); - let consumer = channel - .queue_declare( - &queue_name, - QueueDeclareOptions::default(), - FieldTable::default(), - ) - .and_then(|queue| { - channel.basic_consume( - &queue, - &consumer_name, - BasicConsumeOptions::default(), - FieldTable::default(), - ) - }) - .wait() - .unwrap(); - // check that all messages exist in rabbitmq - let mut counter = 0; - for item in consumer.wait() { - match item { - Ok(message) => { - let string_message = String::from_utf8_lossy(&message.data); - messages.remove(&string_message[..]); - channel.basic_ack(message.delivery_tag, false); + use super::*; + use crate::test_util::{block_on, random_lines_with_stream, random_string}; + use lapin_futures::options::BasicConsumeOptions; + use std::{collections::HashSet, iter::FromIterator}; + + #[test] + fn publish_messages() { + let queue_name = format!("test-{}", random_string(10)); + let addr = String::from("amqp://127.0.0.1:5672/%2f"); + let config = RabbitMQSinkConfig { + addr: addr.clone(), + basic_publish_options: BasicPublishOptionsDef::default(), + connection_properties: ConnectionPropertiesDef::default(), + encoding: Encoding::Text, + exchange: String::from(""), + field_table: FieldTable::default(), + queue_name: queue_name.clone(), + queue_declare_options: QueueDeclareOptionsDef::default(), + }; + // publish messages to test rabbit queue + let (acker, ack_counter) = Acker::new_for_testing(); + let rabbit = RabbitMQSink::new(config, acker).unwrap(); + let number_of_events = 1000; + let (input, events) = random_lines_with_stream(100, number_of_events); + let pump = rabbit.send_all(events); + block_on(pump).unwrap(); + let mut messages: HashSet = HashSet::from_iter(input); + + // create consumer to check the existence of the previously pushed messages + let channel = Client::connect(&addr, ConnectionProperties::default()) + .and_then(|client| client.create_channel()) + .wait() + .unwrap(); + let consumer_name = format!("consumer-{}", random_string(5)); + let consumer = channel + .queue_declare( + &queue_name, + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .and_then(|queue| { + channel.basic_consume( + &queue, + &consumer_name, + BasicConsumeOptions::default(), + FieldTable::default(), + ) + }) + .wait() + .unwrap(); + // check that all messages exist in rabbitmq + let mut counter = 0; + for item in consumer.wait() { + match item { + Ok(message) => { + let string_message = String::from_utf8_lossy(&message.data); + messages.remove(&string_message[..]); + channel.basic_ack(message.delivery_tag, false); + } + Err(e) => error!("failed to run rabbitmq test: {}", e), + } + counter += 1; + if counter == number_of_events { + break; + } } - Err(e) => error!("failed to run rabbitmq test: {}", e), - } - counter += 1; - if counter == number_of_events { - break; - } + assert_eq!(messages.len(), 0); + assert_eq!( + ack_counter.load(std::sync::atomic::Ordering::Relaxed), + number_of_events + ); } - assert_eq!(messages.len(), 0); - assert_eq!( - ack_counter.load(std::sync::atomic::Ordering::Relaxed), - number_of_events - ); - } } From 077f5914c784030004d7f2e2a5b0ee70f905e1cf Mon Sep 17 00:00:00 2001 From: AlyHKafoury Date: Wed, 30 Oct 2019 02:20:02 +0200 Subject: [PATCH 11/12] acking the correct number of message and confirming acks Signed-off-by: AlyHKafoury --- src/sinks/rabbitmq.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs index d110d6a0cb2c8..f706e84aa1f7f 100644 --- a/src/sinks/rabbitmq.rs +++ b/src/sinks/rabbitmq.rs @@ -16,6 +16,7 @@ use lapin_futures::{ BasicProperties, Client, ConfirmationFuture, ConnectionProperties, }; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; #[derive(Serialize, Deserialize, Clone, Debug)] pub enum SASLMechanismDef { @@ -127,6 +128,7 @@ pub struct RabbitMQSink { in_flight: FuturesUnordered, usize>>, seqno: usize, queue_name: String, + pending_acks: HashSet, } impl RabbitMQSink { @@ -150,6 +152,7 @@ impl RabbitMQSink { in_flight: FuturesUnordered::new(), seqno: 0, queue_name: config.queue_name, + pending_acks: HashSet::new(), }) } } @@ -181,6 +184,7 @@ impl Sink for RabbitMQSink { BasicProperties::default(), ); self.in_flight.push(future.join(future::ok(self.seqno))); + self.pending_acks.insert(self.seqno); self.seqno += 1; Ok(AsyncSink::Ready) @@ -197,8 +201,12 @@ impl Sink for RabbitMQSink { // request finished, check for success Ok(Async::Ready(Some(((), seqno)))) => { - self.acker.ack(seqno); - trace!("published message to rabbitmq"); + if self.pending_acks.remove(&seqno) { + self.acker.ack(1); + trace!("published message to rabbitmq"); + } else { + error!("message already published"); + } } Err(e) => error!("publishing message failed: {}", e), From 2a6e0ee3228dc9dc84245c09f2252804d3e8861d Mon Sep 17 00:00:00 2001 From: AlyHKafoury Date: Sat, 2 Nov 2019 13:55:46 +0200 Subject: [PATCH 12/12] adding rabbitmq image to ci and docker-compose Signed-off-by: AlyHKafoury --- .circleci/config.yml | 2 +- Cargo.toml | 3 ++- docker-compose.yml | 4 ++++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5eff1f905643d..606720d18a82e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -113,6 +113,7 @@ jobs: - xpack.ssl.certificate=certs/localhost.crt - xpack.ssl.key=certs/localhost.key - image: yandex/clickhouse-server:19 + - image: rabbitmq:alpine steps: - checkout - setup_remote_docker: @@ -578,7 +579,6 @@ require-tests-checks-and-verifications: &require-tests-checks-and-verifications - verify-rpm-artifact-on-centos-7 - verify-systemd-on-debian - # # Workflows # diff --git a/Cargo.toml b/Cargo.toml index e04cda65945b3..f4598126cd030 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,7 +158,8 @@ docker = [ "kinesis-integration-tests", "s3-integration-tests", "splunk-integration-tests", - "docker-integration-tests" + "docker-integration-tests", + "rabbitmq-integration-tests" ] cloudwatch-logs-integration-tests = [] cloudwatch-metrics-integration-tests = [] diff --git a/docker-compose.yml b/docker-compose.yml index ce580fbf66e9b..091327071c07c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -74,3 +74,7 @@ services: image: yandex/clickhouse-server:19 ports: - "8123:8123" + rabbitmq: + image: rabbitmq:alpine + ports: + - "5672:5672"