-
Notifications
You must be signed in to change notification settings - Fork 16
/
elasticsearch.rb
226 lines (203 loc) · 6.27 KB
/
elasticsearch.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
require "elasticsearch/version"
require 'faraday'
require 'faraday_middleware'
require 'yajl'
require 'time'
module ElasticSearch
  # Faraday response middleware that decodes a response body with Yajl.
  #
  # NOTE(review): nothing in this file registers this class; get_connection
  # uses faraday_middleware's :json response parser instead. Confirm whether
  # other code references it before removing.
  class JSONResponse < Faraday::Response::Middleware
    # body - the raw response body
    #
    # Returns the decoded Ruby object.
    def parse(body)
      Yajl.load body
    end
  end

  # Raised for failures reported by, or while talking to, elasticsearch.
  class Error < StandardError
  end

  # Build a Faraday connection to an elasticsearch server.
  #
  # Request bodies go through the :json request middleware, responses whose
  # content type matches /\bjson$/ go through the :json response middleware,
  # and requests are sent with the excon adapter.
  #
  # server - base URL of the elasticsearch server
  #
  # Returns a Faraday::Connection, or nil when server is nil or false.
  def self.get_connection(server)
    return unless server
    Faraday.new(:url => server) do |builder|
      # TODO: add timeout middleware
      builder.request :json
      # builder.response :logger
      builder.response :json, :content_type => /\bjson$/
      builder.adapter :excon
    end
  end

  # Check whether an elasticsearch server answers on its root path.
  #
  # server - base URL of the elasticsearch server (optional)
  #
  # Returns true when GET / responds with HTTP 200; false when no server is
  # given or any other status comes back. Exceptions raised by Faraday while
  # connecting are not rescued.
  def self.available?(server = nil)
    conn = get_connection(server)
    return false unless conn
    conn.get('/').status == 200
  end

  # Object to represent an index in elasticsearch
  class Index
    # name   - the name of the index
    # server - base URL of the elasticsearch server; when nil, every request
    #          made through this object raises ElasticSearch::Error
    def initialize(name, server)
      @name = name
      @conn = ElasticSearch.get_connection(server)
    end

    # Helpers for making REST calls to elasticsearch: #get, #post, #put and
    # #delete. Each forwards its arguments and block to the Faraday
    # connection method of the same name and returns the response body.
    #
    # Raises ElasticSearch::Error when there is no connection, when the
    # response status is 0, or when the decoded body carries an 'error' entry.
    %w[get post put delete].each do |verb|
      define_method(verb) do |*args, &blk|
        raise Error, "no connection" unless @conn
        resp = @conn.public_send(verb, *args, &blk)
        raise Error, "elasticsearch server is offline or not accepting requests" if resp.status == 0
        body = resp.body
        # Only a decoded JSON object can carry an 'error' key. Indexing a raw
        # String body would do a substring match, and a nil body would raise
        # NoMethodError, so check the type first.
        raise Error, body['error'] if body.is_a?(Hash) && body['error']
        # Kept so callers such as #bulk can inspect the HTTP status.
        @last_resp = resp
        body
      end
    end

    # Force a refresh of this index
    #
    # This basically tells elasticsearch to flush its buffers
    # but not clear caches (unlike a commit in Solr)
    # "Commits" happen automatically and are managed by elasticsearch
    #
    # Returns a hash, the parsed response body from elasticsearch
    def refresh
      post "/#{@name}/_refresh"
    end

    # Send a batch of operations to this index's _bulk endpoint
    #
    # data - the bulk request payload (presumably newline-delimited
    #        action/source lines; confirm with callers). Nothing is sent
    #        when it is empty.
    #
    # Returns nil when data is empty, otherwise the parsed response body
    # Raises ElasticSearch::Error unless elasticsearch answers with HTTP 200
    def bulk(data)
      return if data.empty?
      body = post "/#{@name}/_bulk", data
      raise Error, "bulk import got HTTP #{@last_resp.status} response" if @last_resp.status != 200
      body
    end

    # Grab a bunch of items from this index
    #
    # type - the type to pull from
    # ids  - an Array of ids to fetch
    #
    # Returns a hash, the parsed response body from elasticsearch
    def mget(type, ids)
      get do |req|
        req.url "#{@name}/#{type}/_mget"
        req.body = {'ids' => ids}
      end
    end

    # Search this index using a post body
    #
    # types   - the type or types (comma separated) to search
    # options - options hash for this search request
    #
    # Returns a hash, the parsed response body from elasticsearch
    def search(types, options)
      get do |req|
        req.url "#{@name}/#{types}/_search"
        req.body = options
      end
    end

    # Search this index using a query string
    #
    # types   - the type or types (comma separated) to search
    # query   - the search query string, or a hash of query-string parameters
    # options - options hash for this search request (optional)
    #
    # Returns a hash, the parsed response body from elasticsearch
    def query(types, query, options=nil)
      query = {'q' => query} if query.is_a?(String)
      get do |req|
        req.url "#{@name}/#{types}/_search", query
        req.body = options if options
      end
    end

    # Count results using a query string
    #
    # types   - the type or types (comma separated) to search
    # query   - the search query string, or a hash of query-string parameters
    # options - options hash for this search request (optional)
    #
    # Returns a hash, the parsed response body from elasticsearch
    def count(types, query, options=nil)
      query = {'q' => query} if query.is_a?(String)
      get do |req|
        req.url "#{@name}/#{types}/_count", query
        req.body = options if options
      end
    end

    # Add a document to this index
    #
    # type   - the type of this document
    # id     - the unique identifier for this document
    # doc    - a hash, the document to be indexed; it is not modified
    # params - query-string parameters for the request (optional)
    #
    # Values that respond to #iso8601 (e.g. Time) are sent as their ISO 8601
    # string so dates are indexed in a consistent format.
    #
    # Returns a hash, the parsed response body from elasticsearch
    def add(type, id, doc, params={})
      # Normalize into a new hash instead of rewriting the caller's document.
      doc = doc.each_with_object({}) do |(key, val), normalized|
        normalized[key] = val.respond_to?(:iso8601) ? val.iso8601 : val
      end
      put do |req|
        req.url "/#{@name}/#{type}/#{id}", params
        req.body = doc
      end
    end

    # Remove a document from this index
    #
    # type - the type of document to be removed
    # id   - the unique identifier of the document to be removed
    #
    # Returns a hash, the parsed response body from elasticsearch
    def remove(type, id)
      delete do |req|
        req.url "#{@name}/#{type}/#{id}"
      end
    end

    # Remove all of a type from this index
    #
    # type - the type of document to be removed
    #
    # Returns a hash, the parsed response body from elasticsearch
    def remove_all(type)
      delete do |req|
        req.url "#{@name}/#{type}/_query", :q => '*'
      end
    end

    # Remove a collection of documents matched by a query
    #
    # types   - the type or types to query
    # options - the search options hash
    #
    # Returns a hash, the parsed response body from elasticsearch
    def remove_by_query(types, options)
      delete do |req|
        req.url "#{@name}/#{types}/_query"
        req.body = options
      end
    end

    # Fetch the mappings defined for this index
    #
    # types - the type or types to query
    #
    # Returns a hash, the parsed response body from elasticsearch
    def get_mapping(types)
      get do |req|
        req.url "#{@name}/#{types}/_mapping"
      end
    end

    # Adds mappings to the index
    #
    # type    - the type we're modifying
    # mapping - the new mapping to merge into the index
    #
    # Returns a hash, the parsed response body from elasticsearch
    def put_mapping(type, mapping)
      put do |req|
        req.url "#{@name}/#{type}/_mapping"
        req.body = mapping
      end
    end

    # Create a new index in elasticsearch
    #
    # name           - the name of the index to be created
    # server         - URL of the server to create the index on
    # create_options - a hash of index creation options
    #
    # The creation response is not inspected, so an error reported by
    # elasticsearch here is not raised.
    # NOTE(review): presumably deliberate, so that creating an existing
    # index is harmless; confirm.
    #
    # Returns a new ElasticSearch::Index instance
    # Raises ElasticSearch::Error when server is nil or false
    def self.create(name, server, create_options={})
      conn = ElasticSearch.get_connection(server)
      raise Error, "no connection" unless conn
      conn.put do |req|
        req.url "/#{name}"
        req.body = create_options
      end
      new(name, server)
    end
  end
end