Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

refactoring - implent raw http interface

  • Loading branch information...
commit a7b240d6f2c32fed4da316507c0faa65bbaff449 1 parent 6a06dda
@franckcuny franckcuny authored
Showing with 116 additions and 29 deletions.
  1. +116 −29 lib/AnyEvent/Riak.pm
View
145 lib/AnyEvent/Riak.pm
@@ -1,27 +1,100 @@
package AnyEvent::Riak;
use strict;
+use warnings;
use Carp;
use URI;
-use JSON::XS;
+use JSON;
use AnyEvent;
use AnyEvent::HTTP;
+use MIME::Base64;
+use YAML::Syck;
-our $VERSION = '0.01';
+our $VERSION = '0.02';
sub new {
my ( $class, %args ) = @_;
- my $host = delete $args{host} || 'http://127.0.0.1:8098';
- my $path = delete $args{path} || 'jiak';
+ my $host = delete $args{host} || 'http://127.0.0.1:8098';
+ my $path = delete $args{path} || 'riak';
+ my $mapred_path = delete $args{mapred_path} || 'mapred';
+ my $r = delete $args{r} || 2;
+ my $d = delete $args{w} || 2;
+ my $dw = delete $args{dw} || 2;
+
+ my $client_id
+ = "perl_anyevent_riak_" . encode_base64( int( rand(10737411824) ) );
bless {
- host => $host,
- path => $path,
+ host => $host,
+ path => $path,
+ mapred_path => $mapred_path,
+ client_id => $client_id,
+ r => $r,
+ d => $d,
+ dw => $dw,
%args,
}, $class;
}
+# sub cvcb {
+# my ( $self, $options, $status, $is_json ) = @_;
+# $is_json ||= 0;
+# $status ||= 200;
+# my $cv = AnyEvent->condvar;
+
+# my $success = sub {
+# my ($resp) = @_;
+# $cv->send($resp);
+# };
+
+# my $error = sub {
+# my ( $headers, $resp ) = @_;
+# $cv->croak( [ $headers, $resp ] );
+# };
+
+# my $cb = sub {
+# my ( $body, $headers ) = @_;
+# my $response;
+# if ($is_json) {
+# eval { $response = JSON::decode_json($body); };
+# if ( !$response ) {
+# $cv->croak( [ 'decode_error', $@, $body, $headers ] );
+# }
+# }
+# else {
+# $response = $body;
+# }
+# if ( $headers->{Status} eq $status ) {
+# $success->($response);
+# }
+# else {
+# $error->( $headers, $response );
+# }
+# };
+# return ( $cv, $cb );
+# }
+
+sub _build_headers {
+ my ( $self, ) = @_;
+ my $headers = {};
+ $headers = {
+ 'Content-Type' => 'application/json',
+ 'X-Riak-ClientId' => $self->{client_id},
+ };
+ return $headers;
+}
+
+sub is_alive {
+ my $self = shift;
+ $self->_request( 'GET', $self->_build_uri( [qw/ping/] ), 200 );
+}
+
+sub list_bucket {
+ my ( $self, $bucket_name, $options ) = @_;
+ $self->_request('GET', $self->_build_uri([$self->{path}, $bucket_name]), 200);
+}
+
sub set_bucket {
my ( $self, $bucket, $schema ) = @_;
@@ -39,16 +112,11 @@ sub set_bucket {
}
$self->_request(
- 'PUT', $self->_build_uri( [$bucket] ),
+ 'PUT', $self->_build_uri( [$self->{path}, $bucket] ),
'204', encode_json { schema => $schema }
);
}
-sub list_bucket {
- my ( $self, $bucket ) = @_;
- return $self->_request( 'GET', $self->_build_uri( [$bucket] ), '200' );
-}
-
sub fetch {
my ( $self, $bucket, $key, $r ) = @_;
$r = $self->{r} || 2 if !$r;
@@ -110,26 +178,35 @@ sub _build_spec {
sub _build_uri {
my ( $self, $path, $query ) = @_;
my $uri = URI->new( $self->{host} );
- $uri->path( $self->{path} . "/" . join( "/", @$path ) );
+ $uri->path( join( "/", @$path ) );
$uri->query_form(%$query) if $query;
return $uri->as_string;
}
+sub _build_query {
+ my ($self, $options) = @_;
+}
+
+
sub _request {
my ( $self, $method, $uri, $expected, $body ) = @_;
my $cv = AnyEvent->condvar;
my $cb = sub {
my ( $body, $headers ) = @_;
if ( $headers->{Status} == $expected ) {
- $body
- ? return $cv->send( decode_json($body) )
- : return $cv->send(1);
+ if ( $body && $headers->{'content-type'} eq 'application/json' ) {
+ return $cv->send( decode_json($body) );
+ }
+ else {
+ return $cv->send(1);
+ }
}
else {
return $cv->croak(
encode_json( [ $headers->{Status}, $headers->{Reason} ] ) );
}
};
+
if ($body) {
http_request(
$method => $uri,
@@ -161,23 +238,33 @@ AnyEvent::Riak - Non-blocking Riak client
my $riak = AnyEvent::Riak->new(
host => 'http://127.0.0.1:8098',
- path => 'jiak',
+ path => 'riak',
);
- my $buckets = $riak->list_bucket('bucketname')->recv;
- my $new_bucket = $riak->set_bucket('foo', {allowed_fields => '*'})->recv;
- my $store = $riak->store({bucket => 'foo', key => 'bar', object => {baz => 1},link => []})->recv;
- my $fetch = $riak->fetch('foo', 'bar')->recv;
- my $delete = $riak->delete('foo', 'bar')->recv;
+ if ( $riak->is_alive->recv ) {
+ my $buckets = $riak->list_bucket('bucketname')->recv;
+ my $new_bucket
+ = $riak->set_bucket( 'foo', { allowed_fields => '*' } )->recv;
+ my $store
+ = $riak->store(
+ { bucket => 'foo', key => 'bar', object => { baz => 1 }, link => [] }
+ )->recv;
+ my $fetch = $riak->fetch( 'foo', 'bar' )->recv;
+ my $delete = $riak->delete( 'foo', 'bar' )->recv;
+ }
=head1 DESCRIPTION
-AnyEvent::Riak is a non-blocking riak client using anyevent.
+AnyEvent::Riak is a non-blocking riak client using C<AnyEvent>. This client allows you to connect to a Riak instance, create, modify and delete Riak objects.
=head2 METHODS
=over 4
+=item B<is_alive>
+
+Check if the Riak server is alive.
+
=item B<list_bucket>
Get the schema and key list for 'bucket'
@@ -204,8 +291,8 @@ Store 'object' in Riak. If the object has not defined its 'key' field, a key
will be chosen for it by the server.
$riak->store({
- bucket => 'bucketname',
- key => 'key',
+ bucket => 'bucketname',
+ key => 'key',
object => { foo => "bar", baz => 2 },
links => [],
})->recv;
@@ -223,10 +310,10 @@ The 'spec' parameter should be an array of hashes, each hash optinally
defining 'bucket', 'tag', and 'acc' fields. If a field is not defined in a
spec hash, the wildcard '_' will be used instead.
- ok $res = $jiak->walk(
- 'bucketname',
- 'key',
- [ { bucket => 'bucketname', key => '_' } ]
+ ok $res = $jiak->walk(
+ 'bucketname',
+ 'key',
+ [ { bucket => 'bucketname', key => '_' } ]
)->recv;
=back
Please sign in to comment.
Something went wrong with that request. Please try again.