Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Pass responses through transformers.

  • Loading branch information...
commit d1f752a7e27264124af8a9dc1644e5aaa50428d0 1 parent 05334a5
@drewr drewr authored
View
9 src/esperanto/action.clj
@@ -1,12 +1,19 @@
(ns esperanto.action
+ (:require [esperanto.transform.cluster]
+ [esperanto.transform.indices]
+ [esperanto.transform.index])
+ (:use [esperanto.transform :only [transform]])
(:import (org.elasticsearch.action ActionListener)
(org.elasticsearch.indices IndexMissingException)))
+(defn post [response]
+ (transform response))
+
(defn execute
([request]
(future
(try
- (-> request .execute .actionGet)
+ (-> request .execute .actionGet post)
(catch IndexMissingException _
nil))))
([request listener]
View
4 src/esperanto/admin/cluster.clj
@@ -41,6 +41,4 @@
:else (statuses status)))
(defn status [client & indices]
- (-> (health client indices)
- .getStatus
- to-status))
+ (-> (health client indices) :status to-status))
View
4 src/esperanto/admin/indices.clj
@@ -9,11 +9,11 @@
([client idx]
(make-index-creation client idx {}))
([client idx settings]
- (make-index-creation client idx {} {}))
+ (make-index-creation client idx settings {}))
([client idx settings mapping]
(let [req (-> client .admin .indices
(.prepareCreate idx)
- (.setSettings settings))]
+ (.setSettings (json/encode settings)))]
(when mapping
(doseq [[type obj] mapping]
(.addMapping req (name type) (json/encode obj))))
View
11 src/esperanto/index.clj
@@ -31,7 +31,7 @@
(defn index-doc [client idx doc]
(merge doc
- {:id (.getId @(execute (make-index-request client idx doc)))}))
+ @(execute (make-index-request client idx doc))))
(defn index-bulk
([client reqs]
@@ -50,9 +50,12 @@
(filter #(.isFailed %))
(map #(.getFailureMessage %))))
-(defn copy [client1 idx1 client2 idx2]
+(defn copy [client1 idx1 client2 idx2 & {:keys [batchsize]}]
(count
(apply concat
- (for [docs (partition-all 5 (index-seq client1 idx1))]
- (index-bulk client2 idx2 docs)))))
+ (remove :failed?
+ (map :items (for [docs (partition-all
+ (or batchsize 50)
+ (index-seq client1 idx1))]
+ (index-bulk client2 idx2 docs)))))))
View
57 src/esperanto/search.clj
@@ -1,7 +1,8 @@
(ns esperanto.search
(:refer-clojure :exclude [count])
(:require [cheshire.core :as json])
- (:use [esperanto.action :only [execute]])
+ (:use [esperanto.action :only [execute]]
+ [esperanto.transform :only [transform]])
(:import (org.elasticsearch.action.search SearchType)
(org.elasticsearch.client.node NodeClient)
(org.elasticsearch.common.unit TimeValue)
@@ -14,6 +15,12 @@
(.prepareSearch idxs)
(.setQuery (json/generate-string query)))))
+(defn make-scan-request [client idx query batchsize timeout]
+ (-> (make-search-request client idx query)
+ (.setSearchType SearchType/SCAN)
+ (.setSize batchsize)
+ (.setScroll (TimeValue/timeValueMillis timeout))))
+
(defn make-scroll-request [client id timeout]
(-> client
(.prepareSearchScroll id)
@@ -24,34 +31,11 @@
(.prepareCount (into-array idxs))
(.setQuery (QueryBuilders/queryString query))))
-(defn hit->clj [hit]
- (let [src (.sourceAsString hit)]
- (with-meta (merge {:id (.getId hit)}
- (if src
- (json/parse-string src :kw)
- {:ERROR "_source is not enabled"}))
- {:index (.getIndex hit)
- :node (-> hit .getShard .getNodeId)
- :shard (-> hit .getShard .getShardId)
- :sort-vals (seq (.getSortValues hit))})))
-
-(defn search->clj [r]
- (with-meta (map hit->clj (.getHits r))
- {:facets (.getFacets r)
- :shards (.getTotalShards r)
- :shards-bad (.getFailedShards r)
- :shards-good (.getSuccessfulShards r)
- :status (bean (.status r))
- :timed-out? (.isTimedOut r)
- :took (.getTookInMillis r)
- :total (-> r .getHits .getTotalHits)}))
-
(defn search
([client idx]
(search client idx {:match_all {}}))
([client idx query]
- (search->clj
- @(execute (make-search-request client idx query)))))
+ @(execute (make-search-request client idx query))))
(defn searchq
([client idx query]
@@ -62,28 +46,19 @@
([client idx]
(count client idx "*:*"))
([client idx query]
- (-> (make-count-request client [idx] query)
- execute
- deref
- .getCount)))
+ (:count @(execute (make-count-request client [idx] query)))))
(defn scroll [client id timeout]
(lazy-seq
- (when-let [hits (seq
- (-> @(execute (make-scroll-request client id timeout))
- .hits))]
- (cons (map hit->clj hits) (scroll client id timeout)))))
+ (when-let [hits (seq (-> @(execute
+ (make-scroll-request client id timeout))))]
+ (cons hits (scroll client id timeout)))))
(defn scan
([client idx query timeout]
- (scroll client
- (.scrollId @(execute
- (-> (make-search-request client idx query)
- (.setSearchType SearchType/SCAN)
- (.setSize 50)
- (.setScroll
- (TimeValue/timeValueMillis timeout)))))
- timeout)))
+ (let [id (-> @(execute (make-scan-request client idx query 100 timeout))
+ meta :scroll-id)]
+ (scroll client id timeout))))
(defn index-seq
([client idx]
View
5 src/esperanto/transform.clj
@@ -0,0 +1,5 @@
+(ns esperanto.transform)
+
+(defprotocol Transformer
+ (transform [this] "More than meets the eye!"))
+
View
54 src/esperanto/transform/cluster.clj
@@ -0,0 +1,54 @@
+(ns esperanto.transform.cluster
+ (:use [esperanto.transform :only [Transformer]])
+ (:import (org.elasticsearch.action.admin.cluster.health
+ ClusterHealthResponse)))
+
+(defn transform-health
+ "=== public org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse ===
+ [ 0] <init> (String,List)
+ [ 1] activePrimaryShards : int ()
+ [ 2] activeShards : int ()
+ [ 3] allValidationFailures : List ()
+ [ 4] clusterName : String ()
+ [ 5] equals : boolean (Object)
+ [ 6] getActivePrimaryShards : int ()
+ [ 7] getActiveShards : int ()
+ [ 8] getAllValidationFailures : List ()
+ [ 9] getClass : Class ()
+ [10] getClusterName : String ()
+ [11] getIndices : Map ()
+ [12] getInitializingShards : int ()
+ [13] getNumberOfDataNodes : int ()
+ [14] getNumberOfNodes : int ()
+ [15] getRelocatingShards : int ()
+ [16] getStatus : ClusterHealthStatus ()
+ [17] getUnassignedShards : int ()
+ [18] getValidationFailures : List ()
+ [19] hashCode : int ()
+ [20] indices : Map ()
+ [21] initializingShards : int ()
+ [22] isTimedOut : boolean ()
+ [23] iterator : Iterator ()
+ [24] notify : void ()
+ [25] notifyAll : void ()
+ [26] numberOfDataNodes : int ()
+ [27] numberOfNodes : int ()
+ [28] readFrom : void (StreamInput)
+ [29] relocatingShards : int ()
+ [30] status : ClusterHealthStatus ()
+ [31] timedOut : boolean ()
+ [32] toString : String ()
+ [33] unassignedShards : int ()
+ [34] validationFailures : List ()
+ [35] wait : void ()
+ [36] wait : void (long)
+ [37] wait : void (long,int)
+ [38] writeTo : void (StreamOutput)
+ "
+ [obj]
+ (bean obj))
+
+(extend ClusterHealthResponse
+ Transformer
+ {:transform transform-health})
+
View
282 src/esperanto/transform/index.clj
@@ -0,0 +1,282 @@
+(ns esperanto.transform.index
+ (:require [cheshire.core :as json])
+ (:use [esperanto.transform :only [Transformer]])
+ (:import (org.elasticsearch.action.bulk BulkResponse BulkItemResponse)
+ (org.elasticsearch.action.count CountResponse)
+ (org.elasticsearch.action.index IndexResponse)
+ (org.elasticsearch.search.internal InternalSearchHit)
+ (org.elasticsearch.action.search SearchResponse)))
+
+(defn transform-bulk
+ "=== public org.elasticsearch.action.bulk.BulkResponse ===
+ [ 0] <init> (BulkItemResponse[],long)
+ [ 1] buildFailureMessage : String ()
+ [ 2] equals : boolean (Object)
+ [ 3] getClass : Class ()
+ [ 4] getTook : TimeValue ()
+ [ 5] getTookInMillis : long ()
+ [ 6] hasFailures : boolean ()
+ [ 7] hashCode : int ()
+ [ 8] items : BulkItemResponse[] ()
+ [ 9] iterator : Iterator ()
+ [10] notify : void ()
+ [11] notifyAll : void ()
+ [12] readFrom : void (StreamInput)
+ [13] toString : String ()
+ [14] took : TimeValue ()
+ [15] tookInMillis : long ()
+ [16] wait : void ()
+ [17] wait : void (long)
+ [18] wait : void (long,int)
+ [19] writeTo : void (StreamOutput)
+ "
+ [obj]
+ (merge
+ {:failures? (.hasFailures obj)
+ :items (map esperanto.transform/transform (.items obj))}
+ (bean obj)))
+
+(extend BulkResponse
+ Transformer
+ {:transform transform-bulk})
+
+(defn transform-bulk-item
+ "=== public org.elasticsearch.action.bulk.BulkItemResponse ===
+ [ 0] static readBulkItem : BulkItemResponse (StreamInput)
+ [ 1] <init> (int,String,ActionResponse)
+ [ 2] <init> (int,String,Failure)
+ [ 3] equals : boolean (Object)
+ [ 4] failed : boolean ()
+ [ 5] failure : Failure ()
+ [ 6] failureMessage : String ()
+ [ 7] getClass : Class ()
+ [ 8] getFailure : Failure ()
+ [ 9] getFailureMessage : String ()
+ [10] getId : String ()
+ [11] getIndex : String ()
+ [12] getType : String ()
+ [13] hashCode : int ()
+ [14] id : String ()
+ [15] index : String ()
+ [16] isFailed : boolean ()
+ [17] itemId : int ()
+ [18] notify : void ()
+ [19] notifyAll : void ()
+ [20] opType : String ()
+ [21] readFrom : void (StreamInput)
+ [22] response : ActionResponse ()
+ [23] toString : String ()
+ [24] type : String ()
+ [25] version : long ()
+ [26] wait : void ()
+ [27] wait : void (long)
+ [28] wait : void (long,int)
+ [29] writeTo : void (StreamOutput)
+ "
+
+ [obj]
+ (merge
+ {:op-type (.opType obj)
+ :failed? (.isFailed obj)}
+ (bean obj)))
+
+(extend BulkItemResponse
+ Transformer
+ {:transform transform-bulk-item})
+
+(defn transform-count
+ "=== public org.elasticsearch.action.count.CountResponse ===
+ [ 0] count : long ()
+ [ 1] equals : boolean (Object)
+ [ 2] failedShards : int ()
+ [ 3] getClass : Class ()
+ [ 4] getCount : long ()
+ [ 5] getFailedShards : int ()
+ [ 6] getShardFailures : List ()
+ [ 7] getSuccessfulShards : int ()
+ [ 8] getTotalShards : int ()
+ [ 9] hashCode : int ()
+ [10] notify : void ()
+ [11] notifyAll : void ()
+ [12] readFrom : void (StreamInput)
+ [13] shardFailures : List ()
+ [14] successfulShards : int ()
+ [15] toString : String ()
+ [16] totalShards : int ()
+ [17] wait : void ()
+ [18] wait : void (long)
+ [19] wait : void (long,int)
+ [20] writeTo : void (StreamOutput)
+ "
+ [obj]
+ (bean obj))
+
+(extend CountResponse
+ Transformer
+ {:transform transform-count})
+
+(defn transform-hit
+ "=== public org.elasticsearch.search.internal.InternalSearchHit ===
+ [ 0] static EMPTY_PARAMS : Params
+ [ 1] static readSearchHit : InternalSearchHit (StreamInput,StreamContext)
+ [ 2] <init> (int,String,String,byte[],Map)
+ [ 3] docId : int ()
+ [ 4] equals : boolean (Object)
+ [ 5] explanation : Explanation ()
+ [ 6] explanation : void (Explanation)
+ [ 7] field : SearchHitField (String)
+ [ 8] fields : Map ()
+ [ 9] fields : void (Map)
+ [10] fieldsOrNull : Map ()
+ [11] getClass : Class ()
+ [12] getExplanation : Explanation ()
+ [13] getFields : Map ()
+ [14] getHighlightFields : Map ()
+ [15] getId : String ()
+ [16] getIndex : String ()
+ [17] getMatchedFilters : String[] ()
+ [18] getScore : float ()
+ [19] getShard : SearchShardTarget ()
+ [20] getSortValues : Object[] ()
+ [21] getSource : Map ()
+ [22] getType : String ()
+ [23] getVersion : long ()
+ [24] hashCode : int ()
+ [25] highlightFields : Map ()
+ [26] highlightFields : void (Map)
+ [27] id : String ()
+ [28] index : String ()
+ [29] isSourceEmpty : boolean ()
+ [30] iterator : Iterator ()
+ [31] matchedFilters : String[] ()
+ [32] matchedFilters : void (String[])
+ [33] notify : void ()
+ [34] notifyAll : void ()
+ [35] readFrom : void (StreamInput)
+ [36] readFrom : void (StreamInput,StreamContext)
+ [37] score : float ()
+ [38] score : void (float)
+ [39] shard : SearchShardTarget ()
+ [40] shard : void (SearchShardTarget)
+ [41] shardTarget : void (SearchShardTarget)
+ [42] sortValues : Object[] ()
+ [43] sortValues : void (Object[])
+ [44] source : byte[] ()
+ [45] sourceAsMap : Map ()
+ [46] sourceAsString : String ()
+ [47] toString : String ()
+ [48] toXContent : XContentBuilder (XContentBuilder,Params)
+ [49] type : String ()
+ [50] version : long ()
+ [51] version : void (long)
+ [52] wait : void ()
+ [53] wait : void (long)
+ [54] wait : void (long,int)
+ [55] writeTo : void (StreamOutput)
+ [56] writeTo : void (StreamOutput,StreamContext)
+ "
+ [hit]
+ (let [src (.sourceAsString hit)]
+ (with-meta (merge {:id (.getId hit)}
+ (if src
+ (json/parse-string src :kw)
+ {:ERROR "_source is not enabled"}))
+ {:index (.getIndex hit)
+ :node (-> hit .getShard .getNodeId)
+ :shard (-> hit .getShard .getShardId)
+ :sort-vals (seq (.getSortValues hit))})))
+
+(extend InternalSearchHit
+ Transformer
+ {:transform transform-hit})
+
+(defn transform-index
+ "=== public org.elasticsearch.action.index.IndexResponse ===
+ [ 0] <init> ()
+ [ 1] <init> (String,String,String,long)
+ [ 2] equals : boolean (Object)
+ [ 3] getClass : Class ()
+ [ 4] getId : String ()
+ [ 5] getIndex : String ()
+ [ 6] getMatches : List ()
+ [ 7] getType : String ()
+ [ 8] getVersion : long ()
+ [ 9] hashCode : int ()
+ [10] id : String ()
+ [11] index : String ()
+ [12] matches : List ()
+ [13] matches : void (List)
+ [14] notify : void ()
+ [15] notifyAll : void ()
+ [16] readFrom : void (StreamInput)
+ [17] toString : String ()
+ [18] type : String ()
+ [19] version : long ()
+ [20] wait : void ()
+ [21] wait : void (long)
+ [22] wait : void (long,int)
+ [23] writeTo : void (StreamOutput)
+ "
+ [obj]
+ (bean obj))
+
+(extend IndexResponse
+ Transformer
+ {:transform transform-index})
+
+(defn transform-search
+ "=== public org.elasticsearch.action.search.SearchResponse ===
+ [ 0] static EMPTY_PARAMS : Params
+ [ 1] static readSearchResponse : SearchResponse (StreamInput)
+ [ 2] <init> (InternalSearchResponse,String,int,int,long,ShardSearchFailure[])
+ [ 3] equals : boolean (Object)
+ [ 4] facets : Facets ()
+ [ 5] failedShards : int ()
+ [ 6] getClass : Class ()
+ [ 7] getFacets : Facets ()
+ [ 8] getFailedShards : int ()
+ [ 9] getHits : SearchHits ()
+ [10] getScrollId : String ()
+ [11] getShardFailures : ShardSearchFailure[] ()
+ [12] getSuccessfulShards : int ()
+ [13] getTook : TimeValue ()
+ [14] getTookInMillis : long ()
+ [15] getTotalShards : int ()
+ [16] hashCode : int ()
+ [17] hits : SearchHits ()
+ [18] isTimedOut : boolean ()
+ [19] notify : void ()
+ [20] notifyAll : void ()
+ [21] readFrom : void (StreamInput)
+ [22] scrollId : String ()
+ [23] shardFailures : ShardSearchFailure[] ()
+ [24] status : RestStatus ()
+ [25] successfulShards : int ()
+ [26] timedOut : boolean ()
+ [27] toString : String ()
+ [28] toXContent : XContentBuilder (XContentBuilder,Params)
+ [29] took : TimeValue ()
+ [30] tookInMillis : long ()
+ [31] totalShards : int ()
+ [32] wait : void ()
+ [33] wait : void (long)
+ [34] wait : void (long,int)
+ [35] writeTo : void (StreamOutput)
+ "
+ [obj]
+ (with-meta (map esperanto.transform/transform
+ (.getHits obj))
+ {:facets (.getFacets obj)
+ :scroll-id (.getScrollId obj)
+ :shards (.getTotalShards obj)
+ :shards-bad (.getFailedShards obj)
+ :shards-good (.getSuccessfulShards obj)
+ :status (bean (.status obj))
+ :timed-out? (.isTimedOut obj)
+ :took (.getTookInMillis obj)
+ :total (-> obj .getHits .getTotalHits)}))
+
+(extend SearchResponse
+ Transformer
+ {:transform transform-search})
+
View
90 src/esperanto/transform/indices.clj
@@ -0,0 +1,90 @@
+(ns esperanto.transform.indices
+ (:use [esperanto.transform :only [Transformer]])
+ (:import (org.elasticsearch.action.admin.indices.create CreateIndexResponse)
+ (org.elasticsearch.action.admin.indices.delete DeleteIndexResponse)
+ (org.elasticsearch.action.admin.indices.refresh RefreshResponse)
+ (org.elasticsearch.action.admin.indices.status
+ IndicesStatusResponse)))
+
+(defn transform-create
+ "=== public org.elasticsearch.action.admin.indices.create.CreateIndexResponse ===
+ [ 0] acknowledged : boolean ()
+ [ 1] equals : boolean (Object)
+ [ 2] getAcknowledged : boolean ()
+ [ 3] getClass : Class ()
+ [ 4] hashCode : int ()
+ [ 5] notify : void ()
+ [ 6] notifyAll : void ()
+ [ 7] readFrom : void (StreamInput)
+ [ 8] toString : String ()
+ [ 9] wait : void ()
+ [10] wait : void (long)
+ [11] wait : void (long,int)
+ [12] writeTo : void (StreamOutput)
+ "
+ [obj]
+ (bean obj))
+
+(defn transform-delete
+ "=== public org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse ===
+ [ 0] acknowledged : boolean ()
+ [ 1] equals : boolean (Object)
+ [ 2] getAcknowledged : boolean ()
+ [ 3] getClass : Class ()
+ [ 4] hashCode : int ()
+ [ 5] notify : void ()
+ [ 6] notifyAll : void ()
+ [ 7] readFrom : void (StreamInput)
+ [ 8] toString : String ()
+ [ 9] wait : void ()
+ [10] wait : void (long)
+ [11] wait : void (long,int)
+ [12] writeTo : void (StreamOutput)
+ "
+ [obj]
+ (bean obj))
+
+(defn transform-refresh
+ "=== public org.elasticsearch.action.admin.indices.refresh.RefreshResponse ===
+ [ 0] equals : boolean (Object)
+ [ 1] failedShards : int ()
+ [ 2] getClass : Class ()
+ [ 3] getFailedShards : int ()
+ [ 4] getShardFailures : List ()
+ [ 5] getSuccessfulShards : int ()
+ [ 6] getTotalShards : int ()
+ [ 7] hashCode : int ()
+ [ 8] notify : void ()
+ [ 9] notifyAll : void ()
+ [10] readFrom : void (StreamInput)
+ [11] shardFailures : List ()
+ [12] successfulShards : int ()
+ [13] toString : String ()
+ [14] totalShards : int ()
+ [15] wait : void ()
+ [16] wait : void (long)
+ [17] wait : void (long,int)
+ [18] writeTo : void (StreamOutput)
+ "
+ [obj]
+ (bean obj))
+
+(defn transform-status [obj]
+ {:transformed obj})
+
+(extend CreateIndexResponse
+ Transformer
+ {:transform transform-create})
+
+(extend DeleteIndexResponse
+ Transformer
+ {:transform transform-delete})
+
+(extend IndicesStatusResponse
+ Transformer
+ {:transform transform-status})
+
+(extend RefreshResponse
+ Transformer
+ {:transform transform-refresh})
+
View
16 test/esperanto/test/client.clj
@@ -4,7 +4,7 @@
[esperanto.node :only [make-test-tcp-node node-fixture
rand-cluster-name]]
[esperanto.admin.indices :only [refresh index-fixture create]]
- [esperanto.admin.cluster :only [wait-for-green status]]
+ [esperanto.admin.cluster :only [wait-for-green wait-for-yellow status]]
[esperanto.index :only [index-doc]]))
(def cluster (rand-cluster-name))
@@ -21,16 +21,12 @@
:host "localhost"
:port port})]
(try
- (is (not (.isTimedOut masterwait)))
- (is (not (.isTimedOut (wait-for-green client [] 5000))))
- (is
- (.getAcknowledged
- (create client index
- (cheshire.core/generate-string
- {:index.number_of_replicas "0"}))))
- (is (not (.isTimedOut (wait-for-green client [index] 5000))))
+ (is (not (:timedOut masterwait)))
+ (is (not (:timedOut (wait-for-green client [] 5000))))
+ (is (:acknowledged (create client index {:number_of_replicas 0})))
+ (is (not (:timedOut (wait-for-green client [index] 3000))))
(index-doc client index doc)
(refresh client index)
- (is (not (.isTimedOut (wait-for-green client [index] 5000))))
+ (is (not (:timedOut (wait-for-green client [index] 3000))))
(finally
(.stop master)))))
View
6 test/esperanto/test/search.clj
@@ -55,9 +55,9 @@
_ (refresh client index)
sresp (searchq client index "quick")
timeout 500]
- (is (< (.getTookInMillis resp) timeout)
+ (is (< (:tookInMillis resp) timeout)
(format "*** bulk index took longer than %dms" timeout))
- (is (not (.hasFailures resp)))
+ (is (not (:failures? resp)))
(is (= 100 (-> sresp meta :total)))))
(deftest t-index-seq
@@ -68,7 +68,7 @@
_ (refresh client index)
sresp (search client index)
xs (index-seq client index)]
- (is (not (.hasFailures bulk)))
+ (is (not (:failures? bulk)))
;; How many docs does ES think the index has?
(is (= ct (count client index)))
;; How many docs does a search for all docs return?
Please sign in to comment.
Something went wrong with that request. Please try again.