Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
add manifold interop, mark 0.5.3
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Aug 7, 2014
1 parent 0d859ad commit be3e0d1
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 32 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ local-push
*DS_Store
autodoc
.lein*
target/**
target/**
.nrepl*
2 changes: 1 addition & 1 deletion README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Lamina is for describing and analyzing streams of data. It provides a rich set
Add the following to the `:dependencies` section of your `project.clj` file:

```clj
[lamina "0.5.2"]
[lamina "0.5.3"]
```

## Rationale
Expand Down
5 changes: 3 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
(defproject lamina "0.5.3-SNAPSHOT"
(defproject lamina "0.5.3"
:description "event-driven data structures for clojure"
:min-lein-version "2.0.0"
:dependencies [[org.clojure/tools.logging "0.2.6"]
[org.flatland/useful "0.11.1"]
[potemkin "0.3.4"]]
[potemkin "0.3.8"]
[manifold "0.1.0-alpha2"]]
:exclusions [org.clojure/contrib
org.clojure/clojure-contrib]
:profiles {:dev {:dependencies [[org.clojure/clojure "1.5.1"]
Expand Down
110 changes: 104 additions & 6 deletions src/lamina/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
(:use
[potemkin])
(:require
[manifold
[stream :as s]
[deferred :as d]]
[lamina.time :as t]
[lamina.core.watch :as w]
[lamina.core.named :as n]
Expand Down Expand Up @@ -77,7 +80,7 @@
last*
partition*
partition-all*

receive-in-order
emit-in-order
combine-latest
Expand Down Expand Up @@ -190,8 +193,8 @@
(u/error channel err true))

(defn siphon
"Takes all messages from `src` and forwards them to `dst`. If `dst` closes, `src` is closed, but
not vise-versa. Error states are similarly propagated. This is useful for many transient channels
"Takes all messages from `src` and forwards them to `dst`. If `dst` closes, `src` is closed, but
not vise-versa. Error states are similarly propagated. This is useful for many transient channels
feeding into one channel.
If more than two channels are specified, `siphon` becomes transitive. `(siphon a b c)` is equivalent to
Expand All @@ -205,8 +208,8 @@
(apply siphon dst rest)))

(defn join
"Takes all messages from `src` and forwards them to `dst`. If either channel closes or goes into an
error state, the same is done for the other channel. This is useful for channels which have a 1-to-1
"Takes all messages from `src` and forwards them to `dst`. If either channel closes or goes into an
error state, the same is done for the other channel. This is useful for channels which have a 1-to-1
relationship.
If more than two channels are specified, `join` becomes transitive. `(join a b c)` is equivalent to
Expand Down Expand Up @@ -353,7 +356,7 @@
ch (tap channel)]

(ground ch)

(p/run-pipeline ch
{:error-handler (fn [_])}
#(read-channel* %
Expand Down Expand Up @@ -398,3 +401,98 @@
([result-channel timeout]
@(with-timeout timeout result-channel)))

;;;

(s/def-source LaminaChannelSource
[ch]

(isSynchronous [_] false)

(description [this]
{:source? true
:drained? (drained? ch)
:type "lamina"})

(close [_]
(close ch))

(take [this blocking? default-val]
(let [d (p/run-pipeline
(read-channel* ch
:on-error default-val)
{:error-handler (fn [x] (p/complete default-val))}
(fn [x]
(if (identical? :lamina/drained! x)
(do
(.markDrained this)
default-val)
x)))]
(if blocking?
@d
d)))

(take [this blocking? default-val timeout timeout-val]
(let [d (p/run-pipeline
(read-channel* ch
:timeout timeout
:on-timeout timeout-val
:on-error default-val)
{:error-handler (fn [x] (p/complete default-val))}
(fn [x]
(if (identical? :lamina/drained! x)
(do
(.markDrained this)
default-val)
x)))]
(if blocking?
@d
d))))

(s/def-sink LaminaChannelSink
[ch]

(isSynchronous [_] false)

(description [this]
{:sink? true
:closed? (closed? ch)
:type "lamina"})

(close [this]
(close ch))

(put [this x blocking?]

(let [x (enqueue ch x)
x (cond
(r/async-promise? x)
(p/run-pipeline x
{:error-handler (fn [_])}
(fn [_] true))

(or
(identical? :lamina/closed! x)
(identical? :lamina/error! x))
(d/success-deferred false)

:else
(d/success-deferred true))]
(if blocking?
@x
x)))

(put [this x blocking? timeout timeout-val]

(.put this x blocking?)))

(extend-protocol s/Sourceable

lamina.core.channel.IChannel
(to-source [ch]
(->LaminaChannelSource ch)))

(extend-protocol s/Sinkable

lamina.core.channel.IChannel
(to-source [ch]
(->LaminaChannelSink ch)))
6 changes: 2 additions & 4 deletions src/lamina/core/channel.clj
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
[java.io
Writer]))



;;;

(definterface+ IChannel
Expand Down Expand Up @@ -220,7 +218,7 @@
`:predicate` - a function that takes the message, and returns true if it should be consumed. If the
predicate returns false, the returned result will realize as value defined by `:on-false`.
`:result` - the result that the read message should be enqueued into. If the same result is used for
`:result` - the result that the read message should be enqueued into. If the same result is used for
`read-channel` calls from multiple channels, this will have the effect of being realized
as the first message from any of the channels, and not consuming any messages from the other
channels.
Expand Down Expand Up @@ -402,7 +400,7 @@

(defn bridge
"A generalization of `bridge-join` and `bridge-siphon`. Takes one `src` channel, and one or
more downstream `dsts` channels. All messages from `src` will be passed into `callback`, which
more downstream `dsts` channels. All messages from `src` will be passed into `callback`, which
may or may not forward it to the downstream channels.
This represents a relationship between channels which may or may not always result in messages
Expand Down
47 changes: 29 additions & 18 deletions src/lamina/core/result.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
[potemkin]
[lamina.core.utils])
(:require
[manifold.deferred :as d]
[lamina.core.return-codes :as codes]
[lamina.core.lock :as l]
[lamina.time :as t])
Expand Down Expand Up @@ -159,7 +160,7 @@
(meta [_] metadata)
(alterMeta [_ _ _] (throw (Exception. "not implemented, use .resetMeta instead")))
(resetMeta [_ m] (set! metadata m))

IResult
(add-listener [_ listener]
(.add listeners listener))
Expand Down Expand Up @@ -230,15 +231,15 @@
(assoc-record ^ResultState state# :mode ::claimed)
:lamina/:already-claimed!)
(assoc-record ^ResultState state# :mode ::claimed)))

::claimed
(case signal#
::add (assoc-record ^ResultState state# :subscribers (inc (.subscribers state#)))
::remove (assoc-record ^ResultState state# :subscribers (dec (.subscribers state#)))
::success! (assoc-record ^ResultState state# :mode ::success, :value value#)
::error! (assoc-record ^ResultState state# :mode ::error, :value value#)
(::success ::error ::claim) :lamina/already-claimed!)

(::success ::error)
:lamina/already-realized!))))

Expand All @@ -253,15 +254,15 @@
(if (keyword? s#)
s#
(set-state ~this s#))))]

(if (keyword? s#)
s#
(let [^ResultState s# s#]
(case (int (.subscribers s#))

0
:lamina/realized

1
(try
(let [result# ((~f ^ResultCallback (.poll ~subscribers)) (.value s#))]
Expand All @@ -271,13 +272,13 @@
(log-error e# "Error in result callback.")
(error-to-listeners listeners# e#)
:lamina/error!))

(let [value# (.value s#)
^{:tag "objects"} ary# (object-array (.size ~subscribers))]

(loop [idx# 0]
(when-let [^ResultCallback c# (.poll ~subscribers)]

(try
(let [result# ((~f c#) value#)]
(enqueue-to-listeners listeners# result#)
Expand All @@ -286,7 +287,7 @@
(error-to-listeners listeners# e#)
(aset ary# idx# :lamina/error!)
(log-error e# "Error in result callback.")))

(recur (unchecked-inc idx#))))

(result-seq ary#))))))))
Expand All @@ -299,7 +300,7 @@
`(
clojure.lang.IPending
(isRealized [this#] (boolean (result this#)))

clojure.lang.IBlockingDeref
(deref [this# timeout-ms# timeout-val#]
(let [r# (result-channel)]
Expand All @@ -315,6 +316,17 @@
result-channel
result-callback)

(extend-protocol d/Deferrable

IResult
(to-deferred [res]
(let [d (d/deferred)]
(subscribe res
(result-callback
#(d/success! d %)
#(d/error! d %)))
d)))

(def-result-channel
[^Lock lock
^{:volatile-mutable true :tag ResultState} state
Expand All @@ -328,7 +340,7 @@
(success this msg))

IError

(error [this err _]
(compare-and-trigger
[this error lock state subscribers listeners]
Expand All @@ -338,7 +350,7 @@

clojure.lang.IMeta
clojure.lang.IReference

(meta [_] metadata)
(alterMeta [_ _ _] (throw (Exception. "not implemented, use .resetMeta instead")))
(resetMeta [_ m] (set! metadata m))
Expand Down Expand Up @@ -368,7 +380,7 @@
(subscribe this (ResultCallback. f f))
(.await latch)
(deref this))))))

IResult

(add-listener [_ listener]
Expand All @@ -381,7 +393,7 @@
(success [this val]
(compare-and-trigger
[this success lock state subscribers listeners]
::success .on-success val))
::success .on-success val))

;;
(success! [this val]
Expand Down Expand Up @@ -567,7 +579,7 @@
(if (zero? (.decrementAndGet counter))
(success-result (seq ary))
combined-result)

(let [r (first results)]
(if-not (async-promise? r)

Expand All @@ -577,7 +589,7 @@
(.decrementAndGet counter)
(recur (inc idx) (rest results)))


(case (result r)

;; just return the error
Expand Down Expand Up @@ -613,4 +625,3 @@

(defmethod print-method ResultChannel [o ^Writer w]
(.write w (str o)))

0 comments on commit be3e0d1

Please sign in to comment.