Skip to content
Browse files

merge with http://github.com/franckcuny/anyevent-riak/commit/61c5d22e…

  • Loading branch information...
2 parents 15032c3 + 61c5d22 commit da042b03699299f7bbf45fca4e4d1472e8b589ed @trung committed Apr 12, 2010
Showing with 369 additions and 202 deletions.
  1. +10 −0 .gitignore
  2. +6 −0 .shipit
  3. +2 −0 Changes
  4. +2 −1 Makefile.PL
  5. +276 −172 lib/AnyEvent/Riak.pm
  6. +73 −29 t/basic.t
View
10 .gitignore
@@ -0,0 +1,10 @@
+cover_db
+META.yml
+Makefile
+blib
+inc
+pm_to_blib
+MANIFEST
+Makefile.old
+MANIFEST.SKIP
+.prove
View
6 .shipit
@@ -0,0 +1,6 @@
+steps = FindVersion, ChangeVersion, CheckChangeLog, DistTest, Commit, Tag, MakeDist
+git.tagpattern = release-%v
+git.push_to = release
+
+CheckChangeLog.files = Changes
+MakeDist.destination = ~/code/distribs/
View
2 Changes
@@ -2,3 +2,5 @@ Revision history for Perl extension AnyEvent::Riak
0.01 Thu Dec 10 14:22:38 2009
- original version
+0.02
+ - Fix to adapt with Riak REST api 0.9.1
View
3 Makefile.PL
@@ -3,7 +3,7 @@ name 'AnyEvent-Riak';
all_from 'lib/AnyEvent/Riak.pm';
requires 'URI';
-requires 'JSON:XS';
+requires 'JSON::XS';
requires 'AnyEvent';
requires 'AnyEvent::HTTP';
requires 'Data::UUID';
@@ -15,4 +15,5 @@ build_requires 'Test::More';
build_requires 'Test::Exception';
use_test_base;
auto_include;
+# auto_set_repository;
WriteAll;
View
448 lib/AnyEvent/Riak.pm
@@ -1,177 +1,260 @@
package AnyEvent::Riak;
use strict;
+use warnings;
use Carp;
use URI;
-use JSON::XS;
+use JSON;
use AnyEvent;
use AnyEvent::HTTP;
use Data::UUID;
+use MIME::Base64;
+use YAML::Syck;
our $VERSION = '0.02';
sub new {
my ( $class, %args ) = @_;
- my $uuid = Data::UUID->new;
+ my $host = delete $args{host} || 'http://127.0.0.1:8098';
+ my $path = delete $args{path} || 'riak';
+ my $mapred_path = delete $args{mapred_path} || 'mapred';
+ my $r = delete $args{r} || 2;
+ my $d = delete $args{w} || 2;
+ my $dw = delete $args{dw} || 2;
+ my $uuid = Data::UUID->new;
- my $host = delete $args{host} || 'http://127.0.0.1:8098';
- my $path = delete $args{path} || 'riak';
- my $clientId = delete $args{clientId} || $uuid->create_str;
+ my $client_id
+ = "perl_anyevent_riak_" . $uuid->create_str;
bless {
- host => $host,
- path => $path,
- clientId => $clientId,
+ host => $host,
+ path => $path,
+ mapred_path => $mapred_path,
+ client_id => $client_id,
+ r => $r,
+ d => $d,
+ dw => $dw,
%args,
}, $class;
}
-sub set_bucket {
- my ( $self, $bucket, $schema ) = @_;
+sub _build_uri {
+ my ( $self, $path, $options ) = @_;
+ my $uri = URI->new( $self->{host} );
+ $uri->path( join( "/", @$path ) );
+ $uri->query_form( $self->_build_query($options) );
+ return $uri->as_string;
+}
- $self->_request(
- 'PUT', $self->_build_uri( [$bucket] ),
- '204', encode_json { props => $schema }
- );
+sub _build_headers {
+ my ( $self, $options) = @_;
+ my $headers = {
+ 'X-Riak-ClientId' => $self->{client_id},
+ 'Content-Type' => 'application/json',
+ };
+ # TODO add headers
+ return $headers;
}
-sub list_bucket {
- my ( $self, $bucket ) = @_;
- return $self->_request( 'GET', $self->_build_uri( [$bucket] ), '200' );
+sub _build_query {
+ my ($self, $options) = @_;
+ my $valid_options = [qw/props keys returnbody/];
+ my $query;
+ foreach (@$valid_options) {
+ $query->{$_} = $options->{$_} if exists $options->{$_}
+ }
+ $query;
}
-sub fetch {
- my ( $self, $bucket, $key, $r ) = @_;
- $r = $self->{r} || 2 if !$r;
- return $self->_request( 'GET',
- $self->_build_uri( [ $bucket, $key ], { r => $r } ), '200,300,304' );
+sub default_cb {
+ my ( $self, $options ) = @_;
+ return sub {
+ my $res = shift;
+ return $res;
+ };
}
-sub store {
- my ( $self, $object, $w, $dw, ) = @_;
-
- $w = $self->{w} || 2 if !$w;
- $dw = $self->{dw} || 2 if !$dw;
-
- my $bucket = $object->{bucket};
- my $key = $object->{key};
- $object->{links} = [] if !exists $object->{links};
-
- # Normal status codes: 200 OK, 204 No Content, 300 Multiple Choices.
- # FIXME Links must be set in the Links header
- return $self->_request(
- 'PUT',
- $self->_build_uri(
- [ $bucket, $key ],
- {
- w => $w,
- dw => $dw,
- returnbody => 'true'
+sub is_alive {
+ my ( $self, %options ) = @_;
+ my ( $cv, $cb );
+
+ $cv = AE::cv;
+ if ( $options{callback} ) {
+ $cb = delete $options{callback};
+ }
+ else {
+ $cb = $self->default_cb();
+ }
+
+ http_request(
+ GET => $self->_build_uri( [qw/ping/] ),
+ headers => $self->_build_headers(%options),
+ sub {
+ my ( $body, $headers ) = @_;
+ if ( $headers->{Status} == 200 ) {
+ $cv->send( $cb->(1) );
}
- ),
- '200,204,300',
- encode_json $object);
+ else {
+ $cv->send( $cb->(0) );
+ }
+ },
+ );
+ return $cv;
}
-sub delete {
- my ( $self, $bucket, $key, $rw ) = @_;
+sub list_bucket {
+ my ( $self, $bucket_name, %options ) = @_;
+ my ( $cv, $cb );
- $rw = $self->{rw} || 2 if !$rw;
- return $self->_request( 'DELETE',
- $self->_build_uri( [ $bucket, $key ], { dw => $rw } ), 204 );
-}
+ $cv = AE::cv;
+ if ( $options{callback} ) {
+ $cb = delete $options{callback};
+ }
+ else {
+ $cb = $self->default_cb();
+ }
-# FIXME doesn't work. Must handle multipart/fixed returned content
-sub walk {
- my ( $self, $bucket, $key, $spec ) = @_;
- my $path = $self->_build_uri( [ $bucket, $key ] );
- $path .= $self->_build_spec($spec);
- return $self->_request( 'GET', $path, 200 );
+ http_request(
+ GET => $self->_build_uri(
+ [ $self->{path}, $bucket_name ],
+ $options{parameters}
+ ),
+ headers => $self->_build_headers( $options{parameters} ),
+ sub {
+ my ( $body, $headers ) = @_;
+ if ( $body && $headers->{Status} == 200 ) {
+ my $res = JSON::decode_json($body);
+ $cv->send( $cb->($res) );
+ }
+ else {
+ $cv->send(undef);
+ }
+ }
+ );
+ return $cv;
}
-sub get_clientId {
- my ($self) = @_;
- return $self->{clientId};
-}
+sub set_bucket {
+ my ( $self, $bucket, $schema, %options ) = @_;
+ my ( $cv, $cb );
-sub _build_spec {
- my ( $self, $spec ) = @_;
- my $acc = '/';
- foreach my $item (@$spec) {
- $acc
- .= ( $item->{bucket} || '_' ) . ','
- . ( $item->{tag} || '_' ) . ','
- . ( $item->{acc} || '_' ) . '/';
+ $cv = AE::cv;
+ if ( $options{callback} ) {
+ $cb = delete $options{callback};
+ }
+ else {
+ $cb = $self->default_cb();
}
- return $acc;
-}
-sub _build_uri {
- my ( $self, $path, $query ) = @_;
- my $uri = URI->new( $self->{host} );
- $uri->path( $self->{path} . "/" . join( "/", @$path ) );
- $uri->query_form(%$query) if $query;
- return $uri->as_string;
+ http_request(
+ PUT => $self->_build_uri(
+ [ $self->{path}, 'bucket' ],
+ $options{parameters}
+ ),
+ headers => $self->_build_headers( $options{parameters} ),
+ body => JSON::encode_json($schema),
+ sub {
+ my ( $body, $headers ) = @_;
+ if ( $headers->{Status} == 204 ) {
+ $cv->send( $cb->(1) );
+ }
+ else {
+ $cv->send( $cb->(0) );
+ }
+ }
+ );
+ $cv;
}
-sub _request {
- my ( $self, $method, $uri, $expected, $body ) = @_;
- my $cv = AnyEvent->condvar;
-
- my $cb = sub {
- my ( $body, $headers ) = @_;
- if ( $expected =~ m/$headers->{Status}/ ) {
- eval {
- if ($body) {
- return $cv->send( decode_json($body) );
- } else {
- return $cv->send(1);
- }
- };
- if ($@) {
- return $cv->croak(
- JSON::XS->new->pretty(1)->encode( { method => $method,
- uri => $uri,
- body => $body,
- status => $headers->{Status},
- reason => $headers->{Reason},
- error => $@ } ) );
- }
- }
- else {
- return $cv->croak(
- JSON::XS->new->pretty(1)->encode( { method => $method,
- uri => $uri,
- body => $body,
- status => $headers->{Status},
- reason => $headers->{Reason}}) );
+sub fetch {
+ my ( $self, $bucket, $key, %options ) = @_;
+ my ( $cv, $cb );
+
+ $cv = AE::cv;
+ if ( $options{callback} ) {
+ $cb = delete $options{callback};
+ }
+ else {
+ $cb = $self->default_cb();
+ }
+
+ http_request(
+ GET => $self->_build_uri(
+ [ $self->{path}, $bucket, $key ],
+ $options{parameters}
+ ),
+ headers => $self->_build_headers( $options{parameters} ),
+ sub {
+ my ($body, $headers) = @_;
+ if ($body && $headers->{Status} == 200) {
+ $cv->send( $cb->(JSON::decode_json($body)) );
+ }else{
+ $cv->send( $cb->(0) );
+ }
}
- };
- if ($body) {
- http_request(
- $method => $uri,
- headers => $self->_build_headers,
- body => $body,
- $cb
- );
+ );
+ $cv;
+}
+
+sub store {
+ my ( $self, $bucket, $key, $object, %options ) = @_;
+ my ( $cv, $cb );
+
+ $cv = AE::cv;
+ if ( $options{callback} ) {
+ $cb = delete $options{callback};
}
else {
- http_request(
- $method => $uri,
- headers => $self->_build_headers,
- $cb
- );
+ $cb = $self->default_cb();
}
+
+ my $json = JSON::encode_json($object);
+
+ http_request(
+ POST => $self->_build_uri(
+ [ $self->{path}, $bucket, $key ],
+ $options{parameters}
+ ),
+ headers => $self->_build_headers( $options{parameters} ),
+ body => $json,
+ sub {
+ my ($body, $headers) = @_;
+ my $result;
+ if ($headers->{Status} == 204) {
+ $result = $body ? JSON::decode_json($body) : 1;
+ }else{
+ $result = 0;
+ }
+ $cv->send( $cb->($result) );
+ }
+ );
$cv;
}
-sub _build_headers {
- my ($self) = @_;
- return {
- 'Content-Type' => 'application/json',
- 'X-Riak-ClientId' => $self->{clientId},
- };
+sub delete {
+ my ( $self, $bucket, $key, %options ) = @_;
+ my ( $cv, $cb );
+
+ $cv = AE::cv;
+ if ( $options{callback} ) {
+ $cb = delete $options{callback};
+ }
+ else {
+ $cb = $self->default_cb();
+ }
+
+ http_request(
+ DELETE => $self->_build_uri(
+ [ $self->{path}, $bucket, $key ],
+ $options{parameters}
+ ),
+ headers => $self->_build_headers( $options{parameters} ),
+ sub {
+ $cv->send( $cb->(@_) );
+ }
+ );
+ $cv;
}
1;
@@ -183,77 +266,98 @@ AnyEvent::Riak - Non-blocking Riak client
=head1 SYNOPSIS
- use AnyEvent::Riak;
+ use AnyEvent::Riak;
- my $riak = AnyEvent::Riak->new(
- host => 'http://127.0.0.1:8098',
- path => 'jiak',
- );
+ my $riak = AnyEvent::Riak->new(
+ host => 'http://127.0.0.1:8098',
+ path => 'riak',
+ );
+
+ die "Riak is not running" unless $riak->is_alive->recv;
+
+ my $bucket = $riak->set_bucket( 'foo',
+ parameters => { { props => { n_val => 2 } } } )->recv;
- my $buckets = $riak->list_bucket('bucketname')->recv;
- my $new_bucket = $riak->set_bucket('foo', {allowed_fields => '*'})->recv;
- my $store = $riak->store({bucket => 'foo', key => 'bar', object => {baz => 1},link => []})->recv;
- my $fetch = $riak->fetch('foo', 'bar')->recv;
- my $delete = $riak->delete('foo', 'bar')->recv;
+
+This version is not compatible with the previous version (0.01) of this module and with Riak < 0.91.
+
+For a complete description of the Riak REST API, please refer to
+L<https://wiki.basho.com/display/RIAK/REST+API>.
=head1 DESCRIPTION
-AnyEvent::Riak is a non-blocking riak client using anyevent.
+AnyEvent::Riak is a non-blocking riak client using C<AnyEvent>. This client allows you to connect to a Riak instance, create, modify and delete Riak objects.
=head2 METHODS
=over 4
-=item B<list_bucket>
+=item B<is_alive>([callback => sub { }, parameters => { }])
+
+Check if the Riak server is alive. If the ping is successful, 1 is returned,
+else 0.
-Get the schema and key list for 'bucket'
+ my $ping = $riak->is_alive->recv;
- $riak->list_bucket('bucketname')->recv;
+=item B<list_bucket>($bucketname, [callback => sub { }, parameters => { }])
-=item B<set_bucket>
+Get the schema and key list for 'bucket'. Possible parameters are:
-Set the schema for 'bucket'. The schema parameter must be a hash with at least
-an 'allowed_fields' field. Other valid fields are 'requried_fields',
-'read_mask', and 'write_mask'.
+=over 2
- $riak->new_bucket('bucketname', {allowed_fields => '*'})->recv;
+=item
-=item B<fetch>
+props=[true|false] - whether to return the bucket properties
-Get the object stored in 'bucket' at 'key'.
+=item
- $riak->fetch('bucketname', 'key')->recv;
+keys=[true|false|stream] - whether to return the keys stored in the bucket
-=item B<store>
+=back
+
+If the operation failed, C<undef> is returned, else an hash reference
+describing the bucket is returned.
+
+ my $bucket = $riak->list_bucket(
+ 'bucketname',
+ parameters => {
+ props => 'false',
+ },
+ callback => sub {
+ my $struct = shift;
+ if ( scalar @{ $struct->{keys} } ) {
+ # do something
+ }
+ }
+ );
+
+=item B<set_bucket>($bucketname, $bucketschema, [parameters => { }, callback => sub { }])
-Store 'object' in Riak. If the object has not defined its 'key' field, a key
-will be chosen for it by the server.
+Sets bucket properties like n_val and allow_mult.
- $riak->store({
- bucket => 'bucketname',
- key => 'key',
- object => { foo => "bar", baz => 2 },
- links => [],
- })->recv;
+=over 2
+
+=item
+
+n_val - the number of replicas for objects in this bucket
+
+=item
+
+allow_mult - whether to allow sibling objects to be created (concurrent updates)
+
+=back
-=item B<delete>
+If successful, B<1> is returned, else B<0>.
-Delete the data stored in 'bucket' at 'key'.
+ my $result = $riak->set_bucket('bucket')->recv;
- $riak->delete('bucketname', 'key')->recv;
+=item B<fetch>($bucketname, $object, [parameters => { }, callback => sub { }])
-=item B<walk>
+Reads an object from a bucket.
-Follow links from the object stored in 'bucket' at 'key' to other objects.
-The 'spec' parameter should be an array of hashes, each hash optinally
-defining 'bucket', 'tag', and 'acc' fields. If a field is not defined in a
-spec hash, the wildcard '_' will be used instead.
+=item B<store>($bucketname, $objectname, $objectdata, [parameters => { }, callback => sub { }]);
- ok $res = $jiak->walk(
- 'bucketname',
- 'key',
- [ { bucket => 'bucketname', key => '_' } ]
- )->recv;
+=item B<delete>($bucketname, $objectname, [parameters => { }, callback => sub { }]);
=back
@@ -265,7 +369,7 @@ franck cuny E<lt>franck@lumberjaph.netE<gt>
=head1 LICENSE
-Copyright 2009 by linkfluence.
+Copyright 2009, 2010 by linkfluence.
L<http://linkfluence.net>
View
102 t/basic.t
@@ -1,58 +1,102 @@
use strict;
use warnings;
+
use Test::More;
use JSON::XS;
use Test::Exception;
use AnyEvent::Riak;
+use YAML::Syck;
+
+#plan tests => 15;
+
+my ( $host, $path );
-my ($host, $path);
BEGIN {
my $riak_test = $ENV{RIAK_TEST_SERVER};
( $host, $path ) = split ";", $riak_test if $riak_test;
plan skip_all =>
- 'set $ENV{RIAK_TEST_SERVER} like this http://127.0.0.1:8098;jiak if you want to run the tests'
+ 'set $ENV{RIAK_TEST_SERVER} if you want to run the tests'
unless ( $host && $path );
}
-my $jiak = AnyEvent::Riak->new( host => $host, path => $path );
+ok my $riak = AnyEvent::Riak->new( host => $host, path => $path, w => 1,
+ dw => 1),
+ 'create riak object';
+
+# ping
+ok my $ping_one = $riak->is_alive(
+ callback => sub {
+ my $res = shift;
+ pass "is alive in cb" if $res;
+ }
+ ),
+ 'ping with callback';
-ok my $clientId = $jiak->get_clientId, "... get clientId";
+ok my $ping_two = $riak->is_alive()->recv, 'ping without callback';
-ok defined $clientId, "... clientId defined. Value is $clientId";
+ok my $s = $ping_one->recv, 'response from ping without callback';
+is $s, 1, 'valid response from ping';
-ok my $buckets = $jiak->list_bucket('bar')->recv, "... fetch bucket list";
-is scalar @{ $buckets->{keys} }, '0', '... no keys';
+# list bucket
+ok my $bucket_cb = $riak->list_bucket(
+ 'bar',
+ parameters => { props => 'true', keys => 'true' },
+ callback => sub {
+ my $res = shift;
+ ok $res->{props};
+ is scalar @{ $res->{keys} }, 0, '0 keys in cb';
+ }
+ ),
+ 'fetch bucket list';
+ok my $buckets = $riak->list_bucket('bar')->recv, "fetch bucket list, twice";
+is scalar @{ $buckets->{keys} }, '0', 'no keys';
+
+ok my $res_bucket = $bucket_cb->recv, 'get bucket';
+
+# set bucket
ok my $new_bucket
- = $jiak->set_bucket( 'foo', { allow_mult => 'false' } )->recv,
- '... set a new bucket';
+ = $riak->set_bucket( 'foo', { props => { n_val => 2 } } )->recv,
+ 'set a new bucket';
my $value = {
- bucket => 'foo',
- key => 'bar',
- object => { foo => "bar", baz => 1 },
- links => []
+ foo => 'bar',
};
-ok my $res = $jiak->store($value)->recv, '... set a new key';
+ok my $res = $riak->store('foo', 'bar', $value)->recv, 'set a new key';
-ok $res = $jiak->fetch( 'foo', 'bar' )->recv, '... fetch our new key';
-ok $res = $jiak->delete( 'foo', 'bar' )->recv, '... delete our key';
+ok $res = $riak->fetch( 'foo', 'bar' )->recv, 'fetch our new key';
+is_deeply $res, $value, 'value is ok';
+# ok $res = $riak->delete( 'foo', 'bar' )->recv, 'delete our key';
-dies_ok { $jiak->fetch( 'foo', 'foo' )->recv } '... dies when error';
-like $@, qr/404/, '... 404 response';
+# ok my $store_w_cb = $riak->store(
+# 'foo', 'bar3', $value, undef, undef,
+# sub {
+# pass "store value ok";
+# $riak->fetch(
+# 'foo', 'bar3', undef,
+# sub {
+# my $body = shift;
+# is_deeply (JSON::decode_json($body), $value, 'value is ok in cb');
+# }
+# );
+# }
+# );
-ok $res = $jiak->store($value)->recv, '... set a new key';
-my $second_value = {
- bucket => 'foo',
- key => 'baz',
- object => { foo => "bar", baz => 2 },
- links => [ [ 'foo', 'bar', 'tagged' ] ],
-};
-ok $res = $jiak->store($second_value)->recv, '... set another new key';
+# ok my $final_res = $store_w_cb->recv;
+# $final_res->recv; # FIXME all cb should be called at this point
+
+# ok $res = $riak->store($value)->recv, '... set a new key';
+# my $second_value = {
+# bucket => 'foo',
+# key => 'baz',
+# object => { foo => "bar", baz => 2 },
+# links => [ [ 'foo', 'bar', 'tagged' ] ],
+# };
+# ok $res = $riak->store($second_value)->recv, '... set another new key';
-#ok $res = $jiak->walk( 'foo', 'baz', [ { bucket => 'foo', } ] )->recv,
-# '... walk';
-#is $res->{results}->[0]->[0]->{key}, "bar", "... walked to bar";
+# ok $res = $riak->walk( 'foo', 'baz', [ { bucket => 'foo', } ] )->recv,
+# '... walk';
+# is $res->{results}->[0]->[0]->{key}, "bar", "... walked to bar";
done_testing();

0 comments on commit da042b0

Please sign in to comment.
Something went wrong with that request. Please try again.