Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zpoll functionality #5

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions src/classes/zpoll.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
const std = @import("std");
const zcontext = @import("zcontext.zig");
const zmessage = @import("zmessage.zig");
const zsocket = @import("zsocket.zig");
const c = @import("../zmq.zig").c;

pub const ZPollEvent = enum(i16) {
PollIn = 1,
PollOut = 2,
PollErr = 4,
PollPri = 8,
};

pub const ZPollItem = struct {
/// The ZSocket that the event will poll on
socket: *zsocket.ZSocket,

/// File descriptor associated with the socket
fd: i32 = 0,

/// Bitmask specifying the events to poll for on the socket.
events: i16 = 0,

/// Bitmask specifying the events that occurred on the socket during polling
revents: i16 = 0,

/// Produces a ZPollItem. At compile time events are merged to a single bitmask flag.
pub fn build(socket: *zsocket.ZSocket, fd: i32, comptime events: []const ZPollEvent) ZPollItem {
comptime var flag: i16 = 0;
inline for (events) |eventFlag| {
flag |= @intFromEnum(eventFlag);
}
return .{
.socket = socket,
.fd = fd,
.events = flag,
.revents = 0,
};
}
};

/// The size indicates the number of poll items that the ZPoll can contain.
pub fn ZPoll(size: usize) type {
return struct {
const Self = @This();
pollItems_: [size]c.zmq_pollitem_t = undefined,

/// Sets up a new ZPoll instance
pub fn init(poll_items: []const ZPollItem) Self {
var zpoll = Self{};
for (0.., poll_items) |i, item| {
zpoll.pollItems_[i] = .{
.socket = item.socket.socket_,
.fd = item.fd,
.events = item.events,
.revents = item.revents,
};
}
return zpoll;
}

/// Gets the returned events bitmask
pub fn returnedEvents(self: *Self, index: usize) i16 {
return self.pollItems_[index].revents;
}

/// Verifies if all requested events are flagged at the given index in the returned events.
/// At compile time events are merged to a single bitmask flag.
pub fn eventsOccurred(self: *Self, index: usize, comptime events: []const ZPollEvent) bool {
comptime var flag = 0;
inline for (events) |eventFlag| {
flag |= @intFromEnum(eventFlag);
}
return self.pollItems_[index].revents & flag != 0;
}

/// Perform polling on multiple ZeroMQ sockets to check for events.
/// Equivalent to the zmq_poll function.
pub fn poll(self: *Self, len: usize, timeout: i64) !void {
const rc = c.zmq_poll(&self.pollItems_, @intCast(len), timeout);
if (rc < 0) {
return switch (c.zmq_errno()) {
c.ETERM => error.ZSocketTerminated,
c.EFAULT => error.ItemsInvalid,
c.EINTR => error.Interrupted,
else => return error.PollFailed,
};
}
}
};
}

test "ZPoll - two sockets" {
const allocator = std.testing.allocator;

var context = try zcontext.ZContext.init(allocator);
defer context.deinit();

const router1 = try zsocket.ZSocket.init(.Router, &context);
defer router1.deinit();
try router1.bind("inproc://test-socket1");

const router2 = try zsocket.ZSocket.init(.Router, &context);
defer router2.deinit();
try router2.bind("inproc://test-socket2");

var msg = try zmessage.ZMessage.initUnmanaged("testmsg", null);
defer msg.deinit();

const req1 = try zsocket.ZSocket.init(.Req, &context);
defer req1.deinit();
try req1.connect("inproc://test-socket1");
try req1.send(&msg, .{});

const req2 = try zsocket.ZSocket.init(.Req, &context);
defer req2.deinit();
try req2.connect("inproc://test-socket2");
try req2.send(&msg, .{});

var poll = ZPoll(2).init(&[_]ZPollItem{
ZPollItem.build(router1, 0, &[_]ZPollEvent{.PollIn}),
ZPollItem.build(router2, 0, &[_]ZPollEvent{.PollIn}),
});

try poll.poll(1, 1000);
try std.testing.expect(poll.eventsOccurred(0, &[_]ZPollEvent{.PollIn}));
try poll.poll(2, 1000);
try std.testing.expect(poll.eventsOccurred(1, &[_]ZPollEvent{.PollIn}));
}
36 changes: 33 additions & 3 deletions src/classes/zsocket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,31 @@ pub const ZSocketOption = union(enum) {
///
/// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html
RouterHandover: bool,

/// ZMQ_SUBSCRIBE: Establish message filter
///
/// The 'ZMQ_SUBSCRIBE' option shall establish a new message filter on a 'ZMQ_SUB' socket.
/// Newly created 'ZMQ_SUB' sockets shall filter out all incoming messages, therefore you should
/// call this option to establish an initial message filter.
///
/// An empty 'option_value' of length zero shall subscribe to all incoming messages. A non-empty
/// 'option_value' shall subscribe to all messages beginning with the specified prefix. Multiple
/// filters may be attached to a single 'ZMQ_SUB' socket, in which case a message shall
/// be accepted if it matches at least one filter.
///
/// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html
Subscribe: []u8,

/// ZMQ_UNSUBSCRIBE: Remove message filter
///
/// The 'ZMQ_UNSUBSCRIBE' option shall remove an existing message filter on a 'ZMQ_SUB'
/// socket. The filter specified must match an existing filter previously established
/// with the 'ZMQ_SUBSCRIBE' option. If the socket has several instances of the same
/// filter attached the 'ZMQ_UNSUBSCRIBE' option shall remove only one instance, leaving
/// the rest in place and functional.
///
/// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html
Unsubscribe: []u8,
};

/// System level socket, which allows for opening outgoing and
Expand Down Expand Up @@ -544,6 +569,12 @@ pub const ZSocket = struct {

result = c.zmq_setsockopt(self.socket_, c.ZMQ_ROUTER_HANDOVER, &v, @sizeOf(@TypeOf(v)));
},
.Subscribe => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_SUBSCRIBE, opt.Subscribe.ptr, opt.Subscribe.len);
},
.Unsubscribe => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_UNSUBSCRIBE, opt.Unsubscribe.ptr, opt.Unsubscribe.len);
},

//else => return error.UnknownOption,
}
Expand Down Expand Up @@ -605,10 +636,9 @@ pub const ZSocket = struct {
.RoutingId => {
result = c.zmq_getsockopt(self.socket_, c.ZMQ_ROUTING_ID, opt.RoutingId.ptr, &opt.RoutingId.len);
},
.RouterHandover => {
return error.UnknownOption; // ZMQ_ROUTER_HANDOVER cannot be retrieved
.RouterHandover, .Subscribe, .Unsubscribe => {
return error.UnknownOption; // ZMQ_ROUTER_HANDOVER, ZMQ_SUBSCRIBE, and ZMQ_UNSUBSCRIBE cannot be retrieved
},

//else => return error.UnknownOption,
}

Expand Down
5 changes: 5 additions & 0 deletions src/zzmq.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const std = @import("std");
const zcontext = @import("classes/zcontext.zig");
const zsocket = @import("classes/zsocket.zig");
const zmessage = @import("classes/zmessage.zig");
const zpool = @import("classes/zpoll.zig");

pub const ZContext = zcontext.ZContext;
pub const ZVersion = zcontext.ZVersion;
Expand All @@ -14,6 +15,10 @@ pub const ZMessageReceived = zsocket.ZMessageReceived;

pub const ZMessage = zmessage.ZMessage;

pub const ZPollItem = zpool.ZPollItem;
pub const ZPollEvent = zpool.ZPollEvent;
pub const ZPoll = zpool.ZPoll;

test {
std.testing.refAllDeclsRecursive(@This());
}
Loading