Permalink
Browse files

clojure 1.3 changes except for various long/int issues

  • Loading branch information...
1 parent 18dd742 commit f597bb120ff8fafc5d5df48e92fb01cb0eea0aa3 @nathanmarz committed Oct 18, 2011
View
@@ -5,8 +5,7 @@
:javac-options {:debug "true" :fork "true"}
:resources-path "conf"
:dev-resources-path "src/dev"
- :dependencies [[org.clojure/clojure "1.2.0"]
- [org.clojure/clojure-contrib "1.2.0"]
+ :dependencies [[org.clojure/clojure "1.3.0"]
[commons-io "1.4"]
[org.apache.commons/commons-exec "1.1"]
[jvyaml "1.0.0"]
@@ -19,10 +18,12 @@
[compojure "0.6.4"]
[hiccup "0.3.6"]
[ring/ring-jetty-adapter "0.3.11"]
+ [org.clojure/tools.logging "0.2.3"]
+ [org.clojure/math.numeric-tower "0.0.1"]
]
:uberjar-exclusions [#"META-INF.*"]
:dev-dependencies [
- [swank-clojure "1.2.1"]
+ [swank-clojure "1.4.0-SNAPSHOT"]
[lein-ring "0.4.5"]
]
:jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"]
@@ -16,8 +16,7 @@
(import (quote [backtype.storm.daemon Shutdownable]))
(require (quote [backtype.storm.messaging.loader :as msg-loader]))
(require (quote [backtype.storm.messaging.protocol :as msg]))
- (use (quote [backtype.storm config util log clojure]))
- (use (quote [clojure.contrib.seq :only [find-first]]))
+ (use (quote [backtype.storm config util log]))
(require (quote [backtype.storm [thrift :as thrift] [cluster :as cluster]
[event :as event] [process-simulator :as psim]]))
(require (quote [clojure.set :as set]))
@@ -1,13 +1,13 @@
(ns backtype.storm.clojure
- (:use [clojure.contrib.def :only [defnk defalias]])
- (:use [backtype.storm bootstrap util])
+ (:use [backtype.storm util])
(:import [backtype.storm LocalCluster StormSubmitter])
(:import [backtype.storm.generated StreamInfo])
(:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.task OutputCollector IBolt])
(:import [backtype.storm.spout SpoutOutputCollector ISpout])
(:import [backtype.storm.utils Utils])
(:import [backtype.storm.clojure ClojureBolt ClojureSpout])
+ (:import [java.util List])
(:require [backtype.storm [thrift :as thrift]]))
(defn hint [sym class-sym]
@@ -2,7 +2,6 @@
(:import [org.apache.zookeeper.data Stat])
(:import [backtype.storm.utils Utils])
(:use [backtype.storm util log config])
- (:use [clojure.contrib.core :only [dissoc-in]])
(:require [backtype.storm [zookeeper :as zk]])
)
@@ -4,7 +4,7 @@
(:import [backtype.storm Config])
(:import [backtype.storm.utils Utils LocalState])
(:import [org.apache.commons.io FileUtils])
- (:require [clojure.contrib [str-utils2 :as str]])
+ (:require [clojure [string :as str]])
(:use [backtype.storm util])
)
@@ -1,5 +1,4 @@
(ns backtype.storm.daemon.common
- (:use [clojure.contrib.seq-utils :only [find-first]])
(:use [backtype.storm log config util])
)
@@ -171,6 +171,8 @@
(defn- setup-storm-static [conf storm-id storm-cluster-state]
(doseq [[task-id component-id] (mk-task-component-assignments conf storm-id)]
+ (log-message "static " task-id (class task-id))
+ (log-message "static comp" component-id (class component-id))
(.set-task! storm-cluster-state storm-id task-id (TaskInfo. component-id))
))
@@ -250,6 +252,7 @@
(let [existing-assigned (reverse-map (:task->node+port existing-assignment))
storm-conf (read-storm-conf conf storm-id)
all-task-ids (set (.task-ids storm-cluster-state storm-id))
+ _ (doseq [t all-task-ids] (log-message "compute " t (class t)))
alive-ids (set (alive-tasks conf storm-id storm-cluster-state
all-task-ids (:task->start-time-secs existing-assignment) task-heartbeats-cache))
alive-assigned (filter-val (partial every? alive-ids) existing-assigned)
@@ -310,6 +313,8 @@
start-times
)
]
+ (doseq [t (keys (:task->node+port assignment))] (log-message "Nimbus task" t (class t)))
+ (doseq [t (keys (:task->start-time-secs assignment))] (log-message "Nimbus time" t (class t)))
;; tasks figure out what tasks to talk to by looking at topology at runtime
;; only log/set when there's been a change to the assignment
(if (= existing-assignment assignment)
@@ -30,7 +30,7 @@
))]
(into {} (for [[port task-ids] port-tasks]
;; need to cast to int b/c it might be a long (due to how yaml parses things)
- [(int port) (LocalAssignment. storm-id task-ids)]
+ [(Integer. port) (LocalAssignment. storm-id task-ids)]
))
))
@@ -43,6 +43,7 @@
"Returns seq of task-ids that receive messages from this worker"
;; if this is an acker, needs to talk to the spouts
[task->component mk-topology-context task-ids]
+ (log-message "worker outbound " (class (first task-ids)))
(let [topology-context (mk-topology-context (first task-ids))
spout-components (-> topology-context
.getRawTopology
@@ -82,6 +83,7 @@
cluster-state (cluster/mk-distributed-cluster-state conf)
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
task-ids (read-worker-task-ids storm-cluster-state storm-id supervisor-id port)
+ _ (doseq [t task-ids] (log-message t " " (class t)))
;; because in local mode, its not a separate
;; process. supervisor will register it in this case
_ (when (= :distributed (cluster-mode conf))
@@ -1,11 +1,11 @@
(ns backtype.storm.log
- (:require [clojure.contrib [logging :as log]]))
+ (:require [clojure.tools [logging :as log]]))
(defmacro log-message [& args]
`(log/info (str ~@args)))
(defmacro log-error [e & args]
- `(log/error (str ~@args) ~e))
+ `(log/log :error ~e (str ~@args)))
(defmacro log-debug [& args]
`(log/debug (str ~@args)))
@@ -4,13 +4,12 @@
ClusterSummary TopologyInfo TopologySummary TaskSummary TaskStats TaskSpecificStats
SpoutStats BoltStats ErrorInfo SupervisorSummary])
(:use [backtype.storm util])
- (:use [clojure.contrib.seq-utils :only [find-first]])
- (:use [clojure.contrib.math :only [ceil]]))
+ (:use [clojure.math.numeric-tower :only [ceil]]))
;;TODO: consider replacing this with some sort of RRD
(defn curr-time-bucket [^Integer time-secs ^Integer bucket-size-secs]
- (* bucket-size-secs (unchecked-divide time-secs bucket-size-secs))
+ (* bucket-size-secs (unchecked-divide-int time-secs bucket-size-secs))
)
(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])
@@ -14,8 +14,6 @@
SpoutTracker BoltTracker TrackerAggregator])
(:require [backtype.storm [zookeeper :as zk]])
(:require [backtype.storm.messaging.loader :as msg-loader])
- (:use [clojure.contrib.def :only [defnk]])
- (:use [clojure.contrib.seq :only [find-first]])
(:use [backtype.storm cluster util thrift config log]))
(defn feeder-spout [fields]
@@ -7,7 +7,6 @@
(:import [org.apache.thrift.protocol TBinaryProtocol TProtocol])
(:import [org.apache.thrift.transport TTransport TFramedTransport TSocket])
(:use [backtype.storm util])
- (:use [clojure.contrib.def :only [defnk]])
)
(def grouping-constants
@@ -4,8 +4,6 @@
(:use [backtype.storm config util])
(:use [backtype.storm.ui helpers])
(:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID]]])
- (:use [clojure.contrib.def :only [defnk]])
- (:use [clojure.contrib.seq-utils :only [find-first]])
(:use [ring.adapter.jetty :only [run-jetty]])
(:import [backtype.storm.generated TaskSpecificStats
TaskStats TaskSummary TopologyInfo SpoutStats BoltStats
@@ -16,7 +14,7 @@
[backtype.storm [thrift :as thrift]])
(:gen-class))
-(def *STORM-CONF* (read-storm-config))
+(def ^:dynamic *STORM-CONF* (read-storm-config))
(defmacro with-nimbus [nimbus-sym & body]
`(thrift/with-nimbus-connection [~nimbus-sym "localhost" (*STORM-CONF* NIMBUS-THRIFT-PORT)]
@@ -679,4 +677,4 @@
(handler/site main-routes))
(defn -main []
- (run-jetty app {:port (int (*STORM-CONF* UI-PORT))}))
+ (run-jetty app {:port (Integer. (*STORM-CONF* UI-PORT))}))
@@ -1,16 +1,14 @@
(ns backtype.storm.ui.helpers
(:use compojure.core)
(:use [hiccup core page-helpers])
- (:use [clojure.contrib
- [str-utils2 :only [join]]
- [def :only [defnk]]])
- (:use [backtype.storm.util :only [uuid]])
+ (:use [clojure [string :only [join]]])
+ (:use [backtype.storm.util :only [uuid defnk]])
(:use [clj-time coerce format])
(:require [compojure.route :as route]
[compojure.handler :as handler]))
(defn split-divide [val divider]
- [(int (/ val divider)) (mod val divider)]
+ [(Integer. (int (/ val divider))) (mod val divider)]
)
(def PRETTY-SEC-DIVIDERS
@@ -12,21 +12,105 @@
(:import [org.apache.commons.io FileUtils])
(:import [org.apache.commons.exec ExecuteException])
(:import [org.json.simple JSONValue])
- (:require [clojure.contrib [str-utils2 :as str]])
+ (:require [clojure [string :as str]])
(:require [clojure [set :as set]])
(:use [clojure walk])
(:use [backtype.storm log])
- (:use [clojure.contrib.def :only [defnk]])
)
+(defmacro defalias
+ "Defines an alias for a var: a new var with the same root binding (if
+ any) and similar metadata. The metadata of the alias is its initial
+ metadata (as provided by def) merged into the metadata of the original."
+ ([name orig]
+ `(do
+ (alter-meta!
+ (if (.hasRoot (var ~orig))
+ (def ~name (.getRawRoot (var ~orig)))
+ (def ~name))
+ ;; When copying metadata, disregard {:macro false}.
+ ;; Workaround for http://www.assembla.com/spaces/clojure/tickets/273
+ #(conj (dissoc % :macro)
+ (apply dissoc (meta (var ~orig)) (remove #{:macro} (keys %)))))
+ (var ~name)))
+ ([name orig doc]
+ (list `defalias (with-meta name (assoc (meta name) :doc doc)) orig)))
+
+;; name-with-attributes by Konrad Hinsen:
+(defn name-with-attributes
+ "To be used in macro definitions.
+ Handles optional docstrings and attribute maps for a name to be defined
+ in a list of macro arguments. If the first macro argument is a string,
+ it is added as a docstring to name and removed from the macro argument
+ list. If afterwards the first macro argument is a map, its entries are
+ added to the name's metadata map and the map is removed from the
+ macro argument list. The return value is a vector containing the name
+ with its extended metadata map and the list of unprocessed macro
+ arguments."
+ [name macro-args]
+ (let [[docstring macro-args] (if (string? (first macro-args))
+ [(first macro-args) (next macro-args)]
+ [nil macro-args])
+ [attr macro-args] (if (map? (first macro-args))
+ [(first macro-args) (next macro-args)]
+ [{} macro-args])
+ attr (if docstring
+ (assoc attr :doc docstring)
+ attr)
+ attr (if (meta name)
+ (conj (meta name) attr)
+ attr)]
+ [(with-meta name attr) macro-args]))
+
+(defmacro defnk
+ "Define a function accepting keyword arguments. Symbols up to the first
+ keyword in the parameter list are taken as positional arguments. Then
+ an alternating sequence of keywords and defaults values is expected. The
+ values of the keyword arguments are available in the function body by
+ virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
+ defnk accepts an optional docstring as well as an optional metadata map."
+ [fn-name & fn-tail]
+ (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
+ [pos kw-vals] (split-with symbol? args)
+ syms (map #(-> % name symbol) (take-nth 2 kw-vals))
+ values (take-nth 2 (rest kw-vals))
+ sym-vals (apply hash-map (interleave syms values))
+ de-map {:keys (vec syms)
+ :or sym-vals}]
+ `(defn ~fn-name
+ [~@pos & options#]
+ (let [~de-map (apply hash-map options#)]
+ ~@body))))
+
+(defn find-first
+ "Returns the first item of coll for which (pred item) returns logical true.
+ Consumes sequences up to the first match, will consume the entire sequence
+ and return nil if no match is found."
+ [pred coll]
+ (first (filter pred coll)))
+
+(defn dissoc-in
+ "Dissociates an entry from a nested associative structure returning a new
+ nested structure. keys is a sequence of keys. Any empty maps that result
+ will not be present in the new structure."
+ [m [k & ks :as keys]]
+ (if ks
+ (if-let [nextmap (get m k)]
+ (let [newmap (dissoc-in nextmap ks)]
+ (if (seq newmap)
+ (assoc m k newmap)
+ (dissoc m k)))
+ m)
+ (dissoc m k)))
+
(defn local-hostname []
(.getCanonicalHostName (InetAddress/getLocalHost)))
(defn uuid []
(str (UUID/randomUUID)))
(defn current-time-secs []
- (int (unchecked-divide (Time/currentTimeMillis) (long 1000))))
+ (Time/currentTimeSecs))
(defn clojurify-structure [s]
(prewalk (fn [x]
@@ -125,7 +209,7 @@
(defn mk-counter []
(let [val (atom 0)]
(fn []
- (swap! val inc))))
+ (Integer. (swap! val inc)))))
(defmacro for-times [times & body]
`(for [i# (range ~times)]
@@ -324,10 +408,10 @@
(- (System/currentTimeMillis) time-ms))
(defn parse-int [str]
- (Integer/parseInt str))
+ (Integer/valueOf str))
(defn integer-divided [sum num-pieces]
- (let [base (int (/ sum num-pieces))
+ (let [base (Integer. (int (/ sum num-pieces)))
num-inc (mod sum num-pieces)
num-bases (- num-pieces num-inc)]
(if (= num-inc 0)
@@ -1,5 +1,4 @@
(ns zilch.virtual-port
- (:use [clojure.contrib.def :only [defnk]])
(:use [backtype.storm util log])
(:require [zilch [mq :as mq]])
(:import [java.nio ByteBuffer])
@@ -271,6 +271,8 @@ private static final Keyword makeKeyword(String name) {
@Override
public Object valAt(Object o) {
+ // should change this to get by field name, and push metadata stuff like this
+ // into metadata
if(o.equals(STREAM_KEYWORD)) {
return getSourceStreamId();
} else if(o.equals(COMPONENT_KEYWORD)) {
@@ -62,6 +62,10 @@ public static long currentTimeMillis() {
}
}
+ public static int currentTimeSecs() {
+ return (int) (currentTimeMillis() / 1000);
+ }
+
public static void advanceTime(long ms) {
if(!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
simulatedCurrTimeMs.set(simulatedCurrTimeMs.get() + ms);
@@ -1,7 +1,7 @@
(ns backtype.storm.integration-test
(:use [clojure test])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
- (:use [backtype.storm bootstrap testing])
+ (:use [backtype.storm bootstrap clojure testing])
(:use [backtype.storm.daemon common])
)
@@ -1,6 +1,5 @@
(ns backtype.storm.nimbus-test
(:use [clojure test])
- (:use [clojure.contrib.def :only [defnk]])
(:require [backtype.storm.daemon [nimbus :as nimbus]])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
(:use [backtype.storm bootstrap testing])
@@ -6,8 +6,8 @@
(defn uuid [] (str (UUID/randomUUID)))
(defn random-msg []
- (byte-array (map byte (for [i (range (int (rand 100)))]
- (int (rand 100))
+ (byte-array (map byte (for [i (range (Integer. (int (rand 100))))]
+ (Integer. (int (rand 100)))
))))
(def url

0 comments on commit f597bb1

Please sign in to comment.