Permalink
Browse files

[TMP] SCAN DRAFT

  • Loading branch information...
1 parent 8ab5774 commit 7d637074381cccbc88d9c09aa526710bd8187884 @karmi committed Mar 30, 2012
View
@@ -6,6 +6,7 @@
require 'active_support/core_ext/object/to_param'
require 'active_support/core_ext/object/to_query'
+require 'active_support/core_ext/hash/except.rb'
# Ruby 1.8 compatibility
require 'tire/rubyext/ruby_1_8' if defined?(RUBY_VERSION) && RUBY_VERSION < '1.9'
@@ -23,6 +24,7 @@
require 'tire/search/facet'
require 'tire/search/filter'
require 'tire/search/highlight'
+require 'tire/search/scan'
require 'tire/results/pagination'
require 'tire/results/collection'
require 'tire/results/item'
View
@@ -31,5 +31,9 @@ def index(name, &block)
Index.new(name, &block)
end
+ def scan(names, options={}, &block)
+ Search::Scan.new(names, options, &block)
+ end
+
end
end
View
@@ -41,6 +41,12 @@ def mapping
MultiJson.decode(@response.body)[@name]
end
+ def settings
+ @response = Configuration.client.get("#{Configuration.url}/#{@name}/_settings")
+ MultiJson.decode(@response.body)[@name]['settings']
+ end
+
+
def store(*args)
document, options = args
type = get_type_from_document(document)
@@ -124,6 +130,16 @@ def import(klass_or_collection, options={})
end
end
+ def reindex(name, options={}, &block)
+ new_index = Index.new(name)
+ new_index.create(options) unless Index.new(name).exists?
+ Search::Scan.new(self.name, &block).each do |results|
+ new_index.bulk_store results.map do |d|
+ d.to_hash.except(:type, :_index, :_explanation, :_score, :_version, :highlight, :sort)
+ end
+ end
+ end
+
def remove(*args)
if args.size > 1
type, document = args
@@ -272,12 +288,12 @@ def get_type_from_document(document, options={})
type = case
when document.respond_to?(:document_type)
document.document_type
- when document.is_a?(Hash)
- document[:_type] || document['_type'] || document[:type] || document['type']
when document.respond_to?(:_type)
document._type
when document.respond_to?(:type) && document.type != document.class
document.type
+ when document.is_a?(Hash)
+ document[:_type] || document['_type'] || document[:type] || document['type']
end
$VERBOSE = old_verbose
@@ -304,7 +320,8 @@ def convert_document_to_json(document)
"please pass an object which responds to `to_indexed_json` or a plain Hash."
document
when document.respond_to?(:to_indexed_json) then document.to_indexed_json
- else raise ArgumentError, "Please pass a JSON string or object with a 'to_indexed_json' method"
+ else raise ArgumentError, "Please pass a JSON string or object with a 'to_indexed_json' method," +
+ "'#{document.class}' given."
end
end
View
@@ -32,7 +32,11 @@ def [](key)
end
def id
- @attributes[:_id] || @attributes[:id]
+ @attributes[:_id] || @attributes[:id]
+ end
+
+ def type
+ @attributes[:_type] || @attributes[:type]
end
def persisted?
View
@@ -1,6 +1,6 @@
class Hash
- def to_json
+ def to_json(options=nil)
MultiJson.encode(self)
end unless respond_to?(:to_json)
View
@@ -0,0 +1,74 @@
+module Tire
+ module Search
+
+ # http://www.elasticsearch.org/guide/reference/api/search/search-type.html
+ # http://www.elasticsearch.org/guide/reference/api/search/scroll.html
+ #
+ class Scan
+ include Enumerable
+
+ attr_reader :indices, :options, :search, :total, :seen
+
+ def initialize(indices=nil, options={}, &block)
+ @indices = Array(indices)
+ @options = options.update(:search_type => 'scan', :scroll => '10m')
+ @seen = 0
+ @search = Search.new(@indices, @options, &block)
+ end
+
+ def url; Configuration.url + "/_search/scroll"; end
+ def params; @options.empty? ? '' : '?' + @options.to_param; end
+ def results; @results || (__perform; @results); end
+ def response; @response || (__perform; @response); end
+ def json; @json || (__perform; @json); end
+
+ def scroll_id
+ @scroll_id ||= @search.perform.json['_scroll_id']
+ end
+
+ def each
+ until results.empty?
+ yield results.results
+ __perform
+ end
+ end
+
+ def each_document
+ until results.empty?
+ results.each { |item| yield item }
+ __perform
+ end
+ end
+
+ def __perform
+ @response = Configuration.client.get [url, params].join, scroll_id
+ @json = MultiJson.decode @response.body
+ @results = Results::Collection.new @json, @options
+ @total = @json['hits']['total']
+ @seen += @results.size
+ @scroll_id = @json['_scroll_id']
+ return self
+ ensure
+ __logged
+ end
+
+ def to_a; results; end; alias :to_ary :to_a
+ def to_curl; %Q|curl -X GET "#{url}?pretty=true" -d '#{@scroll_id}'|; end
+
+ def __logged(error=nil)
+ if Configuration.logger
+
+ Configuration.logger.log_request 'scroll', nil, to_curl
+
+ took = @json['took'] rescue nil
+ code = @response.code rescue nil
+ body = "#{@seen}/#{@total}" rescue nil
+
+ Configuration.logger.log_response code || 'N/A', took || 'N/A', body
+ end
+ end
+
+ end
+
+ end
+end
View
@@ -141,6 +141,26 @@ class IndexTest < Test::Unit::TestCase
end
+ context "settings" do
+
+ should "return index settings" do
+ json =<<-JSON
+ {
+ "dummy" : {
+ "settings" : {
+ "index.number_of_shards" : "20",
+ "index.number_of_replicas" : "0"
+ }
+ }
+ }
+ JSON
+ Configuration.client.stubs(:get).returns(mock_response(json))
+
+ assert_equal '20', @index.settings['index.number_of_shards']
+ end
+
+ end
+
context "when storing" do
should "set type from Hash :type property" do
@@ -0,0 +1,29 @@
+require 'test_helper'
+
+module Tire
+ module Search
+ class ScanTest < Test::Unit::TestCase
+
+ context "Scan" do
+ setup { Configuration.reset }
+
+ should "initialize the search object" do
+ Search.expects(:new).with { |index| index == ['index1', 'index2'] }
+ Scan.new(['index1', 'index2'])
+ # Scan.new('webexpo')
+ end
+
+ should "fetch the initial scroll ID" do
+ s = Scan.new('webexpo')
+ # p s.scroll_id
+ s.each do |d|
+ p d
+ p '-'*100
+ end
+ end
+
+ end
+
+ end
+ end
+end
View
@@ -64,6 +64,28 @@ class TireTest < Test::Unit::TestCase
end
+ context "when scanning an index" do
+ should "allow to leave out a block" do
+ Search::Scan.expects(:new).with { |index| index == 'dummy' }
+
+ Tire.scan('dummy')
+ end
+
+ should "allow to pass the query as a block" do
+ Search::Scan.expects(:new).with { |index| index == 'dummy' }
+
+ Tire.scan('dummy') { query { string 'foo' } }
+ end
+
+ should "allow to pass the query as a hash" do
+ payload = { :query => { :query_string => { :query => 'foo' } } }
+ Search::Scan.expects(:new).with('dummy', payload)
+
+ Tire.scan 'dummy', payload
+ end
+
+ end
+
end
context "utils" do

0 comments on commit 7d63707

Please sign in to comment.