Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally close connections when inactive #116

Merged
merged 29 commits into from
Sep 19, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ef6763d
Added a `closeAfterInactivityTime` option to channel config objects. …
Jul 13, 2014
0fef77c
Added a handler that will close connections if they're idle for a pre…
Sep 4, 2014
54d57a2
Removed a spurious block in a test.
Sep 5, 2014
44356fa
Added `gracefulShutdownTimeout` to `ApnsConnectionConfiguration`.
Sep 5, 2014
b0a027c
Added support for shutting down immediately if a graceful shutdown at…
Sep 5, 2014
e7576fa
Added a test for graceful shutdown timeouts.
Sep 5, 2014
6b39dde
Trigger a write failure if we know the connection is shutting down.
jchambers Sep 6, 2014
8a67836
Added a send attempt limit option to ApnsConnectionConfiguration.
jchambers Sep 6, 2014
d08ad7a
Merge branch 'close_connections_when_inactive' of github.com:relayrid…
jchambers Sep 6, 2014
d10f979
Added shut-down-after-send-attempt-limit functionality and tests.
jchambers Sep 6, 2014
343c894
Merge branch 'master' of github.com:relayrides/pushy into close_conne…
jchambers Sep 6, 2014
263aca7
Merge branch 'close_connections_when_inactive' of github.com:relayrid…
jchambers Sep 6, 2014
7814c0f
Fixed a copy constructor goof and updated tests accordingly.
Sep 8, 2014
67e1123
Added a clarifying comment.
Sep 8, 2014
8667e84
Merge branch 'close_connections_when_inactive' of github.com:relayrid…
Sep 8, 2014
e3507ae
Fixed more copy constructor mistakes and added some clarifying commen…
Sep 8, 2014
dea9b7d
Merge branch 'close_connections_when_inactive' of github.com:relayrid…
Sep 8, 2014
b3b6e31
Fixed a spelling error.
Sep 8, 2014
2b4848d
Added an explicit check for null configurations in the ApnsConnection…
Sep 8, 2014
f5ab059
Let us never speak of this again.
Sep 8, 2014
10ede21
Reworded docs for clarity.
Sep 8, 2014
1d6e774
Merge branch 'close_connections_when_inactive' of github.com:relayrid…
Sep 8, 2014
8d06356
Added still more clarifying comments.
Sep 8, 2014
2febe14
Merge pull request #117 from relayrides/graceful_shutdown_timeout
jchambers Sep 9, 2014
e85b249
Merge branch 'close_connections_when_inactive' of github.com:relayrid…
Sep 9, 2014
524b9ec
Updated copy constructor, hashCode, equals, and tests for ApnsConnect…
Sep 9, 2014
2f64f36
Restored some accidentally removed `final` modifiers.
Sep 19, 2014
5cf6ce1
Merge pull request #118 from relayrides/close_after_send_attempt_limit
jchambers Sep 19, 2014
afd937a
Merge branch 'master' of github.com:relayrides/pushy into close_conne…
Sep 19, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 106 additions & 39 deletions src/main/java/com/relayrides/pushy/apns/ApnsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

Expand Down Expand Up @@ -68,6 +70,7 @@ public class ApnsConnection<T extends ApnsPushNotification> {
private final ApnsEnvironment environment;
private final SSLContext sslContext;
private final NioEventLoopGroup eventLoopGroup;
private final ApnsConnectionConfiguration configuration;
private final ApnsConnectionListener<T> listener;

private final String name;
Expand All @@ -84,12 +87,17 @@ public class ApnsConnection<T extends ApnsPushNotification> {

private final Object pendingWriteMonitor = new Object();
private int pendingWriteCount = 0;
private int sendAttempts = 0;

private SendableApnsPushNotification<KnownBadPushNotification> shutdownNotification;

private boolean rejectionReceived = false;
private final SentNotificationBuffer<T> sentNotificationBuffer;

private static final String PIPELINE_MAIN_HANDLER = "handler";
private static final String PIPELINE_IDLE_STATE_HANDLER = "idleStateHandler";
private static final String PIPELINE_GRACEFUL_SHUTDOWN_TIMEOUT_HANDLER = "gracefulShutdownTimeoutHandler";

private static final Logger log = LoggerFactory.getLogger(ApnsConnection.class);

public static final int DEFAULT_SENT_NOTIFICATION_BUFFER_CAPACITY = 8192;
Expand Down Expand Up @@ -315,6 +323,25 @@ public void channelWritabilityChanged(final ChannelHandlerContext context) throw
this.apnsConnection, context.channel().isWritable());
}
}

@Override
public void userEventTriggered(final ChannelHandlerContext context, final Object event) throws Exception {
if (event instanceof IdleStateEvent) {
// The IdleStateHandler for connection inactivity is removed by shutdownGracefully, which also populates
// shutdownNotification. If we get an IdleStateEvent without a shutdownNotification, we know that the
// event came from the connection inactivity handler. Otherwise, we know it came from the graceful
// shutdown timeout handler.
if (this.apnsConnection.shutdownNotification == null) {
log.debug("{} will shut down gracefully due to inactivity.", this.apnsConnection.name);
this.apnsConnection.shutdownGracefully();
} else {
log.debug("Graceful shutdown attempt for {} timed out; shutting down immediately.", this.apnsConnection.name);
this.apnsConnection.shutdownImmediately();
}
} else {
super.userEventTriggered(context, event);
}
}
}

/**
Expand Down Expand Up @@ -353,6 +380,12 @@ public ApnsConnection(final ApnsEnvironment environment, final SSLContext sslCon
}

this.eventLoopGroup = eventLoopGroup;

if (configuration == null) {
throw new NullPointerException("Connection configuration must not be null.");
}

this.configuration = configuration;
this.listener = listener;

if (name == null) {
Expand Down Expand Up @@ -400,7 +433,7 @@ protected void initChannel(final SocketChannel channel) {
pipeline.addLast("ssl", new SslHandler(sslEngine));
pipeline.addLast("decoder", new RejectedNotificationDecoder());
pipeline.addLast("encoder", new ApnsPushNotificationEncoder());
pipeline.addLast("handler", new ApnsConnectionHandler(apnsConnection));
pipeline.addLast(ApnsConnection.PIPELINE_MAIN_HANDLER, new ApnsConnectionHandler(apnsConnection));
}
});

Expand Down Expand Up @@ -428,6 +461,13 @@ public void operationComplete(final Future<Channel> handshakeFuture) {
if (apnsConnection.listener != null) {
apnsConnection.listener.handleConnectionSuccess(apnsConnection);
}

if (apnsConnection.configuration.getCloseAfterInactivityTime() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can configuration be null as well? If so, an extra null check for that as well would be great.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can configuration be null as well?

No, but I made that clearer in 2b4848d.

There's already a test to make sure constructing a connection fails with a NullPointerException, but that was happening by way of making an unchecked read to the given configuration object (we would try to blindly read the sent notification buffer size) rather than checking for null configurations directly.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops -- f5ab059, too.

connectFuture.channel().pipeline().addBefore(ApnsConnection.PIPELINE_MAIN_HANDLER,
ApnsConnection.PIPELINE_IDLE_STATE_HANDLER,
new IdleStateHandler(0, 0, apnsConnection.configuration.getCloseAfterInactivityTime()));
}

} else {
log.debug("{} failed to complete TLS handshake with APNs gateway.",
apnsConnection.name, handshakeFuture.cause());
Expand Down Expand Up @@ -476,58 +516,69 @@ public synchronized void sendNotification(final T notification) {
throw new IllegalStateException(String.format("%s has not completed handshake.", this.name));
}

this.connectFuture.channel().eventLoop().execute(new Runnable() {
if (this.shutdownNotification == null) {
this.connectFuture.channel().eventLoop().execute(new Runnable() {

@Override
public void run() {
final SendableApnsPushNotification<T> sendableNotification =
new SendableApnsPushNotification<T>(notification, apnsConnection.sequenceNumber++);
@Override
public void run() {
final SendableApnsPushNotification<T> sendableNotification =
new SendableApnsPushNotification<T>(notification, apnsConnection.sequenceNumber++);

log.trace("{} sending {}", apnsConnection.name, sendableNotification);
log.trace("{} sending {}", apnsConnection.name, sendableNotification);

apnsConnection.pendingWriteCount += 1;
apnsConnection.pendingWriteCount += 1;

apnsConnection.connectFuture.channel().writeAndFlush(sendableNotification).addListener(new GenericFutureListener<ChannelFuture>() {
apnsConnection.connectFuture.channel().writeAndFlush(sendableNotification).addListener(new GenericFutureListener<ChannelFuture>() {

@Override
public void operationComplete(final ChannelFuture writeFuture) {
if (writeFuture.isSuccess()) {
log.trace("{} successfully wrote notification {}", apnsConnection.name,
sendableNotification.getSequenceNumber());
@Override
public void operationComplete(final ChannelFuture writeFuture) {
if (writeFuture.isSuccess()) {
log.trace("{} successfully wrote notification {}", apnsConnection.name,
sendableNotification.getSequenceNumber());

if (apnsConnection.rejectionReceived) {
// Even though the write succeeded, we know for sure that this notification was never
// processed by the gateway because it had already rejected another notification from
// this connection.
if (apnsConnection.listener != null) {
apnsConnection.listener.handleUnprocessedNotifications(apnsConnection, java.util.Collections.singletonList(notification));
if (apnsConnection.rejectionReceived) {
// Even though the write succeeded, we know for sure that this notification was never
// processed by the gateway because it had already rejected another notification from
// this connection.
if (apnsConnection.listener != null) {
apnsConnection.listener.handleUnprocessedNotifications(apnsConnection, java.util.Collections.singletonList(notification));
}
} else {
apnsConnection.sentNotificationBuffer.addSentNotification(sendableNotification);
}
} else {
apnsConnection.sentNotificationBuffer.addSentNotification(sendableNotification);
}
} else {
log.trace("{} failed to write notification {}",
apnsConnection.name, sendableNotification, writeFuture.cause());

// Assume this is a temporary failure (we know it's not a permanent rejection because we didn't
// even manage to write the notification to the wire) and re-enqueue for another send attempt.
if (apnsConnection.listener != null) {
apnsConnection.listener.handleWriteFailure(apnsConnection, notification, writeFuture.cause());
log.trace("{} failed to write notification {}",
apnsConnection.name, sendableNotification, writeFuture.cause());

// Assume this is a temporary failure (we know it's not a permanent rejection because we didn't
// even manage to write the notification to the wire) and re-enqueue for another send attempt.
if (apnsConnection.listener != null) {
apnsConnection.listener.handleWriteFailure(apnsConnection, notification, writeFuture.cause());
}
}
}

apnsConnection.pendingWriteCount -= 1;
assert apnsConnection.pendingWriteCount >= 0;
apnsConnection.pendingWriteCount -= 1;
assert apnsConnection.pendingWriteCount >= 0;

if (apnsConnection.pendingWriteCount == 0) {
synchronized (apnsConnection.pendingWriteMonitor) {
apnsConnection.pendingWriteMonitor.notifyAll();
if (apnsConnection.pendingWriteCount == 0) {
synchronized (apnsConnection.pendingWriteMonitor) {
apnsConnection.pendingWriteMonitor.notifyAll();
}
}
}
}
});
});
}
});
} else {
if (this.listener != null) {
this.listener.handleWriteFailure(this, notification, new IllegalStateException("Connection is shutting down."));
}
});
}

if (this.configuration.getSendAttemptLimit() != null && ++this.sendAttempts >= this.configuration.getSendAttemptLimit()) {
log.debug("{} reached send attempt limit and will shut down gracefully.", this.name);
this.shutdownGracefully();
}
}

/**
Expand Down Expand Up @@ -569,6 +620,12 @@ public void waitForPendingWritesToFinish() throws InterruptedException {
*/
public synchronized void shutdownGracefully() {

if (this.connectFuture != null && this.connectFuture.channel() != null) {
if (this.connectFuture.channel().pipeline().get(ApnsConnection.PIPELINE_IDLE_STATE_HANDLER) != null) {
this.connectFuture.channel().pipeline().remove(ApnsConnection.PIPELINE_IDLE_STATE_HANDLER);
}
}

final ApnsConnection<T> apnsConnection = this;

// We only need to send a known-bad notification if we were ever connected in the first place and if we're
Expand All @@ -587,6 +644,16 @@ public void run() {
apnsConnection.shutdownNotification = new SendableApnsPushNotification<KnownBadPushNotification>(
new KnownBadPushNotification(), apnsConnection.sequenceNumber++);

if (apnsConnection.configuration.getGracefulShutdownTimeout() != null &&
apnsConnection.connectFuture.channel().pipeline().get(PIPELINE_GRACEFUL_SHUTDOWN_TIMEOUT_HANDLER) == null) {
// We should time out, but haven't added an idle state handler yet.
apnsConnection.connectFuture.channel().pipeline().addBefore(
PIPELINE_MAIN_HANDLER,
PIPELINE_GRACEFUL_SHUTDOWN_TIMEOUT_HANDLER,
new IdleStateHandler(apnsConnection.configuration.getGracefulShutdownTimeout(), 0, 0));
}


apnsConnection.pendingWriteCount += 1;

apnsConnection.connectFuture.channel().writeAndFlush(apnsConnection.shutdownNotification).addListener(new GenericFutureListener<ChannelFuture>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
public class ApnsConnectionConfiguration {

private int sentNotificationBufferCapacity = ApnsConnection.DEFAULT_SENT_NOTIFICATION_BUFFER_CAPACITY;
private Integer closeAfterInactivityTime = null;
private Integer gracefulShutdownTimeout = null;
private Integer sendAttemptLimit = null;

/**
* Creates a new connection configuration object with all options set to their default values.
Expand All @@ -22,6 +25,9 @@ public ApnsConnectionConfiguration() {}
*/
public ApnsConnectionConfiguration(final ApnsConnectionConfiguration configuration) {
this.sentNotificationBufferCapacity = configuration.sentNotificationBufferCapacity;
this.closeAfterInactivityTime = configuration.closeAfterInactivityTime;
this.gracefulShutdownTimeout = configuration.gracefulShutdownTimeout;
this.sendAttemptLimit = configuration.sendAttemptLimit;
}

/**
Expand All @@ -42,29 +48,130 @@ public int getSentNotificationBufferCapacity() {
* @param sentNotificationBufferCapacity the sent notification buffer capacity for connections created with this
* configuration
*/
public void setSentNotificationBufferCapacity(int sentNotificationBufferCapacity) {
public void setSentNotificationBufferCapacity(final int sentNotificationBufferCapacity) {
this.sentNotificationBufferCapacity = sentNotificationBufferCapacity;
}

/**
* Returns the time, in seconds, between the sending of the last push notification and connection closure. If
* {@code null}, connections created with this configuration will never be closed due to inactivity.
*
* @return the time, in seconds, between the sending of the last push notification and connection closure
*/
public Integer getCloseAfterInactivityTime() {
return this.closeAfterInactivityTime;
}

/**
* Sets the time, in seconds, between the sending of the last push notification and connection closure. If
* {@code null} (the default), connections will never be closed due to inactivity.
*
* @param closeAfterInactivityTime the time, in seconds, between the sending of the last push notification and
* connection closure
*/
public void setCloseAfterInactivityTime(final Integer closeAfterInactivityTime) {
this.closeAfterInactivityTime = closeAfterInactivityTime;
}

/**
* Returns the time, in seconds, after which a graceful shutdown attempt should be abandoned and the connection
* should be closed immediately.
*
* @return the time, in seconds, after which a graceful shutdown attempt should be abandoned and the connection
* should be closed immediately
*/
public Integer getGracefulShutdownTimeout() {
return this.gracefulShutdownTimeout;
}

/**
* Sets the time, in seconds, after which a graceful shutdown attempt should be abandoned and the connection should
* be closed immediately. If {@code null} (the default) graceful shutdown attempts will never time out. Note that,
* if a graceful shutdown attempt times out, no guarantees are made as to the state of notifications sent by the
* connection.
*
* @param gracefulShutdownTimeout the time, in seconds, after which a graceful shutdown attempt should be abandoned
*/
public void setGracefulShutdownTimeout(final Integer gracefulShutdownTimeout) {
this.gracefulShutdownTimeout = gracefulShutdownTimeout;
}

/**
* Returns the number of notifications a connection may attempt to send before shutting down.
*
* @return the number of notifications a connection may attempt to send before shutting down, or {@code null} if no
* limit has been set
*/
public Integer getSendAttemptLimit() {
return this.sendAttemptLimit;
}

/**
* <p>Sets the number of notifications a connection may attempt to send before shutting down. If not {@code null},
* connections will attempt to shut down gracefully after the given number of send attempts regardless of whether
* those attempts were actually successful. By default, no limit is set.</p>
*
* <p>If the a send attempt limit is set and is less than the sent notification buffer size, it is guaranteed that
* notifications will never be lost due to buffer overruns (though they may be lost by other means, such as non-
* graceful shutdowns).</p>
*
* @param sendAttemptLimit the number of notifications the connection may attempt to send before shutting down
* gracefully; if {@code null}, no limit is set
*
* @see ApnsConnectionConfiguration#setSentNotificationBufferCapacity(int)
*/
public void setSendAttemptLimit(final Integer sendAttemptLimit) {
this.sendAttemptLimit = sendAttemptLimit;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime
* result
+ ((closeAfterInactivityTime == null) ? 0
: closeAfterInactivityTime.hashCode());
result = prime
* result
+ ((gracefulShutdownTimeout == null) ? 0
: gracefulShutdownTimeout.hashCode());
result = prime
* result
+ ((sendAttemptLimit == null) ? 0 : sendAttemptLimit.hashCode());
result = prime * result + sentNotificationBufferCapacity;
return result;
}

@Override
public boolean equals(Object obj) {
public boolean equals(final Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ApnsConnectionConfiguration other = (ApnsConnectionConfiguration) obj;
final ApnsConnectionConfiguration other = (ApnsConnectionConfiguration) obj;
if (closeAfterInactivityTime == null) {
if (other.closeAfterInactivityTime != null)
return false;
} else if (!closeAfterInactivityTime
.equals(other.closeAfterInactivityTime))
return false;
if (gracefulShutdownTimeout == null) {
if (other.gracefulShutdownTimeout != null)
return false;
} else if (!gracefulShutdownTimeout
.equals(other.gracefulShutdownTimeout))
return false;
if (sendAttemptLimit == null) {
if (other.sendAttemptLimit != null)
return false;
} else if (!sendAttemptLimit.equals(other.sendAttemptLimit))
return false;
if (sentNotificationBufferCapacity != other.sentNotificationBufferCapacity)
return false;
return true;
}

}
Loading