Skip to content

Commit

Permalink
add , and fix doc-string
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Apr 12, 2015
1 parent 2b72c80 commit a59a6cc
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 24 deletions.
2 changes: 1 addition & 1 deletion project.clj
@@ -1,4 +1,4 @@
(defproject manifold "0.1.0-beta12"
(defproject manifold "0.1.0-SNAPSHOT"
:description "a compatibility layer for event-driven abstractions"
:license {:name "MIT License"
:url "http://opensource.org/licenses/MIT"}
Expand Down
55 changes: 48 additions & 7 deletions src/manifold/stream.clj
Expand Up @@ -39,6 +39,8 @@
LinkedList
Iterator]))

(set! *unchecked-math* true)

(utils/when-core-async
(require 'manifold.stream.async))

Expand Down Expand Up @@ -347,7 +349,7 @@
|:---|:---
| `permanent?` | if `true`, the channel cannot be closed
| `buffer-size` | the number of messages that can accumulate in the channel before backpressure is applied
| `description` | the description of the channel, typically a map of properties
| `description` | the description of the channel, which is a single arg function that takes the base properties and returns an enriched map.
| `executor` | the `java.util.concurrent.Executor` that will execute all callbacks registered on the deferreds returns by `put!` and `take!`
| `xform` | a transducer which will transform all messages that are enqueued via `put!` before they are dequeued via `take!`."
{:arglists '[[{:keys [permanent? buffer-size description executor xform]}]]}
Expand Down Expand Up @@ -754,7 +756,7 @@
([buffer-size]
(buffered-stream (constantly 1) buffer-size))
([metric limit]
(buffered-stream metric limit nil))
(buffered-stream metric limit identity))
([metric limit description]
(let [buf (stream Integer/MAX_VALUE)
buffer-size (AtomicLong. 0)
Expand Down Expand Up @@ -782,11 +784,11 @@
(close [_]
(.close ^IEventStream buf))
(description [_]
(merge
(manifold.stream/description buf)
description
{:buffer-size (.get buffer-size)
:buffer-capacity limit}))
(description
(merge
(manifold.stream/description buf)
{:buffer-size (.get buffer-size)
:buffer-capacity limit})))
(weakHandle [this ref-queue]
(or @handle
(do
Expand Down Expand Up @@ -915,6 +917,45 @@

(source-only s'))))

(defn throttle
"Limits the `max-rate` that messages are emitted, per second.
The `max-backlog` dictates how much \"memory\" the throttling mechanism has, or how many
messages it will emit immediately after a long interval without any messages. By default,
this is set to one second's worth."
([max-rate s]
(throttle max-rate max-rate s))
([max-rate max-backlog s]
(let [buf (stream)
s' (stream)
period (double (/ 1000 max-rate))]

(connect-via-proxy s buf s' {:upstream? true, :description {:op "throttle"}})

(d/loop [backlog 0.0, read-start (System/currentTimeMillis)]
(d/chain (take! buf ::none)

(fn [msg]
(if (identical? ::none msg)
(do
(close! s')
false)
(put! s' msg)))

(fn [result]
(when result
(let [elapsed (double (- (System/currentTimeMillis) read-start))
backlog' (+ backlog (- (/ elapsed period) 1))]
(if (<= 1 backlog')
(- backlog' 1.0)
(d/timeout! (d/deferred) (- period elapsed) 0.0)))))

(fn [backlog]
(when backlog
(d/recur backlog (System/currentTimeMillis))))))

s')))

;;;

(alter-meta! #'->Callback assoc :private true)
Expand Down
2 changes: 1 addition & 1 deletion src/manifold/stream/default.clj
Expand Up @@ -287,7 +287,7 @@
:or {permanent? false}}]
(let [consumers (LinkedList.)
producers (LinkedList.)
buffer-size (long buffer-size)
buffer-size (long (or buffer-size 0))
messages (when buffer-size (ArrayDeque.))
buffer-size (if buffer-size (long (Math/max 0 buffer-size)) 0)
add! (add! producers consumers messages buffer-size executor)
Expand Down
24 changes: 9 additions & 15 deletions src/manifold/time.clj
Expand Up @@ -127,29 +127,23 @@

;;;

(let [scheduler (delay
(ScheduledThreadPoolExecutor.
1
(utils/thread-factory (constantly "manifold-scheduler-queue"))))
num-cores (.availableProcessors (Runtime/getRuntime))
(let [num-cores (.availableProcessors (Runtime/getRuntime))
cnt (atom 0)
executor (delay
(Executors/newFixedThreadPool
scheduler (delay
(ScheduledThreadPoolExecutor.
num-cores
(utils/thread-factory #(str "manifold-scheduler-" (swap! cnt inc)))))]
(utils/thread-factory (constantly "manifold-scheduler-queue"))))]

(defn in
"Schedules no-arg function `f` to be invoked in `interval` milliseconds. Returns a deferred
representing the returned value of the function."
[^double interval f]
(let [d (manifold.deferred/deferred)
f (fn []
(let [f (fn []
(try
(manifold.deferred/success! d (f))
(catch Throwable e
(manifold.deferred/error! d e))))]
(.execute ^Executor @executor ^Runnable f)))]
(try
(manifold.deferred/success! d (f))
(catch Throwable e
(manifold.deferred/error! d e))))]
(.schedule ^ScheduledThreadPoolExecutor @scheduler
^Runnable f
(long (* interval 1e3))
Expand All @@ -176,7 +170,7 @@
(throw e))))]
(deliver future-ref
(.scheduleAtFixedRate ^ScheduledThreadPoolExecutor @scheduler
^Runnable (fn [] (.execute ^Executor @executor ^Runnable f))
^Runnable f
(long (* initial-delay 1e3))
(long (* period 1e3))
TimeUnit/MICROSECONDS))
Expand Down

0 comments on commit a59a6cc

Please sign in to comment.