Skip to content

Commit

Permalink
Merge pull request #305 from fl4via/REM3-413_5.0
Browse files Browse the repository at this point in the history
[REM3-413] Properly treat the scenario where a connection is pre auth…
  • Loading branch information
fl4via committed Jun 14, 2024
2 parents 332d5e3 + ff37800 commit 92c45cf
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public void accept(final StreamConnection channel) {
final RemoteConnection connection = new RemoteConnection(channel, sslChannel, optionMap, HttpUpgradeConnectionProvider.this);
final ServerConnectionOpenListener openListener = new ServerConnectionOpenListener(connection, getConnectionProviderContext(), saslAuthenticationFactory, optionMap);
channel.getSinkChannel().setWriteListener(connection.getWriteListener());
channel.getSinkChannel().setCloseListener(c -> connection.getWriteListener().shutdownWrites());
conn.tracef("Accepted connection from %s to %s", channel.getPeerAddress(), channel.getLocalAddress());
openListener.handleEvent(channel.getSourceChannel());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ final class RemoteWriteListener implements ChannelListener<ConduitStreamSinkChan
public void handleEvent(final ConduitStreamSinkChannel channel) {
final ByteBuffer[] cachedArray = this.cachedArray;
synchronized (queue) {
if (closed && !channel.isOpen()) {
Messages.conn.trace("Skipping write event because write listener is in closed state and channel is not open");
return;
}
Pooled<ByteBuffer> pooled;
final Queue<Pooled<ByteBuffer>> queue = this.queue;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public Cancellable cancel() {
if (connection.isOpen()) {
remoteConnection.setResult(cancellableResult);
connection.getSinkChannel().setWriteListener(remoteConnection.getWriteListener());
connection.getSinkChannel().setCloseListener(channel -> remoteConnection.getWriteListener().shutdownWrites());
final ClientConnectionOpenListener openListener = new ClientConnectionOpenListener(destination, remoteConnection, connectionProviderContext, authenticationConfiguration, saslClientFactoryOperator, serverMechs, connectOptions);
openListener.handleEvent(connection.getSourceChannel());
}
Expand Down Expand Up @@ -414,6 +415,7 @@ private void handleAccepted(final StreamConnection accepted, final SslChannel ss
final RemoteConnection connection = new RemoteConnection(accepted, sslChannel, serverOptionMap, RemoteConnectionProvider.this);
final ServerConnectionOpenListener openListener = new ServerConnectionOpenListener(connection, connectionProviderContext, saslAuthenticationFactory, serverOptionMap);
accepted.getSinkChannel().setWriteListener(connection.getWriteListener());
accepted.getSinkChannel().setCloseListener(channel -> connection.getWriteListener().shutdownWrites());
log.tracef("Accepted connection from %s to %s", connection.getPeerAddress(), connection.getLocalAddress());
openListener.handleEvent(accepted.getSourceChannel());
}
Expand Down

0 comments on commit 92c45cf

Please sign in to comment.