Browse files

make sure closes happen after any remaing IO has happened

  • Loading branch information...
1 parent 2a853ae commit 1ef9bfd9c66ce15438ff1359534ce8ddb6594ec9 @orlandov committed Jan 31, 2010
Showing with 46 additions and 43 deletions.
  1. +4 −5 lib/mongodb.js
  2. +42 −38 src/mongo.cc
View
9 lib/mongodb.js
@@ -58,7 +58,8 @@ Collection.prototype.count = function(query) {
}
function MongoDB() {
- this.connection = new mongo.Connection;
+ this.myID = Math.random();
+ this.connection = new mongo.Connection();
var self = this;
@@ -71,10 +72,11 @@ function MongoDB() {
});
this.connection.addListener("connection", function () {
- self.emit("connection");
+ self.emit("connection", self);
});
this.connection.addListener("result", function(result) {
+
var promise = self.currentQuery[0];
promise.emitSuccess(result);
self.currentQuery = null;
@@ -84,8 +86,6 @@ function MongoDB() {
sys.inherits(MongoDB, process.EventEmitter);
MongoDB.prototype.connect = function(args) {
- self = this;
-
this.queries = [];
this.hostname = args.hostname || "127.0.0.1";
this.port = args.port || 27017;
@@ -109,7 +109,6 @@ MongoDB.prototype.addQuery = function(promise, ns, query, fields, limit, skip )
MongoDB.prototype.dispatch = function() {
if (this.currentQuery || !this.queries.length) return;
-
this.currentQuery = this.queries.shift();
this.connection.find.apply(this.connection, this.currentQuery.slice(1));
}
View
80 src/mongo.cc
@@ -45,11 +45,11 @@ class Connection : public node::EventEmitter {
t->InstanceTemplate()->SetInternalFieldCount(1);
NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect);
- NODE_SET_PROTOTYPE_METHOD(t, "close", Close);
- NODE_SET_PROTOTYPE_METHOD(t, "find", Find);
- NODE_SET_PROTOTYPE_METHOD(t, "insert", Insert);
- NODE_SET_PROTOTYPE_METHOD(t, "update", Update);
- NODE_SET_PROTOTYPE_METHOD(t, "remove", Remove);
+ NODE_SET_PROTOTYPE_METHOD(t, "close", Close);
+ NODE_SET_PROTOTYPE_METHOD(t, "find", Find);
+ NODE_SET_PROTOTYPE_METHOD(t, "insert", Insert);
+ NODE_SET_PROTOTYPE_METHOD(t, "update", Update);
+ NODE_SET_PROTOTYPE_METHOD(t, "remove", Remove);
target->Set(String::NewSymbol("Connection"), t->GetFunction());
}
@@ -158,6 +158,13 @@ class Connection : public node::EventEmitter {
void Close() {
pdebug("--- in Close()\n");
HandleScope scope;
+ close = true;
+ }
+
+ void reallyClose() {
+ HandleScope scope;
+ StopWriteWatcher();
+ StopReadWatcher();
if (writebuf) {
delete [] writebuf;
@@ -175,10 +182,6 @@ class Connection : public node::EventEmitter {
mongo_destroy(conn);
-
- pdebug("closed connection %d %d\n", get_more?1:0, writebuflen);
- StopWriteWatcher();
- StopReadWatcher();
Emit(String::New("close"), 0, NULL);
Unref();
@@ -366,7 +369,6 @@ class Connection : public node::EventEmitter {
StopWriteWatcher();
}
-
void ConsumeInput() {
char *tmp;
char readbuf[chunk_size];
@@ -419,16 +421,15 @@ class Connection : public node::EventEmitter {
data = mongo_data_append(data, cursor->ns, sl);
data = mongo_data_append32(data, &zero);
data = mongo_data_append64(data, &(cursor->mm->fields.cursorID));
- mongo_message_send(conn, mm);
- StartReadWatcher();
- StopWriteWatcher();
+ BufferMessageToSend(mm);
}
bool Find(const char *ns, bson *query=0, bson *query_fields=0,
int nToReturn=0, int nToSkip=0, int options=0) {
- pdebug("within Find\n");
+ StartReadWatcher();
+ assert(!close);
cursor = static_cast<mongo_cursor*>(
bson_malloc(sizeof(mongo_cursor)));
@@ -459,13 +460,9 @@ class Connection : public node::EventEmitter {
bson_fatal_msg((data == ((char*)mm) + mm->head.len), "query building fail!");
BufferMessageToSend(mm);
-
- pdebug("Starting read watcher!\n");
- StartReadWatcher();
}
void Insert(const char *ns, bson obj) {
- //mongo_insert(conn, ns, &obj);
char * data;
mongo_message *mm = mongo_message_create( 16 /* header */
+ 4 /* ZERO */
@@ -499,7 +496,6 @@ class Connection : public node::EventEmitter {
}
void Update(const char *ns, bson cond, bson op, int flags=0) {
- //mongo_update(conn, ns, &cond, &obj, 0);
char * data;
mongo_message * mm = mongo_message_create( 16 /* header */
+ 4 /* ZERO */
@@ -531,11 +527,14 @@ class Connection : public node::EventEmitter {
return args.This();
}
+ ~Connection() {
+ }
+
Connection() : node::EventEmitter() {
HandleScope scope;
results = Persistent<Array>::New(Array::New());
-
+ close = false;
cursor = false;
get_more = false;
buflen = writebuflen = 0;
@@ -552,20 +551,19 @@ class Connection : public node::EventEmitter {
static Handle<Value>
Connect(const Arguments &args) {
HandleScope scope;
+ Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
// XXX check args.Length
String::Utf8Value host(args[0]->ToString());
- Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
connection->Connect(*host, args[1]->Int32Value());
return Undefined();
}
static Handle<Value>
Close(const Arguments &args) {
- Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
-
HandleScope scope;
+ Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
connection->Close();
@@ -574,9 +572,8 @@ class Connection : public node::EventEmitter {
static Handle<Value>
Find(const Arguments &args) {
- Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
-
HandleScope scope;
+ Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
// TODO assert ns != undefined (args.Length > 0)
String::Utf8Value ns(args[0]->ToString());
@@ -618,9 +615,8 @@ class Connection : public node::EventEmitter {
static Handle<Value>
Insert(const Arguments &args) {
- Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
-
HandleScope scope;
+ Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
String::Utf8Value ns(args[0]->ToString());
// TODO assert ns != undefined (args.Length > 0)
@@ -639,9 +635,8 @@ class Connection : public node::EventEmitter {
static Handle<Value>
Update(const Arguments &args) {
- Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
-
HandleScope scope;
+ Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
String::Utf8Value ns(args[0]->ToString());
// TODO assert ns != undefined (args.Length > 0)
@@ -682,7 +677,7 @@ class Connection : public node::EventEmitter {
String::New("ns must be specified")));
}
String::Utf8Value ns(args[0]->ToString());
-
+
bson cond;
if (args.Length() > 1 && args[1]->IsObject()) {
@@ -705,8 +700,18 @@ class Connection : public node::EventEmitter {
}
void Event(EV_P_ ev_io *w, int revents) {
- if (!conn->connected) return;
- pdebug("event %d\n", conn->connected);
+ if (!conn->connected) {
+ StopReadWatcher();
+ StopWriteWatcher();
+ return;
+ };
+ pdebug("event %d %d\n", conn->connected, close ? 1 : 0);
+ if (revents & EV_READ) {
+ pdebug("!!! got a read event\n");
+ StopReadWatcher();
+ ConsumeInput();
+ CheckBuffer();
+ }
if (revents & EV_WRITE) {
pdebug("!!! got a write event\n");
pdebug("!!! writebuflen = %d\n", writebuflen);
@@ -725,12 +730,10 @@ class Connection : public node::EventEmitter {
Emit(String::New("ready"), 0, NULL);
}
}
- if (revents & EV_READ) {
- pdebug("!!! got a read event\n");
- StopReadWatcher();
- ConsumeInput();
- CheckBuffer();
- return;
+ if (close) {
+ pdebug("!!! really closing %d\n", close);
+ reallyClose();
+ close = false;
}
if (revents & EV_ERROR) {
pdebug("!!! got an error event\n");
@@ -756,6 +759,7 @@ class Connection : public node::EventEmitter {
// states
bool get_more;
+ bool close;
mongo_cursor *cursor;

0 comments on commit 1ef9bfd

Please sign in to comment.