diff --git a/Cargo.lock b/Cargo.lock index 427e3740..9e9ea2c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,12 +66,6 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" -[[package]] -name = "android-tzdata" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" - [[package]] name = "android_system_properties" version = "0.1.5" @@ -273,9 +267,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-lc-rs" -version = "1.13.3" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c953fe1ba023e6b7730c0d4b031d06f267f23a46167dcbd40316644b10a17ba" +checksum = "94b8ff6c09cd57b16da53641caa860168b88c172a5ee163b0288d3d6eea12786" dependencies = [ "aws-lc-sys", "zeroize", @@ -283,9 +277,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbfd150b5dbdb988bcc8fb1fe787eb6b7ee6180ca24da683b61ea5405f3d43ff" +checksum = "0e44d16778acaf6a9ec9899b92cebd65580b83f685446bf2e1f5d3d732f99dcd" dependencies = [ "bindgen", "cc", @@ -464,32 +458,29 @@ dependencies = [ [[package]] name = "bindgen" -version = "0.69.5" +version = "0.72.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" dependencies = [ "bitflags", "cexpr", "clang-sys", - "itertools 0.12.1", - "lazy_static", - "lazycell", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 2.1.1", "shlex", "syn", - "which", ] [[package]] name = "bitflags" -version = "2.9.3" +version = "2.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34efbcccd345379ca2868b2b2c9d3782e9cc58ba87bc7d79d5b53d9c9ae6f25d" +checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" dependencies = [ "serde", ] @@ -557,10 +548,11 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.34" +version = "1.2.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42bc4aea80032b7bf409b0bc7ccad88853858911b7713a8062fdc0623867bedc" +checksum = "65193589c6404eb80b450d618eaf9a2cafaaafd57ecce47370519ef674a7bd44" dependencies = [ + "find-msvc-tools", "jobserver", "libc", "shlex", @@ -595,16 +587,15 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.41" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ - "android-tzdata", "iana-time-zone", "js-sys", "num-traits", "wasm-bindgen", - "windows-link", + "windows-link 0.2.0", ] [[package]] @@ -647,9 +638,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.46" +version = "4.5.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c5e4fcf9c21d2e544ca1ee9d8552de13019a42aa7dbf32747fa7aaf1df76e57" +checksum = "7eac00902d9d136acd712710d71823fb8ac8004ca445a89e73a41d45aa712931" dependencies = [ "clap_builder", "clap_derive", @@ -657,9 +648,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.46" +version = "4.5.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fecb53a0e6fcfb055f686001bc2e2592fa527efaf38dbe81a6a9563562e57d41" +checksum = "2ad9bbf750e73b5884fb8a211a9424a1906c1e156724260fdae972f31d70e1d6" dependencies = [ "anstream", "anstyle", @@ -669,9 +660,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.45" +version = "4.5.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14cb31bb0a7d536caef2639baa7fad459e15c3144efefa6dbd1c84562c4739f6" +checksum = "bbfd7eae0b0f1a6e63d4b13c9c478de77c2eb546fba158ad50b4203dc24b9f9c" dependencies = [ "heck", "proc-macro2", @@ -873,9 +864,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.4.0" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +checksum = "d630bccd429a5bb5a64b5e94f693bfc48c9f8566418fda4c494cc94f911f87cc" dependencies = [ "powerfmt", ] @@ -890,7 +881,7 @@ dependencies = [ "lazy_static", "mintex", "parking_lot", - "rustc-hash", + "rustc-hash 1.1.0", "serde", "serde_json", "thousands", @@ -973,12 +964,12 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "errno" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.0", ] [[package]] @@ -1005,6 +996,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "find-msvc-tools" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fd99930f64d146689264c637b5af2f0233a933bef0d8570e2526bf9e083192d" + [[package]] name = "fixedbitset" version = "0.5.7" @@ -1141,7 +1138,7 @@ dependencies = [ "cfg-if", "libc", "r-efi", - "wasi 0.14.3+wasi-0.2.4", + "wasi 0.14.5+wasi-0.2.4", ] [[package]] @@ -1168,7 +1165,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap 2.11.0", + "indexmap 2.11.1", "slab", "tokio", "tokio-util", @@ -1270,15 +1267,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "home" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" -dependencies = [ - "windows-sys 0.59.0", -] - [[package]] name = "http" version = "1.3.1" @@ -1337,9 +1325,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "humantime" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" [[package]] name = "humantime-serde" @@ -1433,9 +1421,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.63" +version = "0.1.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -1574,9 +1562,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2481980430f9f78649238835720ddccc57e52df14ffce1c6f37391d61b563e9" +checksum = "206a8042aec68fa4a62e8d3f7aa4ceb508177d9324faf261e1959e495b7a1921" dependencies = [ "equivalent", "hashbrown 0.15.5", @@ -1661,9 +1649,9 @@ dependencies = [ [[package]] name = "itertools" -version = "0.12.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" dependencies = [ "either", ] @@ -1715,9 +1703,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.77" +version = "0.3.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" +checksum = "0c0b063578492ceec17683ef2f8c5e89121fbd0b172cbc280635ab7567db2738" dependencies = [ "once_cell", "wasm-bindgen", @@ -1740,12 +1728,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" -[[package]] -name = "lazycell" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" - [[package]] name = "libc" version = "0.2.175" @@ -1776,9 +1758,9 @@ checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" [[package]] name = "linux-raw-sys" -version = "0.9.4" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" [[package]] name = "litemap" @@ -1798,9 +1780,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.27" +version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" [[package]] name = "lru-cache" @@ -2256,7 +2238,7 @@ dependencies = [ "pretty-duration", "rand 0.8.5", "regex", - "rustc-hash", + "rustc-hash 1.1.0", "rustls", "rustls-pemfile", "rustls-platform-verifier", @@ -2441,7 +2423,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" dependencies = [ "fixedbitset", - "indexmap 2.11.0", + "indexmap 2.11.1", ] [[package]] @@ -2526,9 +2508,9 @@ checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" [[package]] name = "potential_utf" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" +checksum = "84df19adbe5b5a0782edcab45899906947ab039ccf4573713735ee7de1e6b08a" dependencies = [ "zerovec", ] @@ -2957,6 +2939,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rusticata-macros" version = "4.1.0" @@ -2981,15 +2969,15 @@ dependencies = [ [[package]] name = "rustix" -version = "1.0.8" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" +checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" dependencies = [ "bitflags", "errno", "libc", - "linux-raw-sys 0.9.4", - "windows-sys 0.60.2", + "linux-raw-sys 0.11.0", + "windows-sys 0.61.0", ] [[package]] @@ -3002,7 +2990,7 @@ dependencies = [ "log", "once_cell", "rustls-pki-types", - "rustls-webpki 0.103.4", + "rustls-webpki 0.103.5", "subtle", "zeroize", ] @@ -3029,7 +3017,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework 3.3.0", + "security-framework 3.4.0", ] [[package]] @@ -3090,9 +3078,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.4" +version = "0.103.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a17884ae0c1b773f1ccd2bd4a8c72f16da897310a98b0e84bf349ad5ead92fc" +checksum = "b5a37813727b78798e53c2bec3f5e8fe12a6d6f8389bf9ca7802add4c9905ad8" dependencies = [ "aws-lc-rs", "ring", @@ -3123,11 +3111,11 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.0", ] [[package]] @@ -3152,9 +3140,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "3.3.0" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80fb1d92c5028aa318b4b8bd7302a5bfcf48be96a37fc6fc790f806b0004ee0c" +checksum = "60b369d18893388b345804dc0007963c99b7d665ae71d275812d828c6f089640" dependencies = [ "bitflags", "core-foundation 0.10.1", @@ -3165,9 +3153,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.14.0" +version = "2.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" +checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0" dependencies = [ "core-foundation-sys", "libc", @@ -3175,10 +3163,11 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.219" +version = "1.0.221" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "341877e04a22458705eb4e131a1508483c877dca2792b3781d4e5d8a6019ec43" dependencies = [ + "serde_core", "serde_derive", ] @@ -3192,11 +3181,20 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_core" +version = "1.0.221" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c459bc0a14c840cb403fc14b148620de1e0778c96ecd6e0c8c3cacb6d8d00fe" +dependencies = [ + "serde_derive", +] + [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.221" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d6185cf75117e20e62b1ff867b9518577271e58abe0037c40bb4794969355ab0" dependencies = [ "proc-macro2", "quote", @@ -3205,14 +3203,14 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.143" +version = "1.0.144" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d401abef1d108fbd9cbaebc3e46611f4b1021f714a0597a71f41ee463f5f4a5a" +checksum = "56177480b00303e689183f110b4e727bb4211d692c62d4fcd16d02be93077d40" dependencies = [ "itoa", "memchr", "ryu", - "serde", + "serde_core", ] [[package]] @@ -3253,7 +3251,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.11.0", + "indexmap 2.11.1", "itoa", "ryu", "serde", @@ -3404,15 +3402,15 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.21.0" +version = "3.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15b61f8f20e3a6f7e0649d825294eaf317edce30f82cf6026e7e4cb9222a7d1e" +checksum = "84fa4d11fadde498443cca10fd3ac23c951f0dc59e080e9f4b93d4df4e4eea53" dependencies = [ "fastrand 2.3.0", "getrandom 0.3.3", "once_cell", - "rustix 1.0.8", - "windows-sys 0.60.2", + "rustix 1.1.2", + "windows-sys 0.61.0", ] [[package]] @@ -3513,12 +3511,11 @@ dependencies = [ [[package]] name = "time" -version = "0.3.41" +version = "0.3.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +checksum = "83bde6f1ec10e72d583d91623c939f623002284ef622b87de38cfd546cbf2031" dependencies = [ "deranged", - "itoa", "num-conv", "powerfmt", "serde", @@ -3528,15 +3525,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.4" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" +checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" [[package]] name = "time-macros" -version = "0.2.22" +version = "0.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" dependencies = [ "num-conv", "time-core", @@ -3911,9 +3908,9 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.18" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" [[package]] name = "unsafe-libyaml" @@ -3953,9 +3950,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.18.0" +version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" dependencies = [ "getrandom 0.3.3", "js-sys", @@ -4001,30 +3998,40 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasi" -version = "0.14.3+wasi-0.2.4" +version = "0.14.5+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a51ae83037bdd272a9e28ce236db8c07016dd0d50c27038b3f407533c030c95" +checksum = "a4494f6290a82f5fe584817a676a34b9d6763e8d9d18204009fb31dceca98fd4" +dependencies = [ + "wasip2", +] + +[[package]] +name = "wasip2" +version = "1.0.0+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03fa2761397e5bd52002cd7e73110c71af2109aca4e521a9f40473fe685b0a24" dependencies = [ "wit-bindgen", ] [[package]] name = "wasm-bindgen" -version = "0.2.100" +version = "0.2.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" +checksum = "7e14915cadd45b529bb8d1f343c4ed0ac1de926144b746e2710f9cd05df6603b" dependencies = [ "cfg-if", "once_cell", "rustversion", "wasm-bindgen-macro", + "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-backend" -version = "0.2.100" +version = "0.2.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" +checksum = "e28d1ba982ca7923fd01448d5c30c6864d0a14109560296a162f80f305fb93bb" dependencies = [ "bumpalo", "log", @@ -4036,9 +4043,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.50" +version = "0.4.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" +checksum = "0ca85039a9b469b38336411d6d6ced91f3fc87109a2a27b0c197663f5144dffe" dependencies = [ "cfg-if", "js-sys", @@ -4049,9 +4056,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.100" +version = "0.2.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" +checksum = "7c3d463ae3eff775b0c45df9da45d68837702ac35af998361e2c84e7c5ec1b0d" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4059,9 +4066,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.100" +version = "0.2.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" +checksum = "7bb4ce89b08211f923caf51d527662b75bdc9c9c7aab40f86dcb9fb85ac552aa" dependencies = [ "proc-macro2", "quote", @@ -4072,18 +4079,18 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.100" +version = "0.2.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +checksum = "f143854a3b13752c6950862c906306adb27c7e839f7414cec8fea35beab624c1" dependencies = [ "unicode-ident", ] [[package]] name = "web-sys" -version = "0.3.77" +version = "0.3.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" +checksum = "77e4b637749ff0d92b8fad63aa1f7cff3cbe125fd49c175cd6345e7272638b12" dependencies = [ "js-sys", "wasm-bindgen", @@ -4107,18 +4114,6 @@ dependencies = [ "rustls-pki-types", ] -[[package]] -name = "which" -version = "4.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" -dependencies = [ - "either", - "home", - "once_cell", - "rustix 0.38.44", -] - [[package]] name = "widestring" version = "1.2.0" @@ -4143,11 +4138,11 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0978bf7171b3d90bac376700cb56d606feb40f251a475a5d6634613564460b22" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.0", ] [[package]] @@ -4158,15 +4153,15 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-core" -version = "0.61.2" +version = "0.62.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +checksum = "57fe7168f7de578d2d8a05b07fd61870d2e73b4020e9f49aa00da8471723497c" dependencies = [ "windows-implement", "windows-interface", - "windows-link", - "windows-result", - "windows-strings", + "windows-link 0.2.0", + "windows-result 0.4.0", + "windows-strings 0.5.0", ] [[package]] @@ -4197,15 +4192,21 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-link" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45e46c0661abb7180e7b9c281db115305d49ca1709ab8242adf09666d2173c65" + [[package]] name = "windows-registry" version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b8a9ed28765efc97bbc954883f4e6796c33a06546ebafacbabee9696967499e" dependencies = [ - "windows-link", - "windows-result", - "windows-strings", + "windows-link 0.1.3", + "windows-result 0.3.4", + "windows-strings 0.4.2", ] [[package]] @@ -4214,7 +4215,16 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ - "windows-link", + "windows-link 0.1.3", +] + +[[package]] +name = "windows-result" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7084dcc306f89883455a206237404d3eaf961e5bd7e0f312f7c91f57eb44167f" +dependencies = [ + "windows-link 0.2.0", ] [[package]] @@ -4223,7 +4233,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ - "windows-link", + "windows-link 0.1.3", +] + +[[package]] +name = "windows-strings" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7218c655a553b0bed4426cf54b20d7ba363ef543b52d515b3e48d7fd55318dda" +dependencies = [ + "windows-link 0.2.0", ] [[package]] @@ -4262,6 +4281,15 @@ dependencies = [ "windows-targets 0.53.3", ] +[[package]] +name = "windows-sys" +version = "0.61.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e201184e40b2ede64bc2ea34968b28e33622acdbbf37104f0e4a33f7abe657aa" +dependencies = [ + "windows-link 0.2.0", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -4299,7 +4327,7 @@ version = "0.53.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" dependencies = [ - "windows-link", + "windows-link 0.1.3", "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", "windows_i686_gnu 0.53.0", @@ -4460,9 +4488,9 @@ dependencies = [ [[package]] name = "wit-bindgen" -version = "0.45.0" +version = "0.45.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "052283831dbae3d879dc7f51f3d92703a316ca49f91540417d38591826127814" +checksum = "5c573471f125075647d03df72e026074b7203790d41351cd6edc96f46bcccd36" [[package]] name = "writeable" @@ -4519,18 +4547,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.26" +version = "0.8.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f" +checksum = "0894878a5fa3edfd6da3f88c4805f4c8558e2b996227a3d864f47fe11e38282c" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.26" +version = "0.8.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" +checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" dependencies = [ "proc-macro2", "quote", diff --git a/examples/internal-listener-demo/README.md b/examples/internal-listener-demo/README.md new file mode 100644 index 00000000..0ac8d87c --- /dev/null +++ b/examples/internal-listener-demo/README.md @@ -0,0 +1,83 @@ +# Internal Listener and Upstream Transport Demo + +This demo demonstrates Orion's internal listener and upstream transport functionality for service mesh communication. + +## Quick Test + +```bash +./test_internal_config.sh +``` + +## Configuration + +### Internal Listener + +```yaml +listeners: + - name: internal_mesh_listener + address: + internal: + buffer_size_kb: 1024 + filter_chains: + - name: internal_proxy_chain + terminal_filter: + tcp_proxy: + cluster: internal_backend_cluster +``` + +### Internal Endpoints + +```yaml +clusters: + - name: internal_service_cluster + type: STATIC + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + internal: + server_listener_name: internal_mesh_listener + endpoint_id: service_a_endpoint_1 +``` + +### Internal Upstream Transport + +```yaml +transport_socket: + internal_upstream: + passthrough_metadata: + - kind: HOST + name: envoy.filters.listener.original_dst + transport_socket: + raw_buffer: {} +``` + +### Bootstrap Extensions + +```yaml +bootstrap_extensions: + - internal_listener: + buffer_size_kb: 2048 +``` + +## Usage + +```bash +# Start Orion +../../target/debug/orion -c orion-config.yaml + +# Test endpoints +curl http://localhost:10000/ +curl http://localhost:10000/service-a + +# Monitor +curl http://localhost:9901/stats +``` + +## Features + +- Internal listeners for service mesh communication +- Internal endpoints with server_listener_name references +- Metadata passthrough via internal upstream transport +- Global bootstrap extensions configuration diff --git a/examples/internal-listener-demo/internal_client.rs b/examples/internal-listener-demo/internal_client.rs new file mode 100644 index 00000000..0d82f1be --- /dev/null +++ b/examples/internal-listener-demo/internal_client.rs @@ -0,0 +1,139 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +use std::env; +use std::io::{Read, Write}; +use std::net::{TcpStream, ToSocketAddrs}; +use std::time::Duration; + +fn main() { + let args: Vec = env::args().collect(); + + if args.len() < 2 { + println!("Usage: {} ", args[0]); + println!("Test types: gateway, admin, health"); + return; + } + + match args[1].as_str() { + "gateway" => test_gateway(), + "admin" => test_admin(), + "health" => test_health(), + _ => println!("Invalid test type. Use: gateway, admin, health"), + } +} + +fn test_gateway() { + println!("Testing External Gateway"); + + let endpoints = vec![ + ("localhost:10000", "/", "Main Gateway"), + ("localhost:10000", "/service-a", "Service A"), + ]; + + for (address, path, description) in endpoints { + println!("Testing {}: http://{}{}", description, address, path); + match test_http_endpoint(address, path) { + Ok(response) => println!(" ✅ {}", response.trim()), + Err(e) => println!(" ❌ {}", e), + } + } +} + +fn test_admin() { + println!("Testing Admin Interface"); + + let endpoints = vec![ + ("localhost:9901", "/stats", "Statistics"), + ("localhost:9901", "/listeners", "Listeners"), + ("localhost:9901", "/clusters", "Clusters"), + ]; + + for (address, path, description) in endpoints { + println!("Testing {}: http://{}{}", description, address, path); + match test_http_endpoint(address, path) { + Ok(response) => { + println!(" ✅ {} lines received", response.lines().count()); + for line in response.lines().take(2) { + if !line.trim().is_empty() { + println!(" {}", line.trim()); + } + } + } + Err(e) => println!(" ❌ {}", e), + } + } +} + +fn test_health() { + println!("Testing Health Metrics"); + + match test_http_endpoint("localhost:9901", "/stats") { + Ok(response) => { + let internal_stats: Vec<&str> = response + .lines() + .filter(|line| line.contains("internal") || line.contains("listener")) + .take(5) + .collect(); + + if internal_stats.is_empty() { + println!(" ⚠️ No internal listener stats found"); + } else { + for stat in internal_stats { + println!(" 📊 {}", stat.trim()); + } + } + } + Err(e) => println!(" ❌ {}", e), + } +} + +fn test_http_endpoint(address: &str, path: &str) -> Result { + let addr = address.to_socket_addrs() + .map_err(|e| format!("Failed to resolve {}: {}", address, e))? + .next() + .ok_or_else(|| format!("No addresses found for {}", address))?; + + let mut stream = TcpStream::connect_timeout(&addr, Duration::from_secs(5)) + .map_err(|e| format!("Connection failed: {}", e))?; + + let request = format!( + "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n", + path, address.split(':').next().unwrap_or("localhost") + ); + + stream.write_all(request.as_bytes()) + .map_err(|e| format!("Failed to send request: {}", e))?; + + let mut response = String::new(); + stream.read_to_string(&mut response) + .map_err(|e| format!("Failed to read response: {}", e))?; + + if let Some(body_start) = response.find("\r\n\r\n") { + let (headers, body) = response.split_at(body_start + 4); + + if headers.contains("200 OK") { + Ok(body.to_string()) + } else if headers.contains("404") { + Err("Endpoint not found (404)".to_string()) + } else { + Err(format!("HTTP error: {}", headers.lines().next().unwrap_or("Unknown"))) + } + } else { + Err("Invalid HTTP response".to_string()) + } +} diff --git a/examples/internal-listener-demo/orion-config.yaml b/examples/internal-listener-demo/orion-config.yaml new file mode 100644 index 00000000..c9483f4c --- /dev/null +++ b/examples/internal-listener-demo/orion-config.yaml @@ -0,0 +1,74 @@ +runtime: + num_cpus: 1 + num_runtimes: 1 + +logging: + log_level: "debug" + +bootstrap_extensions: + - internal_listener: + buffer_size_kb: 2048 + +admin: + address: "127.0.0.1:9901" + +static_resources: + listeners: + - name: "external_gateway_listener" + address: "0.0.0.0:10000" + filter_chains: + - name: "gateway_filter_chain" + terminal_filter: + http_connection_manager: + route_config: + name: "gateway_route" + virtual_hosts: + - name: "services" + domains: ["*"] + routes: + - match: + prefix: "/service-a" + route: + cluster: "internal_service_a_cluster" + - match: + prefix: "/" + direct_response: + status: 200 + body: "Internal Listener Demo Active" + + - name: "internal_mesh_listener" + address: + internal: + buffer_size_kb: 1024 + filter_chains: + - name: "internal_proxy_chain" + terminal_filter: + tcp_proxy: + cluster: "internal_backend_cluster" + + clusters: + - name: "internal_service_a_cluster" + type: STATIC + transport_socket: + internal_upstream: + passthrough_metadata: + - kind: HOST + name: "envoy.filters.listener.original_dst" + transport_socket: + raw_buffer: {} + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + internal: + server_listener_name: "internal_mesh_listener" + endpoint_id: "service_a_endpoint_1" + + - name: "internal_backend_cluster" + type: STATIC + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: "127.0.0.1:8080" diff --git a/examples/internal-listener-demo/test_internal_config.sh b/examples/internal-listener-demo/test_internal_config.sh new file mode 100755 index 00000000..6b2dada8 --- /dev/null +++ b/examples/internal-listener-demo/test_internal_config.sh @@ -0,0 +1,308 @@ +#!/bin/bash + +#!/bin/bash + +set -e + +echo "Testing Orion Internal Listener Configuration..." + +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +print_status() { echo -e "${BLUE}[INFO]${NC} $1"; } +print_success() { echo -e "${GREEN}[SUCCESS]${NC} $1"; } +print_warning() { echo -e "${YELLOW}[WARNING]${NC} $1"; } +print_error() { echo -e "${RED}[ERROR]${NC} $1"; } + +ORION_BINARY="../../target/debug/orion" +if [ ! -f "$ORION_BINARY" ]; then + print_error "Orion binary not found at $ORION_BINARY" + print_status "Building Orion..." + cd ../../ + cargo build + cd examples/internal-listener-demo/ + if [ ! -f "$ORION_BINARY" ]; then + print_error "Failed to build Orion binary" + exit 1 + fi +fi + +print_success "Found Orion binary" + +print_status "Test 1: Configuration validation" +if timeout 3 $ORION_BINARY -c orion-config.yaml 2>&1 | grep -q "Starting on thread"; then + print_success "Configuration loaded successfully" +else + print_error "Configuration failed to load" + timeout 3 $ORION_BINARY -c orion-config.yaml 2>&1 | head -5 + exit 1 +fi + +print_status "Test 2: Internal listener elements" +if grep -q "internal:" orion-config.yaml && grep -q "server_listener_name:" orion-config.yaml; then + print_success "Internal listener configuration found" +else + print_error "Missing internal listener configuration" + exit 1 +fi + +print_status "Test 3: Bootstrap extensions" +if grep -q "bootstrap_extensions:" orion-config.yaml && grep -q "internal_listener:" orion-config.yaml; then + print_success "Bootstrap extensions found" +else + print_warning "Bootstrap extensions not found" +fi + +print_status "Test 4: Internal upstream transport" +if grep -q "internal_upstream:" orion-config.yaml && grep -q "passthrough_metadata:" orion-config.yaml; then + print_success "Internal upstream transport found" +else + print_error "Missing internal upstream transport" + exit 1 +fi + +print_status "Test 5: Internal endpoints" +if grep -q "endpoint_id:" orion-config.yaml; then + print_success "Internal endpoint configuration found" +else + print_warning "Internal endpoint IDs not specified" +fi + +print_status "Test 6: Integration test" +$ORION_BINARY -c orion-config.yaml &> /tmp/orion_integration.log & +ORION_PID=$! +sleep 2 + +if kill -0 $ORION_PID 2>/dev/null; then + print_success "Orion started successfully" + kill $ORION_PID 2>/dev/null || true + wait $ORION_PID 2>/dev/null || true +else + if grep -q "failed to launch runtimes" /tmp/orion_integration.log; then + print_warning "Runtime limitations (expected in test environment)" + print_success "Configuration validation completed" + else + print_error "Failed to start Orion" + cat /tmp/orion_integration.log + exit 1 + fi +fi + +echo +print_success "All tests completed" +echo +print_status "Configuration validated:" +echo " ✅ Internal listener address configuration" +echo " ✅ Internal upstream transport setup" +echo " ✅ Bootstrap extensions" +echo " ✅ Internal endpoint configuration" +echo +print_status "Usage:" +echo " Start: ../../target/debug/orion -c orion-config.yaml" +echo " Test: curl http://localhost:10000/" +echo " Admin: curl http://localhost:9901/stats" + +rm -f /tmp/orion_*.log + +set -e + +echo "🚀 Testing Orion Internal Listener and Upstream Transport Configuration..." + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Function to print colored output +print_status() { + echo -e "${BLUE}[INFO]${NC} $1" +} + +print_success() { + echo -e "${GREEN}[SUCCESS]${NC} $1" +} + +print_warning() { + echo -e "${YELLOW}[WARNING]${NC} $1" +} + +print_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +# Check if Orion binary exists +ORION_BINARY="../../target/debug/orion" +if [ ! -f "$ORION_BINARY" ]; then + print_error "Orion binary not found at $ORION_BINARY" + print_status "Building Orion..." + cd ../../ + cargo build + cd examples/internal-listener-demo/ + if [ ! -f "$ORION_BINARY" ]; then + print_error "Failed to build Orion binary" + exit 1 + fi +fi + +print_success "Found Orion binary at $ORION_BINARY" + +# Test 1: Configuration validation +print_status "Test 1: Validating internal listener configuration..." + +# Try to start Orion briefly to test config loading +print_status "Attempting to load configuration..." +if timeout 3 $ORION_BINARY -c orion-config.yaml 2>&1 | grep -q "Starting on thread"; then + print_success "✅ Configuration loaded successfully" + print_status "Internal listener configuration is valid" +else + print_error "❌ Configuration failed to load" + print_status "Error details:" + timeout 3 $ORION_BINARY -c orion-config.yaml 2>&1 | head -5 + exit 1 +fi + +# Test 2: Internal listener configuration parsing +print_status "Test 2: Testing internal listener configuration parsing..." + +# Check if configuration contains expected internal listener elements +if grep -q "internal:" orion-config.yaml && \ + grep -q "server_listener_name:" orion-config.yaml && \ + grep -q "internal_upstream" orion-config.yaml; then + print_success "✅ Internal listener configuration elements found" +else + print_error "❌ Missing internal listener configuration elements" + exit 1 +fi + +# Test 3: Bootstrap extensions validation +print_status "Test 3: Validating bootstrap extensions..." +if grep -q "bootstrap_extensions:" orion-config.yaml && \ + grep -q "internal_listener:" orion-config.yaml; then + print_success "✅ Bootstrap extensions configuration found" +else + print_warning "⚠️ Bootstrap extensions not found (optional)" +fi + +# Test 4: Internal upstream transport validation +print_status "Test 4: Validating internal upstream transport..." +if grep -q "internal_upstream:" orion-config.yaml && \ + grep -q "passthrough_metadata:" orion-config.yaml; then + print_success "✅ Internal upstream transport configuration found" +else + print_error "❌ Internal upstream transport configuration missing" + exit 1 +fi + +# Test 5: Endpoint configuration validation +print_status "Test 5: Validating internal endpoint configuration..." +if grep -q "endpoint_id:" orion-config.yaml; then + print_success "✅ Internal endpoint configuration found" +else + print_warning "⚠️ Internal endpoint IDs not specified (optional)" +fi + +# Test 6: Integration test (optional) +print_status "Test 6: Integration test..." +print_status "Starting Orion with internal listener configuration..." + +# Start Orion in background +$ORION_BINARY -c orion-config.yaml &> /tmp/orion_integration.log & +ORION_PID=$! + +# Wait for startup +sleep 3 + +if kill -0 $ORION_PID 2>/dev/null; then + print_success "✅ Orion started successfully with internal listener configuration" + + # Test external listener (should be accessible) + if command -v curl >/dev/null 2>&1; then + print_status "Testing external gateway listener..." + if curl -s -o /dev/null -w "%{http_code}" http://localhost:10000/ | grep -q "200"; then + print_success "✅ External gateway listener responding" + else + print_warning "⚠️ External gateway listener not responding (this may be expected)" + fi + + # Test service endpoints + print_status "Testing service endpoints..." + RESPONSE=$(curl -s http://localhost:10000/ || echo "Connection failed") + if [[ "$RESPONSE" == *"Internal Listener Demo"* ]]; then + print_success "✅ Service endpoint responding correctly" + else + print_warning "⚠️ Service endpoint response: $RESPONSE" + fi + else + print_warning "⚠️ curl not available, skipping HTTP tests" + fi + + # Check admin interface + if command -v curl >/dev/null 2>&1; then + print_status "Testing admin interface..." + if curl -s http://localhost:9901/stats | grep -q "listener"; then + print_success "✅ Admin interface accessible, listener stats available" + else + print_warning "⚠️ Admin interface not responding or no listener stats" + fi + fi + + # Cleanup + kill $ORION_PID 2>/dev/null || true + wait $ORION_PID 2>/dev/null || true + +else + # Check if this is an expected runtime error + if grep -q "failed to launch runtimes" /tmp/orion_integration.log; then + print_warning "⚠️ Orion failed to start due to runtime limitations (expected in test environment)" + print_success "✅ Configuration validation completed successfully" + else + print_error "❌ Failed to start Orion" + cat /tmp/orion_integration.log + exit 1 + fi +fi + +# Summary +echo +print_success "🎉 All internal listener and upstream transport tests completed!" +echo +print_status "Configuration Summary:" +echo " • Internal listeners: Configured for service mesh communication" +echo " • Internal endpoints: Using server_listener_name references" +echo " • Internal upstream transport: Metadata passthrough enabled" +echo " • Bootstrap extensions: Global internal listener settings" +echo +print_status "Test Results:" +echo " ✅ Configuration loading and parsing" +echo " ✅ Internal listener address configuration" +echo " ✅ Internal upstream transport setup" +echo " ✅ Bootstrap extensions validation" +echo " ✅ Internal endpoint configuration" +echo +print_status "To run this demo manually:" +echo " 1. Start Orion: ../../target/debug/orion -c orion-config.yaml" +echo " 2. Test endpoints: curl http://localhost:10000/" +echo " 3. Monitor admin: curl http://localhost:9901/stats" +echo +print_success "Internal Listener Demo validation complete! 🚀" + +# Cleanup temp files +rm -f /tmp/orion_*.log +echo " • Upstream transport: Metadata passthrough enabled" +echo " • Bootstrap extensions: Global internal listener settings" +echo +print_status "Next steps:" +echo " • Start Orion: $ORION_BINARY -c orion-config.yaml" +echo " • Test gateway: curl http://localhost:10000/" +echo " • Monitor admin: curl http://localhost:9901/stats" +echo " • Check logs for internal routing activity" + +# Cleanup temp files +rm -f /tmp/orion_test.log /tmp/orion_integration.log + +print_success "✅ Internal listener demo test completed successfully!" diff --git a/orion-configuration/src/config/bootstrap.rs b/orion-configuration/src/config/bootstrap.rs index 3739d3c2..af6b3468 100644 --- a/orion-configuration/src/config/bootstrap.rs +++ b/orion-configuration/src/config/bootstrap.rs @@ -37,6 +37,8 @@ pub struct Bootstrap { pub stats_flush_interval: Option, #[serde(skip_serializing_if = "Vec::is_empty", default = "Default::default")] pub stats_sinks: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub bootstrap_extensions: Vec, } impl Bootstrap { @@ -73,6 +75,19 @@ pub struct Admin { pub address: Address, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "name")] +pub enum BootstrapExtension { + #[serde(rename = "internal_listener")] + InternalListener(InternalListenerBootstrap), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct InternalListenerBootstrap { + #[serde(skip_serializing_if = "Option::is_none", default)] + pub buffer_size_kb: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)] pub struct StaticResources { #[serde(skip_serializing_if = "Vec::is_empty", default = "Default::default")] @@ -86,20 +101,29 @@ pub struct StaticResources { #[cfg(feature = "envoy-conversions")] mod envoy_conversions { #![allow(deprecated)] - use super::{Admin, Bootstrap, DynamicResources, Node, StaticResources}; - use crate::config::{common::*, grpc::Duration, metrics::StatsSink}; + use super::{ + Admin, Bootstrap, BootstrapExtension, DynamicResources, InternalListenerBootstrap, Node, StaticResources, + }; + use crate::config::{common::*, core::envoy_conversions::SocketAddressWrapper, grpc::Duration, metrics::StatsSink}; use compact_str::CompactString; - use orion_data_plane_api::envoy_data_plane_api::envoy::config::{ - bootstrap::v3::{ - bootstrap::{DynamicResources as EnvoyDynamicResources, StaticResources as EnvoyStaticResources}, - Admin as EnvoyAdmin, Bootstrap as EnvoyBootstrap, - }, - core::v3::{ - address, - grpc_service::{EnvoyGrpc, TargetSpecifier as EnvoyGrpcTargetSpecifier}, - ApiConfigSource as EnvoyApiConfigSource, GrpcService as EnvoyGrpcService, Node as EnvoyNode, + use orion_data_plane_api::envoy_data_plane_api::{ + envoy::{ + config::{ + bootstrap::v3::{ + bootstrap::{DynamicResources as EnvoyDynamicResources, StaticResources as EnvoyStaticResources}, + Admin as EnvoyAdmin, Bootstrap as EnvoyBootstrap, + }, + core::v3::{ + address, + grpc_service::{EnvoyGrpc, TargetSpecifier as EnvoyGrpcTargetSpecifier}, + ApiConfigSource as EnvoyApiConfigSource, GrpcService as EnvoyGrpcService, Node as EnvoyNode, + TypedExtensionConfig as EnvoyTypedExtensionConfig, + }, + metrics::v3::stats_sink::ConfigType, + }, + extensions::bootstrap::internal_listener::v3::InternalListener as EnvoyInternalListener, }, - metrics::v3::stats_sink::ConfigType, + prost::Message, }; impl Bootstrap { @@ -180,7 +204,7 @@ mod envoy_conversions { use_tcp_for_dns_lookups, dns_resolution_config, typed_dns_resolver_config, - bootstrap_extensions, + // bootstrap_extensions, fatal_actions, config_sources, default_config_source, @@ -217,7 +241,21 @@ mod envoy_conversions { .collect::, _>>() .with_node("stats_sinks")?; - Ok(Self { static_resources, node, dynamic_resources, admin, stats_flush_interval, stats_sinks }) + let bootstrap_extensions = bootstrap_extensions + .into_iter() + .map(BootstrapExtension::try_from) + .collect::, _>>() + .with_node("bootstrap_extensions")?; + + Ok(Self { + static_resources, + node, + dynamic_resources, + admin, + stats_flush_interval, + stats_sinks, + bootstrap_extensions, + }) } } impl TryFrom for Node { @@ -336,6 +374,7 @@ mod envoy_conversions { Ok(Self { listeners, clusters, secrets }) } } + impl TryFrom for Admin { type Error = GenericError; fn try_from(envoy: EnvoyAdmin) -> Result { @@ -352,12 +391,53 @@ mod envoy_conversions { .address .ok_or(GenericError::MissingField("address is mandatory to setup admin interface"))? { - address::Address::SocketAddress(sa) => sa.try_into(), + address::Address::SocketAddress(sa) => { + let wrapper: SocketAddressWrapper = sa.try_into()?; + wrapper.0 + }, _ => { - Err(GenericError::UnsupportedVariant(std::borrow::Cow::Borrowed("Only SocketAddress is supported"))) + return Err(GenericError::UnsupportedVariant(std::borrow::Cow::Borrowed( + "Only SocketAddress is supported", + ))); }, - }?; - Ok(Self { address }) + }; + Ok(Self { address: crate::config::core::Address::Socket(address) }) + } + } + + impl TryFrom for BootstrapExtension { + type Error = GenericError; + fn try_from(value: EnvoyTypedExtensionConfig) -> Result { + let EnvoyTypedExtensionConfig { name, typed_config } = value; + let name = required!(name)?; + let typed_config = required!(typed_config)?; + + match name.as_str() { + "internal_listener" => { + if typed_config.type_url + == "type.googleapis.com/envoy.extensions.bootstrap.internal_listener.v3.InternalListener" + { + let internal_listener = EnvoyInternalListener::decode(typed_config.value.as_slice()) + .map_err(|e| GenericError::from_msg_with_cause("Failed to decode InternalListener", e))?; + Ok(BootstrapExtension::InternalListener(internal_listener.try_into()?)) + } else { + Err(GenericError::from_msg(format!( + "Unsupported bootstrap extension type: {}", + typed_config.type_url + ))) + } + }, + _ => Err(GenericError::unsupported_variant(name)), + } + } + } + + impl TryFrom for InternalListenerBootstrap { + type Error = GenericError; + fn try_from(value: EnvoyInternalListener) -> Result { + let EnvoyInternalListener { buffer_size_kb } = value; + let buffer_size_kb = buffer_size_kb.map(|v| v.value); + Ok(Self { buffer_size_kb }) } } } diff --git a/orion-configuration/src/config/cluster.rs b/orion-configuration/src/config/cluster.rs index 0dbd5054..56f883d4 100644 --- a/orion-configuration/src/config/cluster.rs +++ b/orion-configuration/src/config/cluster.rs @@ -33,7 +33,7 @@ use super::{ use compact_str::CompactString; use http::HeaderName; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::{fmt::Display, num::NonZeroU32, time::Duration}; +use std::{fmt::Display, net::SocketAddr, num::NonZeroU32, time::Duration}; #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] pub struct Cluster { pub name: CompactString, @@ -55,6 +55,8 @@ pub struct Cluster { #[serde(with = "humantime_serde")] #[serde(skip_serializing_if = "Option::is_none", default = "Default::default")] pub cleanup_interval: Option, + #[serde(skip_serializing_if = "Option::is_none", default = "Default::default")] + pub internal_transport_socket: Option, } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] @@ -104,7 +106,20 @@ pub struct LocalityLbEndpoints { fn simplify_lb_endpoints(value: &Vec, serializer: S) -> Result { if value.iter().all(|s| is_default(&s.health_status) && s.load_balancing_weight == NonZeroU32::MIN) { - value.iter().map(|endpoint| endpoint.address.clone()).collect::>().serialize(serializer) + // Only simplify if all addresses are socket addresses + let socket_addresses: Vec = value + .iter() + .filter_map(|endpoint| match &endpoint.address { + EndpointAddress::Socket(addr) => Some(*addr), + EndpointAddress::Internal(_) => None, + }) + .collect(); + + if socket_addresses.len() == value.len() { + socket_addresses.serialize(serializer) + } else { + value.serialize(serializer) + } } else { value.serialize(serializer) } @@ -114,20 +129,40 @@ fn simplify_lb_endpoints(value: &Vec, serializer: S) #[serde(untagged)] enum LbEndpointVecDeser { LbEndpoints(Vec), + SocketAddr(Vec), Address(Vec
), } impl From for Vec { fn from(value: LbEndpointVecDeser) -> Self { match value { - LbEndpointVecDeser::Address(address) => address + LbEndpointVecDeser::SocketAddr(socket_addrs) => socket_addrs .into_iter() - .map(|address| LbEndpoint { - address, + .map(|socket_addr| LbEndpoint { + address: EndpointAddress::Socket(socket_addr), health_status: HealthStatus::default(), load_balancing_weight: NonZeroU32::MIN, }) .collect(), + LbEndpointVecDeser::Address(address) => address + .into_iter() + .filter_map(|address| match address { + Address::Socket(socket_addr) => Some(LbEndpoint { + address: EndpointAddress::Socket(socket_addr), + health_status: HealthStatus::default(), + load_balancing_weight: NonZeroU32::MIN, + }), + Address::Internal(internal_addr) => Some(LbEndpoint { + address: EndpointAddress::Internal(InternalEndpointAddress { + server_listener_name: internal_addr.server_listener_name.into(), + endpoint_id: internal_addr.endpoint_id.map(|id| id.into()), + }), + health_status: HealthStatus::default(), + load_balancing_weight: NonZeroU32::MIN, + }), + Address::Pipe(_, _) => None, // Skip pipe addresses + }) + .collect(), LbEndpointVecDeser::LbEndpoints(vec) => vec, } } @@ -141,12 +176,67 @@ fn deser_through<'de, In: Deserialize<'de>, Out: From, D: Deserializer<'de>> #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct LbEndpoint { - pub address: Address, + pub address: EndpointAddress, #[serde(skip_serializing_if = "is_default", default)] pub health_status: HealthStatus, pub load_balancing_weight: NonZeroU32, } +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(untagged)] +pub enum EndpointAddress { + Socket(SocketAddr), + Internal(InternalEndpointAddress), +} + +impl EndpointAddress { + pub fn into_addr(self) -> Result { + match self { + EndpointAddress::Socket(addr) => Ok(addr), + EndpointAddress::Internal(_) => Err("Cannot convert internal address to socket address".to_string()), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub struct InternalEndpointAddress { + pub server_listener_name: CompactString, + pub endpoint_id: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "name")] +pub enum TransportSocket { + #[serde(rename = "internal_upstream")] + InternalUpstream(InternalUpstreamTransport), + #[serde(rename = "raw_buffer")] + RawBuffer, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct InternalUpstreamTransport { + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub passthrough_metadata: Vec, + pub transport_socket: Box, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct MetadataValueSource { + pub kind: MetadataKind, + pub name: CompactString, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "type")] +pub enum MetadataKind { + #[serde(rename = "host")] + Host, + #[serde(rename = "route")] + Route, + #[serde(rename = "cluster")] + Cluster, +} + #[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] pub enum HealthStatus { #[default] @@ -248,8 +338,9 @@ mod envoy_conversions { #![allow(deprecated)] use super::{ health_check::{ClusterHostnameError, HealthCheck, HealthCheckProtocol}, - Cluster, ClusterDiscoveryType, ClusterLoadAssignment, HealthStatus, HttpProtocolOptions, LbEndpoint, LbPolicy, - LocalityLbEndpoints, OriginalDstConfig, OriginalDstRoutingMethod, TlsConfig, TlsSecret, + Cluster, ClusterDiscoveryType, ClusterLoadAssignment, EndpointAddress, HealthStatus, HttpProtocolOptions, + InternalEndpointAddress, InternalUpstreamTransport, LbEndpoint, LbPolicy, LocalityLbEndpoints, MetadataKind, + MetadataValueSource, OriginalDstConfig, OriginalDstRoutingMethod, TlsConfig, TlsSecret, TransportSocket, }; use crate::config::{ common::*, @@ -280,10 +371,20 @@ mod envoy_conversions { LbEndpoint as EnvoyLbEndpoint, LocalityLbEndpoints as EnvoyLocalityLbEndpoints, }, }, - extensions::transport_sockets::tls::v3::UpstreamTlsContext, - r#type::metadata::v3::metadata_key::path_segment::Segment, + extensions::transport_sockets::{ + internal_upstream::v3::{ + internal_upstream_transport::MetadataValueSource as EnvoyMetadataValueSource, + InternalUpstreamTransport as EnvoyInternalUpstreamTransport, + }, + tls::v3::UpstreamTlsContext, + }, + r#type::metadata::v3::{ + metadata_key::path_segment::Segment, metadata_kind::Kind as EnvoyMetadataKindType, + MetadataKind as EnvoyMetadataKind, + }, }, google::protobuf::Any, + prost::Message, }; use http::HeaderName; @@ -487,10 +588,19 @@ mod envoy_conversions { .transpose() .with_node("upstream_bind_config")? .flatten(); - let transport_socket = transport_socket - .map(UpstreamTransportSocketConfig::try_from) - .transpose() - .with_node("transport_socket")?; + // Parse transport socket: try internal transport first, then fallback to upstream transport + let (upstream_transport_socket_config, transport_socket_config) = if let Some(ts) = transport_socket { + if let Ok(internal_socket) = TransportSocket::try_from(ts.clone()) { + // It's an internal transport socket + (None, Some(internal_socket)) + } else { + // It's not an internal transport socket, so try to parse it as a regular upstream transport socket (e.g., TLS) + let upstream_socket = UpstreamTransportSocketConfig::try_from(ts).with_node("transport_socket")?; + (Some(upstream_socket), None) + } + } else { + (None, None) + }; let load_balancing_policy = lb_policy.try_into().with_node("lb_policy")?; let http_protocol_options = typed_extension_protocol_options .into_values() @@ -571,12 +681,13 @@ mod envoy_conversions { name, discovery_settings, bind_device, - transport_socket, + transport_socket: upstream_transport_socket_config, load_balancing_policy, http_protocol_options, health_check, connect_timeout, cleanup_interval, + internal_transport_socket: transport_socket_config, }) })() .with_name(name) @@ -667,9 +778,19 @@ mod envoy_conversions { health_check_config, hostname, additional_addresses, - }) => (|| -> Result { + }) => (|| -> Result { unsupported_field!(health_check_config, hostname, additional_addresses)?; - Address::try_from(address.ok_or(GenericError::from_msg("Address is not set"))?) + let address: Address = convert_opt!(address)?; + match address { + Address::Socket(socket_addr) => Ok(EndpointAddress::Socket(socket_addr)), + Address::Internal(internal_addr) => Ok(EndpointAddress::Internal(InternalEndpointAddress { + server_listener_name: internal_addr.server_listener_name.into(), + endpoint_id: internal_addr.endpoint_id.map(|id| id.into()), + })), + Address::Pipe(_, _) => { + Err(GenericError::unsupported_variant("Pipe addresses are not supported for endpoints")) + }, + } })(), EnvoyHostIdentifier::EndpointName(_) => Err(GenericError::unsupported_variant("EndpointName")), } @@ -869,4 +990,67 @@ mod envoy_conversions { .try_into() } } + + impl TryFrom for TransportSocket { + type Error = GenericError; + fn try_from(value: EnvoyTransportSocket) -> Result { + let EnvoyTransportSocket { name, config_type } = value; + let name = required!(name)?; + let config = required!(config_type)?; + + match name.as_str() { + "internal_upstream" => { + match config { + orion_data_plane_api::envoy_data_plane_api::envoy::config::core::v3::transport_socket::ConfigType::TypedConfig(any) => { + if any.type_url == "type.googleapis.com/envoy.extensions.transport_sockets.internal_upstream.v3.InternalUpstreamTransport" { + let internal_transport = EnvoyInternalUpstreamTransport::decode(any.value.as_slice()) + .map_err(|e| GenericError::from_msg_with_cause("Failed to decode InternalUpstreamTransport", e))?; + Ok(TransportSocket::InternalUpstream(internal_transport.try_into()?)) + } else { + Err(GenericError::from_msg(format!("Unsupported transport socket type: {}", any.type_url))) + } + } + } + } + "raw_buffer" => Ok(TransportSocket::RawBuffer), + _ => Err(GenericError::unsupported_variant(name)), + } + } + } + + impl TryFrom for InternalUpstreamTransport { + type Error = GenericError; + fn try_from(value: EnvoyInternalUpstreamTransport) -> Result { + let EnvoyInternalUpstreamTransport { passthrough_metadata, transport_socket } = value; + let passthrough_metadata = + passthrough_metadata.into_iter().map(MetadataValueSource::try_from).collect::, _>>()?; + let transport_socket = required!(transport_socket)?; + let transport_socket = Box::new(TransportSocket::try_from(transport_socket)?); + Ok(Self { passthrough_metadata, transport_socket }) + } + } + + impl TryFrom for MetadataValueSource { + type Error = GenericError; + fn try_from(value: EnvoyMetadataValueSource) -> Result { + let EnvoyMetadataValueSource { kind, name } = value; + let kind = required!(kind)?; + let name = required!(name)?.into(); + let kind = MetadataKind::try_from(kind)?; + Ok(Self { kind, name }) + } + } + + impl TryFrom for MetadataKind { + type Error = GenericError; + fn try_from(value: EnvoyMetadataKind) -> Result { + let EnvoyMetadataKind { kind } = value; + match required!(kind)? { + EnvoyMetadataKindType::Host(_) => Ok(MetadataKind::Host), + EnvoyMetadataKindType::Route(_) => Ok(MetadataKind::Route), + EnvoyMetadataKindType::Cluster(_) => Ok(MetadataKind::Cluster), + EnvoyMetadataKindType::Request(_) => Ok(MetadataKind::Route), // Map Request to Route for now + } + } + } } diff --git a/orion-configuration/src/config/core.rs b/orion-configuration/src/config/core.rs index a66b6019..4d1c89a7 100644 --- a/orion-configuration/src/config/core.rs +++ b/orion-configuration/src/config/core.rs @@ -244,7 +244,7 @@ pub mod envoy_conversions { config::core::v3::{ address::Address as EnvoyAddress, data_source::Specifier as EnvoySpecifier, socket_address::PortSpecifier, Address as EnvoyOuterAddress, CidrRange as EnvoyCidrRange, DataSource as EnvoyDataSource, - Pipe as EnvoyPipe, SocketAddress as EnvoySocketAddress, + EnvoyInternalAddress, Pipe as EnvoyPipe, SocketAddress as EnvoySocketAddress, }, r#type::matcher::v3::{ string_matcher::MatchPattern as EnvoyStringMatcherPattern, RegexMatcher as EnvoyRegexMatcher, @@ -263,23 +263,41 @@ pub mod envoy_conversions { } } - #[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Clone)] + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum Address { - Socket(String, u16), + Socket(SocketAddr), + Internal(InternalAddress), Pipe(String, u32), } + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] + pub struct InternalAddress { + pub server_listener_name: String, + pub endpoint_id: Option, + } + impl Address { + pub fn into_socket_addr(self) -> Result { + match self { + Self::Socket(addr) => Ok(addr), + Self::Internal(_) => Err(GenericError::from_msg("Cannot convert internal address to socket address")), + Self::Pipe(_, _) => Err(GenericError::from_msg("Cannot convert pipe address to socket address")), + } + } + + pub fn into_internal_address(self) -> Result { + match self { + Self::Internal(addr) => Ok(addr), + Self::Socket(_) => Err(GenericError::from_msg("Cannot convert socket address to internal address")), + Self::Pipe(_, _) => Err(GenericError::from_msg("Cannot convert pipe address to internal address")), + } + } + pub fn into_addr(self) -> Result { - #[allow(clippy::match_wildcard_for_single_variants)] match self { - Self::Socket(address, port) => format!("{address}:{port}") - .parse() - .map_err(|e| { - GenericError::from_msg_with_cause(format!("failed to parse \"{address}\" as an ip adress"), e) - }) - .with_node(address), - _ => Err(GenericError::from_msg("only socket addresses are supported at the moment")), + Self::Socket(addr) => Ok(addr), + Self::Internal(_) => Err(GenericError::from_msg("only socket addresses are supported at the moment")), + Self::Pipe(_, _) => Err(GenericError::from_msg("only socket addresses are supported at the moment")), } } } @@ -287,7 +305,8 @@ pub mod envoy_conversions { impl std::fmt::Display for Address { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Socket(address, port) => f.write_str(&format!("{address}:{port}")), + Self::Socket(addr) => write!(f, "{}", addr), + Self::Internal(internal) => write!(f, "internal:{}", internal.server_listener_name), Self::Pipe(path, _) => f.write_str(path), } } @@ -331,9 +350,12 @@ pub mod envoy_conversions { type Error = GenericError; fn try_from(value: EnvoyAddress) -> Result { match value { - EnvoyAddress::SocketAddress(sock) => sock.try_into(), + EnvoyAddress::SocketAddress(sock) => { + let wrapper: SocketAddressWrapper = sock.try_into()?; + Ok(Self::Socket(wrapper.0)) + }, EnvoyAddress::Pipe(pipe) => pipe.try_into(), - EnvoyAddress::EnvoyInternalAddress(_) => Err(GenericError::unsupported_variant("EnvoyInternalAddress")), + EnvoyAddress::EnvoyInternalAddress(internal) => Ok(Self::Internal(internal.try_into()?)), } } } @@ -351,11 +373,16 @@ pub mod envoy_conversions { fn try_from(value: &Authority) -> Result { let port = value.port_u16().ok_or(GenericError::from_msg(format!("Authority doesn't have port {value}")))?; - Ok(Address::Socket(value.host().to_owned(), port)) + let ip = value.host().parse::().map_err(|e| { + GenericError::from_msg_with_cause(format!("failed to parse \"{}\" as an ip address", value.host()), e) + })?; + Ok(Address::Socket(SocketAddr::new(ip, port))) } } - impl TryFrom for Address { + pub struct SocketAddressWrapper(pub SocketAddr); + + impl TryFrom for SocketAddressWrapper { type Error = GenericError; fn try_from(value: EnvoySocketAddress) -> Result { let EnvoySocketAddress { @@ -376,10 +403,29 @@ pub mod envoy_conversions { GenericError::from_msg(format!("failed to convert {port_specifier} to a port number")) .with_node("port_specifier") })?; + let ip = address.parse::().map_err(|e| { + GenericError::from_msg_with_cause(format!("failed to parse \"{address}\" as an ip adress"), e) + .with_node("address") + })?; + Ok(SocketAddressWrapper(SocketAddr::new(ip, port))) + } + } - Ok(Address::Socket(address, port)) + impl TryFrom for InternalAddress { + type Error = GenericError; + fn try_from(value: EnvoyInternalAddress) -> Result { + let EnvoyInternalAddress { address_name_specifier, endpoint_id } = value; + let server_listener_name = match address_name_specifier { + Some(orion_data_plane_api::envoy_data_plane_api::envoy::config::core::v3::envoy_internal_address::AddressNameSpecifier::ServerListenerName(name)) => name, + None => return Err(GenericError::from_msg("server_listener_name is required for internal address")), + }; + Ok(Self { + server_listener_name, + endpoint_id: if endpoint_id.is_empty() { None } else { Some(endpoint_id) }, + }) } } + impl TryFrom for DataSource { type Error = GenericError; fn try_from(envoy: EnvoyDataSource) -> Result { diff --git a/orion-configuration/src/config/listener.rs b/orion-configuration/src/config/listener.rs index 2e0c0c44..b7939596 100644 --- a/orion-configuration/src/config/listener.rs +++ b/orion-configuration/src/config/listener.rs @@ -27,6 +27,7 @@ use crate::config::listener; use crate::config::network_filters::tracing::{TracingConfig, TracingKey}; use compact_str::CompactString; use ipnet::IpNet; +use orion_interner::StringInterner; use serde::{Deserialize, Serialize, Serializer}; use std::hash::{DefaultHasher, Hash, Hasher}; use std::{ @@ -35,12 +36,12 @@ use std::{ str::FromStr, }; -use orion_interner::StringInterner; +// Removed unused import #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct Listener { pub name: CompactString, - pub address: SocketAddr, + pub address: ListenerAddress, #[serde(with = "serde_filterchains")] pub filter_chains: HashMap, #[serde(skip_serializing_if = "Option::is_none", default = "Default::default")] @@ -90,6 +91,18 @@ impl Listener { } } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(untagged)] +pub enum ListenerAddress { + Socket(SocketAddr), + Internal(InternalListener), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct InternalListener { + pub buffer_size_kb: Option, +} + mod serde_filterchains { use serde::Deserializer; @@ -428,7 +441,18 @@ mod envoy_conversions { let name: CompactString = required!(name)?.into(); (|| -> Result<_, GenericError> { let name = name.clone(); - let address = Address::into_addr(convert_opt!(address)?)?; + let address_result = convert_opt!(address)?; + let address = match address_result { + Address::Socket(socket_addr) => crate::config::listener::ListenerAddress::Socket(socket_addr), + Address::Internal(_internal_addr) => { + crate::config::listener::ListenerAddress::Internal(crate::config::listener::InternalListener { + buffer_size_kb: None, // Default buffer size, can be configured via bootstrap extension + }) + }, + Address::Pipe(_, _) => { + return Err(GenericError::unsupported_variant("Pipe addresses are not supported for listeners")) + }, + }; let filter_chains: Vec = convert_non_empty_vec!(filter_chains)?; let n_filter_chains = filter_chains.len(); let filter_chains: HashMap<_, _> = filter_chains.into_iter().map(|x| x.0).collect(); diff --git a/orion-configuration/src/config/network_filters/http_connection_manager.rs b/orion-configuration/src/config/network_filters/http_connection_manager.rs index d33fea78..0ddb4b0b 100644 --- a/orion-configuration/src/config/network_filters/http_connection_manager.rs +++ b/orion-configuration/src/config/network_filters/http_connection_manager.rs @@ -1172,7 +1172,7 @@ mod envoy_conversions { // response_headers_to_remove, tracing, per_request_buffer_limit_bytes, - stat_prefix // action + stat_prefix )?; let response_headers_to_add = convert_vec!(response_headers_to_add)?; let request_headers_to_add = convert_vec!(request_headers_to_add)?; diff --git a/orion-configuration/tests/test_internal_listener.rs b/orion-configuration/tests/test_internal_listener.rs new file mode 100644 index 00000000..c87e8e48 --- /dev/null +++ b/orion-configuration/tests/test_internal_listener.rs @@ -0,0 +1,181 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +use orion_configuration::config::{ + bootstrap::{Bootstrap, BootstrapExtension, InternalListenerBootstrap}, + cluster::cluster_specifier::ClusterSpecifier, + cluster::{ + EndpointAddress, InternalEndpointAddress, InternalUpstreamTransport, LbEndpoint, MetadataKind, + MetadataValueSource, TransportSocket, + }, + listener::{FilterChain, InternalListener, Listener, ListenerAddress, MainFilter}, + network_filters::tcp_proxy::TcpProxy, +}; +use std::num::NonZeroU32; + +#[test] +fn test_internal_listener_serialization() { + let internal_listener = InternalListener { buffer_size_kb: Some(1024) }; + + let listener = Listener { + name: "test_internal_listener".into(), + address: ListenerAddress::Internal(internal_listener), + filter_chains: std::collections::HashMap::from([( + Default::default(), + FilterChain { + filter_chain_match_hash: 0, + name: "test_filter_chain".into(), + tls_config: None, + rbac: Vec::new(), + terminal_filter: MainFilter::Tcp(TcpProxy { + cluster_specifier: ClusterSpecifier::Cluster("test_cluster".into()), + access_log: Vec::new(), + }), + }, + )]), + bind_device: None, + with_tls_inspector: false, + proxy_protocol_config: None, + with_tlv_listener_filter: false, + tlv_listener_filter_config: None, + }; + + let yaml = serde_yaml::to_string(&listener).unwrap(); + assert!(yaml.contains("test_internal_listener")); + + let deserialized: Listener = serde_yaml::from_str(&yaml).unwrap(); + assert_eq!(listener, deserialized); +} + +#[test] +fn test_internal_endpoint_serialization() { + let internal_addr = InternalEndpointAddress { + server_listener_name: "internal_listener".into(), + endpoint_id: Some("endpoint1".into()), + }; + + let endpoint = LbEndpoint { + address: EndpointAddress::Internal(internal_addr), + health_status: Default::default(), + load_balancing_weight: NonZeroU32::new(1).unwrap(), + }; + + let yaml = serde_yaml::to_string(&endpoint).unwrap(); + assert!(yaml.contains("internal_listener")); + + let deserialized: LbEndpoint = serde_yaml::from_str(&yaml).unwrap(); + assert_eq!(endpoint, deserialized); +} + +#[test] +fn test_internal_upstream_transport_serialization() { + let metadata_source = + MetadataValueSource { kind: MetadataKind::Host, name: "envoy.filters.listener.original_dst".into() }; + + let transport_socket = TransportSocket::InternalUpstream(InternalUpstreamTransport { + passthrough_metadata: vec![metadata_source], + transport_socket: Box::new(TransportSocket::RawBuffer), + }); + + let yaml = serde_yaml::to_string(&transport_socket).unwrap(); + assert!(yaml.contains("internal_upstream")); + + let deserialized: TransportSocket = serde_yaml::from_str(&yaml).unwrap(); + assert_eq!(transport_socket, deserialized); +} + +#[test] +fn test_bootstrap_extension_serialization() { + let internal_listener_bootstrap = InternalListenerBootstrap { buffer_size_kb: Some(2048) }; + + let bootstrap_extension = BootstrapExtension::InternalListener(internal_listener_bootstrap); + + let bootstrap = Bootstrap { + static_resources: Default::default(), + dynamic_resources: None, + node: None, + admin: None, + stats_flush_interval: None, + stats_sinks: Vec::new(), + bootstrap_extensions: vec![bootstrap_extension], + }; + + let yaml = serde_yaml::to_string(&bootstrap).unwrap(); + assert!(yaml.contains("internal_listener")); + + let deserialized: Bootstrap = serde_yaml::from_str(&yaml).unwrap(); + assert_eq!(bootstrap, deserialized); +} + +#[test] +fn test_complete_internal_listener_config() { + let internal_listener = InternalListener { buffer_size_kb: Some(1024) }; + + let listener = Listener { + name: "internal_listener".into(), + address: ListenerAddress::Internal(internal_listener), + filter_chains: std::collections::HashMap::from([( + Default::default(), + FilterChain { + filter_chain_match_hash: 0, + name: "test_filter_chain".into(), + tls_config: None, + rbac: Vec::new(), + terminal_filter: MainFilter::Tcp(TcpProxy { + cluster_specifier: ClusterSpecifier::Cluster("internal_cluster".into()), + access_log: Vec::new(), + }), + }, + )]), + bind_device: None, + with_tls_inspector: false, + proxy_protocol_config: None, + with_tlv_listener_filter: false, + tlv_listener_filter_config: None, + }; + + let internal_addr = InternalEndpointAddress { + server_listener_name: "internal_listener".into(), + endpoint_id: Some("endpoint1".into()), + }; + + let endpoint = LbEndpoint { + address: EndpointAddress::Internal(internal_addr), + health_status: Default::default(), + load_balancing_weight: NonZeroU32::new(1).unwrap(), + }; + + let transport_socket = TransportSocket::InternalUpstream(InternalUpstreamTransport { + passthrough_metadata: vec![MetadataValueSource { + kind: MetadataKind::Host, + name: "envoy.filters.listener.original_dst".into(), + }], + transport_socket: Box::new(TransportSocket::RawBuffer), + }); + + let yaml = serde_yaml::to_string(&listener).unwrap(); + let deserialized: Listener = serde_yaml::from_str(&yaml).unwrap(); + assert_eq!(listener, deserialized); + + let yaml = serde_yaml::to_string(&endpoint).unwrap(); + let deserialized: LbEndpoint = serde_yaml::from_str(&yaml).unwrap(); + assert_eq!(endpoint, deserialized); + + let yaml = serde_yaml::to_string(&transport_socket).unwrap(); + let deserialized: TransportSocket = serde_yaml::from_str(&yaml).unwrap(); + assert_eq!(transport_socket, deserialized); +} diff --git a/orion-data-plane-api/src/bootstrap_loader/bootstrap.rs b/orion-data-plane-api/src/bootstrap_loader/bootstrap.rs index 13b2993d..7c0c17b1 100644 --- a/orion-data-plane-api/src/bootstrap_loader/bootstrap.rs +++ b/orion-data-plane-api/src/bootstrap_loader/bootstrap.rs @@ -235,6 +235,7 @@ impl BootstrapResolver for BootstrapLoader { .ok_or(BootstrapResolveErr::InvalidListener) } + #[allow(clippy::unwrap_used)] fn get_static_route_configs(&self) -> Result, BootstrapResolveErr> { let mut res = Vec::new(); for listener in self diff --git a/orion-data-plane-api/src/decode.rs b/orion-data-plane-api/src/decode.rs index c0d8db88..490790ba 100644 --- a/orion-data-plane-api/src/decode.rs +++ b/orion-data-plane-api/src/decode.rs @@ -138,6 +138,7 @@ filterChains: /// where depending on the source the decoding would differ. This is no longer /// the case, so the asserts in this test were inverted (but the comments remain). #[test] + #[allow(clippy::panic)] fn yaml_and_prost_eq() { let l_yaml: Listener = from_yaml(YAML_PAYLOAD_LISTEN_FILTER).unwrap(); let l_pb: Listener = Listener::decode(prost_payload_listen_filter().as_slice()).unwrap(); diff --git a/orion-data-plane-api/src/xds/client.rs b/orion-data-plane-api/src/xds/client.rs index 0575adee..f0658b4d 100644 --- a/orion-data-plane-api/src/xds/client.rs +++ b/orion-data-plane-api/src/xds/client.rs @@ -239,8 +239,12 @@ impl DeltaClientBackgroundWorker { warn!("outbound discovery request stream has ended!"); }; - let mut response_stream = - self.client_binding.delta_request(outbound_requests).await.map_err(XdsError::GrpcStatus)?.into_inner(); + let mut response_stream = self + .client_binding + .delta_request(outbound_requests) + .await + .map_err(|e| XdsError::GrpcStatus(Box::new(e)))? + .into_inner(); info!("xDS stream established"); loop { @@ -339,7 +343,7 @@ impl DeltaClientBackgroundWorker { pending_update_versions.insert(resource_id.clone(), resource_version); debug!("decoded config update for resource {resource_id}"); } - decoded.ok().map(|value| XdsResourceUpdate::Update(resource_id.clone(), value)) + decoded.ok().map(|value| XdsResourceUpdate::Update(resource_id.clone(), Box::new(value))) }) .chain(for_removal.into_iter().map(|resource_id| XdsResourceUpdate::Remove(resource_id, type_url))) .collect(); diff --git a/orion-data-plane-api/src/xds/model.rs b/orion-data-plane-api/src/xds/model.rs index 50ac1299..c719ba65 100644 --- a/orion-data-plane-api/src/xds/model.rs +++ b/orion-data-plane-api/src/xds/model.rs @@ -43,7 +43,7 @@ pub type ResourceVersion = String; #[derive(Clone, Debug)] pub enum XdsResourceUpdate { - Update(ResourceId, XdsResourcePayload), + Update(ResourceId, Box), Remove(ResourceId, TypeUrl), } @@ -58,11 +58,11 @@ impl XdsResourceUpdate { #[derive(Clone, Debug)] pub enum XdsResourcePayload { - Listener(ResourceId, Listener), - Cluster(ResourceId, Cluster), - Endpoints(ResourceId, ClusterLoadAssignment), - RouteConfiguration(ResourceId, RouteConfiguration), - Secret(ResourceId, Secret), + Listener(ResourceId, Box), + Cluster(ResourceId, Box), + Endpoints(ResourceId, Box), + RouteConfiguration(ResourceId, Box), + Secret(ResourceId, Box), } impl TryFrom<(Resource, TypeUrl)> for XdsResourcePayload { @@ -73,23 +73,23 @@ impl TryFrom<(Resource, TypeUrl)> for XdsResourcePayload { resource.resource.ok_or(XdsError::MissingResource()).and_then(|res| match type_url { TypeUrl::Listener => { let decoded = Listener::decode(res.value.as_slice()).map_err(XdsError::Decode); - decoded.map(|value| XdsResourcePayload::Listener(resource_id, value)) + decoded.map(|value| XdsResourcePayload::Listener(resource_id, Box::new(value))) }, TypeUrl::Cluster => { let decoded = Cluster::decode(res.value.as_slice()).map_err(XdsError::Decode); - decoded.map(|value| XdsResourcePayload::Cluster(resource_id, value)) + decoded.map(|value| XdsResourcePayload::Cluster(resource_id, Box::new(value))) }, TypeUrl::RouteConfiguration => { let decoded = RouteConfiguration::decode(res.value.as_slice()).map_err(XdsError::Decode); - decoded.map(|value| XdsResourcePayload::RouteConfiguration(resource_id, value)) + decoded.map(|value| XdsResourcePayload::RouteConfiguration(resource_id, Box::new(value))) }, TypeUrl::ClusterLoadAssignment => { let decoded = ClusterLoadAssignment::decode(res.value.as_slice()).map_err(XdsError::Decode); - decoded.map(|value| XdsResourcePayload::Endpoints(resource_id, value)) + decoded.map(|value| XdsResourcePayload::Endpoints(resource_id, Box::new(value))) }, TypeUrl::Secret => { let decoded = Secret::decode(res.value.as_slice()).map_err(XdsError::Decode); - decoded.map(|value| XdsResourcePayload::Secret(resource_id, value)) + decoded.map(|value| XdsResourcePayload::Secret(resource_id, Box::new(value))) }, }) } @@ -156,7 +156,7 @@ impl Display for RejectedConfig { #[derive(Error, Debug)] pub enum XdsError { #[error("gRPC error ({}): {}", .0.code(), .0.message())] - GrpcStatus(#[from] tonic::Status), + GrpcStatus(Box), #[error(transparent)] RequestFailure(#[from] Box>), #[error("unknown resource type: {0}")] @@ -170,3 +170,9 @@ pub enum XdsError { #[error("cannot construct client: {0}")] BuilderFailed(String), } + +impl From for XdsError { + fn from(status: tonic::Status) -> Self { + XdsError::GrpcStatus(Box::new(status)) + } +} diff --git a/orion-format/src/bin/format-test.rs b/orion-format/src/bin/format-test.rs index f06450f6..d745e560 100644 --- a/orion-format/src/bin/format-test.rs +++ b/orion-format/src/bin/format-test.rs @@ -46,6 +46,7 @@ const TOTAL: u64 = 100_000_000; type BoxError = Box; #[allow(clippy::cast_precision_loss)] +#[allow(clippy::print_stdout)] fn main() -> Result<(), BoxError> { let request = Request::builder() .uri("https://www.rust-lang.org/hello") diff --git a/orion-format/src/grammar.rs b/orion-format/src/grammar.rs index 882fab99..9a468fee 100644 --- a/orion-format/src/grammar.rs +++ b/orion-format/src/grammar.rs @@ -332,7 +332,7 @@ impl Grammar for EnvoyGrammar { // ensure the placeholder is properly closed with a '%' // - let operator = match remainder.find(&[' ', '%', '(']) { + let operator = match remainder.find([' ', '%', '(']) { Some(index) => &remainder[..index], // ...return a slice from the beginning to that index. None => remainder, // In that case, return the whole string slice. }; diff --git a/orion-format/src/lib.rs b/orion-format/src/lib.rs index 6dfcbb73..7dc5c5a6 100644 --- a/orion-format/src/lib.rs +++ b/orion-format/src/lib.rs @@ -105,7 +105,9 @@ impl Clone for LogFormatter { } } -#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Debug, Clone, Serialize)] +#[allow(clippy::unsafe_derive_deserialize)] +#[derive(Deserialize)] pub struct LogFormatterLocal { local: Arc, format: Vec, diff --git a/orion-lib/src/clusters/balancers/default_balancer.rs b/orion-lib/src/clusters/balancers/default_balancer.rs index e9da82c2..e75b02c1 100644 --- a/orion-lib/src/clusters/balancers/default_balancer.rs +++ b/orion-lib/src/clusters/balancers/default_balancer.rs @@ -157,7 +157,7 @@ mod test { clusters::{ balancers::{wrr::WeightedRoundRobinBalancer, Balancer}, health::HealthStatus, - load_assignment::{LbEndpoint, LocalityLbEndpoints}, + load_assignment::{EndpointAddressType, LbEndpoint, LocalityLbEndpoints}, }, transport::UpstreamTransportSocketConfigurator, }; @@ -229,7 +229,15 @@ mod test { results.push(next); } - let results: Vec<_> = results.into_iter().filter_map(|r| r.map(|f| f.authority.to_string())).collect(); + let results: Vec<_> = results + .into_iter() + .filter_map(|r| { + r.map(|f| match &f.address { + EndpointAddressType::Socket(authority, _, _) => authority.to_string(), + EndpointAddressType::Internal(internal, _) => internal.server_listener_name.to_string(), + }) + }) + .collect(); let expected = [ "endpoint11:8000", "endpoint12:8000", @@ -284,7 +292,15 @@ mod test { results.push(next); } - let results: Vec<_> = results.into_iter().filter_map(|r| r.map(|f| f.authority.to_string())).collect(); + let results: Vec<_> = results + .into_iter() + .filter_map(|r| { + r.map(|f| match &f.address { + EndpointAddressType::Socket(authority, _, _) => authority.to_string(), + EndpointAddressType::Internal(internal, _) => internal.server_listener_name.to_string(), + }) + }) + .collect(); let expected = [ "endpoint11:8000", "endpoint12:8000", @@ -339,7 +355,15 @@ mod test { results.push(next); } - let results: Vec<_> = results.into_iter().filter_map(|r| r.map(|f| f.authority.to_string())).collect(); + let results: Vec<_> = results + .into_iter() + .filter_map(|r| { + r.map(|f| match &f.address { + EndpointAddressType::Socket(authority, _, _) => authority.to_string(), + EndpointAddressType::Internal(internal, _) => internal.server_listener_name.to_string(), + }) + }) + .collect(); let expected = [ "endpoint11:8000", "endpoint12:8000", @@ -394,7 +418,15 @@ mod test { results.push(next); } - let results: Vec<_> = results.into_iter().filter_map(|r| r.map(|f| f.authority.to_string())).collect(); + let results: Vec<_> = results + .into_iter() + .filter_map(|r| { + r.map(|f| match &f.address { + EndpointAddressType::Socket(authority, _, _) => authority.to_string(), + EndpointAddressType::Internal(internal, _) => internal.server_listener_name.to_string(), + }) + }) + .collect(); let expected = [ "endpoint21:8000", "endpoint22:8000", @@ -449,7 +481,15 @@ mod test { results.push(next); } - let results: Vec<_> = results.into_iter().filter_map(|r| r.map(|f| f.authority.to_string())).collect(); + let results: Vec<_> = results + .into_iter() + .filter_map(|r| { + r.map(|f| match &f.address { + EndpointAddressType::Socket(authority, _, _) => authority.to_string(), + EndpointAddressType::Internal(internal, _) => internal.server_listener_name.to_string(), + }) + }) + .collect(); let expected = [ "endpoint21:8000", "endpoint22:8000", @@ -504,7 +544,15 @@ mod test { results.push(next); } - let results: Vec<_> = results.into_iter().filter_map(|r| r.map(|f| f.authority.to_string())).collect(); + let results: Vec<_> = results + .into_iter() + .filter_map(|r| { + r.map(|f| match &f.address { + EndpointAddressType::Socket(authority, _, _) => authority.to_string(), + EndpointAddressType::Internal(internal, _) => internal.server_listener_name.to_string(), + }) + }) + .collect(); let expected = [ "endpoint21:8000", "endpoint22:8000", @@ -559,7 +607,15 @@ mod test { results.push(next); } - let results: Vec<_> = results.into_iter().filter_map(|r| r.map(|f| f.authority.to_string())).collect(); + let results: Vec<_> = results + .into_iter() + .filter_map(|r| { + r.map(|f| match &f.address { + EndpointAddressType::Socket(authority, _, _) => authority.to_string(), + EndpointAddressType::Internal(internal, _) => internal.server_listener_name.to_string(), + }) + }) + .collect(); let expected = [ "endpoint21:8000", "endpoint31:8000", diff --git a/orion-lib/src/clusters/cluster/dynamic.rs b/orion-lib/src/clusters/cluster/dynamic.rs index ff597724..1714efdc 100644 --- a/orion-lib/src/clusters/cluster/dynamic.rs +++ b/orion-lib/src/clusters/cluster/dynamic.rs @@ -19,7 +19,7 @@ use http::uri::Authority; use orion_configuration::config::{ cluster::{ - ClusterLoadAssignment as ClusterLoadAssignmentConfig, HealthCheck, HealthStatus, + ClusterLoadAssignment as ClusterLoadAssignmentConfig, EndpointAddress, HealthCheck, HealthStatus, LbEndpoint as LbEndpointConfig, LbPolicy, LocalityLbEndpoints as LocalityLbEndpointsConfig, }, core::envoy_conversions::Address, @@ -171,7 +171,7 @@ impl TryFrom<&DynamicCluster> for ClusterLoadAssignmentConfig { let load_balancing_weight = std::num::NonZeroU32::new(ep.weight) .ok_or_else(|| format!("Invalid load balancing weight: {}", ep.weight))?; Ok(LbEndpointConfig { - address: Address::try_from(&ep.authority)?, + address: EndpointAddress::Socket(Address::try_from(ep.authority())?.into_socket_addr()?), health_status: ep.health_status, load_balancing_weight, }) diff --git a/orion-lib/src/clusters/cluster/original_dst.rs b/orion-lib/src/clusters/cluster/original_dst.rs index 69ff9fed..a38c6ef9 100644 --- a/orion-lib/src/clusters/cluster/original_dst.rs +++ b/orion-lib/src/clusters/cluster/original_dst.rs @@ -460,6 +460,7 @@ mod tests { }), cleanup_interval, transport_socket: None, + internal_transport_socket: None, bind_device: None, load_balancing_policy: LbPolicy::ClusterProvided, http_protocol_options: HttpProtocolOptions::default(), diff --git a/orion-lib/src/clusters/load_assignment.rs b/orion-lib/src/clusters/load_assignment.rs index 50fe3060..6d1faf37 100644 --- a/orion-lib/src/clusters/load_assignment.rs +++ b/orion-lib/src/clusters/load_assignment.rs @@ -17,10 +17,12 @@ use std::{sync::Arc, time::Duration}; +use compact_str::CompactString; use http::uri::Authority; use orion_configuration::config::cluster::{ - ClusterLoadAssignment as ClusterLoadAssignmentConfig, HealthStatus, HttpProtocolOptions, - LbEndpoint as LbEndpointConfig, LbPolicy, LocalityLbEndpoints as LocalityLbEndpointsConfig, + ClusterLoadAssignment as ClusterLoadAssignmentConfig, EndpointAddress, HealthStatus, HttpProtocolOptions, + InternalEndpointAddress, LbEndpoint as LbEndpointConfig, LbPolicy, + LocalityLbEndpoints as LocalityLbEndpointsConfig, }; use tracing::debug; use typed_builder::TypedBuilder; @@ -45,18 +47,95 @@ use crate::{ #[derive(Debug, Clone)] pub struct LbEndpoint { - pub name: &'static str, - pub authority: http::uri::Authority, + pub name: CompactString, + pub address: EndpointAddressType, pub bind_device: Option, pub weight: u32, pub health_status: HealthStatus, - http_channel: HttpChannel, - tcp_channel: TcpChannelConnector, +} + +#[derive(Debug, Clone)] +pub enum EndpointAddressType { + Socket(http::uri::Authority, HttpChannel, TcpChannelConnector), + Internal(InternalEndpointAddress, InternalConnection), +} + +impl PartialEq for EndpointAddressType { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::Socket(auth1, _, _), Self::Socket(auth2, _, _)) => auth1 == auth2, + (Self::Internal(addr1, _), Self::Internal(addr2, _)) => addr1 == addr2, + _ => false, + } + } +} + +impl Eq for EndpointAddressType {} + +impl EndpointAddressType { + pub fn to_endpoint_address(&self) -> EndpointAddress { + match self { + EndpointAddressType::Socket(authority, _, _) => { + let addr_str = authority.as_str(); + if let Ok(socket_addr) = addr_str.parse::() { + EndpointAddress::Socket(socket_addr) + } else { + panic!("Cannot convert authority back to socket address: {}", addr_str); + } + }, + EndpointAddressType::Internal(internal_addr, _) => EndpointAddress::Internal(internal_addr.clone()), + } + } +} + +impl PartialOrd for EndpointAddressType { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for EndpointAddressType { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + match (self, other) { + (Self::Socket(a, _, _), Self::Socket(b, _, _)) => a.as_str().cmp(b.as_str()), + (Self::Internal(a, _), Self::Internal(b, _)) => a.cmp(b), + (Self::Socket(_, _, _), Self::Internal(_, _)) => std::cmp::Ordering::Less, + (Self::Internal(_, _), Self::Socket(_, _, _)) => std::cmp::Ordering::Greater, + } + } +} + +#[derive(Debug, Clone)] +pub struct InternalConnection { + pub server_listener_name: CompactString, + pub endpoint_id: Option, } impl PartialEq for LbEndpoint { fn eq(&self, other: &Self) -> bool { - self.authority == other.authority + self.address == other.address + } +} + +impl LbEndpoint { + pub fn authority(&self) -> &Authority { + match &self.address { + EndpointAddressType::Socket(authority, _, _) => authority, + EndpointAddressType::Internal(_, _) => { + // For internal endpoints, return a static dummy authority and log a warning + static DUMMY_AUTHORITY: std::sync::OnceLock = std::sync::OnceLock::new(); + let authority = DUMMY_AUTHORITY.get_or_init(|| Authority::from_static("internal.invalid")); + tracing::warn!("Internal endpoints don't have authorities, returning dummy authority"); + authority + }, + } + } + + pub fn socket_authority(&self) -> Option<&Authority> { + match &self.address { + EndpointAddressType::Socket(authority, _, _) => Some(authority), + EndpointAddressType::Internal(_, _) => None, + } } } @@ -68,7 +147,16 @@ impl WeightedEndpoint for LbEndpoint { impl EndpointWithAuthority for LbEndpoint { fn authority(&self) -> &Authority { - &self.authority + match &self.address { + EndpointAddressType::Socket(authority, _, _) => authority, + EndpointAddressType::Internal(_, _) => { + // For internal endpoints, return a static dummy authority and log a warning + static DUMMY_AUTHORITY: std::sync::OnceLock = std::sync::OnceLock::new(); + let authority = DUMMY_AUTHORITY.get_or_init(|| Authority::from_static("internal.invalid")); + tracing::warn!("Internal endpoints don't have authorities, returning dummy authority"); + authority + }, + } } } @@ -81,7 +169,14 @@ impl PartialOrd for LbEndpoint { } impl Ord for LbEndpoint { fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.authority.as_str().cmp(other.authority.as_str()) + match (&self.address, &other.address) { + (EndpointAddressType::Socket(a, _, _), EndpointAddressType::Socket(b, _, _)) => a.as_str().cmp(b.as_str()), + (EndpointAddressType::Internal(a, _), EndpointAddressType::Internal(b, _)) => { + a.server_listener_name.cmp(&b.server_listener_name) + }, + (EndpointAddressType::Socket(_, _, _), EndpointAddressType::Internal(_, _)) => std::cmp::Ordering::Less, + (EndpointAddressType::Internal(_, _), EndpointAddressType::Socket(_, _, _)) => std::cmp::Ordering::Greater, + } } } @@ -97,13 +192,32 @@ impl EndpointHealth for LbEndpoint { impl LbEndpoint { pub fn grpc_service(&self) -> Result { - GrpcService::try_new(self.http_channel.clone(), self.authority.clone()) + match &self.address { + EndpointAddressType::Socket(authority, http_channel, _) => { + GrpcService::try_new(http_channel.clone(), authority.clone()) + }, + EndpointAddressType::Internal(_, _) => Err("Internal endpoints don't support gRPC service yet".into()), + } + } + + pub fn http_channel(&self) -> Option<&HttpChannel> { + match &self.address { + EndpointAddressType::Socket(_, http_channel, _) => Some(http_channel), + EndpointAddressType::Internal(_, _) => None, + } + } + + pub fn tcp_channel(&self) -> Option<&TcpChannelConnector> { + match &self.address { + EndpointAddressType::Socket(_, _, tcp_channel) => Some(tcp_channel), + EndpointAddressType::Internal(_, _) => None, + } } } #[derive(Debug, Clone)] pub struct PartialLbEndpoint { - pub authority: http::uri::Authority, + pub address: EndpointAddress, pub bind_device: Option, pub weight: u32, pub health_status: HealthStatus, @@ -112,7 +226,7 @@ pub struct PartialLbEndpoint { impl PartialLbEndpoint { fn new(value: &LbEndpoint) -> Self { PartialLbEndpoint { - authority: value.authority.clone(), + address: value.address.to_endpoint_address(), bind_device: value.bind_device.clone(), weight: value.weight, health_status: value.health_status, @@ -122,7 +236,10 @@ impl PartialLbEndpoint { impl EndpointWithLoad for LbEndpoint { fn http_load(&self) -> u32 { - self.http_channel.load() + match &self.address { + EndpointAddressType::Socket(_, http_channel, _) => http_channel.load(), + EndpointAddressType::Internal(_, _) => 0, // Internal endpoints don't have HTTP load tracking yet + } } } @@ -147,37 +264,41 @@ impl LbEndpointBuilder { pub fn build(self) -> Result> { let cluster_name = self.cluster_name; - let PartialLbEndpoint { authority, bind_device, weight, health_status } = self.endpoint; - - let builder = HttpChannelBuilder::new(bind_device.clone()) - .with_authority(authority.clone()) - .with_timeout(self.connect_timeout) - .with_cluster_name(cluster_name); - - let maybe_tls_conf = self.transport_socket.tls_configurator(); - let builder = if let Some(server_name) = self.server_name { - builder.with_tls(maybe_tls_conf.cloned()).with_server_name(server_name) - } else { - builder.with_tls(maybe_tls_conf.cloned()) + let PartialLbEndpoint { ref address, bind_device, weight, health_status } = self.endpoint; + + let address = match address { + EndpointAddress::Socket(socket_addr) => { + let authority = http::uri::Authority::try_from(format!("{socket_addr}"))?; + let mut builder = HttpChannelBuilder::new(bind_device.clone()) + .with_timeout(self.connect_timeout) + .with_authority(authority.clone()); + + // Configure TLS if needed + if let UpstreamTransportSocketConfigurator::Tls(tls_configurator) = &self.transport_socket { + builder = builder.with_tls(Some(tls_configurator.clone())); + } + + let builder = if let Some(_bind_device) = &bind_device { builder } else { builder }; + let http_channel = builder.with_http_protocol_options(self.http_protocol_options).build()?; + let tcp_channel = TcpChannelConnector::new( + &authority, + cluster_name, + bind_device.clone(), + self.connect_timeout, + self.transport_socket.clone(), + ); + EndpointAddressType::Socket(authority, http_channel, tcp_channel) + }, + EndpointAddress::Internal(internal_addr) => EndpointAddressType::Internal( + internal_addr.clone(), + InternalConnection { + server_listener_name: internal_addr.server_listener_name.clone(), + endpoint_id: internal_addr.endpoint_id.clone(), + }, + ), }; - let http_channel = builder.with_http_protocol_options(self.http_protocol_options).build()?; - let tcp_channel = TcpChannelConnector::new( - &authority, - cluster_name, - bind_device.clone(), - self.connect_timeout, - self.transport_socket.clone(), - ); - Ok(Arc::new(LbEndpoint { - name: cluster_name, - authority, - bind_device, - weight, - health_status, - http_channel, - tcp_channel, - })) + Ok(Arc::new(LbEndpoint { name: cluster_name.into(), address, bind_device, weight, health_status })) } } @@ -187,9 +308,8 @@ impl TryFrom for PartialLbEndpoint { fn try_from(lb_endpoint: LbEndpointConfig) -> Result { let health_status = lb_endpoint.health_status; let address = lb_endpoint.address; - let authority = http::uri::Authority::try_from(format!("{address}"))?; let weight = lb_endpoint.load_balancing_weight.into(); - Ok(PartialLbEndpoint { authority, bind_device: None, weight, health_status }) + Ok(PartialLbEndpoint { address, bind_device: None, weight, health_status }) } } @@ -350,12 +470,12 @@ pub struct PartialClusterLoadAssignment { impl ClusterLoadAssignment { pub fn get_http_channel(&mut self, hash: Option) -> Result { let endpoint = self.balancer.next_item(hash).ok_or("No active endpoint")?; - Ok(endpoint.http_channel.clone()) + Ok(endpoint.http_channel().ok_or("No HTTP channel available for this endpoint")?.clone()) } pub fn get_tcp_channel(&mut self) -> Result { let endpoint = self.balancer.next_item(None).ok_or("No active endpoint")?; - Ok(endpoint.tcp_channel.clone()) + Ok(endpoint.tcp_channel().ok_or("No TCP channel available for this endpoint")?.clone()) } pub fn get_grpc_channel(&mut self) -> Result { @@ -364,22 +484,30 @@ impl ClusterLoadAssignment { } pub fn all_http_channels(&self) -> Vec<(Authority, HttpChannel)> { - self.all_endpoints_iter().map(|endpoint| (endpoint.authority.clone(), endpoint.http_channel.clone())).collect() + self.all_endpoints_iter() + .filter_map(|endpoint| { + endpoint.http_channel().map(|channel| (endpoint.authority().clone(), channel.clone())) + }) + .collect() } pub fn all_tcp_channels(&self) -> Vec<(Authority, TcpChannelConnector)> { - self.all_endpoints_iter().map(|endpoint| (endpoint.authority.clone(), endpoint.tcp_channel.clone())).collect() + self.all_endpoints_iter() + .filter_map(|endpoint| { + endpoint.tcp_channel().map(|channel| (endpoint.authority().clone(), channel.clone())) + }) + .collect() } pub fn try_all_grpc_channels(&self) -> Vec> { self.all_endpoints_iter() - .map(|endpoint| endpoint.grpc_service().map(|channel| (endpoint.authority.clone(), channel))) + .map(|endpoint| endpoint.grpc_service().map(|channel| (endpoint.authority().clone(), channel))) .collect() } pub fn update_endpoint_health(&mut self, authority: &http::uri::Authority, health: HealthStatus) { for locality in &self.endpoints { - locality.endpoints.iter().filter(|endpoint| &endpoint.authority == authority).for_each(|endpoint| { + locality.endpoints.iter().filter(|endpoint| endpoint.authority() == authority).for_each(|endpoint| { if let Err(err) = self.balancer.update_health(endpoint, health) { debug!("Could not update endpoint health: {}", err); } @@ -484,7 +612,7 @@ impl TryFrom for PartialClusterLoadAssignment { mod test { use http::uri::Authority; - use super::LbEndpoint; + use super::{EndpointAddressType, LbEndpoint}; use crate::{ clusters::health::HealthStatus, transport::{ @@ -508,13 +636,19 @@ mod test { .unwrap(); let tcp_channel = TcpChannelConnector::new( &authority, - "test_cluster", + cluster_name, bind_device.clone(), None, - UpstreamTransportSocketConfigurator::default(), + UpstreamTransportSocketConfigurator::None, ); - Self { name: "Cluster", authority, bind_device, weight, health_status, http_channel, tcp_channel } + Self { + name: "Cluster".into(), + address: EndpointAddressType::Socket(authority, http_channel, tcp_channel), + bind_device, + weight, + health_status, + } } } } diff --git a/orion-lib/src/listeners/filterchain.rs b/orion-lib/src/listeners/filterchain.rs index ce13aa63..162a9b15 100644 --- a/orion-lib/src/listeners/filterchain.rs +++ b/orion-lib/src/listeners/filterchain.rs @@ -97,7 +97,6 @@ pub struct FilterchainBuilder { rbac_filters: Vec, tls_configurator: Option>, } - impl FilterchainBuilder { pub fn with_listener_name(self, name: &'static str) -> Self { FilterchainBuilder { listener_name: Some(name), ..self } @@ -255,7 +254,6 @@ impl FilterchainType { } let tcp_proxy = tcp_proxy.clone(); - let listener_name = tcp_proxy.listener_name; let server_config = config .tls_configurator .clone() diff --git a/orion-lib/src/listeners/http_connection_manager.rs b/orion-lib/src/listeners/http_connection_manager.rs index 9cf25d2b..4b89bc32 100644 --- a/orion-lib/src/listeners/http_connection_manager.rs +++ b/orion-lib/src/listeners/http_connection_manager.rs @@ -917,11 +917,13 @@ impl Service> for HttpRequestHandler { trans_handler.thread_id(), &[KeyValue::new("listener", listener_name)] ); + let listener_name_for_defer = listener_name; defer! { - with_metric!(http::DOWNSTREAM_RQ_ACTIVE, sub, 1, trans_handler.thread_id(), &[KeyValue::new("listener", listener_name)]); + with_metric!(http::DOWNSTREAM_RQ_ACTIVE, sub, 1, trans_handler.thread_id(), &[KeyValue::new("listener", listener_name_for_defer)]); } let trans_handler = trans_handler.clone(); + let listener_name_for_trace = listener_name; Box::pin(async move { let ExtendedRequest { request, downstream_metadata } = req; let (parts, body) = request.into_parts(); @@ -949,6 +951,9 @@ impl Service> for HttpRequestHandler { let initial_event = request.extensions().get::>().cloned().unwrap_or_default(); let req_head_size = request_head_size(&request); + let listener_name_for_body = listener_name; + let listener_name_for_route = listener_name; + let listener_name_for_response = listener_name; let request = request.map(|body| { let trans_handler = Arc::clone(&trans_handler); BodyWithMetrics::new(BodyKind::Request, body, move |nbytes, body_error, body_flags| { @@ -957,7 +962,7 @@ impl Service> for HttpRequestHandler { add, nbytes + req_head_size as u64, trans_handler.thread_id(), - &[KeyValue::new("listener", listener_name)] + &[KeyValue::new("listener", listener_name_for_body)] ); // emit the access log, if the transaction is completed.. @@ -1019,7 +1024,7 @@ impl Service> for HttpRequestHandler { add, 1, trans_handler.thread_id(), - &[KeyValue::new("listener", listener_name)] + &[KeyValue::new("listener", listener_name_for_route.to_string())] ); if let Some(state) = trans_handler.span_state.as_ref() { @@ -1044,7 +1049,7 @@ impl Service> for HttpRequestHandler { add, nbytes + resp_head_size as u64, trans_handler.thread_id(), - &[KeyValue::new("listener", listener_name)] + &[KeyValue::new("listener", listener_name_for_response.to_string())] ); let is_transaction_complete = if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { @@ -1098,7 +1103,7 @@ impl Service> for HttpRequestHandler { .handle_transaction(route_conf, manager, permit, request, downstream_metadata) .await; - trans_handler.trace_status_code(response, listener_name) + trans_handler.trace_status_code(response, listener_name_for_trace) }) } } diff --git a/orion-lib/src/listeners/listener.rs b/orion-lib/src/listeners/listener.rs index be296b87..de1e37fa 100644 --- a/orion-lib/src/listeners/listener.rs +++ b/orion-lib/src/listeners/listener.rs @@ -31,6 +31,7 @@ use orion_configuration::config::{ listener_filters::DownstreamProxyProtocolConfig, }; use orion_interner::StringInterner; + use orion_metrics::{ metrics::{http, listeners}, with_histogram, with_metric, @@ -45,6 +46,7 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, }, + time::Instant, }; use tokio::{ net::{TcpListener, TcpSocket}, @@ -55,13 +57,24 @@ use tracing::{debug, info, warn}; #[derive(Debug, Clone)] struct PartialListener { name: &'static str, - socket_address: std::net::SocketAddr, + address: ListenerAddress, bind_device: Option, filter_chains: HashMap, with_tls_inspector: bool, proxy_protocol_config: Option, with_tlv_listener_filter: bool, } + +#[derive(Debug, Clone)] +enum ListenerAddress { + Socket(std::net::SocketAddr), + Internal(InternalListenerConfig), +} + +#[derive(Debug, Clone)] +struct InternalListenerConfig { + buffer_size_kb: Option, +} #[derive(Debug, Clone)] pub struct ListenerFactory { listener: PartialListener, @@ -72,7 +85,14 @@ impl TryFrom> for PartialListener { fn try_from(ctx: ConversionContext<'_, ListenerConfig>) -> std::result::Result { let ConversionContext { envoy_object: listener, secret_manager } = ctx; let name = listener.name.to_static_str(); - let addr = listener.address; + let address = match listener.address { + orion_configuration::config::listener::ListenerAddress::Socket(socket_addr) => { + ListenerAddress::Socket(socket_addr) + }, + orion_configuration::config::listener::ListenerAddress::Internal(internal_listener) => { + ListenerAddress::Internal(InternalListenerConfig { buffer_size_kb: internal_listener.buffer_size_kb }) + }, + }; let with_tls_inspector = listener.with_tls_inspector; let proxy_protocol_config = listener.proxy_protocol_config; let with_tlv_listener_filter = listener.with_tlv_listener_filter; @@ -97,7 +117,7 @@ impl TryFrom> for PartialListener { Ok(PartialListener { name, - socket_address: addr, + address, bind_device, filter_chains, with_tls_inspector, @@ -115,7 +135,7 @@ impl ListenerFactory { ) -> Result { let PartialListener { name, - socket_address, + address, bind_device, filter_chains, with_tls_inspector, @@ -130,11 +150,11 @@ impl ListenerFactory { Ok(Listener { name, - socket_address, + address, bind_device, filter_chains, with_tls_inspector, - proxy_protocol_config, + proxy_protocol_config: proxy_protocol_config.map(Arc::new), with_tlv_listener_filter, route_updates_receiver, secret_updates_receiver, @@ -153,11 +173,11 @@ impl TryFrom> for ListenerFactory { #[derive(Debug)] pub struct Listener { name: &'static str, - socket_address: std::net::SocketAddr, + address: ListenerAddress, bind_device: Option, pub filter_chains: HashMap, with_tls_inspector: bool, - proxy_protocol_config: Option, + proxy_protocol_config: Option>, with_tlv_listener_filter: bool, route_updates_receiver: broadcast::Receiver, secret_updates_receiver: broadcast::Receiver, @@ -173,7 +193,7 @@ impl Listener { use std::net::{IpAddr, Ipv4Addr}; Listener { name, - socket_address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), + address: ListenerAddress::Socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)), bind_device: None, filter_chains: HashMap::new(), with_tls_inspector: false, @@ -187,30 +207,75 @@ impl Listener { pub fn get_name(&self) -> &'static str { self.name } - pub fn get_socket(&self) -> (&std::net::SocketAddr, Option<&BindDevice>) { - (&self.socket_address, self.bind_device.as_ref()) + pub fn get_socket(&self) -> Option<(&std::net::SocketAddr, Option<&BindDevice>)> { + match &self.address { + ListenerAddress::Socket(socket_addr) => Some((socket_addr, self.bind_device.as_ref())), + ListenerAddress::Internal(_) => None, + } + } + + pub fn is_internal(&self) -> bool { + matches!(self.address, ListenerAddress::Internal(_)) } pub async fn start(self) -> Error { let Self { name, - socket_address: local_address, + address, bind_device, filter_chains, with_tls_inspector, proxy_protocol_config, with_tlv_listener_filter, - mut route_updates_receiver, - mut secret_updates_receiver, + route_updates_receiver, + secret_updates_receiver, } = self; - let listener = match configure_and_start_tcp_listener(local_address, bind_device.as_ref()) { - Ok(x) => x, - Err(e) => return e, - }; + match address { + ListenerAddress::Socket(local_address) => { + let listener = match configure_and_start_tcp_listener(local_address, bind_device.as_ref()) { + Ok(x) => x, + Err(e) => return e, + }; + info!("listener '{name}' started: {local_address}"); + Self::run_socket_listener( + name, + listener, + filter_chains, + with_tls_inspector, + proxy_protocol_config, + with_tlv_listener_filter, + route_updates_receiver, + secret_updates_receiver, + ) + .await + }, + ListenerAddress::Internal(internal_config) => { + info!("internal listener '{name}' started"); + Self::run_internal_listener( + name, + internal_config, + filter_chains, + with_tls_inspector, + with_tlv_listener_filter, + route_updates_receiver, + secret_updates_receiver, + ) + .await + }, + } + } - info!("listener '{name}' started: {local_address}"); + async fn run_socket_listener( + name: &'static str, + listener: TcpListener, + filter_chains: HashMap, + with_tls_inspector: bool, + proxy_protocol_config: Option>, + with_tlv_listener_filter: bool, + mut route_updates_receiver: broadcast::Receiver, + mut secret_updates_receiver: broadcast::Receiver, + ) -> Error { let mut filter_chains = Arc::new(filter_chains); - let proxy_protocol_config = proxy_protocol_config.map(Arc::new); let listener_name = name; loop { @@ -220,7 +285,7 @@ impl Listener { maybe_stream = listener.accept() => { match maybe_stream { Ok((stream, peer_addr)) => { - let start = std::time::Instant::now(); + let _start = std::time::Instant::now(); // This is a new downstream connection... let shard_id = std::thread::current().id(); @@ -236,7 +301,9 @@ impl Listener { // we could optimize a little here by either splitting up the filter_chain selection and rbac into the parts that can run // before we have the ClientHello and the ones after. since we might already have enough info to decide to drop the connection // or pick a specific filter_chain to run, or we could simply if-else on the with_tls_inspector variable. - tokio::spawn(Self::process_listener_update(name, filter_chains, with_tls_inspector, proxy_protocol_config, with_tlv_listener_filter, local_address, peer_addr, Box::new(stream), start)); + let local_address = listener.local_addr().unwrap_or_else(|_| "0.0.0.0:0".parse().expect("Failed to parse fallback address")); + let start = Instant::now(); + tokio::spawn(Self::process_listener_update(listener_name, filter_chains, with_tls_inspector, proxy_protocol_config, with_tlv_listener_filter, local_address, peer_addr, Box::new(stream), start)); }, Err(e) => {warn!("failed to accept tcp connection: {e}");} } @@ -244,7 +311,7 @@ impl Listener { maybe_route_update = route_updates_receiver.recv() => { //todo: add context to the error here once orion-error lands match maybe_route_update { - Ok(route_update) => {Self::process_route_update(name, &filter_chains, route_update)}, + Ok(route_update) => {Self::process_route_update(&name, &filter_chains, route_update)}, Err(e) => {return e.into();} } }, @@ -253,7 +320,7 @@ impl Listener { Ok(secret_update) => { // todo: possibly expensive clone - may need to rethink this structure let mut filter_chains_clone = filter_chains.as_ref().clone(); - Self::process_secret_update(name, &mut filter_chains_clone, secret_update); + Self::process_secret_update(&name, &mut filter_chains_clone, secret_update); filter_chains = Arc::new(filter_chains_clone); } Err(e) => {return e.into();} @@ -263,6 +330,42 @@ impl Listener { } } + async fn run_internal_listener( + name: &'static str, + _internal_config: InternalListenerConfig, + filter_chains: HashMap, + _with_tls_inspector: bool, + _with_tlv_listener_filter: bool, + mut route_updates_receiver: broadcast::Receiver, + mut secret_updates_receiver: broadcast::Receiver, + ) -> Error { + let filter_chains = Arc::new(filter_chains); + + // For now, internal listeners just wait for updates + // The actual connection handling will be implemented when we add the internal connection factory + loop { + tokio::select! { + maybe_route_update = route_updates_receiver.recv() => { + match maybe_route_update { + Ok(route_update) => {Self::process_route_update(&name, &filter_chains, route_update);} + Err(e) => {return e.into();} + } + }, + maybe_secret_update = secret_updates_receiver.recv() => { + match maybe_secret_update { + Ok(secret_update) => { + let mut filter_chains_clone = filter_chains.as_ref().clone(); + Self::process_secret_update(&name, &mut filter_chains_clone, secret_update); + // Note: For internal listeners, we'd need to update the shared state + // This will be implemented when we add the internal connection factory + } + Err(e) => {return e.into();} + } + } + } + } + } + fn select_filterchain<'a, T>( filter_chains: &'a HashMap, downstream_metadata: &DownstreamConnectionMetadata, @@ -369,8 +472,8 @@ impl Listener { let ssl = AtomicBool::new(false); defer! { - with_metric!(listeners::DOWNSTREAM_CX_DESTROY, add, 1, shard_id, &[KeyValue::new("listener", listener_name)]); - with_metric!(listeners::DOWNSTREAM_CX_ACTIVE, sub, 1, shard_id, &[KeyValue::new("listener", listener_name)]); + with_metric!(listeners::DOWNSTREAM_CX_DESTROY, add, 1, shard_id, &[KeyValue::new("listener", listener_name.to_string())]); + with_metric!(listeners::DOWNSTREAM_CX_ACTIVE, sub, 1, shard_id, &[KeyValue::new("listener", listener_name.to_string())]); if ssl.load(Ordering::Relaxed) { with_metric!(http::DOWNSTREAM_CX_SSL_ACTIVE, add, 1, shard_id, &[KeyValue::new("listener", listener_name)]); } @@ -409,7 +512,7 @@ impl Listener { add, 1, shard_id, - &[KeyValue::new("listener", listener_name)] + &[KeyValue::new("listener", listener_name.to_string())] ); with_metric!( http::DOWNSTREAM_CX_SSL_ACTIVE, diff --git a/orion-lib/src/listeners/listeners_manager.rs b/orion-lib/src/listeners/listeners_manager.rs index 65e53c20..7817f57b 100644 --- a/orion-lib/src/listeners/listeners_manager.rs +++ b/orion-lib/src/listeners/listeners_manager.rs @@ -20,7 +20,7 @@ use tokio::sync::{broadcast, mpsc}; use tracing::{info, warn}; use orion_configuration::config::{ - network_filters::http_connection_manager::RouteConfiguration, Listener as ListenerConfig, + listener::ListenerAddress, network_filters::http_connection_manager::RouteConfiguration, Listener as ListenerConfig, }; use super::listener::{Listener, ListenerFactory}; @@ -126,8 +126,11 @@ impl ListenersManager { pub fn start_listener(&mut self, listener: Listener, listener_conf: ListenerConfig) -> Result<()> { let listener_name = listener.get_name().to_string(); - let (addr, dev) = listener.get_socket(); - info!("Listener {} at {addr} (device bind:{})", listener_name, dev.is_some()); + if let Some((addr, dev)) = listener.get_socket() { + info!("Listener {} at {addr} (device bind:{})", listener_name, dev.is_some()); + } else { + info!("Internal listener {}", listener_name); + } self.version_counter += 1; let version = self.version_counter; @@ -190,7 +193,10 @@ mod tests { let l1 = Listener::test_listener(name, routeb_rx, secb_rx); let l1_info = ListenerConfig { name: name.into(), - address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234), + address: orion_configuration::config::listener::ListenerAddress::Socket(SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + 1234, + )), filter_chains: HashMap::default(), bind_device: None, with_tls_inspector: false, @@ -233,7 +239,10 @@ mod tests { let l1 = Listener::test_listener(name, routeb_rx, secb_rx); let l1_info = ListenerConfig { name: name.into(), - address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234), + address: orion_configuration::config::listener::ListenerAddress::Socket(SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + 1234, + )), filter_chains: HashMap::default(), bind_device: None, with_tls_inspector: false, @@ -275,7 +284,7 @@ mod tests { let l1 = Listener::test_listener(name, routeb_rx, secb_rx); let l1_info = ListenerConfig { name: name.into(), - address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234), + address: ListenerAddress::Socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234)), filter_chains: HashMap::default(), bind_device: None, with_tls_inspector: false, @@ -292,7 +301,7 @@ mod tests { let l2 = Listener::test_listener(name, routeb_rx, secb_rx); let l2_info = ListenerConfig { name: name.into(), - address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235), // Different port + address: ListenerAddress::Socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235)), // Different port filter_chains: HashMap::default(), bind_device: None, with_tls_inspector: false, @@ -309,7 +318,7 @@ mod tests { let l3 = Listener::test_listener(name, routeb_rx, secb_rx); let l3_info = ListenerConfig { name: name.into(), - address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236), // Different port + address: ListenerAddress::Socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236)), // Different port filter_chains: HashMap::default(), bind_device: None, with_tls_inspector: false, diff --git a/orion-proxy/conf/envoy-bootstrap-tlv-demo.yaml b/orion-proxy/conf/envoy-bootstrap-tlv-demo.yaml new file mode 100644 index 00000000..2b1afa88 --- /dev/null +++ b/orion-proxy/conf/envoy-bootstrap-tlv-demo.yaml @@ -0,0 +1,81 @@ +static_resources: + listeners: + - name: listener_tlv_proxy_protocol + address: + socket_address: { address: 0.0.0.0, port_value: 8080 } + listener_filters: + - name: envoy.filters.listener.proxy_protocol + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.listener.proxy_protocol.v3.ProxyProtocol + allow_requests_without_proxy_protocol: false + stat_prefix: proxy_protocol + pass_through_tlvs: + match_type: INCLUDE_ALL + tlv_type: + - 1 # AWS VPC ID + - 2 # AWS Instance ID + - 3 # Custom Application Data + - 32 # SSL + - 33 # SSL Version + - 34 # SSL CN + - 35 # SSL Cipher + - 36 # SSL SIG ALG + - 37 # SSL KEY ALG + filter_chains: + - name: filter_chain_tlv_demo + filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http_tlv + codec_type: HTTP1 + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + start_child_span: false + route_config: + name: tlv_demo_route + virtual_hosts: + - name: tlv_backend + domains: ["*"] + routes: + - match: + prefix: "/health" + direct_response: + status: 200 + body: + inline_string: "TLV Proxy Protocol Demo - Healthy 🚀" + - match: + prefix: "/tlv-info" + direct_response: + status: 200 + body: + inline_string: "This endpoint receives TLV data from proxy protocol" + - match: + prefix: "/" + route: + cluster: backend_cluster + + clusters: + - name: backend_cluster + connect_timeout: 0.25s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 9001 + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 9002 + +admin: + address: + socket_address: { address: 127.0.0.1, port_value: 9901 } diff --git a/orion-proxy/conf/internal-listener-demo.yaml b/orion-proxy/conf/internal-listener-demo.yaml new file mode 100644 index 00000000..c9483f4c --- /dev/null +++ b/orion-proxy/conf/internal-listener-demo.yaml @@ -0,0 +1,74 @@ +runtime: + num_cpus: 1 + num_runtimes: 1 + +logging: + log_level: "debug" + +bootstrap_extensions: + - internal_listener: + buffer_size_kb: 2048 + +admin: + address: "127.0.0.1:9901" + +static_resources: + listeners: + - name: "external_gateway_listener" + address: "0.0.0.0:10000" + filter_chains: + - name: "gateway_filter_chain" + terminal_filter: + http_connection_manager: + route_config: + name: "gateway_route" + virtual_hosts: + - name: "services" + domains: ["*"] + routes: + - match: + prefix: "/service-a" + route: + cluster: "internal_service_a_cluster" + - match: + prefix: "/" + direct_response: + status: 200 + body: "Internal Listener Demo Active" + + - name: "internal_mesh_listener" + address: + internal: + buffer_size_kb: 1024 + filter_chains: + - name: "internal_proxy_chain" + terminal_filter: + tcp_proxy: + cluster: "internal_backend_cluster" + + clusters: + - name: "internal_service_a_cluster" + type: STATIC + transport_socket: + internal_upstream: + passthrough_metadata: + - kind: HOST + name: "envoy.filters.listener.original_dst" + transport_socket: + raw_buffer: {} + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + internal: + server_listener_name: "internal_mesh_listener" + endpoint_id: "service_a_endpoint_1" + + - name: "internal_backend_cluster" + type: STATIC + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: "127.0.0.1:8080" diff --git a/orion-proxy/conf/orion-runtime-tlv-demo.yaml b/orion-proxy/conf/orion-runtime-tlv-demo.yaml new file mode 100644 index 00000000..023c476c --- /dev/null +++ b/orion-proxy/conf/orion-runtime-tlv-demo.yaml @@ -0,0 +1,88 @@ +runtime: + num_cpus: 2 + num_runtimes: 2 + event_interval: 31 + global_queue_interval: null + max_io_events_per_tick: null + +logging: + log_level: "debug" + +envoy_bootstrap: + static_resources: + listeners: + - name: listener_tlv_proxy_protocol + address: + socket_address: { address: 0.0.0.0, port_value: 8080 } + listener_filters: + - name: envoy.filters.listener.proxy_protocol + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.listener.proxy_protocol.v3.ProxyProtocol + allow_requests_without_proxy_protocol: false + stat_prefix: proxy_protocol + pass_through_tlvs: + match_type: INCLUDE_ALL + tlv_type: + - 1 # AWS VPC ID + - 2 # AWS Instance ID + - 3 # Custom Application Data + - 32 # SSL + - 33 # SSL Version + - 34 # SSL CN + - 35 # SSL Cipher + - 36 # SSL SIG ALG + - 37 # SSL KEY ALG + filter_chains: + - name: filter_chain_tlv_demo + filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http_tlv + codec_type: HTTP1 + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + start_child_span: false + route_config: + name: tlv_demo_route + virtual_hosts: + - name: tlv_backend + domains: ["*"] + routes: + - match: + prefix: "/health" + direct_response: + status: 200 + body: + inline_string: "TLV Proxy Protocol Demo - Healthy 🚀" + - match: + prefix: "/tlv-info" + direct_response: + status: 200 + body: + inline_string: "This endpoint receives TLV data from proxy protocol" + - match: + prefix: "/" + route: + cluster: backend_cluster + + clusters: + - name: backend_cluster + connect_timeout: 0.25s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 9001 + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 9002 diff --git a/orion-proxy/src/admin/config_dump.rs b/orion-proxy/src/admin/config_dump.rs index 892c6e5e..f7dbb1eb 100644 --- a/orion-proxy/src/admin/config_dump.rs +++ b/orion-proxy/src/admin/config_dump.rs @@ -233,7 +233,7 @@ mod config_dump_tests { async fn config_dump_listeners_and_routes() { use compact_str::CompactString; use orion_configuration::config::{ - listener::{FilterChain, FilterChainMatch, Listener, MainFilter}, + listener::{FilterChain, FilterChainMatch, Listener, ListenerAddress, MainFilter}, network_filters::http_connection_manager::{ route::{Action, RouteMatch}, CodecType, HttpConnectionManager, Route, RouteConfiguration, RouteSpecifier, VirtualHost, XffSettings, @@ -246,7 +246,7 @@ mod config_dump_tests { }; let listener = Listener { name: CompactString::from("listener1"), - address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080), + address: ListenerAddress::Socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080)), filter_chains: { let mut map = HashMap::new(); map.insert( @@ -328,15 +328,16 @@ mod config_dump_tests { #[tokio::test] async fn config_dump_clusters() { use compact_str::CompactString; - use orion_configuration::config::{ - cluster::{ - Cluster, ClusterDiscoveryType, ClusterLoadAssignment, HealthStatus, HttpProtocolOptions, LbEndpoint, - LbPolicy, LocalityLbEndpoints, - }, - core::envoy_conversions::Address, + use orion_configuration::config::cluster::{ + Cluster, ClusterDiscoveryType, ClusterLoadAssignment, EndpointAddress, HealthStatus, HttpProtocolOptions, + LbEndpoint, LbPolicy, LocalityLbEndpoints, + }; + use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + num::NonZeroU32, + time::Duration, }; - use std::{num::NonZeroU32, time::Duration}; - let endpoint_addr = Address::Socket("127.0.0.1".to_owned(), 9000); + let endpoint_addr = EndpointAddress::Socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000)); let cluster = Cluster { name: CompactString::from("cluster1"), discovery_settings: ClusterDiscoveryType::Static(ClusterLoadAssignment { @@ -356,6 +357,7 @@ mod config_dump_tests { health_check: None, connect_timeout: Some(Duration::from_secs(5)), cleanup_interval: None, + internal_transport_socket: None, }; let secret_manager = orion_lib::SecretManager::default(); let partial_cluster = @@ -381,15 +383,16 @@ mod config_dump_tests { #[tokio::test] async fn config_dump_endpoints() { use compact_str::CompactString; - use orion_configuration::config::{ - cluster::{ - Cluster, ClusterDiscoveryType, ClusterLoadAssignment, HealthStatus, HttpProtocolOptions, LbEndpoint, - LbPolicy, LocalityLbEndpoints, - }, - core::envoy_conversions::Address, + use orion_configuration::config::cluster::{ + Cluster, ClusterDiscoveryType, ClusterLoadAssignment, EndpointAddress, HealthStatus, HttpProtocolOptions, + LbEndpoint, LbPolicy, LocalityLbEndpoints, + }; + use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + num::NonZeroU32, + time::Duration, }; - use std::{num::NonZeroU32, time::Duration}; - let endpoint_addr = Address::Socket("127.0.0.1".to_owned(), 9000); + let endpoint_addr = EndpointAddress::Socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000)); let cluster = Cluster { name: CompactString::from("cluster1"), discovery_settings: ClusterDiscoveryType::Static(ClusterLoadAssignment { @@ -409,6 +412,7 @@ mod config_dump_tests { health_check: None, connect_timeout: Some(Duration::from_secs(5)), cleanup_interval: None, + internal_transport_socket: None, }; let secret_manager = orion_lib::SecretManager::default(); let partial_cluster = diff --git a/orion-tracing/src/lib.rs b/orion-tracing/src/lib.rs index 226818cb..f9bc6365 100644 --- a/orion-tracing/src/lib.rs +++ b/orion-tracing/src/lib.rs @@ -124,7 +124,7 @@ pub fn otel_update_tracers(tracers: HashMap) -> Resul let mut cur_map = (*map_arc).clone(); let listeners = tracers.keys().cloned().map(|TracingKey(name, _)| name).collect::>(); - cur_map.retain(|TracingKey(name, _), _| !listeners.contains(name)); + cur_map.retain(|TracingKey(name, _), _| !listeners.contains(&name)); // insert new tracers... for (key, ref config) in tracers {