Skip to content
This repository has been archived by the owner on Mar 13, 2021. It is now read-only.

Prototype grpc server #17

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ VOLUME /tmp
COPY target/dependency/BOOT-INF/lib /app/lib
COPY target/dependency/META-INF /app/META-INF
COPY target/dependency/BOOT-INF/classes /app
ENV SPRING_CLOUD_STREAM_BINDER_FILE_PREFIX /pipes
ENTRYPOINT ["java","-Xmx128m","-Djava.security.egd=file:/dev/./urandom","-XX:TieredStopAtLevel=1","-noverify","-cp","app:app/lib/*","io.projectriff.invoker.JavaFunctionInvokerApplication"]
49 changes: 25 additions & 24 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Edgware.SR1</spring-cloud.version>
<spring-cloud-function.version>1.0.0.M3</spring-cloud-function.version>
<spring-cloud-stream-file.version>1.0.0.M1</spring-cloud-stream-file.version>
<spring-cloud-deployer.version>1.2.0.RELEASE</spring-cloud-deployer.version>
<reactor.version>3.2.0.M1</reactor.version>
<spring-cloud.version>Edgware.RELEASE</spring-cloud.version>
<spring-cloud-function.version>1.0.0.BUILD-SNAPSHOT</spring-cloud-function.version>
<spring-cloud-deployer.version>1.2.0.RELEASE</spring-cloud-deployer.version>
<grpc.version>1.8.0</grpc.version>
<dockerfile-maven.version>1.3.6</dockerfile-maven.version>
<docker.org>projectriff</docker.org>
<docker.tag>0.0.4-snapshot</docker.tag>
Expand All @@ -32,30 +32,31 @@
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-web</artifactId>
<artifactId>spring-cloud-function-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-stream</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</exclusion>
</exclusions>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-file</artifactId>
<version>${spring-cloud-stream-file.version}</version>
<groupId>io.projectriff</groupId>
<artifactId>riff-function-proto</artifactId>
<version>0.0.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/io/projectriff/invoker/ApplicationRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,16 @@ private void closeContext() {
this.app = null;
}
}

public void awaitTermination() {
if (this.app != null) {
String name = "grpcConfiguration";
if (!containsBean(name)) {
return;
}
Expression parsed = new SpelExpressionParser().parseExpression(
"context.getBean('" + name + "').awaitTermination()");
parsed.getValue(this.app);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
* @author Dave Syer
*/
@Configuration
@EnableConfigurationProperties
@EnableConfigurationProperties(FunctionProperties.class)
public class FunctionConfiguration {

private static Log logger = LogFactory.getLog(FunctionConfiguration.class);
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/io/projectriff/invoker/FunctionProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.stream.IntStream;

import javax.annotation.PostConstruct;
import javax.validation.constraints.NotNull;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -43,7 +42,6 @@ public class FunctionProperties {

private static Log logger = LogFactory.getLog(FunctionProperties.class);

@NotNull
private String uri;
private String[] jarLocation;
private String[] className;
Expand Down Expand Up @@ -74,12 +72,12 @@ public void init() {
logger.info("initializing with uri: " + uri);
Matcher m = uriPattern.matcher(uri);
Assert.isTrue(m.matches(),
"expected format: <jarLocation>?handler=<className>[&main=<className>]");
"expected format: <jarLocation>?handler=<className>[&main=<className>]");

String jarLocation = m.group(1);
String className = m.group(2);
String rest = m.group(3);
if (rest!=null && rest.startsWith("main=")) {
if (rest != null && rest.startsWith("main=")) {
this.mainClassName = rest.substring("main=".length());
}

Expand Down
109 changes: 109 additions & 0 deletions src/main/java/io/projectriff/invoker/GrpcConfiguration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.projectriff.invoker;

import java.io.IOException;
import java.util.function.Function;

import javax.annotation.PreDestroy;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import com.google.gson.Gson;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.core.FunctionCatalog;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.util.SocketUtils;

import reactor.core.publisher.Flux;

/**
* @author Dave Syer
*
*/
@Configuration
@ConfigurationProperties("server")
public class GrpcConfiguration {

private static final Log logger = LogFactory.getLog(GrpcConfiguration.class);
private Server server;
private int port = 10382;

@Autowired
private Gson mapper;
@Autowired
private FunctionProperties functions;
@Autowired
private FunctionInspector inspector;
@Autowired
private FunctionCatalog catalog;

public int getPort() {
return port;
}

public void setPort(int port) {
port = port > 0 ? port : SocketUtils.findAvailableTcpPort();
this.port = port;
}

/** Start serving requests. */
@EventListener(ContextRefreshedEvent.class)
public void start() throws IOException {
Function<Flux<?>, Flux<?>> function = catalog
.lookupFunction(functions.getFunctionName());
this.server = ServerBuilder.forPort(this.port)
.addService(new JavaFunctionInvokerServer(function, this.mapper,
inspector.getInputType(function),
inspector.getOutputType(function), inspector.isMessage(function)))
.build();
this.server.start();
logger.info("Server started, listening on " + port);
}

public void awaitTermination() {
if (server != null) {
try {
server.awaitTermination();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

/** Stop serving requests and shutdown resources. */
@PreDestroy
public void stop() {
if (this.server != null) {
logger.info("Server shutting down on " + port);
this.server.shutdown();
}
// This shouldn't be necessary, and doesn't seem to help anyway. How to stop the
// class loader from being destroyed before the server has stopped?
awaitTermination();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,28 @@ public class JavaFunctionInvokerApplication {
private ApplicationRunner runner;
private URLClassLoader classLoader;

public static void main(String[] args) {
JavaFunctionInvokerApplication application = new JavaFunctionInvokerApplication();
if (application.isolated(args)) {
public static void main(String[] args) throws IOException {
if (JavaFunctionInvokerApplication.isolated(args)) {
JavaFunctionInvokerApplication application = new JavaFunctionInvokerApplication();
application.run(args);
application.awaitTermination();
}
else {
SpringApplication.run(JavaFunctionInvokerApplication.class, args);
SpringApplication.run(JavaFunctionInvokerApplication.class, args)
.getBean(GrpcConfiguration.class).awaitTermination();
}
}

public void run(String... args) {
runner().run(args);
}

private void awaitTermination() {
if (this.runner != null) {
this.runner.awaitTermination();
}
}

@PreDestroy
public void close() throws Exception {
if (this.runner != null) {
Expand All @@ -82,7 +90,7 @@ private ApplicationRunner runner() {
return this.runner;
}

private boolean isolated(String[] args) {
private static boolean isolated(String[] args) {
for (String arg : args) {
if (arg.equals("--function.runner.isolated=false")) {
return false;
Expand Down

This file was deleted.

Loading