Skip to content

Commit

Permalink
(SERVER-150) Put a 'retry' poison pill in the flushed pool
Browse files Browse the repository at this point in the history
When we replace the old pool with the new pool during a 'flush'
request, we need to have a way to handle the possible race
condition where some code has a reference to the old pool
and tries to borrow an instance from it after it has already
been flushed.  This commit is the first step towards that;
it puts a RetryPoisonPill into the old queue.
  • Loading branch information
cprice404 committed Dec 5, 2014
1 parent 64a78b2 commit 22b4255
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 6 deletions.
10 changes: 7 additions & 3 deletions src/clj/puppetlabs/services/jruby/jruby_puppet_agents.clj
@@ -1,7 +1,7 @@
(ns puppetlabs.services.jruby.jruby-puppet-agents
(:import (clojure.lang IFn Agent)
(com.puppetlabs.puppetserver PuppetProfiler)
(puppetlabs.services.jruby.jruby_puppet_core PoisonPill))
(puppetlabs.services.jruby.jruby_puppet_core PoisonPill RetryPoisonPill))
(:require [schema.core :as schema]
[puppetlabs.services.jruby.jruby-puppet-core :as jruby-core]
[clojure.tools.logging :as log]))
Expand Down Expand Up @@ -62,13 +62,17 @@
(log/infof "Cleaned up old JRuby instance %s of %s, creating replacement."
id count)
(try
(jruby-core/create-pool-instance! #spy/d new-pool id config profiler)
(jruby-core/create-pool-instance! new-pool id config profiler)
(jruby-core/mark-as-initialized! pool-state)
(log/infof "Finished creating JRubyPuppet instance %d of %d"
id count)
(catch Exception e
(.clear new-pool)
(.put new-pool (PoisonPill. e))
(throw (IllegalStateException. "There was a problem adding a JRubyPuppet instance to the pool." e))))))
(throw (IllegalStateException.
"There was a problem adding a JRubyPuppet instance to the pool."
e))))))
(.put (:pool old-pool) (RetryPoisonPill.))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Public
Expand Down
28 changes: 25 additions & 3 deletions src/clj/puppetlabs/services/jruby/jruby_puppet_core.clj
Expand Up @@ -42,6 +42,13 @@
;; state in a thread-safe manner.
[err])

(defrecord RetryPoisonPill
;; A sentinel object to put into an old pool when we swap in a new pool.
;; This can be used to build `borrow` functionality that will detect the
;; case where we're trying to borrow from an old pool, so that we can retry
;; with the new pool.
[])

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Schemas

Expand Down Expand Up @@ -106,6 +113,19 @@
(schema/pred
#(satisfies? puppet-env/EnvironmentStateContainer %)))])

(defn jruby-puppet-instance?
[x]
(instance? JRubyPuppetInstance x))

(defn retry-poison-pill?
[x]
(instance? RetryPoisonPill x))

(def JRubyPuppetInstanceOrRetry
(schema/conditional
jruby-puppet-instance? (schema/pred jruby-puppet-instance?)
retry-poison-pill? (schema/pred retry-poison-pill?)))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Private
Expand Down Expand Up @@ -237,7 +257,9 @@
be available to other callers) and throws the poison pill's exception.
Otherwise returns the instance that was passed in."
[instance pool]
{:post [((some-fn nil? #(instance? JRubyPuppetInstance %)) %)]}
{:post [((some-fn nil?
jruby-puppet-instance?
retry-poison-pill?) %)]}
(when (instance? PoisonPill instance)
(.put pool instance)
(throw (IllegalStateException. "Unable to borrow JRuby instance from pool"
Expand Down Expand Up @@ -302,15 +324,15 @@
puppet-env/mark-all-environments-expired!)))

(schema/defn ^:always-validate
borrow-from-pool :- JRubyPuppetInstance
borrow-from-pool :- JRubyPuppetInstanceOrRetry
"Borrows a JRubyPuppet interpreter from the pool. If there are no instances
left in the pool then this function will block until there is one available."
[pool :- pool-queue-type]
(let [instance (.take pool)]
(validate-instance-from-pool! instance pool)))

(schema/defn ^:always-validate
borrow-from-pool-with-timeout :- (schema/maybe JRubyPuppetInstance)
borrow-from-pool-with-timeout :- (schema/maybe JRubyPuppetInstanceOrRetry)
"Borrows a JRubyPuppet interpreter from the pool, like borrow-from-pool but a
blocking timeout is provided. If an instance is available then it will be
immediately returned to the caller, if not then this function will block
Expand Down
42 changes: 42 additions & 0 deletions test/unit/puppetlabs/services/jruby/jruby_puppet_agents_test.clj
@@ -0,0 +1,42 @@
(ns puppetlabs.services.jruby.jruby-puppet-agents-test
(:require [clojure.test :refer :all]
[schema.test :as schema-test]
[puppetlabs.trapperkeeper.testutils.bootstrap :as tk-testutils]
[puppetlabs.services.jruby.jruby-puppet-service :as jruby]
[puppetlabs.services.puppet-profiler.puppet-profiler-service :as profiler]
[puppetlabs.services.jruby.jruby-testutils :as jruby-testutils]
[puppetlabs.services.jruby.jruby-puppet-core :as jruby-core]
[puppetlabs.trapperkeeper.app :as tk-app]
[puppetlabs.trapperkeeper.services :as tk-services]
[puppetlabs.services.protocols.jruby-puppet :as jruby-protocol])
(:import (puppetlabs.services.jruby.jruby_puppet_core RetryPoisonPill)))

(use-fixtures :once schema-test/validate-schemas)

(deftest retry-poison-pill-test
(testing "Flush puts a retry poison pill into the old pool"
(tk-testutils/with-app-with-config
app
[jruby/jruby-puppet-pooled-service
profiler/puppet-profiler-service]
(-> (jruby-testutils/jruby-puppet-tk-config
(jruby-testutils/jruby-puppet-config 1)))
(let [jruby-service (tk-app/get-service app :JRubyPuppetService)
context (tk-services/service-context jruby-service)
pool-context (:pool-context context)
old-pool (jruby-core/get-pool pool-context)
pool-state-swapped (promise)
pool-state-watch-fn (fn [key pool-state old-val new-val]
(when (not= (:pool old-val) (:pool new-val))
(remove-watch pool-state key)
(deliver pool-state-swapped true)))]
; borrow an instance so we know that the pool is ready
(jruby/with-jruby-puppet jruby-puppet jruby-service)
(add-watch (:pool-state pool-context) :pool-state-watch pool-state-watch-fn)
(jruby-protocol/flush-jruby-pool! jruby-service)
; wait until we know the new pool has been swapped in
@pool-state-swapped
; wait until the flush is complete
(await (:pool-agent context))
(let [old-pool-instance (jruby-core/borrow-from-pool old-pool)]
(is (jruby-core/retry-poison-pill? old-pool-instance)))))))

0 comments on commit 22b4255

Please sign in to comment.