diff --git a/Cargo.lock b/Cargo.lock
index 96c53a9c0e0..c8cec392202 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -78,47 +78,48 @@ dependencies = [
 [[package]]
 name = "anstream"
-version = "0.6.13"
+version = "0.6.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb"
+checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b"
 dependencies = [
  "anstyle",
  "anstyle-parse",
  "anstyle-query",
  "anstyle-wincon",
  "colorchoice",
+ "is_terminal_polyfill",
  "utf8parse",
 ]
 
 [[package]]
 name = "anstyle"
-version = "1.0.6"
+version = "1.0.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc"
+checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b"
 
 [[package]]
 name = "anstyle-parse"
-version = "0.2.3"
+version = "0.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c"
+checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4"
 dependencies = [
  "utf8parse",
 ]
 
 [[package]]
 name = "anstyle-query"
-version = "1.0.2"
+version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648"
+checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5"
 dependencies = [
  "windows-sys 0.52.0",
 ]
 
 [[package]]
 name = "anstyle-wincon"
-version = "3.0.2"
+version = "3.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7"
+checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19"
 dependencies = [
  "anstyle",
  "windows-sys 0.52.0",
@@ -210,7 +211,7 @@ dependencies = [
  "chrono",
  "chrono-tz",
  "half",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "num",
 ]
@@ -391,7 +392,7 @@ dependencies = [
  "arrow-data",
  "arrow-schema",
  "half",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
 ]
 
 [[package]]
@@ -405,7 +406,7 @@ dependencies = [
  "arrow-data",
  "arrow-schema",
  "half",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
 ]
 
 [[package]]
@@ -480,7 +481,7 @@ dependencies = [
  "arrow",
  "chrono",
  "comfy-table",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "num-traits",
  "once_cell",
  "regex",
@@ -528,16 +529,16 @@ checksum = "136d4d23bcc79e27423727b36823d86233aad06dfea531837b038394d11e9928"
 dependencies = [
  "concurrent-queue",
  "event-listener 5.3.0",
- "event-listener-strategy 0.5.1",
+ "event-listener-strategy 0.5.2",
  "futures-core",
  "pin-project-lite",
 ]
 
 [[package]]
 name = "async-compression"
-version = "0.4.8"
+version = "0.4.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "07dbbf24db18d609b1462965249abdf49129ccad073ec257da372adc83259c60"
+checksum = "4e9eabd7a98fe442131a17c316bd9349c43695e49e730c3c8e12cfb5f4da2693"
 dependencies = [
  "bzip2",
  "flate2",
@@ -611,7 +612,7 @@ source = "git+https://github.com/influxdata/influxdb3_core?rev=b546e7f86ee9adbff
 dependencies = [
  "async-trait",
  "backoff 0.1.0",
- "base64 0.22.0",
+ "base64 0.22.1",
  "generated_types",
  "http",
  "iox_time",
@@ -719,9 +720,9 @@ checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
 
 [[package]]
 name = "base64"
-version = "0.22.0"
+version = "0.22.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51"
+checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
 
 [[package]]
 name = "base64ct"
@@ -887,9 +888,9 @@ dependencies = [
 
 [[package]]
 name = "cc"
-version = "1.0.95"
+version = "1.0.96"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b"
+checksum = "065a29261d53ba54260972629f9ca6bffa69bac13cd1fed61420f7fa68b9f8bd"
 dependencies = [
  "jobserver",
  "libc",
@@ -1027,9 +1028,9 @@ dependencies = [
 
 [[package]]
 name = "colorchoice"
-version = "1.0.0"
+version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
+checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422"
 
 [[package]]
 name = "comfy-table"
@@ -1044,9 +1045,9 @@ dependencies = [
 
 [[package]]
 name = "concurrent-queue"
-version = "2.4.0"
+version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363"
+checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
 dependencies = [
  "crossbeam-utils",
 ]
@@ -1359,7 +1360,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
 dependencies = [
  "cfg-if",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "lock_api",
  "once_cell",
  "parking_lot_core",
@@ -1424,7 +1425,7 @@ dependencies = [
  "futures",
  "glob",
  "half",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "indexmap 2.2.6",
  "itertools 0.12.1",
  "log",
@@ -1483,7 +1484,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-expr",
  "futures",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "log",
  "object_store",
  "parking_lot",
@@ -1515,7 +1516,7 @@ source = "git+https://github.com/influxdata/arrow-datafusion.git?rev=581e74785b8
 dependencies = [
  "arrow",
  "arrow-array",
- "base64 0.22.0",
+ "base64 0.22.1",
  "chrono",
  "datafusion-common",
  "datafusion-execution",
@@ -1551,7 +1552,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-expr",
  "datafusion-physical-expr",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "itertools 0.12.1",
  "log",
  "regex-syntax 0.8.3",
@@ -1569,7 +1570,7 @@ dependencies = [
  "arrow-ord 50.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "arrow-schema",
  "arrow-string 50.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "base64 0.22.0",
+ "base64 0.22.1",
  "blake2",
  "blake3",
  "chrono",
@@ -1577,7 +1578,7 @@ dependencies = [
  "datafusion-execution",
  "datafusion-expr",
  "half",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "hex",
  "indexmap 2.2.6",
  "itertools 0.12.1",
@@ -1611,7 +1612,7 @@ dependencies = [
  "datafusion-physical-expr",
  "futures",
  "half",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "indexmap 2.2.6",
  "itertools 0.12.1",
  "log",
@@ -1741,7 +1742,7 @@ source = "git+https://github.com/influxdata/influxdb3_core?rev=b546e7f86ee9adbff
 dependencies = [
  "arrow_util",
  "data_types",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "mutable_batch",
  "schema",
  "trace",
@@ -1881,9 +1882,9 @@ dependencies = [
 
 [[package]]
 name = "event-listener-strategy"
-version = "0.5.1"
+version = "0.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "332f51cb23d20b0de8458b86580878211da09bcd4503cb579c225b3d124cabb3"
+checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1"
 dependencies = [
  "event-listener 5.3.0",
  "pin-project-lite",
@@ -1911,9 +1912,9 @@ dependencies = [
 
 [[package]]
 name = "fastrand"
-version = "2.0.2"
+version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984"
+checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
 
 [[package]]
 name = "fiat-crypto"
@@ -1929,7 +1930,7 @@ checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd"
 dependencies = [
  "cfg-if",
  "libc",
- "redox_syscall",
+ "redox_syscall 0.4.1",
  "windows-sys 0.52.0",
 ]
@@ -1969,9 +1970,9 @@ dependencies = [
 
 [[package]]
 name = "flate2"
-version = "1.0.28"
+version = "1.0.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e"
+checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae"
 dependencies = [
  "crc32fast",
  "miniz_oxide",
@@ -2226,9 +2227,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
 
 [[package]]
 name = "hashbrown"
-version = "0.14.3"
+version = "0.14.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604"
+checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
 dependencies = [
  "ahash",
  "allocator-api2",
@@ -2240,7 +2241,7 @@ version = "0.8.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7"
 dependencies = [
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
 ]
 
 [[package]]
@@ -2479,7 +2480,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26"
 dependencies = [
  "equivalent",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
 ]
 
 [[package]]
@@ -2523,7 +2524,7 @@ dependencies = [
  "assert_cmd",
  "authz",
  "backtrace",
- "base64 0.22.0",
+ "base64 0.22.1",
  "clap",
  "clap_blocks",
  "console-subscriber",
@@ -2637,7 +2638,7 @@ dependencies = [
  "arrow-schema",
  "async-trait",
  "authz",
- "base64 0.22.0",
+ "base64 0.22.1",
  "bytes",
  "chrono",
  "data_types",
@@ -2782,7 +2783,7 @@ version = "0.1.0"
 source = "git+https://github.com/influxdata/influxdb3_core?rev=b546e7f86ee9adbff0dd3c5e687140848397604a#b546e7f86ee9adbff0dd3c5e687140848397604a"
 dependencies = [
  "arrow",
- "base64 0.22.0",
+ "base64 0.22.1",
  "bytes",
  "data_types",
  "datafusion",
@@ -2864,7 +2865,7 @@ dependencies = [
  "data_types",
  "futures",
  "generated_types",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "http",
  "iox_time",
  "log",
@@ -2918,7 +2919,7 @@ dependencies = [
  "datafusion_util",
  "executor",
  "futures",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "indexmap 2.2.6",
  "iox_query_params",
  "iox_time",
@@ -3013,7 +3014,7 @@ dependencies = [
  "flate2",
  "futures",
  "generated_types",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "heappy",
  "http",
  "hyper",
@@ -3061,6 +3062,12 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "is_terminal_polyfill"
+version = "1.70.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800"
+
 [[package]]
 name = "itertools"
 version = "0.10.5"
@@ -3262,7 +3269,7 @@ dependencies = [
  "backoff 0.4.0",
  "derivative",
  "futures",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "json-patch",
  "k8s-openapi",
  "kube-client",
@@ -3352,9 +3359,9 @@ dependencies = [
 
 [[package]]
 name = "libc"
-version = "0.2.153"
+version = "0.2.154"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
+checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346"
 
 [[package]]
 name = "libm"
@@ -3387,9 +3394,9 @@ checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
 
 [[package]]
 name = "lock_api"
-version = "0.4.11"
+version = "0.4.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45"
+checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
 dependencies = [
  "autocfg",
  "scopeguard",
@@ -3597,7 +3604,7 @@ dependencies = [
  "arrow",
  "arrow_util",
  "data_types",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "iox_time",
  "itertools 0.12.1",
  "schema",
@@ -3610,7 +3617,7 @@ name = "mutable_batch_lp"
 version = "0.1.0"
 source = "git+https://github.com/influxdata/influxdb3_core?rev=b546e7f86ee9adbff0dd3c5e687140848397604a#b546e7f86ee9adbff0dd3c5e687140848397604a"
 dependencies = [
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "influxdb-line-protocol",
  "itertools 0.12.1",
  "mutable_batch",
@@ -3626,7 +3633,7 @@ dependencies = [
  "arrow_util",
  "dml",
  "generated_types",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "mutable_batch",
  "schema",
  "snafu 0.8.2",
@@ -3932,9 +3939,9 @@ checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
 
 [[package]]
 name = "parking_lot"
-version = "0.12.1"
+version = "0.12.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
+checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb"
 dependencies = [
  "lock_api",
  "parking_lot_core",
@@ -3942,15 +3949,15 @@ dependencies = [
 
 [[package]]
 name = "parking_lot_core"
-version = "0.9.9"
+version = "0.9.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
+checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
 dependencies = [
  "cfg-if",
  "libc",
- "redox_syscall",
+ "redox_syscall 0.5.1",
  "smallvec",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.5",
 ]
 
 [[package]]
@@ -3974,7 +3981,7 @@ dependencies = [
  "flate2",
  "futures",
  "half",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "lz4_flex",
  "num",
  "num-bigint",
@@ -4035,7 +4042,7 @@ version = "0.1.0"
 source = "git+https://github.com/influxdata/influxdb3_core?rev=b546e7f86ee9adbff0dd3c5e687140848397604a#b546e7f86ee9adbff0dd3c5e687140848397604a"
 dependencies = [
  "arrow",
- "base64 0.22.0",
+ "base64 0.22.1",
  "bytes",
  "data_types",
  "datafusion",
@@ -4060,9 +4067,9 @@ dependencies = [
 
 [[package]]
 name = "parse-zoneinfo"
-version = "0.3.0"
+version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c705f256449c60da65e11ff6626e0c16a0a0b96aaa348de61376b249bc340f41"
+checksum = "1f2a05b18d44e2957b88f96ba460715e295bc1d7510468a2f3d3b44535d26c24"
 dependencies = [
  "regex",
 ]
@@ -4116,7 +4123,7 @@ version = "3.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8e459365e590736a54c3fa561947c84837534b8e9af6fc5bf781307e82658fae"
 dependencies = [
- "base64 0.22.0",
+ "base64 0.22.1",
  "serde",
 ]
@@ -4137,9 +4144,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
 
 [[package]]
 name = "pest"
-version = "2.7.9"
+version = "2.7.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "311fb059dee1a7b802f036316d790138c613a4e8b180c822e3925a662e9f0c95"
+checksum = "560131c633294438da9f7c4b08189194b20946c8274c6b9e38881a7874dc8ee8"
 dependencies = [
  "memchr",
  "thiserror",
@@ -4148,9 +4155,9 @@ dependencies = [
 
 [[package]]
 name = "pest_derive"
-version = "2.7.9"
+version = "2.7.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f73541b156d32197eecda1a4014d7f868fd2bcb3c550d5386087cfba442bf69c"
+checksum = "26293c9193fbca7b1a3bf9b79dc1e388e927e6cacaa78b4a3ab705a1d3d41459"
 dependencies = [
  "pest",
  "pest_generator",
@@ -4158,9 +4165,9 @@ dependencies = [
 
 [[package]]
 name = "pest_generator"
-version = "2.7.9"
+version = "2.7.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c35eeed0a3fab112f75165fdc026b3913f4183133f19b49be773ac9ea966e8bd"
+checksum = "3ec22af7d3fb470a85dd2ca96b7c577a1eb4ef6f1683a9fe9a8c16e136c04687"
 dependencies = [
  "pest",
  "pest_meta",
@@ -4171,9 +4178,9 @@ dependencies = [
 
 [[package]]
 name = "pest_meta"
-version = "2.7.9"
+version = "2.7.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2adbf29bb9776f28caece835398781ab24435585fe0d4dc1374a61db5accedca"
+checksum = "d7a240022f37c361ec1878d646fc5b7d7c4d28d5946e1a80ad5a7a4f4ca0bdcd"
 dependencies = [
  "once_cell",
  "pest",
@@ -4621,9 +4628,9 @@ dependencies = [
 
 [[package]]
 name = "raw-cpuid"
-version = "11.0.1"
+version = "11.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1"
+checksum = "e29830cbb1290e404f24c73af91c5d8d631ce7e128691e9477556b540cd01ecd"
 dependencies = [
  "bitflags 2.5.0",
 ]
@@ -4657,6 +4664,15 @@ dependencies = [
  "bitflags 1.3.2",
 ]
 
+[[package]]
+name = "redox_syscall"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e"
+dependencies = [
+ "bitflags 2.5.0",
+]
+
 [[package]]
 name = "regex"
 version = "1.10.4"
@@ -4819,9 +4835,9 @@ dependencies = [
 
 [[package]]
 name = "rustls"
-version = "0.21.11"
+version = "0.21.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4"
+checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e"
 dependencies = [
  "log",
  "ring",
@@ -4856,7 +4872,7 @@ version = "2.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d"
 dependencies = [
- "base64 0.22.0",
+ "base64 0.22.1",
  "rustls-pki-types",
 ]
@@ -4912,7 +4928,7 @@ version = "0.1.0"
 source = "git+https://github.com/influxdata/influxdb3_core?rev=b546e7f86ee9adbff0dd3c5e687140848397604a#b546e7f86ee9adbff0dd3c5e687140848397604a"
 dependencies = [
  "arrow",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "indexmap 2.2.6",
  "observability_deps",
  "once_cell",
@@ -4922,9 +4938,9 @@ dependencies = [
 
 [[package]]
 name = "schemars"
-version = "0.8.16"
+version = "0.8.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "45a28f4c49489add4ce10783f7911893516f15afe45d015608d41faca6bc4d29"
+checksum = "7f55c82c700538496bdc329bb4918a81f87cc8888811bd123cf325a0f2f8d309"
 dependencies = [
  "dyn-clone",
  "schemars_derive",
@@ -4934,14 +4950,14 @@ dependencies = [
 
 [[package]]
 name = "schemars_derive"
-version = "0.8.16"
+version = "0.8.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c767fd6fa65d9ccf9cf026122c1b555f2ef9a4f0cea69da4d7dbc3e258d30967"
+checksum = "83263746fe5e32097f06356968a077f96089739c927a61450efa069905eec108"
 dependencies = [
  "proc-macro2",
  "quote",
  "serde_derive_internals",
- "syn 1.0.109",
+ "syn 2.0.60",
 ]
 
 [[package]]
@@ -5007,9 +5023,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
 
 [[package]]
 name = "serde"
-version = "1.0.198"
+version = "1.0.200"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc"
+checksum = "ddc6f9cc94d67c0e21aaf7eda3a010fd3af78ebf6e096aa6e2e13c79749cce4f"
 dependencies = [
  "serde_derive",
 ]
@@ -5042,9 +5058,9 @@ dependencies = [
 
 [[package]]
 name = "serde_derive"
-version = "1.0.198"
+version = "1.0.200"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9"
+checksum = "856f046b9400cee3c8c94ed572ecdb752444c24528c035cd35882aad6f492bcb"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -5053,13 +5069,13 @@ dependencies = [
 
 [[package]]
 name = "serde_derive_internals"
-version = "0.26.0"
+version = "0.29.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85bf8229e7920a9f636479437026331ce11aa132b4dde37d121944a44d6e5f3c"
+checksum = "330f01ce65a3a5fe59a60c82f3c9a024b573b8a6e875bd233fe5f934e71d54e3"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 1.0.109",
+ "syn 2.0.60",
 ]
 
 [[package]]
@@ -5287,9 +5303,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
 
 [[package]]
 name = "socket2"
-version = "0.5.6"
+version = "0.5.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871"
+checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
 dependencies = [
  "libc",
  "windows-sys 0.52.0",
@@ -6200,7 +6216,7 @@ source = "git+https://github.com/influxdata/influxdb3_core?rev=b546e7f86ee9adbff
 dependencies = [
  "bytes",
  "futures",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "http",
  "http-body",
  "itertools 0.12.1",
@@ -6296,7 +6312,7 @@ version = "0.1.0"
 source = "git+https://github.com/influxdata/influxdb3_core?rev=b546e7f86ee9adbff0dd3c5e687140848397604a#b546e7f86ee9adbff0dd3c5e687140848397604a"
 dependencies = [
  "futures",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "iox_time",
  "lock_api",
  "metric",
@@ -6401,9 +6417,9 @@ checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202"
 
 [[package]]
 name = "unicode-width"
-version = "0.1.11"
+version = "0.1.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85"
+checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6"
 
 [[package]]
 name = "unicode_categories"
@@ -6620,7 +6636,7 @@ version = "1.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9"
 dependencies = [
- "redox_syscall",
+ "redox_syscall 0.4.1",
  "wasite",
 ]
@@ -6852,7 +6868,7 @@ dependencies = [
  "futures-task",
  "futures-util",
  "getrandom",
- "hashbrown 0.14.3",
+ "hashbrown 0.14.5",
  "heck 0.4.1",
  "hyper",
  "hyper-rustls",
diff --git a/influxdb3_server/src/query_executor.rs b/influxdb3_server/src/query_executor.rs
index d5a762d7277..64288824621 100644
--- a/influxdb3_server/src/query_executor.rs
+++ b/influxdb3_server/src/query_executor.rs
@@ -13,6 +13,7 @@ use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::SendableRecordBatchStream;
+use datafusion::logical_expr::TableProviderFilterPushDown;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::Expr;
 use datafusion_util::config::DEFAULT_SCHEMA;
@@ -468,6 +469,13 @@ impl TableProvider for QueryTable {
         TableType::Base
     }
 
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> datafusion::common::Result<Vec<TableProviderFilterPushDown>> {
+        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
+    }
+
     async fn scan(
         &self,
         ctx: &SessionState,
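A note on the change above: by default a `TableProvider` reports `Unsupported` for every filter, so DataFusion never hands predicates down to `scan`. Reporting `Inexact` lets the buffer use each predicate as a best-effort pre-prune while DataFusion still re-applies the filter above the scan, so a partial (or ignored) pushdown stays correct. A minimal sketch of that contract — `pushdown_decisions` is a hypothetical helper, not part of this PR:

```rust
use datafusion::logical_expr::TableProviderFilterPushDown;

// One decision per filter expression, in the same order as `filters`.
// `Inexact` = "I may use this to prune, but re-check it above my scan".
fn pushdown_decisions(filter_count: usize) -> Vec<TableProviderFilterPushDown> {
    vec![TableProviderFilterPushDown::Inexact; filter_count]
}
```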
diff --git a/influxdb3_write/src/catalog.rs b/influxdb3_write/src/catalog.rs
index b70953f12ff..8433cf82add 100644
--- a/influxdb3_write/src/catalog.rs
+++ b/influxdb3_write/src/catalog.rs
@@ -319,6 +319,19 @@ impl TableDefinition {
         self.schema = schema;
     }
 
+    pub(crate) fn index_columns(&self) -> Vec<String> {
+        self.columns
+            .iter()
+            .filter_map(|(name, column_type)| {
+                if *column_type == ColumnType::Tag as i16 {
+                    Some(name.clone())
+                } else {
+                    None
+                }
+            })
+            .collect()
+    }
+
     #[allow(dead_code)]
     pub(crate) fn schema(&self) -> &Schema {
         &self.schema
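The new `index_columns` helper treats tags as the only indexed columns. A standalone sketch of the same selection logic, assuming a plain map of column name to column type and a stand-in `TAG` discriminant (the real code compares against `ColumnType::Tag as i16`, whose value is not shown here):

```rust
use std::collections::BTreeMap;

const TAG: i16 = 3; // assumed discriminant, for illustration only

// Keep only the tag columns; these become the buffer's indexed columns.
fn index_columns(columns: &BTreeMap<String, i16>) -> Vec<String> {
    columns
        .iter()
        .filter_map(|(name, t)| (*t == TAG).then(|| name.clone()))
        .collect()
}
```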
diff --git a/influxdb3_write/src/write_buffer/buffer_segment.rs b/influxdb3_write/src/write_buffer/buffer_segment.rs
index 71a10a0183f..d9bd9fcce1f 100644
--- a/influxdb3_write/src/write_buffer/buffer_segment.rs
+++ b/influxdb3_write/src/write_buffer/buffer_segment.rs
@@ -2,12 +2,11 @@
 //! single WAL segment. Only one segment should be open for writes in the write buffer at any
 //! given time.
 
-use crate::catalog::Catalog;
+use crate::catalog::{Catalog, DatabaseSchema};
 use crate::chunk::BufferChunk;
 use crate::paths::ParquetFilePath;
 use crate::write_buffer::flusher::BufferedWriteResult;
-use crate::write_buffer::table_buffer::Builder;
-use crate::write_buffer::table_buffer::TableBuffer;
+use crate::write_buffer::table_buffer::{Builder, Result as TableBufferResult, TableBuffer};
 use crate::write_buffer::{
     parse_validate_and_update_catalog, Error, TableBatch, ValidSegmentedData,
 };
@@ -16,19 +15,20 @@ use crate::{
     Persister, Precision, SegmentDuration, SegmentId, SegmentRange, SequenceNumber,
     TableParquetFiles, WalOp, WalSegmentReader, WalSegmentWriter,
 };
+use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use data_types::ChunkId;
 use data_types::ChunkOrder;
 use data_types::TableId;
 use data_types::TransitionPartitionId;
 use data_types::{NamespaceName, PartitionKey};
+use datafusion::logical_expr::Expr;
 use datafusion_util::stream_from_batches;
 use iox_query::chunk_statistics::create_chunk_statistics;
 use iox_query::frontend::reorg::ReorgPlanner;
 use iox_query::QueryChunk;
 use iox_time::Time;
 use schema::sort::SortKey;
-use schema::Schema;
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
@@ -43,6 +43,7 @@ pub struct OpenBufferSegment {
     segment_key: PartitionKey,
     buffered_data: BufferedData,
     segment_open_time: Time,
+    catalog: Arc<Catalog>,
     #[allow(dead_code)]
     starting_catalog_sequence_number: SequenceNumber,
     // TODO: This is temporarily just the number of rows in the segment. When the buffer gets refactored to use
@@ -52,6 +53,7 @@ pub struct OpenBufferSegment {
 
 impl OpenBufferSegment {
     pub fn new(
+        catalog: Arc<Catalog>,
         segment_id: SegmentId,
         segment_range: SegmentRange,
         segment_open_time: Time,
@@ -64,6 +66,7 @@ impl OpenBufferSegment {
         let segment_duration = SegmentDuration::from_range(segment_range);
 
         Self {
+            catalog,
            segment_writer,
            segment_id,
            segment_range,
@@ -105,7 +108,16 @@ impl OpenBufferSegment {
             .buffered_data
             .database_buffers
             .entry(db_name.to_string())
-            .or_default();
+            .or_insert_with(|| {
+                let db_schema = self
+                    .catalog
+                    .db_schema(&db_name)
+                    .expect("db schema should exist");
+                DatabaseBuffer {
+                    table_buffers: HashMap::new(),
+                    db_schema,
+                }
+            });
 
         for (table_name, table_batch) in db_batch.table_batches {
             // TODO: for now we'll just have the number of rows represent the segment size. The entire
@@ -120,14 +132,15 @@ impl OpenBufferSegment {
     }
 
     /// Returns the table data as record batches
-    pub(crate) fn table_record_batches(
+    pub(crate) fn table_record_batch(
         &self,
         db_name: &str,
         table_name: &str,
-        schema: &Schema,
-    ) -> Option<Vec<RecordBatch>> {
+        schema: SchemaRef,
+        filter: &[Expr],
+    ) -> Option<TableBufferResult<RecordBatch>> {
         self.buffered_data
-            .table_record_batches(db_name, table_name, schema)
+            .table_record_batches(db_name, table_name, schema, filter)
     }
 
     /// Returns true if the segment should be persisted. A segment should be persisted if both of
@@ -186,10 +199,18 @@ pub(crate) fn load_buffer_from_segment(
             Precision::Nanosecond,
         )?;
 
-        let db_buffer = buffered_data
-            .database_buffers
-            .entry(write.db_name)
-            .or_default();
+        let db_name = &write.db_name;
+        if !buffered_data.database_buffers.contains_key(db_name) {
+            let db_schema = catalog.db_schema(db_name).expect("db schema should exist");
+            buffered_data.database_buffers.insert(
+                db_name.clone(),
+                DatabaseBuffer {
+                    table_buffers: HashMap::new(),
+                    db_schema,
+                },
+            );
+        }
+        let db_buffer = buffered_data.database_buffers.get_mut(db_name).unwrap();
 
         // there should only ever be data for a single segment as this is all read
         // from one segment file
@@ -261,12 +282,13 @@ impl BufferedData {
         &self,
         db_name: &str,
         table_name: &str,
-        schema: &Schema,
-    ) -> Option<Vec<RecordBatch>> {
+        schema: SchemaRef,
+        filter: &[Expr],
+    ) -> Option<TableBufferResult<RecordBatch>> {
         self.database_buffers
             .get(db_name)
             .and_then(|db_buffer| db_buffer.table_buffers.get(table_name))
-            .map(|table_buffer| table_buffer.record_batches(schema))
+            .map(|table_buffer| table_buffer.record_batch(schema, filter))
     }
 
     /// Verifies that the passed in buffer has the same data as this buffer
@@ -282,8 +304,10 @@ impl BufferedData {
                 let other_table_buffer = other_db_buffer.table_buffers.get(table_name).unwrap();
                 let schema = db_schema.get_table_schema(table_name).unwrap();
 
-                let table_data = table_buffer.record_batches(schema);
-                let other_table_data = other_table_buffer.record_batches(schema);
+                let table_data = table_buffer.record_batch(schema.as_arrow(), &[]).unwrap();
+                let other_table_data = other_table_buffer
+                    .record_batch(schema.as_arrow(), &[])
+                    .unwrap();
 
                 assert_eq!(table_data, other_table_data);
             }
@@ -291,9 +315,10 @@ impl BufferedData {
     }
 }
 
-#[derive(Debug, Default)]
+#[derive(Debug)]
 struct DatabaseBuffer {
     table_buffers: HashMap<String, TableBuffer>,
+    db_schema: Arc<DatabaseSchema>,
 }
 
 impl DatabaseBuffer {
@@ -303,10 +328,22 @@ impl DatabaseBuffer {
         segment_key: &PartitionKey,
         table_batch: TableBatch,
     ) {
+        if !self.table_buffers.contains_key(&table_name) {
+            // TODO: this check shouldn't be necessary. If the table doesn't exist in the catalog
+            // and we've gotten here, it means we're dropping a write.
+            if let Some(table) = self.db_schema.get_table(&table_name) {
+                self.table_buffers.insert(
+                    table_name.clone(),
+                    TableBuffer::new(segment_key.clone(), &table.index_columns()),
+                );
+            } else {
+                return;
+            }
+        }
         let table_buffer = self
             .table_buffers
-            .entry(table_name)
-            .or_insert_with(|| TableBuffer::new(segment_key.clone()));
+            .get_mut(&table_name)
+            .expect("table buffer should exist");
 
         table_buffer.add_rows(table_batch.rows);
     }
@@ -389,8 +426,8 @@ impl ClosedBufferSegment {
 
                     // All of the record batches for this table that we will
                     // want to dedupe
-                    let batches = table_buffer.record_batches(table.schema());
-                    let row_count = batches.iter().map(|b| b.num_rows()).sum();
+                    let batch = table_buffer.record_batch(table.schema().as_arrow(), &[])?;
+                    let row_count = batch.num_rows();
 
                     // Dedupe and sort using the COMPACT query built into
                     // iox_query
@@ -406,7 +443,7 @@ impl ClosedBufferSegment {
                     );
 
                     chunks.push(Arc::new(BufferChunk {
-                        batches,
+                        batches: vec![batch],
                         schema: schema.clone(),
                         stats: Arc::new(chunk_stats),
                         partition_id: TransitionPartitionId::new(
@@ -536,7 +573,9 @@ pub(crate) mod tests {
 
     #[test]
     fn buffers_rows() {
+        let catalog = Arc::new(Catalog::new());
         let mut open_segment = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             SegmentId::new(0),
             SegmentRange::test_range(),
             Time::from_timestamp_nanos(0),
@@ -546,7 +585,6 @@ pub(crate) mod tests {
         );
 
         let db_name: NamespaceName<'static> = NamespaceName::new("db1").unwrap();
 
-        let catalog = Catalog::new();
         let batches = lp_to_table_batches(
             &catalog,
@@ -565,7 +603,13 @@ pub(crate) mod tests {
 
         let db_schema = catalog.db_schema("db1").unwrap();
         let cpu_table = open_segment
-            .table_record_batches("db1", "cpu", db_schema.get_table_schema("cpu").unwrap())
+            .table_record_batch(
+                "db1",
+                "cpu",
+                db_schema.get_table_schema("cpu").unwrap().as_arrow(),
+                &[],
+            )
+            .unwrap()
             .unwrap();
         let expected_cpu_table = vec![
             "+------------------------------------------------------------------+-----+----------+--------------------------------+",
@@ -575,10 +619,16 @@ pub(crate) mod tests {
             "| 505f9f5fc3347ac9d6ba45f2b2c94ad53a313e456e86e61db85ba1935369b238 | 2.0 | cupcakes | 1970-01-01T00:00:00.000000030Z |",
             "+------------------------------------------------------------------+-----+----------+--------------------------------+",
         ];
-        assert_batches_eq!(&expected_cpu_table, &cpu_table);
+        assert_batches_eq!(&expected_cpu_table, &[cpu_table]);
 
         let mem_table = open_segment
-            .table_record_batches("db1", "mem", db_schema.get_table_schema("mem").unwrap())
+            .table_record_batch(
+                "db1",
+                "mem",
+                db_schema.get_table_schema("mem").unwrap().as_arrow(),
+                &[],
+            )
+            .unwrap()
             .unwrap();
         let expected_mem_table = vec![
             "+------------------------------------------------------------------+-----+--------+--------------------------------+",
@@ -587,12 +637,14 @@ pub(crate) mod tests {
             "| 5ae2bb295e8b0dec713daf0da555ecd3f2899a8967f18db799e26557029198f3 | 2.0 | snakes | 1970-01-01T00:00:00.000000020Z |",
             "+------------------------------------------------------------------+-----+--------+--------------------------------+",
         ];
-        assert_batches_eq!(&expected_mem_table, &mem_table);
+        assert_batches_eq!(&expected_mem_table, &[mem_table]);
     }
 
     #[tokio::test]
     async fn buffers_schema_update() {
+        let catalog = Arc::new(Catalog::new());
         let mut open_segment = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             SegmentId::new(0),
             SegmentRange::test_range(),
             Time::from_timestamp_nanos(0),
@@ -602,7 +654,6 @@ pub(crate) mod tests {
         );
 
         let db_name: NamespaceName<'static> = NamespaceName::new("db1").unwrap();
 
-        let catalog = Catalog::new();
         let batches = lp_to_table_batches(&catalog, "db1", "cpu,tag1=cupcakes bar=1 10", 10);
         let mut write_batch = WriteBatch::default();
@@ -628,7 +679,13 @@ pub(crate) mod tests {
 
         let db_schema = catalog.db_schema("db1").unwrap();
         println!("{:?}", db_schema);
         let cpu_table = open_segment
-            .table_record_batches("db1", "cpu", db_schema.get_table_schema("cpu").unwrap())
+            .table_record_batch(
+                "db1",
+                "cpu",
+                db_schema.get_table_schema("cpu").unwrap().as_arrow(),
+                &[],
+            )
+            .unwrap()
             .unwrap();
         let expected_cpu_table = vec![
             "+------------------------------------------------------------------+-----+------+------+----------+------+--------------------------------+",
@@ -641,7 +698,7 @@ pub(crate) mod tests {
             "| e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 |     | 2.1  |      |          |      | 1970-01-01T00:00:00.000000040Z |",
             "+------------------------------------------------------------------+-----+------+------+----------+------+--------------------------------+",
         ];
-        assert_batches_eq!(&expected_cpu_table, &cpu_table);
+        assert_batches_eq!(&expected_cpu_table, &[cpu_table]);
     }
 
     #[tokio::test]
@@ -650,7 +707,9 @@ pub(crate) mod tests {
 
         let segment_id = SegmentId::new(4);
         let segment_writer = Box::new(WalSegmentWriterNoopImpl::new(segment_id));
+        let catalog = Arc::new(Catalog::new());
         let mut open_segment = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             segment_id,
             SegmentRange::test_range(),
             Time::from_timestamp_nanos(0),
@@ -659,8 +718,6 @@ pub(crate) mod tests {
             None,
         );
 
-        let catalog = Catalog::new();
-
         // When we persist the data all of these duplicates should be removed
         let lp = "cpu,tag1=cupcakes bar=1 10\n\
                   cpu,tag1=cupcakes bar=1 10\n\
@@ -747,7 +804,9 @@ pub(crate) mod tests {
 
     #[test]
     fn should_persist() {
+        let catalog = Arc::new(Catalog::new());
         let segment = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             SegmentId::new(0),
             SegmentRange::from_time_and_duration(
                 Time::from_timestamp_nanos(0),
@@ -770,6 +829,7 @@ pub(crate) mod tests {
         assert!(segment.should_persist(Time::from_timestamp(61 + 30, 0).unwrap()));
 
         let segment = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             SegmentId::new(0),
             SegmentRange::from_time_and_duration(
                 Time::from_timestamp_nanos(0),
diff --git a/influxdb3_write/src/write_buffer/flusher.rs b/influxdb3_write/src/write_buffer/flusher.rs
index aca85e13104..0fa1b0dacf0 100644
--- a/influxdb3_write/src/write_buffer/flusher.rs
+++ b/influxdb3_write/src/write_buffer/flusher.rs
@@ -222,8 +222,10 @@ mod tests {
 
     #[tokio::test]
     async fn flushes_to_open_segment() {
+        let catalog = Arc::new(Catalog::new());
         let segment_id = SegmentId::new(3);
         let open_segment = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             segment_id,
             SegmentRange::test_range(),
             Time::from_timestamp_nanos(0),
@@ -235,6 +237,7 @@ mod tests {
         let next_segment_range = SegmentRange::test_range().next();
 
         let next_segment = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             next_segment_id,
             next_segment_range,
             Time::from_timestamp_nanos(0),
@@ -242,7 +245,6 @@ mod tests {
             Box::new(WalSegmentWriterNoopImpl::new(next_segment_id)),
             None,
         );
-        let catalog = Arc::new(Catalog::new());
         let segment_state = Arc::new(RwLock::new(SegmentState::<MockProvider, WalImpl>::new(
             SegmentDuration::new_5m(),
             next_segment_id,
@@ -294,17 +296,19 @@ mod tests {
         assert_eq!(segment.segment_id(), segment_id);
 
         let data = segment
-            .table_record_batches(
+            .table_record_batch(
                 db_name.as_str(),
                 "cpu",
                 catalog
                     .db_schema("db1")
                     .unwrap()
                     .get_table_schema("cpu")
-                    .unwrap(),
+                    .unwrap()
+                    .as_arrow(),
+                &[],
             )
+            .unwrap()
             .unwrap();
-        let row_count = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
-        assert_eq!(row_count, 2);
+        assert_eq!(data.num_rows(), 2);
     }
 }
diff --git a/influxdb3_write/src/write_buffer/loader.rs b/influxdb3_write/src/write_buffer/loader.rs
index d34bcb6d629..d107708d5bf 100644
--- a/influxdb3_write/src/write_buffer/loader.rs
+++ b/influxdb3_write/src/write_buffer/loader.rs
@@ -75,6 +75,7 @@ where
             let buffer = load_buffer_from_segment(&catalog, segment_reader)?;
 
             let segment = OpenBufferSegment::new(
+                Arc::clone(&catalog),
                 segment_header.id,
                 segment_header.range,
                 server_load_time,
@@ -100,6 +101,7 @@ where
             max_segment_id = current_segment_id;
 
             let current_segment = OpenBufferSegment::new(
+                Arc::clone(&catalog),
                 current_segment_id,
                 current_segment_range,
                 server_load_time,
@@ -116,6 +118,7 @@ where
             max_segment_id = current_segment_id;
 
             let current_segment = OpenBufferSegment::new(
+                Arc::clone(&catalog),
                 current_segment_id,
                 current_segment_range,
                 server_load_time,
@@ -157,10 +160,12 @@ mod tests {
     async fn loads_without_wal() {
         let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
         let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
+        let catalog = Arc::new(Catalog::new());
 
         let segment_id = SegmentId::new(4);
         let segment_writer = Box::new(WalSegmentWriterNoopImpl::new(segment_id));
         let mut open_segment = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             segment_id,
             SegmentRange::test_range(),
             Time::from_timestamp_nanos(0),
@@ -169,8 +174,6 @@ mod tests {
             None,
         );
 
-        let catalog = Catalog::new();
-
         let lp = "cpu,tag1=cupcakes bar=1 10\nmem,tag2=turtles bar=3 15\nmem,tag2=snakes bar=2 20";
 
         let wal_op = WalOp::LpWrite(LpWriteOp {
@@ -290,7 +293,8 @@ mod tests {
 
         let cpu_table = db.get_table("cpu").unwrap();
         let cpu_data = current_segment
-            .table_record_batches(db_name, "cpu", cpu_table.schema())
+            .table_record_batch(db_name, "cpu", cpu_table.schema().as_arrow(), &[])
+            .unwrap()
             .unwrap();
         let expected = [
             "+------------------------------------------------------------------+-----+----------+--------------------------------+",
@@ -299,11 +303,12 @@ mod tests {
             "| 505f9f5fc3347ac9d6ba45f2b2c94ad53a313e456e86e61db85ba1935369b238 | 1.0 | cupcakes | 1970-01-01T00:00:00.000000010Z |",
             "+------------------------------------------------------------------+-----+----------+--------------------------------+",
         ];
-        assert_batches_eq!(&expected, &cpu_data);
+        assert_batches_eq!(&expected, &[cpu_data]);
 
         let mem_table = db.get_table("mem").unwrap();
         let mem_data = current_segment
-            .table_record_batches(db_name, "mem", mem_table.schema())
+            .table_record_batch(db_name, "mem", mem_table.schema().as_arrow(), &[])
+            .unwrap()
             .unwrap();
         let expected = [
             "+------------------------------------------------------------------+-----+---------+--------------------------------+",
@@ -313,7 +318,7 @@ mod tests {
             "| 5ae2bb295e8b0dec713daf0da555ecd3f2899a8967f18db799e26557029198f3 | 2.0 | snakes  | 1970-01-01T00:00:00.000000020Z |",
             "+------------------------------------------------------------------+-----+---------+--------------------------------+",
         ];
-        assert_batches_eq!(&expected, &mem_data);
+        assert_batches_eq!(&expected, &[mem_data]);
 
         assert_eq!(loaded_state.last_segment_id, SegmentId::new(1));
     }
@@ -382,6 +387,7 @@ mod tests {
             .new_segment_writer(next_segment_id, next_segment_range)
             .unwrap();
         let mut next_segment = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             SegmentId::new(2),
             SegmentRange::test_range().next(),
             Time::from_timestamp_nanos(0),
@@ -463,7 +469,8 @@ mod tests {
 
         let cpu_table = db.get_table("cpu").unwrap();
         let cpu_data = loaded_state.open_segments[0]
-            .table_record_batches(db_name, "cpu", cpu_table.schema())
+            .table_record_batch(db_name, "cpu", cpu_table.schema().as_arrow(), &[])
+            .unwrap()
             .unwrap();
         let expected = [
             "+------------------------------------------------------------------+-----+----------+--------------------------------+",
@@ -472,11 +479,12 @@ mod tests {
             "| 505f9f5fc3347ac9d6ba45f2b2c94ad53a313e456e86e61db85ba1935369b238 | 3.0 | cupcakes | 1970-01-01T00:00:00.000000020Z |",
             "+------------------------------------------------------------------+-----+----------+--------------------------------+",
         ];
-        assert_batches_eq!(&expected, &cpu_data);
+        assert_batches_eq!(&expected, &[cpu_data]);
 
         let foo_table = db.get_table("foo").unwrap();
         let foo_data = loaded_state.open_segments[0]
-            .table_record_batches(db_name, "foo", foo_table.schema())
+            .table_record_batch(db_name, "foo", foo_table.schema().as_arrow(), &[])
+            .unwrap()
             .unwrap();
         let expected = [
             "+------------------------------------------------------------------+--------------------------------+-----+",
@@ -485,7 +493,7 @@ mod tests {
             "| e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 | 1970-01-01T00:00:00.000000123Z | 1.0 |",
             "+------------------------------------------------------------------+--------------------------------+-----+",
         ];
-        assert_batches_eq!(&expected, &foo_data);
+        assert_batches_eq!(&expected, &[foo_data]);
 
         assert_eq!(loaded_state.last_segment_id, SegmentId::new(2));
     }
@@ -546,6 +554,7 @@ mod tests {
             .new_segment_writer(next_segment_id, next_segment_range)
             .unwrap();
         let mut next_segment = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             SegmentId::new(2),
             SegmentRange::test_range().next(),
             Time::from_timestamp_nanos(0),
@@ -593,7 +602,8 @@ mod tests {
 
         let cpu_table = db.get_table("cpu").unwrap();
         let cpu_data = loaded_state.open_segments[0]
-            .table_record_batches(db_name, "cpu", cpu_table.schema())
+            .table_record_batch(db_name, "cpu", cpu_table.schema().as_arrow(), &[])
+            .unwrap()
             .unwrap();
         let expected = [
             "+------------------------------------------------------------------+-----+--------+--------------------------------+",
@@ -602,11 +612,12 @@ mod tests {
             "| 82a59579ecb9ae1adf113fe3a09a2ebd61aa15f92c570d26278d3f1dfe8bcbd8 | 3.0 | apples | 1970-01-01T00:00:00.000000020Z |",
             "+------------------------------------------------------------------+-----+--------+--------------------------------+",
         ];
-        assert_batches_eq!(&expected, &cpu_data);
+        assert_batches_eq!(&expected, &[cpu_data]);
 
         let foo_table = db.get_table("foo").unwrap();
         let foo_data = loaded_state.open_segments[0]
-            .table_record_batches(db_name, "foo", foo_table.schema())
+            .table_record_batch(db_name, "foo", foo_table.schema().as_arrow(), &[])
+            .unwrap()
             .unwrap();
         let expected = [
             "+------------------------------------------------------------------+--------------------------------+-----+",
@@ -615,7 +626,7 @@ mod tests {
             "| e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 | 1970-01-01T00:00:00.000000123Z | 1.0 |",
             "+------------------------------------------------------------------+--------------------------------+-----+",
         ];
-        assert_batches_eq!(&expected, &foo_data);
+        assert_batches_eq!(&expected, &[foo_data]);
 
         assert_eq!(loaded_state.last_segment_id, SegmentId::new(2));
     }
diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index f44ed01a3bb..a04188056a0 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -77,6 +77,9 @@ pub enum Error {
 
     #[error("walop in file {0} contained data for more than one segment, which is invalid")]
     WalOpForMultipleSegments(String),
+
+    #[error("error from table buffer: {0}")]
+    TableBufferError(#[from] table_buffer::Error),
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
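The `#[from]` attribute on the new variant is what lets table-buffer failures propagate with `?` from the persist and query paths. A minimal self-contained sketch of that conversion, using stand-in error types rather than the crate's own:

```rust
use thiserror::Error;

#[derive(Debug, Error)]
#[error("inner failure: {0}")]
struct Inner(String);

#[derive(Debug, Error)]
enum Outer {
    // `#[from]` derives `impl From<Inner> for Outer`, so `?` converts
    // an `Inner` error into `Outer::TableBufferError` automatically.
    #[error("error from table buffer: {0}")]
    TableBufferError(#[from] Inner),
}

fn lift(r: Result<(), Inner>) -> Result<(), Outer> {
    r?; // Inner -> Outer via the derived From
    Ok(())
}
```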
diff --git a/influxdb3_write/src/write_buffer/segment_state.rs b/influxdb3_write/src/write_buffer/segment_state.rs
index d783c00b796..bf7a3851a4a 100644
--- a/influxdb3_write/src/write_buffer/segment_state.rs
+++ b/influxdb3_write/src/write_buffer/segment_state.rs
@@ -8,6 +8,7 @@ use crate::{
     persister, wal, write_buffer, ParquetFile, PersistedSegment, Persister, SegmentDuration,
     SegmentId, SegmentRange, SequenceNumber, Wal, WalOp,
 };
+use arrow::datatypes::SchemaRef;
 #[cfg(test)]
 use arrow::record_batch::RecordBatch;
 use data_types::{ChunkId, ChunkOrder, TableId, TransitionPartitionId};
@@ -112,23 +113,36 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
         &self,
         db_schema: Arc<DatabaseSchema>,
         table_name: &str,
-        _filters: &[Expr],
-        _projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        projection: Option<&Vec<usize>>,
         _ctx: &SessionState,
     ) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
         let table = db_schema
             .tables
             .get(table_name)
             .ok_or_else(|| DataFusionError::Execution(format!("table {} not found", table_name)))?;
-        let schema = table.schema.clone();
+
+        let arrow_schema: SchemaRef = match projection {
+            Some(projection) => Arc::new(table.schema.as_arrow().project(projection).unwrap()),
+            None => table.schema.as_arrow(),
+        };
+
+        let schema = schema::Schema::try_from(Arc::clone(&arrow_schema))
+            .map_err(|e| DataFusionError::Execution(format!("schema error {}", e)))?;
 
         let mut chunks: Vec<Arc<dyn QueryChunk>> = vec![];
 
         for segment in self.segments.values() {
-            if let Some(batches) =
-                segment.table_record_batches(&db_schema.name, table_name, &schema)
-            {
-                let row_count = batches.iter().map(|b| b.num_rows()).sum();
+            if let Some(batch) = segment.table_record_batch(
+                &db_schema.name,
+                table_name,
+                Arc::clone(&arrow_schema),
+                filters,
+            ) {
+                let batch = batch.map_err(|e| {
+                    DataFusionError::Execution(format!("error getting batches {}", e))
+                })?;
+                let row_count = batch.num_rows();
 
                 let chunk_stats = create_chunk_statistics(
                     Some(row_count),
@@ -138,7 +152,7 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
                 );
 
                 chunks.push(Arc::new(BufferChunk {
-                    batches,
+                    batches: vec![batch],
                     schema: schema.clone(),
                     stats: Arc::new(chunk_stats),
                     partition_id: TransitionPartitionId::new(
@@ -158,12 +172,16 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
         }
 
         for persisting_segment in self.persisting_segments.values() {
-            if let Some(batches) = persisting_segment.buffered_data.table_record_batches(
+            if let Some(batch) = persisting_segment.buffered_data.table_record_batches(
                 &db_schema.name,
                 table_name,
-                &schema,
+                Arc::clone(&arrow_schema),
+                filters,
             ) {
-                let row_count = batches.iter().map(|b| b.num_rows()).sum();
+                let batch = batch.map_err(|e| {
+                    DataFusionError::Execution(format!("error getting batches {}", e))
+                })?;
+                let row_count = batch.num_rows();
 
                 let chunk_stats = create_chunk_statistics(
                     Some(row_count),
@@ -173,7 +191,7 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
                 );
 
                 chunks.push(Arc::new(BufferChunk {
-                    batches,
+                    batches: vec![batch],
                     schema: schema.clone(),
                     stats: Arc::new(chunk_stats),
                     partition_id: TransitionPartitionId::new(
@@ -232,8 +250,12 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
     ) -> Vec<RecordBatch> {
         self.segments
             .values()
-            .filter_map(|segment| segment.table_record_batches(db_name, table_name, schema))
-            .flatten()
+            .map(|segment| {
+                segment
+                    .table_record_batch(db_name, table_name, schema.as_arrow(), &[])
+                    .unwrap()
+                    .unwrap()
+            })
             .collect()
     }
@@ -292,6 +314,7 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
         };
 
         let segment = OpenBufferSegment::new(
+            Arc::clone(&self.catalog),
             segment_id,
             segment_range,
             self.time_provider.now(),
@@ -470,6 +493,7 @@ mod tests {
         );
 
         let open_segment1 = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             SegmentId::new(1),
             first_segment_range,
             time_provider.now(),
@@ -479,6 +503,7 @@ mod tests {
         );
 
         let open_segment2 = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             SegmentId::new(2),
             SegmentRange::from_time_and_duration(
                 Time::from_timestamp(300, 0).unwrap(),
@@ -492,6 +517,7 @@ mod tests {
         );
 
         let open_segment3 = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             SegmentId::new(3),
             SegmentRange::from_time_and_duration(
                 Time::from_timestamp(600, 0).unwrap(),
@@ -539,6 +565,7 @@ mod tests {
         );
 
         let mut open_segment1 = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             SegmentId::new(1),
             first_segment_range,
             time_provider.now(),
@@ -551,6 +578,7 @@ mod tests {
             .unwrap();
 
         let mut open_segment2 = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             SegmentId::new(2),
             SegmentRange::from_time_and_duration(
                 Time::from_timestamp(300, 0).unwrap(),
@@ -567,6 +595,7 @@ mod tests {
             .unwrap();
 
         let mut open_segment3 = OpenBufferSegment::new(
+            Arc::clone(&catalog),
             SegmentId::new(3),
             SegmentRange::from_time_and_duration(
                 Time::from_timestamp(600, 0).unwrap(),
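The projection handling above leans on `arrow`'s `Schema::project`, which selects fields by column index and returns a new schema in index order. A small sketch of just that step, with an assumed three-column schema:

```rust
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use std::sync::Arc;

fn main() {
    let schema = Schema::new(vec![
        Field::new("tag", DataType::Utf8, true),
        Field::new("value", DataType::Int64, true),
        Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
    ]);
    // Keep only columns 0 and 2, mirroring how the projected SchemaRef is
    // derived before it is handed to each segment's table_record_batch.
    let projected = Arc::new(schema.project(&[0, 2]).unwrap());
    assert_eq!(projected.fields().len(), 2);
}
```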
diff --git a/influxdb3_write/src/write_buffer/table_buffer.rs b/influxdb3_write/src/write_buffer/table_buffer.rs
index 2e1761ba112..c25fceb4b66 100644
--- a/influxdb3_write/src/write_buffer/table_buffer.rs
+++ b/influxdb3_write/src/write_buffer/table_buffer.rs
@@ -1,18 +1,30 @@
-//! The in memory bufffer of a table that can be quickly added to and queried
+//! The in memory buffer of a table that can be quickly added to and queried
 
 use crate::write_buffer::{FieldData, Row};
 use arrow::array::{
-    ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, StringBuilder, StringDictionaryBuilder,
-    TimestampNanosecondBuilder, UInt64Builder,
+    ArrayBuilder, ArrayRef, BooleanBuilder, Float64Builder, GenericByteDictionaryBuilder,
+    Int64Builder, StringArray, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
+    UInt64Builder,
 };
-use arrow::datatypes::Int32Type;
+use arrow::datatypes::{GenericStringType, Int32Type, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use data_types::{PartitionKey, TimestampMinMax};
+use datafusion::logical_expr::{BinaryExpr, Expr};
 use observability_deps::tracing::debug;
-use schema::Schema;
-use std::collections::{BTreeMap, HashSet};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::sync::Arc;
-use std::vec;
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("Field not found in table buffer: {0}")]
+    FieldNotFound(String),
+
+    #[error("Error creating record batch: {0}")]
+    RecordBatchError(#[from] arrow::error::ArrowError),
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 pub struct TableBuffer {
     pub segment_key: PartitionKey,
@@ -20,16 +32,18 @@ pub struct TableBuffer {
     timestamp_max: i64,
     pub(crate) data: BTreeMap<String, Builder>,
     row_count: usize,
+    index: BufferIndex,
 }
 
 impl TableBuffer {
-    pub fn new(segment_key: PartitionKey) -> Self {
+    pub fn new(segment_key: PartitionKey, index_columns: &[String]) -> Self {
         Self {
             segment_key,
             timestamp_min: i64::MAX,
             timestamp_max: i64::MIN,
             data: Default::default(),
             row_count: 0,
+            index: BufferIndex::new(index_columns),
         }
     }
@@ -64,16 +78,22 @@ impl TableBuffer {
                     }
                 }
                 FieldData::Tag(v) => {
-                    let b = self.data.entry(f.name).or_insert_with(|| {
+                    if !self.data.contains_key(&f.name) {
                         let mut tag_builder = StringDictionaryBuilder::new();
                         // append nulls for all previous rows
                         for _ in 0..(row_index + self.row_count) {
                             tag_builder.append_null();
                         }
-                        Builder::Tag(tag_builder)
-                    });
+                        self.data.insert(f.name.clone(), Builder::Tag(tag_builder));
+                    }
+                    let b = self
+                        .data
+                        .get_mut(&f.name)
+                        .expect("tag builder should exist");
                     if let Builder::Tag(b) = b {
-                        b.append(v).unwrap(); // we won't overflow the 32-bit integer for this dictionary
+                        self.index.add_row_if_indexed_column(b.len(), &f.name, &v);
+                        b.append(v)
+                            .expect("shouldn't be able to overflow 32 bit dictionary");
                     } else {
                         panic!("unexpected field type");
                     }
@@ -183,20 +203,33 @@ impl TableBuffer {
         }
     }
 
-    pub fn record_batches(&self, schema: &Schema) -> Vec<RecordBatch> {
-        // ensure the order of the columns matches their order in the Arrow schema definition
-        let mut cols = Vec::with_capacity(self.data.len());
-        let schema = schema.as_arrow();
-        for f in &schema.fields {
-            cols.push(
-                self.data
-                    .get(f.name())
-                    .unwrap_or_else(|| panic!("missing field in table buffer: {}", f.name()))
-                    .as_arrow(),
-            );
+    pub fn record_batch(&self, schema: SchemaRef, filter: &[Expr]) -> Result<RecordBatch> {
+        let row_ids = self.index.get_rows_from_index_for_filter(filter);
+
+        let mut cols = Vec::with_capacity(schema.fields().len());
+
+        for f in schema.fields() {
+            match row_ids {
+                Some(row_ids) => {
+                    let b = self
+                        .data
+                        .get(f.name())
+                        .ok_or_else(|| Error::FieldNotFound(f.name().to_string()))?
+                        .get_rows(row_ids);
+                    cols.push(b);
+                }
+                None => {
+                    let b = self
+                        .data
+                        .get(f.name())
+                        .ok_or_else(|| Error::FieldNotFound(f.name().to_string()))?
+                        .as_arrow();
+                    cols.push(b);
+                }
+            }
         }
 
-        vec![RecordBatch::try_new(schema, cols).unwrap()]
+        Ok(RecordBatch::try_new(schema, cols)?)
     }
 }
@@ -212,6 +245,49 @@ impl std::fmt::Debug for TableBuffer {
     }
 }
 
+#[derive(Debug, Default)]
+struct BufferIndex {
+    // column name -> string value -> row indexes
+    columns: HashMap<String, HashMap<String, Vec<usize>>>,
+}
+
+impl BufferIndex {
+    fn new(columns: &[String]) -> Self {
+        let columns = columns
+            .iter()
+            .map(|c| (c.clone(), HashMap::new()))
+            .collect();
+        Self { columns }
+    }
+
+    fn add_row_if_indexed_column(&mut self, row_index: usize, column_name: &str, value: &str) {
+        if let Some(column) = self.columns.get_mut(column_name) {
+            column
+                .entry(value.to_string())
+                .or_insert_with(Vec::new)
+                .push(row_index);
+        }
+    }
+
+    fn get_rows_from_index_for_filter(&self, filter: &[Expr]) -> Option<&Vec<usize>> {
+        for expr in filter {
+            if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
+                if *op == datafusion::logical_expr::Operator::Eq {
+                    if let Expr::Column(c) = left.as_ref() {
+                        if let Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some(v))) =
+                            right.as_ref()
+                        {
+                            return self.columns.get(c.name.as_str()).and_then(|m| m.get(v));
+                        }
+                    }
+                }
+            }
+        }
+
+        None
+    }
+}
+
 pub enum Builder {
     Bool(BooleanBuilder),
     I64(Int64Builder),
@@ -234,4 +310,208 @@ impl Builder {
             Self::Time(b) => Arc::new(b.finish_cloned()),
         }
     }
+
+    fn get_rows(&self, rows: &[usize]) -> ArrayRef {
+        match self {
+            Self::Bool(b) => {
+                let b = b.finish_cloned();
+                let mut builder = BooleanBuilder::with_capacity(rows.len());
+                for row in rows {
+                    builder.append_value(b.value(*row));
+                }
+                Arc::new(builder.finish())
+            }
+            Self::I64(b) => {
+                let b = b.finish_cloned();
+                let mut builder = Int64Builder::with_capacity(rows.len());
+                for row in rows {
+                    builder.append_value(b.value(*row));
+                }
+                Arc::new(builder.finish())
+            }
+            Self::F64(b) => {
+                let b = b.finish_cloned();
+                let mut builder = Float64Builder::with_capacity(rows.len());
+                for row in rows {
+                    builder.append_value(b.value(*row));
+                }
+                Arc::new(builder.finish())
+            }
+            Self::U64(b) => {
+                let b = b.finish_cloned();
+                let mut builder = UInt64Builder::with_capacity(rows.len());
+                for row in rows {
+                    builder.append_value(b.value(*row));
+                }
+                Arc::new(builder.finish())
+            }
+            Self::String(b) => {
+                let b = b.finish_cloned();
+                let mut builder = StringBuilder::new();
+                for row in rows {
+                    builder.append_value(b.value(*row));
+                }
+                Arc::new(builder.finish())
+            }
+            Self::Tag(b) => {
+                let b = b.finish_cloned();
+                let bv = b.values();
+                let bva: &StringArray = bv.as_any().downcast_ref::<StringArray>().unwrap();
+
+                let mut builder: GenericByteDictionaryBuilder<Int32Type, GenericStringType<i32>> =
+                    StringDictionaryBuilder::new();
+                for row in rows {
+                    let val = b.key(*row).unwrap();
+                    let tag_val = bva.value(val);
+
+                    builder
+                        .append(tag_val)
+                        .expect("shouldn't be able to overflow 32 bit dictionary");
+                }
+                Arc::new(builder.finish())
+            }
+            Self::Time(b) => {
+                let b = b.finish_cloned();
+                let mut builder = TimestampNanosecondBuilder::with_capacity(rows.len());
+                for row in rows {
+                    builder.append_value(b.value(*row));
+                }
+                Arc::new(builder.finish())
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::write_buffer::Field;
+    use arrow_util::assert_batches_eq;
+    use datafusion::common::Column;
+    use schema::{InfluxFieldType, SchemaBuilder};
+
+    #[test]
+    fn tag_row_index() {
+        let mut table_buffer = TableBuffer::new(PartitionKey::from("table"), &["tag".to_string()]);
+        let schema = SchemaBuilder::with_capacity(3)
+            .tag("tag")
+            .influx_field("value", InfluxFieldType::Integer)
+            .timestamp()
+            .build()
+            .unwrap();
+
+        let rows = vec![
+            Row {
+                time: 1,
+                fields: vec![
+                    Field {
+                        name: "tag".to_string(),
+                        value: FieldData::Tag("a".to_string()),
+                    },
+                    Field {
+                        name: "value".to_string(),
+                        value: FieldData::Integer(1),
+                    },
+                    Field {
+                        name: "time".to_string(),
+                        value: FieldData::Timestamp(1),
+                    },
+                ],
+            },
+            Row {
+                time: 2,
+                fields: vec![
+                    Field {
+                        name: "tag".to_string(),
+                        value: FieldData::Tag("b".to_string()),
+                    },
+                    Field {
+                        name: "value".to_string(),
+                        value: FieldData::Integer(2),
+                    },
+                    Field {
+                        name: "time".to_string(),
+                        value: FieldData::Timestamp(2),
+                    },
+                ],
+            },
+            Row {
+                time: 3,
+                fields: vec![
+                    Field {
+                        name: "tag".to_string(),
+                        value: FieldData::Tag("a".to_string()),
+                    },
+                    Field {
+                        name: "value".to_string(),
+                        value: FieldData::Integer(3),
+                    },
+                    Field {
+                        name: "time".to_string(),
+                        value: FieldData::Timestamp(3),
+                    },
+                ],
+            },
+        ];
+
+        table_buffer.add_rows(rows);
+
+        let filter = &[Expr::BinaryExpr(BinaryExpr {
+            left: Box::new(Expr::Column(Column {
+                relation: None,
+                name: "tag".to_string(),
+            })),
+            op: datafusion::logical_expr::Operator::Eq,
+            right: Box::new(Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some(
+                "a".to_string(),
+            )))),
+        })];
+        let a_rows = table_buffer
+            .index
+            .get_rows_from_index_for_filter(filter)
+            .unwrap();
+        assert_eq!(a_rows, &[0, 2]);
+
+        let a = table_buffer
+            .record_batch(schema.as_arrow(), filter)
+            .unwrap();
+        let expected_a = vec![
+            "+-----+-------+--------------------------------+",
+            "| tag | value | time                           |",
+            "+-----+-------+--------------------------------+",
+            "| a   | 1     | 1970-01-01T00:00:00.000000001Z |",
+            "| a   | 3     | 1970-01-01T00:00:00.000000003Z |",
+            "+-----+-------+--------------------------------+",
+        ];
+        assert_batches_eq!(&expected_a, &[a]);
+
+        let filter = &[Expr::BinaryExpr(BinaryExpr {
+            left: Box::new(Expr::Column(Column {
+                relation: None,
+                name: "tag".to_string(),
+            })),
+            op: datafusion::logical_expr::Operator::Eq,
+            right: Box::new(Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some(
+                "b".to_string(),
+            )))),
+        })];
+
+        let b_rows = table_buffer
+            .index
+            .get_rows_from_index_for_filter(filter)
+            .unwrap();
+        assert_eq!(b_rows, &[1]);
+
+        let b = table_buffer
+            .record_batch(schema.as_arrow(), filter)
+            .unwrap();
+        let expected_b = vec![
+            "+-----+-------+--------------------------------+",
+            "| tag | value | time                           |",
+            "+-----+-------+--------------------------------+",
+            "| b   | 2     | 1970-01-01T00:00:00.000000002Z |",
+            "+-----+-------+--------------------------------+",
+        ];
+        assert_batches_eq!(&expected_b, &[b]);
+    }
+}
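To summarize the new index: `BufferIndex` maps tag column name → tag value → row offsets, populated as tag values are appended, and only an equality `BinaryExpr` on an indexed column short-circuits into `get_rows`; every other predicate falls back to a full read of each column via `as_arrow`. A standalone, std-only model of that shape (not the PR's types):

```rust
use std::collections::HashMap;

fn main() {
    // tag column name -> tag value -> row offsets
    let mut index: HashMap<String, HashMap<String, Vec<usize>>> = HashMap::new();
    index.insert("tag".to_string(), HashMap::new());

    // Populate the index as rows arrive, like add_row_if_indexed_column.
    for (row, value) in ["a", "b", "a"].iter().enumerate() {
        if let Some(column) = index.get_mut("tag") {
            column.entry((*value).to_string()).or_default().push(row);
        }
    }

    // An equality predicate on an indexed tag resolves straight to row ids.
    assert_eq!(index["tag"]["a"], vec![0, 2]);
    assert_eq!(index["tag"]["b"], vec![1]);
}
```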