From 725a4bbf788d73203c23ab2075bf2b5a4ea67173 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Fri, 6 Sep 2024 02:20:45 +0000 Subject: [PATCH] feat(bolt): add workflow commands to bolt --- docs/libraries/workflow/DEVELOPING.md | 175 ----- lib/bolt/Cargo.lock | 734 +++++++++++++++++- lib/bolt/cli/Cargo.toml | 4 +- lib/bolt/cli/src/commands/cluster/server.rs | 1 + lib/bolt/cli/src/commands/mod.rs | 1 + lib/bolt/cli/src/commands/wf/mod.rs | 86 ++ lib/bolt/cli/src/commands/wf/signal.rs | 61 ++ lib/bolt/cli/src/main.rs | 7 + lib/bolt/core/Cargo.toml | 23 +- lib/bolt/core/src/tasks/api.rs | 42 - lib/bolt/core/src/tasks/{db.rs => db/mod.rs} | 337 ++++---- lib/bolt/core/src/tasks/db/sqlx.rs | 27 + lib/bolt/core/src/tasks/migrate.rs | 42 +- lib/bolt/core/src/tasks/mod.rs | 1 + lib/bolt/core/src/tasks/wf/mod.rs | 668 ++++++++++++++++ lib/bolt/core/src/tasks/wf/signal.rs | 282 +++++++ lib/bolt/core/src/utils/mod.rs | 67 +- .../standalone/metrics-publish/src/lib.rs | 2 +- 18 files changed, 2141 insertions(+), 419 deletions(-) delete mode 100644 docs/libraries/workflow/DEVELOPING.md create mode 100644 lib/bolt/cli/src/commands/wf/mod.rs create mode 100644 lib/bolt/cli/src/commands/wf/signal.rs rename lib/bolt/core/src/tasks/{db.rs => db/mod.rs} (71%) create mode 100644 lib/bolt/core/src/tasks/db/sqlx.rs create mode 100644 lib/bolt/core/src/tasks/wf/mod.rs create mode 100644 lib/bolt/core/src/tasks/wf/signal.rs diff --git a/docs/libraries/workflow/DEVELOPING.md b/docs/libraries/workflow/DEVELOPING.md deleted file mode 100644 index e7d9633e43..0000000000 --- a/docs/libraries/workflow/DEVELOPING.md +++ /dev/null @@ -1,175 +0,0 @@ -# Developing - -## View realtime logs - -```logql -{name="monolith-workflow-worker"} -``` - -## Fixing errors - -If you run in to a too many retries error on a workflow, then: - -1. Update the workflow code -2. Re-wake the workflow -3. Wait for the workflow to poll for new changes - -For a single workflow: - -```sql -UPDATE db_workflow.workflows SET wake_immediate = true WHERE workflow_id = 'MY_ID'; -``` - -For all workflows of a type: - -```sql -UPDATE db_workflow.workflows SET wake_immediate = true WHERE workflow_name = 'MY_NAME'; -``` - -# Visualize entire workflow history - -```sql -WITH workflow_events AS ( - SELECT 'WORKFLOW_ID'::uuid AS workflow_id -) -SELECT location, 'activity' AS t, activity_name, input, output, forgotten -FROM db_workflow.workflow_activity_events, workflow_events -WHERE - workflow_activity_events.workflow_id = workflow_events.workflow_id -UNION ALL -SELECT location, 'signal' AS t, signal_name AS name, null as input, null as output, forgotten -FROM db_workflow.workflow_signal_events, workflow_events -WHERE - workflow_signal_events.workflow_id = workflow_events.workflow_id -UNION ALL -SELECT location, 'sub_workflow' AS t, sub_workflow_id::STRING AS name, null as input, null as output, forgotten -FROM db_workflow.workflow_sub_workflow_events, workflow_events -WHERE - workflow_sub_workflow_events.workflow_id = workflow_events.workflow_id -UNION ALL -SELECT location, 'signal_send' AS t, signal_name AS name, null as input, null as output, forgotten -FROM db_workflow.workflow_signal_send_events, workflow_events -WHERE - workflow_signal_send_events.workflow_id = workflow_events.workflow_id -UNION ALL -SELECT location, 'message_send' AS t, message_name AS name, null as input, null as output, forgotten -FROM db_workflow.workflow_message_send_events, workflow_events -WHERE - workflow_message_send_events.workflow_id = workflow_events.workflow_id -UNION ALL -SELECT location, 'loop' AS t, NULL AS name, null as input, null as output, forgotten -FROM db_workflow.workflow_loop_events, workflow_events -WHERE - workflow_loop_events.workflow_id = workflow_events.workflow_id -ORDER BY location ASC; -``` - -## Completely delete workflow with ID - -```sql -WITH workflow_ids AS ( - SELECT 'WORKFLOW_ID'::uuid AS workflow_id -), -delete_activity_events AS ( - DELETE FROM db_workflow.workflow_activity_events - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -), -delete_signal_events AS ( - DELETE FROM db_workflow.workflow_signal_events - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -), -delete_sub_workflow_events AS ( - DELETE FROM db_workflow.workflow_sub_workflow_events - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -), -delete_signal_send_events AS ( - DELETE FROM db_workflow.workflow_signal_send_events - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -), -delete_message_send_events AS ( - DELETE FROM db_workflow.workflow_message_send_events - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -), -delete_loop_events AS ( - DELETE FROM db_workflow.workflow_loop_events - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -), -delete_activity_errors AS ( - DELETE FROM db_workflow.workflow_activity_errors - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -) -DELETE FROM db_workflow.workflows -WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) -RETURNING 1; -``` - -## Completely delete workflow with name - -```sql -WITH workflow_ids AS ( - SELECT workflow_id - FROM db_workflow.workflows - WHERE workflow_name = 'WORKFLOW_NAME' -), -delete_activity_events AS ( - DELETE FROM db_workflow.workflow_activity_events - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -), -delete_signal_events AS ( - DELETE FROM db_workflow.workflow_signal_events - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -), -delete_sub_workflow_events AS ( - DELETE FROM db_workflow.workflow_sub_workflow_events - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -), -delete_signal_send_events AS ( - DELETE FROM db_workflow.workflow_signal_send_events - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -), -delete_message_send_events AS ( - DELETE FROM db_workflow.workflow_message_send_events - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -), -delete_loop_events AS ( - DELETE FROM db_workflow.workflow_loop_events - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -), -delete_activity_errors AS ( - DELETE FROM db_workflow.workflow_activity_errors - WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) - RETURNING 1 -) -DELETE FROM db_workflow.workflows -WHERE workflow_id IN (SELECT workflow_id FROM workflow_ids) -RETURNING 1; -``` - -## Misc - -```sql -select * -from db_workflow.workflows -where workflow_name = 'backend' -order by create_ts desc; -``` - -```sql -select workflow_activity_errors.* -from db_workflow.workflows -inner join db_workflow.workflow_activity_errors using (workflow_id) -where workflows.workflow_name = 'backend'; -``` diff --git a/lib/bolt/Cargo.lock b/lib/bolt/Cargo.lock index d8b8bf68f9..352a8dba4b 100644 --- a/lib/bolt/Cargo.lock +++ b/lib/bolt/Cargo.lock @@ -17,6 +17,19 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "getrandom", + "once_cell", + "version_check", + "zerocopy 0.7.35", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -26,6 +39,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -41,6 +60,25 @@ dependencies = [ "libc", ] +[[package]] +name = "ansi-str" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cf4578926a981ab0ca955dc023541d19de37112bc24c1a197bd806d3d86ad1d" +dependencies = [ + "ansitok", +] + +[[package]] +name = "ansitok" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "220044e6a1bb31ddee4e3db724d29767f352de47445a6cd75e1a173142136c83" +dependencies = [ + "nom", + "vte", +] + [[package]] name = "anstream" version = "0.6.15" @@ -96,6 +134,12 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +[[package]] +name = "arrayvec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" + [[package]] name = "async-posthog" version = "0.2.3" @@ -131,6 +175,15 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atty" version = "0.2.14" @@ -424,6 +477,18 @@ dependencies = [ "simd-abstraction", ] +[[package]] +name = "base64ct" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" + +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" + [[package]] name = "bitflags" version = "1.3.2" @@ -435,6 +500,9 @@ name = "bitflags" version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +dependencies = [ + "serde", +] [[package]] name = "block-buffer" @@ -475,8 +543,8 @@ dependencies = [ "heck 0.3.3", "serde", "serde_json", - "strum", - "strum_macros", + "strum 0.25.0", + "strum_macros 0.25.3", "toml", "uuid", ] @@ -493,6 +561,8 @@ dependencies = [ "bolt-config", "chrono", "cjson", + "clap", + "colored_json", "derive_builder", "duct", "futures-util", @@ -518,6 +588,9 @@ dependencies = [ "serde_json", "serde_yaml", "sha2", + "sqlx", + "strum 0.26.3", + "tabled", "tempfile", "thiserror", "tokio", @@ -648,6 +721,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" +[[package]] +name = "colored_json" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e35980a1b846f8e3e359fd18099172a0857140ba9230affc4f71348081e039b6" +dependencies = [ + "serde", + "serde_json", + "yansi", +] + [[package]] name = "console" version = "0.15.8" @@ -661,6 +745,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "core-foundation" version = "0.9.4" @@ -686,6 +776,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32c" version = "0.6.8" @@ -704,6 +809,21 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -784,6 +904,17 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "der" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f55bf8e7b65898637379c1b74eb1551107c8294ed26d855ceb9fd1a09cfc9bc0" +dependencies = [ + "const-oid", + "pem-rfc7468", + "zeroize", +] + [[package]] name = "deranged" version = "0.3.11" @@ -832,10 +963,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid", "crypto-common", "subtle", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "duct" version = "0.13.7" @@ -853,6 +991,9 @@ name = "either" version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +dependencies = [ + "serde", +] [[package]] name = "encode_unicode" @@ -885,6 +1026,23 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fastrand" version = "1.9.0" @@ -900,6 +1058,17 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -937,6 +1106,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -945,6 +1115,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + [[package]] name = "futures-macro" version = "0.3.30" @@ -975,8 +1173,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-core", + "futures-io", "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -1053,6 +1254,19 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] + +[[package]] +name = "hashlink" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" +dependencies = [ + "hashbrown 0.14.5", +] [[package]] name = "heck" @@ -1068,6 +1282,9 @@ name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +dependencies = [ + "unicode-segmentation", +] [[package]] name = "heck" @@ -1096,6 +1313,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hkdf" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" +dependencies = [ + "hmac", +] + [[package]] name = "hmac" version = "0.12.1" @@ -1105,6 +1331,15 @@ dependencies = [ "digest", ] +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "http" version = "0.2.12" @@ -1302,6 +1537,15 @@ dependencies = [ "serde", ] +[[package]] +name = "ipnetwork" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf466541e9d546596ee94f9f69590f89473455f88372423e0008fc1a7daf100e" +dependencies = [ + "serde", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1354,6 +1598,9 @@ name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +dependencies = [ + "spin 0.9.8", +] [[package]] name = "libc" @@ -1361,6 +1608,23 @@ version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "libm" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" + +[[package]] +name = "libsqlite3-sys" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -1421,6 +1685,12 @@ dependencies = [ "unicase", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.7.4" @@ -1459,6 +1729,33 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "num-bigint-dig" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" +dependencies = [ + "byteorder", + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand", + "smallvec", + "zeroize", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -1474,6 +1771,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1481,6 +1789,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -1566,10 +1875,12 @@ checksum = "7f222829ae9293e33a9f5e9f440c6760a3d450a64affe1846486b140db81c1f4" [[package]] name = "papergrid" -version = "0.5.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "453cf71f2a37af495a1a124bf30d4d7469cfbea58e9f2479be9d222396a518a2" +checksum = "c7419ad52a7de9b60d33e11085a0fe3df1fbd5926aa3f93d3dd53afbc9e86725" dependencies = [ + "ansi-str", + "ansitok", "bytecount", "fnv", "unicode-width", @@ -1598,6 +1909,21 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1681,6 +2007,27 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs1" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" +dependencies = [ + "der", + "pkcs8", + "spki", +] + +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.30" @@ -1719,7 +2066,7 @@ version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dee4364d9f3b902ef14fab8a1ddffb783a1cb6b4bba3bfc1fa3922732c7de97f" dependencies = [ - "zerocopy", + "zerocopy 0.6.6", ] [[package]] @@ -1946,7 +2293,7 @@ dependencies = [ [[package]] name = "rivet-term" version = "0.1.0" -source = "git+https://github.com/rivet-gg/rivet-term.git?rev=f70f76f63eba12e535e53db1b436d5297edf979a#f70f76f63eba12e535e53db1b436d5297edf979a" +source = "git+https://github.com/rivet-gg/rivet-term.git?rev=d539a07d2920d47b88410f20e6d106b497cff1f5#d539a07d2920d47b88410f20e6d106b497cff1f5" dependencies = [ "console", "derive_builder", @@ -1956,6 +2303,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "rsa" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc" +dependencies = [ + "const-oid", + "digest", + "num-bigint-dig", + "num-integer", + "num-traits", + "pkcs1", + "pkcs8", + "rand_core", + "signature", + "spki", + "subtle", + "zeroize", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -2263,6 +2630,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "digest", + "rand_core", +] + [[package]] name = "simd-abstraction" version = "0.7.1" @@ -2308,6 +2685,237 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + +[[package]] +name = "sqlformat" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f895e3734318cc55f1fe66258926c9b910c124d47520339efecbb6c59cec7c1f" +dependencies = [ + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.7.4" +source = "git+https://github.com/rivet-gg/sqlx?rev=08d6e61aa0572e7ec557abbedb72cebb96e1ac5b#08d6e61aa0572e7ec557abbedb72cebb96e1ac5b" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", +] + +[[package]] +name = "sqlx-core" +version = "0.7.4" +source = "git+https://github.com/rivet-gg/sqlx?rev=08d6e61aa0572e7ec557abbedb72cebb96e1ac5b#08d6e61aa0572e7ec557abbedb72cebb96e1ac5b" +dependencies = [ + "ahash", + "atoi", + "bit-vec", + "byteorder", + "bytes", + "crc", + "crossbeam-queue", + "either", + "event-listener", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashlink", + "hex", + "indexmap 2.3.0", + "ipnetwork", + "log", + "memchr", + "native-tls", + "once_cell", + "paste", + "percent-encoding", + "rand", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlformat", + "thiserror", + "tokio", + "tokio-stream", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "sqlx-macros" +version = "0.7.4" +source = "git+https://github.com/rivet-gg/sqlx?rev=08d6e61aa0572e7ec557abbedb72cebb96e1ac5b#08d6e61aa0572e7ec557abbedb72cebb96e1ac5b" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn 1.0.109", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.7.4" +source = "git+https://github.com/rivet-gg/sqlx?rev=08d6e61aa0572e7ec557abbedb72cebb96e1ac5b#08d6e61aa0572e7ec557abbedb72cebb96e1ac5b" +dependencies = [ + "dotenvy", + "either", + "heck 0.4.1", + "hex", + "once_cell", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2", + "sqlx-core", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "syn 1.0.109", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-mysql" +version = "0.7.4" +source = "git+https://github.com/rivet-gg/sqlx?rev=08d6e61aa0572e7ec557abbedb72cebb96e1ac5b#08d6e61aa0572e7ec557abbedb72cebb96e1ac5b" +dependencies = [ + "atoi", + "base64 0.21.7", + "bitflags 2.6.0", + "byteorder", + "bytes", + "crc", + "digest", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa 1.0.11", + "log", + "md-5", + "memchr", + "once_cell", + "percent-encoding", + "rand", + "rsa", + "serde", + "sha1", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "tracing", + "uuid", + "whoami", +] + +[[package]] +name = "sqlx-postgres" +version = "0.7.4" +source = "git+https://github.com/rivet-gg/sqlx?rev=08d6e61aa0572e7ec557abbedb72cebb96e1ac5b#08d6e61aa0572e7ec557abbedb72cebb96e1ac5b" +dependencies = [ + "atoi", + "base64 0.21.7", + "bit-vec", + "bitflags 2.6.0", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "ipnetwork", + "itoa 1.0.11", + "log", + "md-5", + "memchr", + "once_cell", + "rand", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "tracing", + "uuid", + "whoami", +] + +[[package]] +name = "sqlx-sqlite" +version = "0.7.4" +source = "git+https://github.com/rivet-gg/sqlx?rev=08d6e61aa0572e7ec557abbedb72cebb96e1ac5b#08d6e61aa0572e7ec557abbedb72cebb96e1ac5b" +dependencies = [ + "atoi", + "flume", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "sqlx-core", + "tracing", + "url", + "urlencoding", + "uuid", +] + +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] [[package]] name = "strsim" @@ -2327,6 +2935,15 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros 0.26.4", +] + [[package]] name = "strum_macros" version = "0.25.3" @@ -2340,6 +2957,19 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.72", +] + [[package]] name = "subtle" version = "2.6.1" @@ -2397,20 +3027,21 @@ dependencies = [ [[package]] name = "tabled" -version = "0.8.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5b2f8c37d26d87d2252187b0a45ea3cbf42baca10377c7e7eaaa2800fa9bf97" +checksum = "77c9303ee60b9bedf722012ea29ae3711ba13a67c9b9ae28993838b63057cb1b" dependencies = [ + "ansi-str", + "ansitok", "papergrid", "tabled_derive", - "unicode-width", ] [[package]] name = "tabled_derive" -version = "0.4.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9ee618502f497abf593e1c5c9577f34775b111480009ffccd7ad70d23fcaba8" +checksum = "bf0fb8bfdc709786c154e24a66777493fb63ae97e3036d914c8666774c477069" dependencies = [ "heck 0.4.1", "proc-macro-error", @@ -2734,6 +3365,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-properties" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ea75f83c0137a9b98608359a5f1af8144876eb67bcb1ce837368e906a9f524" + [[package]] name = "unicode-segmentation" version = "1.11.0" @@ -2742,9 +3379,15 @@ checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" [[package]] name = "unicode-width" -version = "0.1.13" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" +checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" + +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" [[package]] name = "unsafe-libyaml" @@ -2809,6 +3452,27 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vte" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cbce692ab4ca2f1f3047fcf732430249c0e971bfdd2b234cf2c47ad93af5983" +dependencies = [ + "arrayvec", + "utf8parse", + "vte_generate_state_changes", +] + +[[package]] +name = "vte_generate_state_changes" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e369bee1b05d510a7b4ed645f5faa90619e05437111783ea5848f28d97d3c2e" +dependencies = [ + "proc-macro2", + "quote", +] + [[package]] name = "want" version = "0.3.1" @@ -2824,6 +3488,12 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.92" @@ -2925,6 +3595,16 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "whoami" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" +dependencies = [ + "redox_syscall", + "wasite", +] + [[package]] name = "wildmatch" version = "2.3.4" @@ -3135,6 +3815,12 @@ version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" +[[package]] +name = "yansi" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" + [[package]] name = "zerocopy" version = "0.6.6" @@ -3142,7 +3828,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "854e949ac82d619ee9a14c66a1b674ac730422372ccb759ce0c39cabcf2bf8e6" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.6.6", +] + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive 0.7.35", ] [[package]] @@ -3156,6 +3851,17 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", +] + [[package]] name = "zeroize" version = "1.8.1" diff --git a/lib/bolt/cli/Cargo.toml b/lib/bolt/cli/Cargo.toml index 6690e0e511..9f98957eee 100644 --- a/lib/bolt/cli/Cargo.toml +++ b/lib/bolt/cli/Cargo.toml @@ -14,10 +14,10 @@ clap = { version = "4.3", features = ["derive", "env"] } duct = "0.13" json-patch = "1.2" rivet-api = { path = "../../../sdks/full/rust" } -rivet-term = { git = "https://github.com/rivet-gg/rivet-term.git", rev = "f70f76f63eba12e535e53db1b436d5297edf979a" } +rivet-term = { git = "https://github.com/rivet-gg/rivet-term.git", rev = "d539a07d2920d47b88410f20e6d106b497cff1f5" } serde_json = "1.0" serde_plain = "1.0.2" -tabled = "0.8.0" +tabled = "0.16.0" tempfile = "3.2" tokio = { version = "1.29", features = ["full"] } toml = "0.7" diff --git a/lib/bolt/cli/src/commands/cluster/server.rs b/lib/bolt/cli/src/commands/cluster/server.rs index 4cfa0bd044..c12c4c3b0d 100644 --- a/lib/bolt/cli/src/commands/cluster/server.rs +++ b/lib/bolt/cli/src/commands/cluster/server.rs @@ -94,6 +94,7 @@ pub enum SubCommand { }, } +// TODO: Move API calls and rendering to bolt core impl SubCommand { pub async fn execute(self, ctx: ProjectContext) -> Result<()> { match self { diff --git a/lib/bolt/cli/src/commands/mod.rs b/lib/bolt/cli/src/commands/mod.rs index 35295dc034..1a003ffa8e 100644 --- a/lib/bolt/cli/src/commands/mod.rs +++ b/lib/bolt/cli/src/commands/mod.rs @@ -12,3 +12,4 @@ pub mod secret; pub mod terraform; pub mod test; pub mod up; +pub mod wf; diff --git a/lib/bolt/cli/src/commands/wf/mod.rs b/lib/bolt/cli/src/commands/wf/mod.rs new file mode 100644 index 0000000000..b5cb0f3a08 --- /dev/null +++ b/lib/bolt/cli/src/commands/wf/mod.rs @@ -0,0 +1,86 @@ +use anyhow::*; +use bolt_core::{ + context::ProjectContext, + tasks::{ + self, + wf::{KvPair, WorkflowState}, + }, +}; +use clap::Parser; +use uuid::Uuid; + +mod signal; + +#[derive(Parser)] +pub enum SubCommand { + /// Prints the given workflow. + Get { + #[clap(index = 1)] + workflow_id: Uuid, + }, + /// Finds workflows with the given tags, name and state. + List { + tags: Vec, + /// Workflow name. + #[clap(long, short = 'n')] + name: Option, + #[clap(long, short = 's')] + state: Option, + /// Prints paragraphs instead of a table. + #[clap(long, short = 'p')] + pretty: bool, + }, + /// Silences a workflow from showing up as dead or running again. + Ack { + #[clap(index = 1)] + workflow_id: Uuid, + }, + /// Sets the wake immediate property of a workflow to true. + Wake { + #[clap(index = 1)] + workflow_id: Uuid, + }, + /// Lists the entire event history of a workflow. + History { + #[clap(index = 1)] + workflow_id: Uuid, + #[clap(short = 'f', long)] + include_forgotten: bool, + #[clap(short = 'l', long)] + print_location: bool, + }, + Signal { + #[clap(subcommand)] + command: signal::SubCommand, + }, +} + +impl SubCommand { + pub async fn execute(self, ctx: ProjectContext) -> Result<()> { + match self { + Self::Get { workflow_id } => { + let workflow = tasks::wf::get_workflow(&ctx, workflow_id).await?; + tasks::wf::print_workflows(workflow.into_iter().collect(), true).await + } + Self::List { + tags, + name, + state, + pretty, + } => { + let workflows = tasks::wf::find_workflows(&ctx, tags, name, state).await?; + tasks::wf::print_workflows(workflows, pretty).await + } + Self::Ack { workflow_id } => tasks::wf::silence_workflow(&ctx, workflow_id).await, + Self::Wake { workflow_id } => tasks::wf::wake_workflow(&ctx, workflow_id).await, + Self::History { + workflow_id, + include_forgotten, + print_location, + } => { + tasks::wf::print_history(&ctx, workflow_id, include_forgotten, print_location).await + } + Self::Signal { command } => command.execute(ctx).await, + } + } +} diff --git a/lib/bolt/cli/src/commands/wf/signal.rs b/lib/bolt/cli/src/commands/wf/signal.rs new file mode 100644 index 0000000000..8061ec97e8 --- /dev/null +++ b/lib/bolt/cli/src/commands/wf/signal.rs @@ -0,0 +1,61 @@ +use anyhow::*; +use bolt_core::{ + context::ProjectContext, + tasks::{ + self, + wf::{signal::SignalState, KvPair}, + }, +}; +use clap::Parser; +use uuid::Uuid; + +#[derive(Parser)] +pub enum SubCommand { + /// Prints the given signal. + Get { + #[clap(index = 1)] + signal_id: Uuid, + }, + /// Finds signals that match the given tags. + List { + tags: Vec, + #[clap(long, short = 'w')] + workflow_id: Option, + /// Signal name. + #[clap(long, short = 'n')] + name: Option, + #[clap(long, short = 's')] + state: Option, + /// Prints paragraphs instead of a table. + #[clap(long, short = 'p')] + pretty: bool, + }, + /// Silences a signal from showing up as dead or running again. + Ack { + #[clap(index = 1)] + signal_id: Uuid, + }, +} + +impl SubCommand { + pub async fn execute(self, ctx: ProjectContext) -> Result<()> { + match self { + Self::Get { signal_id } => { + let signal = tasks::wf::signal::get_signal(&ctx, signal_id).await?; + tasks::wf::signal::print_signals(signal.into_iter().collect(), true).await + } + Self::List { + tags, + workflow_id, + name, + state, + pretty, + } => { + let signals = + tasks::wf::signal::find_signals(&ctx, tags, workflow_id, name, state).await?; + tasks::wf::signal::print_signals(signals, pretty).await + } + Self::Ack { signal_id } => tasks::wf::signal::silence_signal(&ctx, signal_id).await, + } + } +} diff --git a/lib/bolt/cli/src/main.rs b/lib/bolt/cli/src/main.rs index e88bf21a5c..4ec015015b 100644 --- a/lib/bolt/cli/src/main.rs +++ b/lib/bolt/cli/src/main.rs @@ -76,6 +76,12 @@ enum SubCommand { #[clap(subcommand)] command: admin::SubCommand, }, + /// Manages internal workflows. + #[clap(alias = "wf")] + Workflow { + #[clap(subcommand)] + command: wf::SubCommand, + }, } #[tokio::main] @@ -125,6 +131,7 @@ async fn main_inner() -> Result { SubCommand::Database { command } => command.execute(ctx).await?, SubCommand::Cluster { command } => command.execute(ctx).await?, SubCommand::Admin { command } => command.execute(ctx).await?, + SubCommand::Workflow { command } => command.execute(ctx).await?, } Ok(std::process::ExitCode::SUCCESS) diff --git a/lib/bolt/core/Cargo.toml b/lib/bolt/core/Cargo.toml index 7d5c9dd409..df4dc7d924 100644 --- a/lib/bolt/core/Cargo.toml +++ b/lib/bolt/core/Cargo.toml @@ -14,15 +14,17 @@ base64 = "0.13" bolt-config = { path = "../config" } chrono = "0.4" cjson = "0.1" +clap = { version = "4.3", features = ["derive"] } +colored_json = "5.0.0" derive_builder = "0.12" duct = "0.13" futures-util = "0.3" handlebars = "4.3.7" heck = "0.3" hex = "0.4" +indexmap = "2.0" indicatif = "0.16" indoc = "1.0" -indexmap = "2.0" ipnet = { version = "2.7", features = ["serde"] } json-patch = "1.2" lazy_static = "1.4" @@ -33,12 +35,14 @@ rand = "0.8" regex = "1.5" reqwest = { version = "0.11", features = ["json"] } rivet-api = { path = "../../../sdks/full/rust" } -rivet-term = { git = "https://github.com/rivet-gg/rivet-term.git", rev = "f70f76f63eba12e535e53db1b436d5297edf979a" } +rivet-term = { git = "https://github.com/rivet-gg/rivet-term.git", rev = "d539a07d2920d47b88410f20e6d106b497cff1f5" } s3-util = { path = "../../s3-util" } serde = { version ="1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.9" sha2 = "0.10" +strum = { version = "0.26", features = ["derive"] } +tabled = { version = "0.16.0", features = ["ansi"] } tempfile = "3.2" thiserror = "1.0" tokio = { version = "1.29", features = ["full"] } @@ -48,3 +52,18 @@ url = "2.3" urlencoding = "2.1" uuid = { version = "1", features = ["v4"] } wildmatch = "2.1" + +[dependencies.sqlx] +git = "https://github.com/rivet-gg/sqlx" +rev = "08d6e61aa0572e7ec557abbedb72cebb96e1ac5b" +default-features = false +features = [ + "runtime-tokio", + "runtime-tokio-native-tls", + "postgres", + "uuid", + "macros", + "ipnetwork", + "json", + "bit-vec", +] diff --git a/lib/bolt/core/src/tasks/api.rs b/lib/bolt/core/src/tasks/api.rs index 07a84a5ed6..cc824df195 100644 --- a/lib/bolt/core/src/tasks/api.rs +++ b/lib/bolt/core/src/tasks/api.rs @@ -1,8 +1,4 @@ use anyhow::*; -use rivet_api::{ - apis::{admin_clusters_api, admin_clusters_servers_api}, - models, -}; use serde_json::json; use crate::context::ProjectContext; @@ -44,41 +40,3 @@ pub async fn access_token_login(project_ctx: &ProjectContext, name: String) -> R Ok(()) } - -pub async fn get_cluster_server_ips( - project_ctx: &ProjectContext, - server_id: Option<&str>, - pools: Option<&str>, -) -> Result> { - todo!() -} - -// pub struct ServerFilterQuery { -// pool: Option, -// datacenter: Option, -// public_ip: Option, -// } -// -// pub async fn get_cluster_server_ips( -// project_ctx: &ProjectContext, -// cluster_id: Uuid, -// pool_type: Option<&str>, -// datacenter_id: Option<&str>, -// ) -> Result> { -// let server_ips = admin_clusters_servers_api::admin_clusters_servers_list( -// &project_ctx.openapi_config_cloud().await?, -// cluster_id, -// datacenter_id.as_ref().map(String::as_str), -// pool_type -// .map(|p| match p { -// "job" => Ok(models::AdminClustersPoolType::Job), -// "gg" => Ok(models::AdminClustersPoolType::Gg), -// "ats" => Ok(models::AdminClustersPoolType::Ats), -// _ => Err(anyhow!("invalid pool type")), -// }) -// .transpose()?, -// ) -// .await?; -// -// Ok(server_ips.ips) -// } diff --git a/lib/bolt/core/src/tasks/db.rs b/lib/bolt/core/src/tasks/db/mod.rs similarity index 71% rename from lib/bolt/core/src/tasks/db.rs rename to lib/bolt/core/src/tasks/db/mod.rs index 1743871a7b..75a4d33d99 100644 --- a/lib/bolt/core/src/tasks/db.rs +++ b/lib/bolt/core/src/tasks/db/mod.rs @@ -7,9 +7,12 @@ use tokio::{io::AsyncWriteExt, task::block_in_place}; use crate::{ config::{self, service::RuntimeKind}, context::{ProjectContext, ServiceContext}, + dep, utils::{self, db_conn::DatabaseConnections}, }; +pub mod sqlx; + const REDIS_IMAGE: &str = "ghcr.io/rivet-gg/redis:cc3241e"; pub enum LogType { @@ -150,63 +153,59 @@ async fn redis_shell(shell_ctx: ShellContext<'_>) -> Result<()> { cmd.push_str(" --cacert /local/redis-ca.crt"); } - let overrides = json!({ - "apiVersion": "v1", - "metadata": { - "namespace": "bolt", - }, - "spec": { - "containers": [ - { - "name": "redis", - "image": REDIS_IMAGE, - "command": ["sleep", "10000"], - "env": env, - "stdin": true, - "stdinOnce": true, - "tty": true, - "volumeMounts": if mount_ca { - json!([{ - "name": "redis-ca", - "mountPath": "/local/redis-ca.crt", - "subPath": "redis-ca.crt" - }]) - } else { - json!([]) - } + let pod_spec = json!({ + "restartPolicy": "Never", + "terminationGracePeriodSeconds": 0, + "containers": [ + { + "name": "redis", + "image": REDIS_IMAGE, + "command": ["sleep", "10000"], + "env": env, + "stdin": true, + "stdinOnce": true, + "tty": true, + "volumeMounts": if mount_ca { + json!([{ + "name": "redis-ca", + "mountPath": "/local/redis-ca.crt", + "subPath": "redis-ca.crt" + }]) + } else { + json!([]) } - ], - "volumes": if mount_ca { - json!([{ - "name": "redis-ca", - "configMap": { - "name": format!("redis-{}-ca", db_name), - "defaultMode": 420, - // Distributed clusters don't need a CA for redis - "optional": true, - "items": [ - { - "key": "ca.crt", - "path": "redis-ca.crt" - } - ] - } - }]) - } else { - json!([]) } + ], + "volumes": if mount_ca { + json!([{ + "name": "redis-ca", + "configMap": { + "name": format!("redis-{}-ca", db_name), + "defaultMode": 420, + // Distributed clusters don't need a CA for redis + "optional": true, + "items": [ + { + "key": "ca.crt", + "path": "redis-ca.crt" + } + ] + } + }]) + } else { + json!([]) } }); let pod_name = format!("redis-{db_name}-sh-persistent"); - start_persistent_pod(ctx, "Redis", &pod_name, overrides).await?; + start_persistent_pod(ctx, "Redis", &pod_name, pod_spec).await?; // Connect to persistent pod block_in_place(|| { cmd!( "kubectl", "exec", - format!("pod/{pod_name}"), + format!("job/{pod_name}"), "-it", "-n", "bolt", @@ -240,10 +239,10 @@ pub async fn crdb_shell(shell_ctx: ShellContext<'_>) -> Result<()> { let mut query_cmd = String::new(); for ShellQuery { svc, query } in queries { let db_name = svc.crdb_db_name(); - let conn = conn.cockroach_host.as_ref().unwrap(); + let host = conn.cockroach_host.as_ref().unwrap(); let username = ctx.read_secret(&["crdb", "username"]).await?; let password = ctx.read_secret(&["crdb", "password"]).await?; - let mut db_url = format!("postgres://{}:{}@{}/{}", username, password, conn, db_name); + let mut db_url = format!("postgres://{}:{}@{}/{}", username, password, host, db_name); // Add SSL if !forwarded { @@ -278,59 +277,55 @@ pub async fn crdb_shell(shell_ctx: ShellContext<'_>) -> Result<()> { block_in_place(|| cmd!("bash", "-c", query_cmd).run())?; } else { - let overrides = json!({ - "apiVersion": "v1", - "metadata": { - "namespace": "bolt", - }, - "spec": { - "containers": [ - { - "name": "postgres", - "image": "postgres", - "command": ["sleep", "10000"], - "env": [ - // See https://github.com/cockroachdb/cockroach/issues/37129#issuecomment-600115995 - { - "name": "PGCLIENTENCODING", - "value": "utf-8", - } - ], - "stdin": true, - "stdinOnce": true, - "tty": true, - "volumeMounts": [{ - "name": "crdb-ca", - "mountPath": "/local/crdb-ca.crt", - "subPath": "crdb-ca.crt" - }] - } - ], - "volumes": [{ - "name": "crdb-ca", - "configMap": { + let pod_spec = json!({ + "restartPolicy": "Never", + "terminationGracePeriodSeconds": 0, + "containers": [ + { + "name": "postgres", + "image": "postgres", + "command": ["sleep", "10000"], + "env": [ + // See https://github.com/cockroachdb/cockroach/issues/37129#issuecomment-600115995 + { + "name": "PGCLIENTENCODING", + "value": "utf-8", + } + ], + "stdin": true, + "stdinOnce": true, + "tty": true, + "volumeMounts": [{ "name": "crdb-ca", - "defaultMode": 420, - "items": [ - { - "key": "ca.crt", - "path": "crdb-ca.crt" - } - ] - } - }] - } + "mountPath": "/local/crdb-ca.crt", + "subPath": "crdb-ca.crt" + }] + } + ], + "volumes": [{ + "name": "crdb-ca", + "configMap": { + "name": "crdb-ca", + "defaultMode": 420, + "items": [ + { + "key": "ca.crt", + "path": "crdb-ca.crt" + } + ] + } + }] }); let pod_name = "crdb-sh-persistent"; - start_persistent_pod(ctx, "Cockroach", pod_name, overrides).await?; + start_persistent_pod(ctx, "Cockroach", pod_name, pod_spec).await?; // Connect to persistent pod block_in_place(|| { cmd!( "kubectl", "exec", - format!("pod/{pod_name}"), + format!("job/{pod_name}"), "-it", "-n", "bolt", @@ -414,58 +409,54 @@ pub async fn clickhouse_shell(shell_ctx: ShellContext<'_>, no_db: bool) -> Resul block_in_place(|| cmd!("bash", "-c", query_cmd).run())?; } else { - let overrides = json!({ - "apiVersion": "v1", - "metadata": { - "namespace": "bolt", - }, - "spec": { - "containers": [ - { - "name": "clickhouse", - "image": "clickhouse/clickhouse-server", - "command": ["sh", "-c"], - "args": [query_cmd], - "stdin": true, - "stdinOnce": true, - "tty": true, - "volumeMounts": [ - { - "name": "clickhouse-ca", - "mountPath": "/local/clickhouse-ca.crt", - "subPath": "clickhouse-ca.crt" - }, - { - "name": "clickhouse-config", - "mountPath": "/local/config.yml", - "subPath": "config.yml", - } - ] - } - ], - "volumes": [{ + let pod_spec = json!({ + "restartPolicy": "Never", + "terminationGracePeriodSeconds": 0, + "containers": [ + { + "name": "clickhouse", + "image": "clickhouse/clickhouse-server", + "command": ["sh", "-c"], + "args": [query_cmd], + "stdin": true, + "stdinOnce": true, + "tty": true, + "volumeMounts": [ + { + "name": "clickhouse-ca", + "mountPath": "/local/clickhouse-ca.crt", + "subPath": "clickhouse-ca.crt" + }, + { + "name": "clickhouse-config", + "mountPath": "/local/config.yml", + "subPath": "config.yml", + } + ] + } + ], + "volumes": [{ + "name": "clickhouse-ca", + "configMap": { "name": "clickhouse-ca", - "configMap": { - "name": "clickhouse-ca", - "defaultMode": 420, - // Distributed clusters don't need a CA for clickhouse - "optional": true, - "items": [ - { - "key": "ca.crt", - "path": "clickhouse-ca.crt" - } - ] - } - }, { + "defaultMode": 420, + // Distributed clusters don't need a CA for clickhouse + "optional": true, + "items": [ + { + "key": "ca.crt", + "path": "clickhouse-ca.crt" + } + ] + } + }, { + "name": "clickhouse-config", + "configMap": { "name": "clickhouse-config", - "configMap": { - "name": "clickhouse-config", - "defaultMode": 420, - "optional": true - } - }] - } + "defaultMode": 420, + "optional": true + } + }] }); // Apply clickhouse config to K8s @@ -499,14 +490,14 @@ pub async fn clickhouse_shell(shell_ctx: ShellContext<'_>, no_db: bool) -> Resul } let pod_name = "clickhouse-sh-persistent"; - start_persistent_pod(ctx, "ClickHouse", pod_name, overrides).await?; + start_persistent_pod(ctx, "ClickHouse", pod_name, pod_spec).await?; // Connect to persistent pod block_in_place(|| { cmd!( "kubectl", "exec", - format!("pod/{pod_name}"), + format!("job/{pod_name}"), "-it", "-n", "bolt", @@ -527,14 +518,14 @@ pub async fn start_persistent_pod( ctx: &ProjectContext, title: &str, pod_name: &str, - overrides: serde_json::Value, + pod_spec: serde_json::Value, ) -> Result<()> { let res = block_in_place(|| { cmd!( "kubectl", "get", "pod", - pod_name, + format!("--selector=job-name={pod_name}"), "-n", "bolt", "--ignore-not-found" @@ -547,21 +538,49 @@ pub async fn start_persistent_pod( if !persistent_pod_exists { rivet_term::status::progress(&format!("Creating persistent {title} pod"), ""); - block_in_place(|| { - cmd!( - "kubectl", - "run", - "-q", - "--restart=Never", - "--image=postgres", - "-n", - "bolt", - format!("--overrides={overrides}"), - pod_name, - ) - .env("KUBECONFIG", ctx.gen_kubeconfig_path()) - .run() - })?; + let spec = json!({ + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "name": pod_name, + "namespace": "bolt", + "labels": { + "app.kubernetes.io/name": pod_name + } + }, + "spec": { + "ttlSecondsAfterFinished": 0, + "completions": 1, + "backoffLimit": 0, + "template": { + "metadata": { + "labels": { + "app.kubernetes.io/name": pod_name, + }, + }, + "spec": pod_spec, + } + } + }); + + dep::k8s::cli::apply_specs(ctx, vec![spec]).await?; + + // block_in_place(|| { + // cmd!( + // "kubectl", + // "run", + // "-q", + // "--rm", + // "--restart=Never", + // "--image=postgres", + // "-n", + // "bolt", + // format!("--overrides={overrides}"), + // pod_name, + // ) + // .env("KUBECONFIG", ctx.gen_kubeconfig_path()) + // .run() + // })?; // Wait for ready tokio::time::sleep(std::time::Duration::from_secs(1)).await; diff --git a/lib/bolt/core/src/tasks/db/sqlx.rs b/lib/bolt/core/src/tasks/db/sqlx.rs new file mode 100644 index 0000000000..4f1eb2eefd --- /dev/null +++ b/lib/bolt/core/src/tasks/db/sqlx.rs @@ -0,0 +1,27 @@ +use std::time::Duration; + +use anyhow::*; +use sqlx::{pool::PoolConnection, PgPool, Postgres}; + +pub async fn build_pool(url: &str) -> Result { + sqlx::postgres::PgPoolOptions::new() + .acquire_timeout(Duration::from_secs(60)) + .max_lifetime(Duration::from_secs(15 * 60)) + .max_lifetime_jitter(Duration::from_secs(90)) + .idle_timeout(Some(Duration::from_secs(10 * 60))) + .min_connections(1) + .max_connections(4096) + .connect(url) + .await + .map_err(Into::into) +} + +pub async fn get_conn(pool: &PgPool) -> Result> { + // Attempt to use an existing connection + if let Some(conn) = pool.try_acquire() { + Ok(conn) + } else { + // Create a new connection + pool.acquire().await.map_err(Into::into) + } +} diff --git a/lib/bolt/core/src/tasks/migrate.rs b/lib/bolt/core/src/tasks/migrate.rs index d404536e86..d6762d4c60 100644 --- a/lib/bolt/core/src/tasks/migrate.rs +++ b/lib/bolt/core/src/tasks/migrate.rs @@ -599,37 +599,33 @@ async fn migration(ctx: &ProjectContext, migration_cmds: &[MigrateCmd]) -> Resul })); } - let overrides = json!({ - "apiVersion": "v1", - "metadata": { - "namespace": "bolt", - }, - "spec": { - "containers": [ - { - "name": "migrate", - "image": MIGRATE_IMAGE, - "command": ["sleep", "1000"], - // // See https://github.com/golang-migrate/migrate/issues/494 - // "env": [{ - // "name": "TZ", - // "value": "UTC" - // }], - "volumeMounts": mounts - } - ], - "volumes": volumes - } + let pod_spec = json!({ + "restartPolicy": "Never", + "terminationGracePeriodSeconds": 0, + "containers": [ + { + "name": "migrate", + "image": MIGRATE_IMAGE, + "command": ["sleep", "1000"], + // // See https://github.com/golang-migrate/migrate/issues/494 + // "env": [{ + // "name": "TZ", + // "value": "UTC" + // }], + "volumeMounts": mounts + } + ], + "volumes": volumes }); let pod_name = "migrate-sh-persistent"; - db::start_persistent_pod(ctx, "migrate", pod_name, overrides).await?; + db::start_persistent_pod(ctx, "migrate", pod_name, pod_spec).await?; block_in_place(|| { cmd!( "kubectl", "exec", - format!("pod/{pod_name}"), + format!("job/{pod_name}"), "-n", "bolt", "--", diff --git a/lib/bolt/core/src/tasks/mod.rs b/lib/bolt/core/src/tasks/mod.rs index 0512a2fd53..dd408f1757 100644 --- a/lib/bolt/core/src/tasks/mod.rs +++ b/lib/bolt/core/src/tasks/mod.rs @@ -9,3 +9,4 @@ pub mod migrate; pub mod ssh; pub mod test; pub mod up; +pub mod wf; diff --git a/lib/bolt/core/src/tasks/wf/mod.rs b/lib/bolt/core/src/tasks/wf/mod.rs new file mode 100644 index 0000000000..60575cc1b0 --- /dev/null +++ b/lib/bolt/core/src/tasks/wf/mod.rs @@ -0,0 +1,668 @@ +use std::cmp::Ordering; + +use anyhow::*; +use chrono::{Local, TimeZone}; +use clap::ValueEnum; +use indoc::indoc; +use rivet_term::console::style; +use sqlx::PgPool; +use uuid::Uuid; + +use crate::{ + context::ProjectContext, + tasks::db, + utils::{self, colored_json, db_conn::DatabaseConnections, indent_string}, +}; + +pub mod signal; + +#[derive(Debug, Clone)] +pub struct KvPair { + pub key: String, + pub value: String, +} + +#[derive(thiserror::Error, Debug)] +pub enum ParseKvPairError { + #[error("kv pairs must use `=` with no spaces (ex: foo=bar)")] + NoEquals, +} + +impl std::str::FromStr for KvPair { + type Err = ParseKvPairError; + + fn from_str(s: &str) -> Result { + let Some((key, value)) = s.split_once('=') else { + return Result::Err(ParseKvPairError::NoEquals); + }; + + let key = key.trim().to_string(); + let value = value.trim().to_string(); + + Result::Ok(KvPair { key, value }) + } +} + +#[derive(ValueEnum, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[clap(rename_all = "kebab_case")] +pub enum WorkflowState { + Complete, + Running, + Sleeping, + Dead, +} + +#[derive(strum::FromRepr)] +enum EventType { + Activity = 0, + Signal = 1, + SubWorkflow = 2, + SignalSend = 3, + MessageSend = 4, + Loop = 5, +} + +impl EventType { + fn name(&self) -> &str { + match self { + EventType::Activity => "activity", + EventType::Signal => "signal receive", + EventType::SubWorkflow => "sub workflow", + EventType::SignalSend => "signal send", + EventType::MessageSend => "message send", + EventType::Loop => "loop", + } + } +} + +#[derive(Debug, sqlx::FromRow)] +pub struct WorkflowRow { + workflow_id: Uuid, + workflow_name: String, + tags: Option, + create_ts: i64, + input: serde_json::Value, + output: Option, + error: Option, + + is_active: bool, + has_wake_condition: bool, +} + +#[derive(sqlx::FromRow)] +struct HistoryEvent { + location: Vec, + t: i64, + name: Option, + input: Option, + output: Option, + forgotten: bool, +} + +pub async fn get_workflow(ctx: &ProjectContext, workflow_id: Uuid) -> Result> { + let (pool, _port) = build_pool(ctx).await?; + let mut conn = db::sqlx::get_conn(&pool).await?; + + let workflow = sqlx::query_as::<_, WorkflowRow>(indoc!( + " + SELECT + workflow_id, + workflow_name, + tags, + create_ts, + input, + output, + error, + worker_instance_id IS NOT NULL AS is_active, + ( + wake_immediate OR + wake_deadline_ts IS NOT NULL OR + cardinality(wake_signals) > 0 OR + wake_sub_workflow_id IS NOT NULL + ) AS has_wake_condition + FROM db_workflow.workflows + WHERE + workflow_id = $1 + " + )) + .bind(workflow_id) + .fetch_optional(&mut *conn) + .await?; + + Ok(workflow) +} + +pub async fn find_workflows( + ctx: &ProjectContext, + tags: Vec, + name: Option, + state: Option, +) -> Result> { + let (pool, _port) = build_pool(ctx).await?; + let mut conn = db::sqlx::get_conn(&pool).await?; + + let mut query_str = indoc!( + " + SELECT + workflow_id, + workflow_name, + tags, + create_ts, + input, + output, + error, + worker_instance_id IS NOT NULL AS is_active, + ( + wake_immediate OR + wake_deadline_ts IS NOT NULL OR + cardinality(wake_signals) > 0 OR + wake_sub_workflow_id IS NOT NULL + ) AS has_wake_condition + FROM db_workflow.workflows + WHERE + ($1 IS NULL OR workflow_name = $1) AND + silence_ts IS NULL AND + -- Complete + (NOT $2 OR output IS NOT NULL) AND + -- Running + (NOT $3 OR ( + output IS NULL AND + worker_instance_id IS NOT NULL + )) AND + -- Sleeping + (NOT $4 OR ( + output IS NULL AND + worker_instance_id IS NULL AND + ( + wake_immediate OR + wake_deadline_ts IS NOT NULL OR + cardinality(wake_signals) > 0 OR + wake_sub_workflow_id IS NOT NULL + ) + )) AND + -- Dead + (NOT $5 OR ( + output IS NULL AND + worker_instance_id IS NULL AND + wake_immediate = FALSE AND + wake_deadline_ts IS NULL AND + cardinality(wake_signals) = 0 AND + wake_sub_workflow_id IS NULL + )) + " + ) + .to_string(); + + // Procedurally add tags. We don't combine the tags into an object because we are comparing + // strings with `->>` whereas with @> and `serde_json::Map` we would have to know the type of the input + // given. + for i in 0..tags.len() { + let idx = i * 2 + 6; + let idx2 = idx + 1; + + query_str.push_str(&format!(" AND tags->>${idx} = ${idx2}")); + } + + query_str.push_str("LIMIT 100"); + + let mut query = sqlx::query_as::<_, WorkflowRow>(&query_str) + .bind(name) + .bind(matches!(state, Some(WorkflowState::Complete))) + .bind(matches!(state, Some(WorkflowState::Running))) + .bind(matches!(state, Some(WorkflowState::Sleeping))) + .bind(matches!(state, Some(WorkflowState::Dead))); + + for tag in tags { + query = query.bind(tag.key); + query = query.bind(tag.value); + } + + let workflows = query.fetch_all(&mut *conn).await?; + + Ok(workflows) +} + +pub async fn print_workflows(workflows: Vec, pretty: bool) -> Result<()> { + if workflows.is_empty() { + rivet_term::status::success("No workflows found", ""); + return Ok(()); + } + + rivet_term::status::success("Workflows", workflows.len()); + + if pretty { + for workflow in workflows { + println!(""); + + println!("{}", style(workflow.workflow_name).bold()); + + println!(" {} {}", style("id").bold(), workflow.workflow_id); + + let datetime = Local + .timestamp_millis_opt(workflow.create_ts) + .single() + .context("invalid ts")?; + let date = datetime.format("%Y-%m-%d %H:%M:%S"); + + println!(" {} {}", style("created at").bold(), style(date).magenta()); + + print!(" {} ", style("state").bold()); + if workflow.output.is_some() { + println!("{}", style("complete").bright().blue()); + } else if workflow.is_active { + println!("{}", style("running").green()); + } else if workflow.has_wake_condition { + println!("{}", style("sleeping").yellow()); + } else { + println!("{}", style("dead").red()); + } + + if let Some(error) = workflow.error { + println!( + " {} {}", + style("error").bold(), + style(&indent_string(&chunk_string(&error, 200).join("\n"), " ")[4..]) + .green() + ); + } + + println!( + " {} {}", + style("input").bold(), + &indent_string(&colored_json(&workflow.input)?, " ")[4..] + ); + + print!(" {} ", style("output").bold()); + if let Some(output) = workflow.output { + println!("{}", &indent_string(&colored_json(&output)?, " ")[4..]); + } else { + println!("{}", style("").dim()); + } + } + } else { + render::workflows(workflows)?; + } + + Ok(()) +} + +pub async fn silence_workflow(ctx: &ProjectContext, workflow_id: Uuid) -> Result<()> { + let (pool, _port) = build_pool(ctx).await?; + let mut conn = db::sqlx::get_conn(&pool).await?; + + sqlx::query(indoc!( + " + UPDATE db_workflow.workflows + SET silence_ts = $2 + WHERE workflow_id = $1 + " + )) + .bind(workflow_id) + .bind(utils::now()) + .execute(&mut *conn) + .await?; + + Ok(()) +} + +pub async fn wake_workflow(ctx: &ProjectContext, workflow_id: Uuid) -> Result<()> { + let (pool, _port) = build_pool(ctx).await?; + let mut conn = db::sqlx::get_conn(&pool).await?; + + sqlx::query(indoc!( + " + UPDATE db_workflow.workflows + SET wake_immediate = TRUE + WHERE workflow_id = $1 + " + )) + .bind(workflow_id) + .execute(&mut *conn) + .await?; + + Ok(()) +} + +pub async fn print_history( + ctx: &ProjectContext, + workflow_id: Uuid, + include_forgotten: bool, + print_location: bool, +) -> Result<()> { + let (pool, _port) = build_pool(ctx).await?; + let mut conn = db::sqlx::get_conn(&pool).await?; + let mut conn2 = db::sqlx::get_conn(&pool).await?; + + let (wf_row, events) = tokio::try_join!( + async move { + sqlx::query_as::<_, (String, serde_json::Value, Option)>(indoc!( + " + SELECT workflow_name, input, output + FROM db_workflow.workflows + WHERE workflow_id = $1 + " + )) + .bind(workflow_id) + .fetch_optional(&mut *conn) + .await + .map_err(Into::into) + }, + async move { + sqlx::query_as::<_, HistoryEvent>(indoc!( + " + WITH workflow_events AS ( + SELECT $1 AS workflow_id + ) + SELECT location, 0 AS t, activity_name AS name, input, output, forgotten + FROM db_workflow.workflow_activity_events, workflow_events + WHERE + workflow_activity_events.workflow_id = workflow_events.workflow_id AND ($2 OR NOT forgotten) + UNION ALL + SELECT location, 1 AS t, signal_name AS name, body as input, null as output, forgotten + FROM db_workflow.workflow_signal_events, workflow_events + WHERE + workflow_signal_events.workflow_id = workflow_events.workflow_id AND ($2 OR NOT forgotten) + UNION ALL + SELECT location, 2 AS t, w.workflow_name AS name, w.input, w.output, forgotten + FROM workflow_events, db_workflow.workflow_sub_workflow_events AS sw + JOIN db_workflow.workflows AS w + ON sw.sub_workflow_id = w.workflow_id + WHERE + sw.workflow_id = workflow_events.workflow_id AND ($2 OR NOT forgotten) + UNION ALL + SELECT location, 3 AS t, signal_name AS name, body as input, null as output, forgotten + FROM db_workflow.workflow_signal_send_events, workflow_events + WHERE + workflow_signal_send_events.workflow_id = workflow_events.workflow_id AND ($2 OR NOT forgotten) + UNION ALL + SELECT location, 4 AS t, message_name AS name, body as input, null as output, forgotten + FROM db_workflow.workflow_message_send_events, workflow_events + WHERE + workflow_message_send_events.workflow_id = workflow_events.workflow_id AND ($2 OR NOT forgotten) + UNION ALL + SELECT location, 5 AS t, NULL AS name, null as input, null as output, forgotten + FROM db_workflow.workflow_loop_events, workflow_events + WHERE + workflow_loop_events.workflow_id = workflow_events.workflow_id AND ($2 OR NOT forgotten) + ORDER BY location ASC; + " + )) + .bind(workflow_id) + .bind(include_forgotten) + .fetch_all(&mut *conn2) + .await + .map_err(Into::into) + } + )?; + + let Some((workflow_name, input, output)) = wf_row else { + rivet_term::status::success("No workflow found", ""); + + return Ok(()); + }; + + // Print header + { + println!(""); + + println!( + "{} {}", + style(workflow_name).yellow().bold(), + style(workflow_id).yellow() + ); + + print!( + " {} {}", + style("╰").yellow().dim(), + style("input").yellow() + ); + + let input = serde_json::to_string(&input)?; + let input_trim = input.chars().take(50).collect::(); + print!(" {}", style(input_trim).yellow().dim()); + if input.len() > 50 { + print!(" {}", style("...").yellow().dim()); + } + + println!("\n"); + } + + for i in 0..events.len() { + let event = events.get(i).unwrap(); + + let t = EventType::from_repr(event.t.try_into()?).context("invalid event type")?; + + // Indentation + print!("{}", " ".repeat(event.location.len().saturating_sub(1))); + + if print_location { + print!("{} ", style(event.location.last().unwrap()).dim()); + } + + if event.forgotten { + print!("{}", style(t.name()).red().dim().bold()); + } else { + print!("{}", style(t.name()).bold()); + } + + if let Some(name) = &event.name { + print!(" {}", style(name)); + } + + println!(""); + + if let Some(input) = &event.input { + print!("{}", " ".repeat(event.location.len())); + + let c = if event.output.is_none() { "╰" } else { "├" }; + let c = if event.forgotten { + style(c).red().dim() + } else { + style(c).dim() + }; + print!("{} ", c); + + print!("input"); + + let input = serde_json::to_string(&input)?; + let input_trim = input.chars().take(50).collect::(); + print!(" {}", style(input_trim).dim()); + if input.len() > 50 { + print!(" {}", style("...").dim()); + } + + println!(""); + } + + if let Some(output) = &event.output { + print!("{}", " ".repeat(event.location.len())); + + if event.forgotten { + print!("{} ", style("╰").red().dim()); + } else { + print!("{} ", style("╰").dim()); + } + + print!("output"); + + let output = serde_json::to_string(&output)?; + let output_trim = output.chars().take(50).collect::(); + print!(" {}", style(output_trim).dim()); + if output.len() > 50 { + print!(" {}", style("...").dim()); + } + + println!(""); + } + + if !matches!(t, EventType::Loop) { + println!(""); + } + + let next_event = events.get(i + 1); + if let Some(ne) = next_event { + match event.location.len().cmp(&ne.location.len()) { + Ordering::Equal => {} + Ordering::Less => { + let start = if matches!(t, EventType::Loop) { 1 } else { 0 }; + + for i in start..ne.location.len() - event.location.len() { + print!( + "{}", + " ".repeat(event.location.len().saturating_sub(1) + i) + ); + + if print_location { + print!( + "{} ", + style(ne.location.get(ne.location.len() - 2).unwrap()).dim() + ); + } + + if ne.forgotten { + print!("{}", style("branch").red().dim().bold()); + } else { + print!("{}", style("branch").bold()); + } + + println!(); + } + } + Ordering::Greater => { + for (j, (a, b)) in event.location.iter().zip(ne.location.iter()).enumerate() { + if a != b { + if j + 1 != ne.location.len() { + // Indentation + print!("{}", " ".repeat(j)); + + if print_location { + print!("{} ", style(b).dim()); + } + + if ne.forgotten { + print!("{}", style("branch").red().dim().bold()); + } else { + print!("{}", style("branch").bold()); + } + + println!(); + } + + break; + } + } + } + } + } + } + + // Print footer + if let Some(output) = output { + println!("{}", style("Workflow complete").yellow().bold()); + + print!( + " {} {}", + style("╰").yellow().dim(), + style("output").yellow() + ); + + let output = serde_json::to_string(&output)?; + let output_trim = output.chars().take(50).collect::(); + print!(" {}", style(output_trim).yellow().dim()); + if output.len() > 50 { + print!(" {}", style("...").yellow().dim()); + } + + println!(""); + } + + Ok(()) +} + +async fn build_pool(ctx: &ProjectContext) -> Result<(PgPool, utils::DroppablePort)> { + let db_workflow = ctx.service_with_name("db-workflow").await; + let db_conn = DatabaseConnections::create(ctx, &[db_workflow], true).await?; + + let host = db_conn.cockroach_host.as_ref().unwrap(); + let username = ctx.read_secret(&["crdb", "username"]).await?; + let password = ctx.read_secret(&["crdb", "password"]).await?; + let db_url = format!("postgres://{}:{}@{}", username, password, host); + + // rivet_term::status::progress("Forwarding...", ""); + let port = utils::kubectl_port_forward(ctx, "cockroachdb", "cockroachdb", (26257, 26257))?; + port.check().await?; + + // Must return port so it isn't dropped + Ok((db::sqlx::build_pool(&db_url).await?, port)) +} + +fn chunk_string(s: &str, size: usize) -> Vec { + s.as_bytes() + .chunks(size) + .map(|chunk| String::from_utf8_lossy(chunk).to_string()) + .collect() +} + +mod render { + use anyhow::*; + use rivet_term::console::style; + use tabled::Tabled; + use uuid::Uuid; + + use super::{WorkflowRow, WorkflowState}; + use crate::utils::colored_json_ugly; + + #[derive(Tabled)] + struct WorkflowTableRow { + pub workflow_name: String, + pub workflow_id: Uuid, + #[tabled(display_with = "display_state")] + pub state: WorkflowState, + #[tabled(display_with = "display_option")] + pub tags: Option, + } + + pub fn workflows(workflows: Vec) -> Result<()> { + let mut rows = workflows + .iter() + .map(|w| { + Ok(WorkflowTableRow { + workflow_name: w.workflow_name.clone(), + workflow_id: w.workflow_id, + state: if w.output.is_some() { + WorkflowState::Complete + } else if w.is_active { + WorkflowState::Running + } else if w.has_wake_condition { + WorkflowState::Sleeping + } else { + WorkflowState::Dead + }, + tags: w.tags.as_ref().map(colored_json_ugly).transpose()?, + }) + }) + .collect::>>()?; + + rows.sort_by_key(|w| w.state); + + rivet_term::format::table(rows); + + Ok(()) + } + + fn display_state(state: &WorkflowState) -> String { + match state { + WorkflowState::Complete => style("complete").bright().blue().to_string(), + WorkflowState::Running => style("running").green().to_string(), + WorkflowState::Sleeping => style("sleeping").yellow().to_string(), + WorkflowState::Dead => style("dead").red().to_string(), + } + } + + pub(crate) fn display_option(item: &Option) -> String { + match item { + Some(s) => s.to_string(), + None => String::new(), + } + } +} diff --git a/lib/bolt/core/src/tasks/wf/signal.rs b/lib/bolt/core/src/tasks/wf/signal.rs new file mode 100644 index 0000000000..07e00fa7f9 --- /dev/null +++ b/lib/bolt/core/src/tasks/wf/signal.rs @@ -0,0 +1,282 @@ +use anyhow::*; +use chrono::{Local, TimeZone}; +use clap::ValueEnum; +use indoc::indoc; +use rivet_term::console::style; +use uuid::Uuid; + +use super::{build_pool, KvPair}; +use crate::{ + context::ProjectContext, + tasks::db, + utils::{self, colored_json, indent_string}, +}; + +#[derive(ValueEnum, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[clap(rename_all = "kebab_case")] +pub enum SignalState { + Acked, + Pending, +} + +#[derive(Debug, sqlx::FromRow)] +pub struct SignalRow { + signal_id: Uuid, + signal_name: String, + tags: Option, + workflow_id: Option, + create_ts: i64, + body: serde_json::Value, + ack_ts: Option, +} + +pub async fn get_signal(ctx: &ProjectContext, signal_id: Uuid) -> Result> { + let (pool, _port) = build_pool(ctx).await?; + let mut conn = db::sqlx::get_conn(&pool).await?; + + let signal = sqlx::query_as::<_, SignalRow>(indoc!( + " + SELECT + signal_id, + signal_name, + NULL AS tags, + workflow_id, + create_ts, + body, + ack_ts + FROM db_workflow.signals + WHERE signal_id = $1 + UNION ALL + SELECT + signal_id, + signal_name, + tags, + NULL AS workflow_id, + create_ts, + body, + ack_ts + FROM db_workflow.tagged_signals + WHERE signal_id = $1 + " + )) + .bind(signal_id) + .fetch_optional(&mut *conn) + .await?; + + Ok(signal) +} + +pub async fn find_signals( + ctx: &ProjectContext, + tags: Vec, + workflow_id: Option, + name: Option, + state: Option, +) -> Result> { + let (pool, _port) = build_pool(ctx).await?; + let mut conn = db::sqlx::get_conn(&pool).await?; + + let mut query_str = indoc!( + " + SELECT + signal_id, + signal_name, + NULL AS tags, + workflow_id, + create_ts, + body, + ack_ts + FROM db_workflow.signals + WHERE + ($1 IS NULL OR signal_name = $1) AND + ($2 IS NULL OR workflow_id = $2) AND + silence_ts IS NULL AND + -- Acked + (NOT $3 OR ack_ts IS NOT NULL) AND + -- Pending + (NOT $4 OR ack_ts IS NULL) + UNION ALL + SELECT + signal_id, + signal_name, + tags, + NULL AS workflow_id, + create_ts, + body, + ack_ts + FROM db_workflow.tagged_signals + WHERE + ($1 IS NULL OR signal_name = $1) AND + silence_ts IS NULL AND + -- Acked + (NOT $3 OR ack_ts IS NOT NULL) AND + -- Pending + (NOT $4 OR ack_ts IS NULL) + " + ) + .to_string(); + + // Procedurally add tags. We don't combine the tags into an object because we are comparing + // strings with `->>` whereas with @> and `serde_json::Map` we would have to know the type of the input + // given. + for i in 0..tags.len() { + let idx = i * 2 + 5; + let idx2 = idx + 1; + + query_str.push_str(&format!(" AND tags->>${idx} = ${idx2}")); + } + + query_str.push_str("LIMIT 100"); + + // eprintln!( + // "{query_str} {name:?} {workflow_id:?} {} {} {}", + // tags.is_empty(), + // matches!(state, Some(SignalState::Acked)), + // matches!(state, Some(SignalState::Pending)) + // ); + + let mut query = sqlx::query_as::<_, SignalRow>(&query_str) + .bind(name) + .bind(workflow_id) + .bind(matches!(state, Some(SignalState::Acked))) + .bind(matches!(state, Some(SignalState::Pending))); + + for tag in tags { + query = query.bind(tag.key); + query = query.bind(tag.value); + } + + let signals = query.fetch_all(&mut *conn).await?; + + Ok(signals) +} + +pub async fn print_signals(signals: Vec, pretty: bool) -> Result<()> { + if signals.is_empty() { + rivet_term::status::success("No signals found", ""); + return Ok(()); + } + + rivet_term::status::success("Signals", signals.len()); + + if pretty { + for signal in signals { + println!(""); + + println!("{}", style(signal.signal_name).bold()); + + println!(" {} {}", style("id").bold(), signal.signal_id); + + let datetime = Local + .timestamp_millis_opt(signal.create_ts) + .single() + .context("invalid ts")?; + let date = datetime.format("%Y-%m-%d %H:%M:%S"); + + println!(" {} {}", style("created at").bold(), style(date).magenta()); + + print!(" {} ", style("state").bold()); + if signal.ack_ts.is_some() { + println!("{}", style("ack'd").bright().blue()); + } else { + println!("{}", style("pending").yellow()); + } + println!( + " {} {}", + style("body").bold(), + &indent_string(&colored_json(&signal.body)?, " ")[4..] + ); + } + } else { + render::signals(signals)?; + } + + Ok(()) +} + +pub async fn silence_signal(ctx: &ProjectContext, signal_id: Uuid) -> Result<()> { + let (pool, _port) = build_pool(ctx).await?; + let mut conn = db::sqlx::get_conn(&pool).await?; + + sqlx::query(indoc!( + " + WITH + update_signals AS ( + UPDATE db_workflow.signals + SET silence_ts = $2 + WHERE signal_id = $1 + RETURNING 1 + ), + update_tagged_signals AS ( + UPDATE db_workflow.tagged_signals + SET silence_ts = $2 + WHERE signal_id = $1 + RETURNING 1 + ) + SELECT 1 + " + )) + .bind(signal_id) + .bind(utils::now()) + .execute(&mut *conn) + .await?; + + Ok(()) +} + +mod render { + use anyhow::*; + use rivet_term::console::style; + use tabled::Tabled; + use uuid::Uuid; + + use super::{SignalRow, SignalState}; + use crate::utils::colored_json_ugly; + + #[derive(Tabled)] + struct SignalTableRow { + pub signal_name: String, + pub signal_id: Uuid, + #[tabled(display_with = "display_state")] + pub state: SignalState, + #[tabled(rename = "tags/workflow_id")] + pub id: String, + } + + pub fn signals(signals: Vec) -> Result<()> { + let mut rows = signals + .iter() + .map(|w| { + Ok(SignalTableRow { + signal_name: w.signal_name.clone(), + signal_id: w.signal_id, + state: if w.ack_ts.is_some() { + SignalState::Acked + } else { + SignalState::Pending + }, + id: w + .tags + .as_ref() + .map(colored_json_ugly) + .transpose()? + .or(w.workflow_id.map(|id| id.to_string())) + .unwrap(), + }) + }) + .collect::>>()?; + + rows.sort_by_key(|w| w.state); + + rivet_term::format::table(rows); + + Ok(()) + } + + fn display_state(state: &SignalState) -> String { + match state { + SignalState::Acked => style("ack'd").bright().blue().to_string(), + SignalState::Pending => style("pending").yellow().to_string(), + } + } +} diff --git a/lib/bolt/core/src/utils/mod.rs b/lib/bolt/core/src/utils/mod.rs index 04f87538c9..3e2c68c3b7 100644 --- a/lib/bolt/core/src/utils/mod.rs +++ b/lib/bolt/core/src/utils/mod.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path, process::Command, sync::Arc}; +use std::{convert::TryInto, fs, path::Path, process::Command, sync::Arc, time}; use anyhow::*; use duct::cmd; @@ -90,6 +90,15 @@ impl MultiProgress { } } +pub fn now() -> i64 { + time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_else(|err| unreachable!("time is broken: {}", err)) + .as_millis() + .try_into() + .expect("now doesn't fit in i64") +} + /// Returns the modified timestamp of all files recursively. pub fn deep_modified_ts(path: &Path) -> Result { let mut max_modified_ts = 0; @@ -317,3 +326,59 @@ pub fn render_diff(indent: usize, patches: &json_patch::Patch) { } } } + +pub fn indent_string(s: &str, indent: &str) -> String { + let mut out = String::with_capacity(s.len()); + let mut iter = s.split("\n"); + + if let Some(chunk) = iter.next() { + out.push_str(indent); + out.push_str(chunk); + } + + while let Some(chunk) = iter.next() { + out.push_str("\n"); + out.push_str(indent); + out.push_str(chunk); + } + + out +} + +pub fn colored_json(value: &serde_json::Value) -> Result { + colored_json_inner(value, colored_json::PrettyFormatter::new()) +} + +pub fn colored_json_ugly(value: &serde_json::Value) -> Result { + colored_json_inner(value, colored_json::CompactFormatter {}) +} + +fn colored_json_inner( + value: &serde_json::Value, + formatter: T, +) -> Result { + use colored_json::{ColorMode, ColoredFormatter, Output, Style, Styler}; + use serde::Serialize; + + let mut writer = Vec::::with_capacity(128); + + let mode = ColorMode::Auto(Output::StdOut); + if mode.use_color() { + let formatter = ColoredFormatter::with_styler( + formatter, + Styler { + object_brackets: Style::new(), + array_brackets: Style::new(), + ..Default::default() + }, + ); + + let mut serializer = serde_json::Serializer::with_formatter(&mut writer, formatter); + value.serialize(&mut serializer)?; + } else { + let mut serializer = serde_json::Serializer::with_formatter(&mut writer, formatter); + value.serialize(&mut serializer)?; + } + + Ok(String::from_utf8_lossy(&writer).to_string()) +} diff --git a/svc/pkg/workflow/standalone/metrics-publish/src/lib.rs b/svc/pkg/workflow/standalone/metrics-publish/src/lib.rs index 08bc5d6bbd..f4374a301d 100644 --- a/svc/pkg/workflow/standalone/metrics-publish/src/lib.rs +++ b/svc/pkg/workflow/standalone/metrics-publish/src/lib.rs @@ -56,7 +56,7 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> { FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' WHERE error IS NOT NULL AND - output IS NULL AND AND + output IS NULL AND silence_ts IS NULL AND wake_immediate = FALSE AND wake_deadline_ts IS NULL AND