Skip to content
Browse files

greatly lesson likelihood of race condition between supervisor and wo…

…rker
  • Loading branch information...
1 parent 87d133f commit f6804ad2ce30e1ab3abcf1115b1d46256e0575b8 @nathanmarz nathanmarz committed Mar 6, 2013
Showing with 25 additions and 7 deletions.
  1. +9 −3 src/clj/backtype/storm/daemon/worker.clj
  2. +16 −4 src/jvm/backtype/storm/utils/LocalState.java
View
12 src/clj/backtype/storm/daemon/worker.clj
@@ -41,12 +41,18 @@
(current-time-secs)
(:storm-id worker)
(:executors worker)
- (:port worker))]
+ (:port worker))
+ state (worker-state conf (:worker-id worker))]
(log-debug "Doing heartbeat " (pr-str hb))
;; do the local-file-system heartbeat.
- (.put (worker-state conf (:worker-id worker))
+ (.put state
LS-WORKER-HEARTBEAT
- hb)
+ hb
+ false
+ )
+ (.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up.
+ ; it shouldn't take supervisor 120 seconds between listing dir and reading it
+
))
(defn worker-outbound-tasks
View
20 src/jvm/backtype/storm/utils/LocalState.java
@@ -29,22 +29,34 @@ public Object get(Object key) throws IOException {
}
public synchronized void put(Object key, Object val) throws IOException {
+ put(key, val, true);
+ }
+
+ public synchronized void put(Object key, Object val, boolean cleanup) throws IOException {
Map<Object, Object> curr = snapshot();
curr.put(key, val);
- persist(curr);
+ persist(curr, cleanup);
}
public synchronized void remove(Object key) throws IOException {
+ remove(key, true);
+ }
+
+ public synchronized void remove(Object key, boolean cleanup) throws IOException {
Map<Object, Object> curr = snapshot();
curr.remove(key);
- persist(curr);
+ persist(curr, cleanup);
+ }
+
+ public synchronized void cleanup(int keepVersions) throws IOException {
+ _vs.cleanup(keepVersions);
}
- private void persist(Map<Object, Object> val) throws IOException {
+ private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
byte[] toWrite = Utils.serialize(val);
String newPath = _vs.createVersion();
FileUtils.writeByteArrayToFile(new File(newPath), toWrite);
_vs.succeedVersion(newPath);
- _vs.cleanup(4);
+ if(cleanup) _vs.cleanup(4);
}
}

0 comments on commit f6804ad

Please sign in to comment.
Something went wrong with that request. Please try again.