Skip to content
Browse files

can use callback

  • Loading branch information...
1 parent a7b240d commit 0ed18bf65a12003c60b4471d1436a061f86a0d43 franck cuny committed Apr 10, 2010
Showing with 175 additions and 137 deletions.
  1. +175 −137 lib/AnyEvent/Riak.pm
View
312 lib/AnyEvent/Riak.pm
@@ -37,46 +37,16 @@ sub new {
}, $class;
}
-# sub cvcb {
-# my ( $self, $options, $status, $is_json ) = @_;
-# $is_json ||= 0;
-# $status ||= 200;
-# my $cv = AnyEvent->condvar;
-
-# my $success = sub {
-# my ($resp) = @_;
-# $cv->send($resp);
-# };
-
-# my $error = sub {
-# my ( $headers, $resp ) = @_;
-# $cv->croak( [ $headers, $resp ] );
-# };
-
-# my $cb = sub {
-# my ( $body, $headers ) = @_;
-# my $response;
-# if ($is_json) {
-# eval { $response = JSON::decode_json($body); };
-# if ( !$response ) {
-# $cv->croak( [ 'decode_error', $@, $body, $headers ] );
-# }
-# }
-# else {
-# $response = $body;
-# }
-# if ( $headers->{Status} eq $status ) {
-# $success->($response);
-# }
-# else {
-# $error->( $headers, $response );
-# }
-# };
-# return ( $cv, $cb );
-# }
+sub _build_uri {
+ my ( $self, $path, $query ) = @_;
+ my $uri = URI->new( $self->{host} );
+ $uri->path( join( "/", @$path ) );
+ $uri->query_form(%$query) if $query;
+ return $uri->as_string;
+}
sub _build_headers {
- my ( $self, ) = @_;
+ my ( $self, $options) = @_;
my $headers = {};
$headers = {
'Content-Type' => 'application/json',
@@ -85,14 +55,82 @@ sub _build_headers {
return $headers;
}
+sub _init_callback {
+ my $self = shift;
+ $self->all_cv->begin();
+
+ my ( $cv, $cb );
+ if (@_) {
+ $cv = pop if UNIVERSAL::isa( $_[-1], 'AnyEvent::CondVar' );
+ $cb = pop if ref $_[-1] eq 'CODE';
+ }
+ $cv ||= AE::cv;
+
+ $cv->cb(
+ sub {
+ my $cv = shift;
+ my $res = $cv->recv;
+ $cb->($res);
+ }
+ ) if $cb;
+
+ return ( $cv, $cb );
+}
+
+sub all_cv {
+ my $self = shift;
+ $self->{all_cv} = AE::cv unless $self->{all_cv};
+ return $self->{all_cv};
+}
+
+sub default_cb {
+ my ( $self, $options ) = @_;
+ return sub {
+ my ( $body, $headers ) = @_;
+ my $status = 200;
+ if ( $headers->{Status} == $status ) {
+ if ( $options->{json} ) {
+ return JSON::decode_json( $_[0] );
+ }
+ else {
+ return $_[0];
+ }
+ }
+ };
+}
+
sub is_alive {
my $self = shift;
- $self->_request( 'GET', $self->_build_uri( [qw/ping/] ), 200 );
+
+ my ( $cv, $cb ) = $self->_init_callback(@_);
+ $cb = $self->default_cb( { json => 0 } ) if !$cb;
+
+ http_request(
+ GET => $self->_build_uri( [qw/ping/] ),
+ headers => { 'Content-Type' => 'application/json', },
+ sub {
+ $cv->send( $cb->(@_) );
+ },
+ );
+ return $cv;
}
sub list_bucket {
- my ( $self, $bucket_name, $options ) = @_;
- $self->_request('GET', $self->_build_uri([$self->{path}, $bucket_name]), 200);
+ my $self = shift;
+ my $bucket_name = shift;
+ my $options = shift;
+
+ my ( $cv, $cb ) = $self->_init_callback(@_);
+ $cb = $self->default_cb( { json => 1 } ) if !$cb;
+
+ http_request(
+ GET => $self->_build_uri( [ $self->{path}, $bucket_name ] ),
+ headers => { 'Content-Type' => 'application/json', },
+ sub {
+ $cv->send( $cb->(@_) );
+ }
+ );
+ return $cv;
}
sub set_bucket {
@@ -111,118 +149,118 @@ sub set_bucket {
$schema->{write_mask} = $schema->{read_mask};
}
- $self->_request(
- 'PUT', $self->_build_uri( [$self->{path}, $bucket] ),
- '204', encode_json { schema => $schema }
- );
}
-sub fetch {
- my ( $self, $bucket, $key, $r ) = @_;
- $r = $self->{r} || 2 if !$r;
- return $self->_request( 'GET',
- $self->_build_uri( [ $bucket, $key ], { r => $r } ), '200' );
-}
+# sub fetch {
+# my ( $self, $bucket, $key, $r ) = @_;
+# $r = $self->{r} || 2 if !$r;
+# return $self->_request( 'GET',
+# $self->_build_uri( [ $bucket, $key ], { r => $r } ), '200' );
+# }
-sub store {
- my ( $self, $object, $w, $dw, ) = @_;
+# sub store {
+# my ( $self, $object, $w, $dw, ) = @_;
- $w = $self->{w} || 2 if !$w;
- $dw = $self->{dw} || 2 if !$dw;
+# $w = $self->{w} || 2 if !$w;
+# $dw = $self->{dw} || 2 if !$dw;
- my $bucket = $object->{bucket};
- my $key = $object->{key};
- $object->{links} = [] if !exists $object->{links};
+# my $bucket = $object->{bucket};
+# my $key = $object->{key};
+# $object->{links} = [] if !exists $object->{links};
- return $self->_request(
- 'PUT',
- $self->_build_uri(
- [ $bucket, $key ],
- {
- w => $w,
- dw => $dw,
- returnbody => 'true'
- }
- ),
- '200',
- encode_json $object);
-}
+# return $self->_request(
+# 'PUT',
+# $self->_build_uri(
+# [ $bucket, $key ],
+# {
+# w => $w,
+# dw => $dw,
+# returnbody => 'true'
+# }
+# ),
+# '200',
+# encode_json $object);
+# }
-sub delete {
- my ( $self, $bucket, $key, $rw ) = @_;
+# sub delete {
+# my ( $self, $bucket, $key, $rw ) = @_;
- $rw = $self->{rw} || 2 if !$rw;
- return $self->_request( 'DELETE',
- $self->_build_uri( [ $bucket, $key ], { dw => $rw } ), 204 );
-}
+# $rw = $self->{rw} || 2 if !$rw;
+# return $self->_request( 'DELETE',
+# $self->_build_uri( [ $bucket, $key ], { dw => $rw } ), 204 );
+# }
-sub walk {
- my ( $self, $bucket, $key, $spec ) = @_;
- my $path = $self->_build_uri( [ $bucket, $key ] );
- $path .= $self->_build_spec($spec);
- return $self->_request( 'GET', $path, 200 );
-}
+# sub walk {
+# my ( $self, $bucket, $key, $spec ) = @_;
+# my $path = $self->_build_uri( [ $bucket, $key ] );
+# $path .= $self->_build_spec($spec);
+# return $self->_request( 'GET', $path, 200 );
+# }
-sub _build_spec {
- my ( $self, $spec ) = @_;
- my $acc = '/';
- foreach my $item (@$spec) {
- $acc
- .= ( $item->{bucket} || '_' ) . ','
- . ( $item->{tag} || '_' ) . ','
- . ( $item->{acc} || '_' ) . '/';
- }
- return $acc;
-}
+# sub _build_spec {
+# my ( $self, $spec ) = @_;
+# my $acc = '/';
+# foreach my $item (@$spec) {
+# $acc
+# .= ( $item->{bucket} || '_' ) . ','
+# . ( $item->{tag} || '_' ) . ','
+# . ( $item->{acc} || '_' ) . '/';
+# }
+# return $acc;
+# }
-sub _build_uri {
- my ( $self, $path, $query ) = @_;
- my $uri = URI->new( $self->{host} );
- $uri->path( join( "/", @$path ) );
- $uri->query_form(%$query) if $query;
- return $uri->as_string;
-}
-sub _build_query {
- my ($self, $options) = @_;
+# sub _build_query {
+# my ($self, $options) = @_;
+# }
+
+# sub _request {
+# my ( $self, $method, $uri, $expected, $body ) = @_;
+# my $cv = AnyEvent->condvar;
+# my $cb = sub {
+# my ( $body, $headers ) = @_;
+# if ( $headers->{Status} == $expected ) {
+# if ( $body && $headers->{'content-type'} eq 'application/json' ) {
+# return $cv->send( decode_json($body) );
+# }
+# else {
+# return $cv->send(1);
+# }
+# }
+# else {
+# return $cv->croak(
+# encode_json( [ $headers->{Status}, $headers->{Reason} ] ) );
+# }
+# };
+
+# if ($body) {
+# http_request(
+# $method => $uri,
+# headers => { 'Content-Type' => 'application/json', },
+# body => $body,
+# $cb
+# );
+# }
+# else {
+# http_request(
+# $method => $uri,
+# headers => { 'Content-Type' => 'application/json', },
+# $cb
+# );
+# }
+# $cv;
+# }
+
+sub head {
}
+sub get {
+}
-sub _request {
- my ( $self, $method, $uri, $expected, $body ) = @_;
- my $cv = AnyEvent->condvar;
- my $cb = sub {
- my ( $body, $headers ) = @_;
- if ( $headers->{Status} == $expected ) {
- if ( $body && $headers->{'content-type'} eq 'application/json' ) {
- return $cv->send( decode_json($body) );
- }
- else {
- return $cv->send(1);
- }
- }
- else {
- return $cv->croak(
- encode_json( [ $headers->{Status}, $headers->{Reason} ] ) );
- }
- };
+sub put {
+}
- if ($body) {
- http_request(
- $method => $uri,
- headers => { 'Content-Type' => 'application/json', },
- body => $body,
- $cb
- );
- }
- else {
- http_request(
- $method => $uri,
- headers => { 'Content-Type' => 'application/json', },
- $cb
- );
- }
- $cv;
+sub post {
}
1;

0 comments on commit 0ed18bf

Please sign in to comment.
Something went wrong with that request. Please try again.