diff --git a/pom.xml b/pom.xml
index 7ba1eb5b..1066cb9d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,10 @@
com.linecorp.armeria
0.98.3
+
+
+ 4.1.46.Final
+
2.20.0
2.12.1
5.9.1
@@ -132,6 +136,13 @@
pom
import
+
+ io.netty
+ netty-bom
+ ${netty.version}
+ pom
+ import
+
io.zipkin.zipkin2
zipkin
diff --git a/reporter-xray-udp/pom.xml b/reporter-xray-udp/pom.xml
index fd43dcc2..974c3ac6 100644
--- a/reporter-xray-udp/pom.xml
+++ b/reporter-xray-udp/pom.xml
@@ -39,5 +39,15 @@
io.zipkin.reporter2
zipkin-reporter
+
+ io.netty
+ netty-transport
+ test
+
+
+ io.zipkin.zipkin2
+ zipkin-tests
+ test
+
diff --git a/reporter-xray-udp/src/test/java/zipkin2/reporter/xray_udp/XRayUDPReporterTest.java b/reporter-xray-udp/src/test/java/zipkin2/reporter/xray_udp/XRayUDPReporterTest.java
index 20735fed..67f26aa2 100644
--- a/reporter-xray-udp/src/test/java/zipkin2/reporter/xray_udp/XRayUDPReporterTest.java
+++ b/reporter-xray-udp/src/test/java/zipkin2/reporter/xray_udp/XRayUDPReporterTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 The OpenZipkin Authors
+ * Copyright 2016-2020 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -13,4 +13,79 @@
*/
package zipkin2.reporter.xray_udp;
-public class XRayUDPReporterTest {}
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import java.net.InetSocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import zipkin2.TestObjects;
+import zipkin2.storage.xray_udp.InternalStorageAccess;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class XRayUDPReporterTest {
+
+ private static EventLoopGroup workerGroup;
+ private static Channel serverChannel;
+ private static BlockingQueue receivedPayloads;
+
+ private static XRayUDPReporter reporter;
+
+ @BeforeClass
+ public static void startServer() {
+ workerGroup = new NioEventLoopGroup();
+ receivedPayloads = new LinkedBlockingQueue<>();
+
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(workerGroup)
+ .channel(NioDatagramChannel.class)
+ .handler(new ChannelInitializer() {
+ @Override protected void initChannel(NioDatagramChannel channel) {
+ channel.pipeline().addLast(new SimpleChannelInboundHandler() {
+ @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
+ receivedPayloads.add(ByteBufUtil.getBytes(msg.content()));
+ }
+ });
+ }
+ });
+
+ serverChannel = bootstrap.bind(0).syncUninterruptibly().channel();
+
+ // TODO(anuraaga): Consider changing return type of create to XRayUdpReporter so it's more
+ // obvious the type is Closeable.
+ reporter = (XRayUDPReporter) XRayUDPReporter
+ .create("localhost:" + ((InetSocketAddress) serverChannel.localAddress()).getPort());
+ }
+
+ @AfterClass
+ public static void stopServer() {
+ reporter.close();
+ serverChannel.close().syncUninterruptibly();
+ workerGroup.shutdownGracefully();
+ }
+
+ @Before
+ public void setUp() {
+ receivedPayloads.clear();
+ }
+
+ @Test
+ public void sendSingleSpan() throws Exception {
+ reporter.report(TestObjects.CLIENT_SPAN);
+ assertThat(receivedPayloads.take())
+ .containsExactly(InternalStorageAccess.encode(TestObjects.CLIENT_SPAN));
+ assertThat(receivedPayloads).isEmpty();
+ }
+}
diff --git a/reporter-xray-udp/src/test/java/zipkin2/storage/xray_udp/InternalStorageAccess.java b/reporter-xray-udp/src/test/java/zipkin2/storage/xray_udp/InternalStorageAccess.java
new file mode 100644
index 00000000..12dbce58
--- /dev/null
+++ b/reporter-xray-udp/src/test/java/zipkin2/storage/xray_udp/InternalStorageAccess.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2016-2020 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.storage.xray_udp;
+
+import zipkin2.Span;
+
+/**
+ * Classpath accessor to expose package-private methods in the storage.
+ */
+public final class InternalStorageAccess {
+ /** Encode a span for XRay, */
+ public static byte[] encode(Span span) {
+ return UDPMessageEncoder.encode(span);
+ }
+}
diff --git a/storage-xray-udp/pom.xml b/storage-xray-udp/pom.xml
index f62ce210..3104c84c 100644
--- a/storage-xray-udp/pom.xml
+++ b/storage-xray-udp/pom.xml
@@ -46,5 +46,15 @@
2.4.0
test
+
+ io.netty
+ netty-transport
+ test
+
+
+ io.zipkin.zipkin2
+ zipkin-tests
+ test
+
diff --git a/storage-xray-udp/src/test/java/zipkin2/storage/xray_udp/XRayUDPStorageTest.java b/storage-xray-udp/src/test/java/zipkin2/storage/xray_udp/XRayUDPStorageTest.java
index 2fde0201..9209a69b 100644
--- a/storage-xray-udp/src/test/java/zipkin2/storage/xray_udp/XRayUDPStorageTest.java
+++ b/storage-xray-udp/src/test/java/zipkin2/storage/xray_udp/XRayUDPStorageTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 The OpenZipkin Authors
+ * Copyright 2016-2020 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -13,4 +13,108 @@
*/
package zipkin2.storage.xray_udp;
-public class XRayUDPStorageTest {}
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import zipkin2.Span;
+import zipkin2.TestObjects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class XRayUDPStorageTest {
+
+ private static EventLoopGroup workerGroup;
+ private static Channel serverChannel;
+ private static BlockingQueue receivedPayloads;
+
+ private static XRayUDPStorage storage;
+
+ @BeforeClass
+ public static void startServer() {
+ workerGroup = new NioEventLoopGroup();
+ receivedPayloads = new LinkedBlockingQueue<>();
+
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(workerGroup)
+ .channel(NioDatagramChannel.class)
+ .handler(new ChannelInitializer() {
+ @Override protected void initChannel(NioDatagramChannel channel) {
+ channel.pipeline().addLast(new SimpleChannelInboundHandler() {
+ @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
+ receivedPayloads.add(ByteBufUtil.getBytes(msg.content()));
+ }
+ });
+ }
+ });
+
+ serverChannel = bootstrap.bind(0).syncUninterruptibly().channel();
+
+ storage = XRayUDPStorage.newBuilder()
+ .address("localhost:" + ((InetSocketAddress) serverChannel.localAddress()).getPort())
+ .build();
+ }
+
+ @AfterClass
+ public static void stopServer() {
+ storage.close();
+ serverChannel.close().syncUninterruptibly();
+ workerGroup.shutdownGracefully();
+ }
+
+ @Before
+ public void setUp() {
+ receivedPayloads.clear();
+ }
+
+ @Test
+ public void sendTrace() throws Exception {
+ storage.accept(TestObjects.TRACE).execute();
+ for (Span span : TestObjects.TRACE) {
+ byte[] received = receivedPayloads.take();
+ assertThat(received).containsExactly(UDPMessageEncoder.encode(span));
+ }
+ assertThat(receivedPayloads).isEmpty();
+ }
+
+ @Test
+ public void sendSingleSpan() throws Exception {
+ storage.accept(Collections.singletonList(TestObjects.CLIENT_SPAN)).execute();
+ assertThat(receivedPayloads.take())
+ .containsExactly(UDPMessageEncoder.encode(TestObjects.CLIENT_SPAN));
+ assertThat(receivedPayloads).isEmpty();
+ }
+
+ @Test
+ public void sendNoSpans() throws Exception {
+ storage.accept(Collections.emptyList()).execute();
+ // Give some time for any potential bugs to get sent to the server.
+ Thread.sleep(100);
+ assertThat(receivedPayloads).isEmpty();
+ }
+
+ @Test
+ public void sendAfterClose() throws Exception {
+ XRayUDPStorage storage = XRayUDPStorage.newBuilder()
+ .address("localhost:" + ((InetSocketAddress)serverChannel.localAddress()).getPort())
+ .build();
+ storage.close();
+ assertThatThrownBy(() -> storage.accept(TestObjects.TRACE))
+ .isInstanceOf(IllegalStateException.class);
+ }
+}