Skip to content
This repository has been archived by the owner on Sep 2, 2020. It is now read-only.

Cache paths for elastic search store #36

Merged
merged 4 commits into from
Jul 2, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/org/spootnik/cyanite/carbon.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,32 @@
[gloss.core :refer [string]]
[lamina.core :refer [receive-all map* siphon]]))

(defn parse-num
"parse a number into the given value, return the
default value if it fails"
[parse default number]
(try (parse number)
(catch Exception e
(debug "got an invalid number" number (.getMessage e))
default)))

(defn formatter
"Split each line on whitespace, discard nan metric lines
and format correct lines for each resolution"
[index rollups input]
(let [[path metric time] (s/split (.trim input) #" ")]
(when (not= metric "nan")
(let [[path metric time] (s/split (.trim input) #" ")
metric (parse-num #(Double/parseDouble %) "nan" metric)
time (parse-num #(Long/parseLong %) "nan" time)]
(when (and (not= metric "nan") (not= time "nan"))
;; hardcode empty tenant for now
(when index (path/register index "" path))
(for [{:keys [rollup period rollup-to]} rollups]
{:path path
:rollup rollup
:period period
:ttl (* period rollup)
:time (rollup-to (Long/parseLong time))
:metric (Double/parseDouble metric)}))))
:time (rollup-to time)
:metric metric}))))

(defn handler
"Send each metric over to the cassandra store"
Expand Down
24 changes: 19 additions & 5 deletions src/org/spootnik/cyanite/es_path.clj
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@
(defn search
"search for a path"
[query scroll tenant path leafs-only]
(let [res (query :query (build-es-query path tenant leafs-only) :size 100 :search_type "query_then_fetch" :scroll "1m")
(let [res (query :query (build-es-query path tenant leafs-only)
:size 100
:search_type "query_then_fetch"
:scroll "1m")
hits (scroll res)]
(map #(:_source %) hits)))

Expand All @@ -74,11 +77,20 @@
(dorun (map #(if (not (path-exists? (:path %)))
(write-key (:path %) %)) paths))))

(defn path-exists-cache?
[path-exists? store path]
(if (contains? @store path)
true
(if (path-exists? path)
(do (swap! store assoc path true) true)
false)))

(defn es-rest
[{:keys [index url]
:or {index "cyanite_paths" url "http://localhost:9200"}}]
(let [conn (esr/connect url)
existsfn (partial esrd/present? conn index ES_DEF_TYPE)
(let [store (atom {})
conn (esr/connect url)
existsfn (partial path-exists-cache? (partial esrd/present? conn index ES_DEF_TYPE) store)
updatefn (partial esrd/put conn index ES_DEF_TYPE)
scrollfn (partial esrd/scroll-seq conn)
queryfn (partial esrd/search conn index ES_DEF_TYPE)]
Expand All @@ -92,12 +104,14 @@
(lookup [this tenant path]
(map :path (search queryfn scrollfn tenant path true))))))


(defn es-native
[{:keys [index host port cluster_name]
:or {index "cyanite_paths" host "localhost" port 9300}}]
(let [conn (esn/connect [[host port]]
(let [store (atom {})
conn (esn/connect [[host port]]
{"cluster.name" cluster_name})
existsfn (partial esnd/present? conn index ES_DEF_TYPE)
existsfn (partial path-exists-cache? (partial esnd/present? conn index ES_DEF_TYPE) store)
updatefn (partial esnd/put conn index ES_DEF_TYPE)
scrollfn (partial esnd/scroll-seq conn)
queryfn (partial esnd/search conn index ES_DEF_TYPE)]
Expand Down