Browse files

update documentation

  • Loading branch information...
1 parent dcff325 commit 7c470b1f5f3b41dbfa0284de9d9a0b7cb3b58944 @sunng87 committed Sep 19, 2011
Showing with 82 additions and 3 deletions.
  1. +3 −0 src/clojalk/data.clj
  2. +79 −3 src/clojalk/net.clj
@@ -6,6 +6,7 @@
;; # Data Structures and constructors
;; Structure definition for ***Job***
;; **Job** is the basic task unit in clojalk. The fields are described below.
;; * **id** a numerical unique id of this Job
@@ -36,6 +37,7 @@
:reserves :timeouts :releases :buries :kicks)
;; Structure definition for Tube
;; Tube is a collection of jobs, similar to the database in RDBMS.
;; * **name** the name of this tube, as keyword.
@@ -51,6 +53,7 @@
:waiting_list :paused :pause_deadline :pauses)
;; Structure definition for Session (connection in beanstalkd)
;; Session represents all clients connected to clojalk.
;; * **id** the id of this session
@@ -1,3 +1,10 @@
+;; # Network interface for clojalk
+;; Clojalk uses **aleph** as TCP server which is based on Netty.
+;; The text based protocol is almost compatible with Beanstalkd except
+;; some response message with space ended, due to the limitation of
+;; gloss protocol definition framework. (I will explain in the document.)
(:refer-clojure :exclude [use peek])
(:require [clojure.contrib.logging :as logging])
@@ -8,7 +15,7 @@
(:use [lamina.core])
(:use [gloss.core]))
-;; this is for test and debug only
+;; this is an aleph handler for testing and debug only
(defn echo-handler [ch client-info]
(receive-all ch
#(if-let [msg %]
@@ -21,17 +28,38 @@
(enqueue ch ["UNKNOWN_COMMAND"]))))))
+;; Create a new session on the channel.
+;; Not that `` could accept additional field as
+;; data storage. Here we attach the channel to it.
+;; Also we registered a `lamina` channel callback on the channel-close
+;; event to cleanup data bound on the session.
(defn- create-session [ch remote-addr type]
(open-session remote-addr type :channel ch)
;; also register on-closed callback on channel
(on-closed ch #(close-session remote-addr)))
+;; Find a session from sessions. Create a new session with
+;; `create-session` if not found.
(defn get-or-create-session [ch remote-addr type]
(if-not (contains? @sessions remote-addr)
(create-session ch remote-addr type))
(@sessions remote-addr))
-;; reserve watcher
+;; Internally, the reserve operation in clojalk is non-blocking. It
+;; will return `nil` if there is no job available for reservation. And
+;; the job will be assigned to waiting session when it becomes
+;; available.
+;; We use watch on the session ref to detect if there is a new job
+;; assigned to the session and then to return the message to client.
+;; To find out the new job, just compare the `:incoming_job` field.
+;; Also, if state of the session is changed from `:waiting` to
+;; `:idle`, it means the session has been expired. We will send a
+;; `TIMEOUT` message to client.
(defn reserve-watcher [key identity old-value new-value]
(let [old-job (:incoming_job old-value)
new-job (:incoming_job new-value)]
@@ -43,7 +71,24 @@
(if (and (= :waiting old-state) (= :idle new-state))
(enqueue (:channel new-value) ["TIMED_OUT"]))))
-;; server handlers
+;; ## Command handlers.
+;; All these command handlers simply follow the procedure:
+;; 1. Extract arguments from argument array
+;; 2. Type conversion (from string to numbers)
+;; 3. Run specific command with macro `exec-cmd` defined in
+;; 4. Return data or error message to client
+;; Handles input like:
+;; <BODY>
+;; Arguments are parsed into numbers. If there are invalid characters
+;; in numeric fields, a `BAD_FORMAT` will be returned to client.
(defn on-put [ch session args]
(let [priority (as-int (first args))
@@ -56,27 +101,58 @@
(enqueue ch ["DRAINING"])))
(catch NumberFormatException e (enqueue ch ["BAD_FORMAT"]))))
+;; Handles input like:
+;; reserve
+;; Add a watch to the session. We use session id as watcher id so next
+;; time when session receives reserve command, the watcher is
+;; overwrote.
+;; the handler will return immediately whenever there is any job could
+;; be reserved.
(defn on-reserve [ch session]
(add-watch session (:id session) reserve-watcher)
(exec-cmd "reserve" session))
+;; Handles input like:
+;; use <TUBE-NAME>
(defn on-use [ch session args]
(let [tube-name (first args)]
(exec-cmd "use" session tube-name)
(enqueue ch ["USING" tube-name])))
+;; Handles input like:
+;; watch <TUBE-NAME>
(defn on-watch [ch session args]
(let [tube-name (first args)]
(exec-cmd "watch" session tube-name)
(enqueue ch ["WATCHING" (str (count (:watch @session)))])))
+;; Handles input like:
+;; ignore <TUBE-NAME>
+;; It returns `NOT_IGNORED` if the session is ignoring
+;; the only tube is watching. And this check is performed by this
+;; handler instead of logic in `clojalk.core`
(defn on-ignore [ch session args]
(if (> (count (:watch @session)) 1)
(let [tube-name (first args)]
(exec-cmd "ignore" session tube-name)
(enqueue ch ["WATCHING" (str (count (:watch @session)))]))
(enqueue ch ["NOT_IGNORED"])))
+;; Handles input like:
+;; quit
(defn on-quit [ch remote-addr]
; (close-session remote-addr)
(close ch))

0 comments on commit 7c470b1

Please sign in to comment.