Skip to content

Commit

Permalink
use Search::Elasticsearch, tested with 1.1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
monken committed Apr 26, 2014
1 parent e11c3a5 commit d1d189a
Show file tree
Hide file tree
Showing 18 changed files with 104 additions and 79 deletions.
2 changes: 1 addition & 1 deletion dist.ini
Expand Up @@ -22,7 +22,7 @@ DateTime = 0
DateTime::Format::Epoch::Unix = 0
DateTime::Format::ISO8601 = 0
Digest::SHA1 = 0
ElasticSearch = 0.65
Search::Elasticsearch = 1.11
JSON = 0
List::MoreUtils = 0
List::Util = 0
Expand Down
36 changes: 18 additions & 18 deletions lib/ElasticSearchX/Model/Bulk.pm
@@ -1,33 +1,42 @@
package ElasticSearchX::Model::Bulk;
use Search::Elasticsearch::Bulk;
use Moose;

has stash => (
is => 'ro',
traits => ['Array'],
handles => { add => 'push', stash_size => 'count' },
default => sub { [] },
isa => "Search::Elasticsearch::Bulk",
handles => { stash_size => '_buffer_count', commit => "flush" },
lazy_build => 1,
);
has size => ( is => 'ro', isa => 'Int', default => 100 );
has es => ( is => 'ro' );

sub _build_stash {
my $self = shift;
$self->es->bulk_helper( max_count => $self->size );
}

sub add {
my ($self, $action, $payload) = (shift, %{$_[0]});
$payload->{source} = delete $payload->{body};
$self->stash->add_action($action => $payload);
}

sub update {
my ( $self, $doc, $qs ) = @_;
$self->add( { index => ref $doc eq 'HASH' ? $doc : { $doc->_put( $doc->_update($qs) ) } } );
$self->commit if ( $self->stash_size > $self->size );
return $self;
}

sub create {
my ( $self, $doc, $qs ) = @_;
$self->add( { create => ref $doc eq 'HASH' ? $doc : { $doc->_put($qs) } } );
$self->commit if ( $self->stash_size > $self->size );
return $self;
}

sub put {
my ( $self, $doc, $qs ) = @_;
$self->add( { index => ref $doc eq 'HASH' ? $doc : { $doc->_put, %{ $qs || {} } } } );
$self->commit if ( $self->stash_size > $self->size );
return $self;
}

Expand All @@ -42,21 +51,12 @@ sub delete {
}
}
);
$self->commit if ( $self->stash_size > $self->size );
return $self;
}

sub commit {
my $self = shift;
return unless ( $self->stash_size );
my $result = $self->es->bulk( $self->stash );
$self->clear;
return $result;
}

sub clear {
my $self = shift;
@{ $self->stash } = ();
$self->stash->clear_buffer;
return $self;
}

Expand All @@ -80,7 +80,7 @@ __END__
=head1 DESCRIPTION
This class is a wrapper around L<ElasticSearch/bulk()> which adds
This class is a wrapper around L<Search::Elasticsearch::Bulk> which adds
some convenience. By specifiying a L</size> you set the maximum
number of documents that are processed in one request. You can either
L</put> or L</delete> documents. Once the C<$bulk> object is out
Expand All @@ -104,7 +104,7 @@ gets out of scope or if you call L</commit> explicitly.
=head2 es
The L<ElasticSearch> object.
The L<Search::Elasticsearch> object.
=head1 METHODS
Expand Down
5 changes: 3 additions & 2 deletions lib/ElasticSearchX/Model/Document/Role.pm
Expand Up @@ -78,7 +78,8 @@ sub _create {

sub put {
my ( $self, $qs ) = @_;
my $return = $self->index->model->es->index( $self->_put($qs) );
my $method = $qs && ref $qs eq "HASH" && (delete $qs->{create}) ? "create" : "index";
my $return = $self->index->model->es->$method( $self->_put($qs) );
$self->_clear_loaded_attributes;
my $id = $self->meta->get_id_attribute;
$id->set_value( $self, $return->{_id} ) if ($id);
Expand All @@ -98,7 +99,7 @@ sub _put {
index => $self->index->name,
type => $self->meta->short_name,
$id ? ( id => $id ) : (),
data => $data,
body => $data,
$parent ? ( parent => $parent->get_value($self) ) : (),
%$qs,
);
Expand Down
40 changes: 21 additions & 19 deletions lib/ElasticSearchX/Model/Document/Set.pm
Expand Up @@ -35,9 +35,11 @@ sub add_sort { push( @{ $_[0]->sort }, $_[1] ); return $_[0]; }

sub add_field { push( @{ $_[0]->fields }, $_[1] ); return $_[0]; }

has query_type =>
has search_type =>
( isa => QueryType, is => 'rw', traits => [qw(ChainedClone)] );

sub query_type { shift->search_type(@_) }

has mixin => ( is => 'ro', isa => 'HashRef', traits => [qw(ChainedClone)] );

has inflate =>
Expand All @@ -60,17 +62,16 @@ sub _build_qs {

# we only want to set qs if they are not the default
$qs->{refresh} = 1 if ( $self->_refresh );
$qs->{query_type} = $self->query_type if ( $self->query_type );
$qs->{search_type} = $self->search_type if $self->search_type;
return $qs;
}

sub _build_query {
my $self = shift;
my $query
= { query => $self->query ? $self->query : { match_all => {} } };
$query->{filter} = $self->filter if ( $self->filter );
$query = { query => { filtered => $query } }
if ( $self->filter && !$self->query );
$query = { query => { filtered => { filter => $self->filter, query => $query->{query} } } }
if $self->filter;
my $q = {
%$query,
$self->size ? ( size => $self->size ) : (),
Expand Down Expand Up @@ -137,7 +138,7 @@ sub get {
type => $type,
id => $id,
$self->fields ? ( fields => $self->fields ) : (),
ignore_missing => 1,
ignore => [404],
%{ $qs || {} },
);
return undef unless ($res);
Expand All @@ -148,11 +149,12 @@ sub all {
my ( $self, $qs ) = @_;
$qs = $self->_build_qs($qs);
my ( $index, $type ) = ( $self->index->name, $self->type->short_name );
my $res = $self->es->transport->request(
{ method => 'POST',
cmd => "/$index/$type/_search",
data => $self->_build_query,
qs => { version => 1, %{ $qs || {} } },
my $res = $self->es->search(
{ index => $index,
type => $type,
body => $self->_build_query,
version => 1,
%{ $qs || {} },
}
);
return $res unless ( $self->inflate );
Expand All @@ -173,14 +175,14 @@ sub count {
my ( $self, $qs ) = @_;
$qs = $self->_build_qs($qs);
my ( $index, $type ) = ( $self->index->name, $self->type->short_name );
my $res = $self->es->transport->request(
{ method => 'POST',
cmd => "/$index/$type/_search",
data => { %{ $self->_build_query }, size => 0 },
qs => $qs,
my $res = $self->es->count(
{ index => $index,
type => $type,
body => { %{ $self->_build_query } },
%$qs,
}
);
return $res->{hits}->{total};
return $res->{count};
}

sub delete {
Expand All @@ -190,7 +192,7 @@ sub delete {
return $self->es->delete_by_query(
index => $self->index->name,
type => $self->type->short_name,
query => $query->{filter} ? { filtered => $query } : $query->{query},
body => $query,
%$qs,
);
}
Expand Down Expand Up @@ -279,7 +281,7 @@ build a C<filtered> query, which performs far better.
=head2 sort
=head2 query_type
=head2 search_type
These attributes are passed directly to the ElasticSearch search request.
Expand Down
9 changes: 8 additions & 1 deletion lib/ElasticSearchX/Model/Document/Trait/Attribute.pm
Expand Up @@ -27,6 +27,14 @@ has property => ( is => 'ro', isa => 'Bool', default => 1 );
has query_property => ( is => 'ro', isa => 'Bool', default => 0 );
has field_name =>
( is => 'ro', isa => 'Str', lazy => 1, default => sub { shift->name } );
has isa_arrayref => ( is => 'ro', isa => 'Bool', builder => '_build_isa_arrayref' );

sub _build_isa_arrayref {
my $self = shift;
my $tc = $self->type_constraint;
return 0 unless $tc;
return $tc->is_a_type_of("ArrayRef");
}

sub build_property {
my $self = shift;
Expand All @@ -42,7 +50,6 @@ before _process_options => sub {
%$options = ( builder => 'build_id', lazy => 1, %$options )
if ( $options->{id} && ref $options->{id} eq 'ARRAY' );

#$options->{required} = 1 if($options->{id});
$options->{traits} ||= [];
push(
@{ $options->{traits} },
Expand Down
15 changes: 15 additions & 0 deletions lib/ElasticSearchX/Model/Document/Trait/Class.pm
Expand Up @@ -12,6 +12,8 @@ has set_class => ( is => 'ro', builder => '_build_set_class', lazy => 1 );
has short_name => ( is => 'ro', builder => '_build_short_name', lazy => 1 );
has _all_properties =>
( is => 'ro', lazy => 1, builder => '_build_all_properties' );
has _isa_arrayref =>
( is => 'ro', lazy => 1, builder => '_build_isa_arrayref' );

has _field_alias => (
is => 'ro',
Expand Down Expand Up @@ -142,6 +144,13 @@ sub _build_all_properties {
];
}

sub _build_isa_arrayref {
return {
map { $_->name => $_->isa_arrayref }
@{shift->_all_properties}
};
}

sub get_data {
my ( $self, $instance ) = @_;
return {
Expand Down Expand Up @@ -178,7 +187,13 @@ sub inflate_result {

#my $id = $self->get_id_attribute;
my $parent = $self->get_parent_attribute;
my $arrays = $self->_isa_arrayref;
my $fields = { %{ $res->{_source} || {} }, %{ $res->{fields} || {} } };
$fields = { map {
my $is_array = ref $fields->{$_} eq "ARRAY";
$arrays->{$_} && !$is_array ? ( $_ => [$fields->{$_}] ) :
!$arrays->{$_} && $is_array ? ( $_ => $fields->{$_}->[0] ) : ( $_ => $fields->{$_} );
} keys %$fields };
my $map = $self->_reverse_field_alias;
map {
$fields->{ $map->{$_} }
Expand Down
1 change: 0 additions & 1 deletion lib/ElasticSearchX/Model/Document/Types.pm
Expand Up @@ -2,7 +2,6 @@ package ElasticSearchX::Model::Document::Types;
use List::MoreUtils ();
use DateTime::Format::Epoch::Unix;
use DateTime::Format::ISO8601;
use ElasticSearch;
use MooseX::Attribute::Deflator;
use MooseX::Attribute::Deflator::Moose;
use DateTime;
Expand Down
4 changes: 2 additions & 2 deletions lib/ElasticSearchX/Model/Index.pm
Expand Up @@ -104,12 +104,12 @@ sub deployment_statement {

sub refresh {
my $self = shift;
$self->es->refresh_index( index => $self->name );
$self->es->indices->refresh( index => $self->name );
}

sub delete {
my $self = shift;
$self->es->delete_index( index => $self->name );
$self->es->indices->delete( index => $self->name );
}

__PACKAGE__->meta->make_immutable( inline_constructor => 0 );
Expand Down
29 changes: 14 additions & 15 deletions lib/ElasticSearchX/Model/Role.pm
@@ -1,17 +1,16 @@
package ElasticSearchX::Model::Role;
use Moose::Role;
use ElasticSearch;
use Search::Elasticsearch;
use ElasticSearchX::Model::Index;
use version;
use ElasticSearchX::Model::Document::Types qw(ES);

has es => ( is => 'rw', lazy_build => 1, coerce => 1, isa => ES );

sub _build_es {
ElasticSearch->new(
servers => '127.0.0.1:9200',
transport => 'http',
timeout => 30,
Search::Elasticsearch->new(
nodes => '127.0.0.1:9200',
cxn => 'HTTPTiny',
);
}

Expand All @@ -24,30 +23,30 @@ sub deploy {
$name = $index->alias_for if ( $index->alias_for );
local $@;
eval {
$t->request( { method => 'DELETE', cmd => "/$name" } )
$self->es->indices->delete( index => $name )
} if ( $params{delete} );
my $dep = $index->deployment_statement;
my $mapping = delete $dep->{mappings};
eval {
$t->request(
$t->perform_request(
{ method => 'PUT',
cmd => "/$name",
data => $dep,
path => "/$name",
body => $dep,
}
);
};
sleep(1);
while ( my ( $k, $v ) = each %$mapping ) {
$t->request(
$t->perform_request(
{ method => 'PUT',
cmd => "/$name/$k/_mapping",
data => { $k => $v },
path => "/$name/$k/_mapping",
body => { $k => $v },
}
);
}
if ( my $alias = $index->alias_for ) {
my @aliases
= keys %{ $self->es->get_aliases( index => $index->name, ignore_missing => 1 )
= keys %{ $self->es->indices->get_aliases( index => $index->name, ignore => [404] )
|| {} };
my $actions = [
( map {
Expand All @@ -56,7 +55,7 @@ sub deploy {
),
{ add => { index => $alias, alias => $index->name } }
];
$self->es->aliases( actions => $actions );
$self->es->indices->update_aliases( body => { actions => $actions } );
}
}
return 1;
Expand All @@ -69,7 +68,7 @@ sub bulk {

sub es_version {
my $self = shift;
my $string = $self->es->current_server_version->{number};
my $string = $self->es->info->{version}->{number};
$string =~ s/RC//g;
return version->parse( $string )->numify;
}
Expand Down

0 comments on commit d1d189a

Please sign in to comment.