From 56213bef120083cdc7245f61b9fe445dc7d59b79 Mon Sep 17 00:00:00 2001 From: Shams Date: Fri, 4 Oct 2019 00:31:45 -0500 Subject: [PATCH] adds support for references in the descriptor (#949) * adds support for references in the descriptor - references are now used to determine if a service is stale * avoids relatively expensive synchronize-fn invocation if reference already exists * renames direct-access to direct * adds timer metrics around synchronize-fn * adds documentation * changes descriptor reference to a map * addresses PR feedback * uses keywords instead of strings * updated documentation --- waiter/docs/service-references.md | 110 ++++++++ waiter/integration/waiter/basic_test.clj | 4 + .../integration/waiter/token_request_test.clj | 48 +++- waiter/src/waiter/core.clj | 50 ++-- waiter/src/waiter/handler.clj | 25 +- waiter/src/waiter/service_description.clj | 151 +++++++--- waiter/test/waiter/core_test.clj | 2 + waiter/test/waiter/descriptor_test.clj | 30 +- waiter/test/waiter/handler_test.clj | 38 ++- .../test/waiter/service_description_test.clj | 262 ++++++++++-------- 10 files changed, 524 insertions(+), 196 deletions(-) create mode 100644 waiter/docs/service-references.md diff --git a/waiter/docs/service-references.md b/waiter/docs/service-references.md new file mode 100644 index 000000000..b6c837b06 --- /dev/null +++ b/waiter/docs/service-references.md @@ -0,0 +1,110 @@ +# Service References + +To process an HTTP request, Waiter needs to resolve a request to a service description. +This service description tells Waiter how to locate the service instance to use to process the current request. + +Waiter uses request headers to construct the service description using the `ServiceDescriptionBuilder`. +It is possible for the _same_ service to be reachable from various combinations of headers: + 1. token - single or multiple + 1. on-the-fly services without tokens + 1. on-the-fly services that extend tokens, e.g. run-as-requester + +## Service GC + +By default, Waiter relies on the service idle timeout to GC services after periods of inactivity (not receiving requests). +However, services reachable only via references, e.g. tokens, can be GC-ed eagerly if the reference has been updated. + +**Note**: A service known to be referenced only via on-the-fly headers never goes stale. + +When a service description is constructed from a request, the set of references for a service are also updated. +Individual references are available in the `:reference-type->entry` key in the descriptor, + the descriptor being built per request. +It is possible to reference the same service using different combinations of on-the-fly headers and tokens. +As such, there is a set of references (the union of all `:reference-type->entry` from the descriptor) that + can be used to reference a service. + +### Reference (`:reference-type->entry` in the descriptor) examples + +The `reference-type->entry` is a map where the keys represent a `:type` parameter used during staleness checks. + +- A service created with on-the-fly header without tokens never goes stale. + It has an empty map as the value for `:reference-type->entry` in the descriptor: +``` + {} +``` + +- A service created with `x-waiter-token: foo` header on a request will have the + following value for `:reference-type->entry` in the descriptor: +``` + {:token {:sources [{:token "foo" :version "v0"}]}} +``` + +- A service created with `x-waiter-token: bar,baz` header on a request will have the + following value for `:reference-type->entry` in the descriptor: +``` + {:token {:sources [{:token "bar" :version "v1"} + {:token "baz" :version "v2"}]}} +``` + +As mentioned previously, as the same service can be accessed by another token, we end up + building a set of `reference-type->entry` maps as references that refer to the service. +E.g. if all the example requests above mapped to the same service, that service would have + the following references set: +``` + #{{} + {:token {:sources [{:token "foo" :version "v0"}]}} + {:token {:sources [{:token "bar" :version "v1"} + {:token "baz" :version "v2"}]}}} +``` + +## Staleness checks + +The service GC processes all known references for a service and marks the service as a candidate for eager GC + if _all_ references used to access the service are stale. +An individual reference, i.e. the `reference-type->entry` map computed in a request descriptor, + is stale if any of its value entries is stale. +This staleness check on the value is performed using the functions returned from + `retrieve-reference-type->stale-fn` of the builder and invoking the corresponding 'type' function on the value. + +**Note**: A service known to be directly referenced (i.e. has an empty map among its references) never goes stale. + +The default implementation of the `ServiceDescriptionBuilder` returns a map with a single entry for `:token` + from the `retrieve-reference-type->stale-fn`. +The provided `:token` staleness function deems services accessed via tokens to be stale if all tokens + used to access the service have been updated. + +Custom builder implementations can add additional reference types to services and + need to provide appropriate staleness check functions for each reference type. +E.g. a hypothetical implementation which treats the image parameter as docker images can + have the following entry for a service: +``` + {:image {:name "twosigma/kitchen" :build "20191001"} + :token {:sources [{"token" "foo" "version" "v1"}]}} +``` +The `retrieve-reference-type->stale-fn` must then provide an implementation for a function that + can check staleness of `:image` reference types. +E.g. if `retrieve-reference-type->stale-fn` returns: +``` + {:image check-image-for-staleness-fn + :token check-token-for-staleness-fn} +``` +and we have a service with the following set of references: +``` + #{{:image {:name "twosigma/courier" :build "20191002"}} + {:image {:name "twosigma/kitchen" :build "20191001"} + :token {:sources [{:token "foo" :version "v0"}]}} + {:token {:sources [{:token "bar" :version "v1"} + {:token "baz" :version "v2"}]}}} +``` +then the service goes stale when the following condition is true: +``` + (and + # single image expression in the or since there was only one entry in the map + (or (check-image-for-staleness-fn {:name "twosigma/courier" :build "20191002"})) + # two expressions in the or, one for image and one for token + (or (check-image-for-staleness-fn {:name "twosigma/kitchen" :build "20191001"}) + (check-token-for-staleness-fn {:sources [{:token "foo" :version "v0"}]})) + # single token expression in the or since there was only one entry in the map + (or (check-token-for-staleness-fn + {:sources [{:token "bar" :version "v1"} {:token "baz" :version "v2"}]}))) +``` diff --git a/waiter/integration/waiter/basic_test.clj b/waiter/integration/waiter/basic_test.clj index c6b6b94e4..4481a36eb 100644 --- a/waiter/integration/waiter/basic_test.clj +++ b/waiter/integration/waiter/basic_test.clj @@ -315,6 +315,10 @@ (is (= (:x-waiter-cmd headers) (get-in service-settings [:effective-parameters :cmd]))) (is (= "other" (get-in service-settings [:effective-parameters :metric-group])) service-id)) + (let [service-settings (service-settings waiter-url service-id + :query-params {"include" "references"})] + (is (= [{}] (get service-settings :references)) (str service-settings))) + (testing "metric group should be other" (is (= "other" (service-id->metric-group waiter-url service-id)) (str "Invalid metric group for " service-id))) diff --git a/waiter/integration/waiter/token_request_test.clj b/waiter/integration/waiter/token_request_test.clj index bca1727d2..c8257a97c 100644 --- a/waiter/integration/waiter/token_request_test.clj +++ b/waiter/integration/waiter/token_request_test.clj @@ -844,6 +844,10 @@ (is (str/includes? service-id-1 name-string) (str "ERROR: App-name is missing " name-string)) (assert-service-on-all-routers waiter-url service-id-1 cookies) (is (nil? (service-id->source-tokens-entries waiter-url service-id-1))) + + (let [service-settings (service-settings waiter-url service-id-1 :query-params {"include" "references"})] + (is (= [{}] (get service-settings :references)) (str service-settings))) + (let [token (str "^SERVICE-ID#" service-id-1) response (make-request-with-debug-info {:x-waiter-token token} #(make-request waiter-url "" :headers %)) service-id-2 (:service-id response)] @@ -854,7 +858,12 @@ (is (= service-id-1 service-id-2) "The on-the-fly and token-based service ids do not match") (assert-service-on-all-routers waiter-url service-id-1 cookies) (is (= #{(make-source-tokens-entries waiter-url token)} - (service-id->source-tokens-entries waiter-url service-id-2))))))))) + (service-id->source-tokens-entries waiter-url service-id-2))) + (let [service-settings (service-settings waiter-url service-id-2 :query-params {"include" "references"}) + references (set (get service-settings :references))] + (is (contains? references {}) (str service-settings)) + (is (contains? references {:token {:sources [{:token token :version (token->etag waiter-url token)}]}}) + (str service-settings))))))))) (deftest ^:parallel ^:integration-fast test-namespace-token (testing-using-waiter-url @@ -1406,21 +1415,42 @@ (let [service-id-a (retrieve-service-id waiter-url {:x-waiter-token combined-token-header}) new-service-description (update first-service-description :cpus #(+ % 0.1))] + + (let [service-settings (service-settings waiter-url service-id-a :query-params {"include" "references"}) + references (set (get service-settings :references))] + (is (not (contains? references {})) (str service-settings)) + (is (contains? references {:token {:sources [{:token token-name-a :version (token->etag waiter-url token-name-a)} + {:token token-name-b :version (token->etag waiter-url token-name-b)}]}}))) + (let [response (post-token waiter-url (assoc new-service-description :token token-name-a))] (assert-response-status response 200)) (let [service-id-b (retrieve-service-id waiter-url {:x-waiter-token combined-token-header})] + + (let [service-settings (service-settings waiter-url service-id-b :query-params {"include" "references"}) + references (set (get service-settings :references))] + (is (not (contains? references {})) (str service-settings)) + (is (contains? references {:token {:sources [{:token token-name-a :version (token->etag waiter-url token-name-a)} + {:token token-name-b :version (token->etag waiter-url token-name-b)}]}}))) + (let [response (post-token waiter-url (assoc new-service-description :token token-name-b))] (assert-response-status response 200)) - (let [service-id-c (retrieve-service-id waiter-url {:x-waiter-token combined-token-header}) - service-a-details (service-settings waiter-url service-id-a) - service-b-details (service-settings waiter-url service-id-b) - service-c-details (service-settings waiter-url service-id-c)] - (is (nil? (:current-for-tokens service-a-details))) - (is (nil? (:current-for-tokens service-b-details))) - (is (= [token-name-a token-name-b] - (:current-for-tokens service-c-details)))))) + (let [service-id-c (retrieve-service-id waiter-url {:x-waiter-token combined-token-header})] + + (let [service-settings (service-settings waiter-url service-id-c :query-params {"include" "references"}) + references (set (get service-settings :references))] + (is (not (contains? references {})) (str service-settings)) + (is (contains? references {:token {:sources [{:token token-name-a :version (token->etag waiter-url token-name-a)} + {:token token-name-b :version (token->etag waiter-url token-name-b)}]}}))) + + (let [service-a-details (service-settings waiter-url service-id-a) + service-b-details (service-settings waiter-url service-id-b) + service-c-details (service-settings waiter-url service-id-c)] + (is (nil? (:current-for-tokens service-a-details))) + (is (nil? (:current-for-tokens service-b-details))) + (is (= [token-name-a token-name-b] + (:current-for-tokens service-c-details))))))) (finally (delete-token-and-assert waiter-url token-name-a) (delete-token-and-assert waiter-url token-name-b)))))) diff --git a/waiter/src/waiter/core.clj b/waiter/src/waiter/core.clj index 2b559dd8d..5e201db5a 100644 --- a/waiter/src/waiter/core.clj +++ b/waiter/src/waiter/core.clj @@ -763,7 +763,11 @@ curator] (fn synchronize-fn [path f] (let [lock-path (str base-path "/" path)] - (curator/synchronize curator lock-path mutex-timeout-ms f))))}) + (timers/start-stop-time! + (metrics/waiter-timer "core" "synchronize" "all") + (timers/start-stop-time! + (metrics/waiter-timer "core" "synchronize" (str "cs-" path)) + (curator/synchronize curator lock-path mutex-timeout-ms f))))))}) (def scheduler {:scheduler (pc/fnk [[:settings scheduler-config scheduler-syncer-interval-secs] @@ -942,15 +946,17 @@ (sd/refresh-service-descriptions kv-store service-ids))) :request->descriptor-fn (pc/fnk [[:settings [:token-config history-length token-defaults] metric-group-mappings service-description-defaults] [:state fallback-state-atom kv-store service-description-builder service-id-prefix waiter-hostnames] - assoc-run-as-user-approved? can-run-as?-fn store-source-tokens-fn] + assoc-run-as-user-approved? can-run-as?-fn store-reference-fn store-source-tokens-fn] (fn request->descriptor-fn [request] (let [{:keys [latest-descriptor] :as result} (descriptor/request->descriptor assoc-run-as-user-approved? can-run-as?-fn fallback-state-atom kv-store metric-group-mappings history-length service-description-builder service-description-defaults service-id-prefix - token-defaults waiter-hostnames request)] - (when-let [source-tokens (-> latest-descriptor :source-tokens seq)] - (store-source-tokens-fn (:service-id latest-descriptor) source-tokens)) + token-defaults waiter-hostnames request) + {:keys [reference-type->entry service-id source-tokens]} latest-descriptor] + (when (seq source-tokens) + (store-source-tokens-fn service-id source-tokens)) + (store-reference-fn service-id reference-type->entry) result))) :router-metrics-helpers (pc/fnk [[:state passwords router-metrics-agent]] (let [password (first passwords)] @@ -964,14 +970,19 @@ (fn service-description->service-id [service-description] (sd/service-description->service-id service-id-prefix service-description))) :service-id->idle-timeout (pc/fnk [[:settings [:token-config token-defaults]] - service-id->service-description-fn service-id->source-tokens-entries-fn + [:state service-description-builder] + service-id->service-description-fn service-id->references-fn token->token-hash token->token-metadata] - (fn service-id->idle-timeout [service-id] - (sd/service-id->idle-timeout - service-id->service-description-fn service-id->source-tokens-entries-fn - token->token-hash token->token-metadata token-defaults service-id))) + (let [context {:token->token-hash token->token-hash} + reference-type->stale-fn (sd/retrieve-reference-type->stale-fn service-description-builder context)] + (fn service-id->idle-timeout [service-id] + (sd/service-id->idle-timeout + service-id->service-description-fn service-id->references-fn token->token-metadata + reference-type->stale-fn token-defaults service-id)))) :service-id->password-fn (pc/fnk [[:scheduler service-id->password-fn*]] service-id->password-fn*) + :service-id->references-fn (pc/fnk [[:state kv-store]] + (partial sd/service-id->references kv-store)) :service-id->service-description-fn (pc/fnk [[:scheduler service-id->service-description-fn*]] service-id->service-description-fn*) :service-id->source-tokens-entries-fn (pc/fnk [[:state kv-store]] @@ -1001,6 +1012,10 @@ (async/go (when-let [exit-chan (get work-stealing-chan-map [:exit-chan])] (async/>! exit-chan :exit))))) + :store-reference-fn (pc/fnk [[:curator synchronize-fn] + [:state kv-store]] + (fn store-reference-fn [service-id reference] + (sd/store-reference! synchronize-fn kv-store service-id reference))) :store-service-description-fn (pc/fnk [[:state kv-store] validate-service-description-fn] (fn store-service-description [{:keys [core-service-description service-id]}] @@ -1392,8 +1407,8 @@ router-metrics-agent metrics-sync-interval-ms bytes-encryptor bytes-decryptor request)))) :service-handler-fn (pc/fnk [[:daemons autoscaler router-state-maintainer] [:routines allowed-to-manage-service?-fn generate-log-url-fn make-inter-router-requests-sync-fn - router-metrics-helpers service-id->service-description-fn service-id->source-tokens-entries-fn - token->token-hash] + router-metrics-helpers service-id->references-fn service-id->service-description-fn + service-id->source-tokens-entries-fn token->token-hash] [:scheduler scheduler] [:state kv-store router-id scheduler-interactions-thread-pool] wrap-secure-request-fn] @@ -1405,8 +1420,9 @@ (handler/service-handler router-id service-id scheduler kv-store allowed-to-manage-service?-fn generate-log-url-fn make-inter-router-requests-sync-fn service-id->service-description-fn service-id->source-tokens-entries-fn - query-state-fn query-autoscaler-state-fn service-id->metrics-fn - scheduler-interactions-thread-pool token->token-hash request))))) + service-id->references-fn query-state-fn query-autoscaler-state-fn + service-id->metrics-fn scheduler-interactions-thread-pool token->token-hash + request))))) :service-id-handler-fn (pc/fnk [[:routines store-service-description-fn] [:state kv-store] wrap-descriptor-fn wrap-secure-request-fn] @@ -1415,7 +1431,7 @@ wrap-descriptor-fn wrap-secure-request-fn)) :service-list-handler-fn (pc/fnk [[:daemons autoscaler router-state-maintainer] - [:routines prepend-waiter-url router-metrics-helpers + [:routines prepend-waiter-url router-metrics-helpers service-id->references-fn service-id->service-description-fn service-id->source-tokens-entries-fn] [:state entitlement-manager] wrap-secure-request-fn] @@ -1426,8 +1442,8 @@ (fn service-list-handler-fn [request] (handler/list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn - service-id->metrics-fn service-id->source-tokens-entries-fn - request))))) + service-id->metrics-fn service-id->references-fn + service-id->source-tokens-entries-fn request))))) :service-override-handler-fn (pc/fnk [[:routines allowed-to-manage-service?-fn make-inter-router-requests-sync-fn] [:state kv-store] wrap-secure-request-fn] diff --git a/waiter/src/waiter/handler.clj b/waiter/src/waiter/handler.clj index 8bee7383d..2a95b8a02 100644 --- a/waiter/src/waiter/handler.clj +++ b/waiter/src/waiter/handler.clj @@ -258,7 +258,7 @@ "Retrieves the list of services viewable by the currently logged in user. A service is viewable by the run-as-user or a waiter super-user." [entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn - service-id->metrics-fn service-id->source-tokens-entries-fn request] + service-id->metrics-fn service-id->references-fn service-id->source-tokens-entries-fn request] (let [{:keys [all-available-service-ids service-id->healthy-instances service-id->unhealthy-instances] :as global-state} (query-state-fn)] (let [{:strs [run-as-user token token-version] :as request-params} (-> request ru/query-params-request :query-params) auth-user (get request :authorization/user) @@ -286,7 +286,9 @@ {:healthy-instances (-> service-id->healthy-instances (get service-id) count) :unhealthy-instances (-> service-id->unhealthy-instances (get service-id) count)}) service-id->metrics (service-id->metrics-fn) - include-effective-parameters? (utils/request-flag request-params "effective-parameters") + include-effective-parameters? (or (utils/request-flag request-params "effective-parameters") + (utils/param-contains? request-params "include" "request-info")) + include-references? (utils/param-contains? request-params "include" "references") response-data (map (fn service-id->service-info [service-id] (let [scaling-state (retrieve-scaling-state query-autoscaler-state-fn service-id) @@ -302,6 +304,8 @@ include-effective-parameters? (assoc :effective-parameters (service-id->service-description-fn service-id :effective? true)) + include-references? + (assoc :references (seq (service-id->references-fn service-id))) scaling-state (assoc :scaling-state scaling-state) (seq source-tokens-entries) @@ -382,8 +386,8 @@ (defn- get-service-handler "Returns details about the service such as the service description, metrics, instances, etc." [router-id service-id core-service-description kv-store generate-log-url-fn make-inter-router-requests-fn - service-id->service-description-fn service-id->source-tokens-entries-fn query-state-fn query-autoscaler-state-fn - service-id->metrics-fn token->token-hash request] + service-id->service-description-fn service-id->source-tokens-entries-fn service-id->references-fn + query-state-fn query-autoscaler-state-fn service-id->metrics-fn token->token-hash request] (let [global-state (query-state-fn) service-instance-maps (try (let [assoc-log-url-to-instances @@ -424,7 +428,9 @@ source-tokens-entries (service-id->source-tokens-entries-fn service-id) current-for-tokens (get-current-for-tokens source-tokens-entries token->token-hash) request-params (-> request ru/query-params-request :query-params) - include-effective-parameters? (utils/request-flag request-params "effective-parameters") + include-effective-parameters? (or (utils/request-flag request-params "effective-parameters") + (utils/param-contains? request-params "include" "request-info")) + include-references? (utils/param-contains? request-params "include" "references") last-request-time (get-in (service-id->metrics-fn) [service-id "last-request-time"]) scaling-state (retrieve-scaling-state query-autoscaler-state-fn service-id) result-map (cond-> {:num-routers (count router->metrics) @@ -441,6 +447,8 @@ (assoc-in [:metrics :aggregate] aggregate-metrics-map) (not-empty router->metrics) (assoc-in [:metrics :routers] router->metrics) + include-references? + (assoc :references (seq (service-id->references-fn service-id))) scaling-state (assoc :scaling-state scaling-state) (not-empty core-service-description) @@ -461,8 +469,8 @@ :delete deletes the service from the scheduler (after authorization checks). :get returns details about the service such as the service description, metrics, instances, etc." [router-id service-id scheduler kv-store allowed-to-manage-service?-fn generate-log-url-fn make-inter-router-requests-fn - service-id->service-description-fn service-id->source-tokens-entries-fn query-state-fn query-autoscaler-state-fn - service-id->metrics-fn scheduler-interactions-thread-pool token->token-hash request] + service-id->service-description-fn service-id->source-tokens-entries-fn service-id->references-fn query-state-fn + query-autoscaler-state-fn service-id->metrics-fn scheduler-interactions-thread-pool token->token-hash request] (try (when-not service-id (throw (ex-info "Missing service-id" {:log-level :info :status 400}))) @@ -474,7 +482,8 @@ scheduler-interactions-thread-pool request) :get (get-service-handler router-id service-id core-service-description kv-store generate-log-url-fn make-inter-router-requests-fn service-id->service-description-fn - service-id->source-tokens-entries-fn query-state-fn query-autoscaler-state-fn + service-id->source-tokens-entries-fn service-id->references-fn + query-state-fn query-autoscaler-state-fn service-id->metrics-fn token->token-hash request)))) (catch Exception ex (utils/exception->response ex request)))) diff --git a/waiter/src/waiter/service_description.clj b/waiter/src/waiter/service_description.clj index 44d37ab98..71b8b948e 100644 --- a/waiter/src/waiter/service_description.clj +++ b/waiter/src/waiter/service_description.clj @@ -18,6 +18,7 @@ [clojure.set :as set] [clojure.string :as str] [clojure.tools.logging :as log] + [clojure.walk :as walk] [digest] [metrics.meters :as meters] [plumbing.core :as pc] @@ -533,6 +534,10 @@ (build [this core-service-description args-map] "Returns a map of {:service-id ..., :service-description ..., :core-service-description...}") + (retrieve-reference-type->stale-fn [this context] + "Returns a map of reference type to stale function for references of the specified type. + The values are functions that have the following signature (fn reference-entry)") + (state [this] "Returns the global (i.e. non-service-specific) state the service description builder is maintaining") @@ -545,12 +550,21 @@ [service-description username] (assoc service-description "run-as-user" username "permitted-user" username)) +(defn service-token-references-stale? + "Returns true if every token used to access a service has been updated." + [token->token-hash source-tokens] + (and (seq source-tokens) + ;; safe assumption mark a service stale when every token used to access it is stale + (every? (fn [{:keys [token version]}] + (not= (token->token-hash token) version)) + source-tokens))) + (defrecord DefaultServiceDescriptionBuilder [max-constraints-schema] ServiceDescriptionBuilder (build [_ user-service-description {:keys [assoc-run-as-user-approved? component->previous-descriptor-fns defaults kv-store - metric-group-mappings service-id-prefix username]}] + metric-group-mappings reference-type->entry service-id-prefix source-tokens username]}] (let [core-service-description (if (get user-service-description "run-as-user") user-service-description (let [candidate-service-description (assoc-run-as-requester-fields user-service-description username) @@ -562,12 +576,19 @@ user-service-description))) service-id (service-description->service-id service-id-prefix core-service-description) service-description (default-and-override core-service-description metric-group-mappings - kv-store defaults service-id)] + kv-store defaults service-id) + reference-type->entry (cond-> (or reference-type->entry {}) + (seq source-tokens) + (assoc :token {:sources (map walk/keywordize-keys source-tokens)}))] {:component->previous-descriptor-fns component->previous-descriptor-fns :core-service-description core-service-description + :reference-type->entry reference-type->entry :service-description service-description :service-id service-id})) + (retrieve-reference-type->stale-fn [_ {:keys [token->token-hash]}] + {:token (fn [{:keys [sources]}] (service-token-references-stale? token->token-hash sources))}) + (state [_] {}) @@ -946,25 +967,25 @@ (throw (ex-info "Cannot use run-as-requester with a specific namespace" {:namespace raw-namespace :run-as-user raw-run-as-user :status 400 :log-level :warn}))) (sling/try+ - (let [{:keys [component->previous-descriptor-fns core-service-description service-description service-id]} - (build service-description-builder user-service-description - {:assoc-run-as-user-approved? assoc-run-as-user-approved? - :component->previous-descriptor-fns component->previous-descriptor-fns - :defaults defaults - :kv-store kv-store - :metric-group-mappings metric-group-mappings - :service-id-prefix service-id-prefix - :username username}) + (let [build-map (build service-description-builder user-service-description + {:assoc-run-as-user-approved? assoc-run-as-user-approved? + :component->previous-descriptor-fns component->previous-descriptor-fns + :defaults defaults + :kv-store kv-store + :metric-group-mappings metric-group-mappings + :reference-type->entry {} + :service-id-prefix service-id-prefix + :source-tokens source-tokens + :username username}) service-preauthorized (and token-preauthorized (empty? service-description-based-on-headers)) service-authentication-disabled (and token-authentication-disabled (empty? service-description-based-on-headers))] - {:component->previous-descriptor-fns component->previous-descriptor-fns - :core-service-description core-service-description - :on-the-fly? on-the-fly? - :service-authentication-disabled service-authentication-disabled - :service-description service-description - :service-id service-id - :service-preauthorized service-preauthorized - :source-tokens source-tokens}) + (-> build-map + (select-keys [:component->previous-descriptor-fns :core-service-description :reference-type->entry + :service-description :service-id]) + (assoc :on-the-fly? on-the-fly? + :service-authentication-disabled service-authentication-disabled + :service-preauthorized service-preauthorized + :source-tokens source-tokens))) (catch [:type :service-description-error] ex-data (throw (ex-info (:message ex-data) (merge (error-message-map-fn passthrough-headers waiter-headers) ex-data) @@ -1068,37 +1089,84 @@ (> (+ auth-timestamp (-> consent-expiry-days t/days t/in-millis)) (.getMillis ^DateTime (clock)))))) +(defn source-tokens->idle-timeout + "Computes the idle timeout of the service using token parameters and defaults." + [token->token-metadata token-defaults source-tokens-seq] + (->> source-tokens-seq + (map (fn source-tokens->idle-timeout [source-tokens] + (let [{:strs [fallback-period-secs stale-timeout-mins]} + (->> source-tokens + (map #(some-> % :token token->token-metadata)) + (reduce merge token-defaults))] + (+ (-> (+ fallback-period-secs (dec (-> 1 t/minutes t/in-seconds))) ;; ceiling + t/seconds + t/in-minutes) + stale-timeout-mins)))) + (reduce max))) + +(defn stale-reference? + "Returns true if the any entry in the reference has gone stale. + An empty, i.e. directly accessible, reference is never stale." + [reference-type->stale-fn reference] + (some (fn [[type entry]] + (when-let [stale-fn (reference-type->stale-fn type)] + (stale-fn entry))) + (seq reference))) + +(defn service-references-stale? + "Returns true if every entry in the references has gone stale." + [reference-type->stale-fn references] + (every? (fn [reference] (stale-reference? reference-type->stale-fn reference)) references)) + (defn service-id->idle-timeout "Computes the idle timeout, in minutes, for a given service. If the service is active or was created by on-the-fly, the idle timeout is retrieved from the service description. Else, the idle timeout is the sum of the fallback period seconds and the stale service timeout." - [service-id->service-description-fn service-id->source-tokens-set-fn token->token-hash token->token-metadata + [service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn token-defaults service-id] (let [{:strs [idle-timeout-mins]} (service-id->service-description-fn service-id) - source-tokens-set (service-id->source-tokens-set-fn service-id)] - (if (and (seq source-tokens-set) - ;; safe assumption mark a service stale when every token used to access it is stale - (every? (fn [source-tokens] - (and (seq source-tokens) - (every? (fn [{:strs [token version]}] - (not= (token->token-hash token) version)) - source-tokens))) - source-tokens-set)) - (do + references (service-id->references-fn service-id)] + (if (service-references-stale? reference-type->stale-fn references) + (let [{:strs [stale-timeout-mins]} token-defaults] (log/info service-id "that uses tokens is stale") - (->> source-tokens-set - (map (fn source-tokens->idle-timeout [source-tokens] - (let [{:strs [fallback-period-secs stale-timeout-mins]} - (->> source-tokens - (map #(token->token-metadata (get % "token"))) - (reduce merge token-defaults))] - (+ (-> (+ fallback-period-secs (dec (-> 1 t/minutes t/in-seconds))) ;; ceiling - t/seconds - t/in-minutes) - stale-timeout-mins)))) - (reduce max))) + (if-let [source-tokens (->> references (map :token) (remove nil?) (map :sources) seq)] + (source-tokens->idle-timeout token->token-metadata token-defaults source-tokens) + ;; use the default token stale timeout for a stale service built without tokens + stale-timeout-mins)) idle-timeout-mins))) +(let [service-id->key #(str "^REFERENCES#" %)] + + (defn service-id->references + "Retrieves the reference entries (as a set) for a service from the key-value store." + [kv-store service-id & {:keys [refresh] :or {refresh false}}] + (let [keys (service-id->key service-id)] + (kv/fetch kv-store keys :refresh refresh))) + + (defn store-reference! + "Stores the reference entries in the key-value store against a service." + [synchronize-fn kv-store service-id reference] + (when-not (-> (service-id->references kv-store service-id) + (or #{}) + (contains? reference)) + (let [reference-lock-prefix "REFERENCES-LOCK-" + bucket (-> service-id hash int (Math/abs) (mod 16)) + reference-lock (str reference-lock-prefix bucket)] + (meters/mark! (metrics/waiter-meter "core" "reference" "store" "all")) + (meters/mark! (metrics/waiter-meter "core" "reference" "store" (str "bucket-" bucket))) + (synchronize-fn + reference-lock + (fn inner-store-reference! [] + (let [existing-entries (or (service-id->references kv-store service-id :refresh true) #{})] + (if-not (contains? existing-entries reference) + (do + (meters/mark! (metrics/waiter-meter "core" "reference" "store" "new-entry")) + (log/info "storing new reference for" service-id reference) + (kv/store kv-store (service-id->key service-id) (conj existing-entries reference)) + ;; refresh the entry + (service-id->references kv-store service-id :refresh true)) + (log/debug "reference already associated with" service-id reference))))))))) + (let [service-id->key #(str "^SOURCE-TOKENS#" %)] (defn service-id->source-tokens-entries @@ -1118,6 +1186,7 @@ "Stores a source-tokens entry in the key-value store against a service." [synchronize-fn kv-store service-id source-tokens] (when (and (seq source-tokens) + ;; guard to avoid relatively expensive synchronize-fn invocation (not (has-source-tokens? kv-store service-id source-tokens))) (let [source-tokens-lock-prefix "SOURCE-TOKENS-LOCK-" bucket (-> service-id hash int (Math/abs) (mod 16)) diff --git a/waiter/test/waiter/core_test.clj b/waiter/test/waiter/core_test.clj index 09ff6a133..1a256c523 100644 --- a/waiter/test/waiter/core_test.clj +++ b/waiter/test/waiter/core_test.clj @@ -417,6 +417,7 @@ :generate-log-url-fn nil :make-inter-router-requests-sync-fn nil :router-metrics-helpers {:service-id->metrics-fn (constantly {})} + :service-id->references-fn (constantly []) :service-id->service-description-fn (constantly {}) :service-id->source-tokens-entries-fn (constantly #{}) :token->token-hash identity} @@ -513,6 +514,7 @@ :generate-log-url-fn (partial handler/generate-log-url #(str "http://www.example.com" %)) :make-inter-router-requests-sync-fn nil :router-metrics-helpers {:service-id->metrics-fn (constantly service-id->metrics)} + :service-id->references-fn (constantly []) :service-id->service-description-fn (constantly {}) :service-id->source-tokens-entries-fn (constantly #{}) :token->token-hash identity} diff --git a/waiter/test/waiter/descriptor_test.clj b/waiter/test/waiter/descriptor_test.clj index 5782fae65..2a08a5637 100644 --- a/waiter/test/waiter/descriptor_test.clj +++ b/waiter/test/waiter/descriptor_test.clj @@ -18,6 +18,7 @@ [clojure.core.async :as async] [clojure.set :as set] [clojure.test :refer :all] + [clojure.walk :as walk] [plumbing.core :as pc] [waiter.descriptor :refer :all] [waiter.kv :as kv] @@ -519,6 +520,11 @@ (let [template-basic (dissoc template "last-update-time" "previous")] (-> descriptor (update :core-service-description merge template-basic) + (update :reference-type->entry + (fn [reference] + (cond-> (dissoc reference component) + (get-in template ["previous" "last-update-time"]) + (assoc component {:version (str "v" (get-in template ["previous" "last-update-time"]))})))) (update :service-description merge template-basic) (assoc-in [:sources component] (get template "previous"))))))) sources {:cmd-source {"cmd" "ls2" @@ -538,6 +544,7 @@ [:cmd-source :cpu-source]) :core-service-description service-description-1 :passthrough-headers passthrough-headers + :reference-type->entry {:cmd-source {:version "v8"} :cpu-source {:version "v10"}} :service-description service-description-1 :sources sources :waiter-headers waiter-headers}] @@ -546,6 +553,7 @@ template-basic-1 (dissoc template-1 "last-update-time" "previous")] (is (= (-> curr-descriptor (update :core-service-description merge template-basic-1) + (assoc :reference-type->entry {:cmd-source {:version "v8"} :cpu-source {:version "v4"}}) (update :service-description merge template-basic-1) (assoc-in [:sources :cpu-source] (get template-1 "previous"))) prev-descriptor-1)) @@ -555,6 +563,7 @@ template-basic-2 (dissoc template-2 "last-update-time" "previous")] (is (= (-> prev-descriptor-1 (update :core-service-description merge template-basic-2) + (assoc :reference-type->entry {:cmd-source {:version "v6"} :cpu-source {:version "v4"}}) (update :service-description merge template-basic-2) (assoc-in [:sources :cmd-source] (get template-2 "previous"))) prev-descriptor-2)) @@ -562,8 +571,9 @@ (let [prev-descriptor-3 (descriptor->previous-descriptor kv-store builder prev-descriptor-2) template-3 (get-in curr-descriptor [:sources :cmd-source "previous"]) template-basic-3 (dissoc template-3 "last-update-time" "previous")] - (is (= (-> prev-descriptor-2 + (is (= (-> prev-descriptor-3 (update :core-service-description merge template-basic-3) + (assoc-in [:reference-type->entry :cpu-source :version] "v4") (update :service-description merge template-basic-3) (assoc-in [:sources :cmd-source] (get template-3 "previous"))) prev-descriptor-3)) @@ -573,6 +583,7 @@ template-basic-4 (dissoc template-4 "last-update-time" "previous")] (is (= (-> prev-descriptor-3 (update :core-service-description merge template-basic-4) + (assoc :reference-type->entry {}) (update :service-description merge template-basic-4) (assoc-in [:sources :cpu-source] (get template-4 "previous"))) prev-descriptor-4)) @@ -624,6 +635,12 @@ prev-descriptor)) (is (nil? (descriptor->previous-descriptor kv-store builder prev-descriptor)))))) + (defn reference-tokens-entry + "Creates an entry for the source-tokens field" + [token token-data] + (walk/keywordize-keys + (sd/source-tokens-entry token token-data))) + (deftest test-descriptor->previous-descriptor-single-token-with-previous (let [test-token "test-token" token-data-1 {"cmd" "ls" "cpus" 1 "mem" 32 "run-as-user" "ru" "version" "foo1"} @@ -640,6 +657,7 @@ passthrough-headers {} waiter-headers {} current-descriptor (-> {:passthrough-headers passthrough-headers + :reference-type->entry {:token {:sources [(reference-tokens-entry test-token token-data-2)]}} :sources sources :waiter-headers waiter-headers} (attach-token-fallback-source token-defaults build-service-description-and-id-helper)) @@ -648,6 +666,7 @@ :core-service-description service-description-1 :on-the-fly? nil :passthrough-headers passthrough-headers + :reference-type->entry {:token {:sources [(reference-tokens-entry test-token token-data-1)]}} :service-authentication-disabled false :service-description (merge (:defaults sources) service-description-1) :service-id (sd/service-description->service-id service-id-prefix service-description-1) @@ -679,6 +698,7 @@ passthrough-headers {} waiter-headers {} current-descriptor (-> {:passthrough-headers passthrough-headers + :reference-type->entry {:token {:sources [(reference-tokens-entry test-token token-data-3)]}} :sources sources :waiter-headers waiter-headers} (attach-token-fallback-source token-defaults build-service-description-and-id-helper)) @@ -687,6 +707,7 @@ :core-service-description service-description-1 :on-the-fly? nil :passthrough-headers passthrough-headers + :reference-type->entry {:token {:sources [(reference-tokens-entry test-token token-data-1)]}} :service-authentication-disabled false :service-description (merge (:defaults sources) service-description-1) :service-id (sd/service-description->service-id service-id-prefix service-description-1) @@ -718,6 +739,7 @@ passthrough-headers {} waiter-headers {"x-waiter-cpus" 20} current-descriptor (-> {:passthrough-headers passthrough-headers + :reference-type->entry {:token {:sources [(reference-tokens-entry test-token token-data-2)]}} :sources sources :waiter-headers waiter-headers} (attach-token-fallback-source token-defaults build-service-description-and-id-helper)) @@ -727,6 +749,7 @@ :core-service-description expected-core-service-description :on-the-fly? true :passthrough-headers passthrough-headers + :reference-type->entry {:token {:sources [(reference-tokens-entry test-token token-data-1)]}} :service-authentication-disabled false :service-description (merge (:defaults sources) expected-core-service-description) :service-id (sd/service-description->service-id service-id-prefix expected-core-service-description) @@ -783,6 +806,7 @@ passthrough-headers {} waiter-headers {} current-descriptor (-> {:passthrough-headers passthrough-headers + :reference-type->entry {:token {:sources (:source-tokens sources)}} :sources sources :waiter-headers waiter-headers} (attach-token-fallback-source token-defaults build-service-description-and-id-helper)) @@ -793,6 +817,8 @@ :core-service-description expected-core-service-description :on-the-fly? nil :passthrough-headers passthrough-headers + :reference-type->entry {:token {:sources [(reference-tokens-entry test-token-1 token-data-1) + (reference-tokens-entry test-token-2 token-data-2p)]}} :service-authentication-disabled false :service-description (merge (:defaults sources) expected-core-service-description) :service-id (sd/service-description->service-id service-id-prefix expected-core-service-description) @@ -817,6 +843,8 @@ :core-service-description expected-core-service-description :on-the-fly? nil :passthrough-headers passthrough-headers + :reference-type->entry {:token {:sources [(reference-tokens-entry test-token-1 token-data-1p) + (reference-tokens-entry test-token-2 token-data-2p)]}} :service-authentication-disabled false :service-description (merge (:defaults sources) expected-core-service-description) :service-id (sd/service-description->service-id service-id-prefix expected-core-service-description) diff --git a/waiter/test/waiter/handler_test.clj b/waiter/test/waiter/handler_test.clj index e7860be6a..1b39f0a27 100644 --- a/waiter/test/waiter/handler_test.clj +++ b/waiter/test/waiter/handler_test.clj @@ -421,6 +421,18 @@ other-user-services #{"service4" "service5" "service6"} healthy-services #{"service1" "service2" "service4" "service6" "service7" "service8" "service9"} unhealthy-services #{"service2" "service3" "service5"} + service-id->references {"service1" {:sources [{:token "t1.org" :version "v1"} {:token "t2.com" :version "v2"}] + :type :token } + "service3" {:sources [{:token "t2.com" :version "v2"} {:token "t3.edu" :version "v3"}] + :type :token } + "service4" {:sources [{:token "t1.org" :version "v1"} {:token "t2.com" :version "v2"}] + :type :token } + "service5" {:sources [{:token "t1.org" :version "v1"} {:token "t3.edu" :version "v3"}] + :type :token } + "service7" {:sources [{:token "t1.org" :version "v2"} {:token "t2.com" :version "v1"}] + :type :token } + "service9" {:sources [{:token "t2.com" :version "v3"}] + :type :token }} service-id->source-tokens {"service1" [{:token "t1.org" :version "v1"} {:token "t2.com" :version "v2"}] "service3" [{:token "t2.com" :version "v2"} {:token "t3.edu" :version "v3"}] "service4" [{:token "t1.org" :version "v1"} {:token "t2.com" :version "v2"}] @@ -456,6 +468,8 @@ (is (instance-counts-present body)))] (letfn [(service-id->metrics-fn [] {}) + (service-id->references-fn [service-id] + (get service-id->references service-id)) (service-id->service-description-fn [service-id & _] {"run-as-user" (if (contains? test-user-services service-id) test-user "another-user")}) (service-id->source-tokens-entries-fn [service-id] @@ -467,7 +481,7 @@ (let [{:keys [body] :as response} (list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn service-id->metrics-fn - service-id->source-tokens-entries-fn request)] + service-id->references-fn service-id->source-tokens-entries-fn request)] (assert-successful-json-response response) (is (= test-user-services (->> body json/read-str walk/keywordize-keys (map :service-id) set))))) @@ -476,7 +490,7 @@ (let [{:keys [body] :as response} (list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn service-id->metrics-fn - service-id->source-tokens-entries-fn request)] + service-id->references-fn service-id->source-tokens-entries-fn request)] (assert-successful-json-response response) (is (= other-user-services (->> body json/read-str walk/keywordize-keys (map :service-id) set)))))) @@ -489,7 +503,7 @@ (let [{:keys [body] :as response} (list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn service-id->metrics-fn - service-id->source-tokens-entries-fn request)] + service-id->references-fn service-id->source-tokens-entries-fn request)] (assert-successful-json-response response) (is (= other-user-services (->> body json/read-str walk/keywordize-keys (map :service-id) set)))))) @@ -502,7 +516,7 @@ (let [{:keys [body] :as response} (list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn service-id->metrics-fn - service-id->source-tokens-entries-fn request)] + service-id->references-fn service-id->source-tokens-entries-fn request)] (assert-successful-json-response response) (is (= all-services (->> body json/read-str walk/keywordize-keys (map :service-id) set)))))) @@ -515,7 +529,7 @@ (let [{:keys [body] :as response} (list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn service-id->metrics-fn - service-id->source-tokens-entries-fn request)] + service-id->references-fn service-id->source-tokens-entries-fn request)] (assert-successful-json-response response) (is (= all-services (->> body json/read-str walk/keywordize-keys (map :service-id) set)))))) @@ -528,7 +542,7 @@ (let [{:keys [body] :as response} (list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn service-id->metrics-fn - service-id->source-tokens-entries-fn request)] + service-id->references-fn service-id->source-tokens-entries-fn request)] (assert-successful-json-response response) (is (= other-user-services (->> body json/read-str walk/keywordize-keys (map :service-id) set)))))) @@ -541,7 +555,7 @@ (let [{:keys [body] :as response} (list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn service-id->metrics-fn - service-id->source-tokens-entries-fn request)] + service-id->references-fn service-id->source-tokens-entries-fn request)] (assert-successful-json-response response) (is (= other-user-services (->> body json/read-str walk/keywordize-keys (map :service-id) set)))))) @@ -554,7 +568,7 @@ list-services-handler (core/wrap-error-handling #(list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn service-id->metrics-fn - service-id->source-tokens-entries-fn %)) + service-id->references-fn service-id->source-tokens-entries-fn %)) {:keys [body headers status]} (list-services-handler request)] (is (= 400 status)) (is (= "text/plain" (get headers "content-type"))) @@ -570,7 +584,7 @@ ; without a run-as-user, should return all apps (list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn service-id->metrics-fn - service-id->source-tokens-entries-fn request)] + service-id->references-fn service-id->source-tokens-entries-fn request)] (assert-successful-json-response response) (is (= all-services (->> body json/read-str walk/keywordize-keys (map :service-id) set))))) @@ -589,7 +603,7 @@ ; without a run-as-user, should return all apps (list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn service-id->metrics-fn - service-id->source-tokens-entries-fn request)] + service-id->references-fn service-id->source-tokens-entries-fn request)] (assert-successful-json-response response) (is (= (->> service-id->source-tokens (filter (fn [[_ source-tokens]] @@ -613,7 +627,7 @@ ; without a run-as-user, should return all apps (list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn service-id->metrics-fn - service-id->source-tokens-entries-fn request)] + service-id->references-fn service-id->source-tokens-entries-fn request)] (assert-successful-json-response response) (is (= (->> service-id->source-tokens (filter (fn [[_ source-tokens]] @@ -629,7 +643,7 @@ ; without a run-as-user, should return all apps (list-services-handler entitlement-manager query-state-fn query-autoscaler-state-fn prepend-waiter-url service-id->service-description-fn service-id->metrics-fn - service-id->source-tokens-entries-fn request)] + service-id->references-fn service-id->source-tokens-entries-fn request)] (assert-successful-json-response response) (is (= (->> service-id->source-tokens (filter (fn [[_ source-tokens]] diff --git a/waiter/test/waiter/service_description_test.clj b/waiter/test/waiter/service_description_test.clj index a7e0ccba7..62382a2fc 100644 --- a/waiter/test/waiter/service_description_test.clj +++ b/waiter/test/waiter/service_description_test.clj @@ -820,7 +820,7 @@ (deftest test-compute-service-description-source-tokens (let [defaults {"health-check-url" "/ping", "permitted-user" "bob"} - source-tokens (Object.) + source-tokens [:foo-bar] sources {:defaults defaults :service-description-template {"cmd" "token-cmd"} :source-tokens source-tokens} @@ -2160,112 +2160,121 @@ token-defaults {"fallback-period-secs" fallback-period-secs "stale-timeout-mins" stale-timeout-mins} idle-timeout-mins 25 - service-id "test-service-id" + service-id "test-service-id-" + service-id->service-description-fn (fn [in-service-id] + (is (str/starts-with? in-service-id service-id)) + {"idle-timeout-mins" idle-timeout-mins}) token->token-hash (fn [in-token] (str in-token ".hash1")) - token->token-metadata-fn (fn [token->token-data] - (fn token->token-metadata [in-token] - (-> in-token - token->token-data - (select-keys token-metadata-keys))))] + reference-type->stale-fn {:token #(service-token-references-stale? token->token-hash (:sources %))} + token->token-metadata-factory (fn [token->token-data] + (fn [in-token] + (-> in-token token->token-data (select-keys token-metadata-keys))))] + (testing "service with single token is active" (let [token->token-data {"t1" {"cpus" 1}} - service-id->service-description-fn (fn [in-service-id] - (is (= service-id in-service-id)) - {"idle-timeout-mins" idle-timeout-mins}) - service-id->source-token-entries-fn (fn [in-service-id] - (is (= service-id in-service-id)) - #{[{"token" "t1" "version" "t1.hash1"}]}) - token->token-metadata (token->token-metadata-fn token->token-data)] + service-id->references-fn (fn [in-service-id] + (is (= in-service-id (str service-id "s1"))) + #{{:token {:sources [{:token "t1" :version "t1.hash1"}]}}}) + token->token-metadata (token->token-metadata-factory token->token-data)] (is (= idle-timeout-mins (service-id->idle-timeout - service-id->service-description-fn service-id->source-token-entries-fn token->token-hash - token->token-metadata token-defaults service-id))))) + service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn + token-defaults (str service-id "s1")))))) + + (testing "direct access service is active" + (let [token->token-data {"t1" {"cpus" 1}} + service-id->references-fn (fn [in-service-id] + (is (= in-service-id (str service-id "s2"))) + #{{}}) + token->token-metadata (token->token-metadata-factory token->token-data)] + (is (= idle-timeout-mins + (service-id->idle-timeout + service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn + token-defaults (str service-id "s2")))))) (testing "service with multiple tokens is active" (let [token->token-data {"t1" {"cpus" 1} "t2" {"mem" 2048}} - service-id->service-description-fn (fn [in-service-id] - (is (= service-id in-service-id)) - {"idle-timeout-mins" idle-timeout-mins}) - service-id->source-token-entries-fn (fn [in-service-id] - (is (= service-id in-service-id)) - #{[{"token" "t1" "version" "t1.hash1"} {"token" "t2" "version" "t2.hash1"}]}) - token->token-metadata (token->token-metadata-fn token->token-data)] + service-id->references-fn (fn [in-service-id] + (is (= in-service-id (str service-id "s3"))) + #{{:token {:sources [{:token "t1" :version "t1.hash1"} {:token "t2" :version "t2.hash1"}]}}}) + token->token-metadata (token->token-metadata-factory token->token-data)] (is (= idle-timeout-mins (service-id->idle-timeout - service-id->service-description-fn service-id->source-token-entries-fn token->token-hash - token->token-metadata token-defaults service-id))))) + service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn + token-defaults (str service-id "s3")))))) (testing "service outdated but fallback not configured" (let [token->token-data {"t1" {"cpus" 1} "t2" {"mem" 2048}} - service-id->service-description-fn (fn [in-service-id] - (is (= service-id in-service-id)) - {"idle-timeout-mins" idle-timeout-mins}) - service-id->source-token-entries-fn (fn [in-service-id] - (is (= service-id in-service-id)) - #{[{"token" "t1" "version" "t1.hash0"}]}) - token->token-metadata (token->token-metadata-fn token->token-data)] + service-id->references-fn (fn [in-service-id] + (is (= in-service-id (str service-id "s4"))) + #{{:token {:sources [{:token "t1" :version "t1.hash0"}]}}}) + token->token-metadata (token->token-metadata-factory token->token-data)] (is (= (-> (+ fallback-period-secs (dec (-> 1 t/minutes t/in-seconds))) - t/seconds - t/in-minutes - (+ stale-timeout-mins)) + t/seconds + t/in-minutes + (+ stale-timeout-mins)) + (service-id->idle-timeout + service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn + token-defaults (str service-id "s4")))))) + + (testing "service outdated with tokens but direct access possible" + (let [token->token-data {"t1" {"cpus" 1} + "t2" {"mem" 2048}} + service-id->references-fn (fn [in-service-id] + (is (= in-service-id (str service-id "s5"))) + #{{:token {:sources [{:token "t1" :version "t1.hash0"}]}} + {}}) + token->token-metadata (token->token-metadata-factory token->token-data)] + (is (= idle-timeout-mins (service-id->idle-timeout - service-id->service-description-fn service-id->source-token-entries-fn token->token-hash - token->token-metadata token-defaults service-id))))) + service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn + token-defaults (str service-id "s5")))))) (testing "service outdated and fallback configured on one token" (let [token->token-data {"t1" {"cpus" 1 "fallback-period-secs" 300} "t2" {"mem" 2048}} - service-id->service-description-fn (fn [in-service-id] - (is (= service-id in-service-id)) - {"idle-timeout-mins" idle-timeout-mins}) - service-id->source-token-entries-fn (fn [in-service-id] - (is (= service-id in-service-id)) - #{[{"token" "t1" "version" "t1.hash1"} {"token" "t2" "version" "t2.hash0"}]}) - token->token-metadata (token->token-metadata-fn token->token-data)] + service-id->references-fn (fn [in-service-id] + (is (= in-service-id (str service-id "s6"))) + #{{:token {:sources [{:token "t1" :version "t1.hash1"} {:token "t2" :version "t2.hash0"}]}}}) + token->token-metadata (token->token-metadata-factory token->token-data)] (is (= idle-timeout-mins (service-id->idle-timeout - service-id->service-description-fn service-id->source-token-entries-fn token->token-hash - token->token-metadata token-defaults service-id))))) + service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn + token-defaults (str service-id "s6")))))) (testing "service outdated on some tokens and fallback and timeout configured on all tokens" (let [stale-timeout-mins 45 token->token-data {"t1" {"cpus" 123 "fallback-period-secs" 300} "t2" {"cmd" "tc" "fallback-period-secs" 600 "stale-timeout-mins" stale-timeout-mins} "t3" {"cmd" "tc" "fallback-period-secs" 900}} - service-id->service-description-fn (fn [in-service-id] - (is (= service-id in-service-id)) - {"idle-timeout-mins" idle-timeout-mins}) - service-id->source-token-entries-fn (fn [in-service-id] - (is (= service-id in-service-id)) - #{[{"token" "t1" "version" "t1.hash1"} - {"token" "t2" "version" "t2.hash0"} - {"token" "t3" "version" "t3.hash0"}]}) - token->token-metadata (token->token-metadata-fn token->token-data)] + service-id->references-fn (fn [in-service-id] + (is (= in-service-id (str service-id "s7"))) + #{{:token {:sources [{:token "t1" :version "t1.hash1"} + {:token "t2" :version "t2.hash0"} + {:token "t3" :version "t3.hash0"}]}}}) + token->token-metadata (token->token-metadata-factory token->token-data)] (is (= idle-timeout-mins (service-id->idle-timeout - service-id->service-description-fn service-id->source-token-entries-fn token->token-hash - token->token-metadata token-defaults service-id))))) + service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn + token-defaults (str service-id "s7")))))) (testing "service outdated on every token and fallback and timeout configured on all tokens" (let [stale-timeout-mins 45 token->token-data {"t1" {"cpus" 123 "fallback-period-secs" 300} "t2" {"cmd" "tc" "fallback-period-secs" 600 "stale-timeout-mins" stale-timeout-mins} "t3" {"cmd" "tc" "fallback-period-secs" 900}} - service-id->service-description-fn (fn [in-service-id] - (is (= service-id in-service-id)) - {"idle-timeout-mins" idle-timeout-mins}) - service-id->source-token-entries-fn (fn [in-service-id] - (is (= service-id in-service-id)) - #{[{"token" "t1" "version" "t1.hash0"} - {"token" "t2" "version" "t2.hash0"} - {"token" "t3" "version" "t3.hash0"}]}) - token->token-metadata (token->token-metadata-fn token->token-data)] + service-id->references-fn (fn [in-service-id] + (is (= in-service-id (str service-id "s8"))) + #{{:token {:sources [{:token "t1" :version "t1.hash0"} + {:token "t2" :version "t2.hash0"} + {:token "t3" :version "t3.hash0"}]}}}) + token->token-metadata (token->token-metadata-factory token->token-data)] (is (= (-> 900 t/seconds t/in-minutes (+ stale-timeout-mins)) (service-id->idle-timeout - service-id->service-description-fn service-id->source-token-entries-fn token->token-hash - token->token-metadata token-defaults service-id))))) + service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn + token-defaults (str service-id "s8")))))) (testing "service using latest of one partial token among many" (let [stale-timeout-mins 45 @@ -2273,18 +2282,15 @@ "t2" {"cmd" "tc" "fallback-period-secs" 600 "stale-timeout-mins" stale-timeout-mins} "t3" {"cmd" "tc" "fallback-period-secs" 900} "t4" {"fallback-period-secs" 1200 "stale-timeout-mins" (+ stale-timeout-mins 15)}} - service-id->service-description-fn (fn [in-service-id] - (is (= service-id in-service-id)) - {"idle-timeout-mins" idle-timeout-mins}) - service-id->source-token-entries-fn (fn [in-service-id] - (is (= service-id in-service-id)) - #{[{"token" "t1" "version" "t1.hash1"} {"token" "t2" "version" "t2.hash0"}] - [{"token" "t3" "version" "t3.hash0"} {"token" "t4" "version" "t4.hash0"}]}) - token->token-metadata (token->token-metadata-fn token->token-data)] + service-id->references-fn (fn [in-service-id] + (is (= in-service-id (str service-id "s9"))) + #{{:token {:sources [{:token "t1" :version "t1.hash1"} {:token "t2" :version "t2.hash0"}]}} + {:token {:sources [{:token "t3" :version "t3.hash0"} {:token "t4" :version "t4.hash0"}]}}}) + token->token-metadata (token->token-metadata-factory token->token-data)] (is (= idle-timeout-mins (service-id->idle-timeout - service-id->service-description-fn service-id->source-token-entries-fn token->token-hash - token->token-metadata token-defaults service-id))))) + service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn + token-defaults (str service-id "s9")))))) (testing "service using latest versions of multiple tokens" (let [stale-timeout-mins 45 @@ -2292,18 +2298,15 @@ "t2" {"cmd" "tc" "fallback-period-secs" 600 "stale-timeout-mins" stale-timeout-mins} "t3" {"cmd" "tc" "fallback-period-secs" 900} "t4" {"fallback-period-secs" 1200 "stale-timeout-mins" (+ stale-timeout-mins 15)}} - service-id->service-description-fn (fn [in-service-id] - (is (= service-id in-service-id)) - {"idle-timeout-mins" idle-timeout-mins}) - service-id->source-token-entries-fn (fn [in-service-id] - (is (= service-id in-service-id)) - #{[{"token" "t1" "version" "t1.hash1"} {"token" "t2" "version" "t2.hash1"}] - [{"token" "t3" "version" "t3.hash1"} {"token" "t4" "version" "t4.hash1"}]}) - token->token-metadata (token->token-metadata-fn token->token-data)] + service-id->references-fn (fn [in-service-id] + (is (= in-service-id (str service-id "s10"))) + #{{:token {:sources [{:token "t1" :version "t1.hash1"} {:token "t2" :version "t2.hash1"}]}} + {:token {:sources [{:token "t3" :version "t3.hash1"} {:token "t4" :version "t4.hash1"}]}}}) + token->token-metadata (token->token-metadata-factory token->token-data)] (is (= idle-timeout-mins (service-id->idle-timeout - service-id->service-description-fn service-id->source-token-entries-fn token->token-hash - token->token-metadata token-defaults service-id))))) + service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn + token-defaults (str service-id "s10")))))) (testing "service using latest of one set of token entries" (let [stale-timeout-mins 45 @@ -2311,18 +2314,15 @@ "t2" {"cmd" "tc" "fallback-period-secs" 600 "stale-timeout-mins" stale-timeout-mins} "t3" {"cmd" "tc" "fallback-period-secs" 900} "t4" {"fallback-period-secs" 1200 "stale-timeout-mins" (+ stale-timeout-mins 15)}} - service-id->service-description-fn (fn [in-service-id] - (is (= service-id in-service-id)) - {"idle-timeout-mins" idle-timeout-mins}) - service-id->source-token-entries-fn (fn [in-service-id] - (is (= service-id in-service-id)) - #{[{"token" "t1" "version" "t1.hash1"} {"token" "t2" "version" "t2.hash1"}] - [{"token" "t3" "version" "t3.hash0"} {"token" "t4" "version" "t4.hash0"}]}) - token->token-metadata (token->token-metadata-fn token->token-data)] + service-id->references-fn (fn [in-service-id] + (is (= in-service-id (str service-id "s11"))) + #{{:token {:sources [{:token "t1" :version "t1.hash1"} {:token "t2" :version "t2.hash1"}]}} + {:token {:sources [{:token "t3" :version "t3.hash0"} {:token "t4" :version "t4.hash0"}]}}}) + token->token-metadata (token->token-metadata-factory token->token-data)] (is (= idle-timeout-mins (service-id->idle-timeout - service-id->service-description-fn service-id->source-token-entries-fn token->token-hash - token->token-metadata token-defaults service-id))))) + service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn + token-defaults (str service-id "s11")))))) (testing "service outdated and fallback and timeout configured on multiple source tokens" (let [stale-timeout-mins 45 @@ -2330,19 +2330,16 @@ "t2" {"cmd" "tc" "fallback-period-secs" 600 "stale-timeout-mins" stale-timeout-mins} "t3" {"cmd" "tc" "fallback-period-secs" 900} "t4" {"fallback-period-secs" 1200 "stale-timeout-mins" (+ stale-timeout-mins 15)}} - service-id->service-description-fn (fn [in-service-id] - (is (= service-id in-service-id)) - {"idle-timeout-mins" idle-timeout-mins}) - service-id->source-token-entries-fn (fn [in-service-id] - (is (= service-id in-service-id)) - #{[{"token" "t1" "version" "t1.hash0"} {"token" "t2" "version" "t2.hash0"}] - [{"token" "t3" "version" "t3.hash0"} {"token" "t4" "version" "t4.hash0"}]}) - token->token-metadata (token->token-metadata-fn token->token-data)] + service-id->references-fn (fn [in-service-id] + (is (= in-service-id (str service-id "s12"))) + #{{:token {:sources [{:token "t1" :version "t1.hash0"} {:token "t2" :version "t2.hash0"}]}} + {:token {:sources [{:token "t3" :version "t3.hash0"} {:token "t4" :version "t4.hash0"}]}}}) + token->token-metadata (token->token-metadata-factory token->token-data)] (is (= (max (-> 900 t/seconds t/in-minutes (+ stale-timeout-mins)) (-> 1200 t/seconds t/in-minutes (+ stale-timeout-mins 15))) (service-id->idle-timeout - service-id->service-description-fn service-id->source-token-entries-fn token->token-hash - token->token-metadata token-defaults service-id))))))) + service-id->service-description-fn service-id->references-fn token->token-metadata reference-type->stale-fn + token-defaults (str service-id "s12")))))))) (defn- synchronize-fn [lock f] @@ -2384,6 +2381,55 @@ (is (= #{source-tokens-1 source-tokens-3 source-tokens-4} (service-id->source-tokens-entries kv-store service-id))))) +(deftest test-store-reference! + (let [kv-store (kv/->LocalKeyValueStore (atom {})) + service-id "test-service-id" + references-1 {:token {:sources [{:token "token-1" :version "v1"}]}} + references-1-copy {:sources [{:token "token-1" :version "v1"}] + :type :token} + references-2 {:token {:sources [{:token "token-1" :version "v1"} + {:token "token-2" :version "v2"}]}} + references-3 {:token {:sources [{:token "token-3" :version "v3"} + {:token "token-2" :version "v2"}]}} + references-3-copy {:token {:sources [{:token "token-3" :version "v3"} + {:token "token-2" :version "v2"}]}} + references-4 {:token {:sources [{:token "token-2" :version "v2"} + {:token "token-3" :version "v3"}]}}] + + (store-reference! synchronize-fn kv-store service-id references-1) + (is (contains? (service-id->references kv-store service-id) references-1)) + + (store-reference! synchronize-fn kv-store service-id references-1-copy) + (is (contains? (service-id->references kv-store service-id) references-1)) + + (store-reference! synchronize-fn kv-store service-id references-1) + (is (contains? (service-id->references kv-store service-id) references-1)) + + (store-reference! synchronize-fn kv-store service-id references-2) + (is (contains? (service-id->references kv-store service-id) references-1)) + (is (contains? (service-id->references kv-store service-id) references-2)) + + (store-reference! synchronize-fn kv-store service-id references-3) + (is (contains? (service-id->references kv-store service-id) references-1)) + (is (contains? (service-id->references kv-store service-id) references-2)) + (is (contains? (service-id->references kv-store service-id) references-3)) + + (store-reference! synchronize-fn kv-store service-id references-1) + (is (contains? (service-id->references kv-store service-id) references-1)) + (is (contains? (service-id->references kv-store service-id) references-2)) + (is (contains? (service-id->references kv-store service-id) references-3)) + + (store-reference! synchronize-fn kv-store service-id references-3-copy) + (is (contains? (service-id->references kv-store service-id) references-1)) + (is (contains? (service-id->references kv-store service-id) references-2)) + (is (contains? (service-id->references kv-store service-id) references-3)) + + (store-reference! synchronize-fn kv-store service-id references-4) + (is (contains? (service-id->references kv-store service-id) references-1)) + (is (contains? (service-id->references kv-store service-id) references-2)) + (is (contains? (service-id->references kv-store service-id) references-3)) + (is (contains? (service-id->references kv-store service-id) references-4)))) + (deftest test-service-description-builder-state (is {} (state (create-default-service-description-builder {}))))