Re-organise internals - breaking into 4 sub namespaces - peer, decode…
…, encode and util. Also, added zeroconf functionality - allowing servers to register themselves on the local network. OSC handlers are now stored in a tree opening the possibility for a full implementation of incoming OSC message pattern matching. Finally, various listener and handler removal fns were added - handlers and listeners now accept keys to allow them to be individually removed. Handlers now also remove themselves if they return :done
samaaron committed Aug 8, 2011
1 parent 99f1038 commit 97725ed
Showing 8 changed files with 651 additions and 474 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -3,3 +3,5 @@ lib
3 changes: 2 additions & 1 deletion project.clj
@@ -1,5 +1,6 @@
(defproject overtone/osc-clj "0.3.0-SNAPSHOT"
:description "An Open Sound Control library for Clojure."
:dependencies [[org.clojure/clojure "1.2.0"]
[org.clojure/clojure-contrib "1.2.0"]]
[org.clojure/clojure-contrib "1.2.0"]
[org.clojars.technomancy/jmdns "3.2.1"]]
:dev-dependencies [[lein-clojars "0.5.0"]])
264 changes: 76 additions & 188 deletions src/osc.clj
@@ -1,223 +1,111 @@
(ns osc
(java.util.concurrent TimeUnit TimeoutException PriorityBlockingQueue)
( InetSocketAddress DatagramSocket DatagramPacket)
(java.nio.channels DatagramChannel AsynchronousCloseException ClosedChannelException)
(java.nio ByteBuffer ByteOrder))
(:use [clojure.set :as set])
(:require [clojure.contrib.fcase]))

(def OSC-SEND-Q-SIZE 42)

(def OSC-TIMETAG-NOW 1) ; Timetag representing right now.
(def SEVENTY-YEAR-SECS 2208988800)
(def BUFFER-SIZE 32768)
(def PAD (byte-array 4))

(def osc-debug* (ref false))

(defn print-debug [& msgs]
(binding [*out* *err*]
(apply println msgs)))

; TODO: Look into using nano-seconds instead of millis
(defn osc-now []

(defn osc-msg
"Returns an OSC message."
[path & args]
(let [type-tag (first args)
type-tag (if (and type-tag (.startsWith type-tag ","))
(.substring type-tag 1)
(with-meta {:path path
:type-tag type-tag
:args (next args)}
{:type :osc-msg})))

(defn osc-msg?
"Is obj an OSC message?"
[obj] (= :osc-msg (type obj)))

(defn osc-bundle
"Returns an OSC bundle, which is a timestamped set of OSC messages and/or bundles."
[timestamp items]
(with-meta {:timestamp timestamp
:items items}
{:type :osc-bundle}))

(defn osc-bundle? [obj] (= :osc-bundle (type obj)))

(load "osc/internals")

; osc-type-tag defined in osc/internals
(defn osc-msg-infer-types
"Returns an OSC message. Infers the types of the args."
[path & args]
(apply osc-msg path (osc-type-tag args) args))
(:use [osc.util]

(defn osc-listen
"Attach a generic listener function that will be called with every incoming osc message."
[peer listener]
"Attach a generic listener function that will be called with every incoming osc message.
(osc-listen s (fn [msg] (println \"listener: \" msg)) :foo)"
[peer listener key]
(alter (:listeners peer) conj listener)))
(alter (:listeners peer) assoc key listener)))

; TODO: change this around so returning :done will result in removal of a handler, and make
; this function accept the handler to remove as an argument.
(defn osc-remove-handler
"Called from within a handler to remove itself."
(dosync (alter *osc-handlers* assoc *current-path*
(set/difference (get @*osc-handlers* *current-path*) #{*current-handler*}))))
(defn osc-rm-listener
"Remove the generic listener associated with the specific key
(osc-rm-listener s :foo)"
[peer key]
(alter (:listeners peer) dissoc key)))

(defn osc-listeners
"Return a seq of all registered listeners
(osc-listeners s) ;=> (:foo)"
(keys @(:listeners peer)))

(defn osc-rm-all-listeners
"Remove all generic listeners associated with peer"
(ref-set (:listeners peer) {})))

(defn osc-handle
"Attach a handler function to receive on the specified path. (Works for both clients and servers.)
(let [server (osc-server PORT)
client (osc-client HOST PORT)
flag (atom false)]
(osc-handle server \"/test\" (fn [msg] (reset! flag true)))
(osc-send client \"/test\" \"i\" 42)
(Thread/sleep 200)
(= true @flag)))
[peer path handler & [one-shot]]
(let [handlers (:handlers peer)
phandlers (get @handlers path #{})
handler (if one-shot
(fn [msg]
(handler msg)
(dosync (alter handlers assoc path (union phandlers #{handler}))))) ; save the handler
"Add a handle fn to an OSC path with the specified key. This handle will be
called when an incoming OSC message matches the supplied path. This may either
be a direct match, or a pattern match if the incoming OSC message uses wild
card chars in its path (Pattern matching not implemented yet).
The path you specify may not contain any of the OSC reserved chars:
# * , ? [ ] { } and whitespace"
([peer path handler] (osc-handle peer path handler handler))
([peer path handler key]
(peer-handle peer path handler key)))

(defn osc-rm-handler
"Remove the handler at path with the specified key. This just removes one
specific handler (if found)"
[peer path key]
(peer-rm-handler peer path key))

(defn osc-rm-handlers
"Remove all the handlers at path."
[peer path]
(peer-rm-handlers peer path))

(defn osc-rm-all-handlers
"Remove all registered handlers for the supplied path (defaulting to /)
This not only removes the handlers associated with the specified path
but also all handlers further down in the path tree. i.e. if handlers
have been registered for both /foo/bar and /foo/bar/baz and
osc-rm-all-handlers is called with /foo/bar, then all handlers associated
with both /foo/bar and /foo/bar/baz will be removed."
([peer] (osc-rm-all-handlers peer "/"))
([peer path] (peer-rm-all-handlers peer path)))

(defn osc-recv
"Receive a single message on an osc path (node) with an optional timeout.
; Wait a max of 250 ms to receive the next incoming OSC message
; addressed to the /magic node.
(osc-recv client \"/magic\" 250)
"Register a one-shot handler which will remove itself once called. If a
timeout is specified, it will return nil if a message matching the path
is not received within timeout milliseconds. Otherwise, it will block
the current thread until a message has been received."
[peer path & [timeout]]
(let [p (promise)]
(osc-handle peer path (fn [msg]
(deliver p msg)))
(let [res (try
(if timeout
(.get (future @p) timeout TimeUnit/MILLISECONDS) ; Blocks until
(catch TimeoutException t

;; We use binding to *osc-msg-bundle* to bundle messages
;; and send combined with an OSC timestamp.
(def *osc-msg-bundle* nil)

(defn osc-send-msg
"Send OSC msg to peer."
[peer msg]
(if @osc-debug*
(print-debug "osc-send-msg: " msg))
(if *osc-msg-bundle*
(swap! *osc-msg-bundle* #(conj %1 msg))
(.put (:send-q peer) [peer (assoc msg :timestamp 0)])))

(defn osc-send-bundle
"Send OSC bundle to peer."
[peer bundle]
(if @osc-debug*
(print-debug "osc-send-bundle: " bundle))
(.put (:send-q peer) [peer bundle]))
(peer-recv peer path timeout))

(defn osc-send
"Creates an OSC message and either sends it to the server immediately
or if a bundle is currently being formed it adds it to the list of messages."
[client path & args]
(osc-send-msg client (apply osc-msg path (osc-type-tag args) args)))

(defmacro in-osc-bundle [client timestamp & body]
`(binding [*osc-msg-bundle* (atom [])]
(let [res# (do ~@body)]
(osc-send-bundle ~client (osc-bundle ~timestamp @*osc-msg-bundle*))

; OSC peers have listeners and handlers. A listener is sent every message received, and
; handlers are dispatched by OSC node (a.k.a. path).
(defn osc-peer [& [listening?]]
(let [chan (DatagramChannel/open)
rcv-buf (ByteBuffer/allocate BUFFER-SIZE)
send-buf (ByteBuffer/allocate BUFFER-SIZE)
send-q (PriorityBlockingQueue. OSC-SEND-Q-SIZE (comparator (fn [a b] (< (:timestamp (second a)) (:timestamp (second b))))))
running? (ref true)
handlers (ref {})
listeners (ref #{(msg-handler-dispatcher handlers)})
send-thread (sender-thread running? send-q send-buf chan)
listen-thread (if listening? (listener-thread chan rcv-buf running? listeners))]
(.configureBlocking chan true)
{:chan chan
:rcv-buf rcv-buf
:send-q send-q
:running? running?
:send-thread send-thread
:listen-thread listen-thread
:listeners listeners
:handlers handlers
:send-fn chan-send}))

(defn osc-client
"Returns an OSC client ready to communicate with a host on a given port.
Use :protocol in the options map to \"tcp\" if you don't want \"udp\"."
[host port]
(let [peer (osc-peer true)
sock (.socket (:chan peer))
local (.getLocalPort sock)]
(.bind sock (InetSocketAddress. local))
(assoc peer
:host (ref host)
:port (ref port)
:addr (ref (InetSocketAddress. host port)))))
(client-peer host port))

(defn osc-target
"Update the target address of an OSC client so future calls to osc-send
will go to a new destination."
will go to a new destination. Automatically updates zeroconf if necessary."
[peer host port]
(ref-set (:host peer) host)
(ref-set (:port peer) port)
(ref-set (:addr peer) (InetSocketAddress. host port))))
(update-peer-target peer host port))

(defn osc-server
"Returns a live OSC server ready to register handler functions."
(let [peer (osc-peer true)
sock (.socket (:chan peer))]
(.bind sock (InetSocketAddress. port))
(assoc peer
:host (ref nil)
:port (ref port)
:addr (ref nil))))
"Returns a live OSC server ready to register handler functions. By default
this also registers the server with zeroconf. However, an optional param
can be passed to disable this."
([port] (osc-server port true))
([port use-zero-conf?] (server-peer port use-zero-conf?)))

(defn osc-close
"Close an osc-peer, also works for clients and servers."
"Close an osc-peer, works for both clients and servers. If peer has been
registered with zeroconf, it will automatically remove it."
[peer & wait]
(dosync (ref-set (:running? peer) false))
(.close (:chan peer))
(when wait
(if (:listen-thread peer)
(if (integer? wait)
(.join (:listen-thread peer) wait)
(.join (:listen-thread peer))))
(if (:send-thread peer)
(if (integer? wait)
(.join (:send-thread peer) wait)
(.join (:send-thread peer))))))
(apply close-peer peer wait))

(defn osc-debug
[& [on-off]]
(let [on-off (if (= on-off false) false true)]
(dosync (ref-set osc-debug* on-off))))

(defn osc-remove-zero-conf
"Unregister any zeroconf services registered in this session"
81 changes: 81 additions & 0 deletions src/osc/decode.clj
@@ -0,0 +1,81 @@
(ns osc.decode
(:use [osc.util]))

(defn osc-align
"Jump the current position to a 4 byte boundary for OSC compatible alignment."
(.position buf (bit-and (bit-not 3) (+ 3 (.position buf)))))

(defn- decode-string [buf]
(let [start (.position buf)]
(while (not (zero? (.get buf))) nil)
(let [end (.position buf)
len (- end start)
str-buf (byte-array len)]
(.position buf start)
(.get buf str-buf 0 len)
(osc-align buf)
(String. str-buf 0 (dec len)))))

(defn- decode-blob [buf]
(let [size (.getInt buf)
blob (byte-array size)]
(.get buf blob 0 size)
(osc-align buf)

(defn- decode-msg
"Pull data out of the message according to the type tag."
(let [path (decode-string buf)
type-tag (decode-string buf)
args (reduce (fn [mem t]
(conj mem
(case t
\i (.getInt buf)
\h (.getLong buf)
\f (.getFloat buf)
\d (.getDouble buf)
\b (decode-blob buf)
\s (decode-string buf))))
(rest type-tag))]
(apply osc-msg path type-tag args)))

(defn- decode-timetag [buf]
(let [tag (.getLong buf)]
(if (= tag OSC-TIMETAG-NOW)
(let [secs (- (bit-shift-right tag 32) SEVENTY-YEAR-SECS)
ms-frac (bit-shift-right (* (bit-and tag (bit-shift-left 0xffffffff 32))
1000) 32)]
(+ (* secs 1000) ; secs as ms

(defn- osc-bundle-buf? [buf]
(let [start-char (char (.get buf))]
(.position buf (- (.position buf) 1))
(= \# start-char)))

(declare osc-decode-packet)

(defn- decode-bundle-items [buf]
(loop [items []]
(if (.hasRemaining buf)
(let [item-size (.getInt buf)
original-limit (.limit buf)
item (do (.limit buf (+ (.position buf) item-size)) (osc-decode-packet buf))]
(.limit buf original-limit)
(recur (conj items item)))

(defn- decode-bundle [buf]
(decode-string buf) ; #bundle
(osc-bundle (decode-timetag buf) (decode-bundle-items buf)))

(defn osc-decode-packet
"Decode an OSC packet buffer into a bundle or message map."
(if (osc-bundle-buf? buf)
(decode-bundle buf)
(decode-msg buf)))

