Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally close connections when inactive #116

Merged
merged 29 commits into from
Sep 19, 2014
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ef6763d
Added a `closeAfterInactivityTime` option to channel config objects. …
Jul 13, 2014
0fef77c
Added a handler that will close connections if they're idle for a pre…
Sep 4, 2014
54d57a2
Removed a spurious block in a test.
Sep 5, 2014
44356fa
Added `gracefulShutdownTimeout` to `ApnsConnectionConfiguration`.
Sep 5, 2014
b0a027c
Added support for shutting down immediately if a graceful shutdown at…
Sep 5, 2014
e7576fa
Added a test for graceful shutdown timeouts.
Sep 5, 2014
6b39dde
Trigger a write failure if we know the connection is shutting down.
jchambers Sep 6, 2014
8a67836
Added a send attempt limit option to ApnsConnectionConfiguration.
jchambers Sep 6, 2014
d08ad7a
Merge branch 'close_connections_when_inactive' of github.com:relayrid…
jchambers Sep 6, 2014
d10f979
Added shut-down-after-send-attempt-limit functionality and tests.
jchambers Sep 6, 2014
343c894
Merge branch 'master' of github.com:relayrides/pushy into close_conne…
jchambers Sep 6, 2014
263aca7
Merge branch 'close_connections_when_inactive' of github.com:relayrid…
jchambers Sep 6, 2014
7814c0f
Fixed a copy constructor goof and updated tests accordingly.
Sep 8, 2014
67e1123
Added a clarifying comment.
Sep 8, 2014
8667e84
Merge branch 'close_connections_when_inactive' of github.com:relayrid…
Sep 8, 2014
e3507ae
Fixed more copy constructor mistakes and added some clarifying commen…
Sep 8, 2014
dea9b7d
Merge branch 'close_connections_when_inactive' of github.com:relayrid…
Sep 8, 2014
b3b6e31
Fixed a spelling error.
Sep 8, 2014
2b4848d
Added an explicit check for null configurations in the ApnsConnection…
Sep 8, 2014
f5ab059
Let us never speak of this again.
Sep 8, 2014
10ede21
Reworded docs for clarity.
Sep 8, 2014
1d6e774
Merge branch 'close_connections_when_inactive' of github.com:relayrid…
Sep 8, 2014
8d06356
Added still more clarifying comments.
Sep 8, 2014
2febe14
Merge pull request #117 from relayrides/graceful_shutdown_timeout
jchambers Sep 9, 2014
e85b249
Merge branch 'close_connections_when_inactive' of github.com:relayrid…
Sep 9, 2014
524b9ec
Updated copy constructor, hashCode, equals, and tests for ApnsConnect…
Sep 9, 2014
2f64f36
Restored some accidentally removed `final` modifiers.
Sep 19, 2014
5cf6ce1
Merge pull request #118 from relayrides/close_after_send_attempt_limit
jchambers Sep 19, 2014
afd937a
Merge branch 'master' of github.com:relayrides/pushy into close_conne…
Sep 19, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion src/main/java/com/relayrides/pushy/apns/ApnsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

Expand Down Expand Up @@ -69,6 +71,7 @@ public class ApnsConnection<T extends ApnsPushNotification> {
private final ApnsEnvironment environment;
private final SSLContext sslContext;
private final NioEventLoopGroup eventLoopGroup;
private final ApnsConnectionConfiguration configuration;
private final ApnsConnectionListener<T> listener;

private static final AtomicInteger connectionCounter = new AtomicInteger(0);
Expand All @@ -92,6 +95,9 @@ public class ApnsConnection<T extends ApnsPushNotification> {
private boolean rejectionReceived = false;
private final SentNotificationBuffer<T> sentNotificationBuffer;

private static final String PIPELINE_MAIN_HANDLER = "handler";
private static final String PIPELINE_IDLE_STATE_HANDLER = "idleStateHandler";

private static final Logger log = LoggerFactory.getLogger(ApnsConnection.class);

public static final int DEFAULT_SENT_NOTIFICATION_BUFFER_CAPACITY = 8192;
Expand Down Expand Up @@ -317,6 +323,18 @@ public void channelWritabilityChanged(final ChannelHandlerContext context) throw
this.apnsConnection, context.channel().isWritable());
}
}

@Override
public void userEventTriggered(final ChannelHandlerContext context, final Object event) throws Exception {
if (event instanceof IdleStateEvent) {
log.debug("{} will shut down due to inactivity.", this.apnsConnection.name);

context.pipeline().remove(ApnsConnection.PIPELINE_IDLE_STATE_HANDLER);
this.apnsConnection.shutdownGracefully();
} else {
super.userEventTriggered(context, event);
}
}
}

/**
Expand Down Expand Up @@ -354,6 +372,7 @@ public ApnsConnection(final ApnsEnvironment environment, final SSLContext sslCon
}

this.eventLoopGroup = eventLoopGroup;
this.configuration = configuration;
this.listener = listener;

this.sentNotificationBuffer = new SentNotificationBuffer<T>(configuration.getSentNotificationBufferCapacity());
Expand Down Expand Up @@ -397,14 +416,15 @@ protected void initChannel(final SocketChannel channel) {
pipeline.addLast("ssl", new SslHandler(sslEngine));
pipeline.addLast("decoder", new RejectedNotificationDecoder());
pipeline.addLast("encoder", new ApnsPushNotificationEncoder());
pipeline.addLast("handler", new ApnsConnectionHandler(apnsConnection));
pipeline.addLast(ApnsConnection.PIPELINE_MAIN_HANDLER, new ApnsConnectionHandler(apnsConnection));
}
});

log.debug("{} beginning connection process.", apnsConnection.name);
this.connectFuture = bootstrap.connect(this.environment.getApnsGatewayHost(), this.environment.getApnsGatewayPort());
this.connectFuture.addListener(new GenericFutureListener<ChannelFuture>() {

@Override
public void operationComplete(final ChannelFuture connectFuture) {
if (connectFuture.isSuccess()) {
log.debug("{} connected; waiting for TLS handshake.", apnsConnection.name);
Expand All @@ -414,6 +434,7 @@ public void operationComplete(final ChannelFuture connectFuture) {
try {
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {

@Override
public void operationComplete(final Future<Channel> handshakeFuture) {
if (handshakeFuture.isSuccess()) {
log.debug("{} successfully completed TLS handshake.", apnsConnection.name);
Expand All @@ -423,6 +444,13 @@ public void operationComplete(final Future<Channel> handshakeFuture) {
if (apnsConnection.listener != null) {
apnsConnection.listener.handleConnectionSuccess(apnsConnection);
}

if (apnsConnection.configuration.getCloseAfterInactivityTime() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can configuration be null as well? If so, an extra null check for that as well would be great.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can configuration be null as well?

No, but I made that clearer in 2b4848d.

There's already a test to make sure constructing a connection fails with a NullPointerException, but that was happening by way of making an unchecked read to the given configuration object (we would try to blindly read the sent notification buffer size) rather than checking for null configurations directly.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops -- f5ab059, too.

connectFuture.channel().pipeline().addBefore(ApnsConnection.PIPELINE_MAIN_HANDLER,
ApnsConnection.PIPELINE_IDLE_STATE_HANDLER,
new IdleStateHandler(0, 0, apnsConnection.configuration.getCloseAfterInactivityTime()));
}

} else {
log.debug("{} failed to complete TLS handshake with APNs gateway.",
apnsConnection.name, handshakeFuture.cause());
Expand Down Expand Up @@ -473,6 +501,7 @@ public synchronized void sendNotification(final T notification) {

this.connectFuture.channel().eventLoop().execute(new Runnable() {

@Override
public void run() {
final SendableApnsPushNotification<T> sendableNotification =
new SendableApnsPushNotification<T>(notification, apnsConnection.sequenceNumber++);
Expand All @@ -483,6 +512,7 @@ public void run() {

apnsConnection.connectFuture.channel().writeAndFlush(sendableNotification).addListener(new GenericFutureListener<ChannelFuture>() {

@Override
public void operationComplete(final ChannelFuture writeFuture) {
if (writeFuture.isSuccess()) {
log.trace("{} successfully wrote notification {}", apnsConnection.name,
Expand Down Expand Up @@ -570,6 +600,7 @@ public synchronized void shutdownGracefully() {

this.connectFuture.channel().eventLoop().execute(new Runnable() {

@Override
public void run() {
// Don't send a second shutdown notification if we've already started the graceful shutdown process.
if (apnsConnection.shutdownNotification == null) {
Expand All @@ -583,6 +614,7 @@ public void run() {

apnsConnection.connectFuture.channel().writeAndFlush(apnsConnection.shutdownNotification).addListener(new GenericFutureListener<ChannelFuture>() {

@Override
public void operationComplete(final ChannelFuture future) {
if (future.isSuccess()) {
log.trace("{} successfully wrote known-bad notification {}",
Expand Down Expand Up @@ -643,6 +675,7 @@ private Runnable getImmediateShutdownRunnable() {
final ApnsConnection<T> apnsConnection = this;

return new Runnable() {
@Override
public void run() {
final SslHandler sslHandler = apnsConnection.connectFuture.channel().pipeline().get(SslHandler.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
public class ApnsConnectionConfiguration {

private int sentNotificationBufferCapacity = ApnsConnection.DEFAULT_SENT_NOTIFICATION_BUFFER_CAPACITY;
private Integer closeAfterInactivityTime = null;

/**
* Creates a new connection configuration object with all options set to their default values.
Expand All @@ -22,6 +23,7 @@ public ApnsConnectionConfiguration() {}
*/
public ApnsConnectionConfiguration(final ApnsConnectionConfiguration configuration) {
this.sentNotificationBufferCapacity = configuration.sentNotificationBufferCapacity;
this.closeAfterInactivityTime = configuration.closeAfterInactivityTime;
}

/**
Expand All @@ -42,27 +44,60 @@ public int getSentNotificationBufferCapacity() {
* @param sentNotificationBufferCapacity the sent notification buffer capacity for connections created with this
* configuration
*/
public void setSentNotificationBufferCapacity(int sentNotificationBufferCapacity) {
public void setSentNotificationBufferCapacity(final int sentNotificationBufferCapacity) {
this.sentNotificationBufferCapacity = sentNotificationBufferCapacity;
}

/**
* Returns the time, in seconds, since the last push notification was sent after which connections created with this
* configuration will be closed. If {@code null}, connections created with this configuration will never be closed
* due to inactivity.
*
* @return the time, in seconds, since the last push notification was sent after which connections created with this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: A little rephrasing might make this clearer. What about @return the time, in seconds, between the last push notification and connection closure ?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah -- that's way better. Done in 10ede21.

* configuration will be closed
*/
public Integer getCloseAfterInactivityTime() {
return this.closeAfterInactivityTime;
}

/**
* Sets the time, in seconds, since the last push notification was sent after which connections created with this
* configuration will be closed. If {@code null} (the default), connections will never be closed due to inactivity.
*
* @param closeAfterInactivityTime the time, in seconds since the last push notification was sent, after which
* connections will be closed
*/
public void setCloseAfterInactivityTime(final Integer closeAfterInactivityTime) {
this.closeAfterInactivityTime = closeAfterInactivityTime;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime
* result
+ ((closeAfterInactivityTime == null) ? 0
: closeAfterInactivityTime.hashCode());
result = prime * result + sentNotificationBufferCapacity;
return result;
}

@Override
public boolean equals(Object obj) {
public boolean equals(final Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ApnsConnectionConfiguration other = (ApnsConnectionConfiguration) obj;
final ApnsConnectionConfiguration other = (ApnsConnectionConfiguration) obj;
if (closeAfterInactivityTime == null) {
if (other.closeAfterInactivityTime != null)
return false;
} else if (!closeAfterInactivityTime
.equals(other.closeAfterInactivityTime))
return false;
if (sentNotificationBufferCapacity != other.sentNotificationBufferCapacity)
return false;
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.relayrides.pushy.apns;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

Expand All @@ -11,12 +13,14 @@ public void testApnsConnectionConfiguration() {
final ApnsConnectionConfiguration configuration = new ApnsConnectionConfiguration();

assertTrue(configuration.getSentNotificationBufferCapacity() > 0);
assertNull(configuration.getCloseAfterInactivityTime());
}

@Test
public void testApnsConnectionConfigurationApnsConnectionConfiguration() {
final ApnsConnectionConfiguration configuration = new ApnsConnectionConfiguration();
configuration.setSentNotificationBufferCapacity(17);
configuration.setCloseAfterInactivityTime(19);

final ApnsConnectionConfiguration configurationCopy = new ApnsConnectionConfiguration(configuration);

Expand Down
49 changes: 41 additions & 8 deletions src/test/java/com/relayrides/pushy/apns/ApnsConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ public TestListener(final Object mutex) {
this.mutex = mutex;
}

@Override
public void handleConnectionSuccess(final ApnsConnection<SimpleApnsPushNotification> connection) {
synchronized (this.mutex) {
this.connectionSucceeded = true;
this.mutex.notifyAll();
}
}

@Override
public void handleConnectionFailure(final ApnsConnection<SimpleApnsPushNotification> connection, final Throwable cause) {
synchronized (mutex) {
this.connectionFailed = true;
Expand All @@ -81,7 +83,8 @@ public void handleConnectionFailure(final ApnsConnection<SimpleApnsPushNotificat
}
}

public void handleConnectionClosure(ApnsConnection<SimpleApnsPushNotification> connection) {
@Override
public void handleConnectionClosure(final ApnsConnection<SimpleApnsPushNotification> connection) {
try {
connection.waitForPendingWritesToFinish();
} catch (InterruptedException ignored) {
Expand All @@ -93,26 +96,30 @@ public void handleConnectionClosure(ApnsConnection<SimpleApnsPushNotification> c
}
}

public void handleWriteFailure(ApnsConnection<SimpleApnsPushNotification> connection,
SimpleApnsPushNotification notification, Throwable cause) {
@Override
public void handleWriteFailure(final ApnsConnection<SimpleApnsPushNotification> connection,
final SimpleApnsPushNotification notification, final Throwable cause) {

this.writeFailures.add(notification);
}

public void handleRejectedNotification(ApnsConnection<SimpleApnsPushNotification> connection,
SimpleApnsPushNotification rejectedNotification, RejectedNotificationReason reason) {
@Override
public void handleRejectedNotification(final ApnsConnection<SimpleApnsPushNotification> connection,
final SimpleApnsPushNotification rejectedNotification, final RejectedNotificationReason reason) {

this.rejectedNotification = rejectedNotification;
this.rejectionReason = reason;
}

public void handleUnprocessedNotifications(ApnsConnection<SimpleApnsPushNotification> connection,
Collection<SimpleApnsPushNotification> unprocessedNotifications) {
@Override
public void handleUnprocessedNotifications(final ApnsConnection<SimpleApnsPushNotification> connection,
final Collection<SimpleApnsPushNotification> unprocessedNotifications) {

this.unprocessedNotifications.addAll(unprocessedNotifications);
}

public void handleConnectionWritabilityChange(ApnsConnection<SimpleApnsPushNotification> connection, boolean writable) {
@Override
public void handleConnectionWritabilityChange(final ApnsConnection<SimpleApnsPushNotification> connection, final boolean writable) {
}
}

Expand Down Expand Up @@ -513,4 +520,30 @@ public void testWaitForPendingOperationsToFinish() throws Exception {
apnsConnection.shutdownGracefully();
}
}

@Test
public void testWriteTimeout() throws Exception {
final Object mutex = new Object();

final TestListener listener = new TestListener(mutex);

final ApnsConnectionConfiguration writeTimeoutConfiguration = new ApnsConnectionConfiguration();
writeTimeoutConfiguration.setCloseAfterInactivityTime(1);

final ApnsConnection<SimpleApnsPushNotification> apnsConnection =
new ApnsConnection<SimpleApnsPushNotification>(
TEST_ENVIRONMENT, SSLTestUtil.createSSLContextForTestClient(), this.getEventLoopGroup(),
writeTimeoutConfiguration, listener);

synchronized (mutex) {
apnsConnection.connect();

// Do nothing, but wait for the connection to time out due to inactivity
while (!listener.connectionClosed) {
mutex.wait();
}
}

assertTrue(listener.connectionClosed);
}
}