Skip to content

Commit

Permalink
Merge pull request #6 from yummly/merge-upstream
Browse files Browse the repository at this point in the history
Merge from upstream
  • Loading branch information
Ryan Smith committed Dec 1, 2020
2 parents ba29e4b + 34256ab commit 65dd5f2
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ A detailed discussion of Manifold's rationale can be found [here](http://aleph.i


```clj
[yummly/manifold "0.1.9"]
[yummly/manifold "0.1.9-alpha4"]
```

### deferreds
Expand Down
5 changes: 4 additions & 1 deletion deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@
:check {:extra-deps {athos/clj-check {:git/url "https://github.com/athos/clj-check.git"
:sha "b48d4e7000586529f81c1e29069b503b57259514"}}
:jvm-opts ["-Dclojure.spec.compile-asserts=true"]
:main-opts ["-m" "clj-check.check"]}}}
:main-opts ["-m" "clj-check.check"]}
:pack {:extra-deps {pack/pack.alpha {:git/url "https://github.com/juxt/pack.alpha.git"
:sha "c70740ffc10805f34836da2160fa1899601fac02"}}
:main-opts ["-m"]}}}
3 changes: 1 addition & 2 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject manifold "0.1.9-alpha2"
(defproject manifold "0.1.9-alpha4"
:description "a compatibility layer for event-driven abstractions"
:license {:name "MIT License"
:url "http://opensource.org/licenses/MIT"}
Expand All @@ -25,6 +25,5 @@
:global-vars {*warn-on-reflection* true}
:jvm-opts ^:replace ["-server"
"-XX:-OmitStackTraceInFastThrow"
"-XX:+UseConcMarkSweepGC"
"-Xmx2g"
"-XX:NewSize=1g"])
2 changes: 1 addition & 1 deletion src/manifold/deferred.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,7 @@
gensyms))
set)
dep? (set/union binding-dep? body-dep?)]
`(let [executor# (manifold.executor/executor)]
`(let [executor# (or (manifold.executor/executor) (ex/execute-pool))]
(manifold.executor/with-executor nil
(let [~@(mapcat
(fn [n var val gensym]
Expand Down
8 changes: 6 additions & 2 deletions src/manifold/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,23 @@

(defn ^ThreadFactory thread-factory
([name-generator executor-promise]
(thread-factory name-generator executor-promise nil))
(thread-factory name-generator executor-promise nil true))
([name-generator executor-promise stack-size]
(thread-factory name-generator executor-promise stack-size true))
([name-generator executor-promise stack-size daemon?]
(reify ThreadFactory
(newThread [_ runnable]
(let [name (name-generator)
curr-loader (.getClassLoader (class thread-factory))
f #(do
(.set executor-thread-local @executor-promise)
(.run ^Runnable runnable))]
(doto
(if stack-size
(Thread. nil f name stack-size)
(Thread. nil f name))
(.setDaemon true)))))))
(.setDaemon daemon?)
(.setContextClassLoader curr-loader)))))))

;;;

Expand Down
2 changes: 1 addition & 1 deletion src/manifold/stream.clj
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@

(defn try-put!
"Puts a value into a stream if the put can successfully be completed in `timeout`
milliseconds. Returns a promiise that yields `true` if it succeeds, and `false`
milliseconds. Returns a promise that yields `true` if it succeeds, and `false`
if it fails or times out. Guaranteed to be non-blocking.
A special `timeout-val` may be specified, if it is important to differentiate
Expand Down
4 changes: 2 additions & 2 deletions src/manifold/time.clj
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@
(.setRemoveOnCancelPolicy true))))]
(def ^:dynamic ^IClock *clock*
(reify IClock
(in [_ f interval] (.in ^IClock @clock f interval))
(every [_ f delay period] (.every ^IClock @clock f delay period)))))
(in [_ interval f] (.in ^IClock @clock interval f))
(every [_ delay period f] (.every ^IClock @clock delay period f)))))

(defmacro with-clock
"Ensures that all calls to `every` and `in` are made through the specified clock, rather
Expand Down
18 changes: 17 additions & 1 deletion test/manifold/executor_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
Executor
Executor$Controller]
[java.util.concurrent
LinkedBlockingQueue]))
ExecutorService
Executors
LinkedBlockingQueue
ThreadFactory]))

(deftest test-instrumented-executor-uses-thread-factory
(let [thread-count (atom 0)
Expand All @@ -24,3 +27,16 @@
thread-names (LinkedBlockingQueue. 1)]
(.execute ^Executor executor #(.put thread-names (.getName (Thread/currentThread))))
(is (contains? #{(str threadpool-prefix 1) (str threadpool-prefix 2)} (.take thread-names)))))

(deftest test-rt-dynamic-classloader
(let [num-threads (atom 0)
in-thread-loader (promise)
tf (e/thread-factory
#(str "my-loader-prefix-" (swap! num-threads inc))
(deliver (promise) nil))
executor (Executors/newFixedThreadPool 1 ^ThreadFactory tf)]
(.execute ^ExecutorService executor
(fn []
(let [l (clojure.lang.RT/baseLoader)]
(deliver in-thread-loader l))))
(is (instance? clojure.lang.DynamicClassLoader @in-thread-loader))))
4 changes: 2 additions & 2 deletions test/manifold/stream_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,8 @@

(deftest test-periodically
(testing "produces with delay"
(let [s (s/periodically 10 0 (constantly 1))]
(Thread/sleep 15)
(let [s (s/periodically 20 0 (constantly 1))]
(Thread/sleep 30)
(s/close! s)
;; will produces 2 items here no matter the sleep amount
;; as the periodically stream has a buffer of 1
Expand Down
3 changes: 1 addition & 2 deletions test/manifold/test_utils.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,5 @@
`(do
(println "\n-----\n" ~name "\n-----\n")
(c/quick-bench
(do ~@body)
:reduce-with #(and %1 %2))))
(do ~@body))))

29 changes: 28 additions & 1 deletion test/manifold/tsasvla_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns manifold.tsasvla-test
(:require [clojure.test :refer :all]
[manifold.tsasvla :refer [tsasvla <!?]]
[manifold.deferred :as d])
[manifold.deferred :as d]
[manifold.test-utils :refer :all])
(:import (java.util.concurrent TimeoutException)))

(deftest async-test
Expand Down Expand Up @@ -119,3 +120,29 @@
(is (= ::timeout @(d/alt (tsasvla (<!? (d/deferred))) (d/timeout! (d/deferred) 10 ::timeout))))
(is (= 1 @(tsasvla (<!? (d/alt (d/deferred) (d/success-deferred 1))))))
(is (= 1 @(d/alt (tsasvla (<!? (d/deferred))) (d/success-deferred 1))))))

(deftest ^:benchmark benchmark-chain
(bench "invoke comp x1"
((comp inc) 0))
(bench "tsasvla x1"
@(tsasvla (inc (<!? 0))))
(bench "tsasvla deferred x1"
@(tsasvla (inc (<!? (d/success-deferred 0)))))
(bench "tsasvla future 200 x1"
@(tsasvla (inc (<!? (d/future (Thread/sleep 200) 0)))))
(bench "invoke comp x2"
((comp inc inc) 0))
(bench "tsasvla x2"
@(tsasvla (inc (<!? (inc (<!? 0))))))
(bench "tsasvla deferred x2"
@(tsasvla (inc (<!? (inc (<!? (d/success-deferred 0)))))))
(bench "tsasvla future 200 x2"
@(tsasvla (inc (<!? (inc (<!? (d/future (Thread/sleep 200) 0)))))))
(bench "invoke comp x5"
((comp inc inc inc inc inc) 0))
(bench "tsasvla x5"
@(tsasvla (inc (<!? (inc (<!? (inc (<!? (inc (<!? (inc (<!? 0))))))))))))
(bench "tsasvla deferred x5"
@(tsasvla (inc (<!? (inc (<!? (inc (<!? (inc (<!? (inc (<!? (d/success-deferred 0)))))))))))))
(bench "tsasvla future 200 x5"
@(tsasvla (inc (<!? (inc (<!? (inc (<!? (inc (<!? (inc (<!? (d/future (Thread/sleep 200) 0))))))))))))))

0 comments on commit 65dd5f2

Please sign in to comment.