Skip to content

Commit

Permalink
Added writeVoidPromise, writeAndFlushVoidPromise methods to ChannelGr…
Browse files Browse the repository at this point in the history
…oup and DefaultChannelGroup

Motivation:

Reduce Objects and GC by option. Give the option to developers to be able to choose to Channel.voidPromise() when making group writes. This will reduce object creation for ChannelGroup writes and writeAndFlush's by adding additional methods which add the Channel.voidPromise() into the Channel.write() portion of the method.

Modifications:

* ChannelGroup.class - Added new methods to the interface along with documentation to the regular method and addition documentation specificly the purpose of the new methods. ChannelGroupFuture writeVoidPromise(Object message);, ChannelGroupFuture writeVoidPromise(Object message, ChannelMatcher matcher);, ChannelGroupFuture writeAndFlushVoidPromise(Object message);, ChannelGroupFuture writeAndFlushVoidPromise(Object message);.
* DefaultChannelGroup now implements these new interface methods in the utilizing the existing write and writeAndFlush method by coping them and then adding Channel.voidPromise() to the two methods that actually have the write logic - public ChannelGroupFuture writeVoidPromise(Object message, ChannelMatcher matcher)  and  public ChannelGroupFuture writeAndFlushVoidPromise(Object message, ChannelMatcher matcher).

Result:

Netty.io API users will now be able to use DefaultChannelGroup.writeVoidPromise() and DefaultChannelGroup.writeAndFlushVoidPromise();, and be able to save on object creation and GC when not caring about the FuturePromise.

closes #3127
  • Loading branch information
Underbalanced committed Nov 17, 2015
1 parent 8accc52 commit 462039d
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 0 deletions.
40 changes: 40 additions & 0 deletions transport/src/main/java/io/netty/channel/group/ChannelGroup.java
Expand Up @@ -119,6 +119,20 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
*/
ChannelGroupFuture write(Object message);

/**
* Writes the specified {@code message} to all {@link Channel}s in this
* group. If the specified {@code message} is an instance of
* {@link ByteBuf}, it is automatically
* {@linkplain ByteBuf#duplicate() duplicated} to avoid a race
* condition. The same is true for {@link ByteBufHolder}. Please note that this operation is asynchronous as
* {@link Channel#write(Object)} is. This method will reduce Object Allocation and thus reduce GC use this method
* if you are not interested in {@link io.netty.channel.ChannelFuture}. This method will write and use
* {@link io.netty.channel.VoidChannelPromise} for each channel that is written to.
*
* @return itself
*/
ChannelGroupFuture writeVoidPromise(Object message);

/**
* Writes the specified {@code message} to all {@link Channel}s in this
* group that match the given {@link ChannelMatcher}. If the specified {@code message} is an instance of
Expand All @@ -132,6 +146,21 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
*/
ChannelGroupFuture write(Object message, ChannelMatcher matcher);

/**
* Writes the specified {@code message} to all {@link Channel}s in this
* group that match the given {@link ChannelMatcher}. If the specified {@code message} is an instance of
* {@link ByteBuf}, it is automatically
* {@linkplain ByteBuf#duplicate() duplicated} to avoid a race
* condition. The same is true for {@link ByteBufHolder}. Please note that this operation is asynchronous as
* {@link Channel#write(Object)} is. This method will reduce Object Allocation and thus reduce GC use this method
* if you are not interested in {@link io.netty.channel.ChannelFuture}. This method will write and use
* {@link io.netty.channel.VoidChannelPromise} for each channel that is written to.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
ChannelGroupFuture writeVoidPromise(Object message, ChannelMatcher matcher);

/**
* Flush all {@link Channel}s in this
* group. If the specified {@code messages} are an instance of
Expand Down Expand Up @@ -163,6 +192,11 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
*/
ChannelGroupFuture writeAndFlush(Object message);

/**
* Shortcut for calling {@link #writeVoidPromise(Object)} and {@link #flush()}.
*/
ChannelGroupFuture writeAndFlushVoidPromise(Object message);

/**
* @deprecated Use {@link #writeAndFlush(Object)} instead.
*/
Expand All @@ -175,6 +209,12 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
*/
ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher);

/**
* Shortcut for calling {@link #writeVoidPromise(Object)} and {@link #flush()} and only act on
* {@link Channel}s that match the {@link ChannelMatcher}.
*/
ChannelGroupFuture writeAndFlushVoidPromise(Object message, ChannelMatcher matcher);

/**
* @deprecated Use {@link #writeAndFlush(Object, ChannelMatcher)} instead.
*/
Expand Down
Expand Up @@ -240,6 +240,11 @@ public ChannelGroupFuture write(Object message) {
return write(message, ChannelMatchers.all());
}

@Override
public ChannelGroupFuture writeVoidPromise(Object message) {
return writeVoidPromise(message, ChannelMatchers.all());
}

// Create a safe duplicate of the message to write it to a channel but not affect other writes.
// See https://github.com/netty/netty/issues/1461
private static Object safeDuplicate(Object message) {
Expand Down Expand Up @@ -272,6 +277,26 @@ public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
return new DefaultChannelGroupFuture(this, futures, executor);
}

@Override
public ChannelGroupFuture writeVoidPromise(Object message, ChannelMatcher matcher) {
if (message == null) {
throw new NullPointerException("message");
}
if (matcher == null) {
throw new NullPointerException("matcher");
}

Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: nonServerChannels.values()) {
if (matcher.matches(c)) {
futures.put(c, c.write(safeDuplicate(message), c.voidPromise()));
}
}

ReferenceCountUtil.release(message);
return new DefaultChannelGroupFuture(this, futures, executor);
}

@Override
public ChannelGroup flush() {
return flush(ChannelMatchers.all());
Expand All @@ -287,6 +312,11 @@ public ChannelGroupFuture writeAndFlush(Object message) {
return writeAndFlush(message, ChannelMatchers.all());
}

@Override
public ChannelGroupFuture writeAndFlushVoidPromise(Object message) {
return writeAndFlushVoidPromise(message, ChannelMatchers.all());
}

@Override
public ChannelGroupFuture disconnect(ChannelMatcher matcher) {
if (matcher == null) {
Expand Down Expand Up @@ -400,6 +430,25 @@ public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher)
return new DefaultChannelGroupFuture(this, futures, executor);
}

@Override
public ChannelGroupFuture writeAndFlushVoidPromise(Object message, ChannelMatcher matcher) {
if (message == null) {
throw new NullPointerException("message");
}

Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());

for (Channel c: nonServerChannels.values()) {
if (matcher.matches(c)) {
futures.put(c, c.writeAndFlush(safeDuplicate(message), c.voidPromise()));
}
}

ReferenceCountUtil.release(message);

return new DefaultChannelGroupFuture(this, futures, executor);
}

@Override
public ChannelGroupFuture newCloseFuture() {
return newCloseFuture(ChannelMatchers.all());
Expand Down

2 comments on commit 462039d

@netkins
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity pull requests :: netty Build 671 is now running

@netkins
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity pull requests :: netty Build 671 outcome was SUCCESS
Summary: Tests passed: 5695, ignored: 21 Build time: 00:44:20

Please sign in to comment.