Skip to content

Commit

Permalink
I checked in wrong version of knucleotide.clojure-7.clojure last time.
Browse files Browse the repository at this point in the history
  • Loading branch information
jafingerhut committed Sep 4, 2012
1 parent c61f146 commit a680049
Showing 1 changed file with 167 additions and 50 deletions.
217 changes: 167 additions & 50 deletions knucleotide/knucleotide.clojure-7.clojure
Expand Up @@ -6,27 +6,118 @@
;; modified by Mike Anderson to make better use of primitive operations

(ns knucleotide
(:import java.util.concurrent.ExecutorService
java.util.concurrent.Executors)
(:gen-class))

;; Ask the compiler to print a warning wherever it cannot resolve a Java
;; interop call at compile time and has to fall back to reflection.
(set! *warn-on-reflection* true)


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; This is a copy of part of Amit Rathore's Medusa package, which
;; allows you to submit a bunch of Clojure expressions to run to a
;; thread pool with a fixed size. No more than that many threads will
;; ever run at once, but Medusa tries to keep that many threads going
;; at all times, as long as there are things to do that have been
;; submitted. This is unlike Clojure's built-in pmap, which often
;; runs fewer threads in parallel if the run times of the jobs differ
;; significantly from each other.
;;
;; git clone http://github.com/amitrathore/clj-utils.git
;; git clone http://github.com/amitrathore/medusa.git
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; Var that holds the thread pool.  It is declared here without a root
;; value; init-medusa re-defs it with a fixed-size pool.
(def THREADPOOL)

;; Ref holding a map of future-id -> info map for each job that is
;; currently executing; see claim-thread and mark-completion.
(def running-futures (ref {}))

(defn create-runonce
  "Returns a function that calls `function` at most once.

  The first call applies `function` to the supplied arguments, caches the
  value and returns it.  Every later call returns the cached value and
  ignores its own arguments.  If `function` throws, nothing is cached and
  the next call tries again.  Calls are serialized on a private lock."
  [function]
  (let [sentinel (Object.)
        result   (atom sentinel)]
    (fn [& args]
      (locking sentinel
        ;; Compare by identity, not with `=`: `=` delegates to the cached
        ;; value's own .equals, which may report equality with the sentinel
        ;; (re-running `function`) or throw when handed a foreign type.
        (when (identical? @result sentinel)
          (reset! result (apply function args)))
        @result))))

(defmacro defrunonce
  "Defines `fn-name` as a run-once function: the body is wrapped in an
  anonymous fn taking `args`, which is then passed to create-runonce."
  [fn-name args & body]
  (let [impl `(fn ~args ~@body)]
    `(def ~fn-name (create-runonce ~impl))))

;; Creates a fixed-size thread pool of pool-size threads and stores it in
;; the THREADPOOL var.  Because this is defined with defrunonce, only the
;; first successful call builds a pool; later calls return the first
;; call's result and their pool-size argument is not used.
(defrunonce init-medusa [pool-size]
(def THREADPOOL (Executors/newFixedThreadPool pool-size)))

(defn claim-thread
  "Records in running-futures that the calling thread has started working
  on the job identified by `future-id`, together with the start time in
  milliseconds."
  [future-id]
  (let [worker (Thread/currentThread)
        began  (System/currentTimeMillis)]
    (dosync
      (alter running-futures
             assoc future-id {:thread    worker
                              :future-id future-id
                              :started   began}))))

(defn mark-completion
  "Removes the entry for `future-id` from running-futures."
  [future-id]
  (dosync
    (alter running-futures #(dissoc % future-id))))

(defn medusa-future-thunk
  "Submits `thunk` (a no-argument function) to THREADPOOL and returns the
  java.util.concurrent.Future for it.

  While the thunk runs, it is registered in running-futures under
  `future-id`.  The registration is removed when the thunk finishes,
  whether it returns normally or throws."
  [future-id thunk]
  (let [^Callable work (fn []
                         (claim-thread future-id)
                         ;; finally guarantees the bookkeeping entry is
                         ;; removed even when the thunk throws; otherwise a
                         ;; failed job would stay in running-futures forever.
                         (try
                           (thunk)
                           (finally
                             (mark-completion future-id))))]
    (.submit ^ExecutorService THREADPOOL work)))

(defn random-uuid
  "Returns a freshly generated random UUID as a string."
  []
  (let [id (java.util.UUID/randomUUID)]
    (.toString id)))

(defmacro medusa-future
  "Wraps `body` in a no-argument function and submits it through
  medusa-future-thunk under a freshly generated id."
  [& body]
  (let [thunk `(fn [] (do ~@body))]
    `(medusa-future-thunk (random-uuid) ~thunk)))

(defn medusa-pmap
  "Maps `f` over `coll`.

  With `num-threads` equal to 1 this is a plain lazy `map` and no thread
  pool is touched.  Otherwise init-medusa is called with `num-threads`,
  one job per element is submitted eagerly, and a lazy seq is returned
  whose elements block on the corresponding job's result."
  [num-threads f coll]
  (if-not (== num-threads 1)
    (do
      (init-medusa num-threads)
      (let [submit!  (fn [item] (medusa-future (f item)))
            ;; doall forces every job to be submitted before any result
            ;; is waited on.
            pending  (doall (map submit! coll))
            wait-for (fn [^java.util.concurrent.Future fut] (.get fut))]
        (map wait-for pending)))
    (map f coll)))

(defn shutdown-medusa
  "Shuts down the thread pool held in THREADPOOL, if one was created.

  Returns nil.  When no pool exists this is a no-op: medusa-pmap never
  calls init-medusa when asked for a single thread, so THREADPOOL may
  still be the unbound placeholder, and casting that placeholder to
  ExecutorService would throw ClassCastException."
  []
  (let [pool THREADPOOL]
    (when (instance? ExecutorService pool)
      (.shutdown ^ExecutorService pool))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; This is the end of the subset of Medusa code.
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;


(defmacro key-type
  "Expands to a `long` coercion of `num`."
  [num]
  (list `long num))

(definterface ITallyCounter
(^int get_count [])
(definterface IFragment
(set_key_BANG_ [^long k])
(^long get_key [])
(inc_BANG_ [])
(add_BANG_ [^int n]))


(deftype TallyCounter [^{:unsynchronized-mutable true :tag int} cnt]
ITallyCounter
(get-count [this] cnt)
(add_BANG_ [^int n])
(^int get_count []))


(deftype Fragment [^{:unsynchronized-mutable true :tag long} key
^{:unsynchronized-mutable true :tag int} cnt]
Object
;; TBD: Is there a way to return an int hashCode that is a truncated
;; version of the long value key without using bit-and? Simply
;; using (int key) throws an exception if key is larger than
;; Integer/MAX_VALUE, e.g. (int Long/MAX_VALUE).
(^int hashCode [this]
(int (bit-and key Integer/MAX_VALUE)))
(^boolean equals [this ^Object o]
(let [^Fragment f o]
(== key (.key f))))

IFragment
(set-key! [this ^long k]
(set! key k))
(get-key [this] key)
(inc! [this]
(set! cnt (unchecked-inc-int cnt)))
(add! [this ^int n]
(set! cnt (unchecked-add-int cnt n))))
(set! cnt (unchecked-add-int cnt n)))
(get-count [this] cnt))



Expand Down Expand Up @@ -109,10 +200,11 @@
(recur new-key (dec offset)))))))


(defn key-to-dna-str [k len]
(apply str (map code-val-to-dna-char
(map (fn [pos] (bit-and 3 (bit-shift-right k pos)))
(range 0 (* 2 len) 2)))))
(defn key-to-dna-str
  "Builds a string of `len` characters from the key stored in fragment
  `f`: each successive 2-bit group of the key, starting from the lowest
  bits, is passed through code-val-to-dna-char."
  [^Fragment f len]
  (let [packed  (.get-key f)
        code-at (fn [shift] (bit-and 3 (bit-shift-right packed shift)))]
    (->> (range 0 (* 2 len) 2)
         (map code-at)
         (map code-val-to-dna-char)
         (apply str))))

;; required function : "to update a hashtable of k-nucleotide keys and
;; count values, for a particular reading-frame"
Expand All @@ -126,24 +218,29 @@
(loop [offset (int end-offset)
key (key-type (dna-str-to-key dna-str offset len))
tally (let [h (java.util.HashMap.)
one (TallyCounter. (int 1))]
(.put h key one)
h)]
one (Fragment. (long key) (int 1))]
(.put h one one)
h)
fragment (Fragment. (long 0) (int 1))]
(if (<= offset start-offset)
tally
(let [new-offset (unchecked-dec-int offset)
(let [new-offset (unchecked-dec offset)
new-first-char-code (dna-char-to-code-val
(.charAt dna-str new-offset))
new-key (key-type (bit-and mask (unchecked-add (bit-shift-left key 2)
new-first-char-code)))]
(if-let [^TallyCounter cur-count (get tally new-key)]
(.inc! cur-count)
(let [one (TallyCounter. (int 1))]
(.put tally new-key one)))
(recur new-offset new-key tally))))))


(defn ^:static getcnt ^long [^TallyCounter tc]
(.set-key! fragment new-key)
(if-let [^Fragment cur-fragment (get tally fragment)]
(do
(.inc! cur-fragment)
(recur new-offset new-key tally fragment))
(do
(let [new-fragment (Fragment. (long 0) (int 1))]
(.put tally fragment fragment)
(recur new-offset new-key tally new-fragment)))))))))


;; Returns the count stored in fragment tc, as a long.
(defn ^:static getcnt ^long [^Fragment tc]
  (. tc get-count))

(defn ^:static tally-total [tally]
Expand Down Expand Up @@ -173,9 +270,9 @@


(defn one-tally-to-str [dna-str tallies]
(let [zerotc (TallyCounter. 0)
k (dna-str-to-key dna-str)]
(format "%d\t%s" (reduce + (map #(getcnt (get % k zerotc))
(let [zerotc (Fragment. 0 0)
^Fragment f (Fragment. (long (dna-str-to-key dna-str)) 0)]
(format "%d\t%s" (reduce + (map #(getcnt (get % f zerotc))
tallies))
dna-str)))

Expand All @@ -188,9 +285,8 @@
(repeat min-units-per-piece)))))


(defn break-work-into-pieces [{:keys [kind] :as m} dna-str]
(defn break-work-into-pieces [{:keys [kind n-pieces] :as m} dna-str]
(let [substr-len (if (= kind :tally-all) (:substr-len m) (count (:substr m)))
n-pieces (if (= kind :tally-all) 1 (quot (inc substr-len) 2))
n-substrs (inc (- (count dna-str) substr-len))
sizes (piece-sizes n-substrs n-pieces)
start-end-offsets (map (fn [[a b]] [a (dec b)])
Expand All @@ -204,20 +300,28 @@


(defn do-one-piece [{:keys [substr-len dna-str start-offset end-offset] :as m}]
(assoc m :tally-table (tally-dna-subs-with-len substr-len dna-str
start-offset end-offset)))
(binding [*out* *err*]
(println (format "begin start-one-piece %2d %6d %6d\n"
substr-len start-offset end-offset)))
(let [v
(assoc m :tally-table (tally-dna-subs-with-len substr-len dna-str
start-offset end-offset))]
(binding [*out* *err*]
(println (format "end start-one-piece %2d %6d %6d\n"
substr-len start-offset end-offset)))
v))

;; Like merge-with, except it only works for the HashMaps with
;; TallyCounter's as values. It mutates the first HashMap given in
;; place, and potentially some of the TallyCounters in all of the
;; Fragments as key/value pairs. It mutates the first HashMap given
;; in place, and potentially some of the Fragments in all of the
;; hashmaps.
(defn add-tally-hashmaps! [hmaps]
(let [merge-entry (fn [^java.util.HashMap hm e]
(let [k (key e) v (val e)]
(if (contains? hm k)
(let [^TallyCounter cur-count (get hm k)
(let [^Fragment cur-fragment (get hm k)
n (int (getcnt v))]
(.add! cur-count n))
(.add! cur-fragment n))
(.put hm k v)))
hm)
merge2 (fn [hm1 hm2]
Expand Down Expand Up @@ -250,19 +354,31 @@


(defn run [br]
(let [dna-str (fasta-dna-str-with-desc-beginning "THREE" (line-seq br))
work-pieces-todo (mapcat
#(break-work-into-pieces % dna-str)
[
{:kind :tally-one :substr "GGTATTTTAATTTATAGT"}
{:kind :tally-all :substr-len 1}
{:kind :tally-one :substr "GGTATTTTAATT"}
{:kind :tally-all :substr-len 2}
{:kind :tally-one :substr "GGTATT"}
{:kind :tally-one :substr "GGTA"}
{:kind :tally-one :substr "GGT"}
])
work-pieces-done (pmap do-one-piece work-pieces-todo)
(let [n-threads (.. Runtime getRuntime availableProcessors)
dna-str (fasta-dna-str-with-desc-beginning "THREE" (line-seq br))
work-pieces-todo
(mapcat #(break-work-into-pieces % dna-str)
[{:kind :tally-all :n-pieces 1 :substr-len 1}
{:kind :tally-all :n-pieces 1 :substr-len 2}
;; 26.3 elapsed, 134.9 user
;; 26.1 elapsed, 135.0 user
;; 26.2 elapsed, 135.2 user
{:kind :tally-one :n-pieces 3 :substr "GGT"}
{:kind :tally-one :n-pieces 4 :substr "GGTA"}
{:kind :tally-one :n-pieces 6 :substr "GGTATT"}
{:kind :tally-one :n-pieces 12 :substr "GGTATTTTAATT"}
{:kind :tally-one :n-pieces 18 :substr "GGTATTTTAATTTATAGT"}

;; 30.7 elapsed, 156.4 user
;; 28.5 elapsed, 142.8 user
;; 27.8 elapsed, 133.0 user
;; {:kind :tally-one :n-pieces 2 :substr "GGT"}
;; {:kind :tally-one :n-pieces 2 :substr "GGTA"}
;; {:kind :tally-one :n-pieces 3 :substr "GGTATT"}
;; {:kind :tally-one :n-pieces 3 :substr "GGTATTTTAATT"}
;; {:kind :tally-one :n-pieces 4 :substr "GGTATTTTAATTTATAGT"}
])
work-pieces-done (medusa-pmap n-threads do-one-piece work-pieces-todo)
grouped-results (partition-by :substr-len work-pieces-done)
combined-results (pmap combine-pieces grouped-results)
results (sort-by :substr-len combined-results)]
Expand All @@ -273,4 +389,5 @@
(defn -main [& args]
(with-open [br (java.io.BufferedReader. *in*)]
(run br))
(shutdown-agents))
(shutdown-agents)
(shutdown-medusa))

0 comments on commit a680049

Please sign in to comment.