Skip to content

Commit

Permalink
tidy and add mongo.close() function
Browse files Browse the repository at this point in the history
  • Loading branch information
orlandov committed Jan 4, 2010
1 parent 5855026 commit ccfe6b3
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 38 deletions.
91 changes: 54 additions & 37 deletions mongo.cc
Expand Up @@ -18,8 +18,6 @@ extern "C" {
}
#include "bson.h"

#define DEBUG_LEVEL 1

#define DEBUGMODE 0
#define pdebug(...) do{if(DEBUGMODE)printf(__VA_ARGS__);}while(0)

Expand Down Expand Up @@ -81,6 +79,7 @@ class Connection : public node::EventEmitter {
t->InstanceTemplate()->SetInternalFieldCount(1);

NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect);
NODE_SET_PROTOTYPE_METHOD(t, "close", Close);
NODE_SET_PROTOTYPE_METHOD(t, "find", Find);
NODE_SET_PROTOTYPE_METHOD(t, "insert", Insert);
NODE_SET_PROTOTYPE_METHOD(t, "update", Update);
Expand Down Expand Up @@ -135,8 +134,7 @@ class Connection : public node::EventEmitter {
MongoCreateSocket();
}

mongo_conn_return
MongoCreateSocket() {
mongo_conn_return MongoCreateSocket() {
conn->sock = 0;
conn->connected = 0;

Expand All @@ -152,15 +150,11 @@ class Connection : public node::EventEmitter {
}

setNonBlocking(conn->sock);
int res = connect( conn->sock, (struct sockaddr*) &conn->sa, conn->addressSize);
int res = connect(conn->sock, (struct sockaddr*) &conn->sa, conn->addressSize);

assert(res < 0);
assert(errno == EINPROGRESS);

// if ( ){
// return mongo_conn_fail;
// }

ev_io_set(&connect_watcher, conn->sock, EV_WRITE);
StartConnectWatcher();
}
Expand All @@ -174,8 +168,9 @@ class Connection : public node::EventEmitter {
Emit(String::New("connection"), 0, NULL);
}

bool
Connect(const char *host, const int32_t port) {
void Connect(const char *host, const int32_t port) {
HandleScope scope;

mongo_connection_options opts;
memcpy(opts.host, host, strlen(host)+1);
opts.host[strlen(host)] = '\0';
Expand All @@ -191,14 +186,20 @@ class Connection : public node::EventEmitter {
ev_io_set(&write_watcher, conn->sock, EV_WRITE);

StartWriteWatcher();
}

//Attach();
void Close() {
HandleScope scope;

return true;
StopWriteWatcher();
StopReadWatcher();

mongo_destroy(conn);

Emit(String::New("close"), 0, NULL);
}

void
CheckBufferContents(void) {
void CheckBufferContents() {
if (state == STATE_READ_HEAD) {
if (buflen >= headerSize) {
pdebug("got enough for the head\n");
Expand Down Expand Up @@ -255,9 +256,9 @@ class Connection : public node::EventEmitter {
return true;
}

void
ParseMessage(void) {
void ParseMessage() {
HandleScope scope;

pdebug("in parse message\n");

int len;
Expand All @@ -284,8 +285,7 @@ class Connection : public node::EventEmitter {
ParseReply(out);
}

void
ParseReply(mongo_reply *out) {
void ParseReply(mongo_reply *out) {
HandleScope scope;

pdebug("parsing reply\n");
Expand All @@ -311,17 +311,15 @@ class Connection : public node::EventEmitter {
StopReadWatcher();
StartWriteWatcher();
pdebug("end of readresponse\n");

return;
}

bool FreeCursor() {
void FreeCursor() {
free((void*)cursor->ns);
free(cursor);
cursor = NULL;
}

bool EmitResults() {
void EmitResults() {
FreeCursor();

Emit(String::New("result"), 1, reinterpret_cast<Handle<Value> *>(&results));
Expand All @@ -331,11 +329,9 @@ class Connection : public node::EventEmitter {
results.Clear();
Handle<Array> r = Array::New();
results = Persistent<Array>::New(r);

return false;
}

bool AdvanceCursor(void) {
bool AdvanceCursor() {
char* bson_addr;

/* no data */
Expand Down Expand Up @@ -372,19 +368,20 @@ class Connection : public node::EventEmitter {
return false;
}

bool ConsumeInput(void) {
bool ConsumeInput() {
char *tmp;
char readbuf[chunk_size];
int32_t readbuflen;

while (true) {
for (;;) {
readbuflen = read(conn->sock, readbuf, chunk_size);

// no more input to consume
if (readbuflen == -1 && errno == EAGAIN) {
// no more input to consume
pdebug("len == -1 && errno == EAGAIN\n");
}
else if (readbuflen <= 0) {
// socket problem?
pdebug("length error on read %d errno = %d\n", readbuflen, errno);
}
else {
Expand Down Expand Up @@ -439,15 +436,15 @@ class Connection : public node::EventEmitter {
protected:

static Handle<Value>
New (const Arguments& args) {
New(const Arguments& args) {
HandleScope scope;

Connection *connection = new Connection();
connection->Wrap(args.This());
return args.This();
}

Connection () : EventEmitter () {
Connection() : EventEmitter() {
HandleScope scope;
Handle<Array> r = Array::New();
results = Persistent<Array>::New(r);
Expand All @@ -467,22 +464,36 @@ class Connection : public node::EventEmitter {
}

static Handle<Value>
Connect (const Arguments &args) {
Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
Connect(const Arguments &args) {
HandleScope scope;

String::Utf8Value host(args[0]->ToString());
Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
connection->Connect(*host, args[1]->Int32Value());

return Undefined();
}

static Handle<Value>
Find(const Arguments &args) {
Close(const Arguments &args) {
Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());

HandleScope scope;

connection->Close();

return Undefined();
}

static Handle<Value>
Find(const Arguments &args) {
Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());

HandleScope scope;

// TODO assert ns != undefined (args.Length > 0)
String::Utf8Value ns(args[0]->ToString());

bson query_bson;
bson query_fields_bson;
int nToReturn(0), nToSkip(0);
Expand Down Expand Up @@ -527,9 +538,10 @@ class Connection : public node::EventEmitter {

static Handle<Value>
Insert(const Arguments &args) {
pdebug("inserting here\n");
HandleScope scope;
Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());

HandleScope scope;

String::Utf8Value ns(args[0]->ToString());
// TODO assert ns != undefined (args.Length > 0)

Expand All @@ -547,8 +559,10 @@ class Connection : public node::EventEmitter {

static Handle<Value>
Update(const Arguments &args) {
HandleScope scope;
Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());

HandleScope scope;

String::Utf8Value ns(args[0]->ToString());
// TODO assert ns != undefined (args.Length > 0)

Expand Down Expand Up @@ -609,15 +623,18 @@ class Connection : public node::EventEmitter {
StopWriteWatcher();
if (get_more) {
RequestMore();
return;
}
else {
Emit(String::New("ready"), 0, NULL);
return;
}
}
if (revents & EV_READ) {
pdebug("!!! got a read event\n");
ConsumeInput();
CheckBufferContents();
return;
}
if (revents & EV_ERROR) {
pdebug("!!! got an error event\n");
Expand Down
8 changes: 8 additions & 0 deletions mongodb.js
Expand Up @@ -64,6 +64,10 @@ function MongoDB() {

self = this;

this.connection.addListener("close", function () {
self.emit("close");
});

this.connection.addListener("ready", function () {
self.dispatch();
});
Expand Down Expand Up @@ -92,6 +96,10 @@ MongoDB.prototype.connect = function(args) {
this.connection.connect(this.hostname, this.port);
}

MongoDB.prototype.close = function() {
this.connection.close();
}

MongoDB.prototype.addQuery = function(promise, ns, query, fields, limit, skip ) {
var q = [ promise, ns ];
if (query) q.push(query);
Expand Down
6 changes: 5 additions & 1 deletion test_mongo.js
Expand Up @@ -7,6 +7,10 @@ mongodb = require("./mongodb");

var mongo = new mongodb.MongoDB();

mongo.addListener("close", function () {
sys.puts("Tests done!");
});

mongo.addListener("connection", function () {
var widgets = mongo.getCollection('widgets');

Expand Down Expand Up @@ -52,7 +56,7 @@ mongo.addListener("connection", function () {
assertTrue(results[i].shazbot != 0);
}

sys.puts("Tests done!");
mongo.close();
});
});
});
Expand Down

0 comments on commit ccfe6b3

Please sign in to comment.