diff --git a/Cargo.toml b/Cargo.toml index 9b68f615e6..0ad495ce99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["packages/common/api-helper/build","packages/common/api-helper/macros","packages/common/cache/build","packages/common/cache/result","packages/common/chirp-workflow/core","packages/common/chirp-workflow/macros","packages/common/chirp/client","packages/common/chirp/metrics","packages/common/chirp/perf","packages/common/chirp/types","packages/common/chirp/worker","packages/common/chirp/worker-attributes","packages/common/claims","packages/common/config","packages/common/connection","packages/common/convert","packages/common/deno-embed","packages/common/env","packages/common/fdb-util","packages/common/formatted-error","packages/common/global-error","packages/common/health-checks","packages/common/hub-embed","packages/common/kv-str","packages/common/logs","packages/common/metrics","packages/common/migrate","packages/common/nomad-util","packages/common/operation/core","packages/common/operation/macros","packages/common/pools","packages/common/redis-util","packages/common/runtime","packages/common/s3-util","packages/common/schemac","packages/common/server-cli","packages/common/service-discovery","packages/common/service-manager","packages/common/smithy-output/api-auth/rust","packages/common/smithy-output/api-auth/rust-server","packages/common/smithy-output/api-cf-verification/rust","packages/common/smithy-output/api-cf-verification/rust-server","packages/common/smithy-output/api-cloud/rust","packages/common/smithy-output/api-cloud/rust-server","packages/common/smithy-output/api-group/rust","packages/common/smithy-output/api-group/rust-server","packages/common/smithy-output/api-identity/rust","packages/common/smithy-output/api-identity/rust-server","packages/common/smithy-output/api-job/rust","packages/common/smithy-output/api-job/rust-server","packages/common/smithy-output/api-kv/rust","packages/common/smithy-outp
ut/api-kv/rust-server","packages/common/smithy-output/api-matchmaker/rust","packages/common/smithy-output/api-matchmaker/rust-server","packages/common/smithy-output/api-party/rust","packages/common/smithy-output/api-party/rust-server","packages/common/smithy-output/api-portal/rust","packages/common/smithy-output/api-portal/rust-server","packages/common/smithy-output/api-status/rust","packages/common/smithy-output/api-status/rust-server","packages/common/smithy-output/api-traefik-provider/rust","packages/common/smithy-output/api-traefik-provider/rust-server","packages/common/test","packages/common/test-images","packages/common/types-proto/build","packages/common/types-proto/core","packages/common/util/core","packages/common/util/macros","packages/common/util/search","packages/core/api/actor","packages/core/api/auth","packages/core/api/cf-verification","packages/core/api/cloud","packages/core/api/games","packages/core/api/group","packages/core/api/identity","packages/core/api/intercom","packages/core/api/job","packages/core/api/matchmaker","packages/core/api/monolith-edge","packages/core/api/monolith-public","packages/core/api/portal","packages/core/api/provision","packages/core/api/status","packages/core/api/traefik-provider","packages/core/api/ui","packages/core/infra/legacy/job-runner","packages/core/infra/schema-generator","packages/core/infra/server","packages/core/services/build","packages/core/services/build/ops/create","packages/core/services/build/ops/get","packages/core/services/build/ops/list-for-env","packages/core/services/build/ops/list-for-game","packages/core/services/build/standalone/default-create","packages/core/services/build/util","packages/core/services/captcha/ops/hcaptcha-config-get","packages/core/services/captcha/ops/hcaptcha-verify","packages/core/services/captcha/ops/request","packages/core/services/captcha/ops/turnstile-config-get","packages/core/services/captcha/ops/turnstile-verify","packages/core/services/captcha/ops/verify","packages/c
ore/services/captcha/util","packages/core/services/cdn/ops/namespace-auth-user-remove","packages/core/services/cdn/ops/namespace-auth-user-update","packages/core/services/cdn/ops/namespace-create","packages/core/services/cdn/ops/namespace-domain-create","packages/core/services/cdn/ops/namespace-domain-remove","packages/core/services/cdn/ops/namespace-get","packages/core/services/cdn/ops/namespace-resolve-domain","packages/core/services/cdn/ops/ns-auth-type-set","packages/core/services/cdn/ops/ns-enable-domain-public-auth-set","packages/core/services/cdn/ops/site-create","packages/core/services/cdn/ops/site-get","packages/core/services/cdn/ops/site-list-for-game","packages/core/services/cdn/ops/version-get","packages/core/services/cdn/ops/version-prepare","packages/core/services/cdn/ops/version-publish","packages/core/services/cdn/util","packages/core/services/cdn/worker","packages/core/services/cf-custom-hostname/ops/get","packages/core/services/cf-custom-hostname/ops/list-for-namespace-id","packages/core/services/cf-custom-hostname/ops/resolve-hostname","packages/core/services/cf-custom-hostname/worker","packages/core/services/cloud/ops/device-link-create","packages/core/services/cloud/ops/game-config-create","packages/core/services/cloud/ops/game-config-get","packages/core/services/cloud/ops/game-token-create","packages/core/services/cloud/ops/namespace-create","packages/core/services/cloud/ops/namespace-get","packages/core/services/cloud/ops/namespace-token-development-create","packages/core/services/cloud/ops/namespace-token-public-create","packages/core/services/cloud/ops/version-get","packages/core/services/cloud/ops/version-publish","packages/core/services/cloud/standalone/default-create","packages/core/services/cloud/worker","packages/core/services/cluster","packages/core/services/cluster/standalone/datacenter-tls-renew","packages/core/services/cluster/standalone/default-update","packages/core/services/cluster/standalone/gc","packages/core/services/cluster/s
tandalone/metrics-publish","packages/core/services/custom-user-avatar/ops/list-for-game","packages/core/services/custom-user-avatar/ops/upload-complete","packages/core/services/debug/ops/email-res","packages/core/services/dynamic-config","packages/core/services/email-verification/ops/complete","packages/core/services/email-verification/ops/create","packages/core/services/email/ops/send","packages/core/services/external/ops/request-validate","packages/core/services/external/worker","packages/core/services/faker/ops/build","packages/core/services/faker/ops/cdn-site","packages/core/services/faker/ops/game","packages/core/services/faker/ops/game-namespace","packages/core/services/faker/ops/game-version","packages/core/services/faker/ops/job-run","packages/core/services/faker/ops/job-template","packages/core/services/faker/ops/mm-lobby","packages/core/services/faker/ops/mm-lobby-row","packages/core/services/faker/ops/mm-player","packages/core/services/faker/ops/region","packages/core/services/faker/ops/team","packages/core/services/faker/ops/user","packages/core/services/game/ops/banner-upload-complete","packages/core/services/game/ops/create","packages/core/services/game/ops/get","packages/core/services/game/ops/list-all","packages/core/services/game/ops/list-for-team","packages/core/services/game/ops/logo-upload-complete","packages/core/services/game/ops/namespace-create","packages/core/services/game/ops/namespace-get","packages/core/services/game/ops/namespace-list","packages/core/services/game/ops/namespace-resolve-name-id","packages/core/services/game/ops/namespace-resolve-url","packages/core/services/game/ops/namespace-validate","packages/core/services/game/ops/namespace-version-history-list","packages/core/services/game/ops/namespace-version-set","packages/core/services/game/ops/recommend","packages/core/services/game/ops/resolve-name-id","packages/core/services/game/ops/resolve-namespace-id","packages/core/services/game/ops/token-development-validate","packages/c
ore/services/game/ops/validate","packages/core/services/game/ops/version-create","packages/core/services/game/ops/version-get","packages/core/services/game/ops/version-list","packages/core/services/game/ops/version-validate","packages/core/services/ip/ops/info","packages/core/services/job-log/ops/read","packages/core/services/job-log/worker","packages/core/services/job-run","packages/core/services/job/standalone/gc","packages/core/services/job/util","packages/core/services/linode","packages/core/services/linode/standalone/gc","packages/core/services/load-test/standalone/api-cloud","packages/core/services/load-test/standalone/mm","packages/core/services/load-test/standalone/mm-sustain","packages/core/services/load-test/standalone/sqlx","packages/core/services/load-test/standalone/watch-requests","packages/core/services/mm-config/ops/game-get","packages/core/services/mm-config/ops/game-upsert","packages/core/services/mm-config/ops/lobby-group-get","packages/core/services/mm-config/ops/lobby-group-resolve-name-id","packages/core/services/mm-config/ops/lobby-group-resolve-version","packages/core/services/mm-config/ops/namespace-config-set","packages/core/services/mm-config/ops/namespace-config-validate","packages/core/services/mm-config/ops/namespace-create","packages/core/services/mm-config/ops/namespace-get","packages/core/services/mm-config/ops/version-get","packages/core/services/mm-config/ops/version-prepare","packages/core/services/mm-config/ops/version-publish","packages/core/services/mm/ops/dev-player-token-create","packages/core/services/mm/ops/lobby-find-fail","packages/core/services/mm/ops/lobby-find-lobby-query-list","packages/core/services/mm/ops/lobby-find-try-complete","packages/core/services/mm/ops/lobby-for-run-id","packages/core/services/mm/ops/lobby-get","packages/core/services/mm/ops/lobby-history","packages/core/services/mm/ops/lobby-idle-update","packages/core/services/mm/ops/lobby-list-for-namespace","packages/core/services/mm/ops/lobby-list-for-u
ser-id","packages/core/services/mm/ops/lobby-player-count","packages/core/services/mm/ops/lobby-runtime-aggregate","packages/core/services/mm/ops/lobby-state-get","packages/core/services/mm/ops/player-count-for-namespace","packages/core/services/mm/ops/player-get","packages/core/services/mm/standalone/gc","packages/core/services/mm/util","packages/core/services/mm/worker","packages/core/services/monolith/standalone/worker","packages/core/services/monolith/standalone/workflow-worker","packages/core/services/nomad/standalone/monitor","packages/core/services/region/ops/get","packages/core/services/region/ops/list","packages/core/services/region/ops/list-for-game","packages/core/services/region/ops/recommend","packages/core/services/region/ops/resolve","packages/core/services/region/ops/resolve-for-game","packages/core/services/route","packages/core/services/server-spec","packages/core/services/team-invite/ops/get","packages/core/services/team-invite/worker","packages/core/services/team/ops/avatar-upload-complete","packages/core/services/team/ops/get","packages/core/services/team/ops/join-request-list","packages/core/services/team/ops/member-count","packages/core/services/team/ops/member-get","packages/core/services/team/ops/member-list","packages/core/services/team/ops/member-relationship-get","packages/core/services/team/ops/profile-validate","packages/core/services/team/ops/recommend","packages/core/services/team/ops/resolve-display-name","packages/core/services/team/ops/user-ban-get","packages/core/services/team/ops/user-ban-list","packages/core/services/team/ops/validate","packages/core/services/team/util","packages/core/services/team/worker","packages/core/services/telemetry/standalone/beacon","packages/core/services/tier","packages/core/services/token/ops/create","packages/core/services/token/ops/exchange","packages/core/services/token/ops/get","packages/core/services/token/ops/revoke","packages/core/services/upload/ops/complete","packages/core/services/upload/op
s/file-list","packages/core/services/upload/ops/get","packages/core/services/upload/ops/list-for-user","packages/core/services/upload/ops/prepare","packages/core/services/upload/worker","packages/core/services/user","packages/core/services/user-identity/ops/create","packages/core/services/user-identity/ops/delete","packages/core/services/user-identity/ops/get","packages/core/services/user/ops/avatar-upload-complete","packages/core/services/user/ops/get","packages/core/services/user/ops/pending-delete-toggle","packages/core/services/user/ops/profile-validate","packages/core/services/user/ops/resolve-email","packages/core/services/user/ops/team-list","packages/core/services/user/ops/token-create","packages/core/services/user/standalone/delete-pending","packages/core/services/user/worker","packages/edge/api/actor","packages/edge/api/intercom","packages/edge/api/monolith-edge","packages/edge/api/monolith-public","packages/edge/api/traefik-provider","packages/edge/infra/client/actor-kv","packages/edge/infra/client/config","packages/edge/infra/client/container-runner","packages/edge/infra/client/echo","packages/edge/infra/client/manager","packages/edge/infra/edge-server","packages/edge/infra/guard/core","packages/edge/infra/guard/server","packages/edge/services/monolith/standalone/workflow-worker","packages/edge/services/pegboard","packages/edge/services/pegboard/standalone/ws","packages/toolchain/cli","packages/toolchain/js-utils-embed","packages/toolchain/toolchain","sdks/api/full/rust"] +members = 
["packages/common/api-helper/build","packages/common/api-helper/macros","packages/common/cache/build","packages/common/cache/result","packages/common/chirp-workflow/core","packages/common/chirp-workflow/macros","packages/common/chirp/client","packages/common/chirp/metrics","packages/common/chirp/perf","packages/common/chirp/types","packages/common/chirp/worker","packages/common/chirp/worker-attributes","packages/common/claims","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/connection","packages/common/convert","packages/common/deno-embed","packages/common/env","packages/common/fdb-util","packages/common/formatted-error","packages/common/global-error","packages/common/health-checks","packages/common/kv-str","packages/common/logs","packages/common/metrics","packages/common/migrate","packages/common/nomad-util","packages/common/operation/core","packages/common/operation/macros","packages/common/pools","packages/common/redis-util","packages/common/runtime","packages/common/s3-util","packages/common/schemac","packages/common/server-cli","packages/common/service-discovery","packages/common/service-manager","packages/common/smithy-output/api-auth/rust","packages/common/smithy-output/api-auth/rust-server","packages/common/smithy-output/api-cf-verification/rust","packages/common/smithy-output/api-cf-verification/rust-server","packages/common/smithy-output/api-cloud/rust","packages/common/smithy-output/api-cloud/rust-server","packages/common/smithy-output/api-group/rust","packages/common/smithy-output/api-group/rust-server","packages/common/smithy-output/api-identity/rust","packages/common/smithy-output/api-identity/rust-server","packages/common/smithy-output/api-job/rust","packages/common/smithy-output/api-job/rust-server","packages/common/smithy-output/api-kv/rust","packages/common/smithy-output/api-kv/rust-server","packages/common/smithy-output/api-matchmaker/rust","packages/common/smithy-output/api-m
atchmaker/rust-server","packages/common/smithy-output/api-party/rust","packages/common/smithy-output/api-party/rust-server","packages/common/smithy-output/api-portal/rust","packages/common/smithy-output/api-portal/rust-server","packages/common/smithy-output/api-status/rust","packages/common/smithy-output/api-status/rust-server","packages/common/smithy-output/api-traefik-provider/rust","packages/common/smithy-output/api-traefik-provider/rust-server","packages/common/test","packages/common/test-images","packages/common/types-proto/build","packages/common/types-proto/core","packages/common/util/core","packages/common/util/macros","packages/common/util/search","packages/core/api/actor","packages/core/api/auth","packages/core/api/cf-verification","packages/core/api/cloud","packages/core/api/games","packages/core/api/group","packages/core/api/identity","packages/core/api/intercom","packages/core/api/job","packages/core/api/matchmaker","packages/core/api/monolith-edge","packages/core/api/monolith-public","packages/core/api/portal","packages/core/api/provision","packages/core/api/status","packages/core/api/traefik-provider","packages/core/api/ui","packages/core/infra/legacy/job-runner","packages/core/infra/schema-generator","packages/core/infra/server","packages/core/services/build","packages/core/services/build/ops/create","packages/core/services/build/ops/get","packages/core/services/build/ops/list-for-env","packages/core/services/build/ops/list-for-game","packages/core/services/build/standalone/default-create","packages/core/services/build/util","packages/core/services/captcha/ops/hcaptcha-config-get","packages/core/services/captcha/ops/hcaptcha-verify","packages/core/services/captcha/ops/request","packages/core/services/captcha/ops/turnstile-config-get","packages/core/services/captcha/ops/turnstile-verify","packages/core/services/captcha/ops/verify","packages/core/services/captcha/util","packages/core/services/cdn/ops/namespace-auth-user-remove","packages/core/services/
cdn/ops/namespace-auth-user-update","packages/core/services/cdn/ops/namespace-create","packages/core/services/cdn/ops/namespace-domain-create","packages/core/services/cdn/ops/namespace-domain-remove","packages/core/services/cdn/ops/namespace-get","packages/core/services/cdn/ops/namespace-resolve-domain","packages/core/services/cdn/ops/ns-auth-type-set","packages/core/services/cdn/ops/ns-enable-domain-public-auth-set","packages/core/services/cdn/ops/site-create","packages/core/services/cdn/ops/site-get","packages/core/services/cdn/ops/site-list-for-game","packages/core/services/cdn/ops/version-get","packages/core/services/cdn/ops/version-prepare","packages/core/services/cdn/ops/version-publish","packages/core/services/cdn/util","packages/core/services/cdn/worker","packages/core/services/cf-custom-hostname/ops/get","packages/core/services/cf-custom-hostname/ops/list-for-namespace-id","packages/core/services/cf-custom-hostname/ops/resolve-hostname","packages/core/services/cf-custom-hostname/worker","packages/core/services/cloud/ops/device-link-create","packages/core/services/cloud/ops/game-config-create","packages/core/services/cloud/ops/game-config-get","packages/core/services/cloud/ops/game-token-create","packages/core/services/cloud/ops/namespace-create","packages/core/services/cloud/ops/namespace-get","packages/core/services/cloud/ops/namespace-token-development-create","packages/core/services/cloud/ops/namespace-token-public-create","packages/core/services/cloud/ops/version-get","packages/core/services/cloud/ops/version-publish","packages/core/services/cloud/standalone/default-create","packages/core/services/cloud/worker","packages/core/services/cluster","packages/core/services/cluster/standalone/datacenter-tls-renew","packages/core/services/cluster/standalone/default-update","packages/core/services/cluster/standalone/gc","packages/core/services/cluster/standalone/metrics-publish","packages/core/services/custom-user-avatar/ops/list-for-game","packages/core/service
s/custom-user-avatar/ops/upload-complete","packages/core/services/debug/ops/email-res","packages/core/services/dynamic-config","packages/core/services/email-verification/ops/complete","packages/core/services/email-verification/ops/create","packages/core/services/email/ops/send","packages/core/services/external/ops/request-validate","packages/core/services/external/worker","packages/core/services/faker/ops/build","packages/core/services/faker/ops/cdn-site","packages/core/services/faker/ops/game","packages/core/services/faker/ops/game-namespace","packages/core/services/faker/ops/game-version","packages/core/services/faker/ops/job-run","packages/core/services/faker/ops/job-template","packages/core/services/faker/ops/mm-lobby","packages/core/services/faker/ops/mm-lobby-row","packages/core/services/faker/ops/mm-player","packages/core/services/faker/ops/region","packages/core/services/faker/ops/team","packages/core/services/faker/ops/user","packages/core/services/game/ops/banner-upload-complete","packages/core/services/game/ops/create","packages/core/services/game/ops/get","packages/core/services/game/ops/list-all","packages/core/services/game/ops/list-for-team","packages/core/services/game/ops/logo-upload-complete","packages/core/services/game/ops/namespace-create","packages/core/services/game/ops/namespace-get","packages/core/services/game/ops/namespace-list","packages/core/services/game/ops/namespace-resolve-name-id","packages/core/services/game/ops/namespace-resolve-url","packages/core/services/game/ops/namespace-validate","packages/core/services/game/ops/namespace-version-history-list","packages/core/services/game/ops/namespace-version-set","packages/core/services/game/ops/recommend","packages/core/services/game/ops/resolve-name-id","packages/core/services/game/ops/resolve-namespace-id","packages/core/services/game/ops/token-development-validate","packages/core/services/game/ops/validate","packages/core/services/game/ops/version-create","packages/core/services/game/o
ps/version-get","packages/core/services/game/ops/version-list","packages/core/services/game/ops/version-validate","packages/core/services/ip/ops/info","packages/core/services/job-log/ops/read","packages/core/services/job-log/worker","packages/core/services/job-run","packages/core/services/job/standalone/gc","packages/core/services/job/util","packages/core/services/linode","packages/core/services/linode/standalone/gc","packages/core/services/load-test/standalone/api-cloud","packages/core/services/load-test/standalone/mm","packages/core/services/load-test/standalone/mm-sustain","packages/core/services/load-test/standalone/sqlx","packages/core/services/load-test/standalone/watch-requests","packages/core/services/mm-config/ops/game-get","packages/core/services/mm-config/ops/game-upsert","packages/core/services/mm-config/ops/lobby-group-get","packages/core/services/mm-config/ops/lobby-group-resolve-name-id","packages/core/services/mm-config/ops/lobby-group-resolve-version","packages/core/services/mm-config/ops/namespace-config-set","packages/core/services/mm-config/ops/namespace-config-validate","packages/core/services/mm-config/ops/namespace-create","packages/core/services/mm-config/ops/namespace-get","packages/core/services/mm-config/ops/version-get","packages/core/services/mm-config/ops/version-prepare","packages/core/services/mm-config/ops/version-publish","packages/core/services/mm/ops/dev-player-token-create","packages/core/services/mm/ops/lobby-find-fail","packages/core/services/mm/ops/lobby-find-lobby-query-list","packages/core/services/mm/ops/lobby-find-try-complete","packages/core/services/mm/ops/lobby-for-run-id","packages/core/services/mm/ops/lobby-get","packages/core/services/mm/ops/lobby-history","packages/core/services/mm/ops/lobby-idle-update","packages/core/services/mm/ops/lobby-list-for-namespace","packages/core/services/mm/ops/lobby-list-for-user-id","packages/core/services/mm/ops/lobby-player-count","packages/core/services/mm/ops/lobby-runtime-aggrega
te","packages/core/services/mm/ops/lobby-state-get","packages/core/services/mm/ops/player-count-for-namespace","packages/core/services/mm/ops/player-get","packages/core/services/mm/standalone/gc","packages/core/services/mm/util","packages/core/services/mm/worker","packages/core/services/monolith/standalone/worker","packages/core/services/monolith/standalone/workflow-worker","packages/core/services/nomad/standalone/monitor","packages/core/services/region/ops/get","packages/core/services/region/ops/list","packages/core/services/region/ops/list-for-game","packages/core/services/region/ops/recommend","packages/core/services/region/ops/resolve","packages/core/services/region/ops/resolve-for-game","packages/core/services/route","packages/core/services/server-spec","packages/core/services/team-invite/ops/get","packages/core/services/team-invite/worker","packages/core/services/team/ops/avatar-upload-complete","packages/core/services/team/ops/get","packages/core/services/team/ops/join-request-list","packages/core/services/team/ops/member-count","packages/core/services/team/ops/member-get","packages/core/services/team/ops/member-list","packages/core/services/team/ops/member-relationship-get","packages/core/services/team/ops/profile-validate","packages/core/services/team/ops/recommend","packages/core/services/team/ops/resolve-display-name","packages/core/services/team/ops/user-ban-get","packages/core/services/team/ops/user-ban-list","packages/core/services/team/ops/validate","packages/core/services/team/util","packages/core/services/team/worker","packages/core/services/telemetry/standalone/beacon","packages/core/services/tier","packages/core/services/token/ops/create","packages/core/services/token/ops/exchange","packages/core/services/token/ops/get","packages/core/services/token/ops/revoke","packages/core/services/upload/ops/complete","packages/core/services/upload/ops/file-list","packages/core/services/upload/ops/get","packages/core/services/upload/ops/list-for-user","package
s/core/services/upload/ops/prepare","packages/core/services/upload/worker","packages/core/services/user","packages/core/services/user-identity/ops/create","packages/core/services/user-identity/ops/delete","packages/core/services/user-identity/ops/get","packages/core/services/user/ops/avatar-upload-complete","packages/core/services/user/ops/get","packages/core/services/user/ops/pending-delete-toggle","packages/core/services/user/ops/profile-validate","packages/core/services/user/ops/resolve-email","packages/core/services/user/ops/team-list","packages/core/services/user/ops/token-create","packages/core/services/user/standalone/delete-pending","packages/core/services/user/worker","packages/edge/api/actor","packages/edge/api/intercom","packages/edge/api/monolith-edge","packages/edge/api/monolith-public","packages/edge/api/traefik-provider","packages/edge/infra/client/actor-kv","packages/edge/infra/client/config","packages/edge/infra/client/container-runner","packages/edge/infra/client/echo","packages/edge/infra/client/manager","packages/edge/infra/edge-server","packages/edge/infra/guard/core","packages/edge/infra/guard/server","packages/edge/services/monolith/standalone/workflow-worker","packages/edge/services/pegboard","packages/edge/services/pegboard/standalone/usage-metrics-publish","packages/edge/services/pegboard/standalone/ws","packages/toolchain/cli","packages/toolchain/js-utils-embed","packages/toolchain/toolchain","sdks/api/full/rust"] [workspace.package] version = "25.5.3" diff --git a/docker/dev-full/prometheus/prometheus.yml b/docker/dev-full/prometheus/prometheus.yml index b080fff210..f54ec89667 100644 --- a/docker/dev-full/prometheus/prometheus.yml +++ b/docker/dev-full/prometheus/prometheus.yml @@ -14,4 +14,4 @@ scrape_configs: - job_name: rivet-client static_configs: - targets: - - rivet-client:6090 + - rivet-client:8091 diff --git a/docker/dev-full/vector-client/vector.yaml b/docker/dev-full/vector-client/vector.yaml index 008feaa39f..945d0711c2 100644 
--- a/docker/dev-full/vector-client/vector.yaml +++ b/docker/dev-full/vector-client/vector.yaml @@ -8,7 +8,7 @@ sources: prometheus_pegboard: type: prometheus_scrape endpoints: - - http://rivet-client:6090 + - http://rivet-client:8091 scrape_interval_secs: 15 dynamic_events_http: diff --git a/docker/monolith/build-scripts/setup_s6.ts b/docker/monolith/build-scripts/setup_s6.ts index 8759b0df17..009cf4e0ca 100755 --- a/docker/monolith/build-scripts/setup_s6.ts +++ b/docker/monolith/build-scripts/setup_s6.ts @@ -103,7 +103,7 @@ const services: Service[] = [ rootUser: true, ports: { runner: 6080, - metrics: 6090, + metrics: 8091, }, }, diff --git a/docker/monolith/vector-client/vector.yaml b/docker/monolith/vector-client/vector.yaml index 1f892ffb6c..58ae722945 100644 --- a/docker/monolith/vector-client/vector.yaml +++ b/docker/monolith/vector-client/vector.yaml @@ -12,7 +12,7 @@ sources: prometheus_pegboard: type: prometheus_scrape endpoints: - - http://rivet-client:6090 + - http://rivet-client:8091 scrape_interval_secs: 15 pegboard_manager: diff --git a/packages/common/metrics/src/server.rs b/packages/common/metrics/src/server.rs index 3fa4faca8a..0ef6937dd3 100644 --- a/packages/common/metrics/src/server.rs +++ b/packages/common/metrics/src/server.rs @@ -1,3 +1,5 @@ +use std::net::SocketAddr; + use global_error::prelude::*; use hyper::{ header::CONTENT_TYPE, @@ -5,7 +7,6 @@ use hyper::{ Body, Request, Response, Server, }; use prometheus::{Encoder, TextEncoder}; -use std::net::SocketAddr; // TODO: Record extra labels diff --git a/packages/common/util/core/src/id.rs b/packages/common/util/core/src/id.rs index 9938aab4da..a0a9b5e17c 100644 --- a/packages/common/util/core/src/id.rs +++ b/packages/common/util/core/src/id.rs @@ -36,6 +36,8 @@ pub enum Id { V1([u8; 18]), } + + impl Id { /// Construct V0 from uuid. 
pub fn new_v0() -> Self { @@ -313,6 +315,12 @@ impl sqlx::postgres::PgHasArrayType for Id { } } +impl Default for Id { + fn default() -> Self { + Id::V0(Uuid::new_v4()) + } +} + /// Decode a base36 string into a fixed-size byte array. fn decode_base36<const N: usize>(s: &str) -> Result<[u8; N], IdError> { let mut data = [0u8; N]; diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/vector.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/vector.rs index edda3f641a..d3de063b3c 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/vector.rs +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/vector.rs @@ -81,7 +81,7 @@ pub fn configure(namespace: &str, config: &Config, pool_type: PoolType) -> Globa } }); - // Add pegboard manager and runner logs + // Add pegboard manager and actor logs match pool_type { PoolType::Pegboard | PoolType::PegboardIsolate => { config_json["sources"]["pegboard_manager"] = json!({ diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs index c0bce152e5..ca3c3b032d 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs @@ -180,7 +180,7 @@ pub async fn gen_initialize( "pegboard".into(), components::vector::PrometheusTarget { // Should match port from pb manager config - endpoint: "http://127.0.0.1:6090".into(), + endpoint: "http://127.0.0.1:8091".into(), scrape_interval: 15, }, ); @@ -195,7 +195,7 @@ pub async fn gen_initialize( "pegboard".into(), components::vector::PrometheusTarget { // Should match port from pb manager config - endpoint: "http://127.0.0.1:6090".into(), + endpoint: "http://127.0.0.1:8091".into(), scrape_interval: 15,
}, ); diff --git a/packages/edge/infra/client/config/Cargo.toml b/packages/edge/infra/client/config/Cargo.toml index 695c7288d4..58b6f3c684 100644 --- a/packages/edge/infra/client/config/Cargo.toml +++ b/packages/edge/infra/client/config/Cargo.toml @@ -6,10 +6,13 @@ license.workspace = true edition.workspace = true [dependencies] +anyhow = "1.0" ipnet = { version = "2.10.1", features = ["serde"] } +pegboard.workspace = true +rivet-util.workspace = true schemars = { version = "0.8.21", features = ["url", "uuid1"] } serde = { version = "1.0.195", features = ["derive"] } +serde_json = "1.0" +tokio-util = { version = "0.7", features = ["codec"] } url = "2.2.2" -uuid = { version = "1.6.1", features = ["v4"] } -pegboard.workspace = true -rivet-util.workspace = true +uuid = { version = "1.6.1", features = ["v4"] } \ No newline at end of file diff --git a/packages/edge/infra/client/config/src/manager.rs b/packages/edge/infra/client/config/src/manager.rs index 040e2c30c9..c64c7ba041 100644 --- a/packages/edge/infra/client/config/src/manager.rs +++ b/packages/edge/infra/client/config/src/manager.rs @@ -94,9 +94,6 @@ pub struct Runner { /// ```` pub use_resource_constraints: Option<bool>, - /// WebSocket Port for runners on this machine to connect to. - pub port: Option<u16>, - pub container_runner_binary_path: Option<PathBuf>, /// Custom host entries to append to /etc/hosts in actor containers.
@@ -113,10 +110,6 @@ impl Runner { self.use_resource_constraints.unwrap_or(true) } - pub fn port(&self) -> u16 { - self.port.unwrap_or(6080) - } - pub fn container_runner_binary_path(&self) -> PathBuf { self.container_runner_binary_path .clone() @@ -267,7 +260,7 @@ pub struct Metrics { impl Metrics { pub fn port(&self) -> u16 { - self.port.unwrap_or(6090) + self.port.unwrap_or(8091) } } diff --git a/packages/edge/infra/client/config/src/runner_protocol.rs b/packages/edge/infra/client/config/src/runner_protocol.rs index d11a3f3736..afb92980cc 100644 --- a/packages/edge/infra/client/config/src/runner_protocol.rs +++ b/packages/edge/infra/client/config/src/runner_protocol.rs @@ -1,23 +1,29 @@ +use std::io::{Write, Cursor}; + +use anyhow::*; use pegboard::protocol; -use serde::{Deserialize, Serialize}; +use tokio_util::codec::LengthDelimitedCodec; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "snake_case", deny_unknown_fields)] pub enum ToManager { - Init { - // See `packages/edge/infra/client/manager/src/claims.rs` - access_token: String, - }, ActorStateUpdate { actor_id: rivet_util::Id, generation: u32, state: ActorState, }, + Ping, } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "snake_case", deny_unknown_fields)] pub enum ToRunner { + Pong, + Close { + reason: Option, + }, + StartActor { actor_id: rivet_util::Id, generation: u32, @@ -38,3 +44,41 @@ pub enum ActorState { Running, Exited { exit_code: Option }, } + +pub fn codec() -> LengthDelimitedCodec { + LengthDelimitedCodec::builder() + .length_field_type::() + .length_field_length(4) + // No offset + .length_field_offset(0) + // Header length is not included in the length calculation + .length_adjustment(4) + // Skip length, but header is included in the returned bytes + .num_skip(4) + .new_codec() +} + +pub fn encode_frame(payload: &T) -> Result> { + let mut buf = Vec::with_capacity(4); + let mut cursor = 
Cursor::new(&mut buf); + + cursor.write(&[0u8; 4])?; // header (currently unused) + + serde_json::to_writer(&mut cursor, payload)?; + + cursor.flush()?; + + Ok(buf) +} + +pub fn decode_frame(frame: &[u8]) -> Result<([u8; 4], T)> { + ensure!(frame.len() >= 4, "Frame too short"); + + // Extract the header (first 4 bytes) + let header = [frame[0], frame[1], frame[2], frame[3]]; + + // Deserialize the rest of the frame (payload after the header) + let payload = serde_json::from_slice(&frame[4..])?; + + Ok((header, payload)) +} diff --git a/packages/edge/infra/client/container-runner/src/container.rs b/packages/edge/infra/client/container-runner/src/container.rs index bb03f52876..b60149bb3d 100644 --- a/packages/edge/infra/client/container-runner/src/container.rs +++ b/packages/edge/infra/client/container-runner/src/container.rs @@ -33,7 +33,7 @@ pub fn run( .context("empty `runner_path`")? .to_string_lossy() .to_string(); - let fs_path = actor_path.join("fs").join("upper"); + let fs_path = runner_path.join("fs").join("upper"); let oci_bundle_config_json = fs_path.join("config.json"); // Validate OCI bundle diff --git a/packages/edge/infra/client/container-runner/src/log_shipper.rs b/packages/edge/infra/client/container-runner/src/log_shipper.rs index d0c10100be..01e0acc9e0 100644 --- a/packages/edge/infra/client/container-runner/src/log_shipper.rs +++ b/packages/edge/infra/client/container-runner/src/log_shipper.rs @@ -93,8 +93,8 @@ impl LogShipper { println!("Log shipper connected"); while let Result::Ok(message) = self.msg_rx.recv() { - let vector_message = VectorMessage::Runners { - runner_id: self.runner_id.as_str(), + let vector_message = VectorMessage::Actors { + // runner_id: self.runner_id.as_str(), actor_id: self.actor_id.as_ref().map(|x| x.as_str()), env_id: self.env_id, stream_type: message.stream_type as u8, @@ -116,9 +116,9 @@ impl LogShipper { #[derive(Serialize)] #[serde(tag = "source")] enum VectorMessage<'a> { - #[serde(rename = "runners")] - Runners { 
- runner_id: &'a str, + #[serde(rename = "actors")] + Actors { + // runner_id: &'a str, actor_id: Option<&'a str>, env_id: Uuid, stream_type: u8, diff --git a/packages/edge/infra/client/echo/Dockerfile b/packages/edge/infra/client/echo/Dockerfile index d7e9dce650..7290142969 100644 --- a/packages/edge/infra/client/echo/Dockerfile +++ b/packages/edge/infra/client/echo/Dockerfile @@ -17,16 +17,20 @@ RUN useradd -m -d /home/nonroot -s /bin/sh nonroot RUN grep nonroot /etc/passwd > /passwd && \ grep nonroot /etc/group > /group -# Create an empty image and copy binaries into it to minimize the size of the image -FROM scratch +USER nonroot -# Copy passwd and group files -COPY --from=rust /passwd /etc/passwd -COPY --from=rust /group /etc/group +CMD ["/app/dist/pegboard-echo-server"] -COPY --from=rust /app/dist/ / +# # Create an empty image and copy binaries into it to minimize the size of the image +# FROM scratch -# Switch to the non-root user -USER nonroot +# # Copy passwd and group files +# COPY --from=rust /passwd /etc/passwd +# COPY --from=rust /group /etc/group + +# COPY --from=rust /app/dist/ / + +# # Switch to the non-root user +# USER nonroot -CMD ["/pegboard-echo-server"] +# CMD ["/pegboard-echo-server"] diff --git a/packages/edge/infra/client/echo/src/main.rs b/packages/edge/infra/client/echo/src/main.rs index e3788241d3..1a1b9af84b 100644 --- a/packages/edge/infra/client/echo/src/main.rs +++ b/packages/edge/infra/client/echo/src/main.rs @@ -1,4 +1,4 @@ -use std::{env, io::Cursor, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; +use std::{env, io::{Write, Cursor}, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use anyhow::*; use bytes::Bytes; @@ -69,8 +69,8 @@ async fn run_socket_client(socket_path: PathBuf) -> Result<()> { .length_field_offset(0) // Header length is not included in the length calculation .length_adjustment(4) - // header is included in the returned bytes - .num_skip(0) + // Skip length, but header is included in the 
returned bytes + .num_skip(4) .new_codec(); let framed = Framed::new(stream, codec); @@ -86,7 +86,7 @@ async fn run_socket_client(socket_path: PathBuf) -> Result<()> { tokio::time::sleep(PING_INTERVAL).await; let payload = json!({ - "ping": {} + "ping": null }); if write2 @@ -103,7 +103,7 @@ async fn run_socket_client(socket_path: PathBuf) -> Result<()> { // Process incoming messages while let Some(frame) = read.next().await.transpose()? { - let (_, packet) = decode_frame::(&frame.freeze())?; + let (_, packet) = decode_frame::(&frame.freeze()).context("failed to decode frame")?; println!("Received packet: {packet:?}"); if let Some(packet) = packet.get("start_actor") { @@ -117,7 +117,7 @@ async fn run_socket_client(socket_path: PathBuf) -> Result<()> { }, }); - write.lock().await.send(encode_frame(&payload)?).await?; + write.lock().await.send(encode_frame(&payload).context("failed to encode frame")?).await?; } else if let Some(packet) = packet.get("signal_actor") { let payload = json!({ "actor_state_update": { @@ -139,6 +139,19 @@ async fn run_socket_client(socket_path: PathBuf) -> Result<()> { Ok(()) } +fn encode_frame(payload: &T) -> Result { + let mut buf = Vec::with_capacity(4); + let mut cursor = Cursor::new(&mut buf); + + cursor.write(&[0u8; 4]); // header (currently unused) + + serde_json::to_writer(&mut cursor, payload)?; + + cursor.flush()?; + + Ok(buf.into()) +} + fn decode_frame(frame: &Bytes) -> Result<([u8; 4], T)> { ensure!(frame.len() >= 4, "Frame too short"); @@ -150,13 +163,3 @@ fn decode_frame(frame: &Bytes) -> Result<([u8; 4], T)> { Ok((header, payload)) } - -fn encode_frame(payload: &T) -> Result { - let mut buf = Vec::with_capacity(4); - buf.extend_from_slice(&[0u8; 4]); // header (currently unused) - - let mut cursor = Cursor::new(&mut buf); - serde_json::to_writer(&mut cursor, payload)?; - - Ok(buf.into()) -} diff --git a/packages/edge/infra/client/manager/Cargo.toml b/packages/edge/infra/client/manager/Cargo.toml index d8411b3387..240d290d97 
100644 --- a/packages/edge/infra/client/manager/Cargo.toml +++ b/packages/edge/infra/client/manager/Cargo.toml @@ -15,13 +15,11 @@ test = [] [dependencies] anyhow.workspace = true -base64 = "0.22" bytes = "1.0" futures-util = { version = "0.3" } hyper = { version = "0.14", features = ["server", "http1", "tcp", "stream"] } indoc = "2.0" json5.workspace = true -jsonwebtoken = "9.3.1" lazy_static = "1.4" nix.workspace = true notify = { version = "6.1.1", default-features = false, features = ["serde", "fsevent-sys"] } @@ -31,7 +29,6 @@ prometheus = "0.13" rand = "0.8" rand_chacha = "0.3.1" reqwest = { version = "0.12", default-features = false, features = ["stream", "rustls-tls", "json"] } -ring = "0.17" rivet-logs.workspace = true rivet-util.workspace = true serde = { version = "1.0.195", features = ["derive"] } diff --git a/packages/edge/infra/client/manager/src/actor/mod.rs b/packages/edge/infra/client/manager/src/actor/mod.rs index 2b88bd6e54..fa48d3d057 100644 --- a/packages/edge/infra/client/manager/src/actor/mod.rs +++ b/packages/edge/infra/client/manager/src/actor/mod.rs @@ -8,7 +8,6 @@ use indoc::indoc; use nix::{sys::signal::Signal, unistd::Pid}; use pegboard::protocol; use pegboard_config::runner_protocol; -use uuid::Uuid; use crate::{ctx::Ctx, runner, utils}; @@ -55,10 +54,9 @@ impl Actor { generation, runner_id, config, - start_ts, - image_id + start_ts ) - VALUES (?1, ?2, ?3, ?4, ?5, ?6) + VALUES (?1, ?2, ?3, ?4, ?5) ON CONFLICT (actor_id, generation) DO NOTHING ", )) @@ -67,7 +65,6 @@ impl Actor { .bind(runner_id) .bind(&config_json) .bind(utils::now()) - .bind(self.config.image.id) .execute(&mut *ctx.sql().await?) 
.await }) @@ -261,17 +258,14 @@ impl Actor { } // Send signal else { - self.runner.signal(signal).await?; + self.runner.signal(ctx, signal).await?; } } // Update stop_ts if matches!(signal, Signal::SIGTERM | Signal::SIGKILL) || !has_pid { let stop_ts_set = utils::sql::query(|| async { - let mut conn = ctx.sql().await?; - let mut tx = conn.begin().await?; - - let res = sqlx::query_as::<_, (bool,)>(indoc!( + sqlx::query_as::<_, (bool,)>(indoc!( " UPDATE actors SET stop_ts = ?3 @@ -285,27 +279,11 @@ impl Actor { .bind(self.actor_id) .bind(self.generation as i64) .bind(utils::now()) - .fetch_optional(&mut *tx) - .await?; - - // Update LRU cache - sqlx::query(indoc!( - " - UPDATE images_cache - SET last_used_ts = ?2 - WHERE image_id = ?1 - ", - )) - .bind(self.config.image.id) - .bind(utils::now()) - .execute(&mut *tx) - .await?; - - tx.commit().await?; - - Ok(res.is_some()) + .fetch_optional(&mut *ctx.sql().await?) + .await }) - .await?; + .await? + .is_some(); // Emit event if not stopped before if stop_ts_set { diff --git a/packages/edge/infra/client/manager/src/claims.rs b/packages/edge/infra/client/manager/src/claims.rs deleted file mode 100644 index ee431f983d..0000000000 --- a/packages/edge/infra/client/manager/src/claims.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::time::Duration; - -use anyhow::*; -use jsonwebtoken as jwt; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -const CURRENT_KID: &str = "v1"; - -#[derive(Debug, Serialize, Deserialize)] -pub struct Claims { - iat: i64, - exp: i64, - jti: Uuid, - entitlements: Vec, -} - -impl Claims { - pub fn new(entitlement: Entitlement, ttl: Duration) -> Self { - let iat = crate::utils::now(); - let exp = iat + ttl.as_millis() as i64; - - Claims { - iat, - exp, - jti: Uuid::new_v4(), - entitlements: vec![entitlement], - } - } - - pub fn encode(&self, secret: &[u8]) -> Result { - let mut header = jwt::Header::new(jwt::Algorithm::HS256); - header.kid = Some(CURRENT_KID.to_string()); - - let token = 
jwt::encode(&header, &self, &jwt::EncodingKey::from_secret(secret))?; - - Ok(token) - } - - pub fn decode(token: &str, secret: &[u8]) -> Result { - let header = jwt::decode_header(token)?; - let kid = header.kid.context("token missing kid")?; - - ensure!(kid == CURRENT_KID, "invalid kid"); - - let token_data = jwt::decode::( - token, - &jwt::DecodingKey::from_secret(secret), - &jwt::Validation::default(), - )?; - - Ok(token_data.claims) - } - - pub fn ent(&self) -> &[Entitlement] { - &self.entitlements - } -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum Entitlement { - Runner { runner_id: Uuid }, -} diff --git a/packages/edge/infra/client/manager/src/ctx.rs b/packages/edge/infra/client/manager/src/ctx.rs index 43ff3d7b86..d39423f737 100644 --- a/packages/edge/infra/client/manager/src/ctx.rs +++ b/packages/edge/infra/client/manager/src/ctx.rs @@ -18,18 +18,15 @@ use nix::{ unistd::Pid, }; use pegboard::{protocol, system_info::SystemInfo}; -use pegboard_config::{runner_protocol, Client, Config}; +use pegboard_config::{Client, Config}; use sqlx::{pool::PoolConnection, Acquire, Sqlite, SqlitePool}; use tokio::{ fs, - net::{TcpListener, TcpStream}, + net::TcpStream, sync::{Mutex, RwLock}, }; use tokio_tungstenite::{ - tungstenite::protocol::{ - frame::{coding::CloseCode, CloseFrame}, - Message, - }, + tungstenite::protocol::{frame::coding::CloseCode, Message}, MaybeTlsStream, WebSocketStream, }; use url::Url; @@ -37,7 +34,6 @@ use uuid::Uuid; use crate::{ actor::Actor, - claims::{Claims, Entitlement}, event_sender::EventSender, image_download_handler::ImageDownloadHandler, metrics, @@ -47,8 +43,6 @@ use crate::{ const PING_INTERVAL: Duration = Duration::from_secs(1); const ACK_INTERVAL: Duration = Duration::from_secs(60 * 5); -/// How long before killing a runner that hasn't sent an init packet. -const RUNNER_INIT_TIMEOUT: Duration = Duration::from_secs(5); /// How often to check for the actor's runner to start. 
const GET_RUNNER_INTERVAL: Duration = std::time::Duration::from_millis(250); /// How many times to check for the actor's runner to start. @@ -63,8 +57,6 @@ pub enum RuntimeError { }, #[error("ws failed: {0}")] SocketFailed(tokio_tungstenite::tungstenite::Error), - #[error("runner socket failed: {0}")] - RunnerSocketListenFailed(std::io::Error), #[error("socket closed: {0}, {1}")] SocketClosed(CloseCode, String), #[error("stream closed")] @@ -89,7 +81,6 @@ struct RunnerRow { pub struct Ctx { config: Config, system: SystemInfo, - secret: Vec, // This requires a RwLock because of the reset functionality which reinitialized the entire database. It // should never be written to besides that. @@ -106,14 +97,12 @@ impl Ctx { pub fn new( config: Config, system: SystemInfo, - secret: Vec, pool: SqlitePool, tx: SplitSink>, Message>, ) -> Arc { Arc::new(Ctx { config, system, - secret, pool: RwLock::new(pool), tx: Mutex::new(tx), @@ -220,44 +209,6 @@ impl Ctx { self.receive_init(&mut rx).await?; - // Start runner socket and attaches incoming connections to their corresponding runner - let self2 = self.clone(); - let runner_socket: tokio::task::JoinHandle> = tokio::spawn(async move { - tracing::info!(port=%self2.config().runner.port(), "listening for runner sockets"); - - let listener = - TcpListener::bind((self2.config().runner.ip(), self2.config().runner.port())) - .await - .map_err(RuntimeError::RunnerSocketListenFailed)?; - - loop { - match listener.accept().await { - Ok((stream, _)) => { - let mut ws_stream = Some(tokio_tungstenite::accept_async(stream).await?); - - tracing::info!("received new socket"); - - if let Err(err) = self2.receive_runner_init_message(&mut ws_stream).await { - tracing::error!( - ?err, - "failed to receive init message from runner socket" - ); - } - - // Close stream - if let Some(mut ws_stream) = ws_stream { - let close_frame = CloseFrame { - code: CloseCode::Error, - reason: "init failed".into(), - }; - 
ws_stream.send(Message::Close(Some(close_frame))).await?; - } - } - Err(err) => tracing::error!(?err, "failed to connect websocket"), - } - } - }); - // Start ping thread after init packet is received because ping denotes this client as "ready" let self2 = self.clone(); let ping_thread: tokio::task::JoinHandle> = tokio::spawn(async move { @@ -298,7 +249,6 @@ impl Ctx { }); tokio::try_join!( - async { runner_socket.await? }, async { ping_thread.await? }, async { ack_thread.await? }, self.receive_messages(rx), @@ -496,7 +446,7 @@ impl Ctx { } protocol::Command::SignalRunner { runner_id, signal } => { if let Some(runner) = self.runners.read().await.get(&runner_id) { - runner.signal(signal.try_into()?).await?; + runner.signal(self, signal.try_into()?).await?; } else { tracing::warn!( ?runner_id, @@ -854,8 +804,14 @@ impl Ctx { let runner = runner.clone(); let self2 = self.clone(); tokio::spawn(async move { - if let Err(err) = runner.observe(&self2, false).await { - tracing::error!(?runner_id, ?err, "observe failed"); + // The socket file already exists, this will reconnect and spawn the appropriate task to + // handle the connection + if let Err(err) = runner.start_socket(&self2).await { + tracing::error!(?runner_id, ?err, "start socket failed"); + + if let Err(err) = runner.observe(&self2, false).await { + tracing::error!(?runner_id, ?err, "observe failed"); + } } if let Err(err) = runner.cleanup(&self2).await { @@ -993,10 +949,6 @@ impl Ctx { &self.config.client } - pub fn secret(&self) -> &[u8] { - &self.secret - } - pub fn runners_path(&self) -> PathBuf { self.config().data_dir().join("runners") } @@ -1012,10 +964,6 @@ impl Ctx { pub fn image_path(&self, image_id: Uuid) -> PathBuf { self.images_path().join(image_id.to_string()) } - - pub fn isolate_runner_path(&self) -> PathBuf { - self.config().data_dir().join("runner") - } } // Test bindings diff --git a/packages/edge/infra/client/manager/src/image_download_handler.rs 
b/packages/edge/infra/client/manager/src/image_download_handler.rs index 8bfcffc57e..d972b1457f 100644 --- a/packages/edge/infra/client/manager/src/image_download_handler.rs +++ b/packages/edge/infra/client/manager/src/image_download_handler.rs @@ -116,60 +116,28 @@ impl ImageDownloadHandler { let mut conn = ctx.sql().await?; let mut tx = conn.begin().await?; - let ((cache_count, images_dir_size), image_download_size) = tokio::try_join!( - async { - // Get total size of images directory. Note that it doesn't matter if this doesn't - // match the actual fs size because it should either be exactly at or below actual fs - // size. Also calculating fs size manually is expensive. + // Get total size of images directory. Note that it doesn't matter if this doesn't + // match the actual fs size because it should either be exactly at or below actual fs + // size. Also calculating fs size manually is expensive. + let (cache_count, images_dir_size) = sqlx::query_as::<_, (i64, i64)>(indoc!( " SELECT COUNT(size), COALESCE(SUM(size), 0) FROM images_cache ", )) .fetch_one(&mut *tx) - .await - .map_err(Into::::into) - }, - // NOTE: The image size here is somewhat misleading because its only the size of the - // downloaded archive and not the total disk usage after it is unpacked. 
However, this is - // good enough - self.fetch_image_download_size(ctx, image_config), - )?; - - let rows = sqlx::query_as::<_, (Uuid, i64, Option, i64)>(indoc!( - " - SELECT - ic.image_id, - ic.size, - ic.last_used_ts, - SUM(ic.size) - OVER (ORDER BY ic.last_used_ts ROWS UNBOUNDED PRECEDING) - AS running_total - FROM images_cache AS ic - LEFT JOIN actors AS a - -- Filter out images that are currently in use by actors - ON - ic.image_id = a.image_id AND - a.stop_ts IS NULL AND - a.exit_ts IS NULL - WHERE - -- Filter out current image, will be upserted - ic.image_id != ?1 AND - a.image_id IS NULL - ORDER BY ic.last_used_ts - ", - )) - .bind(image_config.id) - .bind( - (images_dir_size as u64) - .saturating_add(image_download_size) - .saturating_sub(ctx.config().images.max_cache_size()) as i64, - ) - .fetch_all(&mut *tx) - .await?; + .await?; // Prune images - let (removed_count, removed_bytes) = if images_dir_size as u64 + image_download_size + // + // HACK: The artifact_size here is somewhat misleading because its only the size of the + // downloaded archive and not the total disk usage after it is unpacked. However, this is size + // is recalculated later once decompressed, so this will only ever exceed the cache + // size limit in edge cases by `actual size - compressed size`. In this situation, + // that extra difference is already reserved on the file system by the runner + // itself. + let (removed_count, removed_bytes) = if images_dir_size as u64 + + image_config.artifact_size > ctx.config().images.max_cache_size() { // Fetch as many images as it takes to clear up enough space for this new image. 
@@ -253,7 +221,7 @@ impl ImageDownloadHandler { }; metrics::IMAGE_CACHE_COUNT.set(cache_count + 1 - removed_count); - metrics::IMAGE_CACHE_SIZE.set(images_dir_size + image_download_size as i64 - removed_bytes); + metrics::IMAGE_CACHE_SIZE.set(images_dir_size + image_config.artifact_size as i64 - removed_bytes); sqlx::query(indoc!( " diff --git a/packages/edge/infra/client/manager/src/lib.rs b/packages/edge/infra/client/manager/src/lib.rs index 8d78e859e7..5239d4c7ad 100644 --- a/packages/edge/infra/client/manager/src/lib.rs +++ b/packages/edge/infra/client/manager/src/lib.rs @@ -3,8 +3,6 @@ #[cfg(feature = "test")] mod actor; #[cfg(feature = "test")] -mod claims; -#[cfg(feature = "test")] mod ctx; #[cfg(feature = "test")] pub mod event_sender; diff --git a/packages/edge/infra/client/manager/src/main.rs b/packages/edge/infra/client/manager/src/main.rs index 2ced8a8e78..38dcf6277a 100644 --- a/packages/edge/infra/client/manager/src/main.rs +++ b/packages/edge/infra/client/manager/src/main.rs @@ -20,7 +20,6 @@ use tracing_subscriber::{prelude::*, EnvFilter}; use url::Url; mod actor; -mod claims; mod ctx; mod event_sender; mod image_download_handler; @@ -36,7 +35,6 @@ const PROTOCOL_VERSION: u16 = 2; struct Init { config: Config, system: SystemInfo, - secret: Vec, pool: SqlitePool, } @@ -150,15 +148,12 @@ async fn init() -> Result { // Init project directories utils::init_dir(&config).await?; - let secret = utils::load_secret(&config).await?; - // Init sqlite db let pool = utils::init_sqlite_db(&config).await?; Ok(Init { config, system, - secret, pool, }) } @@ -176,7 +171,7 @@ async fn run(init: Init, first: bool) -> Result<()> { } // Start metrics server - let metrics_thread = tokio::spawn(metrics::run_standalone(init.config.client.metrics.port())); + let metrics_task = tokio::spawn(metrics::run_standalone(init.config.client.metrics.port())); let url = build_ws_url(&init.config).await?; tracing::info!("connecting to pegboard ws: {}", &url); @@ -192,10 +187,10 @@ async 
fn run(init: Init, first: bool) -> Result<()> { tracing::info!("connected to pegboard ws"); - let ctx = Ctx::new(init.config, init.system, init.secret, init.pool, tx); + let ctx = Ctx::new(init.config, init.system, init.pool, tx); tokio::try_join!( - async { metrics_thread.await?.map_err(Into::into) }, + async { metrics_task.await.map_err(Into::::into) }, ctx.run(rx), )?; diff --git a/packages/edge/infra/client/manager/src/metrics/mod.rs b/packages/edge/infra/client/manager/src/metrics/mod.rs index a47cdf35c9..4b9751b0d1 100644 --- a/packages/edge/infra/client/manager/src/metrics/mod.rs +++ b/packages/edge/infra/client/manager/src/metrics/mod.rs @@ -87,6 +87,13 @@ lazy_static::lazy_static! { *REGISTRY, ).unwrap(); + pub static ref DOWNLOAD_IMAGE_DURATION: Histogram = register_histogram_with_registry!( + "download_image_duration", + "Duration of image download", + BUCKETS.to_vec(), + *REGISTRY, + ).unwrap(); + pub static ref IMAGE_DOWNLOAD_REQUEST_TOTAL: IntCounter = register_int_counter_with_registry!( "image_download_request_total", "Total number of download requests.", diff --git a/packages/edge/infra/client/manager/src/runner/mod.rs b/packages/edge/infra/client/manager/src/runner/mod.rs index 074077662a..827f05f950 100644 --- a/packages/edge/infra/client/manager/src/runner/mod.rs +++ b/packages/edge/infra/client/manager/src/runner/mod.rs @@ -6,6 +6,7 @@ use std::{ }; use anyhow::*; +use bytes::Bytes; use futures_util::{ stream::{FuturesUnordered, SplitSink, SplitStream}, FutureExt, SinkExt, StreamExt, @@ -18,18 +19,13 @@ use nix::{ }; use pegboard::protocol; use pegboard_config::runner_protocol; +use sqlx::Acquire; use tokio::{ fs, - net::TcpStream, + net::UnixStream, sync::{broadcast, Mutex, RwLock}, }; -use tokio_tungstenite::{ - tungstenite::protocol::{ - frame::{coding::CloseCode, CloseFrame}, - Message, - }, - WebSocketStream, -}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; use uuid::Uuid; use crate::{ctx::Ctx, utils}; @@ -115,11 +111,7 @@ impl 
Runner { let _ = self.bump_channel.send(()); } - pub async fn attach_socket( - self: &Arc, - ctx: &Arc, - ws_stream: WebSocketStream, - ) -> Result<()> { + pub async fn attach_socket(self: &Arc, ctx: &Arc, stream: UnixStream) -> Result<()> { match &self.comms { Comms::Basic => bail!("attempt to attach socket to basic runner"), Comms::Socket(tx) => { @@ -127,36 +119,39 @@ impl Runner { let mut guard = tx.lock().await; - if let Some(existing_ws_tx) = &mut *guard { + if let Some(existing_tx) = &mut *guard { tracing::info!(runner_id=?self.runner_id, "runner received another socket, closing old one"); // Close the old socket - let close_frame = CloseFrame { - code: CloseCode::Error, - reason: "replacing with new socket".into(), + let buf = runner_protocol::encode_frame(&runner_protocol::ToRunner::Close { + reason: Some("replacing with new socket".into()), + })?; + + if let Err(err) = existing_tx.send(buf.into()).await { + tracing::error!(runner_id=?self.runner_id, ?err, "failed to close old socket"); }; - existing_ws_tx - .send(Message::Close(Some(close_frame))) - .await?; tracing::info!(runner_id=?self.runner_id, "socket replaced"); } else { tracing::info!(runner_id=?self.runner_id, "socket attached"); } - let (ws_tx, ws_rx) = ws_stream.split(); + // Wrap the stream in a framed transport + let framed = Framed::new(stream, runner_protocol::codec()); + let (tx, rx) = framed.split(); - *guard = Some(ws_tx); + *guard = Some(tx); self.bump(); + // TODO: need to kill old handler thread // Spawn a new thread to handle incoming messages let self2 = self.clone(); let ctx2 = ctx.clone(); tokio::task::spawn(async move { - if let Err(err) = self2.receive_messages(&ctx2, ws_rx).await { + if let Err(err) = self2.receive_frames(&ctx2, rx).await { tracing::error!(runner_id=?self2.runner_id, ?err, "socket error, killing runner"); - if let Err(err) = self2.signal(Signal::SIGKILL).await { + if let Err(err) = self2.signal(&ctx2, Signal::SIGKILL).await { 
tracing::error!(runner_id=?self2.runner_id, %err, "failed to kill runner"); } } @@ -167,52 +162,49 @@ impl Runner { Ok(()) } - async fn receive_messages( + async fn receive_frames( &self, - ctx: &Ctx, - mut ws_rx: SplitStream>, + _ctx: &Ctx, + mut ws_rx: SplitStream>, ) -> Result<()> { loop { - match tokio::time::timeout(PING_TIMEOUT, ws_rx.next()).await { - Ok(msg) => match msg { - Some(Ok(Message::Ping(_))) => { - // Pongs are sent automatically - } - Some(Ok(Message::Close(_))) | None => { - tracing::debug!(runner_id=?self.runner_id, "runner socket closed"); - break Ok(()); - } - Some(Ok(Message::Binary(buf))) => { - let packet = serde_json::from_slice::(&buf)?; + let Ok(frame) = tokio::time::timeout(PING_TIMEOUT, ws_rx.next()).await else { + bail!("runner socket ping timed out"); + }; - self.process_packet(ctx, packet).await?; - } - Some(Ok(packet)) => bail!("runner socket unexpected packet: {packet:?}"), - Some(Err(err)) => break Err(err).context("runner socket error"), - }, - Err(_) => bail!("socket ping timed out"), - } - } - } + let Some(buf) = frame else { + tracing::debug!(runner_id=?self.runner_id, "runner socket closed"); + break Ok(()); + }; + + let (_, packet) = runner_protocol::decode_frame::(&buf?).context("failed to decode frame")?; - async fn process_packet(&self, ctx: &Ctx, packet: runner_protocol::ToManager) -> Result<()> { - tracing::debug!(?packet, "runner received packet"); - - match packet { - runner_protocol::ToManager::Init { .. } => bail!("unexpected second init packet"), - runner_protocol::ToManager::ActorStateUpdate { - actor_id, - generation, - state, - } => { - // NOTE: We don't have to verify if the actor id given here is valid because only valid actors - // are listening to this runner's `actor_observer_tx`. This means invalid messages are ignored. 
- // NOTE: No receivers is not an error - let _ = self.actor_observer_tx.send((actor_id, generation, state)); + tracing::debug!(?packet, "runner received packet"); + + match packet { + runner_protocol::ToManager::Ping { .. } => { + // TODO: Rate limit? + self.send(&runner_protocol::ToRunner::Pong).await?; + } + runner_protocol::ToManager::ActorStateUpdate { + actor_id, + generation, + state, + } => { + match self.config.image.allocation_type { + protocol::ImageAllocationType::Single => { + tracing::debug!("unexpected state update from non-multi runner"); + } + protocol::ImageAllocationType::Multi => { + // NOTE: We don't have to verify if the actor id given here is valid because only valid actors + // are listening to this runner's `actor_observer_tx`. This means invalid messages are ignored. + // NOTE: No receivers is not an error + let _ = self.actor_observer_tx.send((actor_id, generation, state)); + } + } + } } } - - Ok(()) } pub async fn send(&self, packet: &runner_protocol::ToRunner) -> Result<()> { @@ -248,9 +240,9 @@ impl Runner { })??; let socket = guard.as_mut().expect("should exist"); - let buf = serde_json::to_vec(packet)?; + let buf = runner_protocol::encode_frame(packet).context("failed to encode frame")?; socket - .send(Message::Binary(buf)) + .send(buf.into()) .await .context("failed to send packet to socket")?; } @@ -279,9 +271,10 @@ impl Runner { runner_id, comms, config, + image_id, start_ts ) - VALUES (?1, ?2, ?3, ?4) + VALUES (?1, ?2, ?3, ?4, ?5) ON CONFLICT (runner_id) DO NOTHING ", )) @@ -292,6 +285,7 @@ impl Runner { setup::Comms::Basic } as i32) .bind(&config_json) + .bind(self.config.image.id) .bind(utils::now()) .execute(&mut *ctx.sql().await?) .await @@ -337,7 +331,7 @@ impl Runner { Ok(proxied_ports) } - async fn run(&self, ctx: &Ctx) -> Result<()> { + async fn run(&self, ctx: &Ctx, actor_id: Option) -> Result<()> { // NOTE: This is the env that goes to the container-runner process, NOT the env that is inserted into // the container. 
let mut runner_env = vec![ @@ -357,6 +351,10 @@ impl Runner { ), ]; + if let Some(actor_id) = actor_id { + runner_env.push(("ACTOR_ID", actor_id.to_string())); + } + if let Some(vector) = &ctx.config().vector { runner_env.push(("VECTOR_SOCKET_ADDR", vector.address.to_string())); } @@ -467,7 +465,7 @@ impl Runner { ActorObserver::new(actor_id, generation, self.actor_observer_tx.subscribe()) } - pub async fn signal(&self, signal: Signal) -> Result<()> { + pub async fn signal(&self, ctx: &Ctx, signal: Signal) -> Result<()> { // https://pubs.opengroup.org/onlinepubs/9699919799/functions/kill.html if (signal as i32) < 1 { bail!("signals < 1 not allowed"); @@ -483,6 +481,45 @@ impl Runner { Err(err) => return Err(err.into()), } + // Update DB + utils::sql::query(|| async { + let mut conn = ctx.sql().await?; + let mut tx = conn.begin().await?; + + sqlx::query(indoc!( + " + UPDATE runners + SET + stop_ts = ?2 + WHERE + runner_id = ?1 AND + stop_ts IS NULL + ", + )) + .bind(self.runner_id) + .bind(utils::now()) + .execute(&mut *tx) + .await?; + + // Update LRU cache + sqlx::query(indoc!( + " + UPDATE images_cache + SET last_used_ts = ?2 + WHERE image_id = ?1 + ", + )) + .bind(self.config.image.id) + .bind(utils::now()) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + Ok(()) + }) + .await?; + Ok(()) } @@ -640,7 +677,7 @@ impl Runner { pub enum Comms { Basic, - Socket(Mutex, Message>>>), + Socket(Mutex, Bytes>>>), } impl Comms { diff --git a/packages/edge/infra/client/manager/src/runner/oci_config.rs b/packages/edge/infra/client/manager/src/runner/oci_config.rs index 3ab8196ed9..698e349362 100644 --- a/packages/edge/infra/client/manager/src/runner/oci_config.rs +++ b/packages/edge/infra/client/manager/src/runner/oci_config.rs @@ -7,6 +7,7 @@ use super::{partial_oci_config::PartialOciConfigUser, seccomp}; pub struct ConfigOpts<'a> { pub runner_path: &'a Path, pub netns_path: &'a Path, + pub socket_path: &'a Path, pub args: Vec, pub env: Vec, pub user: 
PartialOciConfigUser, @@ -167,7 +168,7 @@ fn capabilities() -> Vec<&'static str> { } fn mounts(opts: &ConfigOpts) -> Result { - Ok(json!([ + let mounts = json!([ { "destination": "/proc", "type": "proc", @@ -258,7 +259,17 @@ fn mounts(opts: &ConfigOpts) -> Result { "source": opts.actor_path.join("hosts").to_str().context("hosts path")?, "options": ["rbind", "rprivate"] }, - ])) + + // Manager socket + { + "destination": socket_mount_dest_path().to_str().context("manager.sock dest path")?, + "type": "bind", + "source": opts.socket_path.to_str().context("manager.sock source path")?, + "options": ["bind", "rw"] + }, + ]); + + Ok(mounts) } fn linux_resources_devices() -> serde_json::Value { @@ -335,3 +346,8 @@ fn linux_resources_devices() -> serde_json::Value { } ]) } + +/// Mounting path of the manager socket inside of the container. +pub fn socket_mount_dest_path() -> &'static Path { + Path::new("/srv/sockets/manager.sock") +} diff --git a/packages/edge/infra/client/manager/src/runner/setup.rs b/packages/edge/infra/client/manager/src/runner/setup.rs index 1213dca07d..91d9977e4f 100644 --- a/packages/edge/infra/client/manager/src/runner/setup.rs +++ b/packages/edge/infra/client/manager/src/runner/setup.rs @@ -1,11 +1,11 @@ use std::{ collections::HashMap, - os::unix::process::CommandExt, + os::unix::{fs::PermissionsExt, process::CommandExt}, path::{Path, PathBuf}, process::Stdio, result::Result::{Err, Ok}, sync::Arc, - time::{Duration, Instant}, + time::Instant, }; use anyhow::*; @@ -21,19 +21,13 @@ use sqlx::Acquire; use strum::FromRepr; use tokio::{ fs::{self, File}, + net::UnixListener, process::Command, }; use uuid::Uuid; use super::{oci_config, Runner}; -use crate::{ - claims::{Claims, Entitlement}, - ctx::Ctx, - utils, -}; - -/// How long an access token given to a runner lasts. 1 Year. 
-const ACCESS_TOKEN_TTL: Duration = Duration::from_secs(60 * 60 * 24 * 365); +use crate::{ctx::Ctx, utils}; #[derive(Hash, Debug, Clone, Copy, PartialEq, Eq, FromRepr)] pub enum Comms { @@ -61,8 +55,8 @@ impl Runner { let (_, ports) = tokio::try_join!( async { - self.make_fs(&ctx).await?; self.download_image(&ctx).await?; + self.make_fs(&ctx).await?; Result::<(), anyhow::Error>::Ok(()) }, @@ -84,6 +78,8 @@ impl Runner { "setup tasks completed" ); + self.start_socket(ctx).await?; + tracing::info!(runner_id=?self.runner_id, "setting up runtime environment"); self.setup_oci_bundle(&ctx, &ports).await?; @@ -232,6 +228,49 @@ impl Runner { Ok(()) } + pub(crate) async fn start_socket(self: &Arc, ctx: &Arc) -> Result<()> { + tracing::info!(runner_id=?self.runner_id, "starting socket"); + + let runner_path = ctx.runner_path(self.runner_id); + let socket_path = runner_path.join("manager.sock"); + + ensure!( + socket_path.as_os_str().len() <= 104, + "socket path ({}) length is too long (> 104 bytes aka SUN_LEN)", + socket_path.display() + ); + + // Bind the listener (creates the socket file or reconnects if it already exists) + let listener = UnixListener::bind(&socket_path).context("failed to bind unix listener")?; + + // Allow the container process to listen to the socket file + fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(0o666)) + .await + .context("failed to set permissions for socket file")?; + + // NOTE: This task and listener do not have to be cleaned up manually because they will stop when the + // socket file is deleted (upon `cleanup_setup` call). 
+ let self2 = self.clone(); + let ctx2 = ctx.clone(); + tokio::task::spawn(async move { + if let Err(err) = self2.handle_socket(&ctx2, listener).await { + tracing::error!(runner_id=?self2.runner_id, ?err, "socket listener failed"); + + if let Err(err) = self2.cleanup(&ctx2).await { + tracing::error!(runner_id=?self2.runner_id, ?err, "cleanup failed"); + } + } + }); + + Ok(()) + } + + async fn handle_socket(self: &Arc, ctx: &Arc, listener: UnixListener) -> Result<()> { + let (stream, _) = listener.accept().await?; + + self.attach_socket(ctx, stream).await + } + pub async fn setup_oci_bundle( &self, ctx: &Ctx, @@ -243,6 +282,7 @@ impl Runner { let runner_path = ctx.runner_path(self.runner_id); let fs_path = runner_path.join("fs").join("upper"); let netns_path = self.netns_path(); + let socket_path = runner_path.join("manager.sock"); // Read the config.json from the user-provided OCI bundle tracing::info!( @@ -261,19 +301,12 @@ impl Runner { runner_id=?self.runner_id, "building environment variables", ); - let access_token = Claims::new( - Entitlement::Runner { - runner_id: self.runner_id, - }, - ACCESS_TOKEN_TTL, - ) - .encode(ctx.secret())?; let env = user_config .process .env .into_iter() .chain( - self.build_default_env(ctx, &ports, &access_token) + self.build_default_env(ctx, &ports) .into_iter() .map(|(k, v)| format!("{k}={v}")), ) @@ -289,6 +322,7 @@ impl Runner { let config = oci_config::config(oci_config::ConfigOpts { runner_path: &runner_path, netns_path: &netns_path, + socket_path: &socket_path, args: user_config.process.args, env, user: user_config.process.user, @@ -577,6 +611,19 @@ impl Runner { // Disassociate from the parent by creating a new session setsid().context("setsid failed")?; + // Adjust nice, cpu priority, and OOM score + let pid = std::process::id() as i32; + utils::libc::set_nice_level(pid, 0).context("failed to set nice level")?; + utils::libc::set_oom_score_adj(pid, 0) + .context("failed to set oom score adjustment")?; + 
utils::libc::set_scheduling_policy( + pid, + utils::libc::SchedPolicy::Other, + // Must be 0 with SCHED_OTHER + 0, + ) + .context("failed to set scheduling policy")?; + // Exit immediately on fail in order to not leak process let err = std::process::Command::new("sh") .args(&runner_args) @@ -609,15 +656,14 @@ impl Runner { if ctx.config().runner.use_mounts() { match Command::new("umount") .arg("-dl") - .arg(actor_path.join("fs").join("upper")) + .arg(runner_path.join("fs").join("upper")) .output() .await { Result::Ok(cmd_out) => { if !cmd_out.status.success() { tracing::error!( - actor_id=?self.actor_id, - generation=?self.generation, + runner_id=?self.runner_id, stdout=%std::str::from_utf8(&cmd_out.stdout).unwrap_or(""), stderr=%std::str::from_utf8(&cmd_out.stderr).unwrap_or(""), "failed overlay `umount` command", @@ -626,8 +672,7 @@ impl Runner { } Err(err) => { tracing::error!( - actor_id=?self.actor_id, - generation=?self.generation, + runner_id=?self.runner_id, ?err, "failed to run overlay `umount` command", ); @@ -784,22 +829,10 @@ impl Runner { } } - // Path to the created namespace - fn netns_path(&self) -> PathBuf { - if let protocol::NetworkMode::Host = self.config.network_mode { - // Host network - Path::new("/proc/1/ns/net").to_path_buf() - } else { - // CNI network that will be created - Path::new("/var/run/netns").join(self.runner_id.to_string()) - } - } - fn build_default_env( &self, ctx: &Ctx, ports: &protocol::HashableMap, - access_token: &str, ) -> HashMap { self.config .env @@ -813,19 +846,17 @@ impl Runner { ) })) .chain([ - ( - "RIVET_MANAGER_IP".to_string(), - ctx.config().runner.ip().to_string(), - ), - ( - "RIVET_MANAGER_PORT".to_string(), - ctx.config().runner.port().to_string(), - ), ( "RIVET_API_ENDPOINT".to_string(), ctx.config().cluster.api_endpoint.to_string(), ), - ("RIVET_ACCESS_TOKEN".to_string(), access_token.to_string()), + ( + "RIVET_MANAGER_SOCKET_PATH".to_string(), + oci_config::socket_mount_dest_path() + .to_str() + 
.expect("invalid `socket_mount_dest_path`") + .to_string(), + ), ]) .collect() } @@ -847,6 +878,19 @@ fn build_hosts_content(ctx: &Ctx) -> String { content } +impl Runner { + // Path to the created namespace + fn netns_path(&self) -> PathBuf { + if let protocol::NetworkMode::Host = self.config.network_mode { + // Host network + Path::new("/proc/1/ns/net").to_path_buf() + } else { + // CNI network that will be created + Path::new("/var/run/netns").join(self.runner_id.to_string()) + } + } +} + async fn bind_ports_inner( ctx: &Ctx, runner_id: Uuid, diff --git a/packages/edge/infra/client/manager/src/utils/mod.rs b/packages/edge/infra/client/manager/src/utils/mod.rs index d7cf48ba81..46d53231d1 100644 --- a/packages/edge/infra/client/manager/src/utils/mod.rs +++ b/packages/edge/infra/client/manager/src/utils/mod.rs @@ -5,14 +5,12 @@ use std::{ }; use anyhow::*; -use base64::{engine::general_purpose, Engine}; use indoc::indoc; use notify::{ event::{AccessKind, AccessMode}, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, }; use pegboard_config::Config; -use ring::rand::{SecureRandom, SystemRandom}; use sql::SqlitePoolExt; use sqlx::{ migrate::MigrateDatabase, @@ -24,7 +22,6 @@ use sqlx::{ }; use tokio::{ fs, - io::AsyncWriteExt, sync::mpsc::{channel, Receiver}, }; @@ -95,32 +92,6 @@ pub async fn init_sqlite_db(config: &Config) -> Result { Ok(pool) } -pub async fn load_secret(config: &Config) -> Result> { - let secret_path = config.client.data_dir().join("secret.key"); - - // If the file doesn't exist, generate and persist it - if fs::metadata(&secret_path).await.is_err() { - // Generate new key - let rng = SystemRandom::new(); - let mut key = [0u8; 32]; - rng.fill(&mut key)?; - let b64 = general_purpose::STANDARD.encode(&key); - - let mut file = fs::File::create(&secret_path).await?; - file.write_all(b64.as_bytes()).await?; - file.flush().await?; - - Ok(key.into()) - } else { - let b64 = fs::read_to_string(&secret_path).await?; - let key = 
general_purpose::STANDARD.decode(b64.trim())?; - - ensure!(key.len() == 32, "Invalid key length"); - - Ok(key) - } -} - async fn build_sqlite_pool(db_url: &str) -> Result { let opts = db_url .parse::()? @@ -205,9 +176,11 @@ async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> { runner_id BLOB NOT NULL, -- UUID comms INTEGER NOT NULL, -- runner::setup::Comms config BLOB NOT NULL, + image_id BLOB NOT NULL, -- UUID start_ts INTEGER NOT NULL, running_ts INTEGER, + stop_ts INTEGER, exit_ts INTEGER, pid INTEGER, @@ -222,15 +195,44 @@ async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> { sqlx::query(indoc!( " - CREATE TABLE IF NOT EXISTS images_cache ( - image_id BLOB NOT NULL, -- UUID - size INTEGER NOT NULL, + CREATE INDEX IF NOT EXISTS runners_image_id_idx + ON runners(image_id) + WHERE stop_ts IS NULL + ", + )) + .execute(&mut *conn) + .await?; - last_used_ts INTEGER NOT NULL, - download_complete_ts INTEGER, + sqlx::query(indoc!( + " + CREATE TABLE IF NOT EXISTS runner_ports ( + runner_id BLOB NOT NULL, -- UUID + label TEXT NOT NULL, + source INT NOT NULL, + target INT, + protocol INT NOT NULL, -- protocol::TransportProtocol + + delete_ts INT + ) STRICT + ", + )) + .execute(&mut *conn) + .await?; - PRIMARY KEY (image_id) - ) STRICT + sqlx::query(indoc!( + " + CREATE INDEX IF NOT EXISTS runner_ports_runner_id_idx + ON runner_ports(runner_id) + ", + )) + .execute(&mut *conn) + .await?; + + sqlx::query(indoc!( + " + CREATE UNIQUE INDEX IF NOT EXISTS runner_ports_source_unique_idx + ON runner_ports(source, protocol) + WHERE delete_ts IS NULL ", )) .execute(&mut *conn) @@ -253,9 +255,6 @@ async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> { exit_code INTEGER, - -- Also exists in the config column but this is for indexing - image_id BLOB NOT NULL, -- UUID - PRIMARY KEY (actor_id, generation) ) STRICT ", @@ -265,49 +264,20 @@ async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> { sqlx::query(indoc!( " - CREATE INDEX IF NOT EXISTS 
actors_image_id_idx - ON actors(image_id) - WHERE stop_ts IS NULL - ", - )) - .execute(&mut *conn) - .await?; + CREATE TABLE IF NOT EXISTS images_cache ( + image_id BLOB NOT NULL, -- UUID + size INTEGER NOT NULL, - sqlx::query(indoc!( - " - CREATE TABLE IF NOT EXISTS runner_ports ( - runner_id BLOB NOT NULL, -- UUID - label TEXT NOT NULL, - source INT NOT NULL, - target INT, - protocol INT NOT NULL, -- protocol::TransportProtocol + last_used_ts INTEGER NOT NULL, + download_complete_ts INTEGER, - delete_ts INT + PRIMARY KEY (image_id) ) STRICT ", )) .execute(&mut *conn) .await?; - sqlx::query(indoc!( - " - CREATE INDEX IF NOT EXISTS runner_ports_runner_id_idx - ON runner_ports(runner_id) - ", - )) - .execute(&mut *conn) - .await?; - - sqlx::query(indoc!( - " - CREATE UNIQUE INDEX IF NOT EXISTS runner_ports_source_unique_idx - ON runner_ports(source, protocol) - WHERE delete_ts IS NULL - ", - )) - .execute(&mut *conn) - .await?; - Ok(()) } diff --git a/packages/edge/infra/client/manager/tests/client_rebuild_state.rs b/packages/edge/infra/client/manager/tests/client_rebuild_state.rs index 8ecd443743..fba7baabbb 100644 --- a/packages/edge/infra/client/manager/tests/client_rebuild_state.rs +++ b/packages/edge/infra/client/manager/tests/client_rebuild_state.rs @@ -14,7 +14,6 @@ use pegboard::protocol; use pegboard_manager::Ctx; use tokio::{net::TcpStream, sync::Mutex}; use tokio_tungstenite::tungstenite::protocol::Message; -use uuid::Uuid; mod common; use common::*; diff --git a/packages/edge/infra/client/manager/tests/client_state_external_kill.rs b/packages/edge/infra/client/manager/tests/client_state_external_kill.rs index f49b2df864..80634fac9e 100644 --- a/packages/edge/infra/client/manager/tests/client_state_external_kill.rs +++ b/packages/edge/infra/client/manager/tests/client_state_external_kill.rs @@ -17,7 +17,6 @@ use pegboard::protocol; use pegboard_manager::Ctx; use tokio::{net::TcpStream, sync::Mutex}; use tokio_tungstenite::tungstenite::protocol::Message; 
-use uuid::Uuid; mod common; use common::*; diff --git a/packages/edge/infra/client/manager/tests/common.rs b/packages/edge/infra/client/manager/tests/common.rs index 16f642052c..96888d684e 100644 --- a/packages/edge/infra/client/manager/tests/common.rs +++ b/packages/edge/infra/client/manager/tests/common.rs @@ -59,7 +59,7 @@ pub async fn send_init_packet(tx: &mut SplitSink( pub async fn init_client(gen_path: &Path, working_path: &Path) -> Config { let container_runner_binary_path = working_path.join("bin").join("container-runner"); - tokio::fs::create_dir(working_path.join("bin")) + tokio::fs::create_dir_all(working_path.join("bin")) .await .unwrap(); @@ -226,9 +225,8 @@ pub async fn init_client(gen_path: &Path, working_path: &Path) -> Config { api_endpoint: Url::parse("http://127.0.0.1").unwrap(), }, runner: Runner { - ip: None, - port: None, use_mounts: Some(true), + use_resource_constraints: Some(true), container_runner_binary_path: Some(container_runner_binary_path), }, images: Images { @@ -278,8 +276,6 @@ pub async fn start_client( ) { let system = system_info::fetch().await.unwrap(); - let secret = utils::load_secret(&config).await.unwrap(); - let pool = utils::init_sqlite_db(&config).await.unwrap(); // Build WS connection URL @@ -300,7 +296,7 @@ pub async fn start_client( tracing::info!("connected"); - let ctx = Ctx::new(config, system, secret, pool, tx); + let ctx = Ctx::new(config, system, pool, tx); // Share reference { diff --git a/packages/edge/infra/client/manager/tests/container_external_kill.rs b/packages/edge/infra/client/manager/tests/container_external_kill.rs index cc48f4da2d..336a0c897c 100644 --- a/packages/edge/infra/client/manager/tests/container_external_kill.rs +++ b/packages/edge/infra/client/manager/tests/container_external_kill.rs @@ -11,7 +11,6 @@ use pegboard::protocol; use pegboard_manager::Ctx; use tokio::{net::TcpStream, sync::Mutex}; use tokio_tungstenite::tungstenite::protocol::Message; -use uuid::Uuid; mod common; use 
common::*; diff --git a/packages/edge/infra/client/manager/tests/container_lifecycle.rs b/packages/edge/infra/client/manager/tests/container_lifecycle.rs index 7036c3a33e..e86cd3fb8f 100644 --- a/packages/edge/infra/client/manager/tests/container_lifecycle.rs +++ b/packages/edge/infra/client/manager/tests/container_lifecycle.rs @@ -8,7 +8,6 @@ use pegboard::protocol; use pegboard_manager::Ctx; use tokio::{net::TcpStream, sync::Mutex}; use tokio_tungstenite::tungstenite::protocol::Message; -use uuid::Uuid; mod common; use common::*; diff --git a/packages/edge/infra/client/manager/tests/vector.json b/packages/edge/infra/client/manager/tests/vector.json index 730cbc0cf8..8c2cdc760a 100644 --- a/packages/edge/infra/client/manager/tests/vector.json +++ b/packages/edge/infra/client/manager/tests/vector.json @@ -18,7 +18,7 @@ } }, "transforms": { - "runners": { + "actors": { "type": "filter", "inputs": [ "vector", @@ -26,19 +26,19 @@ ], "condition": { "type": "vrl", - "source": ".source == \"runners\"" + "source": ".source == \"actors\"" } }, "add_prefix": { "type": "remap", "inputs": [ - "runners" + "actors" ], - "source": ".message, err = \"\u001b[2m\" + \"runner_id=\" + .runner_id + \"\u001b[0m \" + .message" + "source": ".message, err = \"\u001b[2m\" + \"actor_id=\" + .actor_id + \"\u001b[0m \" + .message" } }, "sinks": { - "runner_logs": { + "actor_logs": { "type": "console", "inputs": [ "add_prefix" diff --git a/packages/edge/infra/guard/core/src/proxy_service.rs b/packages/edge/infra/guard/core/src/proxy_service.rs index bdf3031426..1217366614 100644 --- a/packages/edge/infra/guard/core/src/proxy_service.rs +++ b/packages/edge/infra/guard/core/src/proxy_service.rs @@ -327,8 +327,8 @@ pub struct ProxyState { routing_fn: RoutingFn, middleware_fn: MiddlewareFn, route_cache: RouteCache, - rate_limiters: Cache<(Uuid, std::net::IpAddr), Arc>>, - in_flight_counters: Cache<(Uuid, std::net::IpAddr), Arc>>, + rate_limiters: Cache<(rivet_util::Id, std::net::IpAddr), Arc>>, + 
in_flight_counters: Cache<(rivet_util::Id, std::net::IpAddr), Arc>>, port_type: PortType, clickhouse_inserter: Option, } @@ -536,7 +536,7 @@ impl ProxyState { async fn check_rate_limit( &self, ip_addr: std::net::IpAddr, - actor_id: &Option, + actor_id: &Option, ) -> GlobalResult { let Some(actor_id) = *actor_id else { // No rate limiting when actor_id is None @@ -576,7 +576,7 @@ impl ProxyState { async fn acquire_in_flight( &self, ip_addr: std::net::IpAddr, - actor_id: &Option, + actor_id: &Option, ) -> GlobalResult { let Some(actor_id) = *actor_id else { // No in-flight limiting when actor_id is None @@ -613,7 +613,7 @@ impl ProxyState { } #[tracing::instrument(skip_all)] - async fn release_in_flight(&self, ip_addr: std::net::IpAddr, actor_id: &Option) { + async fn release_in_flight(&self, ip_addr: std::net::IpAddr, actor_id: &Option) { // Skip if actor_id is None (no in-flight tracking) let actor_id = match actor_id { Some(id) => *id, diff --git a/packages/edge/infra/guard/core/src/request_context.rs b/packages/edge/infra/guard/core/src/request_context.rs index d66229646e..d64851228f 100644 --- a/packages/edge/infra/guard/core/src/request_context.rs +++ b/packages/edge/infra/guard/core/src/request_context.rs @@ -67,7 +67,7 @@ pub struct RequestContext { pub service_response_http_expires: Option, pub service_response_http_last_modified: Option, pub service_response_status: Option, - pub service_actor_id: Option, + pub service_actor_id: Option, pub service_server_id: Option, // ClickHouse inserter handle @@ -191,7 +191,7 @@ impl RequestContext { .clone() .unwrap_or_default(), service_response_status: self.service_response_status.unwrap_or_default(), - service_actor_id: self.service_actor_id.unwrap_or_default().to_string(), + service_actor_id: self.service_actor_id.map(|x| x.to_string()).unwrap_or_default(), service_server_id: self.service_server_id.unwrap_or_default(), }; diff --git a/packages/edge/services/pegboard/src/keys/client.rs 
b/packages/edge/services/pegboard/src/keys/client.rs index 39a247962a..60cc6c09e8 100644 --- a/packages/edge/services/pegboard/src/keys/client.rs +++ b/packages/edge/services/pegboard/src/keys/client.rs @@ -345,6 +345,10 @@ impl Actor2Key { pub fn subspace(client_id: Uuid) -> Actor2SubspaceKey { Actor2SubspaceKey::new(client_id) } + + pub fn entire_subspace() -> ActorSubspaceKey { + ActorSubspaceKey::entire() + } } impl FormalKey for Actor2Key { @@ -389,12 +393,18 @@ impl<'de> TupleUnpack<'de> for Actor2Key { } pub struct Actor2SubspaceKey { - client_id: Uuid, + client_id: Option, } impl Actor2SubspaceKey { fn new(client_id: Uuid) -> Self { - Actor2SubspaceKey { client_id } + Actor2SubspaceKey { + client_id: Some(client_id), + } + } + + fn entire() -> Self { + Actor2SubspaceKey { client_id: None } } } @@ -404,8 +414,16 @@ impl TuplePack for Actor2SubspaceKey { w: &mut W, tuple_depth: TupleDepth, ) -> std::io::Result { - let t = (CLIENT, ACTOR2, self.client_id); - t.pack(w, tuple_depth) + let mut offset = VersionstampOffset::None { size: 0 }; + + let t = (CLIENT, ACTOR2); + offset += t.pack(w, tuple_depth)?; + + if let Some(client_id) = &self.client_id { + offset += client_id.pack(w, tuple_depth)?; + } + + Ok(offset) } } diff --git a/packages/edge/services/pegboard/src/ops/actor/log/read.rs b/packages/edge/services/pegboard/src/ops/actor/log/read.rs index 90d2dc1bc1..0fdf533216 100644 --- a/packages/edge/services/pegboard/src/ops/actor/log/read.rs +++ b/packages/edge/services/pegboard/src/ops/actor/log/read.rs @@ -157,7 +157,5 @@ pub async fn pegboard_actor_log_read(ctx: &OperationCtx, input: &Input) -> Globa let entries = query_builder.fetch_all::().await?; // New actors first - Ok(Output { - entries, - }) + Ok(Output { entries }) } diff --git a/packages/edge/services/pegboard/src/protocol.rs b/packages/edge/services/pegboard/src/protocol.rs index 94bbea7eef..ea68b8289e 100644 --- a/packages/edge/services/pegboard/src/protocol.rs +++ 
b/packages/edge/services/pegboard/src/protocol.rs @@ -131,8 +131,7 @@ pub struct Image { /// Direct S3 url to download the image from without ATS. pub fallback_artifact_url: Option, /// Size in bytes of the artifact. - #[serde(default)] - pub artifact_size_bytes: u64, + pub artifact_size: u64, pub kind: ImageKind, pub compression: ImageCompression, pub allocation_type: ImageAllocationType, @@ -147,8 +146,8 @@ pub enum ImageKind { } impl From for ImageKind { - fn from(kind: build::types::BuildKind) -> Self { - match kind { + fn from(value: build::types::BuildKind) -> Self { + match value { build::types::BuildKind::DockerImage => ImageKind::DockerImage, build::types::BuildKind::OciBundle => ImageKind::OciBundle, build::types::BuildKind::JavaScript => ImageKind::JavaScript, @@ -171,8 +170,8 @@ pub enum ImageCompression { } impl From for ImageCompression { - fn from(compression: build::types::BuildCompression) -> Self { - match compression { + fn from(value: build::types::BuildCompression) -> Self { + match value { build::types::BuildCompression::None => ImageCompression::None, build::types::BuildCompression::Lz4 => ImageCompression::Lz4, } diff --git a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs index 53baa814c0..9ad5454904 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs @@ -8,7 +8,6 @@ use foundationdb::{ options::{ConflictRangeType, StreamingMode}, }; use futures_util::{FutureExt, TryStreamExt}; -use rivet_api::models::actors_endpoint_type; use sqlx::Acquire; use super::{ @@ -686,11 +685,13 @@ pub async fn spawn_actor( id: actor_setup.image_id, artifact_url_stub: actor_setup.artifact_url_stub.clone(), fallback_artifact_url: actor_setup.fallback_artifact_url.clone(), - artifact_size_bytes: actor_setup.artifact_size_bytes, kind: actor_setup.meta.build_kind.into(), compression: 
actor_setup.meta.build_compression.into(), // Always single, this is the old actor wf allocation_type: protocol::ImageAllocationType::Single, + + // Calculated on the manager for old actors + artifact_size: 0, }, root_user_enabled: input.root_user_enabled, env: input.environment.clone(), diff --git a/packages/edge/services/pegboard/src/workflows/actor/setup.rs b/packages/edge/services/pegboard/src/workflows/actor/setup.rs index 90d4df9557..b7171265bf 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/setup.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/setup.rs @@ -401,36 +401,6 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult GlobalResult<()> { - let pool = ctx.sqlite().await?; - - sql_execute!( - [ctx, pool] - " - UPDATE state - SET - project_id = ?, - build_kind = ?, - build_compression = ?, - root_user_enabled = ? - ", - input.meta.project_id, - input.meta.build_kind as i64, - input.meta.build_compression as i64, - input.root_user_enabled, - ) - .await?; - - Ok(()) -} - #[derive(Debug, Clone, Serialize, Deserialize, Hash)] struct InsertFdbInput { actor_id: Uuid, @@ -543,6 +513,36 @@ pub async fn get_meta(ctx: &ActivityCtx, input: &GetMetaInput) -> GlobalResult GlobalResult<()> { + let pool = ctx.sqlite().await?; + + sql_execute!( + [ctx, pool] + " + UPDATE state + SET + project_id = ?, + build_kind = ?, + build_compression = ?, + root_user_enabled = ? 
+ ", + input.meta.project_id, + input.meta.build_kind as i64, + input.meta.build_compression as i64, + input.root_user_enabled, + ) + .await?; + + Ok(()) +} + pub enum SetupCtx { Init { network_ports: util::serde::HashableMap, @@ -559,7 +559,6 @@ pub struct ActorSetupCtx { pub resources: protocol::Resources, pub artifact_url_stub: String, pub fallback_artifact_url: Option, - pub artifact_size_bytes: u64, } pub async fn setup( @@ -631,7 +630,6 @@ pub async fn setup( resources, artifact_url_stub: artifacts_res.artifact_url_stub, fallback_artifact_url: artifacts_res.fallback_artifact_url, - artifact_size_bytes: artifacts_res.artifact_size_bytes, }) } @@ -710,7 +708,7 @@ struct ResolveArtifactsOutput { artifact_url_stub: String, fallback_artifact_url: Option, #[serde(default)] - artifact_size_bytes: u64, + artifact_size: u64, } #[activity(ResolveArtifacts)] @@ -748,7 +746,7 @@ async fn resolve_artifacts( let addr_str = presigned_req.uri().to_string(); tracing::debug!(addr = %addr_str, "resolved artifact s3 presigned request"); - Some(addr_str) + addr_str }; // Get the artifact size @@ -757,7 +755,6 @@ async fn resolve_artifacts( }) .await?; let upload = unwrap!(uploads_res.uploads.first()); - let artifact_size_bytes = upload.content_length; Ok(ResolveArtifactsOutput { artifact_url_stub: crate::util::image_artifact_url_stub( @@ -765,7 +762,7 @@ async fn resolve_artifacts( input.build_upload_id, &input.build_file_name, )?, - fallback_artifact_url, - artifact_size_bytes, + fallback_artifact_url: Some(fallback_artifact_url), + artifact_size: upload.content_length, }) } diff --git a/packages/edge/services/pegboard/src/workflows/actor2/analytics.rs b/packages/edge/services/pegboard/src/workflows/actor2/analytics.rs new file mode 100644 index 0000000000..36c684c82d --- /dev/null +++ b/packages/edge/services/pegboard/src/workflows/actor2/analytics.rs @@ -0,0 +1,313 @@ +use chirp_workflow::prelude::*; +use std::collections::HashMap; + +#[derive(Debug, Serialize, Deserialize, 
Hash)] +pub struct InsertClickHouseInput { + pub actor_id: util::Id, +} + +/// Row to be inserted in to ClickHouse +#[derive(Serialize)] +pub struct ActorClickHouseRow { + actor_id: String, + project_id: Uuid, + env_id: Uuid, + datacenter_id: Uuid, + tags: HashMap, + /// Alias of image_id + build_id: Uuid, + build_kind: i64, + build_compression: i64, + network_mode: i64, + network_ports: HashMap, + network_ports_ingress: HashMap, + network_ports_host: HashMap, + network_ports_proxied: HashMap, + client_id: Uuid, + client_wan_hostname: String, + selected_cpu_millicores: u32, + selected_memory_mib: u32, + root_user_enabled: bool, + env_vars: i64, + env_var_bytes: i64, + args: i64, + args_bytes: i64, + durable: bool, + kill_timeout: i64, + cpu_millicores: i64, + memory_mib: i64, + /// Used in ORDER BY for replacing the key so this must never change. + created_at: i64, + /// This will not be set until after the actor is destroyed because we only insert in to + /// ClickHouse after start & destroy. + /// + /// 0 = not set + started_at: i64, + /// See `started_at`. + connectable_at: i64, + /// See `started_at`. + finished_at: i64, + /// This column is used for configuring the TTL of the actor. 
+ /// + /// 0 = not set + destroyed_at: i64, + row_updated_at: i64, +} + +#[derive(Serialize)] +pub struct ActorClickHouseRowPort { + /// Will be 0 if not configured + internal_port: u16, + routing_guard: bool, + routing_host: bool, + routing_guard_protocol: i64, + routing_host_protocol: i64, +} + +#[derive(Serialize)] +pub struct ActorClickHouseRowPortIngress { + port_number: u16, + ingress_port_number: u16, + protocol: i64, +} + +#[derive(Serialize)] +pub struct ActorClickHouseRowPortHost { + port_number: u16, + protocol: i64, +} + +#[derive(Serialize)] +pub struct ActorClickHouseRowPortProxied { + ip: String, + source: i64, +} + +/// State row to select from SQLite +#[derive(sqlx::FromRow)] +struct StateRow { + project_id: Uuid, + env_id: Uuid, + tags: sqlx::types::Json>, + resources_cpu_millicores: i64, + resources_memory_mib: i64, + selected_resources_cpu_millicores: Option, + selected_resources_memory_mib: Option, + client_id: Option, + client_workflow_id: Option, + client_wan_hostname: Option, + lifecycle_kill_timeout_ms: i64, + lifecycle_durable: bool, + create_ts: i64, + start_ts: Option, + connectable_ts: Option, + finish_ts: Option, + destroy_ts: Option, + image_id: Uuid, + build_kind: i64, + build_compression: i64, + root_user_enabled: bool, + args: sqlx::types::Json>, + network_mode: i64, + environment: sqlx::types::Json>, +} + +/// This activity is idempotent and will upsert the actor row. If we want to change the data in +/// ClickHouse, we need to use this. This gets inserted in to a ReplacingMergeTree so it's safe to +/// update frequently. 
+#[activity(InsertClickHouse)] +pub async fn insert_clickhouse( + ctx: &ActivityCtx, + input: &InsertClickHouseInput, +) -> GlobalResult<()> { + let Ok(inserter) = ctx.clickhouse_inserter().await else { + return Ok(()); + }; + + let dc_id = ctx.config().server()?.rivet.edge()?.datacenter_id; + + // Read extra information + let pool = ctx.sqlite().await?; + + // Read state + let state_row = sql_fetch_one!( + [ctx, StateRow, &pool] + " + SELECT + project_id, + env_id, + json(tags) AS tags, + resources_cpu_millicores, + resources_memory_mib, + selected_resources_cpu_millicores, + selected_resources_memory_mib, + client_id, + client_workflow_id, + client_wan_hostname, + lifecycle_kill_timeout_ms, + lifecycle_durable, + create_ts, + start_ts, + connectable_ts, + finish_ts, + destroy_ts, + image_id, + build_kind, + build_compression, + root_user_enabled, + json(args) AS args, + network_mode, + json(environment) AS environment + FROM state + ", + ) + .await?; + + // Read network ports from SQLite tables + let network_ports_data = sql_fetch_all!( + [ctx, (String, Option, i64, String), &pool] + " + SELECT port_name, port_number, protocol, 'ingress' as routing_type FROM ports_ingress + UNION ALL + SELECT port_name, port_number, protocol, 'host' as routing_type FROM ports_host + ", + ) + .await?; + + let network_ports: HashMap = network_ports_data + .into_iter() + .map(|(name, port_number, protocol, routing_type)| { + let (routing_guard, routing_host, routing_guard_protocol, routing_host_protocol) = + match routing_type.as_str() { + "ingress" => (true, false, protocol as i64, 0), + "host" => (false, true, 0, protocol as i64), + _ => (false, false, 0, 0), + }; + + ( + name, + ActorClickHouseRowPort { + internal_port: port_number.unwrap_or_default() as u16, + routing_guard, + routing_host, + routing_guard_protocol, + routing_host_protocol, + }, + ) + }) + .collect(); + + // Read ingress ports + let ingress_ports = sql_fetch_all!( + [ctx, (String, Option, i64, i64), &pool] + 
"SELECT port_name, port_number, ingress_port_number, protocol FROM ports_ingress", + ) + .await? + .into_iter() + .map(|(name, port_number, ingress_port_number, protocol)| { + ( + name, + ActorClickHouseRowPortIngress { + port_number: port_number.unwrap_or_default() as u16, + ingress_port_number: ingress_port_number as u16, + protocol: protocol as i64, + }, + ) + }) + .collect::>(); + + // Read host ports + let host_ports = sql_fetch_all!( + [ctx, (String, Option, i64), &pool] + "SELECT port_name, port_number, protocol FROM ports_host", + ) + .await? + .into_iter() + .map(|(name, port_number, protocol)| { + ( + name, + ActorClickHouseRowPortHost { + port_number: port_number.unwrap_or_default() as u16, + protocol: protocol as i64, + }, + ) + }) + .collect::>(); + + // Read proxied ports + let proxied_ports = sql_fetch_all!( + [ctx, (String, String, i64), &pool] + "SELECT port_name, ip, source FROM ports_proxied", + ) + .await? + .into_iter() + .map(|(name, ip, source)| { + ( + name, + ActorClickHouseRowPortProxied { + ip, + source: source as i64, + }, + ) + }) + .collect::>(); + + inserter.insert( + "db_pegboard_analytics", + "actors", + ActorClickHouseRow { + actor_id: input.actor_id.to_string(), + project_id: state_row.project_id, + env_id: state_row.env_id, + datacenter_id: dc_id, + tags: state_row.tags.0, + build_id: state_row.image_id, + build_kind: state_row.build_kind, + build_compression: state_row.build_compression, + network_mode: state_row.network_mode as i64, + network_ports, + network_ports_ingress: ingress_ports, + network_ports_host: host_ports, + network_ports_proxied: proxied_ports, + client_id: state_row.client_id.unwrap_or_default(), + client_wan_hostname: state_row.client_wan_hostname.unwrap_or_default(), + selected_cpu_millicores: state_row + .selected_resources_cpu_millicores + .unwrap_or_default() as u32, + selected_memory_mib: state_row.selected_resources_memory_mib.unwrap_or_default() as u32, + root_user_enabled: state_row.root_user_enabled, 
+ env_vars: state_row.environment.len() as i64, + env_var_bytes: state_row + .environment + .iter() + .map(|(k, v)| k.len() + v.len()) + .sum::() as i64, + args: state_row.args.len() as i64, + args_bytes: state_row.args.iter().map(|arg| arg.len()).sum::() as i64, + durable: state_row.lifecycle_durable, + kill_timeout: state_row.lifecycle_kill_timeout_ms, + cpu_millicores: state_row.resources_cpu_millicores, + memory_mib: state_row.resources_memory_mib, + created_at: state_row.create_ts * 1_000_000, // Convert ms to ns for ClickHouse DateTime64(9) + started_at: state_row + .start_ts + .map(|ts| ts * 1_000_000) + .unwrap_or_default(), + connectable_at: state_row + .connectable_ts + .map(|ts| ts * 1_000_000) + .unwrap_or_default(), + finished_at: state_row + .finish_ts + .map(|ts| ts * 1_000_000) + .unwrap_or_default(), + destroyed_at: state_row + .destroy_ts + .map(|ts| ts * 1_000_000) + .unwrap_or_default(), + row_updated_at: util::timestamp::now() * 1_000_000, // Convert ms to ns for ClickHouse DateTime64(9) + }, + )?; + + Ok(()) +} diff --git a/packages/edge/services/pegboard/src/workflows/actor2/destroy.rs b/packages/edge/services/pegboard/src/workflows/actor2/destroy.rs index ab4427299a..3a5d3c9cbe 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/destroy.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/destroy.rs @@ -4,7 +4,7 @@ use fdb_util::{end_of_key_range, FormalKey, SERIALIZABLE}; use foundationdb::{self as fdb, options::ConflictRangeType}; use nix::sys::signal::Signal; -use super::{DestroyComplete, DestroyStarted}; +use super::{analytics::InsertClickHouseInput, DestroyComplete, DestroyStarted}; use crate::{keys, protocol, types::GameGuardProtocol}; #[derive(Debug, Serialize, Deserialize)] @@ -75,6 +75,13 @@ pub(crate) async fn pegboard_actor_destroy( } } + // Update ClickHouse analytics with destroyed timestamp + ctx.v(2) + .activity(InsertClickHouseInput { + actor_id: input.actor_id, + }) + .await?; + ctx.msg(DestroyComplete 
{}) .tag("actor_id", input.actor_id) .send() diff --git a/packages/edge/services/pegboard/src/workflows/actor2/mod.rs b/packages/edge/services/pegboard/src/workflows/actor2/mod.rs index fe6c383843..9c15240156 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/mod.rs @@ -1,15 +1,14 @@ -use std::collections::HashMap; - +use analytics::InsertClickHouseInput; use chirp_workflow::prelude::*; use destroy::KillCtx; use futures_util::FutureExt; -use util::serde::AsHashableExt; use crate::{ protocol, types::{ActorLifecycle, ActorResources, EndpointType, NetworkMode, Routing}, }; +mod analytics; pub mod destroy; mod migrations; mod runtime; @@ -26,20 +25,24 @@ const ACTOR_START_THRESHOLD_MS: i64 = util::duration::seconds(30); const ACTOR_STOP_THRESHOLD_MS: i64 = util::duration::seconds(30); /// How long to wait after stopped and not receiving an exit state before setting actor as lost. const ACTOR_EXIT_THRESHOLD_MS: i64 = util::duration::seconds(5); +/// How long an actor goes without retries before its retry count is reset to 0, effectively resetting its +/// backoff to 0. +const RETRY_RESET_DURATION_MS: i64 = util::duration::minutes(10); -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Hash)] pub struct Input { pub actor_id: util::Id, pub env_id: Uuid, - pub tags: HashMap, + pub tags: util::serde::HashableMap, + /// Used to override image resources. 
pub resources: Option, pub lifecycle: ActorLifecycle, pub image_id: Uuid, pub root_user_enabled: bool, pub args: Vec, pub network_mode: NetworkMode, - pub environment: HashMap, - pub network_ports: HashMap, + pub environment: util::serde::HashableMap, + pub network_ports: util::serde::HashableMap, pub endpoint_type: Option, } @@ -57,14 +60,14 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu let validation_res = ctx .activity(setup::ValidateInput { env_id: input.env_id, - tags: input.tags.as_hashable(), + tags: input.tags.clone(), resources: input.resources.clone(), image_id: input.image_id, root_user_enabled: input.root_user_enabled, args: input.args.clone(), network_mode: input.network_mode, - environment: input.environment.as_hashable(), - network_ports: input.network_ports.as_hashable(), + environment: input.environment.clone(), + network_ports: input.network_ports.clone(), }) .await?; @@ -82,7 +85,7 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu let network_ports = ctx .activity(setup::DisableTlsPortsInput { - network_ports: input.network_ports.as_hashable(), + network_ports: input.network_ports.clone(), }) .await?; @@ -120,6 +123,12 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu } }; + ctx.v(2) + .activity(InsertClickHouseInput { + actor_id: input.actor_id, + }) + .await?; + ctx.msg(CreateComplete {}) .tag("actor_id", input.actor_id) .send() @@ -146,6 +155,14 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu return Ok(()); }; + ctx.v(2) + .msg(Allocated { + client_id: allocate_res.client_id, + }) + .tag("actor_id", input.actor_id) + .send() + .await?; + let lifecycle_res = ctx .loope( runtime::State::new( @@ -262,6 +279,12 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu }) .await?; + ctx.v(2) + .activity(InsertClickHouseInput { + actor_id: input.actor_id, + }) + .await?; + if updated 
{ ctx.msg(Ready {}) .tag("actor_id", input.actor_id) @@ -448,6 +471,11 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu #[message("pegboard_actor_create_complete")] pub struct CreateComplete {} +#[message("pegboard_actor_allocated")] +pub struct Allocated { + pub client_id: Uuid, +} + #[message("pegboard_actor_failed")] pub struct Failed { pub message: String, @@ -500,14 +528,3 @@ join_signal!(Main { Undrain, Destroy, }); - -// Stub definition -#[derive(Debug, Serialize, Deserialize, Hash)] -pub struct WaitForTraefikPollInput {} -#[activity(WaitForTraefikPoll)] -pub async fn wait_for_traefik_poll( - _ctx: &ActivityCtx, - _input: &WaitForTraefikPollInput, -) -> GlobalResult<()> { - Ok(()) -} diff --git a/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs index 3e0210f916..29e6040493 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs @@ -1,6 +1,6 @@ use std::time::Instant; -use build::types::{BuildAllocationType, BuildCompression, BuildKind}; +use build::types::{BuildAllocationType, BuildKind}; use chirp_workflow::prelude::*; use fdb_util::{end_of_key_range, FormalKey, SERIALIZABLE, SNAPSHOT}; use foundationdb::{ @@ -10,11 +10,11 @@ use foundationdb::{ use futures_util::{FutureExt, TryStreamExt}; use nix::sys::signal::Signal; use sqlx::Acquire; -use util::serde::AsHashableExt; use super::{ destroy::{self, KillCtx}, setup, Destroy, Input, ACTOR_START_THRESHOLD_MS, BASE_RETRY_TIMEOUT_MS, + RETRY_RESET_DURATION_MS, }; use crate::{ keys, metrics, @@ -28,11 +28,16 @@ use crate::{ pub struct State { pub generation: u32, pub runner_id: Uuid, + pub client_id: Uuid, pub client_workflow_id: Uuid, pub image_id: Uuid, + pub drain_timeout_ts: Option, pub gc_timeout_ts: Option, + + #[serde(default)] + reschedule_state: RescheduleState, } impl State { @@ -45,6 +50,7 @@ impl 
State { image_id, drain_timeout_ts: None, gc_timeout_ts: Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS), + reschedule_state: RescheduleState::default(), } } } @@ -55,6 +61,12 @@ pub struct LifecycleRes { pub kill: Option, } +#[derive(Serialize, Deserialize, Clone, Default)] +struct RescheduleState { + last_retry_ts: i64, + retry_count: usize, +} + #[derive(Debug, Serialize, Deserialize, Hash)] struct UpdateClientAndRunnerInput { client_id: Uuid, @@ -735,6 +747,19 @@ pub async fn insert_ports_fdb(ctx: &ActivityCtx, input: &InsertPortsFdbInput) -> Ok(()) } +#[derive(Debug, Serialize, Deserialize, Hash)] +struct CompareRetryInput { + last_retry_ts: i64, +} + +#[activity(CompareRetry)] +async fn compare_retry(ctx: &ActivityCtx, input: &CompareRetryInput) -> GlobalResult<(i64, bool)> { + let now = util::timestamp::now(); + + // If the last retry ts is more than RETRY_RESET_DURATION_MS, reset retry count + Ok((now, input.last_retry_ts < now - RETRY_RESET_DURATION_MS)) +} + /// Returns whether or not there was availability to spawn the actor. 
pub async fn spawn_actor( ctx: &mut WorkflowCtx, @@ -776,16 +801,14 @@ pub async fn spawn_actor( let image = protocol::Image { id: actor_setup.image_id, artifact_url_stub: actor_setup.artifact_url_stub.clone(), - fallback_artifact_url: actor_setup.fallback_artifact_url.clone(), + fallback_artifact_url: Some(actor_setup.fallback_artifact_url.clone()), + artifact_size: actor_setup.artifact_size, kind: match actor_setup.meta.build_kind { BuildKind::DockerImage => protocol::ImageKind::DockerImage, BuildKind::OciBundle => protocol::ImageKind::OciBundle, BuildKind::JavaScript => bail!("actors do not support js builds"), }, - compression: match actor_setup.meta.build_compression { - BuildCompression::None => protocol::ImageCompression::None, - BuildCompression::Lz4 => protocol::ImageCompression::Lz4, - }, + compression: actor_setup.meta.build_compression.into(), allocation_type: match actor_setup.meta.build_allocation_type { BuildAllocationType::None => bail!("actors do not support old builds"), BuildAllocationType::Single => protocol::ImageAllocationType::Single, @@ -839,7 +862,7 @@ pub async fn spawn_actor( image: image.clone(), root_user_enabled: input.root_user_enabled, resources: actor_setup.resources.clone(), - env: input.environment.as_hashable(), + env: input.environment.clone(), ports: ports.clone(), network_mode, }, @@ -849,11 +872,11 @@ pub async fn spawn_actor( runner_id: res.runner_id, }) }, - env: input.environment.as_hashable(), + env: input.environment.clone(), metadata: util::serde::Raw::new(&protocol::ActorMetadata { actor: protocol::ActorMetadataActor { actor_id: input.actor_id, - tags: input.tags.as_hashable(), + tags: input.tags.clone(), create_ts: ctx.ts(), }, network: Some(protocol::ActorMetadataNetwork { @@ -932,7 +955,7 @@ pub async fn reschedule_actor( // Waits for the actor to be ready (or destroyed) and automatically retries if failed to allocate. 
let res = ctx - .loope(RescheduleState::default(), |ctx, state| { + .loope(state.reschedule_state.clone(), |ctx, state| { let input = input.clone(); let actor_setup = actor_setup.clone(); @@ -941,14 +964,14 @@ pub async fn reschedule_actor( let mut backoff = util::Backoff::new_at(8, None, BASE_RETRY_TIMEOUT_MS, 500, state.retry_count); - // If the last retry ts is more than 2 * backoff ago, reset retry count to 0 - let now = util::timestamp::now(); - state.retry_count = - if state.last_retry_ts < now - i64::try_from(2 * backoff.current_duration())? { - 0 - } else { - state.retry_count + 1 - }; + let (now, reset) = ctx + .v(2) + .activity(CompareRetryInput { + last_retry_ts: state.last_retry_ts, + }) + .await?; + + state.retry_count = if reset { 0 } else { state.retry_count + 1 }; state.last_retry_ts = now; // Don't sleep for first retry @@ -967,7 +990,7 @@ pub async fn reschedule_actor( } if let Some(res) = spawn_actor(ctx, &input, &actor_setup, next_generation).await? { - Ok(Loop::Break(Ok(res))) + Ok(Loop::Break(Ok((state.clone(), res)))) } else { tracing::debug!(actor_id=?input.actor_id, "failed to reschedule actor, retrying"); @@ -980,12 +1003,15 @@ pub async fn reschedule_actor( // Update loop state match res { - Ok(res) => { + Ok((reschedule_state, res)) => { state.generation = next_generation; state.runner_id = res.runner_id; state.client_id = res.client_id; state.client_workflow_id = res.client_workflow_id; + // Save reschedule state in global state + state.reschedule_state = reschedule_state; + // Reset gc timeout once allocated state.gc_timeout_ts = Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS); @@ -995,12 +1021,6 @@ pub async fn reschedule_actor( } } -#[derive(Serialize, Deserialize, Default)] -struct RescheduleState { - last_retry_ts: i64, - retry_count: usize, -} - #[derive(Debug, Serialize, Deserialize, Hash)] struct ClearPortsAndResourcesInput { actor_id: util::Id, diff --git a/packages/edge/services/pegboard/src/workflows/actor2/setup.rs 
b/packages/edge/services/pegboard/src/workflows/actor2/setup.rs index dc9d992d8a..3b52269027 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/setup.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/setup.rs @@ -1,10 +1,14 @@ use build::types::{BuildAllocationType, BuildCompression, BuildKind, BuildResources}; use chirp_workflow::prelude::*; use cluster::types::BuildDeliveryMethod; -use fdb_util::FormalKey; -use foundationdb as fdb; +use fdb_util::{end_of_key_range, FormalKey, SNAPSHOT}; +use foundationdb::{ + self as fdb, + options::{ConflictRangeType, StreamingMode}, +}; +use futures_util::TryStreamExt; +use rand::Rng; use sqlx::Acquire; -use util::serde::AsHashableExt; use super::{Input, Port}; use crate::{ @@ -291,18 +295,15 @@ struct InsertDbInput { args: Vec, network_mode: NetworkMode, environment: util::serde::HashableMap, - network_ports: util::serde::HashableMap, } #[activity(InsertDb)] async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult { let pool = ctx.sqlite().await?; - let mut conn = pool.conn().await?; - let mut tx = conn.begin().await?; let create_ts = ctx.ts(); sql_execute!( - [ctx, @tx &mut tx] + [ctx, &pool] " INSERT INTO state ( env_id, @@ -333,6 +334,25 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct AllocateIngressPortsOutput { + ports: Vec<(GameGuardProtocol, Vec)>, +} + +#[activity(AllocateIngressPorts)] +async fn allocate_ingress_ports( + ctx: &ActivityCtx, + input: &AllocateIngressPortsInput, +) -> GlobalResult { // Count up ports per protocol let mut port_counts = Vec::new(); for (_, port) in &input.network_ports { @@ -353,7 +373,8 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult GlobalResult { + return Err(fdb::FdbBindingError::CustomError( + "Dynamic allocation not implemented for http/https ports".into(), + )); + } + GameGuardProtocol::Tcp | 
GameGuardProtocol::TcpTls => { + gg_config.min_ingress_port_tcp()..=gg_config.max_ingress_port_tcp() + } + GameGuardProtocol::Udp => { + gg_config.min_ingress_port_udp()..=gg_config.max_ingress_port_udp() + } + }; + + let mut last_port = None; + let mut ports = Vec::new(); + + // Choose a random starting port for better spread and less cache hits + let mut start = { + // It is important that we don't start at the end of the range so that the logic with + // `last_port` works correctly + let exclusive_port_range = *port_range.start()..*port_range.end(); + rand::thread_rng().gen_range(exclusive_port_range) + }; + + // Build start and end keys for ingress ports subspace + let start_key = keys::subspace() + .subspace(&keys::port::IngressKey2::subspace(*protocol, start)) + .range() + .0; + let end_key = keys::subspace() + .subspace(&keys::port::IngressKey2::subspace( + *protocol, + *port_range.end(), + )) + .range() + .1; + let mut stream = tx.get_ranges_keyvalues( + fdb::RangeOption { + mode: StreamingMode::Iterator, + ..(start_key, end_key.clone()).into() + }, + // NOTE: This is not SERIALIZABLE because we don't want to conflict with all of the keys, + // just the one we choose + SNAPSHOT, + ); + + // Continue iterating over the same stream until all of the required ports are found + for _ in 0..*count { + // Iterate through the subspace range until a port is found + let port = loop { + let Some(entry) = stream.try_next().await? 
else { + match last_port { + Some(port) if port == *port_range.end() => { + // End of range reached, start a new range read from the beginning (wrap around) + if start != *port_range.start() { + last_port = None; + + let old_start = start; + start = *port_range.start(); + + let start_key = keys::subspace() + .subspace(&keys::port::IngressKey2::subspace( + *protocol, start, + )) + .range() + .0; + stream = tx.get_ranges_keyvalues( + fdb::RangeOption { + mode: StreamingMode::Iterator, + limit: Some(old_start as usize), + ..(start_key, end_key.clone()).into() + }, + // NOTE: This is not SERIALIZABLE because we don't want to conflict + // with all of the keys, just the one we choose + SNAPSHOT, + ); + + continue; + } else { + break None; + } + } + // Return port after last port + Some(last_port) => { + break Some(last_port + 1); + } + // No ports were returned (range is empty) + None => { + break Some(start); + } + } + }; + + let key = keys::subspace() + .unpack::(entry.key()) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; + let current_port = key.port; + + if let Some(last_port) = last_port { + // Gap found + if current_port != last_port + 1 { + break Some(last_port + 1); + } + } + + last_port = Some(current_port); + }; + + let Some(port) = port else { + return Err(fdb::FdbBindingError::CustomError( + format!("not enough {protocol} ports available").into(), + )); + }; + + let ingress_port_key = + keys::port::IngressKey2::new(*protocol, port, input.actor_id); + let ingress_port_key_buf = keys::subspace().pack(&ingress_port_key); + + // Add read conflict only for this key + tx.add_conflict_range( + &ingress_port_key_buf, + &end_of_key_range(&ingress_port_key_buf), + ConflictRangeType::Read, + )?; + + // Set key + tx.set( + &ingress_port_key_buf, + &ingress_port_key + .serialize(()) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?, + ); + + ports.push(port); + } + + results.push((*protocol, ports)); + } + + Ok(results) + } }) + 
.custom_instrument(tracing::info_span!("allocate_ingress_ports_tx")) .await?; - let mut ingress_ports = ingress_ports_res - .ports - .into_iter() - .map(|(protocol, ports)| (protocol, ports.into_iter())) - .collect::>(); + + Ok(AllocateIngressPortsOutput { ports }) +} + +#[derive(Debug, Clone, Serialize, Deserialize, Hash)] +struct InsertIngressPortsInput { + actor_id: util::Id, + network_ports: util::serde::HashableMap, + ingress_ports: Vec<(GameGuardProtocol, Vec)>, +} + +#[activity(InsertIngressPorts)] +async fn insert_ingress_ports( + ctx: &ActivityCtx, + input: &InsertIngressPortsInput, +) -> GlobalResult<()> { + let pool = ctx.sqlite().await?; + let mut conn = pool.conn().await?; + let mut tx = conn.begin().await?; let gg_config = &ctx.config().server()?.rivet.guard; + let mut ingress_ports = input + .ingress_ports + .iter() + .map(|(protocol, ports)| (protocol, ports.clone().into_iter())) + .collect::>(); + for (name, port) in input.network_ports.iter() { match port.routing { Routing::GameGuard { protocol } => { @@ -397,7 +588,7 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult gg_config.https_port(), GameGuardProtocol::Tcp | GameGuardProtocol::TcpTls | GameGuardProtocol::Udp => { let (_, ports_iter) = unwrap!( - ingress_ports.iter_mut().find(|(p, _)| &protocol == p) + ingress_ports.iter_mut().find(|(p, _)| &&protocol == p) ); unwrap!(ports_iter.next(), "missing ingress port") }, @@ -427,7 +618,7 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult GlobalResult GlobalResult<()> { + let pool = ctx.sqlite().await?; + + sql_execute!( + [ctx, pool] + " + UPDATE state + SET + project_id = ?, + build_kind = ?, + build_compression = ?, + root_user_enabled = ? 
+ ", + input.project_id, + input.build_kind as i64, + input.build_compression as i64, + input.root_user_enabled, + ) + .await?; + + Ok(()) +} + pub enum SetupCtx { Init { network_ports: util::serde::HashableMap, @@ -563,7 +786,9 @@ pub struct ActorSetupCtx { pub meta: GetMetaOutput, pub resources: protocol::Resources, pub artifact_url_stub: String, - pub fallback_artifact_url: Option, + pub fallback_artifact_url: String, + /// Bytes. + pub artifact_size: u64, } pub async fn setup( @@ -573,7 +798,7 @@ pub async fn setup( ) -> GlobalResult { let image_id = match setup { SetupCtx::Init { network_ports } => { - let tags = input.tags.as_hashable(); + let tags = input.tags.clone(); let create_ts = ctx .activity(InsertDbInput { actor_id: input.actor_id, @@ -584,11 +809,24 @@ pub async fn setup( image_id: input.image_id, args: input.args.clone(), network_mode: input.network_mode, - environment: input.environment.as_hashable(), - network_ports, + environment: input.environment.clone(), + }) + .await?; + + let ingress_ports_res = ctx + .activity(AllocateIngressPortsInput { + actor_id: input.actor_id, + network_ports: network_ports.clone(), }) .await?; + ctx.activity(InsertIngressPortsInput { + actor_id: input.actor_id, + network_ports, + ingress_ports: ingress_ports_res.ports, + }) + .await?; + ctx.activity(InsertFdbInput { actor_id: input.actor_id, env_id: input.env_id, @@ -609,6 +847,15 @@ pub async fn setup( }) .await?; + ctx.v(2) + .activity(InsertMetaInput { + project_id: meta.project_id, + build_kind: meta.build_kind, + build_compression: meta.build_compression, + root_user_enabled: input.root_user_enabled, + }) + .await?; + // Use resources from build or from actor config let resources = match meta.build_allocation_type { BuildAllocationType::None => bail!("actors do not support old builds"), @@ -647,6 +894,7 @@ pub async fn setup( resources, artifact_url_stub: artifacts_res.artifact_url_stub, fallback_artifact_url: artifacts_res.fallback_artifact_url, + 
artifact_size: artifacts_res.artifact_size, }) } @@ -723,7 +971,9 @@ struct ResolveArtifactsInput { #[derive(Debug, Serialize, Deserialize, Hash)] struct ResolveArtifactsOutput { artifact_url_stub: String, - fallback_artifact_url: Option, + fallback_artifact_url: String, + /// Bytes. + artifact_size: u64, } #[activity(ResolveArtifacts)] @@ -731,40 +981,45 @@ async fn resolve_artifacts( ctx: &ActivityCtx, input: &ResolveArtifactsInput, ) -> GlobalResult { - let fallback_artifact_url = - if let BuildDeliveryMethod::S3Direct = input.dc_build_delivery_method { - tracing::debug!("using s3 direct delivery"); - - // Build client - let s3_client = s3_util::Client::with_bucket_and_endpoint( - ctx.config(), - "bucket-build", - s3_util::EndpointKind::EdgeInternal, + // Get the fallback URL + let fallback_artifact_url = { + tracing::debug!("using s3 direct delivery"); + + // Build client + let s3_client = s3_util::Client::with_bucket_and_endpoint( + ctx.config(), + "bucket-build", + s3_util::EndpointKind::EdgeInternal, + ) + .await?; + + let presigned_req = s3_client + .get_object() + .bucket(s3_client.bucket()) + .key(format!( + "{upload_id}/{file_name}", + upload_id = input.build_upload_id, + file_name = input.build_file_name, + )) + .presigned( + s3_util::aws_sdk_s3::presigning::PresigningConfig::builder() + .expires_in(std::time::Duration::from_secs(15 * 60)) + .build()?, ) .await?; - let presigned_req = s3_client - .get_object() - .bucket(s3_client.bucket()) - .key(format!( - "{upload_id}/{file_name}", - upload_id = input.build_upload_id, - file_name = input.build_file_name, - )) - .presigned( - s3_util::aws_sdk_s3::presigning::PresigningConfig::builder() - .expires_in(std::time::Duration::from_secs(15 * 60)) - .build()?, - ) - .await?; + let addr_str = presigned_req.uri().to_string(); + tracing::debug!(addr = %addr_str, "resolved artifact s3 presigned request"); - let addr_str = presigned_req.uri().to_string(); - tracing::debug!(addr = %addr_str, "resolved artifact s3 
presigned request"); + addr_str + }; - Some(addr_str) - } else { - None - }; + // Get the artifact size + let uploads_res = op!([ctx] upload_get { + upload_ids: vec![input.build_upload_id.into()], + }) + .await?; + let upload = unwrap!(uploads_res.uploads.first()); Ok(ResolveArtifactsOutput { artifact_url_stub: crate::util::image_artifact_url_stub( @@ -773,5 +1028,6 @@ async fn resolve_artifacts( &input.build_file_name, )?, fallback_artifact_url, + artifact_size: upload.content_length, }) } diff --git a/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs b/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs index d3ccdbb824..636cc46e8b 100644 --- a/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs +++ b/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use build::types::BuildKind; +use build::types::{BuildAllocationType, BuildKind}; use chirp_workflow::prelude::*; use fdb_util::SNAPSHOT; use foundationdb::{self as fdb, options::StreamingMode}; @@ -57,22 +57,37 @@ pub async fn run_from_env( .run(|tx, _mc| async move { let actor_subspace = keys::subspace().subspace(&keys::client::ActorKey::entire_subspace()); + let actor2_subspace = + keys::subspace().subspace(&keys::client::Actor2Key::entire_subspace()); tx.get_ranges_keyvalues( fdb::RangeOption { mode: StreamingMode::WantAll, - ..(&actor_subspace).into() + ..(&actor2_subspace).into() }, // Not serializable because we don't want to interfere with normal operations SNAPSHOT, ) + .chain(tx.get_ranges_keyvalues( + fdb::RangeOption { + mode: StreamingMode::WantAll, + ..(&actor_subspace).into() + }, + // Not serializable because we don't want to interfere with normal operations + SNAPSHOT, + )) .map(|res| match res { Ok(entry) => { - let key = keys::subspace() - .unpack::(entry.key()) - .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; - - Ok(key.actor_id) + if let 
Ok(key) = keys::subspace().unpack::(entry.key()) + { + Ok(key.actor_id) + } else { + let key = keys::subspace() + .unpack::(entry.key()) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; + + Ok(util::Id::from(key.actor_id)) + } } Err(err) => Err(Into::::into(err)), }) @@ -117,17 +132,36 @@ pub async fn run_from_env( continue; }; - let client_flavor = match build.kind { - BuildKind::DockerImage | BuildKind::OciBundle => protocol::ClientFlavor::Container, - BuildKind::JavaScript => protocol::ClientFlavor::Isolate, + let client_flavor = match build.allocation_type { + BuildAllocationType::None => match build.kind { + BuildKind::DockerImage | BuildKind::OciBundle => protocol::ClientFlavor::Container, + BuildKind::JavaScript => protocol::ClientFlavor::Isolate, + }, + BuildAllocationType::Single | BuildAllocationType::Multi => { + protocol::ClientFlavor::Multi + } }; let env_usage = usage_by_env_and_flavor .entry((actor.env_id, client_flavor)) .or_insert(Usage { cpu: 0, memory: 0 }); - env_usage.cpu += actor.resources.cpu_millicores as u64; - env_usage.memory += actor.resources.memory_mib as u64; + let resources = match build.allocation_type { + BuildAllocationType::None | BuildAllocationType::Single => { + if let Some(resources) = &actor.resources { + env_usage.cpu += resources.cpu_millicores as u64; + env_usage.memory += resources.memory_mib as u64; + } + } + BuildAllocationType::Multi => { + // TODO: This calculates multi runner metrics wrong, it should only add this for the runner + // not for each actor in the runner + // if let Some(resources) = build.resources { + // env_usage.cpu += build_resources.cpu_millicores as u64; + // env_usage.memory += build_resources.memory_mib as u64; + // } + } + }; } // Clear old metrics because they will never be set to 0 (due to no actors being present and thus no