Skip to content

Commit

Permalink
Reduce use of SocketUtils in tests
Browse files Browse the repository at this point in the history
Resolves #825
  • Loading branch information
onobc authored and olegz committed Mar 23, 2022
1 parent 2ce4572 commit 8b589c6
Show file tree
Hide file tree
Showing 19 changed files with 420 additions and 251 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,7 +27,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -50,6 +49,7 @@
*
* @author Daniel Zou
* @author Mike Eltsufin
* @author Chris Bono
*/
final public class LocalServerTestSupport {

Expand Down Expand Up @@ -82,10 +82,7 @@ public static <I, O> void verify(Class<?> mainClass, String function, I input, O
}
}

static ServerProcess startServer(Class<?> springApplicationMainClass, String function)
throws InterruptedException, IOException {
int port = SocketUtils.findAvailableTcpPort();

static ServerProcess startServer(Class<?> springApplicationMainClass, String function) throws IOException {
String signatureType = "http";
String target = FunctionInvoker.class.getCanonicalName();

Expand All @@ -102,7 +99,7 @@ static ServerProcess startServer(Class<?> springApplicationMainClass, String fun

ProcessBuilder processBuilder = new ProcessBuilder().command(command).redirectErrorStream(true);
Map<String, String> environment = new HashMap<>();
environment.put("PORT", String.valueOf(port));
environment.put("PORT", String.valueOf(0));
environment.put("K_SERVICE", "test-function");
environment.put("FUNCTION_SIGNATURE_TYPE", signatureType);
environment.put("FUNCTION_TARGET", target);
Expand All @@ -112,28 +109,28 @@ static ServerProcess startServer(Class<?> springApplicationMainClass, String fun
}
processBuilder.environment().putAll(environment);
Process serverProcess = processBuilder.start();
CountDownLatch ready = new CountDownLatch(1);
StringBuilder output = new StringBuilder();
Future<?> outputMonitorResult = EXECUTOR
.submit(() -> monitorOutput(serverProcess.getInputStream(), ready, output));
boolean serverReady = ready.await(5, TimeUnit.SECONDS);
if (!serverReady) {
Future<Integer> outputMonitorResult = EXECUTOR.submit(() -> monitorOutput(serverProcess.getInputStream()));

int port;
try {
port = outputMonitorResult.get(5L, TimeUnit.SECONDS);
}
catch (Exception ex) {
serverProcess.destroy();
throw new AssertionError("Server never became ready");
}
return new ServerProcess(serverProcess, outputMonitorResult, output, port);
return new ServerProcess(serverProcess, port);
}

private static void monitorOutput(InputStream processOutput, CountDownLatch ready, StringBuilder output) {
private static Integer monitorOutput(InputStream processOutput) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(processOutput))) {
String line;
while ((line = reader.readLine()) != null) {
if (line.contains(SERVER_READY_STRING)) {
ready.countDown();
}
System.out.println(line);
synchronized (output) {
output.append(line).append('\n');
if (line.contains(SERVER_READY_STRING)) {
// Started ServerConnector@192b07fd{HTTP/1.1,[http/1.1]}{0.0.0.0:59259}
String portStr = line.substring(line.lastIndexOf(':') + 1, line.lastIndexOf('}'));
return Integer.parseInt(portStr);
}
if (line.contains("WARNING")) {
throw new AssertionError("Found warning in server output:\n" + line);
Expand All @@ -143,30 +140,24 @@ private static void monitorOutput(InputStream processOutput, CountDownLatch read
catch (IOException e) {
throw new UncheckedIOException(e);
}
throw new RuntimeException("End of input stream and server never became ready");
}

static class ServerProcess implements AutoCloseable {

private final Process process;

private final Future<?> outputMonitorResult;

private final StringBuilder output;

private final int port;

ServerProcess(Process process, Future<?> outputMonitorResult, StringBuilder output, int port) {
ServerProcess(Process process, int port) {
this.process = process;
this.outputMonitorResult = outputMonitorResult;
this.output = output;
this.port = port;
}

Process process() {
return process;
}


@Override
public void close() {
process().destroy();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2021 the original author or authors.
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,28 +16,34 @@

package org.springframework.cloud.function.grpc;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.ClassUtils;

import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.context.EnvironmentAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.util.ClassUtils;

/**
*
* @author Oleg Zhurakousky
* @author Dave Syer
* @author Chris Bono
*
* @since 3.2
*
*/
class GrpcServer implements SmartLifecycle {
class GrpcServer implements SmartLifecycle, EnvironmentAware {

private Log logger = LogFactory.getLog(GrpcServer.class);

Expand All @@ -49,6 +55,8 @@ class GrpcServer implements SmartLifecycle {

private Server server;

private Environment environment;

GrpcServer(FunctionGrpcProperties grpcProperties, BindableService[] grpcMessageServices) {
this.grpcProperties = grpcProperties;
this.grpcMessageServices = grpcMessageServices;
Expand All @@ -70,7 +78,12 @@ public void start() {

logger.info("Starting gRPC server");
this.server.start();
logger.info("gRPC server is listening on port " + this.grpcProperties.getPort());
logger.info("gRPC server is listening on port " + this.server.getPort());

if (environment instanceof ConfigurableEnvironment) {
((ConfigurableEnvironment) this.environment).getPropertySources().addFirst(
new MapPropertySource("grpcServerProps", Collections.singletonMap("local.grpc.server.port", server.getPort())));
}
}
catch (Exception e) {
stop();
Expand All @@ -90,4 +103,9 @@ public void stop() {
public boolean isRunning() {
return this.server != null && !this.server.isShutdown();
}

@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
}

0 comments on commit 8b589c6

Please sign in to comment.