Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

make it easier to move to clojure 1.3 #1

Closed
wants to merge 17 commits into from

2 participants

@pyr

by removing dependencies on clojure.contrib material

Pierre-Yves ... added some commits
@pyr

The round two commit is a bit more debateable, since it doesn't necessarily help as far as readability is concerned, but defnk didn't make it in clojure.core.incubator, so it will be left behind in clojure 1.3 it seems

@pyr

With this last batch of commits, clojure contrib should be eliminated from storm's dependencies. I only have the simple topologies to play with right now. If more complex use cases don't generate any odd behavior, contrib can be removed from dependencies and people will be able to start playing with 1.3

@nathanmarz
Owner

Making Storm Clojure 1.3 compatible is definitely a good thing to do. A few notes on these commits:

  1. Like Sam said, some and find-first are not equivalent. We can just copy the find-first implementation in backtype.storm.utils to get rid of the dependency.
  2. Likewise, instead of dirtying up the code, how about we just copy defnk into backtype.storm.utils?

Copying those functions isn't great, but unless there's a Clojure 1.3 compatible dependency we can pull in, I think it's the best we can do.

Pierre-Yves ... added some commits
Pierre-Yves Ritschard update wrong usage of some 7b47947
Pierre-Yves Ritschard typos 9ddcee7
@pyr

Instead of copying find-first i used (first (filter)) which uses the same result. I usually use some with fn's returning the element which yields the same result, sorry.

I fixed the defnk uses, I just feel there's not that much to warrant copying the function over, but if you feel it's too ugly i can copy it.

@nathanmarz
Owner

Yes, I'd like to keep the usage of defnk. Let's just copy that into backtype.storm.util. To keep the history clean, I think it would be best to make a new pull request with one commit for the defnk change, one commit the the find-first change, and then other commits for the other minor changes you made. I'd do it myself, but I'd like you to get credit for the commits!

@pyr
pyr added some commits
@pyr pyr bring back defnk f44cc73
@pyr pyr Merge branch 'master' of https://github.com/nathanmarz/storm
Conflicts:
	src/clj/backtype/storm/testing.clj
	src/clj/backtype/storm/thrift.clj
	src/clj/zilch/virtual_port.clj
0254c45
@pyr pyr make it easier to move to clojure 1.3 a2ca801
@pyr

Done, I'll just need to resort the history now

pyr added some commits
@pyr pyr re-remove contrib references 78f89fb
@pyr pyr find-first and contrib fixes abc260c
@pyr pyr Merge branch 'master' of github.com:pyr/storm into HEAD
Conflicts:
	src/clj/backtype/storm/daemon/common.clj
b672f5d
@pyr pyr make it easier to move to clojure 1.3
re-remove contrib references

find-first and contrib fixes

make it easier to move to clojure 1.3

Round two of contrib removal, replace defnk

round three find-first can also be called some

round 4: join is available in clojure.string

remove ceil dependency as code is commented out

remove dissoc-in since it is not used anymore

update wrong usage of some

typos

bring back defnk
ac95789
@pyr pyr Merge branch 'master' of github.com:pyr/storm
Conflicts:
	src/clj/backtype/storm/util.clj
df82aad
@pyr pyr a conflict slip through 7984112
@pyr pyr closed this
@mattwilliamsnyc mattwilliamsnyc referenced this pull request from a commit
@ptgoetz ptgoetz updated readme per issue #1 c5ba15f
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Sep 19, 2011
  1. make it easier to move to clojure 1.3

    Pierre-Yves Ritschard authored
  2. Round two of contrib removal, replace defnk

    Pierre-Yves Ritschard authored
  3. round three find-first can also be called some

    Pierre-Yves Ritschard authored
  4. round 4: join is available in clojure.string

    Pierre-Yves Ritschard authored
  5. remove ceil dependency as code is commented out

    Pierre-Yves Ritschard authored
  6. remove dissoc-in since it is not used anymore

    Pierre-Yves Ritschard authored
Commits on Sep 20, 2011
  1. update wrong usage of some

    Pierre-Yves Ritschard authored
  2. typos

    Pierre-Yves Ritschard authored
Commits on Sep 30, 2011
  1. @pyr

    bring back defnk

    pyr authored
  2. @pyr

    Merge branch 'master' of https://github.com/nathanmarz/storm

    pyr authored
    Conflicts:
    	src/clj/backtype/storm/testing.clj
    	src/clj/backtype/storm/thrift.clj
    	src/clj/zilch/virtual_port.clj
  3. @pyr
  4. @pyr

    re-remove contrib references

    pyr authored
  5. @pyr

    find-first and contrib fixes

    pyr authored
  6. @pyr

    Merge branch 'master' of github.com:pyr/storm into HEAD

    pyr authored
    Conflicts:
    	src/clj/backtype/storm/daemon/common.clj
  7. @pyr

    make it easier to move to clojure 1.3

    pyr authored
    re-remove contrib references
    
    find-first and contrib fixes
    
    make it easier to move to clojure 1.3
    
    Round two of contrib removal, replace defnk
    
    round three find-first can also be called some
    
    round 4: join is available in clojure.string
    
    remove ceil dependency as code is commented out
    
    remove dissoc-in since it is not used anymore
    
    update wrong usage of some
    
    typos
    
    bring back defnk
  8. @pyr

    Merge branch 'master' of github.com:pyr/storm

    pyr authored
    Conflicts:
    	src/clj/backtype/storm/util.clj
  9. @pyr

    a conflict slip through

    pyr authored
This page is out of date. Refresh to see the latest.
View
1  project.clj
@@ -7,6 +7,7 @@
:dev-resources-path "src/dev"
:dependencies [[org.clojure/clojure "1.2.0"]
[org.clojure/clojure-contrib "1.2.0"]
+ [org.clojure/tools.logging "0.1.2"]
[commons-io "1.4"]
[org.apache.commons/commons-exec "1.1"]
[jvyaml "1.0.0"]
View
1  src/clj/backtype/storm/bootstrap.clj
@@ -17,7 +17,6 @@
(require (quote [backtype.storm.messaging.loader :as msg-loader]))
(require (quote [backtype.storm.messaging.protocol :as msg]))
(use (quote [backtype.storm config util log clojure]))
- (use (quote [clojure.contrib.seq :only [find-first]]))
(require (quote [backtype.storm [thrift :as thrift] [cluster :as cluster]
[event :as event] [process-simulator :as psim]]))
(require (quote [clojure.set :as set]))
View
2  src/clj/backtype/storm/clojure.clj
@@ -1,5 +1,5 @@
(ns backtype.storm.clojure
- (:use [clojure.contrib.def :only [defnk defalias]])
+ (:use [clojure.contrib.def :only [defalias]])
(:use [backtype.storm bootstrap util])
(:import [backtype.storm LocalCluster StormSubmitter])
(:import [backtype.storm.generated StreamInfo])
View
1  src/clj/backtype/storm/cluster.clj
@@ -2,7 +2,6 @@
(:import [org.apache.zookeeper.data Stat])
(:import [backtype.storm.utils Utils])
(:use [backtype.storm util log config])
- (:use [clojure.contrib.core :only [dissoc-in]])
(:require [backtype.storm [zookeeper :as zk]])
)
View
2  src/clj/backtype/storm/config.clj
@@ -4,7 +4,7 @@
(:import [backtype.storm Config])
(:import [backtype.storm.utils Utils LocalState])
(:import [org.apache.commons.io FileUtils])
- (:require [clojure.contrib [str-utils2 :as str]])
+ (:require [clojure.string :as str])
(:use [backtype.storm util])
)
View
5 src/clj/backtype/storm/daemon/common.clj
@@ -1,5 +1,4 @@
(ns backtype.storm.daemon.common
- (:use [clojure.contrib.seq-utils :only [find-first]])
(:use [backtype.storm log config util])
)
@@ -67,9 +66,9 @@
(defn get-storm-id [storm-cluster-state storm-name]
(let [active-storms (.active-storms storm-cluster-state)]
- (find-first
+ (first (filter
#(= storm-name (:storm-name (.storm-base storm-cluster-state % nil)))
- active-storms)
+ active-storms))
))
(defn topology-bases [storm-cluster-state]
View
2  src/clj/backtype/storm/log.clj
@@ -1,5 +1,5 @@
(ns backtype.storm.log
- (:require [clojure.contrib [logging :as log]]))
+ (:require [clojure.tools [logging :as log]]))
(defmacro log-message [& args]
`(log/info (str ~@args)))
View
6 src/clj/backtype/storm/stats.clj
@@ -3,9 +3,7 @@
NotAliveException AlreadyAliveException InvalidTopologyException GlobalStreamId
ClusterSummary TopologyInfo TopologySummary TaskSummary TaskStats TaskSpecificStats
SpoutStats BoltStats ErrorInfo SupervisorSummary])
- (:use [backtype.storm util])
- (:use [clojure.contrib.seq-utils :only [find-first]])
- (:use [clojure.contrib.math :only [ceil]]))
+ (:use [backtype.storm util]))
;;TODO: consider replacing this with some sort of RRD
@@ -121,7 +119,7 @@
(apply rolling-window-set update-keyed-avg merge-keyed-avg extract-keyed-avg num-buckets bucket-sizes))
;; (defn choose-bucket [val buckets]
-;; (let [ret (find-first #(<= val %) buckets)]
+;; (let [ret (first (filter #(<= val %) buckets))]
;; (if ret
;; ret
;; (* 10 (first buckets)))
View
20 src/clj/backtype/storm/testing.clj
@@ -14,8 +14,6 @@
SpoutTracker BoltTracker TrackerAggregator])
(:require [backtype.storm [zookeeper :as zk]])
(:require [backtype.storm.messaging.loader :as msg-loader])
- (:use [clojure.contrib.def :only [defnk]])
- (:use [clojure.contrib.seq :only [find-first]])
(:use [backtype.storm cluster util thrift config log]))
(defn feeder-spout [fields]
@@ -62,7 +60,7 @@
(advance-time-ms! (* (long secs) 1000)))
-(defnk add-supervisor [cluster-map :ports 2 :conf {} :id nil]
+(defn add-supervisor [cluster-map & {:keys [ports conf id] :or {ports 2 conf {}}}]
(let [tmp-dir (local-temp-path)
port-ids (if (sequential? ports) ports (doall (repeatedly ports (:port-counter cluster-map))))
supervisor-conf (merge (:daemon-conf cluster-map)
@@ -87,7 +85,10 @@
;; local dir is always overridden in maps
;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {}]
+(defn mk-local-storm-cluster [& {:keys [supervisors ports-per-supervisor daemon-conf]
+ :or {supervisors 2
+ ports-per-supervisor 3
+ daemon-conf {}}}]
(let [zk-port 2181
daemon-conf (merge (read-storm-config)
{TOPOLOGY-SKIP-MISSING-SERIALIZATIONS true
@@ -122,14 +123,13 @@
(defn get-supervisor [cluster-map supervisor-id]
(let [finder-fn #(= (.get-id %) supervisor-id)]
- (find-first finder-fn @(:supervisors cluster-map))
+ (first (filter finder-fn @(:supervisors cluster-map)))
))
(defn kill-supervisor [cluster-map supervisor-id]
(let [finder-fn #(= (.get-id %) supervisor-id)
supervisors @(:supervisors cluster-map)
- sup (find-first finder-fn
- supervisors)]
+ sup (first (filter finder-fn supervisors))]
;; tmp-dir will be taken care of by shutdown
(reset! (:supervisors cluster-map) (remove-first finder-fn supervisors))
(.shutdown sup)
@@ -258,7 +258,7 @@
(defmacro capture-shutdown-workers [& body]
`(:shutdown (capture-changed-workers ~@body)))
-(defnk aggregated-stat [cluster-map storm-name stat-key :component-ids nil]
+(defn aggregated-stat [cluster-map storm-name stat-key &{:keys [component-ids]}]
(let [state (:storm-cluster-state cluster-map)
storm-id (common/get-storm-id state storm-name)
component->tasks (reverse-map
@@ -294,7 +294,9 @@
;; TODO: mock-sources needs to be able to mock out state spouts as well
-(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {}]
+(defn complete-topology [cluster-map topology & {:keys [mock-sources storm-conf]
+ :or {mock-sources {}
+ storm-conf {}}}]
(let [storm-name (str "topologytest-" (uuid))
state (:storm-cluster-state cluster-map)
spouts (.get_spouts topology)
View
26 src/clj/backtype/storm/thrift.clj
@@ -6,9 +6,7 @@
(:import [backtype.storm.topology OutputFieldsGetter IBasicBolt BasicBoltExecutor])
(:import [org.apache.thrift.protocol TBinaryProtocol TProtocol])
(:import [org.apache.thrift.transport TTransport TFramedTransport TSocket])
- (:use [backtype.storm util])
- (:use [clojure.contrib.def :only [defnk]])
- )
+ (:use [backtype.storm util]))
(def grouping-constants
{Grouping$_Fields/FIELDS :fields
@@ -136,24 +134,20 @@
(mk-grouping grouping-spec)]
)))
-(defnk mk-bolt-spec [inputs bolt :parallelism-hint nil :p nil]
- ;; for backwards compatibility
- (let [parallelism-hint (if p p parallelism-hint)
- bolt (if (instance? IBasicBolt bolt) (BasicBoltExecutor. bolt) bolt)]
+(defn mk-bolt-spec [inputs bolt & {:keys [parallelism-hint]}]
+ (let [bolt (if (instance? IBasicBolt bolt) (BasicBoltExecutor. bolt) bolt)]
(Bolt.
(mk-inputs inputs)
(ComponentObject/serialized_java (Utils/serialize bolt))
(mk-component-common bolt parallelism-hint)
)))
-(defnk mk-shell-bolt-spec [inputs command script output-spec :parallelism-hint nil :p nil]
- ;; for backwards compatibility
- (let [parallelism-hint (if p p parallelism-hint)]
- (Bolt.
- (mk-inputs inputs)
- (ComponentObject/shell (ShellComponent. command script))
- (mk-plain-component-common output-spec parallelism-hint)
- )))
+(defn mk-shell-bolt-spec [inputs command script output-spec & {:keys [parallelism-hint]}]
+ (Bolt.
+ (mk-inputs inputs)
+ (ComponentObject/shell (ShellComponent. command script))
+ (mk-plain-component-common output-spec parallelism-hint)
+ ))
(defn mk-topology
([spout-map bolt-map]
@@ -161,7 +155,7 @@
([spout-map bolt-map state-spout-map]
(StormTopology. spout-map bolt-map state-spout-map)))
-(defnk coordinated-bolt [bolt :type nil :all-out false]
+(defn coordinated-bolt [bolt & {:keys [type all-out] :or {all-out false}}]
(let [source (condp = type
nil nil
:all (CoordinatedBolt$SourceArgs/all)
View
6 src/clj/backtype/storm/ui/core.clj
@@ -4,8 +4,6 @@
(:use [backtype.storm config util])
(:use [backtype.storm.ui helpers])
(:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID]]])
- (:use [clojure.contrib.def :only [defnk]])
- (:use [clojure.contrib.seq-utils :only [find-first]])
(:use [ring.adapter.jetty :only [run-jetty]])
(:import [backtype.storm.generated TaskSpecificStats
TaskStats TaskSummary TopologyInfo SpoutStats BoltStats
@@ -425,7 +423,7 @@
(sort-by #(.get_task_id ^TaskSummary %) ret)
))
-(defnk task-link [topology-id id :suffix ""]
+(defn task-link [topology-id id & {:keys [suffix] :or {suffix ""}}]
(link-to (format "/topology/%s/task/%s%s" topology-id id suffix)
id))
@@ -648,7 +646,7 @@
topology (.getTopology ^Nimbus$Client nimbus topology-id)
task (->> summ
.get_tasks
- (find-first #(= (.get_task_id ^TaskSummary %) task-id)))]
+ (first (filter #(= (.get_task_id ^TaskSummary %) task-id))))]
(concat
[[:h2 "Task summary"]]
[(task-summary-table task summ)]
View
6 src/clj/backtype/storm/ui/helpers.clj
@@ -1,9 +1,7 @@
(ns backtype.storm.ui.helpers
(:use compojure.core)
(:use [hiccup core page-helpers])
- (:use [clojure.contrib
- [str-utils2 :only [join]]
- [def :only [defnk]]])
+ (:use [clojure.string :only [join]])
(:use [backtype.storm.util :only [uuid]])
(:use [clj-time coerce format])
(:require [compojure.route :as route]
@@ -70,7 +68,7 @@
)]
])
-(defnk sort-table [id :sort-list "[[0,0]]" :time-cols []]
+(defn sort-table [id & {:keys [sort-list time-cols] :or {sort-list "[[0,0]]" time-cols []}}]
(let [strs (for [c time-cols] (format "%s: { sorter: 'stormtimestr'}" c))
sorters (join ", " strs)]
[:script
View
39 src/clj/backtype/storm/util.clj
@@ -12,12 +12,10 @@
(:import [org.apache.commons.io FileUtils])
(:import [org.apache.commons.exec ExecuteException])
(:import [org.json.simple JSONValue])
- (:require [clojure.contrib [str-utils2 :as str]])
+ (:require [clojure.string :as str])
(:require [clojure [set :as set]])
(:use [clojure walk])
- (:use [backtype.storm log])
- (:use [clojure.contrib.def :only [defnk]])
- )
+ (:use [backtype.storm log]))
(defn local-hostname []
(.getCanonicalHostName (InetAddress/getLocalHost)))
@@ -199,12 +197,12 @@
(sleeping? [this]))
;; afn returns amount of time to sleep
-(defnk async-loop [afn
- :daemon false
- :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
- :priority Thread/NORM_PRIORITY
- :args-fn (fn [] [])
- :start true]
+(defn async-loop [afn & {:keys [daemon kill-fn priority args-fn start]
+ :or {daemon false
+ kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
+ priority Thread/NORM_PRIORITY
+ args-fn (fn [] [])
+ start true}}]
(let [thread (Thread.
(fn []
(try
@@ -501,3 +499,24 @@
(defn bit-xor-vals [vals]
(reduce bit-xor 0 vals))
+
+(defmacro defnk
+ "NOTE: copied over from clojure-contrib to allow reuse in 1.3
+ Define a function accepting keyword arguments. Symbols up to the first
+ keyword in the parameter list are taken as positional arguments. Then
+ an alternating sequence of keywords and defaults values is expected. The
+ values of the keyword arguments are available in the function body by
+ virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
+ defnk accepts an optional docstring as well as an optional metadata map."
+ [fn-name & fn-tail]
+ (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
+ [pos kw-vals] (split-with symbol? args)
+ syms (map #(-> % name symbol) (take-nth 2 kw-vals))
+ values (take-nth 2 (rest kw-vals))
+ sym-vals (apply hash-map (interleave syms values))
+ de-map {:keys (vec syms)
+ :or sym-vals}]
+ `(defn ~fn-name
+ [~@pos & options#]
+ (let [~de-map (apply hash-map options#)]
+ ~@body))))
View
11 src/clj/zilch/virtual_port.clj
@@ -1,5 +1,4 @@
(ns zilch.virtual-port
- (:use [clojure.contrib.def :only [defnk]])
(:use [backtype.storm util log])
(:require [zilch [mq :as mq]])
(:import [java.nio ByteBuffer])
@@ -46,11 +45,11 @@
([^ZMQ$Socket socket virtual-port ^bytes message]
(virtual-send socket virtual-port message ZMQ/NOBLOCK)))
-(defnk launch-virtual-port!
- [context url :daemon true
- :kill-fn (fn [t] (System/exit 1))
- :priority Thread/NORM_PRIORITY
- :valid-ports nil]
+(defn launch-virtual-port!
+ [context url & {:keys [daemon kill-fn priority valid-ports]
+ :or {daemon true
+ kill-fn (fn [] (System/exit 1))
+ priority Thread/NORM_PRIORITY}}]
(let [valid-ports (set (map short valid-ports))
vthread (async-loop
(fn [^ZMQ$Socket socket virtual-mapping]
Something went wrong with that request. Please try again.