Skip to content

Commit

Permalink
Merge pull request #677 from fredoboulo/fix/649
Browse files Browse the repository at this point in the history
Problem: Impossible to register multiple handlers for one socket or
  • Loading branch information
trevorbernard committed Feb 25, 2019
2 parents 67fc402 + 551cd14 commit 7f476dd
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 35 deletions.
151 changes: 117 additions & 34 deletions src/main/java/org/zeromq/ZPoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;

import zmq.poll.PollItem;

/**
* Rewritten poller for 0MQ.
*
Expand Down Expand Up @@ -100,7 +103,7 @@ public interface EventsHandler
public interface ItemHolder
{
// the inner ZMQ poll item
zmq.poll.PollItem item();
PollItem item();

// the related ZeroMQ socket
Socket socket();
Expand Down Expand Up @@ -153,7 +156,7 @@ public ZPollItem(final SelectableChannel channel, final EventsHandler handler, f
}

@Override
public zmq.poll.PollItem item()
public PollItem item()
{
return base();
}
Expand Down Expand Up @@ -228,6 +231,91 @@ public EventsHandler handler()
}
}

private static class CompositePollItem implements ItemHolder, EventsHandler
{
private final Collection<ItemHolder> holders = new HashSet<>();
private final Socket socket;
private final SelectableChannel channel;

private PollItem item;
private EventsHandler globalHandler;

public CompositePollItem(final Object socketOrChannel)
{
this.socket = socketOrChannel instanceof Socket ? (Socket) socketOrChannel : null;
this.channel = socketOrChannel instanceof SelectableChannel ? (SelectableChannel) socketOrChannel : null;
assert (socket != null || channel != null);
}

@Override
public PollItem item()
{
if (item == null) {
item = createItem();
}
return item;
}

private PollItem createItem()
{
if (socket == null) {
return new PollItem(channel, ops());
}
else {
return new PollItem(socket.base(), ops());
}
}

private int ops()
{
return holders.stream().map(holder -> holder.item().zinterestOps()).reduce(this::or).orElse(0);
}

@Override
public Socket socket()
{
return socket;
}

@Override
public EventsHandler handler()
{
return this;
}

@Override
public boolean events(Socket socket, int events)
{
return holders.stream().filter(holder -> holder.item().hasEvent(events))
.map(holder -> holder.handler() == null ? globalHandler : holder.handler()).filter(Objects::nonNull)
.map(handler -> handler.events(socket, events)).reduce(this::and).orElse(false);
}

@Override
public boolean events(SelectableChannel channel, int events)
{
return holders.stream().filter(holder -> holder.item().hasEvent(events))
.map(holder -> holder.handler() == null ? globalHandler : holder.handler()).filter(Objects::nonNull)
.map(handler -> handler.events(channel, events)).reduce(this::and).orElse(false);
}

private Boolean and(Boolean b0, Boolean b1)
{
return b0 & b1;
}

private Integer or(Integer ops1, Integer ops2)
{
return ops1 | ops2;
}

private ItemHolder handler(EventsHandler handler)
{
globalHandler = handler;
return this;
}
}

/******************************************************************************/
/* 0MQ socket events */
/******************************************************************************/
Expand Down Expand Up @@ -334,7 +422,7 @@ private ZPoller(final ItemCreator creator, final ZContext context, final Selecto
this.creator = creator;
this.selector = selector;
items = new HashMap<>();
all = createContainer(0);
all = new HashSet<>();
}

// creates a new poll item
Expand Down Expand Up @@ -446,10 +534,10 @@ public final boolean unregister(final Object socketOrChannel)
if (socketOrChannel == null) {
return false;
}
Set<ItemHolder> items = this.items.remove(socketOrChannel);
CompositePollItem items = this.items.remove(socketOrChannel);
boolean rc = items != null;
if (rc) {
all.removeAll(items);
all.remove(items);
}
return rc;
}
Expand Down Expand Up @@ -488,7 +576,7 @@ public int poll(final long timeout)
protected int poll(final long timeout, final boolean dispatchEvents)
{
// get all the raw items
final Set<zmq.poll.PollItem> pollItems = all.stream().map(ItemHolder::item).collect(Collectors.toSet());
final Set<PollItem> pollItems = all.stream().map(ItemHolder::item).collect(Collectors.toSet());
// polling time
final int rc = poll(selector, timeout, pollItems);

Expand All @@ -505,12 +593,16 @@ protected int poll(final long timeout, final boolean dispatchEvents)
return -1;
}

// does the effective polling
private boolean dispatch(Set<CompositePollItem> all, int size)
{
return dispatch(all.stream().map(aggr -> aggr.handler(globalHandler)).collect(Collectors.toList()), size);
}

// does the effective polling
protected int poll(final Selector selector, final long tout, final Collection<zmq.poll.PollItem> items)
{
final int size = items.size();
return zmq.ZMQ.poll(selector, items.toArray(new zmq.poll.PollItem[size]), size, tout);
return zmq.ZMQ.poll(selector, items.toArray(new PollItem[size]), size, tout);
}

/**
Expand All @@ -533,7 +625,7 @@ protected boolean dispatch(final Collection<ItemHolder> all, int size)
// no handler, short-circuit
continue;
}
final zmq.poll.PollItem item = holder.item();
final PollItem item = holder.item();
final int events = item.readyOps();

if (events <= 0) {
Expand Down Expand Up @@ -606,7 +698,7 @@ public boolean readable(final Socket socket)
// checks for read event
public boolean readable(final Object socketOrChannel)
{
final zmq.poll.PollItem it = filter(socketOrChannel, READABLE);
final PollItem it = filter(socketOrChannel, READABLE);
if (it == null) {
return false;
}
Expand Down Expand Up @@ -658,7 +750,7 @@ public boolean writable(final Socket socket)
// checks for write event
public boolean writable(final Object socketOrChannel)
{
final zmq.poll.PollItem it = filter(socketOrChannel, WRITABLE);
final PollItem it = filter(socketOrChannel, WRITABLE);
if (it == null) {
return false;
}
Expand Down Expand Up @@ -710,7 +802,7 @@ public boolean error(final Socket socket)
// checks for error event
public boolean error(final Object socketOrChannel)
{
final zmq.poll.PollItem it = filter(socketOrChannel, ERR);
final PollItem it = filter(socketOrChannel, ERR);
if (it == null) {
return false;
}
Expand Down Expand Up @@ -750,9 +842,9 @@ public void destroy()
// creator of items
private final ItemCreator creator;
// managed items
private final Map<Object, Set<ItemHolder>> items;
private final Map<Object, CompositePollItem> items;
// all managed items to avoid penalty cost when dispatching
private final Set<ItemHolder> all;
private final Set<CompositePollItem> all;

// TODO set of handlers, each with its specified events?
// optional global events handler
Expand Down Expand Up @@ -792,15 +884,16 @@ else if (socket == null) {
}
assert (socketOrChannel != null);

Set<ItemHolder> holders = items.computeIfAbsent(socketOrChannel, i -> createContainer(1));
final boolean rc = holders.add(holder);
CompositePollItem aggregate = items.computeIfAbsent(socketOrChannel, CompositePollItem::new);
final boolean rc = aggregate.holders.add(holder);
if (rc) {
all.add(holder);
all.add(aggregate);
}
return rc;
}

// create the container of holders
@Deprecated
protected Set<ItemHolder> createContainer(int size)
{
return new HashSet<>(size);
Expand All @@ -809,33 +902,23 @@ protected Set<ItemHolder> createContainer(int size)
// gets all the items of this poller
protected Collection<ItemHolder> items()
{
return all;
return all.stream().map(holder -> holder.handler(globalHandler)).collect(Collectors.toSet());
}

// gets all the items of this poller regarding the given input
protected Iterable<ItemHolder> items(final Object socketOrChannel)
{
final Set<ItemHolder> set = items.get(socketOrChannel);
if (set == null) {
final CompositePollItem aggregate = items.get(socketOrChannel);
if (aggregate == null) {
return Collections.emptySet();
}
return set;
return aggregate.holders;
}

// filters items to get the first one matching the criteria, or null if none found
protected zmq.poll.PollItem filter(final Object socketOrChannel, int events)
protected PollItem filter(final Object socketOrChannel, int events)
{
if (socketOrChannel == null) {
return null;
}

final Iterable<ItemHolder> items = items(socketOrChannel);
for (ItemHolder item : items) {
final zmq.poll.PollItem it = item.item();
if (it.hasEvent(events)) {
return it;
}
}
return null;
return Optional.ofNullable(socketOrChannel).map(items::get).map(CompositePollItem::item)
.filter(item -> item.hasEvent(events)).orElse(null);
}
}
1 change: 0 additions & 1 deletion src/main/java/zmq/poll/PollItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public int interestOps()
return interest;
}

@Deprecated
public int zinterestOps()
{
return zinterest;
Expand Down
Loading

0 comments on commit 7f476dd

Please sign in to comment.