Skip to content
This repository has been archived by the owner on Aug 27, 2022. It is now read-only.

Commit

Permalink
8240754: Instrument FlowTest.java to provide more debug traces
Browse files Browse the repository at this point in the history
Reviewed-by: chegar
  • Loading branch information
dfuch committed Mar 9, 2020
1 parent dc17821 commit 5c8f935
Showing 1 changed file with 96 additions and 36 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -90,26 +90,27 @@ public FlowTest() throws IOException {
SSLFlowDelegate sslClient = new SSLFlowDelegate(engineClient, executor, end, looper);
// going to measure how long handshake takes
final long start = System.currentTimeMillis();
sslClient.alpn().whenComplete((String s, Throwable t) -> {
var alpnCF = sslClient.alpn()
.whenComplete((String s, Throwable t) -> {
if (t != null)
t.printStackTrace();
long endTime = System.currentTimeMillis();
alpn = s;
System.out.println("ALPN: " + alpn);
println("ALPN: " + alpn);
long period = (endTime - start);
System.out.printf("Handshake took %d ms\n", period);
printf("Handshake took %d ms", period);
});
Subscriber<List<ByteBuffer>> reader = sslClient.upstreamReader();
Subscriber<List<ByteBuffer>> writer = sslClient.upstreamWriter();
looper.setReturnSubscriber(reader);
// now connect all the pieces
srcPublisher.subscribe(writer);
String aa = sslClient.alpn().join();
System.out.println("AAALPN = " + aa);
String aa = alpnCF.join();
println("AAALPN = " + aa);
}

private void handlePublisherException(Object o, Throwable t) {
System.out.println("Src Publisher exception");
println("Src Publisher exception");
t.printStackTrace(System.out);
}

Expand All @@ -125,29 +126,43 @@ private static ByteBuffer getBuffer(long startingAt) {
@Test
public void run() {
long count = 0;
System.out.printf("Submitting %d buffer arrays\n", COUNTER);
System.out.printf("LoopCount should be %d\n", TOTAL_LONGS);
printf("Submitting %d buffer arrays", COUNTER);
printf("LoopCount should be %d", TOTAL_LONGS);
for (long i = 0; i < COUNTER; i++) {
ByteBuffer b = getBuffer(count);
count += LONGS_PER_BUF;
srcPublisher.submit(List.of(b));
}
System.out.println("Finished submission. Waiting for loopback");
println("Finished submission. Waiting for loopback");
// make sure we don't wait for allBytesReceived in case of error.
completion.whenComplete((r,t) -> allBytesReceived.countDown());
var done = completion.whenComplete((r,t) -> {
try {
if (t != null) {
println("Completion with error: " + t);
t.printStackTrace(System.out);
} else {
println("Successful completion");
}
} finally {
println("allBytesReceived.countDown()");
allBytesReceived.countDown();
}
});

try {
allBytesReceived.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("All bytes received: ");
println("All bytes received; latch count:"
+ allBytesReceived.getCount());
srcPublisher.close();
try {
completion.join();
done.join();
if (!alpn.equals("proto2")) {
throw new RuntimeException("wrong alpn received");
}
System.out.println("OK");
println("OK");
} finally {
executor.shutdownNow();
}
Expand Down Expand Up @@ -177,6 +192,7 @@ public static void main(String[]args) throws Exception {
*/
static class SSLLoopbackSubscriber implements Subscriber<List<ByteBuffer>> {
private final BlockingQueue<ByteBuffer> buffer;
private final SSLServerSocket serv;
private final Socket clientSock;
private final SSLSocket serverSock;
private final Thread thread1, thread2, thread3;
Expand All @@ -188,7 +204,7 @@ static class SSLLoopbackSubscriber implements Subscriber<List<ByteBuffer>> {
ExecutorService exec,
CountDownLatch allBytesReceived) throws IOException {
SSLServerSocketFactory fac = ctx.getServerSocketFactory();
SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket();
serv = (SSLServerSocket) fac.createServerSocket();
serv.setReuseAddress(false);
serv.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
SSLParameters params = serv.getSSLParameters();
Expand Down Expand Up @@ -216,7 +232,7 @@ public void start() {
}

private void handlePublisherException(Object o, Throwable t) {
System.out.println("Loopback Publisher exception");
println("Loopback Publisher exception");
t.printStackTrace(System.out);
}

Expand All @@ -227,27 +243,29 @@ private void clientReader() {
try {
InputStream is = clientSock.getInputStream();
final int bufsize = FlowTest.randomRange(512, 16 * 1024);
System.out.println("clientReader: bufsize = " + bufsize);
println("clientReader: bufsize = " + bufsize);
while (true) {
byte[] buf = new byte[bufsize];
int n = is.read(buf);
if (n == -1) {
System.out.println("clientReader close: read "
println("clientReader close: read "
+ readCount.get() + " bytes");
System.out.println("clientReader: got EOF. "
+ "Waiting signal to close publisher.");
println("clientReader: got EOF. "
+ "Waiting signal to close publisher.");
allBytesReceived.await();
System.out.println("clientReader: closing publisher");
println("clientReader: closing publisher; latch count: "
+ allBytesReceived.getCount());
publisher.close();
sleep(2000);
Utils.close(is, clientSock);
Utils.close(is, clientSock, serv);
return;
}
ByteBuffer bb = ByteBuffer.wrap(buf, 0, n);
readCount.addAndGet(n);
publisher.submit(List.of(bb));
}
} catch (Throwable e) {
println("clientReader failed: " + e);
e.printStackTrace();
Utils.close(clientSock);
}
Expand All @@ -266,9 +284,9 @@ private void clientWriter() {
if (buf == FlowTest.SENTINEL) {
// finished
//Utils.sleep(2000);
System.out.println("clientWriter close: " + nbytes + " written");
println("clientWriter close: " + nbytes + " written");
clientSock.shutdownOutput();
System.out.println("clientWriter close return");
println("clientWriter close return");
return;
}
int len = buf.remaining();
Expand All @@ -280,6 +298,7 @@ private void clientWriter() {
clientSubscription.request(1);
}
} catch (Throwable e) {
println("clientWriter failed: " + e);
e.printStackTrace();
}
}
Expand Down Expand Up @@ -307,21 +326,23 @@ private void serverLoopback() {
InputStream is = serverSock.getInputStream();
OutputStream os = serverSock.getOutputStream();
final int bufsize = FlowTest.randomRange(512, 16 * 1024);
System.out.println("serverLoopback: bufsize = " + bufsize);
println("serverLoopback: bufsize = " + bufsize);
byte[] bb = new byte[bufsize];
while (true) {
int n = is.read(bb);
if (n == -1) {
sleep(2000);
is.close();
serverSock.close();
println("Received EOF: closing server socket");
Utils.close(is, serverSock, serv);
println("Server socket closed");
return;
}
os.write(bb, 0, n);
os.flush();
loopCount.addAndGet(n);
}
} catch (Throwable e) {
println("serverLoopback failed: " + e);
e.printStackTrace();
}
}
Expand Down Expand Up @@ -376,18 +397,18 @@ public void onComplete() {
*/
static class EndSubscriber implements Subscriber<List<ByteBuffer>> {

private final long nbytes;
private final long nlongs;

private final AtomicLong counter;
private volatile Flow.Subscription subscription;
private final CompletableFuture<Void> completion;
private final CountDownLatch allBytesReceived;

EndSubscriber(long nbytes,
EndSubscriber(long nlongs,
CompletableFuture<Void> completion,
CountDownLatch allBytesReceived) {
counter = new AtomicLong(0);
this.nbytes = nbytes;
this.nlongs = nlongs;
this.completion = completion;
this.allBytesReceived = allBytesReceived;
}
Expand Down Expand Up @@ -417,12 +438,16 @@ public void onNext(List<ByteBuffer> buffers) {

for (ByteBuffer buf : buffers) {
while (buf.hasRemaining()) {
if (buf.remaining() % 8 != 0) {
completion.completeExceptionally(
new AssertionError("Unaligned buffer: " + buf.remaining()));
}
long n = buf.getLong();
//if (currval > (FlowTest.TOTAL_LONGS - 50)) {
//System.out.println("End: " + currval);
//}
if (n != currval++) {
System.out.println("ERROR at " + n + " != " + (currval - 1));
println("ERROR at " + n + " != " + (currval - 1));
completion.completeExceptionally(new RuntimeException("ERROR"));
subscription.cancel();
return;
Expand All @@ -433,25 +458,24 @@ public void onNext(List<ByteBuffer> buffers) {
counter.set(currval);
subscription.request(1);
if (currval >= TOTAL_LONGS) {
println("allBytesReceived.countDown(): currval=" +currval);
allBytesReceived.countDown();
}
}

@Override
public void onError(Throwable throwable) {
allBytesReceived.countDown();
completion.completeExceptionally(throwable);
}

@Override
public void onComplete() {
long n = counter.get();
if (n != nbytes) {
System.out.printf("nbytes=%d n=%d\n", nbytes, n);
if (n != nlongs) {
printf("Error at end: nlongs=%d n=%d", nlongs, n);
completion.completeExceptionally(new RuntimeException("ERROR AT END"));
} else {
System.out.println("DONE OK: counter = " + n);
allBytesReceived.countDown();
println("DONE OK: counter = " + n);
completion.complete(null);
}
}
Expand All @@ -464,4 +488,40 @@ private static void sleep(int millis) {
e.printStackTrace();
}
}

static final long START = System.nanoTime();

static String now() {
long now = System.nanoTime() - START;
long min = now / 1000_000_000L / 60L;
long sec = (now / 1000_000_000L) % 60L;
long mil = (now / 1000_000L) % 1000L;
long nan = (now % 1000_000L);
StringBuilder str = new StringBuilder();
if (min != 0) {
str = str.append(min).append("m ");
}
if (sec != 0) {
str = str.append(sec).append("s ");
}
if (mil != 0) {
str.append(mil).append("ms ");
}
if (nan != 0) {
str.append(nan).append("ns ");
}
assert now == min * 60L * 1000_000_000L
+ sec * 1000_000_000L
+ mil * 1000_000L + nan;
return str.toString().trim();
}

static void printf(String fmt, Object... args) {
println(String.format(fmt, args));
}

static void println(String msg) {
System.out.println("[" + Thread.currentThread() + "] [" + now() + "] " + msg);
}

}

0 comments on commit 5c8f935

Please sign in to comment.