Skip to content
Browse files

Remove last of `operation.h` code in Iterator

  • Loading branch information...
1 parent 15a1d92 commit 8438dc8124d729311c693a43db6f262976acadeb @mikepb mikepb committed
Showing with 189 additions and 273 deletions.
  1. +19 −23 src/coffee/leveldb/iterator.coffee
  2. +159 −82 src/cpp/iterator.cc
  3. +11 −39 src/cpp/iterator.h
  4. +0 −128 src/cpp/operation.h
  5. +0 −1 wscript
View
42 src/coffee/leveldb/iterator.coffee
@@ -29,6 +29,7 @@ binding = require '../leveldb.node'
exports.Iterator = class Iterator
busy = false
+ validKey = false
lock = ->
throw new Error 'Concurrent operations not supported' if busy
@@ -38,6 +39,11 @@ exports.Iterator = class Iterator
throw new Error 'Not locked' unless busy
busy = false
+ afterSeek = (callback) -> (err, valid) ->
+ unlock()
+ validKey = valid
+ callback err if callback
+
###
Constructor.
@@ -115,11 +121,7 @@ exports.Iterator = class Iterator
###
- valid: ->
- lock()
- answer = @self.valid()
- unlock()
- answer
+ valid: -> validKey
###
@@ -135,9 +137,7 @@ exports.Iterator = class Iterator
seek: (key, callback) ->
key = new Buffer key unless Buffer.isBuffer key
lock()
- @self.seek key, (err) ->
- unlock()
- callback err if callback
+ @self.seek key, afterSeek callback
###
@@ -151,9 +151,7 @@ exports.Iterator = class Iterator
first: (callback) ->
lock()
- @self.first (err) ->
- unlock()
- callback err if callback
+ @self.first afterSeek callback
###
@@ -167,9 +165,7 @@ exports.Iterator = class Iterator
last: (callback) ->
lock()
- @self.last (err) ->
- unlock()
- callback err if callback
+ @self.last afterSeek callback
###
@@ -182,10 +178,9 @@ exports.Iterator = class Iterator
###
next: (callback) ->
+ throw new Error 'Illegal state' unless validKey
lock()
- @self.next (err) ->
- unlock()
- callback err if callback
+ @self.next afterSeek callback
###
@@ -198,10 +193,9 @@ exports.Iterator = class Iterator
###
prev: (callback) ->
+ throw new Error 'Illegal state' unless validKey
lock()
- @self.prev (err) ->
- unlock()
- callback err if callback
+ @self.prev afterSeek callback
###
@@ -218,6 +212,7 @@ exports.Iterator = class Iterator
###
key: (options = {}, callback) ->
+ throw new Error 'Illegal state' unless validKey
# optional options
if typeof options is 'function'
@@ -226,7 +221,8 @@ exports.Iterator = class Iterator
throw new Error 'Missing callback' unless callback
- # async
+ return callback() unless validKey
+
lock()
@self.key (err, key) ->
unlock()
@@ -272,7 +268,6 @@ exports.Iterator = class Iterator
###
current: (options = {}, callback) ->
-
# optional options
if typeof options is 'function'
callback = options
@@ -280,7 +275,8 @@ exports.Iterator = class Iterator
throw new Error 'Missing callback' unless callback
- # async
+ return callback() unless validKey
+
lock()
@self.current (err, kv) ->
unlock()
View
241 src/cpp/iterator.cc
@@ -1,5 +1,4 @@
#include <assert.h>
-#include <pthread.h>
#include <leveldb/iterator.h>
#include <node.h>
@@ -8,7 +7,6 @@
#include "handle.h"
#include "helpers.h"
#include "iterator.h"
-#include "operation.h"
namespace node_leveldb {
@@ -22,7 +20,6 @@ void JIterator::Initialize(Handle<Object> target) {
constructor->InstanceTemplate()->SetInternalFieldCount(1);
constructor->SetClassName(String::NewSymbol("Iterator"));
- NODE_SET_PROTOTYPE_METHOD(constructor, "valid", Valid);
NODE_SET_PROTOTYPE_METHOD(constructor, "first", First);
NODE_SET_PROTOTYPE_METHOD(constructor, "last", Last);
NODE_SET_PROTOTYPE_METHOD(constructor, "seek", Seek);
@@ -49,12 +46,89 @@ Handle<Value> JIterator::New(const Arguments& args) {
return args.This();
}
-Handle<Value> JIterator::Valid(const Arguments& args) {
+
+
+
+
+class JIterator::iter_params {
+ public:
+ iter_params(const Arguments& args) : valid_(false) {
+ self_ = ObjectWrap::Unwrap<JIterator>(args.This());
+ self_->Ref();
+ assert(args[args.Length() - 1]->IsFunction());
+ Local<Function> fn = Local<Function>::Cast(args[args.Length() - 1]);
+ callback_ = Persistent<Function>::New(fn);
+ }
+
+ virtual ~iter_params() {
+ self_->Unref();
+ callback_.Dispose();
+ }
+
+ virtual void Result(Handle<Value>& result) {
+ result = valid_ ? True() : False();
+ }
+
+ JIterator* self_;
+ leveldb::Status status_;
+ Persistent<Function> callback_;
+ bool valid_;
+};
+
+class JIterator::seek_params : public iter_params {
+ public:
+ seek_params(const Arguments& args) : iter_params(args) {}
+ ~seek_params() { keyHandle_.Dispose(); }
+ leveldb::Slice key_;
+ Persistent<Value> keyHandle_;
+};
+
+class JIterator::kv_params : public iter_params {
+ public:
+ kv_params(const Arguments& args) : iter_params(args) {}
+
+ virtual void Result(Handle<Value>& result) {
+ if (!value_.empty()) {
+ Handle<Array> array = Array::New(2);
+ array->Set(0, ToBuffer(key_));
+ array->Set(1, ToBuffer(value_));
+ result = array;
+ } else {
+ result = ToBuffer(key_);
+ }
+ }
+
+ leveldb::Slice key_;
+ leveldb::Slice value_;
+};
+
+void JIterator::AfterAsync(uv_work_t* req) {
HandleScope scope;
- JIterator* self = ObjectWrap::Unwrap<JIterator>(args.This());
- return self->it_ != NULL && self->it_->Valid() ? True() : False();
+ iter_params* op = static_cast<iter_params*>(req->data);
+ assert(!op->callback_.IsEmpty());
+
+ Handle<Value> error = Null();
+ Handle<Value> result = Null();
+
+ if (!op->status_.ok()) {
+ error = Exception::Error(String::New(op->status_.ToString().c_str()));
+ } else {
+ op->Result(result);
+ }
+
+ TryCatch tryCatch;
+ Handle<Value> args[] = { error, result };
+ op->callback_->Call(Context::GetCurrent()->Global(), 2, args);
+ if (tryCatch.HasCaught()) FatalException(tryCatch);
+
+ delete op;
+ delete req;
}
+
+
+
+
Handle<Value> JIterator::Seek(const Arguments& args) {
HandleScope scope;
@@ -62,114 +136,117 @@ Handle<Value> JIterator::Seek(const Arguments& args) {
if (args.Length() < 1 || !Buffer::HasInstance(args[0]))
return ThrowTypeError("Invalid arguments");
- Op* op = Op::New(Seek, Conv, args);
+ seek_params* op = new seek_params(args);
op->key_ = ToSlice(args[0], op->keyHandle_);
- return op->Run();
+ AsyncQueue(op, AsyncSeek, AfterAsync);
+ return Undefined();
}
-Handle<Value> JIterator::First(const Arguments& args) {
- return Op::Go(First, Conv, args);
+void JIterator::AsyncSeek(uv_work_t* req) {
+ seek_params* op = static_cast<seek_params*>(req->data);
+ op->self_->it_->Seek(op->key_);
+ op->status_ = op->self_->it_->status();
+ op->valid_ = op->self_->it_->Valid();
}
-Handle<Value> JIterator::Last(const Arguments& args) {
- return Op::Go(Last, Conv, args);
-}
-Handle<Value> JIterator::Next(const Arguments& args) {
- return Op::Go(Next, Conv, args);
+
+
+
+Handle<Value> JIterator::First(const Arguments& args) {
+ iter_params* op = new iter_params(args);
+ AsyncQueue(op, AsyncFirst, AfterAsync);
+ return Undefined();
}
-Handle<Value> JIterator::Prev(const Arguments& args) {
- return Op::Go(Prev, Conv, args);
+void JIterator::AsyncFirst(uv_work_t* req) {
+ iter_params* op = static_cast<iter_params*>(req->data);
+ op->self_->it_->SeekToFirst();
+ op->status_ = op->self_->it_->status();
+ op->valid_ = op->self_->it_->Valid();
}
-Handle<Value> JIterator::GetKey(const Arguments& args) {
- return Op::Go(GetKey, Conv, args);
+
+
+
+
+Handle<Value> JIterator::Last(const Arguments& args) {
+ iter_params* op = new iter_params(args);
+ AsyncQueue(op, AsyncLast, AfterAsync);
+ return Undefined();
}
-Handle<Value> JIterator::GetKeyValue(const Arguments& args) {
- return Op::Go(GetKeyValue, Conv, args);
+void JIterator::AsyncLast(uv_work_t* req) {
+ iter_params* op = static_cast<iter_params*>(req->data);
+ op->self_->it_->SeekToLast();
+ op->status_ = op->self_->it_->status();
+ op->valid_ = op->self_->it_->Valid();
}
-//
-// ASYNC FUNCTIONS
-//
-void JIterator::Seek(Op* op) {
- leveldb::Iterator* it = op->self_->it_;
- it->Seek(op->key_);
- op->status_ = it->status();
+
+Handle<Value> JIterator::Next(const Arguments& args) {
+ iter_params* op = new iter_params(args);
+ AsyncQueue(op, AsyncNext, AfterAsync);
+ return Undefined();
}
-void JIterator::First(Op* op) {
- leveldb::Iterator* it = op->self_->it_;
- it->SeekToFirst();
- op->status_ = it->status();
+void JIterator::AsyncNext(uv_work_t* req) {
+ iter_params* op = static_cast<iter_params*>(req->data);
+ op->self_->it_->Next();
+ op->status_ = op->self_->it_->status();
+ op->valid_ = op->self_->it_->Valid();
}
-void JIterator::Last(Op* op) {
- leveldb::Iterator* it = op->self_->it_;
- it->SeekToLast();
- op->status_ = it->status();
+
+
+
+
+Handle<Value> JIterator::Prev(const Arguments& args) {
+ iter_params* op = new iter_params(args);
+ AsyncQueue(op, AsyncPrev, AfterAsync);
+ return Undefined();
}
-void JIterator::Next(Op* op) {
- leveldb::Iterator* it = op->self_->it_;
- if (it->Valid()) {
- it->Next();
- op->status_ = it->status();
- } else {
- op->invalidState_ = true;
- }
+void JIterator::AsyncPrev(uv_work_t* req) {
+ iter_params* op = static_cast<iter_params*>(req->data);
+ op->self_->it_->Prev();
+ op->status_ = op->self_->it_->status();
+ op->valid_ = op->self_->it_->Valid();
}
-void JIterator::Prev(Op* op) {
- leveldb::Iterator* it = op->self_->it_;
- if (it->Valid()) {
- it->Prev();
- op->status_ = it->status();
- } else {
- op->invalidState_ = true;
- }
+
+
+
+
+Handle<Value> JIterator::GetKey(const Arguments& args) {
+ kv_params* op = new kv_params(args);
+ AsyncQueue(op, AsyncGetKey, AfterAsync);
+ return Undefined();
}
-void JIterator::GetKey(Op* op) {
- leveldb::Iterator* it = op->self_->it_;
- if (it->Valid()) {
- op->value_ = it->key();
- } else {
- op->status_ = it->status();
- }
+void JIterator::AsyncGetKey(uv_work_t* req) {
+ kv_params* op = static_cast<kv_params*>(req->data);
+ op->key_ = op->self_->it_->key();
}
-void JIterator::GetKeyValue(Op* op) {
- leveldb::Iterator* it = op->self_->it_;
- if (it->Valid()) {
- op->key_ = it->key();
- op->value_ = it->value();
- } else {
- op->status_ = it->status();
- }
+
+
+
+
+Handle<Value> JIterator::GetKeyValue(const Arguments& args) {
+ kv_params* op = new kv_params(args);
+ AsyncQueue(op, AsyncGetKeyValue, AfterAsync);
+ return Undefined();
}
-void JIterator::Conv(Op* op, Handle<Value>& error, Handle<Value>& result) {
- if (op->invalidState_) {
- error = Exception::Error(String::New("Illegal state"));
- } else if (!op->status_.ok()) {
- error = Exception::Error(String::New(op->status_.ToString().c_str()));
- } else if (!op->value_.empty()) {
- if (!op->key_.empty()) {
- Local<Array> array = Array::New(2);
- array->Set(0, ToBuffer(op->key_));
- array->Set(1, ToBuffer(op->value_));
- result = array;
- } else {
- result = ToBuffer(op->value_);
- }
- }
+void JIterator::AsyncGetKeyValue(uv_work_t* req) {
+ kv_params* op = static_cast<kv_params*>(req->data);
+ op->key_ = op->self_->it_->key();
+ op->value_ = op->self_->it_->value();
}
} // node_leveldb
View
50 src/cpp/iterator.h
@@ -3,14 +3,12 @@
#include <assert.h>
#include <errno.h>
-#include <pthread.h>
#include <leveldb/iterator.h>
#include <node.h>
#include <v8.h>
#include "helpers.h"
-#include "operation.h"
using namespace v8;
using namespace node;
@@ -33,7 +31,6 @@ class JIterator : ObjectWrap {
static void Initialize(Handle<Object> target);
static Handle<Value> New(const Arguments& args);
- static Handle<Value> Valid(const Arguments& args);
static Handle<Value> Seek(const Arguments& args);
static Handle<Value> First(const Arguments& args);
static Handle<Value> Last(const Arguments& args);
@@ -53,47 +50,22 @@ class JIterator : ObjectWrap {
JIterator(const JIterator&);
void operator=(const JIterator&);
- class Op;
- class Op : public Operation<Op> {
- public:
+ class iter_params;
+ class seek_params;
+ class kv_params;
- inline Op(const ExecFunction exec, const ConvFunction conv,
- Handle<Object>& handle, Handle<Function>& callback)
- : Operation<Op>(exec, conv, handle, callback), invalidState_(0)
- {
- self_ = ObjectWrap::Unwrap<JIterator>(handle);
- self_->Ref();
- }
-
- virtual ~Op() {
- self_->Unref();
- keyHandle_.Dispose();
- }
-
- JIterator* self_;
-
- leveldb::Status status_;
- leveldb::Slice key_;
- leveldb::Slice value_;
-
- Persistent<Value> keyHandle_;
-
- bool invalidState_;
- };
-
- static void Seek(Op* op);
- static void First(Op* op);
- static void Last(Op* op);
- static void Next(Op* op);
- static void Prev(Op* op);
+ static void AsyncSeek(uv_work_t* req);
+ static void AsyncFirst(uv_work_t* req);
+ static void AsyncLast(uv_work_t* req);
+ static void AsyncNext(uv_work_t* req);
+ static void AsyncPrev(uv_work_t* req);
- static void GetKey(Op* op);
- static void GetKeyValue(Op* op);
+ static void AsyncGetKey(uv_work_t* req);
+ static void AsyncGetKeyValue(uv_work_t* req);
- static void Conv(Op* op, Handle<Value>& error, Handle<Value>& result);
+ static void AfterAsync(uv_work_t* req);
leveldb::Iterator* it_;
- pthread_mutex_t lock_;
};
} // node_leveldb
View
128 src/cpp/operation.h
@@ -1,128 +0,0 @@
-#ifndef NODE_LEVELDB_OP_H_
-#define NODE_LEVELDB_OP_H_
-
-#include <node.h>
-#include <v8.h>
-
-#include "node_async_shim.h"
-
-using namespace v8;
-using namespace node;
-
-namespace node_leveldb {
-
-template < class T > class Operation {
- public:
-
- typedef void (*ExecFunction)(T* op);
- typedef void (*ConvFunction)(
- T* op, Handle<Value>& error, Handle<Value>& result);
-
- inline Operation(const ExecFunction exec, const ConvFunction conv,
- Handle<Object>& self, Handle<Function>& callback)
- : exec_(exec), conv_(conv)
- {
- handle_ = Persistent<Object>::New(self);
- callback_ = Persistent<Function>::New(callback);
- }
-
- virtual ~Operation() {
- handle_.Dispose();
- callback_.Dispose();
- }
-
- virtual inline Handle<Value> BeforeRun() { return Handle<Value>(); }
- virtual inline void AfterExecute() {}
- virtual inline void BeforeReturn() {}
-
- inline Handle<Value> Run() {
- Handle<Value> error = BeforeRun();
-
- if (!error.IsEmpty()) {
-
- delete this;
- return error;
-
- } else if (callback_.IsEmpty()) {
-
- Handle<Value> result = Null();
-
- T* self = static_cast<T*>(this);
- exec_(self);
- AfterExecute();
-
- conv_(self, error, result);
- BeforeReturn();
-
- delete this;
-
- return error.IsEmpty() ? result : ThrowException(error);
-
- } else {
-
- BEGIN_ASYNC(this, Async, After);
- return Undefined();
-
- }
- }
-
- static inline Handle<Value> Go(const ExecFunction run,
- const ConvFunction conv,
- const Arguments& args)
- {
- HandleScope scope;
- return New(run, conv, args)->Run();
- }
-
- static inline T* New(const ExecFunction run,
- const ConvFunction conv,
- const Arguments& args)
- {
- Handle<Object> self = args.This();
- Handle<Function> callback = GetCallback(args);
- return new T(run, conv, self, callback);
- }
-
- static async_rtn Async(uv_work_t* req) {
- Operation* op = static_cast<Operation*>(req->data);
- T* self = static_cast<T*>(req->data);
- op->exec_(self);
- op->AfterExecute();
- RETURN_ASYNC;
- }
-
- static async_rtn After(uv_work_t* req) {
- HandleScope scope;
-
- Handle<Value> error = Null();
- Handle<Value> result = Null();
-
- Operation* op = static_cast<Operation*>(req->data);
- T* self = static_cast<T*>(req->data);
-
- op->conv_(self, error, result);
- op->BeforeReturn();
-
- TryCatch tryCatch;
-
- assert(!op->callback_.IsEmpty());
- Handle<Value> argv[] = { error, result };
- op->callback_->Call(Context::GetCurrent()->Global(), 2, argv);
-
- if (tryCatch.HasCaught()) FatalException(tryCatch);
-
- delete op;
-
- RETURN_ASYNC_AFTER;
- }
-
- ExecFunction exec_;
- ConvFunction conv_;
-
- Persistent<Object> handle_;
- Persistent<Function> callback_;
-};
-
-} // node_leveldb
-
-#endif // NODE_LEVELDB_OP_H_
View
1 wscript
@@ -110,7 +110,6 @@ def build(bld):
node_leveldb.source = snappy_src + leveldb_src + node_leveldb_src
node_leveldb.name = "node_leveldb"
node_leveldb.target = "leveldb"
- node_leveldb.uselib = ["pthread"]
node_leveldb.includes = [snappy_dir, leveldb_dir, leveldb_dir + '/include']
node_leveldb.cxxflags = ['-Wall', '-O2', '-DSNAPPY'] + bld.env.PORT_CFLAGS.split(' ') + bld.env.PLATFORM_CFLAGS.split(' ')
bld.add_post_fun(build_post)

0 comments on commit 8438dc8

Please sign in to comment.
Something went wrong with that request. Please try again.