Permalink
Browse files

throttles the number of simultaneous kerberos library invocations dur…

…ing authentication (#536)

* throttles the number of simultaneous kerberos library invocations during authentication

* adds support for terminating idle threads in the throttler thread pool
adds support for additional metrics around the throttler

* adds metric for available thread count

* processes the request on an async thread instead of the kerberos thread pool

* adds queue length limit on the kerberos throttler

* avoids reflection by explicit type hinting
specifies text/plain content types
changes message to 'Too many Kerberos authentication requests'
  • Loading branch information...
shamsimam authored and sradack committed Jan 10, 2019
1 parent f330faf commit da4261b843607aa281bb5554438ce797d5400d9e
@@ -167,7 +167,14 @@

;; :kind :kerberos enables authentication using the Kerberos protocol
;:kind :kerberos
:kerberos {:factory-fn waiter.auth.kerberos/kerberos-authenticator}}
:kerberos {:factory-fn waiter.auth.kerberos/kerberos-authenticator
;; the maximum number of concurrent Kerberos library (GSSLibStub) invocations allowed
:concurrency-level 20
;; the idle time before cached threads from the invocation throttler are terminated
:keep-alive-mins 5
;; the maximum number of requests waiting for Kerberos auth before the server responds
;; with a 503 temporarily unavailable
:max-queue-length 1000}}

;; Waiter supports the run-as-requester feature to launch a service as the requesting user.
;; Triggering this feature without passing explicit headers requires providing an explicit consent and storing this in a cookie.
@@ -22,7 +22,9 @@
[waiter.auth.authentication :as auth]
[waiter.authorization :as authz]
[waiter.auth.spnego :as spnego]
[waiter.util.utils :as utils]))
[waiter.metrics :as metrics]
[waiter.util.utils :as utils])
(:import (java.util.concurrent LinkedBlockingQueue ThreadPoolExecutor TimeUnit)))

(defn get-opt-in-accounts
"Returns the list of users whose tickets are prestashed on host"
@@ -101,16 +103,33 @@
:status 403
:user run-as-user}))))))

(defrecord KerberosAuthenticator [password]
(defrecord KerberosAuthenticator [^ThreadPoolExecutor executor max-queue-length password]
auth/Authenticator
(wrap-auth-handler [_ request-handler]
(spnego/require-gss request-handler password)))
(spnego/require-gss request-handler executor max-queue-length password)))

(defn kerberos-authenticator
  "Factory function for creating Kerberos authenticator middleware.

   Expects a config map with:
     :concurrency-level - positive integer, maximum number of threads performing Kerberos
                          library invocations at the same time
     :keep-alive-mins   - positive integer, idle time in minutes after which throttler
                          threads are terminated
     :max-queue-length  - positive integer, handed to the authenticator to bound the number
                          of queued auth requests
     :password          - non-empty, used for the auth cookie

   Registers gauges describing the throttler thread pool and returns a KerberosAuthenticator."
  [{:keys [concurrency-level keep-alive-mins max-queue-length password]}]
  {:pre [(not-empty password)
         (integer? concurrency-level)
         (pos? concurrency-level)
         (integer? keep-alive-mins)
         (pos? keep-alive-mins)
         (integer? max-queue-length)
         (pos? max-queue-length)]}
  (let [;; A ThreadPoolExecutor backed by an unbounded queue never grows beyond its core pool
        ;; size, so the core size (not just the maximum) must be concurrency-level for the
        ;; configured concurrency to take effect. Allowing core threads to time out keeps
        ;; the idle-thread termination governed by keep-alive-mins.
        ^ThreadPoolExecutor executor
        (doto (ThreadPoolExecutor. concurrency-level concurrency-level keep-alive-mins
                                   TimeUnit/MINUTES (LinkedBlockingQueue.))
          (.allowCoreThreadTimeOut true))]
    (metrics/waiter-gauge #(.getActiveCount executor)
                          "core" "kerberos" "throttle" "active-thread-count")
    (metrics/waiter-gauge #(- concurrency-level (.getActiveCount executor))
                          "core" "kerberos" "throttle" "available-thread-count")
    (metrics/waiter-gauge #(.getMaximumPoolSize executor)
                          "core" "kerberos" "throttle" "max-thread-count")
    (metrics/waiter-gauge #(-> executor .getQueue .size)
                          "core" "kerberos" "throttle" "pending-task-count")
    (metrics/waiter-gauge #(.getTaskCount executor)
                          "core" "kerberos" "throttle" "scheduled-task-count")
    (->KerberosAuthenticator executor max-queue-length password)))

(defrecord KerberosAuthorizer
[prestash-cache query-chan]
@@ -20,16 +20,20 @@
[clojure.tools.logging :as log]
[metrics.counters :as counters]
[metrics.meters :as meters]
[metrics.timers :as timers]
[ring.middleware.cookies :as cookies]
[ring.util.response :as rr]
[waiter.auth.authentication :as auth]
[waiter.correlation-id :as cid]
[waiter.middleware :as middleware]
[waiter.metrics :as metrics])
[waiter.metrics :as metrics]
[waiter.util.utils :as utils])
(:import (org.apache.commons.codec.binary Base64)
(org.eclipse.jetty.client.api Authentication$Result Request)
(org.eclipse.jetty.http HttpHeader)
(org.ietf.jgss GSSManager GSSCredential GSSContext GSSName Oid)
(java.net URI)))
(java.net URI)
(java.util.concurrent ThreadPoolExecutor)))

(defn decode-input-token
"Decode the input token from the negotiate line, expects the authorization token to exist"
@@ -59,10 +63,23 @@
(meters/mark! (metrics/waiter-meter "core" "response-status-rate" "401"))
(-> (rr/response "Unauthorized")
(rr/status 401)
(rr/header "Content-Type" "text/html")
(rr/header "Content-Type" "text/plain")
(rr/header "Server" (utils/get-current-server-name))
(rr/header "WWW-Authenticate" "Negotiate")
(cookies/cookies-response)))

(defn response-503-temporarily-unavailable
  "Tell the client you're overloaded and would like them to try later.
   Increments the 503 response-status counter, marks the 503 response-status-rate meter
   and returns a text/plain 503 response carrying the Server header."
  []
  ;; the message names the 503 path; it previously repeated the 401 negotiate text
  (log/info "triggering 503 temporarily unavailable for kerberos authentication")
  (counters/inc! (metrics/waiter-counter "core" "response-status" "503"))
  (meters/mark! (metrics/waiter-meter "core" "response-status-rate" "503"))
  (-> (rr/response "Too many Kerberos authentication requests")
      (rr/status 503)
      (rr/header "Content-Type" "text/plain")
      (rr/header "Server" (utils/get-current-server-name))
      (cookies/cookies-response)))

(defn gss-context-init
"Initialize a new gss context with name 'svc_name'"
[]
@@ -73,42 +90,88 @@
(meters/mark! (metrics/waiter-meter "core" "gss-context-creation"))
gss))

(defn gss-get-princ
(defn gss-get-principal
[^GSSContext gss]
(str (.getSrcName gss)))

(defn too-many-pending-auth-requests?
  "Returns true when the number of tasks waiting in the executor's queue
   has reached (or exceeded) max-queue-length."
  [^ThreadPoolExecutor thread-pool-executor max-queue-length]
  (let [pending-count (.size (.getQueue thread-pool-executor))]
    (>= pending-count max-queue-length)))

(defn populate-gss-credentials
  "Schedules the Kerberos authentication of `request` on the provided thread pool.

   The scheduled task stops the throttle delay timer (started when this function is
   called), initializes a GSS context and runs the auth check. It then puts
   {:principal ... :token ...} on response-chan, where :principal is nil unless the
   context is established. Any Throwable is logged and reported as {:error th} instead.
   response-chan is always closed once the task finishes."
  [^ThreadPoolExecutor thread-pool-executor request response-chan]
  (let [;; captured on the calling thread so the pooled thread logs under the same id
        correlation-id (cid/get-correlation-id)
        ;; measures how long the task waits before a pool thread picks it up
        delay-timer (timers/start (metrics/waiter-timer "core" "kerberos" "throttle" "delay"))
        gss-task (fn process-gss-task []
                   (cid/with-correlation-id
                     correlation-id
                     (try
                       (timers/stop delay-timer)
                       (let [^GSSContext context (gss-context-init)
                             output-token (do-gss-auth-check context request)
                             established? (.isEstablished context)]
                         (async/>!! response-chan
                                    {:principal (when established?
                                                  (gss-get-principal context))
                                     :token output-token}))
                       (catch Throwable th
                         (log/error th "error while performing kerberos auth")
                         (async/>!! response-chan {:error th}))
                       (finally
                         (async/close! response-chan)))))]
    (.execute thread-pool-executor gss-task)))

(defn require-gss
"This middleware enables the application to require a SPNEGO
authentication. If SPNEGO is successful then the handler `request-handler`
will be run, otherwise the handler will not be run and 401
returned instead. This middleware doesn't handle cookies for
authentication, but that should be stacked before this handler."
[request-handler password]
(fn require-gss-handler [{:keys [headers] :as req}]
[request-handler ^ThreadPoolExecutor thread-pool-executor max-queue-length password]
(fn require-gss-handler [{:keys [headers] :as request}]
(let [waiter-cookie (auth/get-auth-cookie-value (get headers "cookie"))
[auth-principal _ :as decoded-auth-cookie] (auth/decode-auth-cookie waiter-cookie password)]
(cond
;; Use the cookie, if not expired
(auth/decoded-auth-valid? decoded-auth-cookie)
(let [auth-params-map (auth/auth-params-map auth-principal)
request-handler' (middleware/wrap-merge request-handler auth-params-map)]
(request-handler' req))
(request-handler' request))
;; Ensure we are not already queued with lots of Kerberos auth requests
(too-many-pending-auth-requests? thread-pool-executor max-queue-length)
(response-503-temporarily-unavailable)
;; Try and authenticate using kerberos and add cookie in response when valid
(get-in req [:headers "authorization"])
(let [^GSSContext gss_context (gss-context-init)
token (do-gss-auth-check gss_context req)]
(if (.isEstablished gss_context)
(let [principal (gss-get-princ gss_context)
user (first (str/split principal #"@" 2))
resp (auth/handle-request-auth request-handler req user principal password)]
(log/debug "added cookies to response")
(if token
(if (map? resp)
(rr/header resp "WWW-Authenticate" token)
(async/go
(rr/header (async/<! resp) "WWW-Authenticate" token)))
resp))
(response-401-negotiate)))
(get-in request [:headers "authorization"])
(let [current-correlation-id (cid/get-correlation-id)
gss-response-chan (async/promise-chan)]
;; launch task that will populate the response in response-chan
(populate-gss-credentials thread-pool-executor request gss-response-chan)
(async/go
(cid/with-correlation-id
current-correlation-id
(let [{:keys [error principal token]} (async/<! gss-response-chan)]
(if-not error
(try
(if principal
(let [user (first (str/split principal #"@" 2))
response (auth/handle-request-auth request-handler request user principal password)]
(log/debug "added cookies to response")
(if token
(if (map? response)
(rr/header response "WWW-Authenticate" token)
(let [actual-response (async/<! response)]
(rr/header actual-response "WWW-Authenticate" token)))
response))
(response-401-negotiate))
(catch Throwable th
(log/error th "error while processing response")
th))
error)))))
;; Default to unauthorized
:else
(response-401-negotiate)))))
@@ -21,6 +21,7 @@
[clojure.string :as str]
[metrics.core :as mc]
[metrics.counters :as counters]
[metrics.gauges :as gauges]
[metrics.histograms :as histograms]
[metrics.meters :as meters]
[metrics.timers :as timers]
@@ -93,6 +94,11 @@
[classifier & nested-path]
`(counters/counter ~(metric-name (concat ["waiter" classifier "counters"] nested-path))))

(defmacro waiter-gauge
  "Creates a gauge with waiter-specific naming scheme.
   `f` is the zero-argument function polled for the gauge's value; `classifier` and
   `nested-path` are combined into the metric name at macro-expansion time.
   Gauges are registered under the \"gauges\" path segment so that they do not land in
   the namespace used by waiter-counter."
  [f classifier & nested-path]
  `(gauges/gauge-fn ~(metric-name (concat ["waiter" classifier "gauges"] nested-path)) ~f))

(defmacro waiter-meter
"Creates a waiter-meter with waiter-specific naming scheme"
[classifier & nested-path]
@@ -206,6 +206,10 @@

(def settings-defaults
{:authenticator-config {:kind :one-user
:kerberos {:factory-fn 'waiter.auth.kerberos/kerberos-authenticator
:concurrency-level 20
:keep-alive-mins 5
:max-queue-length 1000}
:one-user {:factory-fn 'waiter.auth.authentication/one-user-authenticator}}
:cors-config {:kind :patterns
:patterns {:factory-fn 'waiter.cors/pattern-based-validator
@@ -110,7 +110,10 @@

(deftest test-kerberos-authenticator
(with-redefs [start-prestash-cache-maintainer (constantly nil)]
(let [config {:password "test-password"
(let [config {:concurrency-level 20
:keep-alive-mins 5
:max-queue-length 100
:password "test-password"
:prestash-cache-refresh-ms 100
:prestash-cache-min-refresh-ms 10
:prestash-query-host "example.com"}
@@ -14,8 +14,12 @@
;; limitations under the License.
;;
(ns waiter.auth.spnego-test
(:require [clojure.test :refer :all]
[waiter.auth.spnego :refer :all]))
(:require [clojure.core.async :as async]
[clojure.string :as str]
[clojure.test :refer :all]
[waiter.auth.authentication :as auth]
[waiter.auth.spnego :refer :all]
[waiter.util.utils :as utils]))

(deftest test-decode-input-token
(is (decode-input-token {:headers {"authorization" "Negotiate Kerberos-Auth"}}))
@@ -25,3 +29,99 @@
(let [encoded-token (encode-output-token (.getBytes "Kerberos-Auth"))]
(is (= "Kerberos-Auth"
(String. (decode-input-token {:headers {"authorization" encoded-token}}))))))

(deftest test-require-gss
  ;; Exercises every branch of the require-gss handler. The thread pool is a plain Object
  ;; because each function that would touch it is redefined in the scenarios below.
  (let [ok-response {:body "OK" :status 200}
        downstream-handler (constantly ok-response)
        executor-stub (Object.)
        queue-limit 10
        cookie-password [:cached "test-password"]
        principal "user@test.com"
        bare-request {}
        unauthorized-response {:body "Unauthorized"
                               :headers {"Content-Type" "text/plain"
                                         "Server" "waiter"
                                         "WWW-Authenticate" "Negotiate"}
                               :status 401}
        ;; builds a fresh middleware-wrapped handler
        new-handler (fn []
                      (require-gss downstream-handler executor-stub queue-limit cookie-password))
        cookie-handler (new-handler)
        ;; the handler answers with either a response map or a channel yielding the response
        realize (fn [response]
                  (if (map? response)
                    response
                    (async/<!! response)))]

    (testing "valid auth cookie"
      (with-redefs [auth/decode-auth-cookie (constantly [principal nil])
                    auth/decoded-auth-valid? (constantly true)]
        (is (= (assoc ok-response
                 :authorization/principal principal
                 :authorization/user (first (str/split principal #"@" 2)))
               (cookie-handler bare-request)))))

    (testing "too many pending kerberos requests"
      (with-redefs [auth/decode-auth-cookie (constantly [principal nil])
                    auth/decoded-auth-valid? (constantly false)
                    too-many-pending-auth-requests? (constantly true)]
        (let [throttled-handler (new-handler)]
          (is (= {:body "Too many Kerberos authentication requests"
                  :headers {"Content-Type" "text/plain"
                            "Server" "waiter"}
                  :status 503}
                 (throttled-handler bare-request))))))

    (testing "standard 401 response on missing authorization header"
      (with-redefs [auth/decode-auth-cookie (constantly [principal nil])
                    auth/decoded-auth-valid? (constantly false)
                    too-many-pending-auth-requests? (constantly false)]
        (let [negotiate-handler (new-handler)]
          (is (= unauthorized-response (negotiate-handler bare-request))))))

    (testing "kerberos authentication path"
      (with-redefs [auth/decode-auth-cookie (constantly [principal nil])
                    auth/decoded-auth-valid? (constantly false)
                    too-many-pending-auth-requests? (constantly false)]
        (let [auth-request (assoc-in bare-request [:headers "authorization"] "foo-bar")
              error-object (Object.)
              ;; runs the handler with the kerberos task stubbed to publish gss-result
              ;; and returns the realized response
              run-kerberos-auth (fn [gss-result]
                                  (with-redefs [populate-gss-credentials
                                                (fn [_ _ response-chan]
                                                  (async/>!! response-chan gss-result))]
                                    (let [kerberos-handler (new-handler)]
                                      (realize (kerberos-handler auth-request)))))
              without-set-cookie (fn [response]
                                   (utils/dissoc-in response [:headers "set-cookie"]))]

          (testing "401 response on failed authentication"
            (is (= unauthorized-response
                   (run-kerberos-auth {:foo :bar}))))

          (testing "error object on exception"
            (is (= error-object
                   (run-kerberos-auth {:error error-object}))))

          (testing "successful authentication - principal and token"
            (is (= (assoc ok-response
                     :authorization/principal "user@test.com"
                     :authorization/user "user"
                     :headers {"WWW-Authenticate" "test-token"})
                   (without-set-cookie
                     (run-kerberos-auth {:principal principal
                                         :token "test-token"})))))

          (testing "successful authentication - principal only"
            (is (= (assoc ok-response
                     :authorization/principal "user@test.com"
                     :authorization/user "user")
                   (without-set-cookie
                     (run-kerberos-auth {:principal principal}))))))))))

0 comments on commit da4261b

Please sign in to comment.