diff --git a/.circleci/config.yml b/.circleci/config.yml index 5eff1f905643d..606720d18a82e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -113,6 +113,7 @@ jobs: - xpack.ssl.certificate=certs/localhost.crt - xpack.ssl.key=certs/localhost.key - image: yandex/clickhouse-server:19 + - image: rabbitmq:alpine steps: - checkout - setup_remote_docker: @@ -578,7 +579,6 @@ require-tests-checks-and-verifications: &require-tests-checks-and-verifications - verify-rpm-artifact-on-centos-7 - verify-systemd-on-debian - # # Workflows # diff --git a/Cargo.lock b/Cargo.lock index f979586f9e11f..36a1252a0271d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,60 @@ dependencies = [ "num-traits 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "amq-protocol" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "amq-protocol-codegen 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "amq-protocol-tcp 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "amq-protocol-types 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "amq-protocol-uri 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "cookie-factory 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "nom 5.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "amq-protocol-codegen" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "amq-protocol-types 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "handlebars 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "amq-protocol-tcp" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "amq-protocol-uri 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "tcp-stream 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "amq-protocol-types" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cookie-factory 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "nom 5.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "amq-protocol-uri" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ansi_term" version = "0.11.0" @@ -300,6 +354,11 @@ dependencies = [ "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "cookie-factory" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "cookie_store" version = "0.7.0" @@ -971,6 +1030,21 @@ dependencies = [ "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "handlebars" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "hashbrown 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "pest 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "pest_derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "walkdir 2.2.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "hashbrown" version = "0.1.8" @@ -980,6 +1054,14 @@ dependencies = [ "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hashbrown" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "hdrhistogram" version = "6.1.1" @@ -1214,6 +1296,16 @@ dependencies = [ "unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "idna" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "indexmap" version = "1.0.2" @@ -1310,6 +1402,30 @@ dependencies = [ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "lapin" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "amq-protocol 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "amq-protocol-codegen 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "lapin-futures" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", + "lapin 0.28.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "lazy_static" version = "0.2.11" @@ -1432,6 +1548,11 @@ dependencies = [ "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "maplit" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "matchers" version = "0.0.1" @@ -1892,6 +2013,45 @@ name = "percent-encoding" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "pest" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "ucd-trie 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "pest_derive" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "pest 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "pest_generator 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "pest_generator" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "pest 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "pest_meta 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "pest_meta" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "maplit 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "pest 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "sha-1 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "petgraph" version = "0.4.13" @@ -3088,6 +3248,16 @@ dependencies = [ "xattr 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tcp-stream" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tempdir" version = "0.3.7" @@ -3745,6 +3915,11 @@ dependencies = [ "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ucd-trie" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "ucd-util" version = "0.1.3" @@ -3830,6 +4005,16 @@ dependencies = [ "percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "url" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "utf8-ranges" version = "1.0.2" @@ -3894,6 +4079,7 @@ dependencies = [ "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "jemallocator 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "journald 0.1.0", + "lapin-futures 0.28.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "leveldb 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4102,6 +4288,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum adler32 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7e522997b529f05601e05166c07ed17789691f562762c7f3b987263d2dedee5c" "checksum aho-corasick 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e6f484ae0c99fec2e858eb6134949117399f222608d84cadb3f58c1f97c2364c" "checksum alga 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2cc836ad7a40dc9d8049574e2a29979f5dc77deeea4d7ebcd29773452f0e9694" +"checksum amq-protocol 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9555895e88d803ed36f2b2eb93aea0999fb1a2d14b6ecdead999516f321f8320" +"checksum amq-protocol-codegen 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c61982c3314daee58c7fc3e071406160b4fefd30159e1c6015d9f880316b52f7" +"checksum amq-protocol-tcp 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "11b5b4746400bd566541c1bf04bdd7295a534cab562da8a4946f2772a4d0dde8" +"checksum amq-protocol-types 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "dd44e5827bf094402bbe79cd4e496e7f18a8745e31e16ad7b176d2e2c8ef0485" +"checksum amq-protocol-uri 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d44f5e8628cb65410b02e68fdc1baff70a011d27091ef1ee5eb9faddff2e220d" "checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" "checksum antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" "checksum approx 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f0e60b75072ecd4168020818c0107f2857bb6c4e64252d8d3983f6263b40a5c3" @@ -4137,6 +4328,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" "checksum cookie 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5" +"checksum cookie-factory 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5f2b81f8a9b8f5ffa2ed91a82c53392837eab151baf682acf8322c909b6d4fe9" "checksum cookie_store 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46750b3f362965f197996c4448e4a0935e791bf7d6631bfce9ee0af3d24c919c" "checksum core-foundation 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "25b9e03f145fd4f2bf705e07b900cd41fc636598fe5dc452fd0db1441c3f496d" "checksum core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e7ca8a5221364ef15ce201e8ed2f609fc312682a8f4e0e3d4aa5879764e0fa3b" @@ -4208,7 +4400,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum glob 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" "checksum grok 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6e1f7ecdcd1bb90bea7aebeaea4d5d408cecd72c8616f0e58e6129adf4811220" "checksum h2 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "876d91114d78abbde2e1910e3b2d9d0fd1d89b769e20816dfb68d77992cf4158" +"checksum handlebars 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "91ef1ac30f2eaaa2b835fce73c57091cb6b9fc62b7eef285efbf980b0f20001b" "checksum hashbrown 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3bae29b6653b3412c2e71e9d486db9f9df5d701941d86683005efb9f2d28e3da" +"checksum hashbrown 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e1de41fb8dba9714efd92241565cdff73f78508c95697dd56787d3cba27e2353" "checksum hdrhistogram 6.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9b4a1f8e87caaefdf781ca2c5bf6f2228fb88963d5010e8dc589bbdbaa4a423a" "checksum headers 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dc6e2e51d356081258ef05ff4c648138b5d3fe64b7300aaad3b820554a2b7fb6" "checksum headers-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "51ae5b0b5417559ee1d2733b21d33b0868ae9e406bd32eb1a51d613f66ed472a" @@ -4229,6 +4423,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum hyper-tls 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3a800d6aa50af4b5850b2b0f659625ce9504df908e9733b635720483be26174f" "checksum hyperlocal 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d063d6d5658623c6ef16f452e11437c0e7e23a6d327470573fe78892dafbc4fb" "checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" +"checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9" "checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d" "checksum inventory 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "21df85981fe094480bc2267723d3dc0fd1ae0d1f136affc659b7398be615d922" "checksum inventory-impl 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8a877ae8bce77402d5e9ed870730939e097aad827b2a932b361958fa9d6e75aa" @@ -4239,6 +4434,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum jemalloc-sys 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7bef0d4ce37578dfd80b466e3d8324bd9de788e249f1accebb0c472ea4b52bdc" "checksum jemallocator 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c2b69163a3cf2d0fffcd4e1b57921bc6d8fb97ec27f2aeef00562abdaf4ffe2a" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +"checksum lapin 0.28.1 (registry+https://github.com/rust-lang/crates.io-index)" = "29958a8b737705e04d7d526e6b6a5c9a7582472ef02f17aa2ba14068072b8b83" +"checksum lapin-futures 0.28.1 (registry+https://github.com/rust-lang/crates.io-index)" = "68f71b9c4d1f0ce2c9b39e852b60881974292fb5cdd440ab89bce7830aba1382" "checksum lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "76f033c7ad61445c5b347c7382dd1237847eb1bce590fe50365dcb33d546be73" "checksum lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bc5729f27f159ddd61f4df6228e827e86643d4d3e7c32183cb30a1c08f604a14" "checksum leveldb 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8438a36a31c982ac399c4477d7e3c62cc7a6bf91bb6f42837b7e1033359fcbad" @@ -4254,6 +4451,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum lock_api 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "62ebf1391f6acad60e5c8b43706dde4582df75c06698ab44511d15016bc2442c" "checksum lock_api 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f8912e782533a93a167888781b836336a6ca5da6175c05944c86cf28c31104dc" "checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" +"checksum maplit 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" "checksum matchers 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" "checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" "checksum matrixmultiply 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dcfed72d871629daa12b25af198f110e8095d7650f5f4c61c5bac28364604f9b" @@ -4306,6 +4504,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum pdqselect 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4ec91767ecc0a0bbe558ce8c9da33c068066c57ecc8bb8477ef8c1ad3ef77c27" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" "checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +"checksum pest 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e4fb201c5c22a55d8b24fef95f78be52738e5e1361129be1b5e862ecdb6894a" +"checksum pest_derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "833d1ae558dc601e9a60366421196a8d94bc0ac980476d0b67e1d0988d72b2d0" +"checksum pest_generator 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7b9fcf299b5712d06ee128a556c94709aaa04512c4dffb8ead07c5c998447fc0" +"checksum pest_meta 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "df43fd99896fd72c485fe47542c7b500e4ac1e8700bf995544d1317a60ded547" "checksum petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3659d1ee90221741f65dd128d9998311b0e40c5d3c23a62445938214abce4f" "checksum phf 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b3da44b85f8e8dfaec21adae67f95d93244b2ecf6ad2a692320598dcc8e6dd18" "checksum phf_codegen 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e" @@ -4431,6 +4633,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum synstructure 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "02353edf96d6e4dc81aea2d8490a7e9db177bf8acb0e951c24940bf866cb313f" "checksum syslog_rfc5424 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d1f5c427514a0cd2a4a7e28d307672e56ddd4af03c6685e8b8e7f148a8b77e7b" "checksum tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)" = "b3196bfbffbba3e57481b6ea32249fbaf590396a52505a2615adbb79d9d826d3" +"checksum tcp-stream 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "73f7fc1356d9f5141f4a091ec2fddc152fd4f09c0cc9bddfa8146e0cff0cbd13" "checksum tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" "checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" "checksum termcolor 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4096add70612622289f2fdcdbd5086dc81c1e2675e6ae58d6c4f62a16c6d7f2f" @@ -4487,6 +4690,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "612d636f949607bdf9b123b4a6f6d966dedf3ff669f7f045890d3a4a73948169" "checksum typetag 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7573a67ebbc8696e879d902067bc278671b7502abd8604b7336d87efd47e5858" "checksum typetag-impl 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ef49605e1070ec194bd9d2c360a975d47ba164460de183cbc53aa1be8c4db6dc" +"checksum ucd-trie 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8f00ed7be0c1ff1e24f46c3d2af4859f7e863672ba3a6e92e7cff702bf9f06c2" "checksum ucd-util 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "535c204ee4d8434478593480b8f86ab45ec9aae0e83c568ca81abf0fd0e88f86" "checksum unicase 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7f4765f83163b74f957c797ad9253caf97f103fb064d3999aea9568d09fc8a33" "checksum unicase 2.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a84e5511b2a947f3ae965dcb29b13b7b1691b6e7332cf5dbc1744138d5acb7f6" @@ -4500,6 +4704,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" "checksum untrusted 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "55cd1f4b4e96b46aeb8d4855db4a7a9bd96eeeb5c6a1ab54593328761642ce2f" "checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" +"checksum url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75b414f6c464c879d7f9babf951f23bc3743fb7313c081b2e6ca719067ea9d61" "checksum utf8-ranges 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "796f7e48bef87609f7ade7e06495a87d5cd06c7866e6a5cbfceffc558a243737" "checksum uuid 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e1436e58182935dcd9ce0add9ea0b558e8a87befe01c1a301e6020aeb0876363" "checksum uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)" = "90dbc611eb48397705a6b0f6e917da23ae517e4d127123d2cf7674206627d32a" diff --git a/Cargo.toml b/Cargo.toml index b96aee7b3fc63..f4598126cd030 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -127,6 +127,7 @@ base64 = "0.10.1" shiplift = { git = "https://github.com/LucioFranco/shiplift", branch = "timber" } owning_ref = "0.4.0" listenfd = "0.3.3" +lapin-futures = "0.28.0" [build-dependencies] prost-build = "0.4.0" @@ -157,7 +158,8 @@ docker = [ "kinesis-integration-tests", "s3-integration-tests", "splunk-integration-tests", - "docker-integration-tests" + "docker-integration-tests", + "rabbitmq-integration-tests" ] cloudwatch-logs-integration-tests = [] cloudwatch-metrics-integration-tests = [] @@ -168,6 +170,7 @@ kinesis-integration-tests = [] s3-integration-tests = [] splunk-integration-tests = [] docker-integration-tests = [] +rabbitmq-integration-tests = [] [[bench]] name = "bench" diff --git a/docker-compose.yml b/docker-compose.yml index ce580fbf66e9b..091327071c07c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -74,3 +74,7 @@ services: image: yandex/clickhouse-server:19 ports: - "8123:8123" + rabbitmq: + image: rabbitmq:alpine + ports: + - "5672:5672" diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index fdac93a9ddb15..6876e1cb4fd7e 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -15,6 +15,7 @@ pub mod http; #[cfg(feature = "rdkafka")] pub mod kafka; pub mod prometheus; +pub mod rabbitmq; pub mod splunk_hec; pub mod statsd; pub mod tcp; diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs new file mode 100644 index 0000000000000..f706e84aa1f7f --- /dev/null +++ b/src/sinks/rabbitmq.rs @@ -0,0 +1,342 @@ +use crate::{ + buffers::Acker, + event::{self, Event}, + sinks::util::MetadataFuture, + topology::config::{DataType, SinkConfig}, +}; +use futures::{ + future::{self, poll_fn, IntoFuture}, + stream::FuturesUnordered, + Async, AsyncSink, Future, Poll, Sink, StartSend, Stream, +}; +use lapin_futures::{ + auth::SASLMechanism, + options::{BasicPublishOptions, QueueDeclareOptions}, + types::FieldTable, + BasicProperties, Client, ConfirmationFuture, ConnectionProperties, +}; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum SASLMechanismDef { + AMQPlain, + External, + Plain, + RabbitCrDemo, +} + +impl SASLMechanismDef { + pub fn to_sasl_mechanism(&self) -> SASLMechanism { + match &self { + SASLMechanismDef::AMQPlain => SASLMechanism::AMQPlain, + SASLMechanismDef::External => SASLMechanism::External, + SASLMechanismDef::Plain => SASLMechanism::Plain, + SASLMechanismDef::RabbitCrDemo => SASLMechanism::RabbitCrDemo, + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct ConnectionPropertiesDef { + pub mechanism: SASLMechanismDef, + pub locale: String, + pub client_properties: FieldTable, + pub max_executor_threads: usize, +} + +impl Default for ConnectionPropertiesDef { + fn default() -> ConnectionPropertiesDef { + ConnectionPropertiesDef { + mechanism: SASLMechanismDef::Plain, + locale: "en_US".into(), + client_properties: FieldTable::default(), + max_executor_threads: 1, + } + } +} + +#[derive(Serialize, Deserialize, Default, Clone, Debug)] +pub struct QueueDeclareOptionsDef { + pub passive: bool, + pub durable: bool, + pub exclusive: bool, + pub auto_delete: bool, + pub nowait: bool, +} + +#[derive(Serialize, Deserialize, Default, Clone, Debug)] +pub struct BasicPublishOptionsDef { + pub mandatory: bool, + pub immediate: bool, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct RabbitMQSinkConfig { + addr: String, + basic_publish_options: BasicPublishOptionsDef, + connection_properties: ConnectionPropertiesDef, + encoding: Encoding, + exchange: String, + field_table: FieldTable, + queue_name: String, + queue_declare_options: QueueDeclareOptionsDef, +} + +impl RabbitMQSinkConfig { + pub fn connection_properties(&self) -> ConnectionProperties { + ConnectionProperties { + mechanism: self.connection_properties.mechanism.to_sasl_mechanism(), + locale: self.connection_properties.locale.clone(), + client_properties: self.connection_properties.client_properties.clone(), + executor: None, + max_executor_threads: self.connection_properties.max_executor_threads, + } + } + + pub fn queue_declare_options(&self) -> QueueDeclareOptions { + QueueDeclareOptions { + passive: self.queue_declare_options.passive, + durable: self.queue_declare_options.durable, + exclusive: self.queue_declare_options.exclusive, + auto_delete: self.queue_declare_options.auto_delete, + nowait: self.queue_declare_options.nowait, + } + } + + pub fn basic_publish_options(&self) -> BasicPublishOptions { + BasicPublishOptions { + immediate: self.basic_publish_options.mandatory, + mandatory: self.basic_publish_options.mandatory, + } + } +} + +#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] +#[serde(rename_all = "snake_case")] +pub enum Encoding { + Text, + Json, +} + +pub struct RabbitMQSink { + acker: Acker, + basic_publish_options: BasicPublishOptions, + channel: lapin_futures::Channel, + encoding: Encoding, + exchange: String, + in_flight: FuturesUnordered, usize>>, + seqno: usize, + queue_name: String, + pending_acks: HashSet, +} + +impl RabbitMQSink { + fn new(config: RabbitMQSinkConfig, acker: Acker) -> crate::Result { + let channel = Client::connect(&config.addr, config.connection_properties()) + .and_then(|client| client.create_channel()) + .wait()?; + channel + .queue_declare( + &config.queue_name, + config.queue_declare_options(), + config.field_table.clone(), + ) + .wait()?; + Ok(RabbitMQSink { + acker, + basic_publish_options: config.basic_publish_options(), + channel, + encoding: config.encoding, + exchange: config.exchange, + in_flight: FuturesUnordered::new(), + seqno: 0, + queue_name: config.queue_name, + pending_acks: HashSet::new(), + }) + } +} + +#[typetag::serde(name = "rabbitmq")] +impl SinkConfig for RabbitMQSinkConfig { + fn build(&self, acker: Acker) -> crate::Result<(super::RouterSink, super::Healthcheck)> { + let sink = RabbitMQSink::new(self.clone(), acker)?; + let hc = healthcheck(self.clone()); + Ok((Box::new(sink), hc)) + } + + fn input_type(&self) -> DataType { + DataType::Log + } +} + +impl Sink for RabbitMQSink { + type SinkItem = Event; + type SinkError = (); + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + let payload = encode_event(&item, &self.encoding); + let future = self.channel.basic_publish( + &self.exchange, + &self.queue_name, + payload, + self.basic_publish_options.clone(), + BasicProperties::default(), + ); + self.in_flight.push(future.join(future::ok(self.seqno))); + self.pending_acks.insert(self.seqno); + self.seqno += 1; + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + loop { + match self.in_flight.poll() { + // nothing ready yet + Ok(Async::NotReady) => return Ok(Async::NotReady), + + // nothing in flight + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + + // request finished, check for success + Ok(Async::Ready(Some(((), seqno)))) => { + if self.pending_acks.remove(&seqno) { + self.acker.ack(1); + trace!("published message to rabbitmq"); + } else { + error!("message already published"); + } + } + + Err(e) => error!("publishing message failed: {}", e), + } + } + } +} + +fn healthcheck(config: RabbitMQSinkConfig) -> super::Healthcheck { + let check = poll_fn(move || { + tokio_threadpool::blocking(|| { + Client::connect(&config.addr, config.connection_properties()) + .map(|_| ()) + .map_err(|err| err.into()) + }) + }) + .map_err(|err| err.into()) + .and_then(|result| result.into_future()); + + Box::new(check) +} + +fn encode_event(event: &Event, encoding: &Encoding) -> Vec { + let payload = match encoding { + &Encoding::Json => serde_json::to_vec(&event.as_log().clone().unflatten()).unwrap(), + &Encoding::Text => event + .as_log() + .get(&event::MESSAGE) + .map(|v| v.as_bytes().to_vec()) + .unwrap_or(Vec::new()), + }; + + payload +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + #[test] + fn rabbitmq_encode_event_text() { + let message = "hello world".to_string(); + let bytes = encode_event(&message.clone().into(), &Encoding::Text); + assert_eq!(&bytes[..], message.as_bytes()); + } + + #[test] + fn rabbitmq_encode_event_json() { + let message = "hello world".to_string(); + let event = Event::from(message.clone()); + let bytes = encode_event(&event, &Encoding::Json); + let map: HashMap = serde_json::from_slice(&bytes[..]).unwrap(); + assert_eq!(map[&event::MESSAGE.to_string()], message); + } +} + +#[cfg(feature = "rabbitmq-integration-tests")] +#[cfg(test)] +mod integration_test { + use super::*; + use crate::test_util::{block_on, random_lines_with_stream, random_string}; + use lapin_futures::options::BasicConsumeOptions; + use std::{collections::HashSet, iter::FromIterator}; + + #[test] + fn publish_messages() { + let queue_name = format!("test-{}", random_string(10)); + let addr = String::from("amqp://127.0.0.1:5672/%2f"); + let config = RabbitMQSinkConfig { + addr: addr.clone(), + basic_publish_options: BasicPublishOptionsDef::default(), + connection_properties: ConnectionPropertiesDef::default(), + encoding: Encoding::Text, + exchange: String::from(""), + field_table: FieldTable::default(), + queue_name: queue_name.clone(), + queue_declare_options: QueueDeclareOptionsDef::default(), + }; + // publish messages to test rabbit queue + let (acker, ack_counter) = Acker::new_for_testing(); + let rabbit = RabbitMQSink::new(config, acker).unwrap(); + let number_of_events = 1000; + let (input, events) = random_lines_with_stream(100, number_of_events); + let pump = rabbit.send_all(events); + block_on(pump).unwrap(); + let mut messages: HashSet = HashSet::from_iter(input); + + // create consumer to check the existence of the previously pushed messages + let channel = Client::connect(&addr, ConnectionProperties::default()) + .and_then(|client| client.create_channel()) + .wait() + .unwrap(); + let consumer_name = format!("consumer-{}", random_string(5)); + let consumer = channel + .queue_declare( + &queue_name, + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .and_then(|queue| { + channel.basic_consume( + &queue, + &consumer_name, + BasicConsumeOptions::default(), + FieldTable::default(), + ) + }) + .wait() + .unwrap(); + // check that all messages exist in rabbitmq + let mut counter = 0; + for item in consumer.wait() { + match item { + Ok(message) => { + let string_message = String::from_utf8_lossy(&message.data); + messages.remove(&string_message[..]); + channel.basic_ack(message.delivery_tag, false); + } + Err(e) => error!("failed to run rabbitmq test: {}", e), + } + counter += 1; + if counter == number_of_events { + break; + } + } + assert_eq!(messages.len(), 0); + assert_eq!( + ack_counter.load(std::sync::atomic::Ordering::Relaxed), + number_of_events + ); + } +}