Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
GavinChenYan authored and stevehu committed Jul 3, 2020
1 parent 7fe5eec commit d5dcd45
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 0 deletions.
3 changes: 3 additions & 0 deletions body/src/main/java/com/networknt/body/BodyHandler.java
Expand Up @@ -130,6 +130,9 @@ private void attachFormDataBody(final HttpServerExchange exchange) throws IOExce
FormData formData = parser.parseBlocking();
data = BodyConverter.convert(formData);
exchange.putAttachment(REQUEST_BODY, data);
} else {
InputStream inputStream = exchange.getInputStream();
exchange.putAttachment(REQUEST_BODY, inputStream);
}
}

Expand Down
Expand Up @@ -698,6 +698,27 @@ public ByteBuffer callApiWithByteBuffer() throws Exception {
return reference.get().getAttachment(Http2Client.BUFFER_BODY);
}

public ByteBuffer callApiWithByteBufferRequest() throws Exception {
final Http2Client client = createClient();
final CountDownLatch latch = new CountDownLatch(1);
final ClientConnection connection = client.connect(ADDRESS, worker, Http2Client.BUFFER_POOL, OptionMap.EMPTY).get();
final AtomicReference<ClientResponse> reference = new AtomicReference<>();
try {
ClientRequest request = new ClientRequest().setPath(API).setMethod(Methods.GET);
request.getRequestHeaders().put(Headers.HOST, "localhost");
client.populateHeader(request, "Bearer token", "cid", "tid");
request.getRequestHeaders().add(Headers.CONNECTION, Headers.CLOSE.toString());
connection.sendRequest(request, client.byteBufferClientCallback(reference, latch, ByteBuffer.wrap("WebKitFormBoundaryOmz20xyMCkE27rN7".getBytes())));
latch.await();

// final ClientResponse response = reference.get();
Assert.assertNotNull(reference.get().getAttachment(Http2Client.BUFFER_BODY));
Assert.assertEquals(false, connection.isOpen());
} finally {
IoUtils.safeClose(connection);
}
return reference.get().getAttachment(Http2Client.BUFFER_BODY);
}

@Test
public void testAsyncAboutToExpire() throws InterruptedException, ExecutionException {
Expand Down
47 changes: 47 additions & 0 deletions client/src/main/java/com/networknt/client/Http2Client.java
Expand Up @@ -21,6 +21,7 @@
import com.networknt.client.circuitbreaker.CircuitBreaker;
import com.networknt.client.http.*;
import com.networknt.client.listener.ByteBufferReadChannelListener;
import com.networknt.client.listener.ByteBufferWriteChannelListener;
import com.networknt.client.oauth.Jwt;
import com.networknt.client.oauth.TokenManager;
import com.networknt.client.ssl.ClientX509ExtendedTrustManager;
Expand Down Expand Up @@ -803,6 +804,52 @@ public void failed(IOException e) {
};
}

public ClientCallback<ClientExchange> byteBufferClientCallback(final AtomicReference<ClientResponse> reference, final CountDownLatch latch, final ByteBuffer requestBody) {
return new ClientCallback<ClientExchange>() {
public void completed(ClientExchange result) {
new ByteBufferWriteChannelListener(requestBody).setup(result.getRequestChannel());
result.setResponseListener(new ClientCallback<ClientExchange>() {
public void completed(final ClientExchange result) {
reference.set(result.getResponse());
(new ByteBufferReadChannelListener(result.getConnection().getBufferPool()) {
protected void bufferDone(List<Byte> out) {
byte[] byteArray = new byte[out.size()];
int index = 0;
for (byte b : out) {
byteArray[index++] = b;
}
result.getResponse().putAttachment(BUFFER_BODY, (ByteBuffer.wrap(byteArray)));
latch.countDown();
}

protected void error(IOException e) {
latch.countDown();
}
}).setup(result.getResponseChannel());
}
public void failed(IOException e) {
latch.countDown();
}
});

try {
result.getRequestChannel().shutdownWrites();
if (!result.getRequestChannel().flush()) {
result.getRequestChannel().getWriteSetter().set(ChannelListeners.flushingChannelListener((ChannelListener)null, (ChannelExceptionHandler)null));
result.getRequestChannel().resumeWrites();
}
} catch (IOException var3) {
latch.countDown();
}

}

public void failed(IOException e) {
latch.countDown();
}
};
}

public ClientCallback<ClientExchange> createClientCallback(final AtomicReference<ClientResponse> reference, final CountDownLatch latch, final String requestBody) {
return new ClientCallback<ClientExchange>() {
@Override
Expand Down
@@ -0,0 +1,98 @@
package com.networknt.client.listener;

import io.undertow.UndertowLogger;
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.IoUtils;
import org.xnio.channels.StreamSinkChannel;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* This is a new Channel Listener to handler Byte Array write from request.
*
* @author Gavin Chen
*/

public class ByteBufferWriteChannelListener implements ChannelListener<StreamSinkChannel>{

private final ByteBuffer buffer;

public ByteBufferWriteChannelListener(ByteBuffer body) {
this.buffer =body;
}

public void setup(StreamSinkChannel channel) {
while(true) {
try {
int c = channel.write(this.buffer);
if (this.buffer.hasRemaining() && c > 0) {
continue;
}

if (this.buffer.hasRemaining()) {
channel.getWriteSetter().set(this);
channel.resumeWrites();
} else {
this.writeDone(channel);
}
} catch (IOException var3) {
this.handleError(channel, var3);
}

return;
}
}

protected void handleError(StreamSinkChannel channel, IOException e) {
UndertowLogger.REQUEST_IO_LOGGER.ioException(e);
IoUtils.safeClose(channel);
}

public void handleEvent(StreamSinkChannel channel) {
while(true) {
try {
int c = channel.write(this.buffer);
if (this.buffer.hasRemaining() && c > 0) {
continue;
}

if (this.buffer.hasRemaining()) {
channel.resumeWrites();
return;
}

this.writeDone(channel);
} catch (IOException var3) {
this.handleError(channel, var3);
}

return;
}
}

public boolean hasRemaining() {
return this.buffer.hasRemaining();
}

protected void writeDone(final StreamSinkChannel channel) {
try {
channel.shutdownWrites();
if (!channel.flush()) {
channel.getWriteSetter().set(ChannelListeners.flushingChannelListener(new ChannelListener<StreamSinkChannel>() {
public void handleEvent(StreamSinkChannel o) {
IoUtils.safeClose(channel);
}
}, ChannelListeners.closingChannelExceptionHandler()));
channel.resumeWrites();
}
} catch (IOException var3) {
this.handleError(channel, var3);
}

}

}


0 comments on commit d5dcd45

Please sign in to comment.