From d1d189a52c301415055f61568aaa649e0349a3e7 Mon Sep 17 00:00:00 2001 From: Moritz Onken Date: Sat, 26 Apr 2014 13:32:10 +0800 Subject: [PATCH] use Search::Elasticsearch, tested with 1.1.1 --- dist.ini | 2 +- lib/ElasticSearchX/Model/Bulk.pm | 36 ++++++++--------- lib/ElasticSearchX/Model/Document/Role.pm | 5 ++- lib/ElasticSearchX/Model/Document/Set.pm | 40 ++++++++++--------- .../Model/Document/Trait/Attribute.pm | 9 ++++- .../Model/Document/Trait/Class.pm | 15 +++++++ lib/ElasticSearchX/Model/Document/Types.pm | 1 - lib/ElasticSearchX/Model/Index.pm | 4 +- lib/ElasticSearchX/Model/Role.pm | 29 +++++++------- lib/ElasticSearchX/Model/Scroll.pm | 15 ++++--- t/bulk.t | 4 +- t/es/delete.t | 1 + t/es/fields.t | 2 +- t/es/version.t | 2 +- t/lib/MyModel.pm | 7 +++- t/model/es_version.t | 5 ++- t/model/model.t | 2 - t/tutorial.t | 4 +- 18 files changed, 104 insertions(+), 79 deletions(-) diff --git a/dist.ini b/dist.ini index 16fabf7..15dad70 100644 --- a/dist.ini +++ b/dist.ini @@ -22,7 +22,7 @@ DateTime = 0 DateTime::Format::Epoch::Unix = 0 DateTime::Format::ISO8601 = 0 Digest::SHA1 = 0 -ElasticSearch = 0.65 +Search::Elasticsearch = 1.11 JSON = 0 List::MoreUtils = 0 List::Util = 0 diff --git a/lib/ElasticSearchX/Model/Bulk.pm b/lib/ElasticSearchX/Model/Bulk.pm index 414289d..1d118ab 100644 --- a/lib/ElasticSearchX/Model/Bulk.pm +++ b/lib/ElasticSearchX/Model/Bulk.pm @@ -1,33 +1,42 @@ package ElasticSearchX::Model::Bulk; +use Search::Elasticsearch::Bulk; use Moose; has stash => ( is => 'ro', - traits => ['Array'], - handles => { add => 'push', stash_size => 'count' }, - default => sub { [] }, + isa => "Search::Elasticsearch::Bulk", + handles => { stash_size => '_buffer_count', commit => "flush" }, + lazy_build => 1, ); has size => ( is => 'ro', isa => 'Int', default => 100 ); has es => ( is => 'ro' ); +sub _build_stash { + my $self = shift; + $self->es->bulk_helper( max_count => $self->size ); +} + +sub add { + my ($self, $action, $payload) = (shift, %{$_[0]}); + $payload->{source} = delete $payload->{body}; + $self->stash->add_action($action => $payload); +} + sub update { my ( $self, $doc, $qs ) = @_; $self->add( { index => ref $doc eq 'HASH' ? $doc : { $doc->_put( $doc->_update($qs) ) } } ); - $self->commit if ( $self->stash_size > $self->size ); return $self; } sub create { my ( $self, $doc, $qs ) = @_; $self->add( { create => ref $doc eq 'HASH' ? $doc : { $doc->_put($qs) } } ); - $self->commit if ( $self->stash_size > $self->size ); return $self; } sub put { my ( $self, $doc, $qs ) = @_; $self->add( { index => ref $doc eq 'HASH' ? $doc : { $doc->_put, %{ $qs || {} } } } ); - $self->commit if ( $self->stash_size > $self->size ); return $self; } @@ -42,21 +51,12 @@ sub delete { } } ); - $self->commit if ( $self->stash_size > $self->size ); return $self; } -sub commit { - my $self = shift; - return unless ( $self->stash_size ); - my $result = $self->es->bulk( $self->stash ); - $self->clear; - return $result; -} - sub clear { my $self = shift; - @{ $self->stash } = (); + $self->stash->clear_buffer; return $self; } @@ -80,7 +80,7 @@ __END__ =head1 DESCRIPTION -This class is a wrapper around L which adds +This class is a wrapper around L which adds some convenience. By specifiying a L you set the maximum number of documents that are processed in one request. You can either L or L documents. Once the C<$bulk> object is out @@ -104,7 +104,7 @@ gets out of scope or if you call L explicitly. =head2 es -The L object. +The L object. =head1 METHODS diff --git a/lib/ElasticSearchX/Model/Document/Role.pm b/lib/ElasticSearchX/Model/Document/Role.pm index 3095c6e..fd3b714 100644 --- a/lib/ElasticSearchX/Model/Document/Role.pm +++ b/lib/ElasticSearchX/Model/Document/Role.pm @@ -78,7 +78,8 @@ sub _create { sub put { my ( $self, $qs ) = @_; - my $return = $self->index->model->es->index( $self->_put($qs) ); + my $method = $qs && ref $qs eq "HASH" && (delete $qs->{create}) ? "create" : "index"; + my $return = $self->index->model->es->$method( $self->_put($qs) ); $self->_clear_loaded_attributes; my $id = $self->meta->get_id_attribute; $id->set_value( $self, $return->{_id} ) if ($id); @@ -98,7 +99,7 @@ sub _put { index => $self->index->name, type => $self->meta->short_name, $id ? ( id => $id ) : (), - data => $data, + body => $data, $parent ? ( parent => $parent->get_value($self) ) : (), %$qs, ); diff --git a/lib/ElasticSearchX/Model/Document/Set.pm b/lib/ElasticSearchX/Model/Document/Set.pm index 80314ac..27b9693 100644 --- a/lib/ElasticSearchX/Model/Document/Set.pm +++ b/lib/ElasticSearchX/Model/Document/Set.pm @@ -35,9 +35,11 @@ sub add_sort { push( @{ $_[0]->sort }, $_[1] ); return $_[0]; } sub add_field { push( @{ $_[0]->fields }, $_[1] ); return $_[0]; } -has query_type => +has search_type => ( isa => QueryType, is => 'rw', traits => [qw(ChainedClone)] ); +sub query_type { shift->search_type(@_) } + has mixin => ( is => 'ro', isa => 'HashRef', traits => [qw(ChainedClone)] ); has inflate => @@ -60,7 +62,7 @@ sub _build_qs { # we only want to set qs if they are not the default $qs->{refresh} = 1 if ( $self->_refresh ); - $qs->{query_type} = $self->query_type if ( $self->query_type ); + $qs->{search_type} = $self->search_type if $self->search_type; return $qs; } @@ -68,9 +70,8 @@ sub _build_query { my $self = shift; my $query = { query => $self->query ? $self->query : { match_all => {} } }; - $query->{filter} = $self->filter if ( $self->filter ); - $query = { query => { filtered => $query } } - if ( $self->filter && !$self->query ); + $query = { query => { filtered => { filter => $self->filter, query => $query->{query} } } } + if $self->filter; my $q = { %$query, $self->size ? ( size => $self->size ) : (), @@ -137,7 +138,7 @@ sub get { type => $type, id => $id, $self->fields ? ( fields => $self->fields ) : (), - ignore_missing => 1, + ignore => [404], %{ $qs || {} }, ); return undef unless ($res); @@ -148,11 +149,12 @@ sub all { my ( $self, $qs ) = @_; $qs = $self->_build_qs($qs); my ( $index, $type ) = ( $self->index->name, $self->type->short_name ); - my $res = $self->es->transport->request( - { method => 'POST', - cmd => "/$index/$type/_search", - data => $self->_build_query, - qs => { version => 1, %{ $qs || {} } }, + my $res = $self->es->search( + { index => $index, + type => $type, + body => $self->_build_query, + version => 1, + %{ $qs || {} }, } ); return $res unless ( $self->inflate ); @@ -173,14 +175,14 @@ sub count { my ( $self, $qs ) = @_; $qs = $self->_build_qs($qs); my ( $index, $type ) = ( $self->index->name, $self->type->short_name ); - my $res = $self->es->transport->request( - { method => 'POST', - cmd => "/$index/$type/_search", - data => { %{ $self->_build_query }, size => 0 }, - qs => $qs, + my $res = $self->es->count( + { index => $index, + type => $type, + body => { %{ $self->_build_query } }, + %$qs, } ); - return $res->{hits}->{total}; + return $res->{count}; } sub delete { @@ -190,7 +192,7 @@ sub delete { return $self->es->delete_by_query( index => $self->index->name, type => $self->type->short_name, - query => $query->{filter} ? { filtered => $query } : $query->{query}, + body => $query, %$qs, ); } @@ -279,7 +281,7 @@ build a C query, which performs far better. =head2 sort -=head2 query_type +=head2 search_type These attributes are passed directly to the ElasticSearch search request. diff --git a/lib/ElasticSearchX/Model/Document/Trait/Attribute.pm b/lib/ElasticSearchX/Model/Document/Trait/Attribute.pm index 1aa68b6..3b4979c 100644 --- a/lib/ElasticSearchX/Model/Document/Trait/Attribute.pm +++ b/lib/ElasticSearchX/Model/Document/Trait/Attribute.pm @@ -27,6 +27,14 @@ has property => ( is => 'ro', isa => 'Bool', default => 1 ); has query_property => ( is => 'ro', isa => 'Bool', default => 0 ); has field_name => ( is => 'ro', isa => 'Str', lazy => 1, default => sub { shift->name } ); +has isa_arrayref => ( is => 'ro', isa => 'Bool', builder => '_build_isa_arrayref' ); + +sub _build_isa_arrayref { + my $self = shift; + my $tc = $self->type_constraint; + return 0 unless $tc; + return $tc->is_a_type_of("ArrayRef"); +} sub build_property { my $self = shift; @@ -42,7 +50,6 @@ before _process_options => sub { %$options = ( builder => 'build_id', lazy => 1, %$options ) if ( $options->{id} && ref $options->{id} eq 'ARRAY' ); - #$options->{required} = 1 if($options->{id}); $options->{traits} ||= []; push( @{ $options->{traits} }, diff --git a/lib/ElasticSearchX/Model/Document/Trait/Class.pm b/lib/ElasticSearchX/Model/Document/Trait/Class.pm index 53ede38..fb6c2db 100644 --- a/lib/ElasticSearchX/Model/Document/Trait/Class.pm +++ b/lib/ElasticSearchX/Model/Document/Trait/Class.pm @@ -12,6 +12,8 @@ has set_class => ( is => 'ro', builder => '_build_set_class', lazy => 1 ); has short_name => ( is => 'ro', builder => '_build_short_name', lazy => 1 ); has _all_properties => ( is => 'ro', lazy => 1, builder => '_build_all_properties' ); +has _isa_arrayref => + ( is => 'ro', lazy => 1, builder => '_build_isa_arrayref' ); has _field_alias => ( is => 'ro', @@ -142,6 +144,13 @@ sub _build_all_properties { ]; } +sub _build_isa_arrayref { + return { + map { $_->name => $_->isa_arrayref } + @{shift->_all_properties} + }; +} + sub get_data { my ( $self, $instance ) = @_; return { @@ -178,7 +187,13 @@ sub inflate_result { #my $id = $self->get_id_attribute; my $parent = $self->get_parent_attribute; + my $arrays = $self->_isa_arrayref; my $fields = { %{ $res->{_source} || {} }, %{ $res->{fields} || {} } }; + $fields = { map { + my $is_array = ref $fields->{$_} eq "ARRAY"; + $arrays->{$_} && !$is_array ? ( $_ => [$fields->{$_}] ) : + !$arrays->{$_} && $is_array ? ( $_ => $fields->{$_}->[0] ) : ( $_ => $fields->{$_} ); + } keys %$fields }; my $map = $self->_reverse_field_alias; map { $fields->{ $map->{$_} } diff --git a/lib/ElasticSearchX/Model/Document/Types.pm b/lib/ElasticSearchX/Model/Document/Types.pm index ee69549..b2096ea 100644 --- a/lib/ElasticSearchX/Model/Document/Types.pm +++ b/lib/ElasticSearchX/Model/Document/Types.pm @@ -2,7 +2,6 @@ package ElasticSearchX::Model::Document::Types; use List::MoreUtils (); use DateTime::Format::Epoch::Unix; use DateTime::Format::ISO8601; -use ElasticSearch; use MooseX::Attribute::Deflator; use MooseX::Attribute::Deflator::Moose; use DateTime; diff --git a/lib/ElasticSearchX/Model/Index.pm b/lib/ElasticSearchX/Model/Index.pm index 66efb20..c20893d 100644 --- a/lib/ElasticSearchX/Model/Index.pm +++ b/lib/ElasticSearchX/Model/Index.pm @@ -104,12 +104,12 @@ sub deployment_statement { sub refresh { my $self = shift; - $self->es->refresh_index( index => $self->name ); + $self->es->indices->refresh( index => $self->name ); } sub delete { my $self = shift; - $self->es->delete_index( index => $self->name ); + $self->es->indices->delete( index => $self->name ); } __PACKAGE__->meta->make_immutable( inline_constructor => 0 ); diff --git a/lib/ElasticSearchX/Model/Role.pm b/lib/ElasticSearchX/Model/Role.pm index 60ec570..e3f432c 100644 --- a/lib/ElasticSearchX/Model/Role.pm +++ b/lib/ElasticSearchX/Model/Role.pm @@ -1,6 +1,6 @@ package ElasticSearchX::Model::Role; use Moose::Role; -use ElasticSearch; +use Search::Elasticsearch; use ElasticSearchX::Model::Index; use version; use ElasticSearchX::Model::Document::Types qw(ES); @@ -8,10 +8,9 @@ use ElasticSearchX::Model::Document::Types qw(ES); has es => ( is => 'rw', lazy_build => 1, coerce => 1, isa => ES ); sub _build_es { - ElasticSearch->new( - servers => '127.0.0.1:9200', - transport => 'http', - timeout => 30, + Search::Elasticsearch->new( + nodes => '127.0.0.1:9200', + cxn => 'HTTPTiny', ); } @@ -24,30 +23,30 @@ sub deploy { $name = $index->alias_for if ( $index->alias_for ); local $@; eval { - $t->request( { method => 'DELETE', cmd => "/$name" } ) + $self->es->indices->delete( index => $name ) } if ( $params{delete} ); my $dep = $index->deployment_statement; my $mapping = delete $dep->{mappings}; eval { - $t->request( + $t->perform_request( { method => 'PUT', - cmd => "/$name", - data => $dep, + path => "/$name", + body => $dep, } ); }; sleep(1); while ( my ( $k, $v ) = each %$mapping ) { - $t->request( + $t->perform_request( { method => 'PUT', - cmd => "/$name/$k/_mapping", - data => { $k => $v }, + path => "/$name/$k/_mapping", + body => { $k => $v }, } ); } if ( my $alias = $index->alias_for ) { my @aliases - = keys %{ $self->es->get_aliases( index => $index->name, ignore_missing => 1 ) + = keys %{ $self->es->indices->get_aliases( index => $index->name, ignore => [404] ) || {} }; my $actions = [ ( map { @@ -56,7 +55,7 @@ sub deploy { ), { add => { index => $alias, alias => $index->name } } ]; - $self->es->aliases( actions => $actions ); + $self->es->indices->update_aliases( body => { actions => $actions } ); } } return 1; @@ -69,7 +68,7 @@ sub bulk { sub es_version { my $self = shift; - my $string = $self->es->current_server_version->{number}; + my $string = $self->es->info->{version}->{number}; $string =~ s/RC//g; return version->parse( $string )->numify; } diff --git a/lib/ElasticSearchX/Model/Scroll.pm b/lib/ElasticSearchX/Model/Scroll.pm index 887a7c4..f51db3b 100644 --- a/lib/ElasticSearchX/Model/Scroll.pm +++ b/lib/ElasticSearchX/Model/Scroll.pm @@ -1,6 +1,6 @@ package ElasticSearchX::Model::Scroll; use Moose; -use ElasticSearch::ScrolledSearch; +use Search::Elasticsearch::Scroll; has scroll => ( is => 'ro', isa => 'Str', required => 1, default => '1m' ); @@ -13,13 +13,12 @@ has set => ( has _scrolled_search => ( is => 'ro', - isa => 'ElasticSearch::ScrolledSearch', + isa => 'Search::Elasticsearch::Scroll', lazy_build => 1, handles => { _next => 'next', total => 'total', - max_score => 'max_score', - eof => 'eof' + max_score => 'max_score' } ); @@ -30,9 +29,9 @@ has qs => ( sub _build__scrolled_search { my $self = shift; - ElasticSearch::ScrolledSearch->new( - $self->set->es, - { %{ $self->set->_build_query }, + Search::Elasticsearch::Scroll->new( + { es => $self->set->es, + body => $self->set->_build_query, scroll => $self->scroll, index => $self->index->name, type => $self->type->short_name, @@ -84,7 +83,7 @@ from. =head2 max_score -Delegates to L. +Delegates to L. =head2 next diff --git a/t/bulk.t b/t/bulk.t index 0738a65..f0deb1d 100644 --- a/t/bulk.t +++ b/t/bulk.t @@ -20,7 +20,7 @@ use Test::MockObject::Extends; use strict; use warnings; -my $es = Test::MockObject::Extends->new( ElasticSearch->new ); +my $es = Test::MockObject::Extends->new( Search::Elasticsearch->new ); my $i = 0; $es->mock( bulk => sub { $i++ } ); @@ -29,7 +29,7 @@ ok( my $model = MyModel->new( es => $es ), 'Created object' ); my $stash; { ok( my $bulk = $model->bulk, 'bulk object' ); - $stash = $bulk->stash; + $stash = $bulk->stash->_buffer; $bulk->put( $model->index('default')->type('tweet') ->new_document( { text => 'foo' } ) ); is( $bulk->stash_size, 1, 'stash size is 1' ); diff --git a/t/es/delete.t b/t/es/delete.t index b10491a..5d5d36f 100644 --- a/t/es/delete.t +++ b/t/es/delete.t @@ -7,6 +7,7 @@ use DateTime; my $model = MyModel->testing; my $twitter = $model->index('twitter')->type('user'); +$twitter->delete; ok( $twitter->put( { nickname => $_, name => 'mo', diff --git a/t/es/fields.t b/t/es/fields.t index e0613e7..7452f2a 100644 --- a/t/es/fields.t +++ b/t/es/fields.t @@ -15,7 +15,7 @@ ok( $twitter->refresh->put( 'Put mo ok' ); -ok( my $user = $twitter->query_type('scan')->fields( ['name'] )->first, 'get name field' ); +ok( my $user = $twitter->query_type('query_then_fetch')->fields( ['name'] )->first, 'get name field' ); is($user->name, 'Moritz Onken', 'got field ok'); is($user->nickname, 'mo', 'id field ok'); diff --git a/t/es/version.t b/t/es/version.t index ed2c81f..7857bc9 100644 --- a/t/es/version.t +++ b/t/es/version.t @@ -41,6 +41,6 @@ ok( $bulk->create($version1), 'bulk create already indexed doc' ); my $return = $bulk->commit; -is( @{ $return->{errors} }, 1, 'error' ); +is( $return->{errors}, 1, 'error' ); done_testing; diff --git a/t/lib/MyModel.pm b/t/lib/MyModel.pm index faa6557..24ec998 100644 --- a/t/lib/MyModel.pm +++ b/t/lib/MyModel.pm @@ -3,6 +3,7 @@ use Moose; use Test::More; use IO::Socket::INET; use ElasticSearchX::Model; +use Search::Elasticsearch; use version; index twitter => ( namespace => 'MyModel' ); @@ -14,12 +15,14 @@ sub testing { 'Requires an ElasticSearch server running on port 9900'; } - my $model = $class->new( es => ':9900' ); + my $model = $class->new( es => Search::Elasticsearch->new( + nodes => "127.0.0.1:9900", + # trace_to => "Stderr", + ) ); if ( $model->es_version < 0.019002 ) { plan skip_all => 'Requires ElasticSearch 0.19.2'; } - # $model->es->trace_calls(1); ok( $model->deploy( delete => 1 ), 'Deploy ok' ); return $model; } diff --git a/t/model/es_version.t b/t/model/es_version.t index 0ed27bc..012bd89 100644 --- a/t/model/es_version.t +++ b/t/model/es_version.t @@ -5,7 +5,7 @@ use Test::MockObject::Extends; use lib qw(t/lib); use MyModel; -my $es = Test::MockObject::Extends->new( ElasticSearch->new ); +my $es = Test::MockObject::Extends->new( Search::Elasticsearch->new ); my $model = MyModel->new( es => $es ); @@ -13,10 +13,11 @@ my $tests = { "0.20.0.RC1" => 0.020000001, "0.19.11" => 0.019011, "0.18.1" => 0.018001, + "1.1.1" => 1.001001, }; while(my ($string, $number) = each %$tests) { - $es->mock( current_server_version => sub { { number => $string } } ); + $es->mock( info => sub { { version => { number => $string } } } ); is($model->es_version, $number, "parse $string as $number"); } diff --git a/t/model/model.t b/t/model/model.t index 641c0df..329716c 100644 --- a/t/model/model.t +++ b/t/model/model.t @@ -64,8 +64,6 @@ is_deeply( ok( $idx->does('MyIndexTrait'), 'Trait has been applied' ); -isa_ok( $idx->model->es, 'ElasticSearch' ); - isa_ok( $idx->type('user'), 'ElasticSearchX::Model::Document::Set' ); is_deeply( $idx->type('user')->index, $idx, 'MyModel::IRC::User' ); diff --git a/t/tutorial.t b/t/tutorial.t index fec3a0f..88cb985 100644 --- a/t/tutorial.t +++ b/t/tutorial.t @@ -58,7 +58,7 @@ my $raw = { $tweets = $tweets->raw; is_deeply( $tweets->get( $tweet->id ), - { %$raw, exists => JSON::true }, + { %$raw, found => JSON::true }, 'Raw response' ); @@ -69,7 +69,7 @@ is_deeply( ); is( $twitter->type('tweet')->filter( { term => { user => 'mo' } } ) - ->query( { field => { 'message.analyzed' => 'baby' } } )->size(100) + ->query( { query_string => { default_field => 'message.analyzed', query => 'baby' } } )->size(100) ->all, 1, 'get all tweets that match "hello"'