Skip to content

Commit

Permalink
working on windows
Browse files Browse the repository at this point in the history
  • Loading branch information
matthiasg committed Mar 6, 2012
1 parent b04e9b3 commit ea889f8
Show file tree
Hide file tree
Showing 18 changed files with 370 additions and 17 deletions.
Binary file added compiled/0.6/win32/ia32/zeromq.node
Binary file not shown.
11 changes: 11 additions & 0 deletions examples/workers/producer.js
@@ -0,0 +1,11 @@

var zmq = require('../../')
, sock = zmq.socket('push');

sock.bindSync('tcp://127.0.0.1:3000');
console.log('Producer bound to port 3000');

setInterval(function(){
console.log('sending work');
sock.send('some work');
}, 500);
10 changes: 10 additions & 0 deletions examples/workers/worker.js
@@ -0,0 +1,10 @@

var zmq = require('../../')
, sock = zmq.socket('pull');

sock.connect('tcp://127.0.0.1:3000');
console.log('Worker connected to port 3000');

sock.on('message', function(msg){
console.log('work: %s', msg.toString());
});
7 changes: 5 additions & 2 deletions lib/index.js
Expand Up @@ -3,8 +3,11 @@
* Module dependencies.
*/

var EventEmitter = require('events').EventEmitter
, zmq = require('../binding');
var EventEmitter = require('events').EventEmitter;


var zmq = require('bindings')('zeromq');
//var zmq = require('../build/Release/zeromq');

/**
* Expose bindings as the module.
Expand Down
20 changes: 20 additions & 0 deletions package.json
@@ -0,0 +1,20 @@
{
"author": "Matthias Goetzke <m.goetzke@curasystems.de> (https://twitter.com/mgoetzke)",
"name": "zmq-windows",
"description": "test of zeromq bindings for windows",
"version": "0.0.1",
"homepage": "https://github.com/matthiasg",
"main": "./index.js",
"bin": "./bin/ariel.js",
"engines": {
"node": ">= 0.6"
},
"dependencies": {
"bindings": "> 0.3.0"
},
"devDependencies": {
"should": ">= 0.5.1",
"ariel": "*"
},
"optionalDependencies": {}
}
43 changes: 28 additions & 15 deletions src/bindings.cc
Expand Up @@ -71,7 +71,7 @@ namespace zmq {
public:
static void Initialize(v8::Handle<v8::Object> target);
virtual ~Socket();
void CallbackIfReady();
bool CallbackIfReady();

private:
static Handle<Value> New(const Arguments &args);
Expand Down Expand Up @@ -111,8 +111,9 @@ namespace zmq {
uint8_t state_;

bool IsReady();
uv_check_t *check_handle_;
static void UV_CheckFDState(uv_check_t* handle, int status);
uv_timer_t *check_timer_handle_;
uv_loop_t *check_loop;
static void UV_CheckFDState(uv_timer_t* handle, int status);
};

Persistent<String> callback_symbol;
Expand Down Expand Up @@ -268,14 +269,14 @@ namespace zmq {
return zmq_poll(items, 1, 0);
}

void
bool
Socket::CallbackIfReady() {
if (this->IsReady()) {
HandleScope scope;

Local<Value> callback_v = this->handle_->Get(callback_symbol);
if (!callback_v->IsFunction()) {
return;
return false;
}

TryCatch try_catch;
Expand All @@ -285,27 +286,36 @@ namespace zmq {
if (try_catch.HasCaught()) {
FatalException(try_catch);
}

return true;
}
else
return false;
}

void
Socket::UV_CheckFDState(uv_check_t* handle, int status) {
Socket::UV_CheckFDState(uv_timer_t* timer, int status) {
assert(status == 0);

Socket* s = static_cast<Socket*>(handle->data);
s->CallbackIfReady();
Socket* s = static_cast<Socket*>(timer->data);

while(s->CallbackIfReady());
}

Socket::Socket(Context *context, int type) : ObjectWrap() {
context_ = Persistent<Object>::New(context->handle_);
socket_ = zmq_socket(context->context_, type);
state_ = STATE_READY;

check_handle_ = new uv_check_t;
check_loop = uv_default_loop();
check_timer_handle_ = new uv_timer_t;

check_handle_->data = this;
uv_check_init(uv_default_loop(), check_handle_);
uv_check_start(check_handle_, Socket::UV_CheckFDState);
uv_timer_init( check_loop, check_timer_handle_ );
uv_timer_start( check_timer_handle_, Socket::UV_CheckFDState, 0, 1);

check_timer_handle_->data = this;
//uv_check_init(uv_default_loop(), check_handle_);
//uv_check_start(check_handle_, Socket::UV_CheckFDState);
}

Socket *
Expand Down Expand Up @@ -784,9 +794,12 @@ namespace zmq {
context_.Dispose();
context_.Clear();

uv_check_stop(check_handle_);
delete check_handle_;
uv_unref(uv_default_loop());
//uv_check_stop(check_handle_);
uv_timer_stop(check_timer_handle_);
uv_unref(check_loop);

//delete check_handle_;
//uv_unref(uv_default_loop());
}
}

Expand Down
1 change: 1 addition & 0 deletions test-all.cmd
@@ -0,0 +1 @@
for %f in (test\*.js) do call node %f
10 changes: 10 additions & 0 deletions test/run
@@ -0,0 +1,10 @@
#!/usr/bin/env bash

echo
files=test/test.*.js
for file in $files; do
printf "\033[90m ${file#test/}\033[0m "
node $@ $file && printf "\033[36m✓\033[0m\n"
test $? -eq 0 || exit $?
done
echo
25 changes: 25 additions & 0 deletions test/test.socket.events.js
@@ -0,0 +1,25 @@

var zmq = require('../')
, should = require('should');

var rep = zmq.socket('rep')
, req = zmq.socket('req');

rep.on('message', function(msg){
msg.should.be.an.instanceof(Buffer);
msg.toString().should.equal('hello');
rep.send('world');
});

rep.bind('inproc://stuff');

rep.on('bind', function(){
req.connect('inproc://stuff');
req.send('hello');
req.on('message', function(msg){
msg.should.be.an.instanceof(Buffer);
msg.toString().should.equal('world');
req.close();
rep.close();
});
});
35 changes: 35 additions & 0 deletions test/test.socket.js
@@ -0,0 +1,35 @@

var zmq = require('../')
, should = require('should');

// .socket

var sock = zmq.socket('req');
sock.type.should.equal('req');
sock.close.should.be.a('function');

// options

zmq.socket('req', { backlog: 30 }).backlog.should.equal(30);

// setsockopt

sock.getsockopt(zmq.ZMQ_BACKLOG).should.not.equal(75);
sock.setsockopt(zmq.ZMQ_BACKLOG, 75).should.equal(sock);
sock.getsockopt(zmq.ZMQ_BACKLOG).should.equal(75);
sock.setsockopt(zmq.ZMQ_BACKLOG, 100);

// setsockopt + string sugar

sock.getsockopt('backlog').should.not.equal(75);
sock.setsockopt('backlog', 75).should.equal(sock);
sock.getsockopt('backlog').should.equal(75);

// setsockopt sugar

sock.backlog.should.be.a('number');
sock.backlog.should.not.equal(50);
sock.backlog = 50;
sock.backlog.should.equal(50);

process.exit(0);
32 changes: 32 additions & 0 deletions test/test.socket.messages.js
@@ -0,0 +1,32 @@

var zmq = require('../')
, should = require('should');

var push = zmq.socket('push')
, pull = zmq.socket('pull');

var n = 0;

pull.on('message', function(msg){
msg = msg.toString();
switch (n++) {
case 0:
msg.should.equal('string');
break;
case 1:
msg.should.equal('15.99');
break;
case 2:
msg.should.equal('buffer');
push.close();
pull.close();
break;
}
});

pull.bind('inproc://stuff', function(){
push.connect('inproc://stuff');
push.send('string');
push.send(15.99);
push.send(new Buffer('buffer'));
});
19 changes: 19 additions & 0 deletions test/test.socket.multipart.message.js
@@ -0,0 +1,19 @@

var zmq = require('../')
, should = require('should');

var push = zmq.socket('push')
, pull = zmq.socket('pull');

pull.on('message', function(msg1, msg2, msg3){
msg1.toString().should.equal('string');
msg2.toString().should.equal('15.99');
msg3.toString().should.equal('buffer');
push.close();
pull.close();
});

pull.bind('inproc://stuff', function(){
push.connect('inproc://stuff');
push.send(['string', 15.99, new Buffer('buffer')]);
});
23 changes: 23 additions & 0 deletions test/test.socket.multipart.sndmore.js
@@ -0,0 +1,23 @@

var zmq = require('../')
, should = require('should');

var push = zmq.socket('push')
, pull = zmq.socket('pull');

pull.on('message', function(a, b, c, d, e){
a.toString().should.equal('tobi');
b.toString().should.equal('loki');
c.toString().should.equal('jane');
d.toString().should.equal('luna');
e.toString().should.equal('manny');
push.close();
pull.close();
});

pull.bind('inproc://stuff', function(){
push.connect('inproc://stuff');
push.send(['tobi', 'loki'], zmq.ZMQ_SNDMORE);
push.send(['jane', 'luna'], zmq.ZMQ_SNDMORE);
push.send('manny');
});
33 changes: 33 additions & 0 deletions test/test.socket.pub-sub.filter.js
@@ -0,0 +1,33 @@

var zmq = require('../')
, should = require('should');

var pub = zmq.socket('pub')
, sub = zmq.socket('sub');

var n = 0;

sub.subscribe('js');
sub.subscribe('luna');

sub.on('message', function(msg){
msg.should.be.an.instanceof(Buffer);
switch (n++) {
case 0:
msg.toString().should.equal('js is cool');
break;
case 1:
msg.toString().should.equal('luna is cool too');
pub.close();
sub.close();
break;
}
});

sub.bind('inproc://stuff', function(){
pub.connect('inproc://stuff');
pub.send('js is cool');
pub.send('ruby is meh');
pub.send('py is pretty cool');
pub.send('luna is cool too');
});
33 changes: 33 additions & 0 deletions test/test.socket.pub-sub.js
@@ -0,0 +1,33 @@

var zmq = require('../')
, should = require('should');

var pub = zmq.socket('pub')
, sub = zmq.socket('sub');

var n = 0;

sub.subscribe('');
sub.on('message', function(msg){
msg.should.be.an.instanceof(Buffer);
switch (n++) {
case 0:
msg.toString().should.equal('foo');
break;
case 1:
msg.toString().should.equal('bar');
break;
case 2:
msg.toString().should.equal('baz');
sub.close();
pub.close();
break;
}
});

sub.bind('inproc://stuff', function(){
pub.connect('inproc://stuff');
pub.send('foo');
pub.send('bar');
pub.send('baz');
});

0 comments on commit ea889f8

Please sign in to comment.