-
Notifications
You must be signed in to change notification settings - Fork 0
/
common.clj
153 lines (126 loc) · 4.1 KB
/
common.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
(ns re-share.es.common
"Common ES functions"
(:refer-clojure :exclude (get))
(:require
[clojure.string :refer (split)]
[re-share.core :refer (error-m)]
[re-share.config :refer (get!)]
[taoensso.timbre :refer (refer-timbre)]
[qbits.spandex :as s]
[clj-time.core :as t]
[clj-time.format :as f]
[re-share.es.node :refer (connection)]))
(refer-timbre)
; Common ES functions
(defn- ok [resp]
(#{200 201} (:status resp)))
(defn- illegal [e]
(instance? java.lang.IllegalStateException e))
; when we reset the connection
(defn- reactor-stopped [e]
(let [c "Request cannot be executed; I/O reactor status: STOPPED"]
(and (illegal e) (= (-> e Throwable->map :cause) c))))
(defn- handle-ex [e]
(when-not (reactor-stopped e)
(error-m e)
(throw e)))
(defn exists-call
[target]
(try
(ok (s/request (connection) {:url target :method :head}))
(catch Exception e
(when-not (= 404 (:status (ex-data e)))
(handle-ex e)))))
(defn exists?
"Check if index exists or instance with id existing within an index"
([index]
(exists-call [index]))
([index t id]
(exists-call [index t id])))
(defn delete-call
[target]
(try
(ok (s/request (connection) {:url target :method :delete}))
(catch Exception e
(handle-ex e))))
(defn delete
"Delete all under index or a single id"
([index t]
(delete-call [index t]))
([index t id]
(delete-call [index t id])))
(defn delete-all
[index]
(try
(ok (s/request (connection) {:url [index :_delete_by_query] :method :post :body {:query {:match_all {}}}}))
(catch Exception e
(handle-ex e))))
(defn put-call
[target m]
(try
(ok (s/request (connection) {:url target :method :put :body m}))
(catch Exception e
(handle-ex e))))
(defn put [index t id m]
(put-call [index t id] m))
(defn get [index t id]
(try
(get-in (s/request (connection) {:url [index t id] :method :get}) [:body :_source])
(catch Exception e
(when-not (= 404 (:status (ex-data e)))
(handle-ex e)))))
(defn create
"Persist instance m of and return generated id"
[index t m]
(try
(let [{:keys [status body] :as resp} (s/request (connection) {:url [index t] :method :post :body m})]
(when-not (ok resp)
(throw (ex-info "failed to create" {:resp resp :m m :index index})))
(body :_id))
(catch Exception e
(handle-ex e))))
(def ^:const default-settings {:settings {:number_of_shards 1}})
(defn create-index
"Create an index with provided mappings"
[index {:keys [mappings] :as spec}]
{:pre [mappings]}
(ok (s/request (connection) {:url [index] :method :put :body (merge default-settings spec)})))
(defn list-indices []
(let [ks [:health :status :index :uuid :pri :rep :docs.count :docs.deleted :store.size :pri.store.size]]
(map #(zipmap ks (filter (comp not empty?) (split % #"\s")))
(split (:body (s/request (connection) {:url [:_cat :indices] :method :get})) #"\n"))))
(defn clear
"Clear index type"
[index t]
(when (exists? index)
(info "Clearing index" index)
(delete index t)))
(defn all
"An all query using match all on provided index this should use scrolling for 10K systems"
[index]
(let [query {:size 10000 :query {:match_all {}}}
{:keys [body]} (s/request (connection) {:url [index :_search] :method :get :body query})]
(mapv (juxt :_id :_source) (get-in body [:hits :hits]))))
(defn delete-by
"Delete by query like {:match {:type \"nmap scan\"}}"
[index t query]
(try
(s/request (connection) {:url [index t :_delete_by_query] :method :post :body {:query query}})
(catch Exception e
(handle-ex e))))
(def conn-prefix (atom :default))
(defn get-es! []
(get! :shared :elasticsearch @conn-prefix))
(defn prefix-switch
"Change es prefix"
[k]
(reset! conn-prefix k))
(def day-format (f/formatter "yyyyMMdd"))
(defn with-day [day idx]
(str idx "-" (f/unparse day-format day)))
(defn index
"index with key prefix and type postfix (since ES 6x onlys single type per index is supported)"
([k t]
(index k t (t/now)))
([k t d]
(with-day d (str (get! k :elasticsearch :index) "-" (name t)))))