diff --git a/project.clj b/project.clj index a2ef58bb87..330d3f0768 100644 --- a/project.clj +++ b/project.clj @@ -118,11 +118,21 @@ [ring-basic-authentication] [ring-mock] [grimradical/clj-semver "0.3.0" :exclusions [org.clojure/clojure]] - [beckon]] + [beckon] + [com.cemerick/url "0.1.1"]] ; SERVER-332, enable SSLv3 for unit tests that exercise SSLv3 :jvm-opts ["-Djava.security.properties=./dev-resources/java.security"]} :testutils {:source-paths ^:replace ["test/unit" "test/integration"]} + :test { + ;; NOTE: In core.async version 0.2.382, the default size for + ;; the core.async dispatch thread pool was reduced from + ;; (42 + (2 * num-cpus)) to... eight. The jruby metrics tests + ;; use core.async and need more than eight threads to run + ;; properly; this setting overrides the default value. Without + ;; it the metrics tests will hang. + :jvm-opts ["-Dclojure.core.async.pool-size=50"] + } :ezbake {:dependencies ^:replace [;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; NOTE: we need to explicitly pass in `nil` values diff --git a/src/clj/puppetlabs/enterprise/services/jruby/pe_jruby_metrics_core.clj b/src/clj/puppetlabs/enterprise/services/jruby/pe_jruby_metrics_core.clj new file mode 100644 index 0000000000..ddee604cfe --- /dev/null +++ b/src/clj/puppetlabs/enterprise/services/jruby/pe_jruby_metrics_core.clj @@ -0,0 +1,320 @@ +(ns puppetlabs.enterprise.services.jruby.pe-jruby-metrics-core + (:require [schema.core :as schema] + [puppetlabs.metrics :as metrics] + [puppetlabs.services.jruby-pool-manager.jruby-schemas :as jruby-schemas] + [clojure.tools.logging :as log] + [puppetlabs.enterprise.utils :as utils] + [puppetlabs.trapperkeeper.services.status.status-core :as status-core] + [puppetlabs.comidi :as comidi] + [puppetlabs.i18n.core :refer [trs]]) + (:import (com.codahale.metrics MetricRegistry Gauge Counter Histogram Timer) + (clojure.lang Atom IFn) + (puppetlabs.services.jruby_pool_manager.jruby_schemas JRubyInstance) + (java.util.concurrent TimeUnit))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;; Schemas + +;; creating some constants for lock state enumeration; using Strings +;; rather than keywords since these need to be serialized over the wire +(def jruby-pool-lock-not-in-use (str :not-in-use)) +(def jruby-pool-lock-requested (str :requested)) +(def jruby-pool-lock-acquired (str :acquired)) + +(def JRubyPoolLockState + (schema/enum jruby-pool-lock-not-in-use + jruby-pool-lock-requested + jruby-pool-lock-acquired)) + +(def JRubyLockEventType + (schema/enum :lock-requested :lock-acquired :lock-released)) + +(def JRubyPoolLockStatus + {:current-state JRubyPoolLockState + :last-change-time schema/Str}) + +(def JRubyPoolLockRequestReason + {:type (schema/eq :master-code-sync) + :lock-request-id schema/Str}) + +(def JRubyMetrics + {:num-jrubies Gauge + :requested-count Counter + :requested-jrubies-histo Histogram + :borrow-count Counter + :borrow-timeout-count Counter + :borrow-retry-count Counter + :return-count Counter + :num-free-jrubies Gauge + :free-jrubies-histo Histogram + :borrow-timer Timer + :wait-timer Timer + :requested-instances Atom + :borrowed-instances Atom + :lock-wait-timer Timer + :lock-held-timer Timer + :lock-requests Atom + :lock-status Atom + :sampler-job-id schema/Any}) + +(def TimestampedReason + {:time Long + :reason jruby-schemas/JRubyEventReason}) + +(def HttpRequestReasonInfo + {:request {:request-method comidi/RequestMethod + :route-id schema/Str + :uri schema/Str} + schema/Any schema/Any}) + +(def RequestReasonInfo + (schema/conditional + #(and (map? %) (contains? % :request)) HttpRequestReasonInfo + :else jruby-schemas/JRubyEventReason)) + +(def TimestampedReasonWithRequestInfo + (assoc TimestampedReason :reason RequestReasonInfo)) + +(def InstanceRequestInfo + {:duration-millis schema/Num + :reason RequestReasonInfo + :time schema/Num}) + +(def JRubyMetricsStatusV1 + {(schema/optional-key :experimental) + {:jruby-pool-lock-status JRubyPoolLockStatus + :metrics {:num-jrubies schema/Int + :num-free-jrubies schema/Int + :requested-count schema/Int + :borrow-count schema/Int + :borrow-timeout-count schema/Int + :borrow-retry-count schema/Int + :return-count schema/Int + :average-requested-jrubies schema/Num + :average-free-jrubies schema/Num + :average-borrow-time schema/Num + :average-wait-time schema/Num + :requested-instances [InstanceRequestInfo] + :borrowed-instances [InstanceRequestInfo] + :num-pool-locks schema/Int + :average-lock-wait-time schema/Num + :average-lock-held-time schema/Num}}}) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;; Private + +(schema/defn timestamped-reason :- TimestampedReason + [reason :- jruby-schemas/JRubyEventReason] + {:time (System/currentTimeMillis) + :reason reason}) + +(schema/defn add-duration-to-instance :- InstanceRequestInfo + [{:keys [time] :as instance} :- TimestampedReasonWithRequestInfo] + (assoc instance + :duration-millis + (- (System/currentTimeMillis) time))) + +(schema/defn instance-request-info + :- TimestampedReasonWithRequestInfo + [instance :- TimestampedReason] + (if-let [request (get-in instance [:reason :request])] + (assoc-in instance [:reason :request] + {:uri (:uri request) + :request-method (:request-method request) + :route-id (get-in request [:route-info :route-id])}) + instance)) + +(schema/defn ^:always-validate requested-instances-info :- [InstanceRequestInfo] + [instances :- [TimestampedReason]] + (map + (comp add-duration-to-instance instance-request-info) + instances)) + +(schema/defn track-successful-borrow-instance! + [{:keys [borrow-count borrowed-instances]} :- JRubyMetrics + jruby-instance :- JRubyInstance + reason :- jruby-schemas/JRubyEventReason] + (.inc borrow-count) + (let [id (:id jruby-instance)] + (when (get @borrowed-instances id) + (log/warn (trs "JRuby instance ''{0}'' borrowed, but it appears to have already been in use!" id))) + (swap! borrowed-instances assoc id (timestamped-reason reason)))) + +(schema/defn track-request-instance! + [{:keys [requested-count requested-instances]} :- JRubyMetrics + {:keys [reason] :as event} :- jruby-schemas/JRubyRequestedEvent] + (.inc requested-count) + (swap! requested-instances assoc event (timestamped-reason reason))) + +(schema/defn track-borrow-instance! + [{:keys [borrow-timeout-count borrow-retry-count requested-instances wait-timer] :as metrics} :- JRubyMetrics + {jruby-instance :instance requested-event :requested-event reason :reason :as event} :- jruby-schemas/JRubyBorrowedEvent] + (condp (fn [pred instance] (pred instance)) jruby-instance + nil? (.inc borrow-timeout-count) + jruby-schemas/shutdown-poison-pill? (log/warn (trs "Not tracking jruby instance borrowed because server is shutting down")) + jruby-schemas/jruby-instance? (track-successful-borrow-instance! metrics jruby-instance reason)) + (if-let [ta (get @requested-instances requested-event)] + (do + (.update wait-timer + (- (System/currentTimeMillis) (:time ta)) + (TimeUnit/MILLISECONDS)) + (swap! requested-instances dissoc requested-event)) + (log/warn (trs "Unable to find request event for borrowed JRuby instance: {0}" event)))) + +(schema/defn track-return-instance! + [{:keys [return-count borrowed-instances borrow-timer]} :- JRubyMetrics + {jruby-instance :instance} :- jruby-schemas/JRubyReturnedEvent] + (.inc return-count) + (when (jruby-schemas/jruby-instance? jruby-instance) + (let [id (:id jruby-instance)] + (if-let [ta (get @borrowed-instances id)] + (do + (.update borrow-timer + (- (System/currentTimeMillis) (:time ta)) + (TimeUnit/MILLISECONDS)) + (swap! borrowed-instances dissoc id)) + (log/warn (trs "JRuby instance ''{0}'' returned, but no record of when it was borrowed!" id)))))) + +(schema/defn ^:always-validate update-pool-lock-status! :- JRubyPoolLockStatus + [jruby-pool-lock-status :- Atom + jruby-lock-event-type :- JRubyLockEventType] + (swap! jruby-pool-lock-status + assoc + :current-state (case jruby-lock-event-type + :lock-requested jruby-pool-lock-requested + :lock-acquired jruby-pool-lock-acquired + :lock-released jruby-pool-lock-not-in-use) + :last-change-time (utils/timestamp))) + +(schema/defn ^:always-validate track-lock-requested! + [{:keys [lock-requests lock-status]} :- JRubyMetrics + {:keys [lock-request-id]} :- JRubyPoolLockRequestReason] + (swap! lock-requests assoc + lock-request-id + {:state :requested + :time (System/currentTimeMillis)}) + (update-pool-lock-status! lock-status :lock-requested)) + +(schema/defn ^:always-validate track-lock-acquired! + [{:keys [lock-requests lock-status lock-wait-timer]} :- JRubyMetrics + {:keys [lock-request-id]} :- JRubyPoolLockRequestReason] + (if-let [lock-request (get @lock-requests lock-request-id)] + (do + (.update lock-wait-timer + (- (System/currentTimeMillis) (:time lock-request)) + (TimeUnit/MILLISECONDS)) + (swap! lock-requests assoc + lock-request-id + {:state :acquired + :time (System/currentTimeMillis)})) + (log/warn (trs "Lock request ''{0}'' acquired, but no record of when it was requested!" + lock-request-id))) + (update-pool-lock-status! lock-status :lock-acquired)) + +(schema/defn ^:always-validate track-lock-released! + [{:keys [lock-requests lock-status lock-held-timer]} :- JRubyMetrics + {:keys [lock-request-id]} :- JRubyPoolLockRequestReason] + (if-let [lock-request (get @lock-requests lock-request-id)] + (do + (.update lock-held-timer + (- (System/currentTimeMillis) (:time lock-request)) + (TimeUnit/MILLISECONDS)) + (swap! lock-requests dissoc lock-request-id)) + (log/warn (trs "Lock request ''{0}'' released, but no record of when it was acquired!" + lock-request-id))) + (update-pool-lock-status! lock-status :lock-released)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;; Public + +(schema/defn ^:always-validate init-metrics :- JRubyMetrics + [hostname :- schema/Str + max-active-instances :- schema/Int + free-instances-fn :- IFn + registry :- MetricRegistry] + {:num-jrubies (metrics/register registry (metrics/host-metric-name hostname "jruby.num-jrubies") + (metrics/gauge max-active-instances)) + :requested-count (.counter registry (metrics/host-metric-name hostname "jruby.request-count")) + ;; See the comments on `pe-jruby-metrics-service/schedule-metrics-sampler!` for + ;; an explanation of how we're using these histograms. + :requested-jrubies-histo (.histogram registry (metrics/host-metric-name hostname "jruby.requested-jrubies-histo")) + :borrow-count (.counter registry (metrics/host-metric-name hostname "jruby.borrow-count")) + :borrow-timeout-count (.counter registry (metrics/host-metric-name hostname "jruby.borrow-timeout-count")) + :borrow-retry-count (.counter registry (metrics/host-metric-name hostname "jruby.borrow-retry-count")) + :return-count (.counter registry (metrics/host-metric-name hostname "jruby.return-count")) + :num-free-jrubies (metrics/register registry (metrics/host-metric-name hostname "jruby.num-free-jrubies") + (proxy [Gauge] [] + (getValue [] + (free-instances-fn)))) + ;; See the comments on `pe-jruby-metrics-service/schedule-metrics-sampler!` for + ;; an explanation of how we're using these histograms. + :free-jrubies-histo (.histogram registry (metrics/host-metric-name hostname "jruby.free-jrubies-histo")) + :borrow-timer (.timer registry (metrics/host-metric-name hostname "jruby.borrow-timer")) + :wait-timer (.timer registry (metrics/host-metric-name hostname "jruby.wait-timer")) + :requested-instances (atom {}) + :borrowed-instances (atom {}) + :lock-wait-timer (.timer registry (metrics/host-metric-name hostname "jruby.lock-wait-timer")) + :lock-held-timer (.timer registry (metrics/host-metric-name hostname "jruby.lock-held-timer")) + :lock-requests (atom {}) + :lock-status (atom {:current-state jruby-pool-lock-not-in-use + :last-change-time (utils/timestamp)}) + :sampler-job-id nil}) + +(schema/defn track-free-instance-count! + [metrics :- JRubyMetrics + free-instance-count :- schema/Int] + (.update (:free-jrubies-histo metrics) free-instance-count)) + +(schema/defn track-requested-instance-count! + [{:keys [requested-jrubies-histo requested-instances]} :- JRubyMetrics] + (.update requested-jrubies-histo (count @requested-instances))) + +(schema/defn jruby-event-callback + [metrics :- JRubyMetrics + event :- jruby-schemas/JRubyEvent] + (case (:type event) + :instance-requested (track-request-instance! metrics event) + :instance-borrowed (track-borrow-instance! metrics event) + :instance-returned (track-return-instance! metrics event) + :lock-requested (track-lock-requested! metrics (:reason event)) + :lock-acquired (track-lock-acquired! metrics (:reason event)) + :lock-released (track-lock-released! metrics (:reason event)) + + (throw (IllegalStateException. (trs "Unrecognized jruby event type: {0}" (:type event)))))) + +(schema/defn ^:always-validate v1-status :- status-core/StatusCallbackResponse + [metrics :- JRubyMetrics + level :- status-core/ServiceStatusDetailLevel] + (let [{:keys [num-jrubies requested-count requested-jrubies-histo + borrow-count borrow-timeout-count borrow-retry-count + return-count free-jrubies-histo num-free-jrubies borrow-timer + wait-timer requested-instances borrowed-instances lock-status + lock-wait-timer lock-held-timer]} metrics + level>= (partial status-core/compare-levels >= level)] + {:state :running + :status (cond-> + ;; no status info at ':critical' level + {} + ;; no extra status at ':info' level yet + (level>= :info) identity + (level>= :debug) (assoc + :experimental + {:jruby-pool-lock-status @lock-status + :metrics + {:num-jrubies (.getValue num-jrubies) + :num-free-jrubies (.getValue num-free-jrubies) + :requested-count (.getCount requested-count) + :borrow-count (.getCount borrow-count) + :borrow-timeout-count (.getCount borrow-timeout-count) + :borrow-retry-count (.getCount borrow-retry-count) + :return-count (.getCount return-count) + :average-requested-jrubies (metrics/mean requested-jrubies-histo) + :average-free-jrubies (metrics/mean free-jrubies-histo) + :average-borrow-time (metrics/mean-millis borrow-timer) + :average-wait-time (metrics/mean-millis wait-timer) + :requested-instances (requested-instances-info (vals @requested-instances)) + :borrowed-instances (requested-instances-info (vals @borrowed-instances)) + :num-pool-locks (.getCount lock-held-timer) + :average-lock-wait-time (metrics/mean-millis lock-wait-timer) + :average-lock-held-time (metrics/mean-millis lock-held-timer) + }}))})) diff --git a/src/clj/puppetlabs/enterprise/services/jruby/pe_jruby_metrics_service.clj b/src/clj/puppetlabs/enterprise/services/jruby/pe_jruby_metrics_service.clj new file mode 100644 index 0000000000..7552712a61 --- /dev/null +++ b/src/clj/puppetlabs/enterprise/services/jruby/pe_jruby_metrics_service.clj @@ -0,0 +1,79 @@ +(ns puppetlabs.enterprise.services.jruby.pe-jruby-metrics-service + (:require [puppetlabs.services.protocols.jruby-puppet :as jruby-protocol] + [puppetlabs.trapperkeeper.core :as tk] + [puppetlabs.trapperkeeper.services :as tk-services] + [puppetlabs.enterprise.services.jruby.pe-jruby-metrics-core :as pe-jruby-metrics-core] + [clojure.tools.logging :as log] + [puppetlabs.trapperkeeper.services.status.status-core :as status-core] + [puppetlabs.enterprise.services.protocols.jruby-metrics :as jruby-metrics-protocol] + [puppetlabs.i18n.core :refer [trs]])) + +(defn sample-jruby-metrics! + [jruby-service metrics] + (log/trace (trs "Sampling JRuby metrics")) + (pe-jruby-metrics-core/track-free-instance-count! + metrics + (jruby-protocol/free-instance-count jruby-service)) + (pe-jruby-metrics-core/track-requested-instance-count! metrics)) + +;; This function schedules some metrics sampling to happen on a background thread. +;; The reason it is necessary to do this is because the metrics histograms are +;; sample-based, as opposed to time-based, and we are interested in keeping a +;; time-based average for certain metrics. e.g. if we only updated the +;; "free-instance-count" average when an instance was borrowed or returned, then, +;; if there was a period where there was no load on the server, the histogram +;; would not be getting any updates at all and the average would appear to +;; remain flat, when actually it should be changing (increasing, presumably, +;; because there should be plenty of free jruby instances available in the pool). +(defn schedule-metrics-sampler! + [jruby-service metrics interspaced] + (interspaced 5000 (partial sample-jruby-metrics! jruby-service metrics))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;; Public + +(tk/defservice pe-jruby-metrics-service + jruby-metrics-protocol/JRubyMetricsService + [[:ConfigService get-in-config] + [:JRubyPuppetService register-event-handler] + [:MetricsService get-metrics-registry] + [:SchedulerService interspaced stop-job] + [:StatusService register-status]] + (init + [this context] + (let [jruby-service (tk-services/get-service this :JRubyPuppetService) + metrics-server-id (get-in-config [:metrics :server-id]) + max-active-instances (-> (tk-services/get-service this :JRubyPuppetService) + tk-services/service-context + (get-in [:pool-context :config :max-active-instances])) + metrics (pe-jruby-metrics-core/init-metrics + metrics-server-id + max-active-instances + (fn [] (jruby-protocol/free-instance-count jruby-service)) + (get-metrics-registry :puppetserver))] + (register-status + "pe-jruby-metrics" + (status-core/get-artifact-version "puppetlabs" "puppetserver") + 1 + (partial pe-jruby-metrics-core/v1-status metrics)) + (assoc context :metrics metrics))) + + (start + [this context] + (let [jruby-service (tk-services/get-service this :JRubyPuppetService) + {:keys [metrics]} (tk-services/service-context this) + sampler-job-id (schedule-metrics-sampler! jruby-service metrics interspaced)] + (register-event-handler (partial pe-jruby-metrics-core/jruby-event-callback + metrics)) + (assoc-in context [:metrics :sampler-job-id] sampler-job-id))) + + (get-metrics + [this] + (:metrics (tk-services/service-context this))) + + (stop + [this context] + (log/info (trs "PE JRuby Metrics Service: stopping metrics sampler job")) + (stop-job (get-in context [:metrics :sampler-job-id])) + (log/info (trs "PE JRuby Metrics Service: stopped metrics sampler job")) + context)) diff --git a/src/clj/puppetlabs/enterprise/services/protocols/jruby_metrics.clj b/src/clj/puppetlabs/enterprise/services/protocols/jruby_metrics.clj new file mode 100644 index 0000000000..6f50fadebb --- /dev/null +++ b/src/clj/puppetlabs/enterprise/services/protocols/jruby_metrics.clj @@ -0,0 +1,5 @@ +(ns puppetlabs.enterprise.services.protocols.jruby-metrics) + +(defprotocol JRubyMetricsService + (get-metrics [this] + "Get the current map of JRuby-related metrics")) \ No newline at end of file diff --git a/src/clj/puppetlabs/enterprise/utils.clj b/src/clj/puppetlabs/enterprise/utils.clj new file mode 100644 index 0000000000..1a702d872a --- /dev/null +++ b/src/clj/puppetlabs/enterprise/utils.clj @@ -0,0 +1,18 @@ +(ns puppetlabs.enterprise.utils + (:require [clj-time.format :as time-format] + [clj-time.core :as time])) + +(def datetime-formatter + "The date/time formatter used to produce timestamps using clj-time. + This matches the format used by PuppetDB." + (time-format/formatters :date-time)) + +(defn format-date-time + "Given a DateTime object, return a human-readable, formatted string." + [date-time] + (time-format/unparse datetime-formatter date-time)) + +(defn timestamp + "Returns a nicely-formatted string of the current date/time." + [] + (format-date-time (time/now))) diff --git a/test/integration/puppetlabs/enterprise/services/jruby/pe_jruby_metrics_service_test.clj b/test/integration/puppetlabs/enterprise/services/jruby/pe_jruby_metrics_service_test.clj new file mode 100644 index 0000000000..61bb8d8f1b --- /dev/null +++ b/test/integration/puppetlabs/enterprise/services/jruby/pe_jruby_metrics_service_test.clj @@ -0,0 +1,808 @@ +(ns puppetlabs.enterprise.services.jruby.pe-jruby-metrics-service-test + (:require [clojure.test :refer :all] + [puppetlabs.trapperkeeper.core :as tk] + [puppetlabs.trapperkeeper.app :as tk-app] + [puppetlabs.trapperkeeper.services :as tk-services] + [puppetlabs.trapperkeeper.testutils.bootstrap :as bootstrap] + [puppetlabs.services.jruby.jruby-puppet-testutils :as jruby-testutils] + [puppetlabs.trapperkeeper.services.webserver.jetty9-service :as jetty9-service] + [puppetlabs.services.jruby.jruby-puppet-service :as jruby-service] + [puppetlabs.services.jruby-pool-manager.impl.jruby-internal :as jruby-internal] + [puppetlabs.services.puppet-profiler.puppet-profiler-service :as profiler] + [puppetlabs.enterprise.services.jruby.pe-jruby-metrics-service :as pe-jruby-metrics-service] + [puppetlabs.trapperkeeper.services.scheduler.scheduler-service :as scheduler-service] + [puppetlabs.trapperkeeper.services.metrics.metrics-service :as metrics-service] + [puppetlabs.services.request-handler.request-handler-service :as request-handler-service] + [puppetlabs.services.versioned-code-service.versioned-code-service :as versioned-code-service] + [puppetlabs.services.jruby-pool-manager.jruby-pool-manager-service :as jruby-pool-manager-service] + [puppetlabs.services.protocols.jruby-puppet :as jruby-protocol] + [puppetlabs.enterprise.services.protocols.jruby-metrics :as jruby-metrics-protocol] + [puppetlabs.services.protocols.puppet-server-config :as ps-config-protocol] + [puppetlabs.trapperkeeper.services.status.status-service :as status-service] + [puppetlabs.trapperkeeper.services.webrouting.webrouting-service :as webrouting-service] + [schema.test :as schema-test] + [puppetlabs.metrics :as metrics] + [puppetlabs.http.client.sync :as http-client] + [puppetlabs.comidi :as comidi] + [cemerick.url :as url] + [puppetlabs.enterprise.testutils.task-coordinator :as coordinator] + [puppetlabs.enterprise.services.jruby.pe-jruby-metrics-core :as pe-jruby-metrics-core] + [clojure.tools.logging :as log] + [schema.core :as schema]) + (:import (com.puppetlabs.puppetserver JRubyPuppetResponse JRubyPuppet) + (clojure.lang IFn Atom) + (java.util.concurrent TimeUnit))) + +(use-fixtures :once schema-test/validate-schemas) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;; Config / constants + +(def default-test-config + (assoc (jruby-testutils/jruby-puppet-tk-config + (jruby-testutils/jruby-puppet-config {:max-active-instances 2})) + :webserver {:port 8140 + :host "localhost"} + :web-router-service {:puppetlabs.trapperkeeper.services.status.status-service/status-service "/status"} + :metrics {:server-id "localhost"} + :puppetserver {:ssl-client-header "X_ssl-client-header-FOO" + :ssl-client-verify-header "X_ssl-client-verify-header-FOO"})) + +(def request-phases [:http-handler-invoked :borrowed-jruby :returning-jruby :request-complete]) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;; Basic utility fns + +(defn http-get + [uri] + (http-client/get uri {:as :text})) + +(defn timestamp-after? + [start-time event-time] + (let [now (System/currentTimeMillis)] + (if (and (<= start-time event-time) + (>= now event-time)) + true + (throw (IllegalStateException. + (format + "Timestamp seems wrong: '%s'; expected it to be between '%s' and '%s'" + event-time + start-time + now)))))) + +(defn async-request + ([coordinator request-id uri] + (async-request coordinator request-id uri nil)) + ([coordinator request-id uri phase] + ;; add the request id into the url as a query param + ;; for use with the coordinator + (let [orig-url (url/url (str "http://localhost:8140" uri)) + query (assoc (:query orig-url) "request-id" request-id) + url (assoc orig-url :query query) + ;; our request function to pass to the coordinator + ;; is just a simple HTTP GET. + req-fn (fn [] (http-get (str url)))] + (coordinator/initialize-task coordinator request-id req-fn phase)))) + +(defn sync-request + [coordinator request-id uri] + (async-request coordinator request-id uri :request-complete) + (coordinator/final-result coordinator request-id)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;; Test services/mocks + +;; The request handler service depends on the PuppetServerConfigService, +;; but we don't actually need any of the puppet config values, so here +;; we just create a mock services that passes through to the normal +;; TK config service to make the tests run faster. +(tk/defservice mock-puppetserver-config-service + ps-config-protocol/PuppetServerConfigService + [[:ConfigService get-config get-in-config]] + (get-config [this] (get-config)) + (get-in-config [this ks] (get-in-config ks))) + +(schema/defn ^:always-validate comidi-handler-service :- (schema/protocol tk-services/ServiceDefinition) + [coordinator :- (schema/protocol coordinator/TaskCoordinator)] + (tk/service + [[:WebserverService add-ring-handler] + [:RequestHandlerService handle-request]] + (start [this context] + (let [app-request-handler (fn [request] + ;; we pass the request-id as a query-param, so that + ;; the comidi handler can interact with the + ;; test request coordinator + (let [request-id (-> (:query-string request) + url/query->map + (get "request-id"))] + ;; notify the coordinator that we've begun to handle the request + (coordinator/notify-task-progress coordinator request-id :http-handler-invoked) + ;; delegate to the jruby request handler + (let [resp (handle-request request)] + ;; notify the coordinator that the request is complete + (coordinator/notify-task-progress coordinator request-id :request-complete) + resp))) + routes (comidi/context "/foo" + (comidi/routes + (comidi/GET ["/bar/" :bar] request + (app-request-handler request)) + (comidi/GET ["/baz/" :baz] request + (app-request-handler request)))) + handler (-> routes + comidi/routes->handler + (comidi/wrap-with-route-metadata routes))] + (add-ring-handler handler "/foo")) + context))) + +;; here we're creating a custom jruby instance so that interacts with the +;; Coordinator, so that we have fine-grained control over how we handle the +;; incoming requests +(schema/defn ^:always-validate coordinated-mock-jruby-instance + [coordinator :- (schema/protocol coordinator/TaskCoordinator)] + (fn [] (reify JRubyPuppet + (handleRequest [this request] + ;; read the request-id from the query params so that we can + ;; interact with the test coordinator + (let [request-id (get-in request ["params" "request-id"])] + ;; notify the coordinator that we've borrowed a jruby instance + (coordinator/notify-task-progress coordinator request-id :borrowed-jruby) + ;; if the request has a 'sleep' query param, sleep + (when-let [sleep (get-in request ["params" "sleep"])] + (log/debugf "JRuby handler: request '%s' sleeping '%s'" request-id sleep) + (Thread/sleep (Long/parseLong sleep))) + ;; notify coordinator that we're about to return the jruby to the pool + (coordinator/notify-task-progress coordinator request-id :returning-jruby) + (JRubyPuppetResponse. (int 200) "hi!" "text/plain" "9.0.0.0"))) + (getSetting [_ _] + (Object.)) + (terminate [_] + (log/info "Terminating Master"))))) + +(def TestEnvironment + {:metrics pe-jruby-metrics-core/JRubyMetrics + :sample-metrics! IFn + :sampling-scheduled? Atom + :coordinator (schema/protocol coordinator/TaskCoordinator) + :expected-metrics-values IFn + :update-expected-values IFn + :current-metrics-values IFn + :jruby-service (schema/protocol jruby-protocol/JRubyPuppetService)}) + +(schema/defn ^:always-validate build-test-env :- TestEnvironment + [sampling-scheduled? :- Atom + coordinator :- (schema/protocol coordinator/TaskCoordinator) + app] + (let [jruby-metrics-service (tk-app/get-service app :JRubyMetricsService) + {:keys [num-jrubies num-free-jrubies requested-count borrow-count + borrow-timeout-count borrow-retry-count return-count + borrowed-instances requested-instances] + :as metrics} (jruby-metrics-protocol/get-metrics + jruby-metrics-service) + jruby-service (tk-app/get-service app :JRubyPuppetService) + sample-metrics! #(pe-jruby-metrics-service/sample-jruby-metrics! jruby-service metrics) + + ;; an atom to track the expected values for the basic metrics, so + ;; that we don't have to keep track of the latest counts by hand + ;; in all of the tests + expected-values-atom (atom {:num-jrubies 2 + :num-free-jrubies 2 + :requested-count 0 + :borrow-count 0 + :borrow-timeout-count 0 + :borrow-retry-count 0 + :return-count 0 + :current-requested-instances 0 + :current-borrowed-instances 0}) + + ;; convenience functions to use for comparing current metrics + ;; values with expected values. + current-metrics-values (fn [] {:num-jrubies (.getValue num-jrubies) + :num-free-jrubies (.getValue num-free-jrubies) + :requested-count (.getCount requested-count) + :borrow-count (.getCount borrow-count) + :borrow-timeout-count (.getCount borrow-timeout-count) + :borrow-retry-count (.getCount borrow-retry-count) + :return-count (.getCount return-count) + :current-requested-instances (count @requested-instances) + :current-borrowed-instances (count @borrowed-instances)}) + update-expected-values (fn [deltas] + (reset! expected-values-atom + (reduce (fn [acc [k delta]] + (update-in acc [k] + delta)) + @expected-values-atom + deltas))) + expected-metrics-values (fn [] @expected-values-atom)] + {:metrics metrics + :sample-metrics! sample-metrics! + :sampling-scheduled? sampling-scheduled? + :coordinator coordinator + :expected-metrics-values expected-metrics-values + :update-expected-values update-expected-values + :current-metrics-values current-metrics-values + :jruby-service jruby-service})) + +(defmacro with-metrics-test-env + [test-env-var-name config & body] + `(let [coordinator# (coordinator/task-coordinator request-phases) + ;; we will stub out the call that would normal schedule a recurring + ;; job to take samples of the metrics, so that we have control over + ;; when the sampling occurs. Otherwise these tests would be very racy. + sampling-scheduled?# (atom false) + mock-schedule-metrics-sampler# (fn [_# _# _#] (reset! sampling-scheduled?# true))] + (with-redefs [puppetlabs.enterprise.services.jruby.pe-jruby-metrics-service/schedule-metrics-sampler! mock-schedule-metrics-sampler# + jruby-internal/create-pool-instance! (partial jruby-testutils/create-mock-pool-instance + (coordinated-mock-jruby-instance coordinator#))] + (bootstrap/with-app-with-config + app# + [jetty9-service/jetty9-service + jruby-service/jruby-puppet-pooled-service + profiler/puppet-profiler-service + pe-jruby-metrics-service/pe-jruby-metrics-service + scheduler-service/scheduler-service + metrics-service/metrics-service + request-handler-service/request-handler-service + status-service/status-service + versioned-code-service/versioned-code-service + webrouting-service/webrouting-service + (comidi-handler-service coordinator#) + mock-puppetserver-config-service + jruby-pool-manager-service/jruby-pool-manager-service] + ~config + + (let [~test-env-var-name (build-test-env sampling-scheduled?# coordinator# app#)] + ~@body))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;; Tests + +(deftest ^:metrics basic-metrics-test + (with-metrics-test-env test-env default-test-config + (let [{:keys [num-jrubies num-free-jrubies requested-count + requested-jrubies-histo borrow-count borrow-timeout-count + borrow-retry-count return-count free-jrubies-histo + borrow-timer wait-timer requested-instances + borrowed-instances]} (:metrics test-env) + {:keys [update-expected-values current-metrics-values + expected-metrics-values coordinator]} test-env] + (testing "initial metrics values" + (is (= 2 (.getValue num-jrubies))) + (is (= 2 (.getValue num-free-jrubies))) + (is (= 0 (.getCount requested-count))) + (is (= 0.0 (metrics/mean requested-jrubies-histo))) + (is (= 0 (.getCount borrow-count))) + (is (= 0 (.getCount borrow-timeout-count))) + (is (= 0 (.getCount borrow-retry-count))) + (is (= 0 (.getCount return-count))) + (is (= 0.0 (metrics/mean free-jrubies-histo))) + (is (= 0 (metrics/mean-millis borrow-timer))) + (is (= 0 (metrics/mean-millis wait-timer))) + (is (= 0 (count @requested-instances))) + (is (= 0 (count @borrowed-instances)))) + + (testing "basic metrics values: happy-path" + (sync-request coordinator 1 "/foo/bar/req1") + (sync-request coordinator 2 "/foo/baz/req2") + + (update-expected-values {:requested-count 2 + :borrow-count 2 + :return-count 2}) + + (is (= (expected-metrics-values) (current-metrics-values))))))) + +(deftest ^:metrics borrowed-instances-test + (with-metrics-test-env test-env default-test-config + (let [{:keys [coordinator update-expected-values expected-metrics-values + current-metrics-values jruby-service]} test-env + {:keys [borrowed-instances]} (:metrics test-env) + start-time (System/currentTimeMillis)] + (testing "introspect reasons for borrowed instances" + (async-request coordinator 1 "/foo/bar/async1" :returning-jruby) + (async-request coordinator 2 "/foo/baz/async2" :returning-jruby) + + ;; when we get here we should have two active requests blocked, so + ;; we can check out some of those metrics. + (update-expected-values {:num-free-jrubies -2 + :requested-count 2 + :borrow-count 2 + :current-borrowed-instances 2}) + (is (= (expected-metrics-values) (current-metrics-values))) + + ;; let's take a peek at the info about the borrowed instances. + ;; the keys for `borrowed-instances` are the jruby instance ids. + (is (= #{1 2} (set (keys @borrowed-instances)))) + (is (= #{"/foo/bar/async1" "/foo/baz/async2"} + (set (map #(get-in % [:reason :request :uri]) (vals @borrowed-instances))))) + (let [routes (set (map #(get-in % [:reason :request :route-info :route-id]) (vals @borrowed-instances)))] + (is (= #{"foo-baz-:baz" "foo-bar-:bar"} routes))) + (doseq [borrowed (vals @borrowed-instances)] + (is (= :get (get-in borrowed [:reason :request :request-method]))) + (is (timestamp-after? start-time (:time borrowed)))) + + ;; unblock both of our requests + (coordinator/final-result coordinator 1) + (coordinator/final-result coordinator 2) + + ;; validate that the metrics are updated after the unblocks + (update-expected-values {:num-free-jrubies 2 + :return-count 2 + :current-borrowed-instances -2}) + (is (= (expected-metrics-values) (current-metrics-values)))) + + (testing "introspect borrows that did not come from an http request" + ;; borrow an instance manually (rather than via an HTTP request) + (let [instance (jruby-testutils/borrow-instance jruby-service :metrics-manual-borrow-test)] + (update-expected-values {:num-free-jrubies -1 + :requested-count 1 + :borrow-count 1 + :current-borrowed-instances 1}) + (is (= (expected-metrics-values) (current-metrics-values))) + + ;; validate that the metrics show info about the borrowed instance + (let [borrowed (first (vals @borrowed-instances))] + (is (timestamp-after? start-time (:time borrowed))) + (is (= :metrics-manual-borrow-test (:reason borrowed)))) + + (jruby-testutils/return-instance jruby-service instance :metrics-manual-borrow-test)) + + (update-expected-values {:num-free-jrubies 1 + :return-count 1 + :current-borrowed-instances -1}) + (is (= (expected-metrics-values) (current-metrics-values))))))) + +(deftest ^:metrics requested-instances-test + (with-metrics-test-env test-env default-test-config + (let [{:keys [coordinator update-expected-values expected-metrics-values + current-metrics-values jruby-service]} test-env + {:keys [requested-count requested-instances borrowed-instances]} (:metrics test-env) + start-time (System/currentTimeMillis)] + (testing "introspect reasons for requested instances" + ;; this test is about validating that requests that are stuck + ;; waiting for a jruby instance from an empty pool show up in + ;; the metrics. + ;; + ;; first we'll queue up a few requests to consume the 2 jruby instances + (async-request coordinator 1 "/foo/bar/async1" :returning-jruby) + (async-request coordinator 2 "/foo/baz/async2" :returning-jruby) + + (update-expected-values {:num-free-jrubies -2 + :requested-count 2 + :borrow-count 2 + :current-borrowed-instances 2}) + (is (= (expected-metrics-values) (current-metrics-values))) + + ;; now we'll create a few more requests and tell them they may + ;; try to proceed to the :borrowed-jruby phase; they won't + ;; be able to get there, because the pool is empty, so we won't + ;; block waiting for them. + (async-request coordinator 3 "/foo/bar/async3") + (coordinator/unblock-task-to coordinator 3 :borrowed-jruby) + (async-request coordinator 4 "/foo/baz/async4") + (coordinator/unblock-task-to coordinator 4 :borrowed-jruby) + + ;; wait for requests 3 and 4 to progress to their attempts to + ;; borrow jrubies + (let [expected-requested-count (+ 2 (:requested-count (expected-metrics-values)))] + (while (> expected-requested-count (.getCount requested-count)))) + + (update-expected-values {:requested-count 2 + :current-requested-instances 2}) + (is (= (expected-metrics-values) (current-metrics-values))) + + ;; now, make sure we can see info about requests 3 and 4 in the + ;; metrics + (let [req-instances (vals @requested-instances)] + (is (= #{"/foo/bar/async3" "/foo/baz/async4"} + (set (map #(get-in % [:reason :request :uri]) req-instances)))) + (let [routes (set (map #(get-in % [:reason :request :route-info :route-id]) req-instances))] + (is (= #{"foo-baz-:baz" "foo-bar-:bar"} routes))) + (doseq [requested req-instances] + (is (= :get (get-in requested [:reason :request :request-method]))) + (is (timestamp-after? start-time (:time requested))))) + + ;; finish the first two requests + (coordinator/final-result coordinator 1) + (coordinator/final-result coordinator 2) + + ;; make sure requests 3 and 4 have successfully borrowed + (coordinator/wait-for-task coordinator 3 :borrowed-jruby) + (coordinator/wait-for-task coordinator 4 :borrowed-jruby) + + (update-expected-values {:return-count 2 + :borrow-count 2 + :current-requested-instances -2}) + (is (= (expected-metrics-values) (current-metrics-values))) + + ;; finish 3 and 4 + (coordinator/final-result coordinator 3) + (coordinator/final-result coordinator 4) + + (update-expected-values {:num-free-jrubies 2 + :return-count 2 + :current-borrowed-instances -2}) + (is (= (expected-metrics-values) (current-metrics-values)))) + + + (testing "introspect requested instances that did not come from an http request" + ;; this test is about validating that borrow attempts, which + ;; did not originate from an HTTP request, but are stuck + ;; waiting for a jruby instance from an empty pool show up in + ;; the metrics. + + ;; first we'll queue up a few requests to consume the 2 jruby instances + (async-request coordinator 1 "/foo/bar/async1" :borrowed-jruby) + (async-request coordinator 2 "/foo/baz/async2" :borrowed-jruby) + + (update-expected-values {:num-free-jrubies -2 + :requested-count 2 + :borrow-count 2 + :current-borrowed-instances 2}) + (is (= (expected-metrics-values) (current-metrics-values))) + + ;; manually borrow an instance, but do it on another thread because + ;; we know it will block right now due to the empty pool + (let [future-instance (promise) + wait-to-return (promise) + future-thread (future + (let [instance (jruby-testutils/borrow-instance + jruby-service :metrics-manual-borrow-test2)] + (deliver future-instance instance) + @wait-to-return + (jruby-testutils/return-instance + jruby-service instance + :metrics-manual-borrow-test2))) + expected-request-count (inc (:requested-count (expected-metrics-values)))] + ;; wait for manual borrow attempt to register as a requested instance + (while (> expected-request-count (.getCount requested-count))) + + (update-expected-values {:requested-count 1 + :current-requested-instances 1}) + (is (= (expected-metrics-values) (current-metrics-values))) + + ;; validate that we can see it in the metrics + (let [requested (first (vals @requested-instances))] + (is (timestamp-after? start-time (:time requested))) + (is (= :metrics-manual-borrow-test2 (:reason requested)))) + + (coordinator/final-result coordinator 1) + (coordinator/final-result coordinator 2) + + (let [instance @future-instance] + (update-expected-values {:num-free-jrubies 1 + :return-count 2 + :borrow-count 1 + :current-borrowed-instances -1 + :current-requested-instances -1}) + (is (= (expected-metrics-values) (current-metrics-values))) + + (let [borrowed (first (vals @borrowed-instances))] + (is (timestamp-after? start-time (:time borrowed))) + (is (= :metrics-manual-borrow-test2 (:reason borrowed)))) + + (deliver wait-to-return true) + @future-thread)) + + (update-expected-values {:num-free-jrubies 1 + :return-count 1 + :current-borrowed-instances -1}) + (is (= (expected-metrics-values) (current-metrics-values))))))) + +(deftest ^:metrics timers-test + (with-metrics-test-env test-env default-test-config + (let [{:keys [coordinator sampling-scheduled? sample-metrics! + expected-metrics-values update-expected-values + current-metrics-values]} test-env + {:keys [requested-jrubies-histo free-jrubies-histo + borrow-timer wait-timer requested-count + num-free-jrubies]} (:metrics test-env)] + (testing "borrow/wait timers, histograms and sampling" + ;; under normal circumstances, we'd be taking samples for the + ;; histogram metrics at a scheduled interval. However, for the + ;; purposes of testing, we mocked that out so that we can + ;; trigger the sampling explicitly. + ;; + ;; first we should double-check that the sampler *would* have + ;; been scheduled if it hadn't been mocked out. + (testing "metrics sampling is scheduled" + (is (true? @sampling-scheduled?))) + + ;; now let's just validate that we know what our starting values + ;; are. + (is (= 0.0 (metrics/mean requested-jrubies-histo))) + (is (= 0.0 (metrics/mean free-jrubies-histo))) + (is (= 0 (metrics/mean-millis borrow-timer))) + (is (= 0 (metrics/mean-millis wait-timer))) + + (testing "wait timer increases under load" + ;; we're going to run a bunch of requests that sleep for 10 + ;; millis each, and take a sample after we unblock each one. + ;; the samples should all be increasing. + (let [samples (atom []) + done? (promise) + create-requests (fn [] + (doseq [request-id (range 10)] + (async-request coordinator request-id "/foo/bar/foo?sleep=10") + ;; allow each request to proceed as far as borrowing + ;; a jruby, but don't wait for them. (most + ;; of them will be blocked because the pool + ;; will be empty.) + (coordinator/unblock-task-to coordinator request-id :borrowed-jruby))) + + run-request-and-take-sample (fn [request-id phase] + ;; block until the request has borrowed a jruby; we need + ;; to do this since we told them they were unblocked to + ;; that point. + (coordinator/wait-for-task coordinator request-id :borrowed-jruby) + (let [current-wait-time (metrics/mean-millis wait-timer)] + ;; now we know the request is blocked at the :borrowed-jruby phase. + ;; sleep for a few ms longer than the current wait time, to make sure + ;; that the other queued requests show a noticeable difference in their + ;; borrow time. + (Thread/sleep (+ current-wait-time 10))) + ;; and now we can advance it to the end. + (coordinator/final-result coordinator request-id) + ;; take a sample and add it to the samples atom + (sample-metrics!) + (swap! samples conj (metrics/mean-millis wait-timer)) + ;; check to see if all of the requests are done + (when (= 10 (count @samples)) + (deliver done? true)))] + (create-requests) + + ;; block until all of the requests have reached the point + ;; where they either have a jruby or are blocked in an + ;; attempt to take one from the empty pool. + (let [expected-requested-count (+ 10 (:requested-count (expected-metrics-values)))] + (while (> expected-requested-count (.getCount requested-count)))) + + ;; add callbacks that get fired when each request reaches + ;; the :borrowed-jruby phase + (doseq [request-id (range 10)] + (coordinator/callback-at-phase coordinator request-id :borrowed-jruby run-request-and-take-sample)) + + ;; block until all requests are completed + @done? + + ;; we drop the first two samples because those came from the + ;; first two jrubies borrowed, so, should have had basically zero + ;; wait time. All subsequent requests will have + ;; been queued and blocked waiting for a jruby together, + ;; so the wait times should be increasing for each request. + (let [relevant-samples (drop 2 @samples)] + ;; To validate that the samples are all increasing, we can just + ;; compare it to a sorted version of itself + (is (= (sort relevant-samples) relevant-samples))) + + (update-expected-values {:requested-count 10 + :borrow-count 10 + :return-count 10}) + (is (= (expected-metrics-values) (current-metrics-values))))) + + (testing "free-jrubies histo increases when no instances are borrowed" + ;; validate that all the jrubies are free + (is (= 2 (.getValue num-free-jrubies))) + (let [initial-value (metrics/mean free-jrubies-histo)] + ;; take 10 samples; no jrubies are in use, so the average + ;; should increase with each sample. + (let [samples (for [i (range 10)] + (do + (sample-metrics!) + (metrics/mean free-jrubies-histo)))] + (is (= (sort samples) samples))) + (is (< initial-value (metrics/mean free-jrubies-histo)))) + + (is (= (expected-metrics-values) (current-metrics-values)))) + + (testing "requested-jrubies histo decreases when requests are not queued" + (let [initial-value (metrics/mean requested-jrubies-histo)] + ;; take 10 samples; the average should increase with each sample. + (let [samples (for [i (range 10)] + (do + (sample-metrics!) + (metrics/mean requested-jrubies-histo)))] + (is (= (reverse (sort samples)) samples))) + + (is (> initial-value (metrics/mean requested-jrubies-histo)))) + + (is (= (expected-metrics-values) (current-metrics-values))))))) + + (with-metrics-test-env test-env default-test-config + (let [{:keys [coordinator sample-metrics! + expected-metrics-values update-expected-values + current-metrics-values]} test-env + {:keys [requested-jrubies-histo requested-count]} (:metrics test-env)] + (testing "requested-jrubies histo increases when requests are queued" + ;; take a sample to ensure that the requested-jrubies-histo is updated + (sample-metrics!) + + (is (= 0.0 (metrics/mean requested-jrubies-histo))) + + ;; get ourselves to a state where two jrubies instance requests are queued + (async-request coordinator 1 "/foo/bar/req" :borrowed-jruby) + (async-request coordinator 2 "/foo/bar/req" :borrowed-jruby) + ;; the next two requests won't be able to make it all the way to + ;; ':borrowed-jruby', because the pool is empty, so we won't + ;; block waiting for them to get there. + (async-request coordinator 3 "/foo/bar/req") + (coordinator/unblock-task-to coordinator 3 :borrowed-jruby) + (async-request coordinator 4 "/foo/bar/req") + (coordinator/unblock-task-to coordinator 4 :borrowed-jruby) + + ;; wait until all of the jrubies have been requested so we know + ;; that requests 3 and 4 are blocked trying to take a jruby + ;; from the pool + (let [expected-requested-count (+ 4 (:requested-count (expected-metrics-values)))] + (while (> expected-requested-count (.getCount requested-count)))) + + ;; ok, now we have two jruby instance requests pending. + ;; take some samples and validate that the histogram is + ;; increasing. + (let [initial-value (metrics/mean requested-jrubies-histo)] + (let [samples (for [i (range 10)] + (do + (sample-metrics!) + (metrics/mean requested-jrubies-histo)))] + (is (= (sort samples) samples))) + + (is (< initial-value (metrics/mean requested-jrubies-histo)))) + + ;; finish up the requests + (coordinator/final-result coordinator 1) + (coordinator/final-result coordinator 2) + (coordinator/wait-for-task coordinator 3 :borrowed-jruby) + (coordinator/wait-for-task coordinator 4 :borrowed-jruby) + (coordinator/final-result coordinator 3) + (coordinator/final-result coordinator 4) + + (update-expected-values {:requested-count 4 + :borrow-count 4 + :return-count 4}) + (is (= (expected-metrics-values) (current-metrics-values)))))) + + (with-metrics-test-env test-env default-test-config + (let [{:keys [coordinator sample-metrics! + expected-metrics-values update-expected-values + current-metrics-values]} test-env + {:keys [free-jrubies-histo num-free-jrubies]} (:metrics test-env)] + (testing "free-jrubies histo decreases when instances are borrowed" + ;; take a sample to ensure that the free-jrubies-histo is updated + (sample-metrics!) + + (let [initial-value (metrics/mean free-jrubies-histo)] + (is (= 2.0 initial-value)) + + ;; borrow two instances so we know that there are no free jrubies + (async-request coordinator 1 "/foo/bar/req" :borrowed-jruby) + (async-request coordinator 2 "/foo/bar/req" :borrowed-jruby) + (is (= 0 (.getValue num-free-jrubies))) + + ;; take 10 samples; the average should decrease with each sample. + (let [samples (for [i (range 10)] + (do + (sample-metrics!) + (metrics/mean free-jrubies-histo)))] + (is (= (reverse (sort samples)) samples))) + + (is (> initial-value (metrics/mean free-jrubies-histo))) + + ;; free up our blocked requests + (coordinator/final-result coordinator 1) + (coordinator/final-result coordinator 2)) + + (update-expected-values {:requested-count 2 + :borrow-count 2 + :return-count 2}) + (is (= (expected-metrics-values) (current-metrics-values)))))) + + (with-metrics-test-env test-env default-test-config + (let [{:keys [coordinator sample-metrics! + expected-metrics-values update-expected-values + current-metrics-values]} test-env + {:keys [wait-timer num-free-jrubies]} (:metrics test-env)] + (testing "wait timer decreases when not under load" + ;; set the wait timer to an artificially super-high value so that we + ;; can be reasonably assured that subsequent borrows will reduce the mean + (.update wait-timer 1 TimeUnit/HOURS) + + ;; make sure there are no jrubies in use, so we know that there + ;; should be no wait times. + (is (= 2 (.getValue num-free-jrubies))) + + (let [start-wait-time (metrics/mean-millis wait-timer)] + (dotimes [request-id 10] + (let [last-wait-time (metrics/mean-millis wait-timer)] + ;; make a request (should have zero wait time) + (sync-request coordinator request-id "/foo/bar/req") + ;; take a sample and confirm that the mean wait time has decreased + (sample-metrics!) + (is (>= last-wait-time (metrics/mean-millis wait-timer))))) + (is (> start-wait-time (metrics/mean-millis wait-timer)))) + + (update-expected-values {:requested-count 10 + :borrow-count 10 + :return-count 10}) + (is (= (expected-metrics-values) (current-metrics-values)))))) + + (with-metrics-test-env test-env default-test-config + (let [{:keys [coordinator sample-metrics! + expected-metrics-values update-expected-values + current-metrics-values]} test-env + {:keys [borrow-timer]} (:metrics test-env)] + (testing "borrow timer increases when requests are slower" + (dotimes [request-id 10] + (let [last-borrow-time (metrics/mean-millis borrow-timer)] + (let [longer-borrow-time (+ 50 last-borrow-time)] + (sync-request coordinator request-id (format "/foo/bar/req?sleep=%s" longer-borrow-time)) + ;; sample and validate that the borrow time has increased + (sample-metrics!) + (let [new-borrow-time (metrics/mean-millis borrow-timer)] + (is (<= last-borrow-time new-borrow-time) + (format + (str "Borrow time did not increase! " + "last-borrow-time: '%s', longer-borrow-time: '%s', " + "new-borrow-time: '%s'") + last-borrow-time longer-borrow-time new-borrow-time)))))) + + (update-expected-values {:requested-count 10 + :borrow-count 10 + :return-count 10}) + (is (= (expected-metrics-values) (current-metrics-values)))))) + + (with-metrics-test-env test-env default-test-config + (let [{:keys [coordinator sample-metrics! + expected-metrics-values update-expected-values + current-metrics-values]} test-env + {:keys [borrow-timer]} (:metrics test-env)] + (testing "borrow timer decreases when requests are faster" + ;; set the borrow timer to an artificially super-high value so that we + ;; can be reasonably assured that subsequent borrows will reduce the mean + (.update borrow-timer 1 TimeUnit/HOURS) + (let [start-borrow-time (metrics/mean-millis borrow-timer)] + ;; make some requests with no sleep + (dotimes [request-id 10] + (let [last-borrow-time (metrics/mean-millis borrow-timer)] + (sync-request coordinator request-id "/foo/bar/req") + ;; sample and validate that the borrow time has decreased + (sample-metrics!) + (is (>= last-borrow-time (metrics/mean-millis borrow-timer))))) + (is (> start-borrow-time (metrics/mean-millis borrow-timer)))) + + (update-expected-values {:requested-count 10 + :borrow-count 10 + :return-count 10}) + (is (= (expected-metrics-values) (current-metrics-values))))))) + +(deftest ^:metrics borrow-timeout-test + (with-metrics-test-env + test-env + (assoc-in default-test-config + ;;; set the borrow timeout to a low value so that we can test + ;;; timeout handling without making the test too slow + [:jruby-puppet :borrow-timeout] 1000) + (let [{:keys [coordinator update-expected-values expected-metrics-values + current-metrics-values jruby-service]} test-env] + (testing "borrow timeout" + ;; first we'll queue up a few requests to consume the 2 jruby instances + (async-request coordinator 1 "/foo/bar/async1" :returning-jruby) + (async-request coordinator 2 "/foo/baz/async2" :returning-jruby) + + (update-expected-values {:num-free-jrubies -2 + :requested-count 2 + :borrow-count 2 + :current-borrowed-instances 2}) + (is (= (expected-metrics-values) (current-metrics-values))) + + ;; now attempt a manual borrow and allow it to timeout + (let [borrow-result (jruby-testutils/borrow-instance jruby-service :introspect-manual-borrow-test2)] + (is (nil? borrow-result))) + + (update-expected-values {:requested-count 1 + :borrow-timeout-count 1}) + (is (= (expected-metrics-values) (current-metrics-values))) + + (coordinator/final-result coordinator 1) + (coordinator/final-result coordinator 2) + + (update-expected-values {:num-free-jrubies 2 + :return-count 2 + :current-borrowed-instances -2}) + (is (= (expected-metrics-values) (current-metrics-values))))))) diff --git a/test/unit/puppetlabs/enterprise/testutils/task_coordinator.clj b/test/unit/puppetlabs/enterprise/testutils/task_coordinator.clj new file mode 100644 index 0000000000..a63fe0ab23 --- /dev/null +++ b/test/unit/puppetlabs/enterprise/testutils/task_coordinator.clj @@ -0,0 +1,249 @@ +(ns puppetlabs.enterprise.testutils.task-coordinator + "This namespace defines a TaskCoordinator protocol. It is used for testing + in cases where you need a lot of control over asynchronous behaviors; it provides + a mechanism for defining phases for a task, and blocking/unblocking the task + at any phase of its life cycle." + (:require [schema.core :as schema] + [clojure.core.async.impl.protocols :as async-protocols] + [clojure.tools.logging :as log] + [clojure.core.async :as async]) + (:import (clojure.lang IFn))) + + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Private + +(defprotocol TaskCoordinator + (initialize-task + [this task-id task-fn] + [this task-id task-fn phase] + "Initialize a task given an id and a function that will actually perform + the task. Also supports an optional `phase` keyword; if not provided, + then the task will be blocked without having started yet. If `phase` + is provided, the task will be executed synchronously until it reaches + the specified `phase` (which must be a valid phase from `task-phases`).") + (notify-task-progress [this task-id phase] + "Notify the coordinator that the task with the given ID has reached the + given phase. This function should be called from within the logic of the + task itself, and will cause the task to block until the coordinator + has given it permission to proceed.") + (advance-task-to [this task-id phase] + "Synchronously advance the task with the given id to the specified phase.") + (unblock-task-to [this task-id phase] + "Allow the task with the given id to proceed, asynchronously, to the + specified phase. This call is asynchronous and returns immediately; it + should always be followed by a call to `wait-for-task` to re-join the + threads after some intermediate action.") + (wait-for-task [this task-id phase] + "Blocks until the task with the given id reaches the specified phase. This + should only be called following a call to `unblock-task-to`.") + (callback-at-phase [this task-id phase callback] + "Register a callback function that should be called when a task reaches + a certain phase. Used in combination with `unblock-task-to`. The callback + function should accept two arguments: a task-id and a phase.") + (final-result [this task-id] + "Synchronously advance the task through to completion (if it has not + already been completed), and return the result of the original + `task-fn` that was passed in to `initialize-task`.") + + (task-phases [this] + "For internal use; the sequence of phases making up the life cycle for a + task managed by this coordinator.") + (tasks [this] + "For internal use; the state of all of the tasks currently being managed + by this coordinator.")) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Private + +(schema/defn wait-for-ack + [channel :- (schema/protocol async-protocols/Channel)] + (let [ack (async/!! channel {:task-id task-id + :phase phase}) + (log/debug "Blocking task; waiting for ack:" task-id phase) + (wait-for-ack channel) + (log/debug "task received ack, unblocking:" task-id phase))) + +(schema/defn ^:always-validate do-notify-task-progress + [coordinator :- (schema/protocol TaskCoordinator) + task-id :- (schema/either schema/Str schema/Int schema/Keyword) + phase :- schema/Keyword] + (do-notify-task-progress* coordinator (str task-id) phase)) + +(schema/defn ^:always-validate do-advance-task-to* + [coordinator :- (schema/protocol TaskCoordinator) + task-id :- schema/Str + desired-phase :- schema/Keyword] + (log/debug "Advancing task to phase:" task-id desired-phase) + (let [channel (:channel (get @(tasks coordinator) task-id)) + next-phase (fn [] (:phase (async/!! channel :go) + (loop [phase (next-phase)] + (log/debug "Advancing task; reached phase:" task-id phase) + (let [task (get @(tasks coordinator) task-id) + next-expected-phase (first (:remaining-phases task))] + (when-not (= next-expected-phase phase) + (let [msg (format "Expected next phase '%s', got '%s'" next-expected-phase phase)] + (log/error msg) + (throw (IllegalStateException. msg)))) + (let [remaining-phases (rest (:remaining-phases task))] + (log/debug "Advancing task; remaining phases:" task-id remaining-phases) + (swap! (tasks coordinator) assoc-in [task-id :remaining-phases] remaining-phases)) + (swap! (tasks coordinator) assoc-in [task-id :current-phase] phase) + (if (= desired-phase phase) + (log/debug "task has reached desired phase:" task-id phase) + (do + (log/debug "Advancing task: sending ack" task-id) + (async/>!! (:channel task) :go) + (recur (next-phase)))))))) + +(schema/defn ^:always-validate do-advance-task-to + [coordinator :- (schema/protocol TaskCoordinator) + task-id :- (schema/either schema/Str schema/Int schema/Keyword) + desired-phase :- schema/Keyword] + (do-advance-task-to* coordinator (str task-id) desired-phase)) + +(schema/defn ^:always-validate do-advance-task-through-all-phases* + [coordinator :- (schema/protocol TaskCoordinator) + task-id :- schema/Str] + (let [task (get @(tasks coordinator) task-id)] + (when-not (empty? (:remaining-phases task)) + (log/debug "Advancing task to last phase:" task-id (last (task-phases coordinator))) + (do-advance-task-to coordinator task-id (last (task-phases coordinator)))) + (log/debug "Advancing task to completion:" task-id) + (async/>!! (:channel task) :go))) + +(schema/defn ^:always-validate do-advance-task-through-all-phases + [coordinator :- (schema/protocol TaskCoordinator) + task-id :- (schema/either schema/Str schema/Int schema/Keyword)] + (do-advance-task-through-all-phases* coordinator (str task-id))) + +(schema/defn ^:always-validate do-unblock-task-to + ([coordinator :- (schema/protocol TaskCoordinator) + task-id :- (schema/either schema/Str schema/Int schema/Keyword) + desired-phase :- schema/Keyword] + (do-unblock-task-to coordinator task-id desired-phase nil)) + ([coordinator :- (schema/protocol TaskCoordinator) + task-id :- (schema/either schema/Str schema/Int schema/Keyword) + desired-phase :- schema/Keyword + callback :- (schema/maybe IFn)] + (log/debug "Unblocking task to phase:" task-id desired-phase) + (async/go (advance-task-to coordinator task-id desired-phase) + (when callback + (callback task-id desired-phase))))) + +(schema/defn ^:always-validate do-wait-for-task* + [coordinator :- (schema/protocol TaskCoordinator) + task-id :- schema/Str + desired-phase :- schema/Keyword] + (log/debug "Waiting for task to reach phase:" task-id desired-phase) + (loop [task (get @(tasks coordinator) task-id)] + (when-not (= desired-phase (:current-phase task)) + (recur (get @(tasks coordinator) task-id))))) + +(schema/defn ^:always-validate do-wait-for-task + [coordinator :- (schema/protocol TaskCoordinator) + task-id :- (schema/either schema/Str schema/Int schema/Keyword) + desired-phase :- schema/Keyword] + (do-wait-for-task* coordinator (str task-id) desired-phase)) + +(schema/defn ^:always-validate do-callback-at-phase + [coordinator :- (schema/protocol TaskCoordinator) + task-id :- (schema/either schema/Str schema/Int schema/Keyword) + desired-phase :- schema/Keyword + callback :- IFn] + (async/go (do-wait-for-task coordinator task-id desired-phase) + (callback task-id desired-phase))) + +(schema/defn ^:always-validate do-final-result* + [coordinator :- (schema/protocol TaskCoordinator) + task-id :- schema/Str] + (log/debug "Retrieving final result:" task-id) + (let [task (get @(tasks coordinator) task-id)] + (do-advance-task-through-all-phases coordinator task-id) + (log/debug "Reading final result:" task-id) + (let [result (async/