Skip to content

Commit

Permalink
One thread per registered session. (see #16)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgrand committed Dec 11, 2018
1 parent 256ca0e commit 61e23d8
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 64 deletions.
80 changes: 19 additions & 61 deletions src/clojure/nrepl/middleware/interruptible_eval.clj
Expand Up @@ -131,36 +131,7 @@
(.flush ^Writer err)))))
(maybe-restore-original-ns @bindings)))

(defn- configure-thread-factory
"Returns a new ThreadFactory for the given session. This implementation
generates daemon threads, with names that include the session id."
[]
(let [session-thread-counter (AtomicLong. 0)
;; Create a constant dcl for use across evaluations. This allows
;; modifications to the classloader to persist.
cl (clojure.lang.DynamicClassLoader.
(.getContextClassLoader (Thread/currentThread)))]
(reify ThreadFactory
(newThread [_ runnable]
(doto (Thread. runnable
(format "nREPL-worker-%s" (.getAndIncrement session-thread-counter)))
(.setDaemon true)
(.setContextClassLoader cl))))))

(defn- configure-executor
"Returns a ThreadPoolExecutor, configured (by default) to
have 1 core thread, use an unbounded queue, create only daemon threads,
and allow unused threads to expire after 30s."
[& {:keys [keep-alive queue thread-factory]
:or {keep-alive 30000
queue (SynchronousQueue.)}}]
(let [^ThreadFactory thread-factory (or thread-factory (configure-thread-factory))]
(ThreadPoolExecutor. 1 Integer/MAX_VALUE
(long 30000) TimeUnit/MILLISECONDS
^BlockingQueue queue
thread-factory)))

(def default-executor (delay (configure-executor)))

;; A little mini-agent implementation. Needed because agents cannot be used to host REPL
;; evaluation: http://dev.clojure.org/jira/browse/NREPL-17
Expand Down Expand Up @@ -206,43 +177,30 @@
\"eval\" and \"interrupt\" :op-erations that delegates to the given handler
otherwise."
[h & configuration]
(let [executor (:executor configuration @default-executor)]
(fn [{:keys [op session interrupt-id id transport] :as msg}]
(fn [{:keys [op session interrupt-id id transport] :as msg}]
(let [{:keys [interrupt exec] session-id :id} (meta session)]
(case op
"eval"
(if-not (:code msg)
(t/send transport (response-for msg :status #{:error :no-code :done}))
(queue-eval session executor
(fn []
(alter-meta! session assoc
:thread (Thread/currentThread)
:eval-msg msg)
(binding [*msg* msg]
(evaluate @session msg)
(t/send transport (response-for msg :status :done))
(alter-meta! session dissoc :thread :eval-msg)))))

(exec id
#(binding [*msg* msg]
(evaluate @session msg))
#(t/send transport (response-for msg :status :done))))

"interrupt"
;; interrupts are inherently racy; we'll check the agent's :eval-msg's :id and
;; bail if it's different than the one provided, but it's possible for
;; that message's eval to finish and another to start before we send
;; the interrupt / .stop.
(let [{:keys [id eval-msg ^Thread thread]} (meta session)]
(if (or (not interrupt-id)
(= interrupt-id (:id eval-msg)))
(if-not thread
(t/send transport (response-for msg :status #{:done :session-idle}))
(do
;; notify of the interrupted status before we .stop the thread so
;; it is received before the standard :done status (thereby ensuring
;; that is stays within the scope of a nrepl/message seq
(t/send transport {:status #{:interrupted}
:id (:id eval-msg)
:session id})
(.stop thread)
(t/send transport (response-for msg :status #{:done}))))
(t/send transport (response-for msg :status #{:error :interrupt-id-mismatch :done}))))

(let [interrupted-id (when interrupt (interrupt interrupt-id))]
(case interrupted-id
nil (t/send transport (response-for msg :status #{:error :interrupt-id-mismatch :done}))
:idle (t/send transport (response-for msg :status #{:done :session-idle}))
(do
;; interrupt prevents the interrupted computation to be ack'ed,
;; so a :done will never ne emitted before :interrupted
(t/send transport {:status #{:interrupted :done}
:id interrupted-id
:session session-id})
(t/send transport (response-for msg :status #{:done})))))

(h msg)))))

(set-descriptor! #'interruptible-eval
Expand Down
78 changes: 75 additions & 3 deletions src/clojure/nrepl/middleware/session.clj
Expand Up @@ -11,7 +11,8 @@
(:import
clojure.lang.LineNumberingPushbackReader
[java.io PrintWriter Reader Writer]
java.util.concurrent.LinkedBlockingQueue))
java.util.concurrent.atomic.AtomicLong
[java.util.concurrent LinkedBlockingQueue BlockingQueue Executor SynchronousQueue ThreadFactory ThreadPoolExecutor TimeUnit]))

(def ^{:private true} sessions (atom {}))

Expand All @@ -24,6 +25,40 @@
(def ^{:dynamic true :private true} *out-limit* 1024)
(def ^{:dynamic true :private true} *skipping-eol* false)

(defn- configure-thread-factory
"Returns a new ThreadFactory for the given session. This implementation
generates daemon threads, with names that include the session id."
[]
(let [session-thread-counter (AtomicLong. 0)
;; Create a constant dcl for use across evaluations. This allows
;; modifications to the classloader to persist.
cl (clojure.lang.DynamicClassLoader.
(.getContextClassLoader (Thread/currentThread)))]
(reify ThreadFactory
(newThread [_ runnable]
(doto (Thread. runnable
(format "nREPL-worker-%s" (.getAndIncrement session-thread-counter)))
(.setDaemon true)
(.setContextClassLoader cl))))))

(defn- configure-executor
"Returns a ThreadPoolExecutor, configured (by default) to
have 1 core thread, use an unbounded queue, create only daemon threads,
and allow unused threads to expire after 30s."
[& {:keys [keep-alive queue thread-factory]
:or {keep-alive 30000
queue (SynchronousQueue.)}}]
(let [^ThreadFactory thread-factory (or thread-factory (configure-thread-factory))]
(ThreadPoolExecutor. 1 Integer/MAX_VALUE
(long 30000) TimeUnit/MILLISECONDS
^BlockingQueue queue
thread-factory)))

(def default-executor (delay (configure-executor)))

(defn default-exec [id ^Runnable thunk ^Runnable ack]
(.submit ^java.util.concurrent.ExecutorService @default-executor ^Callable #(do (.run thunk) (.run ack))))

(defn- session-out
"Returns a PrintWriter suitable for binding as *out* or *err*. All of
the content written to that PrintWriter will (when .flush-ed) be sent on the
Expand Down Expand Up @@ -137,14 +172,51 @@
(atom (merge baseline-bindings (dissoc (get-thread-bindings) #'*agent*))
:meta {:id id
:stdin-reader stdin-reader
:input-queue input-queue}))))))
:input-queue input-queue
:exec default-exec}))))))

(defn session-exec [id]
(let [cl (clojure.lang.DynamicClassLoader.
(.getContextClassLoader (Thread/currentThread)))
queue (LinkedBlockingQueue.)
running (atom nil)
thread (atom nil)
main-loop #(let [[id ^Runnable r ^Runnable ack] (.take queue)]
(reset! running id)
(when (try
(.run r)
(compare-and-set! running id nil)
(finally
(compare-and-set! running id nil)))
(some-> ack .run)
(recur)))
spawn-thread #(doto (Thread. main-loop (str "nRepl-session-" id))
(.setDaemon true)
(.setContextClassLoader cl)
.start)]
(reset! thread (spawn-thread))
{:interrupt (fn [exec-id] ; nil means interrupt whatever is running
; returns :idle, interrupted id or nil
(let [current @running]
(cond
(nil? current) :idle
(and (or (nil? exec-id) (= current exec-id)) ; cas only checks identity, so check equality first
(compare-and-set! running current nil))
(do
(doto ^Thread @thread .interrupt .stop)
(reset! thread (spawn-thread))
current))))
:close #(.interrupt ^Thread @thread)
:exec (fn [exec-id r ack]
(.put queue [exec-id r ack]))}))

(defn- register-session
"Registers a new session containing the baseline bindings contained in the
given message's :session."
[{:keys [session transport] :as msg}]
(let [session (create-session transport @session)
id (-> session meta :id)]
{:keys [id]} (meta session)]
(alter-meta! session into (session-exec id))
(swap! sessions assoc id session)
(t/send transport (response-for msg :status :done :new-session id))))

Expand Down

0 comments on commit 61e23d8

Please sign in to comment.