Permalink
Browse files

Implement the rest of SchedulerListener methods

  • Loading branch information...
1 parent e642233 commit 82e49ceffb526ceae7ae5790d6863c6b4419a32a @michaelklishin committed Jan 14, 2012
@@ -5,7 +5,8 @@
:constructors {[com.rabbitmq.client.Channel String String] []})
(:require [langohr.basic :as lhb]
[clojure.data.json :as json])
- (:import [org.quartz SchedulerListener]
+ (:use [clojurewerkz.quartzite.conversion])
+ (:import [org.quartz SchedulerListener SchedulerException Trigger TriggerKey JobDetail JobKey]
[com.rabbitmq.client Channel]
[java.util Date]
[clojurewerkz.quartzite.listeners.amqp PublishingSchedulerListener]))
@@ -34,3 +35,63 @@
(payloadless-publisher -schedulerInStandbyMode "quartz.scheduler.standby")
(payloadless-publisher -schedulingDataCleared "quartz.scheduler.cleared")
(payloadless-publisher -schedulerShuttingDown "quartz.scheduler.shutdown")
+
+
+(defn -schedulerError
+ [this ^String msg ^SchedulerException cause]
+ (publish this (json/json-str { :message msg :cause (str cause) }) "quartz.scheduler.error"))
+
+
+(defn -jobScheduled
+ [this ^Trigger trigger]
+ (publish this (json/json-str { :group (-> trigger .getKey .getGroup) :key (-> trigger .getKey .getName) :description (.getDescription trigger) }) "quartz.scheduler.job-scheduled"))
+
+(defn -jobUnscheduled
+ [this ^TriggerKey key]
+ (publish this (json/json-str { :group (.getGroup key) :key (.getName key) }) "quartz.scheduler.job-unscheduled"))
+
+(defn -triggerFinalized
+ [this ^Trigger trigger]
+ (publish this (json/json-str { :group (-> trigger .getKey .getGroup) :key (-> trigger .getKey .getName) :description (.getDescription trigger) }) "quartz.scheduler.trigger-finalized"))
+
+(defn -triggerPaused
+ [this ^TriggerKey key]
+ (publish this (json/json-str { :group (.getGroup key) :key (.getName key) }) "quartz.scheduler.trigger-paused"))
+
+(defn -triggersPaused
+ [this ^String trigger-group]
+ (publish this (json/json-str { :group trigger-group }) "quartz.scheduler.triggers-paused"))
+
+(defn -triggerResumed
+ [this ^TriggerKey key]
+ (publish this (json/json-str { :group (.getGroup key) :key (.getName key) }) "quartz.scheduler.trigger-resumed"))
+
+(defn -triggersResumed
+ [this ^String trigger-group]
+ (publish this (json/json-str { :group trigger-group }) "quartz.scheduler.triggers-resumed"))
+
+
+
+(defn -jobAdded
+ [this ^JobDetail detail]
+ (publish this (json/json-str { :job-detail (from-job-data (.getJobDataMap detail)) :description (.getDescription detail) }) "quartz.scheduler.job-added"))
+
+(defn -jobDeleted
+ [this ^JobKey key]
+ (publish this (json/json-str { :group (.getGroup key) :key (.getName key) }) "quartz.scheduler.job-deleted"))
+
+(defn -jobPaused
+ [this ^JobKey key]
+ (publish this (json/json-str { :group (.getGroup key) :key (.getName key) }) "quartz.scheduler.job-paused"))
+
+(defn -jobsPaused
+ [this ^String job-group]
+ (publish this (json/json-str { :group job-group }) "quartz.scheduler.jobs-paused"))
+
+(defn -jobResumed
+ [this ^JobKey key]
+ (publish this (json/json-str { :group (.getGroup key) :key (.getName key) }) "quartz.scheduler.job-resumed"))
+
+(defn -jobsResumed
+ [this ^String job-group]
+ (publish this (json/json-str { :group job-group }) "quartz.scheduler.jobs-resumed"))
@@ -12,7 +12,7 @@
[clojurewerkz.quartzite.listeners.amqp.PublishingSchedulerListener])
(:import [com.rabbitmq.client Connection Channel AMQP$BasicProperties Envelope]
[clojurewerkz.quartzite.listeners.amqp PublishingSchedulerListener]
- [java.util.concurrent ConcurrentLinkedQueue]))
+ [java.util.concurrent ConcurrentLinkedQueue CountDownLatch]))
@@ -61,9 +61,151 @@
(let [[queue mbox] (register-consumer "test-scheduler-is-put-into-standby-event")
listener (PublishingSchedulerListener. channel "" queue)]
(sched/add-scheduler-listener listener)
- (sched/start)
+ (sched/start)
(sched/standby)
(sched/start)
(Thread/sleep 500)
(is (not (.isEmpty mbox)))
(is (= ["quartz.scheduler.started" "quartz.scheduler.standby" "quartz.scheduler.started"] (vec (message-types-in mbox))))))
+
+
+
+;;
+;; job.scheduled
+;;
+
+(def latch1 (CountDownLatch. 10))
+
+(defrecord Latch1Job []
+ org.quartz.Job
+ (execute [this ctx]
+ (.countDown ^CountDownLatch latch1)))
+
+(deftest test-job-scheduled-event
+ (let [[queue mbox] (register-consumer "test-scheduled-event")
+ listener (PublishingSchedulerListener. channel "" queue)
+ job (j/build
+ (j/of-type Latch1Job)
+ (j/with-identity "clojurewerkz.quartzite.listeners.amqp.test.scheduler_lifecycle_events" "job1"))
+ trigger (t/build
+ (t/start-now)
+ (t/with-schedule (s/schedule
+ (s/with-repeat-count 10)
+ (s/with-interval-in-milliseconds 200))))]
+ (sched/add-scheduler-listener listener)
+ (sched/start)
+ (sched/schedule job trigger)
+ (.await latch1)
+ (Thread/sleep 500)
+ (is (not (.isEmpty mbox)))
+ (is (some #{"quartz.scheduler.job-scheduled"} (vec (message-types-in mbox))))))
+
+
+;;
+;; job.unscheduled
+;;
+
+(def latch2 (CountDownLatch. 10))
+
+(defrecord Latch2Job []
+ org.quartz.Job
+ (execute [this ctx]
+ (.countDown ^CountDownLatch latch2)))
+
+(deftest test-job-unscheduled-event
+ (let [[queue mbox] (register-consumer "test-job-unscheduled-event")
+ listener (PublishingSchedulerListener. channel "" queue)
+ tk (t/key "clojurewerkz.quartzite.listeners.amqp.test.scheduler_lifecycle_events" "trigger2")
+ job (j/build
+ (j/of-type Latch2Job)
+ (j/with-identity "clojurewerkz.quartzite.listeners.amqp.test.scheduler_lifecycle_events" "job2"))
+ trigger (t/build
+ (t/start-now)
+ (t/with-identity tk)
+ (t/with-schedule (s/schedule
+ (s/with-repeat-count 10)
+ (s/with-interval-in-milliseconds 200))))]
+ (sched/add-scheduler-listener listener)
+ (sched/start)
+ (sched/schedule job trigger)
+ (.await latch2)
+ (Thread/sleep 200)
+ (sched/unschedule-job tk)
+ (Thread/sleep 500)
+ (is (not (.isEmpty mbox)))
+ (is (some #{"quartz.scheduler.job-unscheduled"}
+ (vec (message-types-in mbox))))))
+
+
+
+;;
+;; job.deleted
+;;
+
+(def latch3 (CountDownLatch. 10))
+
+(defrecord Latch3Job []
+ org.quartz.Job
+ (execute [this ctx]
+ (.countDown ^CountDownLatch latch3)))
+
+(deftest test-job-deleted-event
+ (let [[queue mbox] (register-consumer "test-job-deleted-event")
+ listener (PublishingSchedulerListener. channel "" queue)
+ job (j/build
+ (j/of-type Latch3Job)
+ (j/with-identity "clojurewerkz.quartzite.listeners.amqp.test.scheduler_lifecycle_events" "job3"))
+ trigger (t/build
+ (t/start-now)
+ (t/with-schedule (s/schedule
+ (s/with-repeat-count 10)
+ (s/with-interval-in-milliseconds 200))))
+ jk (j/key "clojurewerkz.quartzite.listeners.amqp.test.scheduler_lifecycle_events" "job3")]
+ (sched/add-scheduler-listener listener)
+ (sched/start)
+ (sched/schedule job trigger)
+ (.await latch3)
+ (Thread/sleep 200)
+ (sched/delete-job jk)
+ (Thread/sleep 500)
+ (is (not (.isEmpty mbox)))
+ (let [types (vec (message-types-in mbox))]
+ (is (some #{"quartz.scheduler.job-deleted"} types))
+ (is (some #{"quartz.scheduler.job-added"} types)))))
+
+
+;;
+;; trigger.paused, trigger.finalized
+;;
+
+(def latch4 (CountDownLatch. 10))
+
+(defrecord Latch4Job []
+ org.quartz.Job
+ (execute [this ctx]
+ (.countDown ^CountDownLatch latch4)))
+
+(deftest test-trigger-paused-event
+ (let [[queue mbox] (register-consumer "test-trigger-paused-event")
+ listener (PublishingSchedulerListener. channel "" queue)
+ tk (t/key "clojurewerkz.quartzite.listeners.amqp.test.scheduler_lifecycle_events" "trigger4")
+ job (j/build
+ (j/of-type Latch3Job)
+ (j/with-identity "clojurewerkz.quartzite.listeners.amqp.test.scheduler_lifecycle_events" "job4"))
+ trigger (t/build
+ (t/start-now)
+ (t/with-identity tk)
+ (t/with-schedule (s/schedule
+ (s/with-repeat-count 10)
+ (s/with-interval-in-milliseconds 200))))]
+ (sched/add-scheduler-listener listener)
+ (sched/start)
+ (sched/schedule job trigger)
+ (.await latch3)
+ (Thread/sleep 200)
+ (sched/pause-trigger tk)
+ (Thread/sleep 500)
+ (is (not (.isEmpty mbox)))
+ (let [types (vec (message-types-in mbox))]
+ (is (some #{"quartz.scheduler.trigger-paused"} types))
+ (is (some #{"quartz.scheduler.trigger-finalized"} types)))))

0 comments on commit 82e49ce

Please sign in to comment.