Skip to content

Commit

Permalink
Using :crux/ prefix for events rather than an internal Crux ns
Browse files Browse the repository at this point in the history
  • Loading branch information
jarohen committed Apr 30, 2020
1 parent 258b080 commit d702d5b
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 55 deletions.
24 changes: 10 additions & 14 deletions crux-core/src/crux/bus.clj
Expand Up @@ -7,39 +7,35 @@
[java.util.concurrent ExecutorService Executors TimeUnit]))

(defprotocol EventSource
(listen [_ f] [_ listen-ops f]))
(listen [_ listen-ops f]))

(defprotocol EventSink
(send [_ event]))

(s/def ::event-type keyword?)
(s/def ::event-types (s/coll-of ::event-type :kind set?))
(s/def :crux/event-type keyword?)

(defmulti event-spec ::event-type, :default ::default)
(defmulti event-spec :crux/event-type, :default ::default)
(defmethod event-spec ::default [_] any?)

(s/def ::event (s/and (s/keys :req [::event-type])
(s/multi-spec event-spec ::event-type)))
(s/def ::event (s/and (s/keys :req [:crux/event-type])
(s/multi-spec event-spec :crux/event-type)))

(defrecord EventBus [!listeners]
EventSource
(listen [this listen-ops f]
(let [{::keys [event-types]} listen-ops]
(let [{:crux/keys [event-type]} listen-ops]
(swap! !listeners
conj {:executor (Executors/newSingleThreadExecutor (cio/thread-factory "bus-listener"))
:f f
::event-types event-types})
:crux/event-type event-type})
nil))

(listen [this f]
(listen this {} f))

EventSink
(send [_ {::keys [event-type] :as event}]
(send [_ {:crux/keys [event-type] :as event}]
(s/assert ::event event)

(doseq [{:keys [^ExecutorService executor f ::event-types]} @!listeners]
(when (or (nil? event-types) (contains? event-types event-type))
(doseq [{:keys [^ExecutorService executor f] :as listener} @!listeners]
(when (= event-type (:crux/event-type listener))
(.submit executor ^Runnable #(f event)))))

Closeable
Expand Down
8 changes: 4 additions & 4 deletions crux-core/src/crux/query.clj
Expand Up @@ -1213,12 +1213,12 @@
query-id (str (UUID/randomUUID))
safe-query (-> conformed-query .q-normalized (dissoc :args))]
(when bus
(bus/send bus {:crux.bus/event-type ::submitted-query
(bus/send bus {:crux/event-type ::submitted-query
::query safe-query
::query-id query-id}))
(let [ret (q this conformed-query)]
(when bus
(bus/send bus {:crux.bus/event-type ::completed-query
(bus/send bus {:crux/event-type ::completed-query
::query safe-query
::query-id query-id}))
ret)))
Expand All @@ -1234,13 +1234,13 @@
query-id (str (UUID/randomUUID))
safe-query (-> conformed-query .q-normalized (dissoc :args))]
(when bus
(bus/send bus {:crux.bus/event-type ::submitted-query
(bus/send bus {:crux/event-type ::submitted-query
::query safe-query
::query-id query-id}))
(cio/->cursor (fn []
(.close index-store)
(when bus
(bus/send bus {:crux.bus/event-type ::completed-query
(bus/send bus {:crux/event-type ::completed-query
::query safe-query
::query-id query-id})))
(q this index-store conformed-query))))
Expand Down
8 changes: 4 additions & 4 deletions crux-core/src/crux/tx.clj
Expand Up @@ -467,15 +467,15 @@
(or (idx/keep-non-evicted-doc (db/get-single-object object-store index-store (c/new-id k)))
(idx/evicted-doc? doc)))))
not-empty)]
(bus/send bus {::bus/event-type ::indexing-docs, :doc-ids (set (keys docs))})
(bus/send bus {:crux/event-type ::indexing-docs, :doc-ids (set (keys docs))})

(let [bytes-indexed (db/index-docs indexer docs-to-upsert)
docs-stats (->> (vals docs-to-upsert)
(map #(idx/doc-predicate-stats % false)))]

(db/put-objects object-store docs-to-upsert)

(bus/send bus {::bus/event-type ::indexed-docs,
(bus/send bus {:crux/event-type ::indexed-docs,
:doc-ids (set (keys docs))
:av-count (->> (vals docs) (apply concat) (count))
:bytes-indexed bytes-indexed})
Expand All @@ -486,7 +486,7 @@
(s/assert :crux.tx.event/tx-events tx-events)

(log/debug "Indexing tx-id:" tx-id "tx-events:" (count tx-events))
(bus/send bus {::bus/event-type ::indexing-tx, ::submitted-tx tx})
(bus/send bus {:crux/event-type ::indexing-tx, ::submitted-tx tx})

(with-open [index-store (db/open-index-store indexer)]
(binding [*current-tx* (assoc tx :crux.tx.event/tx-events tx-events)]
Expand Down Expand Up @@ -517,7 +517,7 @@
(log/warn "Transaction aborted:" (cio/pr-edn-str tx-events) (cio/pr-edn-str tx-time) tx-id)
(db/mark-tx-as-failed indexer tx)))

(bus/send bus {::bus/event-type ::indexed-tx, ::submitted-tx tx, :committed? committed?})
(bus/send bus {:crux/event-type ::indexed-tx, ::submitted-tx tx, :committed? committed?})

{:tombstones (when committed?
(:tombstones res))}))))
Expand Down
20 changes: 7 additions & 13 deletions crux-core/test/crux/bus_test.clj
Expand Up @@ -4,24 +4,18 @@
(:import [crux.bus EventBus]))

(t/deftest test-bus
(let [!unfiltered-events (atom [])
!filtered-events (atom [])]
(let [!events (atom [])]
(with-open [bus ^EventBus (bus/->EventBus (atom #{}))]
(bus/send bus {::bus/event-type :foo, :value 1})
(bus/send bus {:crux/event-type :foo, :value 1})

(bus/listen bus {::bus/event-types #{:foo}} #(swap! !filtered-events conj %))
(bus/listen bus #(swap! !unfiltered-events conj %))
(bus/listen bus {:crux/event-type :foo} #(swap! !events conj %))

(bus/send bus {::bus/event-type :foo, :value 2})
(bus/send bus {::bus/event-type :bar, :value 1})
(bus/send bus {:crux/event-type :foo, :value 2})
(bus/send bus {:crux/event-type :bar, :value 1})

;; just to ensure all the jobs are handled
;; - we don't guarantee this if the node is shut down
(Thread/sleep 100))

(t/is (= [{::bus/event-type :foo, :value 2}
{::bus/event-type :bar, :value 1}]
@!unfiltered-events))

(t/is (= [{::bus/event-type :foo, :value 2}]
@!filtered-events))))
(t/is (= [{:crux/event-type :foo, :value 2}]
@!events))))
12 changes: 6 additions & 6 deletions crux-metrics/src/crux/metrics/indexer.clj
Expand Up @@ -15,7 +15,7 @@
(defn assign-tx-latency-gauge [registry {:crux.node/keys [bus]}]
(let [!last-tx-lag (atom 0)]
(bus/listen bus
{:crux.bus/event-types #{:crux.tx/indexed-tx}}
{:crux/event-type :crux.tx/indexed-tx}
(fn [{::tx/keys [submitted-tx]}]
(reset! !last-tx-lag (- (System/currentTimeMillis)
(.getTime ^Date (::tx/tx-time submitted-tx))))))
Expand All @@ -27,7 +27,7 @@
(defn assign-doc-meter [registry {:crux.node/keys [bus]}]
(let [meter (dropwizard/meter registry ["indexer" "indexed-docs"])]
(bus/listen bus
{:crux.bus/event-types #{:crux.tx/indexed-docs}}
{:crux/event-type :crux.tx/indexed-docs}
(fn [{:keys [doc-ids]}]
(dropwizard/mark! meter (count doc-ids))))

Expand All @@ -36,15 +36,15 @@
(defn assign-av-meter [registry {:crux.node/keys [bus]}]
(let [meter (dropwizard/meter registry ["indexer" "indexed-avs"])]
(bus/listen bus
{:crux.bus/event-types #{:crux.tx/indexed-docs}}
{:crux/event-type :crux.tx/indexed-docs}
(fn [{:keys [av-count]}]
(dropwizard/mark! meter av-count)))
meter))

(defn assign-bytes-meter [registry {:crux.node/keys [bus]}]
(let [meter (dropwizard/meter registry ["indexer" "indexed-bytes"])]
(bus/listen bus
{:crux.bus/event-types #{:crux.tx/indexed-docs}}
{:crux/event-types :crux.tx/indexed-docs}
(fn [{:keys [bytes-indexed]}]
(dropwizard/mark! meter bytes-indexed)))

Expand All @@ -54,12 +54,12 @@
(let [timer (dropwizard/timer registry ["indexer" "indexed-txs"])
!timer (atom nil)]
(bus/listen bus
{:crux.bus/event-types #{:crux.tx/indexing-tx}}
{:crux/event-type :crux.tx/indexing-tx}
(fn [_]
(reset! !timer (dropwizard/start timer))))

(bus/listen bus
{:crux.bus/event-types #{:crux.tx/indexed-tx}}
{:crux/event-type :crux.tx/indexed-tx}
(fn [_]
(let [[ctx _] (reset-vals! !timer nil)]
(dropwizard/stop ctx))))
Expand Down
4 changes: 2 additions & 2 deletions crux-metrics/src/crux/metrics/query.clj
Expand Up @@ -6,11 +6,11 @@
[registry {:crux.node/keys [bus]}]
(let [!timer-store (atom {})
query-timer (dropwizard/timer registry ["query" "timer"])]
(bus/listen bus {:crux.bus/event-types #{:crux.query/submitted-query}}
(bus/listen bus {:crux/event-type :crux.query/submitted-query}
(fn [event]
(swap! !timer-store assoc (:crux.query/query-id event) (dropwizard/start query-timer))))

(bus/listen bus {:crux.bus/event-types #{:crux.query/completed-query}}
(bus/listen bus {:crux/event-type :crux.query/completed-query}
(fn [event]
(dropwizard/stop (get @!timer-store (:crux.query/query-id event)))
(swap! !timer-store dissoc (:crux.query/query-id event))))
Expand Down
25 changes: 13 additions & 12 deletions crux-test/test/crux/tx_test.clj
Expand Up @@ -773,14 +773,15 @@

(t/deftest raises-tx-events-422
(let [!events (atom [])
!latch (promise)]
(bus/listen (get-in (meta *api*) [::n/topology ::n/bus])
{::bus/event-types #{::tx/indexing-docs ::tx/indexed-docs
::tx/indexing-tx ::tx/indexed-tx}}
#(do
(swap! !events conj %)
(when (= ::tx/indexed-tx (::bus/event-type %))
(deliver !latch @!events))))
!latch (promise)
bus (get-in (meta *api*) [::n/topology ::n/bus])]
(doseq [event-type #{::tx/indexing-docs ::tx/indexed-docs
::tx/indexing-tx ::tx/indexed-tx}]
(bus/listen bus {:crux/event-type event-type}
(fn [evt]
(swap! !events conj evt)
(when (= ::tx/indexed-tx (:crux/event-type evt))
(deliver !latch @!events)))))

(let [doc-1 {:crux.db/id :foo, :value 1}
doc-2 {:crux.db/id :bar, :value 2}
Expand All @@ -789,12 +790,12 @@
(when (= ::timeout (deref !latch 500 ::timeout))
(t/is false))

(t/is (= [{::bus/event-type ::tx/indexing-docs, :doc-ids #{(c/new-id doc-1) (c/new-id doc-2)}}
{::bus/event-type ::tx/indexed-docs
(t/is (= [{:crux/event-type ::tx/indexing-docs, :doc-ids #{(c/new-id doc-1) (c/new-id doc-2)}}
{:crux/event-type ::tx/indexed-docs
:doc-ids #{(c/new-id doc-1) (c/new-id doc-2)}
:av-count 4}
{::bus/event-type ::tx/indexing-tx, ::tx/submitted-tx submitted-tx}
{::bus/event-type ::tx/indexed-tx, ::tx/submitted-tx submitted-tx, :committed? true}]
{:crux/event-type ::tx/indexing-tx, ::tx/submitted-tx submitted-tx}
{:crux/event-type ::tx/indexed-tx, ::tx/submitted-tx submitted-tx, :committed? true}]
(-> (vec @!events)
(update 1 dissoc :bytes-indexed)))))))

Expand Down

0 comments on commit d702d5b

Please sign in to comment.