Permalink
Browse files

Implementing Put, Del, and Write async methods

  • Loading branch information...
1 parent f538728 commit 42228bd10b9b0eb2f895bd134365b700ae6fdde7 @carter-thaxton carter-thaxton committed Jun 7, 2011
Showing with 226 additions and 72 deletions.
  1. +18 −8 demo/async.js
  2. +1 −1 leveldb.js
  3. +186 −58 src/DB.cc
  4. +20 −5 src/DB.h
  5. +1 −0 src/WriteBatch.cc
View
@@ -11,19 +11,29 @@ var db = new DB();
console.log("Opening...");
db.open(path, {create_if_missing: true, paranoid_checks: true}, function(err) {
if (err) throw err;
- else console.log("ok");
+ console.log("ok");
- // Putting TODO: async
+ // Putting
console.log("\nPutting...");
var key = new Buffer("Hello");
var value = new Buffer("World");
- var status = db.put({}, key, value);
- console.log(status);
-
- console.log("\nClosing...")
- db.close(function(err) {
+ db.put(key, value, function(err) {
if (err) throw err;
- console.log('ok');
+ console.log("ok");
+
+ // Deleting
+ console.log("\nDeleting...");
+ db.del(key, function(err) {
+ if (err) throw err;
+ console.log("ok");
+
+ // Closing
+ console.log("\nClosing...")
+ db.close(function(err) {
+ if (err) throw err;
+ console.log('ok');
+ });
+ });
});
});
View
@@ -82,7 +82,7 @@ db.put(key, value, options, function (err) { /*...*/ });
db.del(key, options, function (err) { /*...*/ });
// Write a batch of updates in one call
-db.write(updates, options, function (err) { /*...*/ });
+db.write(writeBatch, options, function (err) { /*...*/ });
// Getting a value from the database
db.get(key, options, function (err, value) { /*...*/ });
View
@@ -3,6 +3,7 @@
#include <node_buffer.h>
#include "helpers.h"
+#include <iostream>
using namespace node_leveldb;
@@ -108,7 +109,7 @@ int DB::EIO_Open(eio_req *req) {
DB *self = params->self;
// Close old DB, if open() is called more than once
- if (self->db) {
+ if (self->db != NULL) {
delete self->db;
self->db = NULL;
}
@@ -135,6 +136,7 @@ int DB::EIO_AfterOpen(eio_req *req) {
Handle<Value> DB::Close(const Arguments& args) {
HandleScope scope;
+ // Get this and arguments
DB* self = ObjectWrap::Unwrap<DB>(args.This());
// Optional callback
@@ -157,7 +159,7 @@ int DB::EIO_Close(eio_req *req) {
Params *params = static_cast<Params*>(req->data);
DB *self = params->self;
- if (self->db) {
+ if (self->db != NULL) {
delete self->db;
self->db = NULL;
}
@@ -175,76 +177,183 @@ int DB::EIO_AfterClose(eio_req *req) {
//
-// DestroyDB
+// Put
//
-Handle<Value> DB::DestroyDB(const Arguments& args) {
+Handle<Value> DB::Put(const Arguments& args) {
HandleScope scope;
-
+
+ // Get this and arguments
+ DB* self = ObjectWrap::Unwrap<DB>(args.This());
+ if (self->db == NULL) {
+ return ThrowException(Exception::Error(String::New("DB has not been opened")));
+ }
+
// Check args
- if (!(args.Length() == 2 && args[0]->IsString() && args[1]->IsObject())) {
- return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (String, Object)")));
+ if (args.Length() < 2 || !Buffer::HasInstance(args[0]) || !Buffer::HasInstance(args[1])) {
+ return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (Buffer, Buffer)")));
}
- String::Utf8Value name(args[0]);
- leveldb::Options options = JsToOptions(args[1]);
-
- return processStatus(leveldb::DestroyDB(*name, options));
-}
+ leveldb::Slice key = JsToSlice(args[0]);
+ leveldb::Slice value = JsToSlice(args[1]);
+
+ leveldb::WriteBatch *writeBatch = new leveldb::WriteBatch();
+ writeBatch->Put(key, value);
-Handle<Value> DB::RepairDB(const Arguments& args) {
- HandleScope scope;
+ int pos = 2;
- // Check args
- if (!(args.Length() == 2 && args[0]->IsString() && args[1]->IsObject())) {
- return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (String, Object)")));
+ // Optional write options
+ leveldb::WriteOptions options = leveldb::WriteOptions();
+ if (pos < args.Length() && args[pos]->IsObject() && !args[pos]->IsFunction()) {
+ options = JsToWriteOptions(args[pos]);
+ pos++;
}
- String::Utf8Value name(args[0]);
- leveldb::Options options = JsToOptions(args[1]);
+ // Optional callback
+ Local<Function> callback;
+ if (pos < args.Length() && args[pos]->IsFunction()) {
+ callback = Local<Function>::Cast(args[pos]);
+ pos++;
+ }
+
+ // Pass parameters to async function
+ WriteParams *params = new WriteParams(self, writeBatch, options, callback);
+ params->disposeWriteBatch = true;
+
+ // Use Write to implement Put
+ EIO_BeforeWrite(params);
- return processStatus(leveldb::RepairDB(*name, options));
+ return args.This();
}
-Handle<Value> DB::Put(const Arguments& args) {
- HandleScope scope;
- // Check args
- if (!(args.Length() == 3 && args[0]->IsObject() && Buffer::HasInstance(args[1]) && Buffer::HasInstance(args[2]))) {
- return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (Object, Buffer, Buffer)")));
- }
+//
+// Del
+//
+Handle<Value> DB::Del(const Arguments& args) {
+ HandleScope scope;
+
+ // Get this and arguments
DB* self = ObjectWrap::Unwrap<DB>(args.This());
if (self->db == NULL) {
return ThrowException(Exception::Error(String::New("DB has not been opened")));
}
- leveldb::WriteOptions options = JsToWriteOptions(args[0]);
- leveldb::Slice key = JsToSlice(args[1]);
- leveldb::Slice value = JsToSlice(args[2]);
+ // Check args
+ if (args.Length() < 1 || !Buffer::HasInstance(args[0])) {
+ return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (Buffer)")));
+ }
- return processStatus(self->db->Put(options, key, value));
-}
+ leveldb::Slice key = JsToSlice(args[0]);
+
+ leveldb::WriteBatch *writeBatch = new leveldb::WriteBatch();
+ writeBatch->Delete(key);
-Handle<Value> DB::Del(const Arguments& args) {
- HandleScope scope;
+ int pos = 1;
- // Check args
- if (!(args.Length() == 2 && args[0]->IsObject() && Buffer::HasInstance(args[1]))) {
- return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (Object, Buffer)")));
+ // Optional write options
+ leveldb::WriteOptions options = leveldb::WriteOptions();
+ if (pos < args.Length() && args[pos]->IsObject() && !args[pos]->IsFunction()) {
+ options = JsToWriteOptions(args[pos]);
+ pos++;
}
+ // Optional callback
+ Local<Function> callback;
+ if (pos < args.Length() && args[pos]->IsFunction()) {
+ callback = Local<Function>::Cast(args[pos]);
+ pos++;
+ }
+
+ // Pass parameters to async function
+ WriteParams *params = new WriteParams(self, writeBatch, options, callback);
+ params->disposeWriteBatch = true;
+
+ // Use Write to implement Del
+ EIO_BeforeWrite(params);
+
+ return args.This();
+}
+
+
+//
+// Write
+//
+
+Handle<Value> DB::Write(const Arguments& args) {
+ HandleScope scope;
+
+ // Get this and arguments
DB* self = ObjectWrap::Unwrap<DB>(args.This());
if (self->db == NULL) {
return ThrowException(Exception::Error(String::New("DB has not been opened")));
}
- leveldb::WriteOptions options = JsToWriteOptions(args[0]);
- leveldb::Slice key = JsToSlice(args[1]);
+ // Required WriteBatch
+ if (args.Length() < 1 || !args[0]->IsObject()) {
+ return ThrowException(Exception::TypeError(String::New("DB.write() expects a WriteBatch object")));
+ }
+ Local<Object> writeBatchObject = Object::Cast(*args[0]);
+ WriteBatch* writeBatchWrapper = ObjectWrap::Unwrap<WriteBatch>(writeBatchObject);
+ leveldb::WriteBatch* writeBatch = writeBatchWrapper->wb;
+
+ int pos = 1;
+
+ // Optional write options
+ leveldb::WriteOptions options = leveldb::WriteOptions();
+ if (pos < args.Length() && args[pos]->IsObject() && !args[pos]->IsFunction()) {
+ options = JsToWriteOptions(args[pos]);
+ pos++;
+ }
+
+ // Optional callback
+ Local<Function> callback;
+ if (pos < args.Length() && args[pos]->IsFunction()) {
+ callback = Local<Function>::Cast(args[pos]);
+ pos++;
+ }
+
+ // Pass parameters to async function
+ WriteParams *params = new WriteParams(self, writeBatch, options, callback);
+ EIO_BeforeWrite(params);
+
+ return args.This();
+}
+
+void DB::EIO_BeforeWrite(WriteParams *params) {
+ eio_custom(EIO_Write, EIO_PRI_DEFAULT, EIO_AfterWrite, params);
+}
+
+int DB::EIO_Write(eio_req *req) {
+ WriteParams *params = static_cast<WriteParams*>(req->data);
+ DB *self = params->self;
+
+ // Do the actual work
+ if (self->db != NULL) {
+ params->status = self->db->Write(params->options, params->writeBatch);
+ }
+
+ return 0;
+}
- return processStatus(self->db->Delete(options, key));
+int DB::EIO_AfterWrite(eio_req *req) {
+ WriteParams *params = static_cast<WriteParams*>(req->data);
+ params->Callback();
+
+ if (params->disposeWriteBatch) {
+ delete params->writeBatch;
+ }
+
+ delete params;
+ return 0;
}
+
+//
+// Get
+//
+
Handle<Value> DB::Get(const Arguments& args) {
HandleScope scope;
@@ -277,25 +386,6 @@ Handle<Value> DB::Get(const Arguments& args) {
return scope.Close(actualBuffer);
}
-Handle<Value> DB::Write(const Arguments& args) {
- HandleScope scope;
-
- if (!(args.Length() == 2 && args[0]->IsObject() && args[1]->IsObject())) {
- return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (Object, WriteBatch)")));
- }
-
- DB* self = ObjectWrap::Unwrap<DB>(args.This());
- if (self->db == NULL) {
- return ThrowException(Exception::Error(String::New("DB has not been opened")));
- }
-
- leveldb::WriteOptions options = JsToWriteOptions(args[0]);
- Local<Object> wbObject = Object::Cast(*args[1]);
-
- WriteBatch* wb = ObjectWrap::Unwrap<WriteBatch>(wbObject);
-
- return processStatus(self->db->Write(options, wb->wb));
-}
Handle<Value> DB::NewIterator(const Arguments& args) {
HandleScope scope;
@@ -324,6 +414,44 @@ Handle<Value> DB::GetApproximateSizes(const Arguments& args) {
//
+// DestroyDB
+//
+
+Handle<Value> DB::DestroyDB(const Arguments& args) {
+ HandleScope scope;
+
+ // Check args
+ if (!(args.Length() == 2 && args[0]->IsString() && args[1]->IsObject())) {
+ return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (String, Object)")));
+ }
+
+ String::Utf8Value name(args[0]);
+ leveldb::Options options = JsToOptions(args[1]);
+
+ return processStatus(leveldb::DestroyDB(*name, options));
+}
+
+
+//
+// RepairDB
+//
+
+Handle<Value> DB::RepairDB(const Arguments& args) {
+ HandleScope scope;
+
+ // Check args
+ if (!(args.Length() == 2 && args[0]->IsString() && args[1]->IsObject())) {
+ return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (String, Object)")));
+ }
+
+ String::Utf8Value name(args[0]);
+ leveldb::Options options = JsToOptions(args[1]);
+
+ return processStatus(leveldb::RepairDB(*name, options));
+}
+
+
+//
// Implementation of Params, which are passed from JS thread to EIO thread and back again
//
Oops, something went wrong. Retry.

0 comments on commit 42228bd

Please sign in to comment.