/
appender.clj
57 lines (51 loc) · 2.08 KB
/
appender.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
(ns qbits.tape.appender
(:require [clojure.core.protocols :as p]
[qbits.tape.queue :as q]
[qbits.tape.codec :as codec])
(:import (net.openhft.chronicle.queue ChronicleQueue
ExcerptAppender)
(net.openhft.chronicle.bytes Bytes)))
;; Ask the compiler to warn whenever a Java interop call in this namespace
;; cannot be resolved at compile time and would fall back to reflection.
(set! *warn-on-reflection* true)
(defprotocol IAppender
"Operations available on an appender attached to a queue."
(write! [appender x] "Writes a new message to queue.")
(last-index [appender] "Returns the last index this appender appended")
(queue [appender] "Returns the queue associated with this appender"))
(defn make
  "Builds an appender for the given queue.

  An ExcerptAppender is acquired from the queue's underlying queue and
  wrapped in an object implementing IAppender; messages are encoded with
  the queue's codec before being written. `opts` is accepted but not
  currently consulted.

  The returned object is also Datafiable: calling datafy on it yields a
  map describing the appender (cycle, last appended index, source id and
  the queue it belongs to)."
  ([queue]
   (make queue nil))
  ([queue opts]
   (let [^ExcerptAppender excerpt-appender (.acquireAppender (q/underlying-queue queue))
         tape-codec (q/codec queue)]
     (reify
       IAppender
       (write! [_ x]
         (let [payload (Bytes/wrapForRead (codec/write tape-codec x))
               ;; `outcome` is either the index of the written document or
               ;; the Throwable caught while writing it. The failure is
               ;; carried out of `with-open` as a value so the document
               ;; context is closed (and rolled back) before we rethrow.
               outcome (with-open [doc (.writingDocument excerpt-appender)]
                         ;; The write may throw if the queue gets closed from
                         ;; another thread or on thread death. Be paranoid:
                         ;; request a rollback on any failure so the file is
                         ;; never left in a broken state.
                         (try
                           (.bytes (.write (.wire doc)) payload)
                           (.index doc)
                           (catch Throwable cause
                             (.rollbackOnClose doc)
                             cause)))]
           (if (instance? Throwable outcome)
             (throw (ex-info "Appender write failed"
                             {:type ::write-failed
                              :appender excerpt-appender
                              :msg x}
                             outcome))
             outcome)))
       (last-index [_]
         (.lastIndexAppended excerpt-appender))
       (queue [_]
         queue)

       p/Datafiable
       (datafy [_]
         {::cycle (.cycle excerpt-appender)
          ::last-index-appended (.lastIndexAppended excerpt-appender)
          ::source-id (.sourceId excerpt-appender)
          ::queue queue})))))