diff --git a/Cargo.lock b/Cargo.lock index 1cef2df0..443f1b09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,9 +50,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.1.1" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea5d730647d4fadd988536d06fecce94b7b4f2a7efdae548f1cf4b63205518ab" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" dependencies = [ "memchr", ] @@ -155,9 +155,9 @@ checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" [[package]] name = "arbitrary" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2d098ff73c1ca148721f37baad5ea6a465a13f9573aba8641fbbbae8164a54e" +checksum = "a2e1373abdaa212b704512ec2bd8b26bd0b7d5c3f70117411a5d9a451383c859" dependencies = [ "derive_arbitrary", ] @@ -214,7 +214,7 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -225,7 +225,7 @@ version = "0.1.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -278,7 +278,7 @@ dependencies = [ "hex", "http", "hyper", - "ring", + "ring 0.16.20", "time", "tokio", "tower", @@ -747,7 +747,7 @@ dependencies = [ "lazycell", "peeking_take_while", "prettyplease", - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "regex", "rustc-hash", @@ -769,7 +769,7 @@ dependencies = [ "log", "peeking_take_while", "prettyplease", - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "regex", "rustc-hash", @@ -896,7 +896,7 @@ version = "1.5.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "965ab7eb5f8f97d2a083c799f3a1b994fc397b2fe2da5d1da1626ce15a39f2b1" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -959,7 +959,7 @@ dependencies = [ "io-lifetimes", "ipnet", "maybe-owned", - "rustix 0.35.14", + "rustix 0.35.15", "winapi-util", "windows-sys 0.36.1", "winx", @@ -985,7 +985,7 @@ dependencies = [ "io-extras", "io-lifetimes", "ipnet", - "rustix 0.35.14", + "rustix 0.35.15", ] [[package]] @@ -996,7 +996,7 @@ checksum = "c3a0524f7c4cff2ea547ae2b652bf7a348fd3e48f76556dc928d8b45ab2f1d50" dependencies = [ "cap-primitives", "once_cell", - "rustix 0.35.14", + "rustix 0.35.15", "winx", ] @@ -1080,7 +1080,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0862016ff20d69b84ef8247369fabf5c008a7417002411897d40ee1f4532b873" dependencies = [ "heck", - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -1430,7 +1430,7 @@ version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53e0efad4403bfc52dc201159c4b842a246a14b98c64b55dfd0f2d89729dfeb8" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -1546,11 +1546,10 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add4f07d43996f76ef320709726a556a9d4f965d9410d8d0271132d2f8293480" +checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" dependencies = [ - "errno-dragonfly", "libc", "windows-sys 0.48.0", ] @@ -1652,7 +1651,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a267b6a9304912e018610d53fe07115d8b530b160e85db4d2d3a59f3ddde1aec" dependencies = [ "io-lifetimes", - "rustix 0.35.14", + "rustix 0.35.15", "windows-sys 0.36.1", ] @@ -1710,7 +1709,7 @@ version = "0.3.28" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -2155,9 +2154,9 @@ dependencies = [ [[package]] name = "insta" -version = "1.33.0" +version = "1.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aa511b2e298cd49b1856746f6bb73e17036bcd66b25f5e92cdcdbec9bd75686" +checksum = "5d64600be34b2fcfc267740a243fa7744441bb4947a619ac4e5bb6507f35fbfc" dependencies = [ "console", "lazy_static", @@ -2210,7 +2209,7 @@ checksum = "0d508111813f9af3afd2f92758f77e4ed2cc9371b642112c6a48d22eb73105c5" dependencies = [ "hermit-abi 0.2.6", "io-lifetimes", - "rustix 0.35.14", + "rustix 0.35.15", "windows-sys 0.36.1", ] @@ -2221,7 +2220,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi 0.3.3", - "rustix 0.38.17", + "rustix 0.38.19", "windows-sys 0.48.0", ] @@ -2271,9 +2270,9 @@ dependencies = [ [[package]] name = "jobserver" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" +checksum = "8c37f63953c4c63420ed5fd3d6d398c719489b9f872b9fa683262f8edd363c7d" dependencies = [ "libc", ] @@ -2295,7 +2294,7 @@ checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" dependencies = [ "base64 0.21.4", "pem", - "ring", + "ring 0.16.20", "serde", "serde_json", "simple_asn1", @@ -2321,9 +2320,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.148" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" +checksum = 
"a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libloading" @@ -2337,9 +2336,9 @@ dependencies = [ [[package]] name = "libm" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" [[package]] name = "libmimalloc-sys" @@ -2457,9 +2456,9 @@ checksum = "d4d2456c373231a208ad294c33dc5bff30051eafd954cd4caae83a712b12854d" [[package]] name = "linux-raw-sys" -version = "0.4.8" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3852614a3bd9ca9804678ba6be5e3b8ce76dfc902cae004e3e0c44051b6e88db" +checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" [[package]] name = "lock_api" @@ -2538,7 +2537,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2cffa4ad52c6f791f4f8b15f0c05f9824b2ced1160e88cc393d64fff9a8ac64" dependencies = [ - "rustix 0.38.17", + "rustix 0.38.19", ] [[package]] @@ -2613,7 +2612,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -2736,9 +2735,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", "libm", @@ -2922,7 +2921,7 @@ version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ - 
"proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -2963,7 +2962,7 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "syn 2.0.38", ] @@ -2988,9 +2987,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b1106fec09662ec6dd98ccac0f81cef56984d0b49f75c92d8cbad76e20c005c" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] @@ -3055,7 +3054,7 @@ checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", "itertools 0.11.0", - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -3124,7 +3123,7 @@ version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", ] [[package]] @@ -3248,14 +3247,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.6" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff" +checksum = "d119d7c7ca818f8a53c300863d4f87566aac09943aef5b355bb83969dae75d87" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.9", - "regex-syntax 0.7.5", + "regex-automata 0.4.1", + "regex-syntax 0.8.1", ] [[package]] @@ -3269,13 +3268,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.9" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9" +checksum = 
"465c6fc0621e4abc4187a2bda0937bfd4f722c2730b29562e19689ea796c9a4b" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.5", + "regex-syntax 0.8.1", ] [[package]] @@ -3290,6 +3289,12 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "regex-syntax" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56d84fdd47036b038fc80dd333d10b6aab10d5d31f4a366e20014def75328d33" + [[package]] name = "reqwest" version = "0.11.22" @@ -3339,12 +3344,26 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", - "untrusted", + "spin 0.5.2", + "untrusted 0.7.1", "web-sys", "winapi", ] +[[package]] +name = "ring" +version = "0.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9babe80d5c16becf6594aa32ad2be8fe08498e7ae60b77de8df700e67f191d7e" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin 0.9.8", + "untrusted 0.9.0", + "windows-sys 0.48.0", +] + [[package]] name = "rusqlite" version = "0.29.0" @@ -3381,9 +3400,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.35.14" +version = "0.35.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6380889b07a03b5ecf1d44dc9ede6fd2145d84b502a2a9ca0b03c48e0cc3220f" +checksum = "413c4d41e2f1b0814c63567d11618483de0bd64f451b452f2ca43896579486ba" dependencies = [ "bitflags 1.3.2", "errno 0.2.8", @@ -3397,14 +3416,14 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.17" +version = "0.38.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f25469e9ae0f3d0047ca8b93fc56843f38e6774f0914a107ff8b41be8be8e0b7" +checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed" dependencies = [ "bitflags 2.4.0", - "errno 0.3.4", + "errno 0.3.5", "libc", - "linux-raw-sys 0.4.8", + "linux-raw-sys 0.4.10", "windows-sys 0.48.0", ] @@ -3415,7 
+3434,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" dependencies = [ "log", - "ring", + "ring 0.16.20", "sct", "webpki", ] @@ -3427,7 +3446,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" dependencies = [ "log", - "ring", + "ring 0.16.20", "rustls-webpki 0.101.6", "sct", ] @@ -3459,8 +3478,8 @@ version = "0.100.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f6a5fc258f1c1276dfe3016516945546e2d5383911efc0fc4f1cdc5df3a4ae3" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -3469,8 +3488,8 @@ version = "0.101.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -3524,8 +3543,8 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -3553,26 +3572,26 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad977052201c6de01a8ef2aa3378c4bd23217a056337d1d6da40468d267a4fb0" +checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" [[package]] name = "serde" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" +checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" dependencies = [ "serde_derive", ] [[package]] name = 
"serde_derive" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" +checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -3681,9 +3700,9 @@ dependencies = [ [[package]] name = "similar" -version = "2.2.1" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "420acb44afdae038210c99e69aae24109f32f15500aa708e81d46c9f29d55fcf" +checksum = "2aeaf503862c419d66959f5d7ca015337d864e9c49485d771b732e2a20453597" [[package]] name = "simple_asn1" @@ -3756,6 +3775,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "sqld" version = "0.21.9" @@ -3794,7 +3819,6 @@ dependencies = [ "jsonwebtoken", "libsql", "libsql-client", - "memmap", "metrics", "metrics-exporter-prometheus", "mimalloc", @@ -3903,7 +3927,7 @@ version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "unicode-ident", ] @@ -3914,7 +3938,7 @@ version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "unicode-ident", ] @@ -3957,7 +3981,7 @@ dependencies = [ "cap-fs-ext", "cap-std", "io-lifetimes", - "rustix 0.35.14", + "rustix 0.35.15", 
"windows-sys 0.36.1", "winx", ] @@ -3977,7 +4001,7 @@ dependencies = [ "cfg-if", "fastrand 2.0.1", "redox_syscall 0.3.5", - "rustix 0.38.17", + "rustix 0.38.19", "windows-sys 0.48.0", ] @@ -4005,7 +4029,7 @@ version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -4067,9 +4091,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", @@ -4101,7 +4125,7 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -4225,7 +4249,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" dependencies = [ "prettyplease", - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "prost-build", "quote 1.0.33", "syn 2.0.38", @@ -4344,7 +4368,7 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", ] @@ -4503,6 +4527,12 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "untrusted" +version = "0.9.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.4.1" @@ -4619,7 +4649,7 @@ dependencies = [ "io-lifetimes", "is-terminal 0.3.0", "once_cell", - "rustix 0.35.14", + "rustix 0.35.15", "system-interface", "tracing", "wasi-common", @@ -4637,7 +4667,7 @@ dependencies = [ "cap-rand", "cap-std", "io-extras", - "rustix 0.35.14", + "rustix 0.35.15", "thiserror", "tracing", "wasmtime", @@ -4664,7 +4694,7 @@ dependencies = [ "bumpalo", "log", "once_cell", - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", "wasm-bindgen-shared", @@ -4698,7 +4728,7 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.38", "wasm-bindgen-backend", @@ -4713,9 +4743,9 @@ checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" [[package]] name = "wasm-encoder" -version = "0.33.2" +version = "0.34.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34180c89672b3e4825c3a8db4b61a674f1447afd5fe2445b2d22c3d8b6ea086c" +checksum = "f14a94e06a3e2ed1af4e80cac712fed883142019ebe33c3899fd1b5e8550df9d" dependencies = [ "leb128", ] @@ -4781,7 +4811,7 @@ dependencies = [ "directories-next", "file-per-thread-logger", "log", - "rustix 0.35.14", + "rustix 0.35.15", "serde", "sha2", "toml", @@ -4837,7 +4867,7 @@ checksum = "e5f54abc960b4a055ba16b942cbbd1da641e0ad44cc97a7608f3d43c069b120e" dependencies = [ "cc", "cfg-if", - "rustix 0.35.14", + "rustix 0.35.15", "wasmtime-asm-macros", "windows-sys 0.36.1", ] @@ -4876,7 +4906,7 @@ checksum = "fe057012a0ba6cee3685af1e923d6e0a6cb9baf15fb3ffa4be3d7f712c7dec42" dependencies = [ "object 0.29.0", "once_cell", - "rustix 0.35.14", + "rustix 0.35.15", ] [[package]] @@ -4907,7 
+4937,7 @@ dependencies = [ "memoffset 0.6.5", "paste", "rand", - "rustix 0.35.14", + "rustix 0.35.15", "thiserror", "wasmtime-asm-macros", "wasmtime-environ", @@ -4952,9 +4982,9 @@ dependencies = [ [[package]] name = "wast" -version = "66.0.0" +version = "66.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0da7529bb848d58ab8bf32230fc065b363baee2bd338d5e58c589a1e7d83ad07" +checksum = "49d1457e95d4b8e1f72bd50f5ed804931f94cf1b5449697255aef466e46fa4b0" dependencies = [ "leb128", "memchr", @@ -4964,11 +4994,11 @@ dependencies = [ [[package]] name = "wat" -version = "1.0.75" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4780374047c65b6b6e86019093fe80c18b66825eb684df778a4e068282a780e7" +checksum = "964639e3c731f12b7bf6be78f0b2c3e646321acab18e7cb9f18e44c6720bb4fa" dependencies = [ - "wast 66.0.0", + "wast 66.0.1", ] [[package]] @@ -4983,12 +5013,12 @@ dependencies = [ [[package]] name = "webpki" -version = "0.22.2" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07ecc0cd7cac091bf682ec5efa18b1cff79d617b84181f38b3951dbe135f607f" +checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" dependencies = [ - "ring", - "untrusted", + "ring 0.17.3", + "untrusted 0.9.0", ] [[package]] @@ -5015,7 +5045,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix 0.38.17", + "rustix 0.38.19", ] [[package]] @@ -5041,7 +5071,7 @@ checksum = "ba5796f53b429df7d44cfdaae8f6d9cd981d82aec3516561352ca9c5e73ee185" dependencies = [ "anyhow", "heck", - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "shellexpand", "syn 1.0.109", @@ -5054,7 +5084,7 @@ version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b830eb7203d48942fb8bc8bb105f76e7d09c33a082d638e990e02143bb2facd" dependencies = [ - "proc-macro2 1.0.68", + "proc-macro2 1.0.69", "quote 1.0.33", "syn 1.0.109", "wiggle-generate", @@ 
-5365,11 +5395,10 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "2.0.8+zstd.1.5.5" +version = "2.0.9+zstd.1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c" +checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" dependencies = [ "cc", - "libc", "pkg-config", ] diff --git a/sqld/Cargo.toml b/sqld/Cargo.toml index e75fef3c..806062d2 100644 --- a/sqld/Cargo.toml +++ b/sqld/Cargo.toml @@ -30,7 +30,6 @@ hyper = { version = "0.14.23", features = ["http2"] } hyper-tungstenite = "0.10" itertools = "0.10.5" jsonwebtoken = "8.2.0" -memmap = "0.7.0" mimalloc = { version = "0.1.36", default-features = false } nix = { version = "0.26.2", features = ["fs"] } once_cell = "1.17.0" diff --git a/sqld/assets/test/simple_wallog b/sqld/assets/test/simple_wallog new file mode 100644 index 00000000..42e5b3a9 Binary files /dev/null and b/sqld/assets/test/simple_wallog differ diff --git a/sqld/build.rs b/sqld/build.rs index e719001c..b4519612 100644 --- a/sqld/build.rs +++ b/sqld/build.rs @@ -7,7 +7,7 @@ fn main() -> Result<(), Box> { std::env::set_var("PROTOC", protobuf_src::protoc()); let mut config = Config::new(); - config.bytes([".wal_log", ".proxy.ProgramReq.namespace"]); + config.bytes([".wal_log"]); tonic_build::configure() .protoc_arg("--experimental_allow_proto3_optional") .type_attribute(".proxy", "#[cfg_attr(test, derive(arbitrary::Arbitrary))]") diff --git a/sqld/proto/replication_log.proto b/sqld/proto/replication_log.proto index b7f2ef89..8ac5db45 100644 --- a/sqld/proto/replication_log.proto +++ b/sqld/proto/replication_log.proto @@ -8,12 +8,7 @@ message LogOffset { message HelloRequest {} message HelloResponse { - /// Uuid of the current generation - string generation_id = 1; - /// First frame_no in the current generation - uint64 generation_start_index = 2; - /// Uuid of the database being replicated - string database_id = 3; + string 
log_id = 3; } message Frame { diff --git a/sqld/src/error.rs b/sqld/src/error.rs index 66ca58dc..5343d8db 100644 --- a/sqld/src/error.rs +++ b/sqld/src/error.rs @@ -79,6 +79,8 @@ pub enum Error { ConflictingRestoreParameters, #[error("failed to fork database: {0}")] Fork(#[from] ForkError), + #[error("fatal replication error")] + FatalReplicationError, } trait ResponseError: std::error::Error { @@ -129,6 +131,7 @@ impl IntoResponse for Error { LoadDumpExistingDb => self.format_err(StatusCode::BAD_REQUEST), ConflictingRestoreParameters => self.format_err(StatusCode::BAD_REQUEST), Fork(e) => e.into_response(), + FatalReplicationError => self.format_err(StatusCode::INTERNAL_SERVER_ERROR), } } } diff --git a/sqld/src/namespace/fork.rs b/sqld/src/namespace/fork.rs index bdcb577a..a12a4fc2 100644 --- a/sqld/src/namespace/fork.rs +++ b/sqld/src/namespace/fork.rs @@ -10,7 +10,7 @@ use tokio::time::Duration; use tokio_stream::StreamExt; use crate::database::PrimaryDatabase; -use crate::replication::frame::Frame; +use crate::replication::frame::FrameBorrowed; use crate::replication::primary::frame_stream::FrameStream; use crate::replication::{LogReadError, ReplicationLogger}; use crate::{BLOCKING_RT, LIBSQL_PAGE_SIZE}; @@ -41,7 +41,7 @@ impl From for ForkError { } } -async fn write_frame(frame: Frame, temp_file: &mut tokio::fs::File) -> Result<()> { +async fn write_frame(frame: &FrameBorrowed, temp_file: &mut tokio::fs::File) -> Result<()> { let page_no = frame.header().page_no; let page_pos = (page_no - 1) as usize * LIBSQL_PAGE_SIZE as usize; temp_file.seek(SeekFrom::Start(page_pos as u64)).await?; @@ -128,7 +128,7 @@ impl ForkTask<'_> { match res { Ok(frame) => { next_frame_no = next_frame_no.max(frame.header().frame_no + 1); - write_frame(frame, &mut data_file).await?; + write_frame(&frame, &mut data_file).await?; } Err(LogReadError::SnapshotRequired) => { let snapshot = loop { @@ -147,7 +147,7 @@ impl ForkTask<'_> { for frame in iter { let frame = 
frame.map_err(ForkError::LogRead)?; next_frame_no = next_frame_no.max(frame.header().frame_no + 1); - write_frame(frame, &mut data_file).await?; + write_frame(&frame, &mut data_file).await?; } } Err(LogReadError::Ahead) => { diff --git a/sqld/src/namespace/mod.rs b/sqld/src/namespace/mod.rs index 2ee10b3d..4d39d7ba 100644 --- a/sqld/src/namespace/mod.rs +++ b/sqld/src/namespace/mod.rs @@ -561,30 +561,27 @@ impl Namespace { DatabaseConfigStore::load(&db_path).context("Could not load database config")?, ); - let mut join_set = JoinSet::new(); let replicator = Replicator::new( db_path.clone(), config.channel.clone(), config.uri.clone(), name.clone(), - &mut join_set, reset, ) .await?; - - let applied_frame_no_receiver = replicator.current_frame_no_notifier.clone(); + let applied_frame_no_receiver = replicator.current_frame_no_notifier.subscribe(); + let mut join_set = JoinSet::new(); + join_set.spawn(replicator.run()); let stats = make_stats( &db_path, &mut join_set, config.stats_sender.clone(), name.clone(), - replicator.current_frame_no_notifier.clone(), + applied_frame_no_receiver.clone(), ) .await?; - join_set.spawn(replicator.run()); - let connection_maker = MakeWriteProxyConn::new( db_path.clone(), config.extensions.clone(), diff --git a/sqld/src/replication/frame.rs b/sqld/src/replication/frame.rs index ae186fc4..a32c7e50 100644 --- a/sqld/src/replication/frame.rs +++ b/sqld/src/replication/frame.rs @@ -1,10 +1,10 @@ -use std::borrow::Cow; +use std::alloc::Layout; use std::fmt; -use std::mem::{size_of, transmute}; -use std::ops::Deref; +use std::mem::size_of; +use std::ops::{Deref, DerefMut}; -use bytemuck::{bytes_of, pod_read_unaligned, try_from_bytes, Pod, Zeroable}; -use bytes::{Bytes, BytesMut}; +use bytemuck::{bytes_of, from_bytes, Pod, Zeroable}; +use bytes::Bytes; use crate::LIBSQL_PAGE_SIZE; @@ -28,10 +28,18 @@ pub struct FrameHeader { } #[derive(Clone, serde::Serialize, serde::Deserialize)] -/// The owned version of a replication frame. 
+/// The shared version of a replication frame. /// Cloning this is cheap. pub struct Frame { - data: Bytes, + inner: Bytes, +} + +impl TryFrom<&[u8]> for Frame { + type Error = anyhow::Error; + + fn try_from(data: &[u8]) -> Result { + Ok(FrameMut::try_from(data)?.into()) + } } impl fmt::Debug for Frame { @@ -43,57 +51,108 @@ impl fmt::Debug for Frame { } } -impl Frame { - /// size of a single frame - pub const SIZE: usize = size_of::() + LIBSQL_PAGE_SIZE as usize; +/// Owned version of a frame, on the heap +pub struct FrameMut { + inner: Box, +} - pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self { - assert_eq!(data.len(), LIBSQL_PAGE_SIZE as usize); - let mut buf = BytesMut::with_capacity(Self::SIZE); - buf.extend_from_slice(bytes_of(header)); - buf.extend_from_slice(data); +impl TryFrom<&[u8]> for FrameMut { + type Error = anyhow::Error; + + fn try_from(data: &[u8]) -> Result { + anyhow::ensure!( + data.len() == size_of::(), + "invalid frame size" + ); + // frames are relatively large (~4ko), we want to avoid allocating them on the stack and + // then copying them to the heap, and instead copy them to the heap directly. + let inner = unsafe { + let layout = Layout::new::(); + let ptr = std::alloc::alloc(layout); + ptr.copy_from(data.as_ptr(), data.len()); + Box::from_raw(ptr as *mut FrameBorrowed) + }; + + Ok(Self { inner }) + } +} + +impl From for Frame { + fn from(value: FrameMut) -> Self { + // transmute the FrameBorrowed into a Box<[u8; _]>. 
This is safe because the alignment of + // [u8] divides the alignement of FrameBorrowed + let data = unsafe { + Vec::from_raw_parts( + Box::into_raw(value.inner) as *mut u8, + size_of::(), + size_of::(), + ) + }; + + Self { + inner: Bytes::from(data), + } + } +} - Self { data: buf.freeze() } +impl From for FrameMut { + fn from(inner: FrameBorrowed) -> Self { + Self { + inner: Box::new(inner), + } } +} - pub fn try_from_bytes(data: Bytes) -> anyhow::Result { - anyhow::ensure!(data.len() == Self::SIZE, "invalid frame size"); - Ok(Self { data }) +impl Frame { + pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self { + FrameBorrowed::from_parts(header, data).into() } pub fn bytes(&self) -> Bytes { - self.data.clone() + self.inner.clone() + } +} + +impl From for Frame { + fn from(value: FrameBorrowed) -> Self { + FrameMut::from(value).into() } } /// The borrowed version of Frame -#[repr(transparent)] +#[repr(C)] +#[derive(Pod, Zeroable, Copy, Clone)] pub struct FrameBorrowed { - data: [u8], + header: FrameHeader, + page: [u8; LIBSQL_PAGE_SIZE as usize], } impl FrameBorrowed { - pub fn header(&self) -> Cow { - let data = &self.data[..size_of::()]; - try_from_bytes(data) - .map(Cow::Borrowed) - .unwrap_or_else(|_| Cow::Owned(pod_read_unaligned(data))) - } - /// Returns the bytes for this frame. Includes the header bytes. pub fn as_slice(&self) -> &[u8] { - &self.data - } - - pub fn from_bytes(data: &[u8]) -> &Self { - assert_eq!(data.len(), Frame::SIZE); - // SAFETY: &FrameBorrowed is equivalent to &[u8] - unsafe { transmute(data) } + bytes_of(self) } /// returns this frame's page data. pub fn page(&self) -> &[u8] { - &self.data[size_of::()..] 
+ &self.page + } + + pub fn header(&self) -> &FrameHeader { + &self.header + } + + pub fn header_mut(&mut self) -> &mut FrameHeader { + &mut self.header + } + + pub fn from_parts(header: &FrameHeader, page: &[u8]) -> Self { + assert_eq!(page.len(), LIBSQL_PAGE_SIZE as usize); + + FrameBorrowed { + header: *header, + page: page.try_into().unwrap(), + } } } @@ -101,6 +160,20 @@ impl Deref for Frame { type Target = FrameBorrowed; fn deref(&self) -> &Self::Target { - FrameBorrowed::from_bytes(&self.data) + from_bytes(&self.inner) + } +} + +impl Deref for FrameMut { + type Target = FrameBorrowed; + + fn deref(&self) -> &Self::Target { + self.inner.as_ref() + } +} + +impl DerefMut for FrameMut { + fn deref_mut(&mut self) -> &mut Self::Target { + self.inner.as_mut() } } diff --git a/sqld/src/replication/primary/logger.rs b/sqld/src/replication/primary/logger.rs index 172a7f0f..9ac83e97 100644 --- a/sqld/src/replication/primary/logger.rs +++ b/sqld/src/replication/primary/logger.rs @@ -23,7 +23,7 @@ use crate::libsql_bindings::ffi::{ PageHdrIter, PgHdr, Wal, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR, SQLITE_OK, }; use crate::libsql_bindings::wal_hook::WalHook; -use crate::replication::frame::{Frame, FrameHeader}; +use crate::replication::frame::{Frame, FrameHeader, FrameMut}; use crate::replication::snapshot::{find_snapshot_file, LogCompactor, SnapshotFile}; use crate::replication::{FrameNo, SnapshotCallback, CRC_64_GO_ISO, WAL_MAGIC}; use crate::LIBSQL_PAGE_SIZE; @@ -388,14 +388,14 @@ impl LogFile { let file_end = file.metadata()?.len(); let header = if file_end == 0 { - let db_id = Uuid::new_v4(); + let log_id = Uuid::new_v4(); LogFileHeader { version: 2, start_frame_no: 0, magic: WAL_MAGIC, page_size: LIBSQL_PAGE_SIZE as i32, start_checksum: 0, - db_id: db_id.as_u128(), + log_id: log_id.as_u128(), frame_count: 0, sqld_version: Version::current().0, } @@ -467,8 +467,9 @@ impl LogFile { } /// Returns an iterator over the WAL frame headers - #[allow(dead_code)] - fn 
frames_iter(&self) -> anyhow::Result> + '_> { + pub(crate) fn frames_iter( + &self, + ) -> anyhow::Result> + '_> { let mut current_frame_offset = 0; Ok(std::iter::from_fn(move || { if current_frame_offset >= self.header.frame_count { @@ -476,14 +477,17 @@ impl LogFile { } let read_byte_offset = Self::absolute_byte_offset(current_frame_offset); current_frame_offset += 1; - Some(self.read_frame_byte_offset(read_byte_offset)) + Some( + self.read_frame_byte_offset_mut(read_byte_offset) + .map(|f| f.into()), + ) })) } /// Returns an iterator over the WAL frame headers - pub fn rev_frames_iter( + pub fn rev_frames_iter_mut( &self, - ) -> anyhow::Result> + '_> { + ) -> anyhow::Result> + '_> { let mut current_frame_offset = self.header.frame_count; Ok(std::iter::from_fn(move || { @@ -492,7 +496,7 @@ impl LogFile { } current_frame_offset -= 1; let read_byte_offset = Self::absolute_byte_offset(current_frame_offset); - let frame = self.read_frame_byte_offset(read_byte_offset); + let frame = self.read_frame_byte_offset_mut(read_byte_offset); Some(frame) })) } @@ -564,9 +568,9 @@ impl LogFile { return Err(LogReadError::Ahead); } - let frame = self.read_frame_byte_offset(self.byte_offset(frame_no)?.unwrap())?; + let frame = self.read_frame_byte_offset_mut(self.byte_offset(frame_no)?.unwrap())?; - Ok(frame) + Ok(frame.into()) } fn should_compact(&self) -> bool { @@ -630,12 +634,11 @@ impl LogFile { Ok(()) } - fn read_frame_byte_offset(&self, offset: u64) -> anyhow::Result { + fn read_frame_byte_offset_mut(&self, offset: u64) -> anyhow::Result { let mut buffer = BytesMut::zeroed(LogFile::FRAME_SIZE); self.file.read_exact_at(&mut buffer, offset)?; - let buffer = buffer.freeze(); - Frame::try_from_bytes(buffer) + FrameMut::try_from(&*buffer) } fn last_commited_frame_no(&self) -> Option { @@ -704,8 +707,8 @@ pub struct LogFileHeader { /// Initial checksum value for the rolling CRC checksum /// computed with the 64 bits CRC_64_GO_ISO pub start_checksum: u64, - /// Uuid of the database 
associated with this log. - pub db_id: u128, + /// Uuid of the this log. + pub log_id: u128, /// Frame_no of the first frame in the log pub start_frame_no: FrameNo, /// entry count in file @@ -833,7 +836,11 @@ impl ReplicationLogger { Ok(Self { generation: Generation::new(generation_start_frame_no.unwrap_or(0)), - compactor: LogCompactor::new(&db_path, log_file.header.db_id, callback)?, + compactor: LogCompactor::new( + &db_path, + Uuid::from_u128(log_file.header.log_id), + callback, + )?, log_file: RwLock::new(log_file), db_path, closed_signal, @@ -882,8 +889,8 @@ impl ReplicationLogger { Self::from_log_file(data_path, log_file, callback, auto_checkpoint) } - pub fn database_id(&self) -> anyhow::Result { - Ok(Uuid::from_u128((self.log_file.read()).header().db_id)) + pub fn log_id(&self) -> Uuid { + Uuid::from_u128((self.log_file.read()).header().log_id) } /// Write pages to the log, without updating the file header. @@ -937,7 +944,7 @@ impl ReplicationLogger { } let last_frame = { - let mut frames_iter = log_file.rev_frames_iter()?; + let mut frames_iter = log_file.rev_frames_iter_mut()?; let Some(last_frame_res) = frames_iter.next() else { // the log file is empty, nothing to compact return Ok(false); diff --git a/sqld/src/replication/replica/error.rs b/sqld/src/replication/replica/error.rs index dbcf7644..fc5ce521 100644 --- a/sqld/src/replication/replica/error.rs +++ b/sqld/src/replication/replica/error.rs @@ -1,9 +1,7 @@ #[derive(Debug, thiserror::Error)] pub enum ReplicationError { - #[error("Replica is ahead of primary")] - Lagging, - #[error("Trying to replicate incompatible databases")] - DbIncompatible, + #[error("Primary has incompatible log")] + LogIncompatible, #[error("{0}")] Other(#[from] anyhow::Error), } diff --git a/sqld/src/replication/replica/injector.rs b/sqld/src/replication/replica/injector.rs deleted file mode 100644 index 28bdd333..00000000 --- a/sqld/src/replication/replica/injector.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::path::Path; - 
-use crate::DEFAULT_AUTO_CHECKPOINT; -use rusqlite::OpenFlags; - -use crate::replication::replica::hook::{SQLITE_CONTINUE_REPLICATION, SQLITE_EXIT_REPLICATION}; - -use super::hook::{InjectorHook, InjectorHookCtx, INJECTOR_METHODS}; - -pub struct FrameInjector { - conn: sqld_libsql_bindings::Connection, -} - -impl FrameInjector { - pub fn new(db_path: &Path, hook_ctx: InjectorHookCtx) -> anyhow::Result { - let conn = sqld_libsql_bindings::Connection::open( - db_path, - OpenFlags::SQLITE_OPEN_READ_WRITE - | OpenFlags::SQLITE_OPEN_CREATE - | OpenFlags::SQLITE_OPEN_URI - | OpenFlags::SQLITE_OPEN_NO_MUTEX, - &INJECTOR_METHODS, - hook_ctx, - // It's ok to leave auto-checkpoint to default, since replicas don't use bottomless. - DEFAULT_AUTO_CHECKPOINT, - )?; - - Ok(Self { conn }) - } - - pub fn step(&mut self) -> anyhow::Result { - self.conn.pragma_update(None, "writable_schema", "on")?; - let res = self.conn.execute("create table __dummy__ (dummy);", ()); - - match res { - Ok(_) => panic!("replication hook was not called"), - Err(e) => { - if let Some(e) = e.sqlite_error() { - if e.extended_code == SQLITE_EXIT_REPLICATION { - self.conn.pragma_update(None, "writable_schema", "reset")?; - return Ok(false); - } - if e.extended_code == SQLITE_CONTINUE_REPLICATION { - return Ok(true); - } - } - anyhow::bail!(e); - } - } - } -} diff --git a/sqld/src/replication/replica/injector/headers.rs b/sqld/src/replication/replica/injector/headers.rs new file mode 100644 index 00000000..0973d65b --- /dev/null +++ b/sqld/src/replication/replica/injector/headers.rs @@ -0,0 +1,47 @@ +use std::marker::PhantomData; + +use rusqlite::ffi::PgHdr; + +pub struct Headers<'a> { + ptr: *mut PgHdr, + _pth: PhantomData<&'a ()>, +} + +impl<'a> Headers<'a> { + // safety: ptr is guaranteed to be valid for 'a + pub(crate) unsafe fn new(ptr: *mut PgHdr) -> Self { + Self { + ptr, + _pth: PhantomData, + } + } + + pub(crate) fn as_ptr(&mut self) -> *mut PgHdr { + self.ptr + } + + pub(crate) fn 
all_applied(&self) -> bool { + let mut current = self.ptr; + while !current.is_null() { + unsafe { + // WAL appended + if (*current).flags & 0x040 == 0 { + return false; + } + current = (*current).pDirty; + } + } + + true + } +} + +impl Drop for Headers<'_> { + fn drop(&mut self) { + let mut current = self.ptr; + while !current.is_null() { + let h: Box = unsafe { Box::from_raw(current as _) }; + current = h.pDirty; + } + } +} diff --git a/sqld/src/replication/replica/injector/hook.rs b/sqld/src/replication/replica/injector/hook.rs new file mode 100644 index 00000000..273cb5bd --- /dev/null +++ b/sqld/src/replication/replica/injector/hook.rs @@ -0,0 +1,162 @@ +use std::ffi::{c_int, CStr}; + +use rusqlite::ffi::{libsql_wal as Wal, PgHdr}; +use sqld_libsql_bindings::ffi::types::XWalFrameFn; +use sqld_libsql_bindings::init_static_wal_method; +use sqld_libsql_bindings::wal_hook::WalHook; + +use crate::replication::frame::FrameBorrowed; +use crate::LIBSQL_PAGE_SIZE; + +use super::headers::Headers; +use super::FrameBuffer; + +// Those are custom error codes returned by the replicator hook. 
+pub const LIBSQL_INJECT_FATAL: c_int = 200; +/// Injection succeeded, left on a open txn state +pub const LIBSQL_INJECT_OK_TXN: c_int = 201; +/// Injection succeeded +pub const LIBSQL_INJECT_OK: c_int = 202; + +pub struct InjectorHookCtx { + /// shared frame buffer + buffer: FrameBuffer, + /// currently in a txn + is_txn: bool, +} + +impl InjectorHookCtx { + pub fn new(buffer: FrameBuffer) -> Self { + Self { + buffer, + is_txn: false, + } + } + + fn inject_pages( + &mut self, + sync_flags: i32, + orig: XWalFrameFn, + wal: *mut Wal, + ) -> anyhow::Result<()> { + self.is_txn = true; + let buffer = self.buffer.lock(); + let (mut headers, size_after) = make_page_header(buffer.iter().map(|f| &**f)); + + let ret = unsafe { + orig( + wal, + LIBSQL_PAGE_SIZE as _, + headers.as_ptr(), + size_after, + (size_after != 0) as _, + sync_flags, + ) + }; + + if ret == 0 { + debug_assert!(headers.all_applied()); + if size_after != 0 { + self.is_txn = false; + } + tracing::trace!("applied frame batch"); + + Ok(()) + } else { + anyhow::bail!("failed to apply pages"); + } + } +} + +/// Turn a list of `WalFrame` into a list of PgHdr. +/// The caller has the responsibility to free the returned headers. 
+/// return (headers, last_frame_no, size_after) +fn make_page_header<'a>(frames: impl Iterator) -> (Headers<'a>, u32) { + let mut first_pg: *mut PgHdr = std::ptr::null_mut(); + let mut current_pg; + let mut size_after = 0; + + let mut headers_count = 0; + let mut prev_pg: *mut PgHdr = std::ptr::null_mut(); + let mut frames = frames.peekable(); + while let Some(frame) = frames.next() { + // the last frame in a batch marks the end of the txn + if frames.peek().is_none() { + size_after = frame.header().size_after; + } + + let page = PgHdr { + pPage: std::ptr::null_mut(), + pData: frame.page().as_ptr() as _, + pExtra: std::ptr::null_mut(), + pCache: std::ptr::null_mut(), + pDirty: std::ptr::null_mut(), + pPager: std::ptr::null_mut(), + pgno: frame.header().page_no, + pageHash: 0, + flags: 0x02, // PGHDR_DIRTY - it works without the flag, but why risk it + nRef: 0, + pDirtyNext: std::ptr::null_mut(), + pDirtyPrev: std::ptr::null_mut(), + }; + headers_count += 1; + current_pg = Box::into_raw(Box::new(page)); + if first_pg.is_null() { + first_pg = current_pg; + } + if !prev_pg.is_null() { + unsafe { + (*prev_pg).pDirty = current_pg; + } + } + prev_pg = current_pg; + } + + tracing::trace!("built {headers_count} page headers"); + + let headers = unsafe { Headers::new(first_pg) }; + (headers, size_after) +} + +init_static_wal_method!(INJECTOR_METHODS, InjectorHook); + +/// The injector hook hijacks a call to xframes, and replace the content of the call with it's own +/// frames. 
+/// The Caller must first call `set_frames`, passing the frames to be injected, then trigger a call +/// to xFrames from the libsql connection (see dummy write in `injector`), and can then collect the +/// result on the injection with `take_result` +pub enum InjectorHook {} + +unsafe impl WalHook for InjectorHook { + type Context = InjectorHookCtx; + + fn on_frames( + wal: &mut Wal, + _page_size: c_int, + _page_headers: *mut PgHdr, + _size_after: u32, + _is_commit: c_int, + sync_flags: c_int, + orig: XWalFrameFn, + ) -> c_int { + let wal_ptr = wal as *mut _; + let ctx = Self::wal_extract_ctx(wal); + let ret = ctx.inject_pages(sync_flags, orig, wal_ptr); + if let Err(e) = ret { + tracing::error!("fatal replication error: {e}"); + return LIBSQL_INJECT_FATAL; + } + + ctx.buffer.lock().clear(); + + if !ctx.is_txn { + LIBSQL_INJECT_OK + } else { + LIBSQL_INJECT_OK_TXN + } + } + + fn name() -> &'static CStr { + CStr::from_bytes_with_nul(b"frame_injector_hook\0").unwrap() + } +} diff --git a/sqld/src/replication/replica/injector/mod.rs b/sqld/src/replication/replica/injector/mod.rs new file mode 100644 index 00000000..0360214c --- /dev/null +++ b/sqld/src/replication/replica/injector/mod.rs @@ -0,0 +1,251 @@ +use std::collections::VecDeque; +use std::path::Path; +use std::sync::Arc; + +use parking_lot::Mutex; +use rusqlite::OpenFlags; +use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS; + +use crate::error::Error; +use crate::replication::frame::Frame; +use crate::{replication::FrameNo, DEFAULT_AUTO_CHECKPOINT}; + +use hook::{ + InjectorHookCtx, INJECTOR_METHODS, LIBSQL_INJECT_FATAL, LIBSQL_INJECT_OK, LIBSQL_INJECT_OK_TXN, +}; + +use self::hook::InjectorHook; + +mod headers; +mod hook; + +#[derive(Debug)] +pub enum InjectError {} + +pub type FrameBuffer = Arc>>; + +pub struct Injector { + /// The injector is in a transaction state + is_txn: bool, + /// Buffer for holding current transaction frames + buffer: FrameBuffer, + /// Maximum capacity of the frame buffer + 
capacity: usize, + /// Injector connection + // connection must be dropped before the hook context + connection: Arc>>, +} + +/// Methods from this trait are called before and after performing a frame injection. +/// This trait trait is used to record the last committed frame_no to the log. +/// The implementer can persist the pre and post commit frame no, and compare them in the event of +/// a crash; if the pre and post commit frame_no don't match, then the log may be corrupted. +impl Injector { + pub fn new(path: &Path, buffer_capacity: usize) -> crate::Result { + let buffer = FrameBuffer::default(); + let ctx = InjectorHookCtx::new(buffer.clone()); + std::fs::create_dir_all(path)?; + + { + // create the replication table if it doesn't exist. We need to do that without hooks. + let connection = sqld_libsql_bindings::Connection::open( + path, + OpenFlags::SQLITE_OPEN_READ_WRITE + | OpenFlags::SQLITE_OPEN_CREATE + | OpenFlags::SQLITE_OPEN_URI + | OpenFlags::SQLITE_OPEN_NO_MUTEX, + &TRANSPARENT_METHODS, + // safety: hook is dropped after connection + (), + u32::MAX, + )?; + + connection.execute("CREATE TABLE IF NOT EXISTS libsql_temp_injection (x)", ())?; + } + + let connection = sqld_libsql_bindings::Connection::open( + path, + OpenFlags::SQLITE_OPEN_READ_WRITE + | OpenFlags::SQLITE_OPEN_CREATE + | OpenFlags::SQLITE_OPEN_URI + | OpenFlags::SQLITE_OPEN_NO_MUTEX, + &INJECTOR_METHODS, + // safety: hook is dropped after connection + ctx, + DEFAULT_AUTO_CHECKPOINT, + )?; + + connection.execute("CREATE TABLE IF NOT EXISTS libsql_temp_injection (x)", ())?; + + Ok(Self { + is_txn: false, + buffer, + capacity: buffer_capacity, + connection: Arc::new(Mutex::new(connection)), + }) + } + + /// Inject on frame into the log. If this was a commit frame, returns Ok(Some(FrameNo)). 
+ pub(crate) fn inject_frame(&mut self, frame: Frame) -> crate::Result> { + let frame_close_txn = frame.header().size_after != 0; + self.buffer.lock().push_back(frame); + if frame_close_txn || self.buffer.lock().len() >= self.capacity { + if !self.is_txn { + self.begin_txn()?; + } + return self.flush(); + } + + Ok(None) + } + + /// Flush the buffer to libsql WAL. + /// Trigger a dummy write, and flush the cache to trigger a call to xFrame. The buffer's frame + /// are then injected into the wal. + fn flush(&mut self) -> crate::Result> { + let lock = self.buffer.lock(); + // the frames in the buffer are either monotonically increasing (log) or decreasing + // (snapshot). Either way, we want to find the biggest frameno we're about to commit, and + // that is either the front or the back of the buffer + let last_frame_no = match lock.back().zip(lock.front()) { + Some((b, f)) => f.header().frame_no.max(b.header().frame_no), + None => { + tracing::trace!("nothing to inject"); + return Ok(None); + } + }; + + drop(lock); + + let connection = self.connection.lock(); + // use prepare cached to avoid parsing the same statement over and over again. + let mut stmt = + connection.prepare_cached("INSERT INTO libsql_temp_injection VALUES (42)")?; + + // We execute the statement, and then force a call to xframe if necesacary. If the execute + // succeeds, then xframe wasn't called, in this case, we call cache_flush, and then process + // the error. + // It is unexpected that execute flushes, but it is possible, so we handle that case. + match stmt.execute(()).and_then(|_| connection.cache_flush()) { + Ok(_) => panic!("replication hook was not called"), + Err(e) => { + if let Some(e) = e.sqlite_error() { + if e.extended_code == LIBSQL_INJECT_OK { + // refresh schema + connection.pragma_update(None, "writable_schema", "reset")?; + if let Err(e) = connection.execute("COMMIT", ()) { + if !matches!(e.sqlite_error(), Some(rusqlite::ffi::Error{ extended_code, .. 
}) if *extended_code == 201) + { + tracing::error!("injector failed to commit: {e}"); + return Err(Error::FatalReplicationError); + } + } + self.is_txn = false; + assert!(self.buffer.lock().is_empty()); + return Ok(Some(last_frame_no)); + } else if e.extended_code == LIBSQL_INJECT_OK_TXN { + self.is_txn = true; + assert!(self.buffer.lock().is_empty()); + return Ok(None); + } else if e.extended_code == LIBSQL_INJECT_FATAL { + return Err(Error::FatalReplicationError); + } + } + + Err(Error::FatalReplicationError) + } + } + } + + fn begin_txn(&mut self) -> crate::Result<()> { + let conn = self.connection.lock(); + conn.execute("BEGIN IMMEDIATE", ())?; + Ok(()) + } + + pub fn clear_buffer(&mut self) { + self.buffer.lock().clear(); + } +} + +#[cfg(test)] +mod test { + use crate::replication::primary::logger::LogFile; + + use super::Injector; + + #[test] + fn test_simple_inject_frames() { + let log_file = std::fs::File::open("assets/test/simple_wallog").unwrap(); + let log = LogFile::new(log_file, 10000, None).unwrap(); + let temp = tempfile::tempdir().unwrap(); + + let mut injector = Injector::new(temp.path(), 10).unwrap(); + for frame in log.frames_iter().unwrap() { + let frame = frame.unwrap(); + injector.inject_frame(frame).unwrap(); + } + + let conn = rusqlite::Connection::open(temp.path().join("data")).unwrap(); + + conn.query_row("SELECT COUNT(*) FROM test", (), |row| { + assert_eq!(row.get::<_, usize>(0).unwrap(), 5); + Ok(()) + }) + .unwrap(); + } + + #[test] + fn test_inject_frames_split_txn() { + let log_file = std::fs::File::open("assets/test/simple_wallog").unwrap(); + let log = LogFile::new(log_file, 10000, None).unwrap(); + let temp = tempfile::tempdir().unwrap(); + + // inject one frame at a time + let mut injector = Injector::new(temp.path(), 1).unwrap(); + for frame in log.frames_iter().unwrap() { + let frame = frame.unwrap(); + injector.inject_frame(frame).unwrap(); + } + + let conn = rusqlite::Connection::open(temp.path().join("data")).unwrap(); + + 
conn.query_row("SELECT COUNT(*) FROM test", (), |row| { + assert_eq!(row.get::<_, usize>(0).unwrap(), 5); + Ok(()) + }) + .unwrap(); + } + + #[test] + fn test_inject_partial_txn_isolated() { + let log_file = std::fs::File::open("assets/test/simple_wallog").unwrap(); + let log = LogFile::new(log_file, 10000, None).unwrap(); + let temp = tempfile::tempdir().unwrap(); + + // inject one frame at a time + let mut injector = Injector::new(temp.path(), 10).unwrap(); + let mut iter = log.frames_iter().unwrap(); + + assert!(injector + .inject_frame(iter.next().unwrap().unwrap()) + .unwrap() + .is_none()); + let conn = rusqlite::Connection::open(temp.path().join("data")).unwrap(); + assert!(conn + .query_row("SELECT COUNT(*) FROM test", (), |_| Ok(())) + .is_err()); + + while injector + .inject_frame(iter.next().unwrap().unwrap()) + .unwrap() + .is_none() + {} + + // reset schema + conn.pragma_update(None, "writable_schema", "reset") + .unwrap(); + conn.query_row("SELECT COUNT(*) FROM test", (), |_| Ok(())) + .unwrap(); + } +} diff --git a/sqld/src/replication/replica/meta.rs b/sqld/src/replication/replica/meta.rs index b38534e1..1216cd39 100644 --- a/sqld/src/replication/replica/meta.rs +++ b/sqld/src/replication/replica/meta.rs @@ -1,12 +1,13 @@ -use std::fs::{File, OpenOptions}; -use std::io::ErrorKind; +use std::io::{ErrorKind, SeekFrom}; use std::mem::size_of; -use std::os::unix::prelude::FileExt; use std::path::Path; use std::str::FromStr; use anyhow::Context; -use bytemuck::{try_pod_read_unaligned, Pod, Zeroable}; +use bytemuck::{bytes_of, try_pod_read_unaligned, Pod, Zeroable}; +use tokio::fs::File; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +use tokio::pin; use uuid::Uuid; use crate::{replication::FrameNo, rpc::replication_log::rpc::HelloResponse}; @@ -15,43 +16,20 @@ use super::error::ReplicationError; #[repr(C)] #[derive(Debug, Pod, Zeroable, Clone, Copy)] -pub struct WalIndexMeta { - /// This is the anticipated next frame_no to 
request - pub pre_commit_frame_no: FrameNo, - /// After we have written the frames back to the wal, we set this value to the same value as - /// pre_commit_index - /// On startup we check this value against the pre-commit value to check for consistency - pub post_commit_frame_no: FrameNo, - /// Generation Uuid - /// This number is generated on each primary restart. This let's us know that the primary, and - /// we need to make sure that we are not ahead of the primary. - generation_id: u128, - /// Uuid of the database this instance is a replica of - database_id: u128, +pub struct WalIndexMetaData { + /// id of the replicated log + log_id: u128, + /// commited frame index + pub committed_frame_no: FrameNo, + _padding: u64, } -impl WalIndexMeta { - pub fn open(db_path: &Path) -> crate::Result { - let path = db_path.join("client_wal_index"); - std::fs::create_dir_all(db_path)?; - - Ok(OpenOptions::new() - .create(true) - .read(true) - .write(true) - .open(path)?) - } - - pub fn read_from_path(db_path: &Path) -> anyhow::Result> { - let file = Self::open(db_path)?; - Ok(Self::read(&file)?) 
- } - - fn read(file: &File) -> crate::Result> { - let mut buf = [0; size_of::()]; - let meta = match file.read_exact_at(&mut buf, 0) { - Ok(()) => { - file.read_exact_at(&mut buf, 0)?; +impl WalIndexMetaData { + async fn read(file: impl AsyncRead) -> crate::Result> { + pin!(file); + let mut buf = [0; size_of::()]; + let meta = match file.read_exact(&mut buf).await { + Ok(_) => { let meta: Self = try_pod_read_unaligned(&buf) .map_err(|_| anyhow::anyhow!("invalid index meta file"))?; Some(meta) @@ -62,44 +40,94 @@ impl WalIndexMeta { Ok(meta) } +} + +pub struct WalIndexMeta { + file: File, + data: Option, +} + +impl WalIndexMeta { + pub async fn open(db_path: &Path) -> crate::Result { + let path = db_path.join("client_wal_index"); + + tokio::fs::create_dir_all(db_path).await?; + + let mut file = tokio::fs::OpenOptions::new() + .create(true) + .read(true) + .write(true) + .open(&path) + .await?; + + let data = WalIndexMetaData::read(&mut file).await?; + + Ok(Self { file, data }) + } /// attempts to merge two meta files. - pub fn merge_from_hello(mut self, hello: HelloResponse) -> Result { - let hello_db_id = Uuid::from_str(&hello.database_id) + pub fn merge_hello(&mut self, hello: HelloResponse) -> Result<(), ReplicationError> { + let hello_log_id = Uuid::from_str(&hello.log_id) .context("invalid database id from primary")? .as_u128(); - let hello_gen_id = Uuid::from_str(&hello.generation_id) - .context("invalid generation id from primary")? 
- .as_u128(); - if hello_db_id != self.database_id { - return Err(ReplicationError::DbIncompatible); + match self.data { + Some(meta) => { + if meta.log_id != hello_log_id { + Err(ReplicationError::LogIncompatible) + } else { + Ok(()) + } + } + None => { + self.data = Some(WalIndexMetaData { + log_id: hello_log_id, + committed_frame_no: FrameNo::MAX, + _padding: 0, + }); + Ok(()) + } } + } - if self.generation_id == hello_gen_id { - Ok(self) - } else if self.pre_commit_frame_no <= hello.generation_start_index { - // Ok: generation changed, but we aren't ahead of primary - self.generation_id = hello_gen_id; - Ok(self) - } else { - Err(ReplicationError::Lagging) + pub async fn flush(&mut self) -> crate::Result<()> { + if let Some(data) = self.data { + // FIXME: we can save a syscall by calling read_exact_at, but let's use tokio API for now + self.file.seek(SeekFrom::Start(0)).await?; + let s = self.file.write(bytes_of(&data)).await?; + // WalIndexMeta is smaller than a page size, and aligned at the beginning of the file, if + // should always be written in a single call + assert_eq!(s, size_of::()); + self.file.flush().await?; } + + Ok(()) } - pub fn new_from_hello(hello: HelloResponse) -> anyhow::Result { - let database_id = Uuid::from_str(&hello.database_id) - .context("invalid database id from primary")? - .as_u128(); - let generation_id = Uuid::from_str(&hello.generation_id) - .context("invalid generation id from primary")? - .as_u128(); + /// Apply the last commit frame no to the meta file. + /// This function must be called after each injection, because it's idempotent to re-apply the + /// last transaction, but not idempotent if we lose track of more than one. 
+ pub async fn set_commit_frame_no(&mut self, commit_fno: FrameNo) -> crate::Result<()> { + { + let data = self + .data + .as_mut() + .expect("call set_commit_frame_no before initializing meta"); + data.committed_frame_no = commit_fno; + } - Ok(Self { - pre_commit_frame_no: FrameNo::MAX, - post_commit_frame_no: FrameNo::MAX, - generation_id, - database_id, + self.flush().await?; + + Ok(()) + } + + pub(crate) fn current_frame_no(&self) -> Option { + self.data.and_then(|d| { + if d.committed_frame_no == FrameNo::MAX { + None + } else { + Some(d.committed_frame_no) + } }) } } diff --git a/sqld/src/replication/replica/mod.rs b/sqld/src/replication/replica/mod.rs index a6e7e63c..3fc946b7 100644 --- a/sqld/src/replication/replica/mod.rs +++ b/sqld/src/replication/replica/mod.rs @@ -1,8 +1,6 @@ pub mod error; -mod hook; mod injector; mod meta; mod replicator; -mod snapshot; pub use replicator::Replicator; diff --git a/sqld/src/replication/replica/replicator.rs b/sqld/src/replication/replica/replicator.rs index 4a16c5d3..f562b0b5 100644 --- a/sqld/src/replication/replica/replicator.rs +++ b/sqld/src/replication/replica/replicator.rs @@ -1,11 +1,10 @@ -use std::os::unix::prelude::FileExt; use std::path::PathBuf; use std::sync::Arc; -use bytemuck::bytes_of; use futures::StreamExt; -use tokio::sync::{mpsc, oneshot, watch, Mutex}; -use tokio::task::JoinSet; +use parking_lot::Mutex; +use tokio::sync::watch; +use tokio::task::spawn_blocking; use tokio::time::Duration; use tonic::metadata::BinaryMetadataValue; use tonic::transport::Channel; @@ -14,17 +13,14 @@ use tonic::{Code, Request}; use crate::namespace::{NamespaceName, ResetCb, ResetOp}; use crate::replication::frame::Frame; use crate::replication::replica::error::ReplicationError; -use crate::replication::replica::snapshot::TempSnapshot; use crate::replication::FrameNo; use crate::rpc::replication_log::rpc::{ replication_log_client::ReplicationLogClient, HelloRequest, LogOffset, }; use 
crate::rpc::replication_log::NEED_SNAPSHOT_ERROR_MSG; use crate::rpc::{NAMESPACE_DOESNT_EXIST, NAMESPACE_METADATA_KEY}; -use crate::BLOCKING_RT; -use super::hook::{Frames, InjectorHookCtx}; -use super::injector::FrameInjector; +use super::injector::Injector; use super::meta::WalIndexMeta; const HANDSHAKE_MAX_RETRIES: usize = 100; @@ -35,97 +31,42 @@ type Client = ReplicationLogClient; /// transaction boundaries. pub struct Replicator { client: Client, - db_path: PathBuf, namespace: NamespaceName, - meta: Arc>>, - pub current_frame_no_notifier: watch::Receiver>, - frames_sender: mpsc::Sender, - /// hard reset channel: send the namespace there, to reset it + meta: WalIndexMeta, + injector: Arc>, + pub current_frame_no_notifier: watch::Sender>, reset: ResetCb, } +const INJECTOR_BUFFER_CAPACITY: usize = 10; + impl Replicator { pub async fn new( db_path: PathBuf, channel: Channel, uri: tonic::transport::Uri, namespace: NamespaceName, - join_set: &mut JoinSet>, reset: ResetCb, ) -> anyhow::Result { + let (current_frame_no_notifier, _) = watch::channel(None); + let injector = { + let db_path = db_path.clone(); + spawn_blocking(move || Injector::new(&db_path, INJECTOR_BUFFER_CAPACITY)).await?? 
+ }; let client = Client::with_origin(channel, uri); - let (applied_frame_notifier, current_frame_no_notifier) = watch::channel(None); - let (frames_sender, receiver) = tokio::sync::mpsc::channel(1); + let meta = WalIndexMeta::open(&db_path).await?; let mut this = Self { namespace, client, - db_path: db_path.clone(), current_frame_no_notifier, - meta: Arc::new(Mutex::new(None)), - frames_sender, + meta, + injector: Arc::new(Mutex::new(injector)), reset, }; this.try_perform_handshake().await?; - let meta_file = Arc::new(WalIndexMeta::open(&db_path)?); - let meta = this.meta.clone(); - - let pre_commit = { - let meta = meta.clone(); - let meta_file = meta_file.clone(); - move |fno| { - let mut lock = meta.blocking_lock(); - let meta = lock - .as_mut() - .expect("commit called before meta inialization"); - meta.pre_commit_frame_no = fno; - meta_file.write_all_at(bytes_of(meta), 0)?; - - Ok(()) - } - }; - - let post_commit = { - let meta = meta.clone(); - let meta_file = meta_file; - let notifier = applied_frame_notifier; - move |fno| { - let mut lock = meta.blocking_lock(); - let meta = lock - .as_mut() - .expect("commit called before meta inialization"); - assert_eq!(meta.pre_commit_frame_no, fno); - meta.post_commit_frame_no = fno; - meta_file.write_all_at(bytes_of(meta), 0)?; - let _ = notifier.send(Some(fno)); - - Ok(()) - } - }; - - let (snd, rcv) = oneshot::channel(); - let handle = BLOCKING_RT.spawn_blocking({ - move || -> anyhow::Result<()> { - let ctx = InjectorHookCtx::new(receiver, pre_commit, post_commit); - let mut injector = FrameInjector::new(&db_path, ctx)?; - let _ = snd.send(()); - - while injector.step()? {} - - Ok(()) - } - }); - - join_set.spawn(async move { - handle.await??; - Ok(()) - }); - - // injector is ready: - rcv.await?; - Ok(this) } @@ -143,10 +84,13 @@ impl Replicator { loop { self.try_perform_handshake().await?; - if let Err(e) = self.replicate().await { - // Replication encountered an error. 
We log the error, and then shut down the - // injector and propagate a potential panic from there. - tracing::warn!("replication error: {e}"); + loop { + if let Err(e) = self.replicate().await { + // Replication encountered an error. We log the error, and then shut down the + // injector and propagate a potential panic from there. + tracing::warn!("replication error: {e}"); + break; + } } tokio::time::sleep(Duration::from_secs(1)).await; } @@ -154,20 +98,13 @@ impl Replicator { async fn handle_replication_error(&self, error: ReplicationError) -> crate::error::Error { match error { - ReplicationError::Lagging => { - tracing::error!("Replica ahead of primary: hard-reseting replica"); + ReplicationError::LogIncompatible => { + tracing::error!("Primary's replication log incompatible with ours: repairing."); + (self.reset)(ResetOp::Reset(self.namespace.clone())); + error.into() } - ReplicationError::DbIncompatible => { - tracing::error!( - "Primary is attempting to replicate a different database, overwriting replica." - ); - } - _ => return error.into(), + _ => error.into(), } - - (self.reset)(ResetOp::Reset(self.namespace.clone())); - - error.into() } async fn try_perform_handshake(&mut self) -> crate::Result<()> { @@ -179,22 +116,12 @@ impl Replicator { Ok(resp) => { let hello = resp.into_inner(); - let mut lock = self.meta.lock().await; - let meta = match *lock { - Some(meta) => match meta.merge_from_hello(hello) { - Ok(meta) => meta, - Err(e) => return Err(self.handle_replication_error(e).await), - }, - None => match WalIndexMeta::read_from_path(&self.db_path)? 
{ - Some(meta) => match meta.merge_from_hello(hello) { - Ok(meta) => meta, - Err(e) => return Err(self.handle_replication_error(e).await), - }, - None => WalIndexMeta::new_from_hello(hello)?, - }, - }; - - *lock = Some(meta); + if let Err(e) = self.meta.merge_hello(hello) { + return Err(self.handle_replication_error(e).await); + } + + self.current_frame_no_notifier + .send_replace(self.meta.current_frame_no()); return Ok(()); } @@ -222,9 +149,7 @@ impl Replicator { } async fn replicate(&mut self) -> anyhow::Result<()> { - const MAX_REPLICA_REPLICATION_BUFFER_LEN: usize = 10_000_000 / 4096; // ~10MB let offset = LogOffset { - // if current == FrameNo::Max then it means that we're starting fresh next_offset: self.next_offset(), }; @@ -232,20 +157,11 @@ impl Replicator { let mut stream = self.client.log_entries(req).await?.into_inner(); - let mut buffer = Vec::new(); loop { match stream.next().await { Some(Ok(frame)) => { - let frame = Frame::try_from_bytes(frame.data)?; - buffer.push(frame.clone()); - if frame.header().size_after != 0 - || buffer.len() > MAX_REPLICA_REPLICATION_BUFFER_LEN - { - let _ = self - .frames_sender - .send(Frames::Vec(std::mem::take(&mut buffer))) - .await; - } + let frame = Frame::try_from(&*frame.data)?; + self.inject_frame(frame).await?; } Some(Err(err)) if err.code() == tonic::Code::FailedPrecondition @@ -254,7 +170,6 @@ impl Replicator { tracing::debug!("loading snapshot"); // remove any outstanding frames in the buffer that are not part of a // transaction: they are now part of the snapshot. 
- buffer.clear(); self.load_snapshot().await?; } Some(Err(e)) => return Err(e.into()), @@ -264,19 +179,48 @@ } async fn load_snapshot(&mut self) -> anyhow::Result<()> { + self.injector.lock().clear_buffer(); let next_offset = self.next_offset(); let req = self.make_request(LogOffset { next_offset }); + // FIXME: check for unavailable snapshot and try again, or make primary wait for snapshot + // to become available let frames = self.client.snapshot(req).await?.into_inner(); - let stream = frames.map(|data| match data { - Ok(frame) => Frame::try_from_bytes(frame.data), + let mut stream = frames.map(|data| match data { + Ok(frame) => Frame::try_from(&*frame.data), Err(e) => anyhow::bail!(e), }); - let snap = TempSnapshot::from_stream(&self.db_path, stream).await?; - let _ = self.frames_sender.send(Frames::Snapshot(snap)).await; + while let Some(frame) = stream.next().await { + let frame = frame?; + self.inject_frame(frame).await?; + } + + Ok(()) + } + + async fn inject_frame(&mut self, frame: Frame) -> anyhow::Result<()> { + let injector = self.injector.clone(); + match spawn_blocking(move || injector.lock().inject_frame(frame)).await? { + Ok(Some(commit_fno)) => { + self.meta.set_commit_frame_no(commit_fno).await?; + self.current_frame_no_notifier + .send_replace(Some(commit_fno)); + } + Ok(None) => (), + Err(e @ crate::Error::FatalReplicationError) => { + // we conservatively nuke the replica and start replicating from scratch + tracing::error!( + "fatal error replicating `{}` from primary, reseting namespace...", + self.namespace + ); + (self.reset)(ResetOp::Destroy(self.namespace.clone())); + Err(e)?
+ } + Err(e) => Err(e)?, + } Ok(()) } @@ -286,6 +230,6 @@ impl Replicator { } fn current_frame_no(&mut self) -> Option { - *self.current_frame_no_notifier.borrow_and_update() + self.meta.current_frame_no() } } diff --git a/sqld/src/replication/replica/snapshot.rs b/sqld/src/replication/replica/snapshot.rs deleted file mode 100644 index 523f55e7..00000000 --- a/sqld/src/replication/replica/snapshot.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::path::{Path, PathBuf}; - -use futures::{Stream, StreamExt}; -use tempfile::NamedTempFile; -use tokio::io::{AsyncWriteExt, BufWriter}; - -use crate::replication::frame::{Frame, FrameBorrowed}; - -#[derive(Debug)] -pub struct TempSnapshot { - path: PathBuf, - map: memmap::Mmap, -} - -impl TempSnapshot { - pub async fn from_stream( - db_path: &Path, - mut s: impl Stream> + Unpin, - ) -> anyhow::Result { - let temp_dir = db_path.join("temp"); - tokio::fs::create_dir_all(&temp_dir).await?; - let file = NamedTempFile::new_in(temp_dir)?; - let tokio_file = tokio::fs::File::from_std(file.as_file().try_clone()?); - - let mut tokio_file = BufWriter::new(tokio_file); - while let Some(frame) = s.next().await { - let frame = frame?; - tokio_file.write_all(frame.as_slice()).await?; - } - - tokio_file.flush().await?; - - let (file, path) = file.keep()?; - - let map = unsafe { memmap::Mmap::map(&file)? 
}; - - Ok(Self { path, map }) - } - - pub fn iter(&self) -> impl Iterator { - self.map.chunks(Frame::SIZE).map(FrameBorrowed::from_bytes) - } -} - -impl Drop for TempSnapshot { - fn drop(&mut self) { - let path = std::mem::take(&mut self.path); - let _ = std::fs::remove_file(path); - } -} diff --git a/sqld/src/replication/snapshot.rs b/sqld/src/replication/snapshot.rs index adc42a67..a9e2fd35 100644 --- a/sqld/src/replication/snapshot.rs +++ b/sqld/src/replication/snapshot.rs @@ -21,7 +21,7 @@ use uuid::Uuid; use crate::namespace::NamespaceName; -use super::frame::Frame; +use super::frame::FrameMut; use super::primary::logger::LogFile; use super::FrameNo; @@ -35,7 +35,7 @@ const MAX_SNAPSHOT_NUMBER: usize = 32; #[repr(C)] pub struct SnapshotFileHeader { /// id of the database - pub db_id: u128, + pub log_id: u128, /// first frame in the snapshot pub start_frame_no: u64, /// end frame in the snapshot @@ -134,7 +134,7 @@ impl SnapshotFile { } /// Iterator on the frames contained in the snapshot file, in reverse frame_no order. - pub fn frames_iter(&self) -> impl Iterator> + '_ { + pub fn frames_iter_mut(&self) -> impl Iterator> + '_ { let mut current_offset = 0; std::iter::from_fn(move || { if current_offset >= self.header.frame_count { @@ -145,7 +145,7 @@ impl SnapshotFile { current_offset += 1; let mut buf = BytesMut::zeroed(LogFile::FRAME_SIZE); match self.file.read_exact_at(&mut buf, read_offset as _) { - Ok(_) => match Frame::try_from_bytes(buf.freeze()) { + Ok(_) => match FrameMut::try_from(&*buf) { Ok(frame) => Some(Ok(frame)), Err(e) => Some(Err(e)), }, @@ -154,12 +154,14 @@ impl SnapshotFile { }) } - /// Like `frames_iter`, but stops as soon as a frame with frame_no <= `frame_no` is reached + /// Like `frames_iter`, but stops as soon as a frame with frame_no <= `frame_no` is reached. + /// The frames are returned in monotonically strictly decreasing frame_no. In other words, the + /// most recent frames come first. 
pub fn frames_iter_from( &self, frame_no: u64, - ) -> impl Iterator> + '_ { - let mut iter = self.frames_iter(); + ) -> impl Iterator> + '_ { + let mut iter = self.frames_iter_mut(); std::iter::from_fn(move || match iter.next() { Some(Ok(frame)) => { if frame.header().frame_no < frame_no { @@ -171,6 +173,10 @@ impl SnapshotFile { other => other, }) } + + pub fn header(&self) -> &SnapshotFileHeader { + &self.header + } } #[derive(Clone, Debug)] @@ -183,18 +189,19 @@ pub type NamespacedSnapshotCallback = Arc anyhow::Result<()> + Send + Sync>; impl LogCompactor { - pub fn new(db_path: &Path, db_id: u128, callback: SnapshotCallback) -> anyhow::Result { + pub fn new(db_path: &Path, log_id: Uuid, callback: SnapshotCallback) -> anyhow::Result { // we create a 0 sized channel, in order to create backpressure when we can't // keep up with snapshop creation: if there isn't any ongoind comptaction task processing, // the compact does not block, and the log is compacted in the background. Otherwise, the // block until there is a free slot to perform compaction. 
let (sender, receiver) = bounded::<(LogFile, PathBuf, u32)>(0); - let mut merger = SnapshotMerger::new(db_path, db_id)?; + let mut merger = SnapshotMerger::new(db_path, log_id)?; let db_path = db_path.to_path_buf(); let snapshot_dir_path = snapshot_dir_path(&db_path); + // FIXME: use tokio task for compaction let _handle = std::thread::spawn(move || { while let Ok((file, log_path, size_after)) = receiver.recv() { - match perform_compaction(&db_path, file, db_id) { + match perform_compaction(&db_path, file, log_id) { Ok((snapshot_name, snapshot_frame_count)) => { tracing::info!("snapshot `{snapshot_name}` successfully created"); @@ -252,12 +259,12 @@ struct SnapshotMerger { } impl SnapshotMerger { - fn new(db_path: &Path, db_id: u128) -> anyhow::Result { + fn new(db_path: &Path, log_id: Uuid) -> anyhow::Result { let (sender, receiver) = mpsc::channel(); let db_path = db_path.to_path_buf(); let handle = - std::thread::spawn(move || Self::run_snapshot_merger_loop(receiver, &db_path, db_id)); + std::thread::spawn(move || Self::run_snapshot_merger_loop(receiver, &db_path, log_id)); Ok(Self { sender, @@ -274,13 +281,13 @@ impl SnapshotMerger { fn run_snapshot_merger_loop( receiver: mpsc::Receiver<(String, u64, u32)>, db_path: &Path, - db_id: u128, + log_id: Uuid, ) -> anyhow::Result<()> { let mut snapshots = Self::init_snapshot_info_list(db_path)?; while let Ok((name, size, db_page_count)) = receiver.recv() { snapshots.push((name, size)); if Self::should_compact(&snapshots, db_page_count) { - let compacted_snapshot_info = Self::merge_snapshots(&snapshots, db_path, db_id)?; + let compacted_snapshot_info = Self::merge_snapshots(&snapshots, db_path, log_id)?; snapshots.clear(); snapshots.push(compacted_snapshot_info); } @@ -323,13 +330,18 @@ impl SnapshotMerger { fn merge_snapshots( snapshots: &[(String, u64)], db_path: &Path, - db_id: u128, + log_id: Uuid, ) -> anyhow::Result<(String, u64)> { - let mut builder = SnapshotBuilder::new(db_path, db_id)?; + let mut builder = 
SnapshotBuilder::new(db_path, log_id)?; let snapshot_dir_path = snapshot_dir_path(db_path); + let mut size_after = None; for (name, _) in snapshots.iter().rev() { let snapshot = SnapshotFile::open(&snapshot_dir_path.join(name))?; - let iter = snapshot.frames_iter(); + // The size after the merged snapshot is the size after the first snapshot to be merged + if size_after.is_none() { + size_after.replace(snapshot.header.size_after); + } + let iter = snapshot.frames_iter_mut(); builder.append_frames(iter)?; } @@ -338,6 +350,7 @@ impl SnapshotMerger { builder.header.start_frame_no = start_frame_no; builder.header.end_frame_no = end_frame_no; + builder.header.size_after = size_after.unwrap(); let compacted_snapshot_infos = builder.finish()?; @@ -386,7 +399,7 @@ fn snapshot_dir_path(db_path: &Path) -> PathBuf { } impl SnapshotBuilder { - fn new(db_path: &Path, db_id: u128) -> anyhow::Result { + fn new(db_path: &Path, log_id: Uuid) -> anyhow::Result { let snapshot_dir_path = snapshot_dir_path(db_path); std::fs::create_dir_all(&snapshot_dir_path)?; let mut target = BufWriter::new(NamedTempFile::new_in(&snapshot_dir_path)?); @@ -396,7 +409,7 @@ impl SnapshotBuilder { Ok(Self { seen_pages: HashSet::new(), header: SnapshotFileHeader { - db_id, + log_id: log_id.as_u128(), start_frame_no: u64::MAX, end_frame_no: u64::MIN, frame_count: 0, @@ -412,7 +425,7 @@ impl SnapshotBuilder { /// append frames to the snapshot. Frames must be in decreasing frame_no order. fn append_frames( &mut self, - frames: impl Iterator>, + frames: impl Iterator>, ) -> anyhow::Result<()> { // We iterate on the frames starting from the end of the log and working our way backward. We // make sure that only the most recent version of each file is present in the resulting @@ -421,7 +434,7 @@ impl SnapshotBuilder { // The snapshot file contains the most recent version of each page, in descending frame // number order. That last part is important for when we read it later on. 
for frame in frames { - let frame = frame?; + let mut frame = frame?; assert!(frame.header().frame_no < self.last_seen_frame_no); self.last_seen_frame_no = frame.header().frame_no; if frame.header().frame_no < self.header.start_frame_no { @@ -433,6 +446,11 @@ impl SnapshotBuilder { self.header.size_after = frame.header().size_after; } + // set all frames as non-commit frame in a snapshot, and let the client decide when to + // commit. This is ok because the client will stream frames backward until caught up, + // and then commit. + frame.header_mut().size_after = 0; + if !self.seen_pages.contains(&frame.header().page_no) { self.seen_pages.insert(frame.header().page_no); self.snapshot_file.write_all(frame.as_slice())?; @@ -450,7 +468,7 @@ impl SnapshotBuilder { file.as_file().write_all_at(bytes_of(&self.header), 0)?; let snapshot_name = format!( "{}-{}-{}.snap", - Uuid::from_u128(self.header.db_id), + Uuid::from_u128(self.header.log_id), self.header.start_frame_no, self.header.end_frame_no, ); @@ -464,10 +482,10 @@ impl SnapshotBuilder { fn perform_compaction( db_path: &Path, file_to_compact: LogFile, - db_id: u128, + log_id: Uuid, ) -> anyhow::Result<(String, u64)> { - let mut builder = SnapshotBuilder::new(db_path, db_id)?; - builder.append_frames(file_to_compact.rev_frames_iter()?)?; + let mut builder = SnapshotBuilder::new(db_path, log_id)?; + builder.append_frames(file_to_compact.rev_frames_iter_mut()?)?; builder.finish() } @@ -480,6 +498,7 @@ mod test { use bytes::Bytes; use tempfile::tempdir; + use crate::replication::frame::Frame; use crate::replication::primary::logger::WalPage; use crate::replication::snapshot::SnapshotFile; @@ -489,8 +508,8 @@ mod test { fn compact_file_create_snapshot() { let temp = tempfile::NamedTempFile::new().unwrap(); let mut log_file = LogFile::new(temp.as_file().try_clone().unwrap(), 0, None).unwrap(); - let db_id = Uuid::new_v4(); - log_file.header.db_id = db_id.as_u128(); + let log_id = Uuid::new_v4(); + log_file.header.log_id = 
log_id.as_u128(); log_file.write_header().unwrap(); // add 50 pages, each one in two versions @@ -509,8 +528,7 @@ mod test { log_file.commit().unwrap(); let dump_dir = tempdir().unwrap(); - let compactor = - LogCompactor::new(dump_dir.path(), db_id.as_u128(), Box::new(|_| Ok(()))).unwrap(); + let compactor = LogCompactor::new(dump_dir.path(), log_id, Box::new(|_| Ok(()))).unwrap(); compactor .compact(log_file, temp.path().to_path_buf(), 25) .unwrap(); @@ -518,7 +536,7 @@ mod test { thread::sleep(Duration::from_secs(1)); let snapshot_path = - snapshot_dir_path(dump_dir.path()).join(format!("{}-{}-{}.snap", db_id, 0, 49)); + snapshot_dir_path(dump_dir.path()).join(format!("{}-{}-{}.snap", log_id, 0, 49)); let snapshot = read(&snapshot_path).unwrap(); let header: SnapshotFileHeader = pod_read_unaligned(&snapshot[..std::mem::size_of::()]); @@ -526,14 +544,14 @@ mod test { assert_eq!(header.start_frame_no, 0); assert_eq!(header.end_frame_no, 49); assert_eq!(header.frame_count, 25); - assert_eq!(header.db_id, db_id.as_u128()); + assert_eq!(header.log_id, log_id.as_u128()); assert_eq!(header.size_after, 25); let mut seen_frames = HashSet::new(); let mut seen_page_no = HashSet::new(); let data = &snapshot[std::mem::size_of::()..]; data.chunks(LogFile::FRAME_SIZE).for_each(|f| { - let frame = Frame::try_from_bytes(Bytes::copy_from_slice(f)).unwrap(); + let frame = Frame::try_from(f).unwrap(); assert!(!seen_frames.contains(&frame.header().frame_no)); assert!(!seen_page_no.contains(&frame.header().page_no)); seen_page_no.insert(frame.header().page_no); diff --git a/sqld/src/rpc/replication_log.rs b/sqld/src/rpc/replication_log.rs index c82ede3f..79b7c4f8 100644 --- a/sqld/src/rpc/replication_log.rs +++ b/sqld/src/rpc/replication_log.rs @@ -239,9 +239,7 @@ impl ReplicationLog for ReplicationLogService { })?; let response = HelloResponse { - database_id: logger.database_id().unwrap().to_string(), - generation_start_index: logger.generation.start_index, - generation_id: 
logger.generation.id.to_string(), + log_id: logger.log_id().to_string(), }; Ok(tonic::Response::new(response)) @@ -268,12 +266,18 @@ impl ReplicationLog for ReplicationLogService { { Ok(Ok(Some(snapshot))) => { BLOCKING_RT.spawn_blocking(move || { - let mut frames = snapshot.frames_iter_from(offset); + let size_after = snapshot.header().size_after; + let mut frames = snapshot.frames_iter_from(offset).peekable(); loop { match frames.next() { - Some(Ok(frame)) => { + Some(Ok(mut frame)) => { + // this is the last frame we're sending for this snapshot, set the + // size_after + if frames.peek().is_none() { + frame.header_mut().size_after = size_after; + } let _ = sender.blocking_send(Ok(Frame { - data: frame.bytes(), + data: crate::replication::frame::Frame::from(frame).bytes(), })); } Some(Err(e)) => {