Permalink
Browse files

Fix (reimplement) transducers [ci skip]

  • Loading branch information...
circlespainter committed Jan 27, 2015
1 parent 86f8d7a commit 4650cd5ae4aac27a18becaa5c843e1aca69e474c
View
6 NOTICE
@@ -4,6 +4,12 @@ This product contains modified source code from Continuations Library, by Matthi
* LICENSE: New BSD (http://directory.fsf.org/wiki/License:BSD_3Clause)
* HOMEPAGE: http://www.matthiasmann.de/content/view/24/26/
This product contains modified source code from Clojure
Copyright (c) 2013 Rich Hickey and contributors
* LICENSE: Eclipse Public License v1.0 (http://www.eclipse.org/legal/epl-v10.html)
* HOMEPAGE: https://github.com/clojure/clojure
This product contains modified source code from core.async, a Clojure library designed to provide facilities for async programming and communication by Rich Hickey and contributors:
Copyright (c) 2013 Rich Hickey and contributors
@@ -21,14 +21,15 @@
(:require
[co.paralleluniverse.pulsar.core :as p :refer [defsfn sfn]])
(:import
[co.paralleluniverse.strands.channels QueueObjectChannel TransferChannel TimeoutChannel Channels$OverflowPolicy SendPort ReceivePort Selector SelectAction Channels]
[co.paralleluniverse.strands.channels QueueObjectChannel TransferChannel TimeoutChannel Channels$OverflowPolicy SendPort ReceivePort Selector SelectAction Channels DelegatingSendPort]
[co.paralleluniverse.strands.queues ArrayQueue BoxQueue CircularObjectBuffer]
[java.util.concurrent TimeUnit Executors Executor]
[com.google.common.util.concurrent ThreadFactoryBuilder]
(java.util List Arrays)
(co.paralleluniverse.strands Strand)
(com.google.common.base Function)
(co.paralleluniverse.pulsar DelegatingChannel)))
(co.paralleluniverse.pulsar DelegatingChannel ClojureHelper SendProtocol)
(co.paralleluniverse.fibers Suspendable)))
(alias 'core 'clojure.core)
@@ -58,27 +59,53 @@
[[_ policy]]
(and (not= policy Channels$OverflowPolicy/BLOCK) (not= policy Channels$OverflowPolicy/BACKOFF)))
(defn rx-chan [chan xform ex-handler]
"Returns a new transforming channel based on the one passed as a first argument. The given transformation will
be applied."
(let [transform-and-handle (if xform
(if ex-handler
(fn [val-producer]
(let [val (val-producer)]
(try (xform val)
(catch Throwable t (or (ex-handler t) (throw t))))))
(fn [val-producer] (xform (val-producer))))
(fn [val-producer] (val-producer)))]
(defsfn ^:private ex-handler [ex]
(-> (Thread/currentThread)
.getUncaughtExceptionHandler
(.uncaughtException (Thread/currentThread) ex))
nil)
(defn rx-chan [chan xform exh]
"Returns a new transforming channel based on the one passed as a first argument. The given transducer will
be applied to the add (send) function."
(let [add-reducer-builder
(fn [snd-op]
(sfn
([x] x)
([x v] (snd-op v) x)))
handle-builder
(fn [snd-op]
(sfn [x exh t]
(let [else ((or (p/suspendable! exh) ex-handler) t)]
(if (nil? else)
x
((add-reducer-builder snd-op) x else)))))
xf-add-reducer-builder
(fn [snd-op]
(let [add-reducer (add-reducer-builder snd-op)
handle (handle-builder snd-op)
add! (if xform (p/suspendable! (xform add-reducer)) add-reducer)]
(sfn
([x]
(try
(add! x)
(catch Throwable t
(handle x exh t))))
([x v]
(try
(add! x v)
(catch Throwable t
(handle x exh t)))))))
px
(p/suspendable!
(p/sproxy [DelegatingSendPort SendProtocol] [chan]
(send [v] ((xf-add-reducer-builder (sfn [v] (p/sproxy-super send v))) this v)))
[SendProtocol])]
(cond
(and (nil? xform) (nil? ex-handler))
(and (nil? xform) (nil? exh))
chan
:else
(DelegatingChannel. chan
(.map (Channels/transform chan)
(reify Function
(apply [_ v] (transform-and-handle v))
(equals [this that] (= this that))))
chan))))
(DelegatingChannel. px chan chan))))
(defn chan
"Creates a channel with an optional buffer, an optional transducer
@@ -95,8 +122,8 @@
([buf-or-n xform ex-handler]
(cond
(number? buf-or-n) (chan (buffer buf-or-n) xform ex-handler)
(nil? buf-or-n) (rx-chan (TransferChannel.) xform ex-handler)
:else (rx-chan (QueueObjectChannel. (first buf-or-n) (second buf-or-n) false) xform ex-handler))))
(nil? buf-or-n) (do (when xform (assert buf-or-n "buffer must be supplied when transducer is")) (rx-chan (TransferChannel.) nil nil))
:else (let [buf (first buf-or-n) policy (second buf-or-n)] (rx-chan (QueueObjectChannel. buf policy false) xform ex-handler)))))
(defsfn <!
"Takes a val from port. Must be called inside a (go ...) block. Will
@@ -478,7 +505,7 @@
(close! res)
(put! p res)
true)))
async (fn [[v p :as job]]
async (sfn [[v p :as job]]
(if (nil? job)
(do (close! results) nil)
(let [res (chan 1)]
@@ -1,5 +1,5 @@
; Pulsar: lightweight threads and Erlang-like actors for Clojure.
; Copyright (C) 2013-2014, Parallel Universe Software Co. All rights reserved.
; Copyright (C) 2013-2015, Parallel Universe Software Co. All rights reserved.
;
; This program and the accompanying materials are dual-licensed under
; either the terms of the Eclipse Public License v1.0 as published by
@@ -9,6 +9,12 @@
;
; under the terms of the GNU Lesser General Public License version 3.0
; as published by the Free Software Foundation.
;
;
; `sproxy` is derived from Clojure (https://github.com/clojure/clojure).
; Copyright (C) 2013 Rich Hickey and contributors.
; Distributed under the Eclipse Public License.
;
;;;
;;;
@@ -284,6 +290,71 @@
([f & args]
(strampoline #(apply f args))))
(defmacro sproxy
"A suspendable version of proxy.
class-and-interfaces - a vector of class names
args - a (possibly empty) vector of arguments to the superclass
constructor.
f => (name [params*] body) or
(name ([params*] body) ([params+] body) ...)
Expands to code which creates a instance of a proxy class that
implements the named class/interface(s) by calling the supplied
fns. A single class, if provided, must be first. If not provided it
defaults to Object.
The interfaces names must be valid interface types. If a method fn
is not provided for a class method, the superclass methd will be
called. If a method fn is not provided for an interface method, an
UnsupportedOperationException will be thrown should it be
called. Method fns are closures and can capture the environment in
which proxy is called. Each method fn takes an additional implicit
first arg, which is bound to 'this. Note that while method fns can
be provided to override protected methods, they have no other access
to protected members, nor to super, as these capabilities cannot be
proxied."
[class-and-interfaces args & fs]
(let [bases (map #(or (resolve %) (throw (Exception. (str "Can't resolve: " %))))
class-and-interfaces)
[super interfaces] (#'clojure.core/get-super-and-interfaces bases)
compile-effect (when *compile-files*
(let [[cname bytecode] (#'clojure.core/generate-proxy super interfaces)]
(Compiler/writeClassFile cname bytecode)))
pc-effect (apply get-proxy-class bases)
pname (proxy-name super interfaces)]
;remember the class to prevent it from disappearing before use
(intern *ns* (symbol pname) pc-effect)
`(let [;pc# (get-proxy-class ~@class-and-interfaces)
p# (new ~(symbol pname) ~@args)] ;(construct-proxy pc# ~@args)]
(init-proxy p#
~(loop [fmap {} fs fs]
(if fs
(let [[sym & meths] (first fs)
meths (if (vector? (first meths))
(list meths)
meths)
meths (map (fn [[params & body]]
(cons (apply vector 'this params) body))
meths)]
(if-not (contains? fmap (name sym))
(recur (assoc fmap (name sym) (cons `sfn meths)) (next fs))
(throw (IllegalArgumentException.
(str "Method '" (name sym) "' redefined")))))
fmap)))
p#)))
(defmacro sproxy-super
"A suspendable version of proxy-super.
Use to call a superclass method in the body of a proxy method.
Note, expansion captures 'this"
[meth & args]
`((suspendable! proxy-call-with-super) (sfn [] (. ~'this ~meth ~@args)) ~'this ~(name meth)))
;; ## Fibers
(ann current-fiber [-> Fiber])
@@ -0,0 +1,24 @@
/*
* Pulsar: lightweight threads and Erlang-like actors for Clojure.
* Copyright (C) 2013-2015, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
* the Eclipse Foundation
*
* or (per the licensee's choosing)
*
* under the terms of the GNU Lesser General Public License version 3.0
* as published by the Free Software Foundation.
*/
package co.paralleluniverse.pulsar;
import co.paralleluniverse.fibers.SuspendExecution;
/**
*
* @author circlespainter
*/
public interface SendProtocol<M> {
void send(M m) throws SuspendExecution, InterruptedException;
}

0 comments on commit 4650cd5

Please sign in to comment.