Permalink
Browse files

added id-timer abstraction

  • Loading branch information...
1 parent 1f39064 commit 720901fa1ed63e1985aec7fef6e5239b46d7964d @nathanmarz committed Dec 8, 2011
Showing with 45 additions and 1 deletion.
  1. +27 −1 src/clj/backtype/storm/util.clj
  2. +18 −0 src/jvm/backtype/storm/utils/ClojureTimerTask.java
@@ -3,7 +3,7 @@
(:import [java.util Map List Collection])
(:import [java.io FileReader])
(:import [backtype.storm Config])
- (:import [backtype.storm.utils Time Container])
+ (:import [backtype.storm.utils Time Container ClojureTimerTask])
(:import [java.util UUID])
(:import [java.util.concurrent.locks ReentrantReadWriteLock])
(:import [java.io File RandomAccessFile StringWriter PrintWriter])
@@ -12,6 +12,7 @@
(:import [org.apache.commons.io FileUtils])
(:import [org.apache.commons.exec ExecuteException])
(:import [org.json.simple JSONValue])
+ (:import [java.util Timer])
(:require [clojure.contrib [str-utils2 :as str]])
(:require [clojure [set :as set]])
(:use [clojure walk])
@@ -531,3 +532,28 @@
(defn container-get [^Container container]
(. container object))
+
+(defn mk-timer []
+ {:timer (Timer.) :scheduled (atom #{})})
+
+(defn scheduled? [timer id]
+ (contains? @(:scheduled timer) id))
+
+(defn schedule [timer id delay afn]
+ (when (scheduled? timer id)
+ (throw (IllegalArgumentException. "Cannot schedule an already scheduled id")))
+ (let [wrapped (fn [] (afn) (swap! (:scheduled timer) disj id))]
+ (.schedule ^Timer (:timer timer)
+ delay
+ (ClojureTimerTask. wrapped))
+ (swap! (:scheduled timer) conj id)
+ ))
+
+(defn schedule-if-free [timer id delay afn]
+ (if-not (scheduled? timer id)
+ (schedule timer id delay afn)
+ ))
+
+;; need a map from "states" to the corresponding event transition (transition can include a delay)
+;; {:rebalance {:delay 10 :action (fn [] ... :active)}
+;; {(fn [] ...)}}
@@ -0,0 +1,18 @@
+package backtype.storm.utils;
+
+import clojure.lang.IFn;
+import java.util.TimerTask;
+
+public class ClojureTimerTask extends TimerTask {
+ IFn _afn;
+
+ public ClojureTimerTask(IFn afn) {
+ super();
+ _afn = afn;
+ }
+
+ @Override
+ public void run() {
+ _afn.run();
+ }
+}

0 comments on commit 720901f

Please sign in to comment.