Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support configuring transport and secure in the starter #30

Merged
merged 3 commits into from Feb 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions starter-spring/build.gradle
Expand Up @@ -28,4 +28,5 @@ dependencies {

testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'io.micrometer:micrometer-registry-prometheus'
}
Expand Up @@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.micrometer.prometheus.rsocket.autoconfigure;

import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.micrometer.prometheus.rsocket.PrometheusRSocketClient;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.springframework.boot.actuate.autoconfigure.metrics.export.prometheus.PrometheusMetricsExportAutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
Expand All @@ -37,8 +37,7 @@ public class PrometheusRSocketAutoConfiguration {
@ConditionalOnMissingBean
@Bean(destroyMethod = "pushAndClose")
PrometheusRSocketClient prometheusRSocketClient(PrometheusMeterRegistry meterRegistry, PrometheusRSocketProperties properties) {
return new PrometheusRSocketClient(meterRegistry,
TcpClientTransport.create(properties.getHost(), properties.getPort()),
return new PrometheusRSocketClient(meterRegistry, properties.createClientTransport(),
c -> c.retryBackoff(properties.getMaxRetries(), properties.getFirstBackoff(), properties.getMaxBackoff()));
}
}
Expand Up @@ -13,21 +13,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.micrometer.prometheus.rsocket.autoconfigure;

import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.boot.context.properties.ConfigurationProperties;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;

import java.time.Duration;

@ConfigurationProperties("management.metrics.export.prometheus.rsocket")
public class PrometheusRSocketProperties {

/**
* The host name of the proxy to connect to.
*/
private String host;
private String host = "localhost";

/**
* The TCP port to make a connection on.
* The port to make a connection on.
*/
private int port = 7001;

Expand All @@ -46,6 +53,16 @@ public class PrometheusRSocketProperties {
*/
private Duration maxBackoff = Duration.ofMinutes(10);

/**
* RSocket transport protocol.
*/
private Transport transport = Transport.TCP;

/**
* Whether to use a secured protocol.
*/
private boolean secure = false;

public long getMaxRetries() {
return maxRetries;
}
Expand Down Expand Up @@ -85,4 +102,53 @@ public int getPort() {
public void setPort(int port) {
this.port = port;
}

public Transport getTransport() {
return transport;
}

public void setTransport(Transport transport) {
this.transport = transport;
}

public void setSecure(boolean secure) {
this.secure = secure;
}

public boolean isSecure() {
return secure;
}

ClientTransport createClientTransport() {
final TcpClient tcpClient = TcpClient.create().host(this.host).port(this.port);
return this.transport.create(this.secure ? tcpClient.secure() : tcpClient);
}

/**
* Choice of transport protocol for the RSocket server.
*/
enum Transport {

/**
* TCP transport protocol.
*/
TCP {
@Override
ClientTransport create(TcpClient tcpClient) {
return TcpClientTransport.create(tcpClient);
}
},

/**
* WebSocket transport protocol.
*/
WEBSOCKET {
@Override
ClientTransport create(TcpClient tcpClient) {
return WebsocketClientTransport.create(HttpClient.from(tcpClient), "/");
}
};

abstract ClientTransport create(TcpClient tcpClient);
}
}
@@ -0,0 +1,88 @@
/**
* Copyright 2019 Pivotal Software, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.micrometer.prometheus.rsocket.autoconfigure;

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import org.junit.Test;
import org.springframework.boot.actuate.autoconfigure.metrics.MetricsAutoConfiguration;
import org.springframework.boot.actuate.autoconfigure.metrics.export.prometheus.PrometheusMetricsExportAutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.util.SocketUtils;
import reactor.core.publisher.Mono;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

public class PrometheusRSocketAutoConfigurationTest {

private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(MetricsAutoConfiguration.class, PrometheusMetricsExportAutoConfiguration.class, PrometheusRSocketAutoConfiguration.class));

private Mono<CloseableChannel> startServer(ServerTransport<CloseableChannel> serverTransport, CountDownLatch latch) {
return RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> {
latch.countDown();
final AbstractRSocket rsocket = new AbstractRSocket() {

@Override
public Mono<Void> fireAndForget(Payload payload) {
return Mono.empty();
}
};
return Mono.just(rsocket);
})
.transport(serverTransport)
.start();
}

@Test
public void prometheusRSocketClientTcp() {
int port = SocketUtils.findAvailableTcpPort();
final CountDownLatch latch = new CountDownLatch(1);
this.startServer(TcpServerTransport.create(port), latch)
.block();
this.contextRunner.withPropertyValues("management.metrics.export.prometheus.rsocket.port=" + port,
"management.metrics.export.prometheus.rsocket.transport=tcp")
.run(context -> {
latch.await(5, TimeUnit.SECONDS);
assertThat(latch.getCount()).isEqualTo(0);
});
}

@Test
public void prometheusRSocketClientWebsocket() {
int port = SocketUtils.findAvailableTcpPort();
final CountDownLatch latch = new CountDownLatch(1);
this.startServer(WebsocketServerTransport.create(port), latch)
.block();
this.contextRunner.withPropertyValues("management.metrics.export.prometheus.rsocket.port=" + port,
"management.metrics.export.prometheus.rsocket.transport=websocket")
.run(context -> {
latch.await(5, TimeUnit.SECONDS);
assertThat(latch.getCount()).isEqualTo(0);
});
}
}
25 changes: 25 additions & 0 deletions starter-spring/src/test/resources/logback.xml
@@ -0,0 +1,25 @@
<!--
Copyright 2019 Pivotal Software, Inc.
<p>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
<p>
https://www.apache.org/licenses/LICENSE-2.0
<p>
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %5p --- [%15.15t] %-40.40logger{39} : %m%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>