Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
branch: master
Fetching contributors…

Cannot retrieve contributors at this time

451 lines (390 sloc) 11.658 kB
$include_const 'datatypes.pasm';
$include_const 'hash_key_type.pasm';
$include_const 'ZeroMQ/Constants.pasm';
function load[anon,load]() {
load_bytecode('ZeroMQ/Raw.pbc');
}
// errno to exception translation
function zmq_err[anon](string name) {
using ZeroMQ.Raw.zmq_errno;
using ZeroMQ.Raw.zmq_strerror;
int errno = zmq_errno();
string err_str = atos(zmq_strerror(errno));
string msg = name + ': ' + err_str;
return new 'Exception'({'type': errno, 'message': msg});
}
// type constants
function int_t[anon, immediate]() {
var int_t = new 'StructView'([ DATATYPE_STRUCT, 1, DATATYPE_INT ]);
return int_t;
}
function int64_t[anon, immediate]() {
var int64_t = new 'StructView'([ DATATYPE_STRUCT, 1, DATATYPE_INT64 ]);
return int64_t;
}
function uint64_t[anon, immediate]() {
var uint64_t = new 'StructView'([ DATATYPE_STRUCT, 1, DATATYPE_UINT64 ]);
return uint64_t;
}
function byte_t[anon, immediate]() {
var byte_t = new 'StructView'([ DATATYPE_STRUCT, 1, DATATYPE_UINT8 ]);
return byte_t;
}
function zmq_pollitem_t[anon,immediate]() {
var pollitem_t = new 'StructView'([ DATATYPE_STRUCT, 4,
DATATYPE_PTR, DATATYPE_INT, DATATYPE_SHORT, DATATYPE_SHORT ]);
return pollitem_t;
}
function zmq_msg_t[anon,immediate]() {
var uchar_t = new 'StructView'([ DATATYPE_STRUCT, 1, DATATYPE_UCHAR ]);
var msg_t = new 'StructView'([ DATATYPE_STRUCT, 4,
DATATYPE_PTR, DATATYPE_UCHAR, DATATYPE_UCHAR,
DATATYPE_SIZED, uchar_t.aligned_size()*ZMQ_MAX_VSM_SIZE, uchar_t.align() ]);
return msg_t;
}
// buffer utilities
function stoa[anon](string s) {
var str_to_cstring = dlfunc(null, 'Parrot_str_to_cstring', 'ppS');
return str_to_cstring(getinterp(), s);
}
function stob[anon](string s) {
var str_to_buf = dlfunc(null, 'Parrot_str_cstring', 'ppS');
var buf = str_to_buf(getinterp(), s);
var str_bytes = dlfunc(null, 'Parrot_str_byte_length', 'ipS');
int bytes = str_bytes(getinterp(), s);
return buf, bytes;
}
function atos[anon](var a) {
var cstring_to_str = dlfunc(null, 'Parrot_str_new', 'Sppi');
return cstring_to_str(getinterp(), a, 0);
}
function btos[anon](var b, int len) {
var buf_to_str = dlfunc(null, 'Parrot_str_new', 'Sppi');
return buf_to_str(getinterp(), b, len);
}
namespace ZeroMQ {
function version() {
using static int_t;
var buf = int_t.alloc(3);
var major = int_t.array_offs(buf, 0);
var minor = int_t.array_offs(buf, 1);
var patch = int_t.array_offs(buf, 2);
using ZeroMQ.Raw.zmq_version;
zmq_version(major, minor, patch);
var ret = new 'FixedIntegerArray'(3);
ret[0] = int_t[major,0];
ret[1] = int_t[minor,0];
ret[2] = int_t[patch,0];
return ret;
}
class Context {
var ptr;
function Context(int i[optional], int has_i[opt_flag]) {
if (!has_i)
i = 1;
using ZeroMQ.Raw.zmq_init;
if (null == (self.ptr = zmq_init(i)))
throw zmq_err('zmq_init');
}
function term[vtable('destroy')]() {
// avoid double-free problems induced by manual termination
if (self.ptr != null) {
var ptr = self.ptr;
self.ptr = null;
using ZeroMQ.Raw.zmq_term;
if (zmq_term(ptr))
throw zmq_err('zmq_term');
}
}
function socket(int type) {
return new ZeroMQ.Socket(self, type);
}
}
class Socket {
var ptr;
function Socket(var ctx, int type) {
if (!(ctx instanceof ZeroMQ.Context))
die("incorrect type");
using ZeroMQ.Raw.zmq_socket;
if (null == (self.ptr = zmq_socket(ctx.ptr, type)))
throw zmq_err('zmq_socket');
}
function close[vtable('destroy')]() {
// avoid double-free problems induced by manual closing
if (self.ptr != null) {
var ptr = self.ptr;
self.ptr = null;
using ZeroMQ.Raw.zmq_close;
if (zmq_close(ptr))
throw zmq_err('zmq_close');
}
}
function bind(string addr) {
using static stoa;
var addr_buf = stoa(addr);
using ZeroMQ.Raw.zmq_bind;
if (zmq_bind(self.ptr, addr_buf))
throw zmq_err('zmq_bind');
}
function connect(string addr) {
using static stoa;
var addr_buf = stoa(addr);
using ZeroMQ.Raw.zmq_connect;
if (zmq_connect(self.ptr, addr_buf))
throw zmq_err('zmq_connect');
}
function send(var msg, int flags[optional], int has_flags[opt_flag]) {
if (!has_flags)
flags = 0;
if (!(msg instanceof ZeroMQ.Message))
die("incorrect type");
using ZeroMQ.Raw.zmq_send;
if (zmq_send(self.ptr, msg.ptr, flags))
throw zmq_err('zmq_send');
}
function recv(var msg, int flags[optional], int has_flags[opt_flag]) {
if (!has_flags)
flags = 0;
if (!(msg instanceof ZeroMQ.Message))
die("incorrect type");
using ZeroMQ.Raw.zmq_recv;
if (zmq_recv(self.ptr, msg.ptr, flags))
throw zmq_err('zmq_recv');
}
function getsockopt(int opt) {
using static byte_t;
using static int_t;
using static int64_t;
using static uint64_t;
var opt_buf = null;
var opt_buf_t = null;
var opt_buflen = int_t.alloc();
switch (opt) {
case ZMQ_MCAST_LOOP:
case ZMQ_RCVMORE:
case ZMQ_SWAP:
case ZMQ_RATE:
case ZMQ_RECOVERY_IVL:
opt_buf = int64_t.alloc();
opt_buf_t = int64_t;
int_t[opt_buflen,0] = int64_t.size();
break;
case ZMQ_HWM:
case ZMQ_AFFINITY:
case ZMQ_SNDBUF:
case ZMQ_RCVBUF:
opt_buf = uint64_t.alloc();
opt_buf_t = uint64_t;
int_t[opt_buflen,0] = uint64_t.size();
break;
case ZMQ_IDENTITY:
opt_buf = byte_t.alloc(255);
opt_buf_t = byte_t;
int_t[opt_buflen,0] = byte_t.aligned_size() * 255;;
break;
}
if (zmq_getsockopt(self.ptr, opt, opt_buf, opt_buflen))
throw zmq_err('zmq_getsockopt');
switch (opt_buf_t) {
case int64_t:
case uint64_t:
return opt_buf_t[opt_buf,0];
case byte_t:
// set size to actual size
int len = int_t[opt_buflen,0];
using static btos;
return btos(opt_buf, len);
}
}
function setsockopt(int opt, var val) {
using static byte_t;
using static int_t;
using static int64_t;
using static uint64_t;
var opt_buf = null;
int opt_buflen = 0;
switch (opt) {
case ZMQ_MCAST_LOOP:
case ZMQ_RCVMORE:
val = val ? true : false;
case ZMQ_SWAP:
case ZMQ_RATE:
case ZMQ_RECOVERY_IVL:
opt_buf = int64_t.alloc();
int64_t[opt_buf,0] = val;
opt_buflen = int64_t.size();
break;
case ZMQ_HWM:
case ZMQ_AFFINITY:
case ZMQ_SNDBUF:
case ZMQ_RCVBUF:
opt_buf = uint64_t.alloc();
uint64_t[opt_buf,0] = val;
opt_buflen = uint64_t.size();
break;
case ZMQ_IDENTITY:
using static stob;
var cs = stob(val);
opt_buf = cs[0]; opt_buflen = cs[1];
break;
}
if (zmq_setsockopt(self.ptr, opt, opt_buf, opt_buflen))
throw zmq_err('zmq_setsockopt');
}
}
function device(int type, var frontend, var backend) {
if (!(frontend instanceof ZeroMQ.Socket
&& backend instanceof ZeroMQ.Socket))
die("incorrect type");
if (zmq_device(type, frontend.ptr, backend.ptr))
throw zmq_err('zmq_device');
}
function msg_data_buf_stash[anon,immediate]() {
var ret = new 'Hash';
ret.set_key_type(Hash_key_type_PMC_ptr);
return ret;
}
function free_msg_data_buf_cb[anon](var userdata, var ptr) {
using static msg_data_buf_stash;
delete msg_data_buf_stash[userdata];
}
class Message {
var ptr;
function Message(var init[optional], int has_init[opt_flag]) {
using static zmq_msg_t;
var ptr = self.ptr = zmq_msg_t.alloc();
if (has_init) {
if (init instanceof Integer) {
using ZeroMQ.Raw.zmq_msg_init_size;
if (zmq_msg_init_size(ptr, int(init)))
throw zmq_err('zmq_msg_init_size');
}
else if (init instanceof String) {
using static stob;
var buf; int buf_len;
var cs = invoke(stob(init));
buf = cs[0]; buf_len = cs[1];
using static msg_data_buf_stash;
msg_data_buf_stash[buf] = buf;
using static free_msg_data_buf_cb;
var free_cb;
${ new_callback free_cb, free_msg_data_buf_cb, buf, 'vpU' };
using ZeroMQ.Raw.zmq_msg_init_data;
if (zmq_msg_init_data(ptr, buf, buf_len, free_cb, buf))
throw zmq_err('zmq_msg_init_data');
}
else if (init instanceof PtrBuf) {
using static msg_data_buf_stash;
msg_data_buf_stash[init] = init;
using static free_msg_data_buf_cb;
var free_cb;
${ new_callback free_cb, free_msg_data_buf_cb, init, 'vpU' };
using ZeroMQ.Raw.zmq_msg_init_data;
if (zmq_msg_init_data(ptr, init, int(init), free_cb, init))
throw zmq_err('zmq_msg_init_data');
}
}
else {
using ZeroMQ.Raw.zmq_msg_init;
if (zmq_msg_init(ptr))
throw zmq_err('zmq_msg_init');
}
}
function close[vtable('destroy')]() {
// avoid double-free problems induced by manual closing
if (self.ptr != null) {
var ptr = self.ptr;
self.ptr = null;
using ZeroMQ.Raw.zmq_msg_close;
if (zmq_msg_close(ptr))
throw zmq_err('zmq_msg_close');
}
}
function data() {
using ZeroMQ.Raw.zmq_msg_data;
using ZeroMQ.Raw.zmq_msg_size;
var ptr = zmq_msg_data(self.ptr);
int size = zmq_msg_size(self.ptr);
var ret = new 'PtrBuf'(ptr);
ret = size;
return ret;
}
function data_str[vtable('get_string')]() {
using ZeroMQ.Raw.zmq_msg_data;
using ZeroMQ.Raw.zmq_msg_size;
var ptr = zmq_msg_data(self.ptr);
int size = zmq_msg_size(self.ptr);
using static btos;
return btos(ptr, size);
}
function size() {
using ZeroMQ.Raw.zmq_msg_size;
return zmq_msg_size(self.ptr);
}
function copy(var src) {
if (!(src instanceof ZeroMQ.Message))
die("incorrect type");
using ZeroMQ.Raw.zmq_msg_copy;
if (zmq_msg_copy(self.ptr, src.ptr))
throw zmq_err('zmq_msg_copy');
}
function move(var src) {
if (!(src instanceof ZeroMQ.Message))
die("incorrect type");
using ZeroMQ.Raw.zmq_msg_move;
if (zmq_msg_move(self.ptr, src.ptr))
throw zmq_err('zmq_msg_move');
}
}
class PollItem {
var ptr;
var parent;
function set_flags[vtable('set_integer_native')](int flags) {
using static zmq_pollitem_t;
zmq_pollitem_t[self, 2] = flags;
}
function get_flags() {
using static zmq_pollitem_t;
int flags = zmq_pollitem_t[self, 2];
return flags;
}
function get_status[vtable('get_integer')]() {
using static zmq_pollitem_t;
int sts = zmq_pollitem_t[self, 3];
return sts;
}
}
class PollItemList {
var ptr;
var len;
var socks;
function PollItemList(var items) {
using static zmq_pollitem_t;
var ptr = self.ptr = zmq_pollitem_t.alloc(n);
int n = self.len = elements(items);
self.socks = items;
for (int i = 0; i < n; i++) {
var item = items[i];
var elt_ptr = zmq_pollitem_t.array_offs(ptr, i);
if (item instanceof ZeroMQ.Socket)
zmq_pollitem_t[elt_ptr,0] = item.ptr;
else
zmq_pollitem_t[elt_ptr,1] = item.get_fd();
}
}
function get_pmc_keyed_int[vtable](int k) {
using static zmq_pollitem_t;
var item = new ZeroMQ.PollItem;
item.ptr = zmq_pollitem_t.array_offs(self.ptr, k);
item.parent = self;
return item;
}
}
function poll(var itemlist, int timeout) {
using ZeroMQ.Raw.zmq_poll;
if (!(itemlist instanceof ZeroMQ.PollItemList))
die("incorrect type");
if (zmq_poll(itemlist.ptr, itemlist.size, timeout))
throw zmq_err('zmq_poll');
}
}
Jump to Line
Something went wrong with that request. Please try again.