/
queue.clj
106 lines (90 loc) · 4.05 KB
/
queue.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
(ns qbits.tape.queue
(:require [qbits.commons.enum :as enum]
[qbits.commons.jvm :as jvm]
[clojure.tools.logging :as log]
[clojure.core.protocols :as p]
[qbits.tape.cycle-listener :as cycle-listener]
[qbits.tape.codec.fressian :as fressian.codec])
(:import (net.openhft.chronicle.queue ChronicleQueue
RollCycles
ExcerptTailer)
(net.openhft.chronicle.queue.impl.single SingleChronicleQueueBuilder
SingleChronicleQueue)))
;; Ask the compiler to warn, for the rest of this file, whenever a Java
;; interop call cannot be resolved statically and would use reflection.
(set! *warn-on-reflection* true)
;; Converter built by `enum/enum->fn` over the `RollCycles` enum. `make`
;; calls it on its `roll-cycle` option (e.g. `:small-daily`) before handing
;; the result to the builder's `.rollCycle`.
;; NOTE(review): presumably maps a keyword to the matching `RollCycles`
;; constant -- confirm against qbits.commons.enum.
(def ->roll-cycle (enum/enum->fn RollCycles))
(defprotocol IQueue
  "Operations exposed by a queue instance created in this namespace."
  (codec [q]
    "Returns the codec to be used with this queue instance.")
  (close! [q]
    "Closes the queue.")
  (closed? [q]
    "Returns true if the queue is closed.")
  (underlying-queue
    ^net.openhft.chronicle.queue.impl.single.SingleChronicleQueue [q]
    "Returns the underlying chronicle-queue instance."))
(defn ^java.io.Closeable make
  "Returns a queue instance that creates/binds to the directory `dir`.

  The returned object implements `IQueue`, `java.io.Closeable` and
  `clojure.core.protocols/Datafiable`.

  The optional second argument is a map of options:

  * `roll-cycle` determines how often a new Chronicle Queue data file
    is created. Can be `:minutely`, `:daily`, `:test4-daily`,
    `:test-hourly`, `:hourly`, `:test-secondly`,
    `:huge-daily-xsparse`, `:test-daily`, `:large-hourly-xsparse`,
    `:large-daily`, `:test2-daily`, `:xlarge-daily`, `:huge-daily`,
    `:large-hourly`, `:small-daily`, `:large-hourly-sparse`
    (defaults to `:small-daily`)

  * `autoclose-on-jvm-exit?` whether to cleanly close the queue on JVM
    exit (defaults to true)

  * `cycle-release-tasks` tasks to run on queue release. See
    qbits.tape.cycle-listener

  * `cycle-acquire-tasks` tasks to run on queue acquisition. See
    qbits.tape.cycle-listener

  * `codec` qbits.tape.codec/ICodec instance used to encode/decode
    messages (defaults to qbits.tape.codec.fressian/default)"
  ([dir]
   (make dir nil))
  ([dir {:keys [codec
                roll-cycle
                autoclose-on-jvm-exit?
                cycle-release-tasks
                cycle-acquire-tasks]
         :or {codec fressian.codec/default
              roll-cycle :small-daily
              autoclose-on-jvm-exit? true}}]
   (let [;; Local alias: inside the reify below, `codec` is also a method name.
         message-codec codec
         ^SingleChronicleQueue chronicle
         (cond-> (ChronicleQueue/singleBuilder ^String dir)
           roll-cycle
           (.rollCycle (->roll-cycle roll-cycle))

           ;; A store-file listener is installed only when at least one
           ;; task collection is non-empty.
           (or (seq cycle-release-tasks)
               (seq cycle-acquire-tasks))
           (.storeFileListener
            (cycle-listener/make dir
                                 (->roll-cycle roll-cycle)
                                 {:release-tasks cycle-release-tasks
                                  :acquire-tasks cycle-acquire-tasks}))

           ;; Always-truthy clause: finish by building the queue.
           :then
           (.build))
         handle
         (reify
           IQueue
           (codec [_]
             message-codec)
           (close! [_]
             (.close chronicle))
           (closed? [_]
             (.isClosed chronicle))
           (underlying-queue [_]
             chronicle)

           java.io.Closeable
           (close [_]
             (.close chronicle))

           p/Datafiable
           (datafy [_]
             #::{:source-id (.sourceId chronicle)
                 :last-acknowledged-index-replicated (.lastAcknowledgedIndexReplicated chronicle)
                 :last-index-replicated (.lastIndexReplicated chronicle)
                 :file (.fileAbsolutePath chronicle)
                 :index-count (.indexCount chronicle)
                 :index-spacing (.indexSpacing chronicle)
                 :roll-cycle (.rollCycle chronicle)
                 :delta-checkpoint-interval (.deltaCheckpointInterval chronicle)
                 :buffered (.buffered chronicle)
                 :cycle (.cycle chronicle)
                 :closed? (.isClosed chronicle)}))]
     (when autoclose-on-jvm-exit?
       (jvm/add-shutdown-hook!
        (fn close-on-exit []
          ;; Skip queues the caller has already closed.
          (when-not (closed? handle)
            (log/infof "JVM exit handler triggered for open Queue: %s"
                       handle)
            (close! handle)
            (log/infof "Closed Queue: %s"
                       handle)))))
     handle)))