Skip to content
Browse files

simple example

  • Loading branch information...
1 parent 6fac4ec commit 252440d43e47602a339b49daa8da5729c020e91b @nathanmarz committed May 20, 2012
Showing with 52 additions and 109 deletions.
  1. +1 −1 project.clj
  2. +51 −108 src/examples/disruptor/eg1.clj
View
2 project.clj
@@ -1,6 +1,6 @@
(defproject clj-disruptor "0.0.1"
:description "Clojure access to LMax Disruptor (ring-buffer event processor) https://github.com/davesann/clj-disruptor"
- :dependencies [[org.clojure/clojure "1.3.0"]
+ :dependencies [[org.clojure/clojure "1.4.0"]
[com.googlecode.disruptor/disruptor "2.7.1"]
]
:dev-dependencies [[lein-eclipse "1.0.0"]
View
159 src/examples/disruptor/eg1.clj
@@ -5,76 +5,12 @@
[disruptor.events :as des])
)
-(defn timestamp []
- (System/currentTimeMillis))
-;; # global state to track number of handles.
-;; This could be the application state or model
-;; depending on how the event handlers are configured,
-;; this may not need th be thread safe
-(def handle-count (atom 0))
-
-(defn inc-handle-count []
- (swap! handle-count inc))
-
-(defn reset-count [handle-count value]
- (reset! handle-count value))
-
-
-;; # event handlers
-;; note - return values are ignored unless the handler is registered with a 'slot'
-;; (see below)
-
-(defn handler-A [event sequence-num end-of-batch?]
- (inc-handle-count)
- {:A (timestamp)})
-
-(defn handler-B [event sequence-num end-of-batch?]
- (inc-handle-count)
- {:B (timestamp)})
-
-(defn handler-C [event sequence-num end-of-batch?]
- (inc-handle-count)
- {:C (timestamp)})
-
-(defn handler-D [event sequence-num end-of-batch?]
- (inc-handle-count)
- {:D (timestamp)})
-
-;; this is used to terminate the disruptor and print the final event
-;; in practice you would likely not do this.
-(defn make-done-handler [iterations disruptor start-time]
- (fn [event sequence-num end-of-batch?]
- (inc-handle-count)
- (when (>= sequence-num iterations)
- (println "stopping")
- (.start (java.lang.Thread. #(dc/shutdown disruptor)))
-
- (let [hc @handle-count
- t (- (timestamp) start-time)
- handles-per-second (/ (* hc 1000.0) t)]
- (println)
- (println (des/pstr event))
- (println)
- (println (format "%d iterations, %d handles, handles/s: %.0f, time(s) %.2f"
- iterations
- hc
- handles-per-second
- (/ t 1000.0)
- ))
- ))))
-
-;; # General setup
-
-;; Use a Map for the event type the slot-keys can be any valid map key
-(comment
- (def create-event des/identity-map-event-factory)
- )
;; Use an object-array for the event type the slot-keys must be ints
;; within the length of the array
-(def create-event (des/create-object-array-event-factory 5))
+(def create-event (des/create-object-array-event-factory 1))
(defn create-executor-service-p
@@ -97,48 +33,55 @@
;; # run the example
-(def default-ring-size (* 8 1024))
-(def default-num-event-processors 4)
-
-(import '[com.lmax.disruptor RingBuffer])
-(defn publish-events
- "publish a bunch of events to the ring buffer"
- [^RingBuffer ring-buffer iterations]
- (loop [i 0]
- (when-not (> i iterations)
- (dc/publish ring-buffer 0 {:i i :start (timestamp)})
- (recur (unchecked-add 1 i))
- )))
-
-(defn go
- "run the example"
- ([iterations] (go iterations default-ring-size default-num-event-processors))
- ([iterations ring-size num-event-processors]
- (println "Started")
- (reset-count handle-count 0)
- (let [executor-service (create-executor-service)
- disruptor (dc/make-disruptor executor-service
- ring-size create-event
- :single-threaded :blocking
- )]
- (->
- (des/handle-events-with disruptor {1 handler-A})
- (des/then-handle {2 handler-B 3 handler-C})
- (des/then-handle {4 handler-D})
- (des/then-handle (make-done-handler iterations disruptor (timestamp)))
- )
- (let [ring-buffer (dc/start disruptor)]
- (.start (java.lang.Thread. #(publish-events ring-buffer iterations))))
-
- )))
-
-(comment
- (def iterations 300000000)
-
- (def iterations 300000)
- (def iterations 10)
- (go iterations)
+(def ring-size (* 8 1024))
+
+(def curr-time (atom nil))
+
+(def last-item 9999999)
+
+(defn handler-A [event sequence-num end-of-batch?]
+ (when (= last-item (first event))
+ (println "PROCESS THREAD: " (Thread/currentThread))
+ (println "TIME TO COMPLETION: " (- (System/currentTimeMillis) @curr-time))
+ )
+ ;; (if end-of-batch? (println (seq event) sequence-num end-of-batch?))
)
-
-
+(def executor-service (create-executor-service))
+(def disruptor (dc/make-disruptor executor-service
+ ring-size create-event
+ :single-threaded :yielding
+ ))
+(des/handle-events-with disruptor handler-A)
+(def ring-buffer (dc/start disruptor))
+
+(defn pub [ring-buffer data]
+ (dc/publish ring-buffer 0 data))
+
+
+(import 'java.util.concurrent.LinkedBlockingQueue)
+
+(defn pubq [^LinkedBlockingQueue q data]
+ (.put q data))
+
+(defn takeq [^LinkedBlockingQueue q]
+ (.take q))
+
+(def q (LinkedBlockingQueue.))
+(.execute executor-service (fn [] (while true
+ (let [obj (takeq q)]
+ (if (= obj last-item)
+ (println "TIME TO COMPLETION: " (- (System/currentTimeMillis) @curr-time))
+ ))
+ )))
+
+
+(println "PUB THREAD: " (Thread/currentThread))
+(reset! curr-time (System/currentTimeMillis))
+(doseq [i (range (inc last-item))]
+ (pub ring-buffer i))
+
+
+(reset! curr-time (System/currentTimeMillis))
+(doseq [i (range (inc last-item))]
+ (pubq q i))

0 comments on commit 252440d

Please sign in to comment.
Something went wrong with that request. Please try again.