Skip to content

Commit

Permalink
Integrate WAL into main stream
Browse files Browse the repository at this point in the history
  • Loading branch information
sunng87 committed Sep 16, 2011
1 parent b807f84 commit c46ae9f
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ clojalk.iml
clojalk.iws
clojalk.ipr
/docs/
/binlogs/

3 changes: 3 additions & 0 deletions clojalk.properties
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
server.port=10000

wal.dir=./binlogs/
wal.files=8
5 changes: 4 additions & 1 deletion src/clojalk/main.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns clojalk.main
(:gen-class)
(:refer-clojure :exclude [use peek])
(:use [clojalk net core utils jmx])
(:use [clojalk net core utils jmx wal])
(:use [clojure.contrib.properties]))

(defn property [properties key]
Expand All @@ -10,6 +10,9 @@
(defn -main [& args]
(let [prop-file-name (or (first args) "clojalk.properties")
props (read-properties prop-file-name)]
(binding [*clojalk-log-dir* (property props "wal.dir")
*clojalk-log-count* (property props "wal.files")]
(start-wal))
(start-tasks)
(binding [*clojalk-port* (as-int (property props "server.port"))]
(start-server))
Expand Down
22 changes: 18 additions & 4 deletions src/clojalk/wal.clj
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,11 @@

;; Update id counter after all job records are loaded from logs
;;
;; IMPORTANT! Append a **0** into the job key collection to prevent
;; exception when there is no jobs
(defn- update-id-counter []
(swap! clojalk.core/id-counter (constantly (long (apply max (keys @clojalk.core/jobs))))))
(swap! clojalk.core/id-counter
(constantly (long (apply max (conj (keys @clojalk.core/jobs) 0))))))

;; ## Replay logs and load jobs
;;
Expand All @@ -199,11 +202,11 @@
;; server restarted.
;;
(defn replay-logs []
(let [bin-log-files (scan-dir *clojalk-log-dir*)]
(if-let [bin-log-files (scan-dir *clojalk-log-dir*)]
(dosync
(doall (map #(read-file % replay-handler) bin-log-files))
(replay-tubes)))
(update-id-counter)
(replay-tubes))
(update-id-counter))
(empty-dir *clojalk-log-dir*))

;; log files are split into several parts
Expand All @@ -215,6 +218,11 @@

;; Create empty log files into `log-files`. This is invoked after legacy logs replayed.
(defn init-log-files []
(let [dir (file *clojalk-log-dir*)]
(if-not (.exists dir) (.mkdirs dir))
(if-not (.exists dir)
(throw (IllegalStateException.
(str "Failed to create WAL directory: " (.getAbsolutePath dir))))))
(let [ss (map #(output-stream (file *clojalk-log-dir* (str "clojalk-" % ".bin")))
(range *clojalk-log-count*))]
(dosync (ref-set log-files ss))))
Expand All @@ -234,3 +242,9 @@
(defn dump-all-jobs []
(doseq [j (vals @clojalk.core/jobs)]
(write-job j true)))

;; Start proceduce of WAL module invoked before server and task start
(defn start-wal []
(replay-logs)
(init-log-files)
(dump-all-jobs))

0 comments on commit c46ae9f

Please sign in to comment.