Permalink
Browse files

adding receive method

  • Loading branch information...
1 parent 91a7b0e commit 10e61faa341a6ca9d49187a9687a306c9c27dc39 @tenderlove committed Sep 22, 2008
Showing with 56 additions and 4 deletions.
  1. +18 −0 example/local.rb
  2. +10 −0 example/remote.rb
  3. +6 −2 ext/quail/quail_exchange.c
  4. +15 −0 ext/quail/quail_handle.c
  5. +6 −1 ext/quail/quail_queue.c
  6. +1 −1 lib/quail/handle.rb
View
@@ -0,0 +1,18 @@
+require 'quail'
+
+msg = "hello world"* 100
+Quail::Handle.start('localhost') do |handle|
+ exchange = Quail::Exchange.new(handle, 'EL', Quail::LOCAL)
+ queue = Quail::Queue.new(handle, 'QL', Quail::LOCAL)
+
+ handle.bind("EL", "QG")
+ handle.bind("EG", "QL")
+
+ t = Time.now.to_f
+ 1_000_000.times {
+ exchange.send(msg)
+ }
+ puts Time.now.to_f - t
+ sleep 2
+end
+
View
@@ -0,0 +1,10 @@
+require 'quail'
+
+Quail::Handle.start('localhost') do |handle|
+ exchange = Quail::Exchange.new(handle, 'EG', Quail::GLOBAL)
+ queue = Quail::Queue.new(handle, 'QG', Quail::GLOBAL)
+
+ while true
+ handle.receive
+ end
+end
@@ -8,12 +8,16 @@ static VALUE create_exchange( VALUE self,
{
void * handle;
int eid;
+ int native_scope = NUM2INT(scope);
+
+ char * chr_address = native_scope == CZMQ_SCOPE_LOCAL ?
+ NULL : StringValuePtr(address);
Data_Get_Struct(rb_h, void, handle);
eid = czmq_create_exchange( handle,
StringValuePtr(name),
- NUM2INT(scope),
- StringValuePtr(address) );
+ native_scope,
+ chr_address );
return INT2NUM(eid);
}
View
@@ -23,10 +23,25 @@ static VALUE create_handle(VALUE self, VALUE host)
return self;
}
+static VALUE receive(VALUE self)
+{
+ void * handle;
+ void * buf;
+ size_t size;
+ czmq_free_fn *ffn;
+
+ Data_Get_Struct(rb_iv_get(self, "@handle"), void, handle);
+ czmq_receive(handle, &buf, &size, &ffn);
+ if(ffn) ffn(buf);
+
+ return rb_str_new((char *)buf, size);
+}
+
void Init_Quail_Handle(VALUE mQuail)
{
VALUE cQuailHandle = rb_define_class_under(mQuail, "Handle", rb_cObject);
rb_define_private_method(cQuailHandle, "create_handle", create_handle, 1);
rb_define_private_method(cQuailHandle, "destroy_handle", destroy_handle, 0);
rb_define_private_method(cQuailHandle, "native_bind", native_bind, 2);
+ rb_define_method(cQuailHandle, "receive", receive, 0);
}
View
@@ -8,9 +8,14 @@ static VALUE create_queue( VALUE self,
{
void * handle;
Data_Get_Struct(rb_h, void, handle);
+
+ int native_scope = NUM2INT(scope);
+ char * chr_address = native_scope == CZMQ_SCOPE_LOCAL ?
+ NULL : StringValuePtr(address);
+
czmq_create_queue( handle,
StringValuePtr(name),
- NUM2INT(scope),
+ native_scope,
StringValuePtr(address)
);
}
View
@@ -31,7 +31,7 @@ def finish
def bind exchange, queue
raise "not started" unless @started
- native_bind(exchange.name, queue.name)
+ native_bind(exchange, queue)
end
end
end

0 comments on commit 10e61fa

Please sign in to comment.