Skip to content

Commit

Permalink
feat(gateway): add grpc metrics
Browse files Browse the repository at this point in the history
- add monitoring configuration to the standalone gateway to expose
  metrics over http
  • Loading branch information
menski committed Jun 28, 2019
1 parent 0d9cad9 commit aa7f47a
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 11 deletions.
6 changes: 5 additions & 1 deletion dist/pom.xml
Expand Up @@ -48,6 +48,11 @@
<artifactId>atomix-utils</artifactId>
</dependency>

<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand All @@ -63,7 +68,6 @@
<artifactId>log4j-core</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand Down
9 changes: 8 additions & 1 deletion dist/src/main/config/gateway.cfg.toml
Expand Up @@ -36,7 +36,7 @@
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_HOST.
# host = "0.0.0.0"
#
# Sets the port the embedded gateway binds to
# Sets the port the gateway binds to
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_PORT.
# port = 26500

Expand Down Expand Up @@ -73,3 +73,10 @@
# Sets the number of threads the gateway will use to communicate with the broker cluster
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_MANAGEMENT_THREADS.
# managementThreads = 1

[monitoring]
# Sets the host the monitoring binds to
# host = "0.0.0.0"
#
# Sets the port the monitoring binds to
# port = 9600
14 changes: 14 additions & 0 deletions dist/src/main/java/io/zeebe/gateway/StandaloneGateway.java
Expand Up @@ -18,6 +18,7 @@
import io.atomix.cluster.AtomixCluster;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.utils.net.Address;
import io.prometheus.client.exporter.HTTPServer;
import io.zeebe.gateway.impl.configuration.ClusterCfg;
import io.zeebe.gateway.impl.configuration.GatewayCfg;
import io.zeebe.util.TomlConfigurationReader;
Expand All @@ -28,10 +29,12 @@ public class StandaloneGateway {

private final AtomixCluster atomixCluster;
private final Gateway gateway;
private final GatewayCfg gatewayCfg;

public StandaloneGateway(GatewayCfg gatewayCfg) {
atomixCluster = createAtomixCluster(gatewayCfg.getCluster());
gateway = new Gateway(gatewayCfg, atomixCluster);
this.gatewayCfg = gatewayCfg;
}

private AtomixCluster createAtomixCluster(ClusterCfg clusterCfg) {
Expand All @@ -52,8 +55,19 @@ private AtomixCluster createAtomixCluster(ClusterCfg clusterCfg) {
}

public void run() throws IOException, InterruptedException {
HTTPServer monitoringServer = null;
if (gatewayCfg.getMonitoring().isEnabled()) {
monitoringServer =
new HTTPServer(
gatewayCfg.getMonitoring().getHost(), gatewayCfg.getMonitoring().getPort());
}

gateway.listenAndServe();
atomixCluster.stop();

if (monitoringServer != null) {
monitoringServer.stop();
}
}

public static void main(String args[]) throws Exception {
Expand Down
9 changes: 8 additions & 1 deletion gateway/pom.xml
@@ -1,4 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<name>Zeebe Gateway</name>
Expand Down Expand Up @@ -104,6 +106,11 @@
<artifactId>atomix-utils</artifactId>
</dependency>

<dependency>
<groupId>me.dinowernli</groupId>
<artifactId>java-grpc-prometheus</artifactId>
</dependency>

<dependency>
<groupId>io.zeebe</groupId>
<artifactId>zeebe-protocol-test-util</artifactId>
Expand Down
20 changes: 17 additions & 3 deletions gateway/src/main/java/io/zeebe/gateway/Gateway.java
Expand Up @@ -18,13 +18,16 @@
import io.atomix.cluster.AtomixCluster;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptors;
import io.grpc.netty.NettyServerBuilder;
import io.zeebe.gateway.impl.broker.BrokerClient;
import io.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.zeebe.gateway.impl.configuration.GatewayCfg;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.function.Function;
import me.dinowernli.grpc.prometheus.Configuration;
import me.dinowernli.grpc.prometheus.MonitoringServerInterceptor;
import org.slf4j.Logger;

public class Gateway {
Expand All @@ -47,7 +50,6 @@ public class Gateway {

private Server server;
private BrokerClient brokerClient;
private EndpointManager endpointManager;

public Gateway(GatewayCfg gatewayCfg, AtomixCluster atomixCluster) {
this(
Expand Down Expand Up @@ -83,8 +85,20 @@ public void start() throws IOException {

brokerClient = buildBrokerClient();

endpointManager = new EndpointManager(brokerClient);
server = serverBuilderFactory.apply(gatewayCfg).addService(endpointManager).build();
final EndpointManager endpointManager = new EndpointManager(brokerClient);

final ServerBuilder serverBuilder = serverBuilderFactory.apply(gatewayCfg);

if (gatewayCfg.getMonitoring().isEnabled()) {
final MonitoringServerInterceptor monitoringInterceptor =
MonitoringServerInterceptor.create(Configuration.allMetrics());
serverBuilder.addService(
ServerInterceptors.intercept(endpointManager, monitoringInterceptor));
} else {
serverBuilder.addService(endpointManager);
}

server = serverBuilder.build();

server.start();
}
Expand Down
Expand Up @@ -28,4 +28,6 @@ public class ConfigurationDefaults {
public static final String DEFAULT_CLUSTER_MEMBER_ID = "gateway";
public static final String DEFAULT_CLUSTER_HOST = "0.0.0.0";
public static final int DEFAULT_CLUSTER_PORT = 26502;
public static final boolean DEFAULT_MONITORING_ENABLED = false;
public static final int DEFAULT_MONITORING_PORT = 9600;
}
Expand Up @@ -27,4 +27,7 @@ public class EnvironmentConstants {
public static final String ENV_GATEWAY_CLUSTER_MEMBER_ID = "ZEEBE_GATEWAY_CLUSTER_MEMBER_ID";
public static final String ENV_GATEWAY_CLUSTER_HOST = "ZEEBE_GATEWAY_CLUSTER_HOST";
public static final String ENV_GATEWAY_CLUSTER_PORT = "ZEEBE_GATEWAY_CLUSTER_PORT";
public static final String ENV_GATEWAY_MONITORING_ENABLED = "ZEEBE_GATEWAY_MONITORING_ENABLED";
public static final String ENV_GATEWAY_MONITORING_HOST = "ZEEBE_GATEWAY_MONITORING_HOST";
public static final String ENV_GATEWAY_MONITORING_PORT = "ZEEBE_GATEWAY_MONITORING_PORT";
}
Expand Up @@ -24,6 +24,7 @@ public class GatewayCfg {
private NetworkCfg network = new NetworkCfg();
private ClusterCfg cluster = new ClusterCfg();
private ThreadsCfg threads = new ThreadsCfg();
private MonitoringCfg monitoring = new MonitoringCfg();

public void init() {
init(new Environment());
Expand All @@ -37,6 +38,7 @@ public void init(Environment environment, String defaultHost) {
network.init(environment, defaultHost);
cluster.init(environment);
threads.init(environment);
monitoring.init(environment, defaultHost);
}

public NetworkCfg getNetwork() {
Expand Down Expand Up @@ -66,6 +68,15 @@ public GatewayCfg setThreads(ThreadsCfg threads) {
return this;
}

public MonitoringCfg getMonitoring() {
return monitoring;
}

public GatewayCfg setMonitoring(MonitoringCfg monitoring) {
this.monitoring = monitoring;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -77,12 +88,13 @@ public boolean equals(Object o) {
final GatewayCfg that = (GatewayCfg) o;
return Objects.equals(network, that.network)
&& Objects.equals(cluster, that.cluster)
&& Objects.equals(threads, that.threads);
&& Objects.equals(threads, that.threads)
&& Objects.equals(monitoring, that.monitoring);
}

@Override
public int hashCode() {
return Objects.hash(network, cluster, threads);
return Objects.hash(network, cluster, threads, monitoring);
}

@Override
Expand All @@ -94,6 +106,8 @@ public String toString() {
+ cluster
+ ", threadsCfg="
+ threads
+ ", monitoringCfg="
+ monitoring
+ '}';
}

Expand Down
@@ -0,0 +1,105 @@
/*
* Copyright © 2017 camunda services GmbH (info@camunda.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.zeebe.gateway.impl.configuration;

import static io.zeebe.gateway.impl.configuration.ConfigurationDefaults.DEFAULT_MONITORING_ENABLED;
import static io.zeebe.gateway.impl.configuration.ConfigurationDefaults.DEFAULT_MONITORING_PORT;
import static io.zeebe.gateway.impl.configuration.EnvironmentConstants.ENV_GATEWAY_MONITORING_ENABLED;
import static io.zeebe.gateway.impl.configuration.EnvironmentConstants.ENV_GATEWAY_MONITORING_HOST;
import static io.zeebe.gateway.impl.configuration.EnvironmentConstants.ENV_GATEWAY_MONITORING_PORT;

import io.zeebe.transport.SocketAddress;
import io.zeebe.util.Environment;
import java.util.Objects;

public class MonitoringCfg {

private boolean enabled = DEFAULT_MONITORING_ENABLED;

private String host;
private int port = DEFAULT_MONITORING_PORT;

public void init(Environment environment, String defaultHost) {
environment.getBool(ENV_GATEWAY_MONITORING_ENABLED).ifPresent(this::setEnabled);
environment.get(ENV_GATEWAY_MONITORING_HOST).ifPresent(this::setHost);
environment.getInt(ENV_GATEWAY_MONITORING_PORT).ifPresent(this::setPort);

if (host == null) {
host = defaultHost;
}
}

public boolean isEnabled() {
return enabled;
}

public MonitoringCfg setEnabled(boolean enabled) {
this.enabled = enabled;
return this;
}

public String getHost() {
return host;
}

public MonitoringCfg setHost(String host) {
this.host = host;
return this;
}

public int getPort() {
return port;
}

public MonitoringCfg setPort(int port) {
this.port = port;
return this;
}

public SocketAddress toSocketAddress() {
return new SocketAddress(host, port);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final MonitoringCfg that = (MonitoringCfg) o;
return enabled == that.enabled && port == that.port && Objects.equals(host, that.host);
}

@Override
public int hashCode() {
return Objects.hash(enabled, host, port);
}

@Override
public String toString() {
return "MonitoringCfg{"
+ "enabled="
+ enabled
+ ", host='"
+ host
+ '\''
+ ", port="
+ port
+ '}';
}
}
Expand Up @@ -22,6 +22,9 @@
import static io.zeebe.gateway.impl.configuration.EnvironmentConstants.ENV_GATEWAY_CONTACT_POINT;
import static io.zeebe.gateway.impl.configuration.EnvironmentConstants.ENV_GATEWAY_HOST;
import static io.zeebe.gateway.impl.configuration.EnvironmentConstants.ENV_GATEWAY_MANAGEMENT_THREADS;
import static io.zeebe.gateway.impl.configuration.EnvironmentConstants.ENV_GATEWAY_MONITORING_ENABLED;
import static io.zeebe.gateway.impl.configuration.EnvironmentConstants.ENV_GATEWAY_MONITORING_HOST;
import static io.zeebe.gateway.impl.configuration.EnvironmentConstants.ENV_GATEWAY_MONITORING_PORT;
import static io.zeebe.gateway.impl.configuration.EnvironmentConstants.ENV_GATEWAY_PORT;
import static io.zeebe.gateway.impl.configuration.EnvironmentConstants.ENV_GATEWAY_REQUEST_TIMEOUT;
import static io.zeebe.gateway.impl.configuration.EnvironmentConstants.ENV_GATEWAY_TRANSPORT_BUFFER;
Expand All @@ -40,6 +43,7 @@ public class GatewayCfgTest {

private static final String DEFAULT_CFG_FILENAME = "/configuration/gateway.default.toml";
private static final GatewayCfg DEFAULT_CFG = new GatewayCfg();
private static final String EMPTY_CFG_FILENAME = "/configuration/gateway.empty.toml";
private static final String CUSTOM_CFG_FILENAME = "/configuration/gateway.custom.toml";
private static final GatewayCfg CUSTOM_CFG = new GatewayCfg();

Expand Down Expand Up @@ -70,6 +74,15 @@ public void shouldHaveDefaultValues() {
assertThat(gatewayCfg).isEqualTo(DEFAULT_CFG);
}

@Test
public void shouldLoadEmptyConfig() {
// when
final GatewayCfg gatewayCfg = readEmptyConfig();

// then
assertThat(gatewayCfg).isEqualTo(DEFAULT_CFG);
}

@Test
public void shouldLoadCustomConfig() {
// when
Expand All @@ -92,6 +105,9 @@ public void shouldUseEnvironmentVariables() {
setEnv(ENV_GATEWAY_CLUSTER_MEMBER_ID, "envMember");
setEnv(ENV_GATEWAY_CLUSTER_HOST, "envHost");
setEnv(ENV_GATEWAY_CLUSTER_PORT, "12345");
setEnv(ENV_GATEWAY_MONITORING_ENABLED, "true");
setEnv(ENV_GATEWAY_MONITORING_HOST, "monitorHost");
setEnv(ENV_GATEWAY_MONITORING_PORT, "231");

final GatewayCfg expected = new GatewayCfg();
expected.getNetwork().setHost("zeebe").setPort(5432);
Expand All @@ -105,6 +121,7 @@ public void shouldUseEnvironmentVariables() {
.setHost("envHost")
.setPort(12345);
expected.getThreads().setManagementThreads(32);
expected.getMonitoring().setEnabled(true).setHost("monitorHost").setPort(231);

// when
final GatewayCfg gatewayCfg = readCustomConfig();
Expand All @@ -121,6 +138,10 @@ private GatewayCfg readDefaultConfig() {
return readConfig(DEFAULT_CFG_FILENAME);
}

private GatewayCfg readEmptyConfig() {
return readConfig(EMPTY_CFG_FILENAME);
}

private GatewayCfg readCustomConfig() {
return readConfig(CUSTOM_CFG_FILENAME);
}
Expand Down
4 changes: 4 additions & 0 deletions gateway/src/test/resources/configuration/gateway.default.toml
@@ -0,0 +1,4 @@
[network]
[cluster]
[threads]
[monitoring]
Empty file.

0 comments on commit aa7f47a

Please sign in to comment.