From 8d16a3f0043d668b2f128862dfac3a15b365e66b Mon Sep 17 00:00:00 2001 From: "pg.yang" Date: Sat, 27 Aug 2022 00:47:04 +0800 Subject: [PATCH] Add plugin to support NATS client --- .github/workflows/plugins-test.9999.yaml | 77 ++ CHANGES.md | 1 + .../trace/component/ComponentsDefine.java | 2 + .../apm-sdk-plugin/nats-client-plugin/pom.xml | 47 + .../client/CreateDispatcherInterceptor.java | 44 + .../nats/client/CreateSubInterceptor.java | 47 + .../nats/client/DeliverReplyInterceptor.java | 56 + .../nats/client/NatsClientPluginConfig.java | 38 + .../apm/plugin/nats/client/NatsCommons.java | 124 ++ .../NatsJetStreamConstructorInterceptor.java | 31 + .../nats/client/NatsMessageInterceptor.java | 41 + ...atsSubscriptionConstructorInterceptor.java | 29 + .../SubscriptionNextMsgInterceptor.java | 53 + .../client/WriterConstructorInterceptor.java | 29 + .../nats/client/WriterQueueInterceptor.java | 83 ++ .../WriterSendMessageBatchInterceptor.java | 121 ++ .../define/NatsConnectionInstrumentation.java | 85 ++ .../NatsConnectionWriterInstrumentation.java | 103 ++ .../define/NatsJetStreamInstrumentation.java | 81 ++ .../define/NatsMessageInstrumentation.java | 79 ++ .../NatsSubscriptionInstrumentation.java | 80 ++ .../src/main/resources/skywalking-plugin.def | 21 + apm-sniffer/apm-sdk-plugin/pom.xml | 1 + .../service-agent/java-agent/Plugin-list.md | 1 + pom.xml | 1 + .../nats-client-scenario/bin/startup.sh | 31 + .../config/expectedData.yaml | 1214 +++++++++++++++++ .../nats-client-scenario/configuration.yml | 35 + .../scenarios/nats-client-scenario/pom.xml | 126 ++ .../src/main/assembly/assembly.xml | 41 + .../apm/testcase/nats/client/Application.java | 30 + .../client/controller/StartController.java | 130 ++ .../client/publisher/JetStreamPublisher.java | 46 + .../publisher/JetStreamPublisherFetcher.java | 46 + .../client/publisher/NormalPublisher.java | 38 + .../nats/client/publisher/Publisher.java | 35 + .../client/publisher/ReqReplyPublisher.java | 38 + .../nats/client/subscriber/Consumer.java | 25 + .../subscriber/JetStreamFetcherConsumer.java | 66 + .../subscriber/JetStreamHandlerConsumer.java | 56 + .../client/subscriber/NextMsgConsumer.java | 55 + .../client/subscriber/ReqReplyConsumer.java | 46 + .../testcase/nats/client/work/StopSignal.java | 31 + .../testcase/nats/client/work/StreamUtil.java | 57 + .../nats/client/work/TrackedConnection.java | 61 + .../nats/client/work/WorkBuilder.java | 72 + .../nats-client-scenario/support-version.list | 21 + 47 files changed, 3575 insertions(+) create mode 100644 .github/workflows/plugins-test.9999.yaml create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/pom.xml create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateDispatcherInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateSubInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/DeliverReplyInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsClientPluginConfig.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsCommons.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsJetStreamConstructorInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsMessageInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsSubscriptionConstructorInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/SubscriptionNextMsgInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterConstructorInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterQueueInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterSendMessageBatchInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionWriterInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsJetStreamInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsMessageInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsSubscriptionInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/resources/skywalking-plugin.def create mode 100644 test/plugin/scenarios/nats-client-scenario/bin/startup.sh create mode 100644 test/plugin/scenarios/nats-client-scenario/config/expectedData.yaml create mode 100644 test/plugin/scenarios/nats-client-scenario/configuration.yml create mode 100644 test/plugin/scenarios/nats-client-scenario/pom.xml create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/assembly/assembly.xml create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/Application.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/controller/StartController.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisher.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisherFetcher.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/NormalPublisher.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/Publisher.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/ReqReplyPublisher.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/Consumer.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamFetcherConsumer.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamHandlerConsumer.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/NextMsgConsumer.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/ReqReplyConsumer.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StopSignal.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StreamUtil.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/TrackedConnection.java create mode 100644 test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/WorkBuilder.java create mode 100644 test/plugin/scenarios/nats-client-scenario/support-version.list diff --git a/.github/workflows/plugins-test.9999.yaml b/.github/workflows/plugins-test.9999.yaml new file mode 100644 index 0000000000..e0bb9d1abb --- /dev/null +++ b/.github/workflows/plugins-test.9999.yaml @@ -0,0 +1,77 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Test + +on: + pull_request: + paths: + - '.github/workflows/plugins-*.yaml' + - 'apm-application-toolkit/**' + - 'apm-commons/**' + - 'apm-protocol/**' + - 'apm-sniffer/**' + - 'test/plugin/**' + - '**/pom.xml' + - '!**.md' + push: + branches: + - test/ci/* + +concurrency: + group: plugins-3-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + build: + name: Build + runs-on: ubuntu-latest + timeout-minutes: 30 + steps: + - uses: actions/checkout@v2 + with: + submodules: true + - name: Build + uses: ./.github/actions/build + + test: + needs: [ build ] + name: ${{ matrix.case }} + runs-on: ubuntu-latest + timeout-minutes: 90 + strategy: + matrix: + case: + - nats-client-scenario + steps: + - uses: actions/checkout@v2 + with: + submodules: true + - uses: actions/setup-java@v2 + with: + distribution: adopt + java-version: 8 + - name: Install Oracle Libs + if: matrix.case == 'oracle-scenario' + run: | + mkdir -p skywalking-agent/plugins + curl -O https://skyapm.github.io/ci-assist/jars/ojdbc14-10.2.0.4.0.jar + curl -L -o ./skywalking-agent/plugins/apm-oracle-10.x-plugin-2.0.0.jar https://github.com/SkyAPM/java-plugin-extensions/releases/download/2.0.0/apm-oracle-10.x-plugin-2.0.0.jar + ./mvnw -q --batch-mode install:install-file -Dfile=ojdbc14-10.2.0.4.0.jar -DgroupId=com.oracle -DartifactId=ojdbc14 -Dversion=10.2.0.4.0 -Dpackaging=jar + - name: Run Plugin Test + uses: ./.github/actions/run + with: + test_case: ${{ matrix.case }} diff --git a/CHANGES.md b/CHANGES.md index 3cf412f66a..68ac1f7ed1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -18,6 +18,7 @@ Release Notes. * Force the injected high-priority classes in order to avoid NoClassDefFoundError. * Plugin to support xxl-job 2.3.x. * Add plugin to support Micronaut(HTTP Client/Server) 3.2.x-3.6.x +* Add plugin to support NATS Java client #### Documentation diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java index 23d7f9f1d0..52930e9870 100755 --- a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java +++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java @@ -227,4 +227,6 @@ public class ComponentsDefine { public static final OfficialComponent MICRONAUT = new OfficialComponent(131, "Micronaut"); + public static final OfficialComponent NATS = new OfficialComponent(132, "Nats"); + } diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/pom.xml new file mode 100644 index 0000000000..341684e8e6 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/pom.xml @@ -0,0 +1,47 @@ + + + + + + apm-sdk-plugin + org.apache.skywalking + 8.12.0-SNAPSHOT + + + 4.0.0 + + nats-client-plugin + jar + + + 8 + 8 + + + + + io.nats + jnats + 2.15.6 + provided + + + + diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateDispatcherInterceptor.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateDispatcherInterceptor.java new file mode 100644 index 0000000000..d33c1fa98e --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateDispatcherInterceptor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client; + +import io.nats.client.Connection; +import io.nats.client.MessageHandler; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import java.lang.reflect.Method; +import static org.apache.skywalking.apm.plugin.nats.client.NatsCommons.buildTraceMsgHandler; + +public class CreateDispatcherInterceptor implements InstanceMethodsAroundInterceptor { + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + allArguments[0] = buildTraceMsgHandler((MessageHandler) allArguments[0], (Connection) objInst); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateSubInterceptor.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateSubInterceptor.java new file mode 100644 index 0000000000..f563ecb151 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateSubInterceptor.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client; + +import io.nats.client.Connection; +import io.nats.client.MessageHandler; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +import static org.apache.skywalking.apm.plugin.nats.client.NatsCommons.buildTraceMsgHandler; + +public class CreateSubInterceptor implements InstanceMethodsAroundInterceptor { + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + allArguments[3] = buildTraceMsgHandler((MessageHandler) allArguments[3], (Connection) objInst.getSkyWalkingDynamicField()); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] + argumentsTypes, Object ret) throws Throwable { + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] + argumentsTypes, Throwable t) { + + } +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/DeliverReplyInterceptor.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/DeliverReplyInterceptor.java new file mode 100644 index 0000000000..5b6901e154 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/DeliverReplyInterceptor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client; + +import io.nats.client.Connection; +import io.nats.client.Message; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +import static org.apache.skywalking.apm.plugin.nats.client.NatsCommons.createIncomingSpan; + +public class DeliverReplyInterceptor implements InstanceMethodsAroundInterceptor { + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + createIncomingSpan((Message) allArguments[0], (Connection) objInst); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + if (ContextManager.isActive()) { + AbstractSpan span = ContextManager.activeSpan(); + ContextManager.stopSpan(span); + } + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + if (ContextManager.isActive()) { + AbstractSpan span = ContextManager.activeSpan(); + span.log(t).errorOccurred(); + ContextManager.stopSpan(span); + } + } +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsClientPluginConfig.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsClientPluginConfig.java new file mode 100644 index 0000000000..9cf459c737 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsClientPluginConfig.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.nats.client; + +import org.apache.skywalking.apm.agent.core.boot.PluginConfig; + +public class NatsClientPluginConfig { + + public static class Plugin { + @PluginConfig(root = NatsClientPluginConfig.class) + public static class NatsClient { + /** + * Nats publish message asynchronously , it put message to local queue , + * then write message to network by call flush method in another thread . + * This config term indicate whether collect complete trace . + * If set to true ,the plugin will trace enqueue , flush . Otherwise, only enqueue + * Notice , If set true . will generate a lot of Span (one span for a message). These spans are not released util call flush + */ + public static boolean ENABLE_FULL_TRACE = false; + } + } +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsCommons.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsCommons.java new file mode 100644 index 0000000000..8b28f68011 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsCommons.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client; + +import io.nats.client.Connection; +import io.nats.client.Message; +import io.nats.client.MessageHandler; +import org.apache.skywalking.apm.agent.core.context.CarrierItem; +import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.IntegerTag; +import org.apache.skywalking.apm.agent.core.context.tag.StringTag; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.util.StringUtil; + +import java.util.Optional; + +public class NatsCommons { + + private static final String SID = "sid"; + private static final String REPLY_TO = "reply_to"; + private static final String MSG_STATE = "state"; + private static final String MSG = "message"; + + static boolean skipTrace(Object msg) { + // include null + if (!(msg instanceof Message)) { + return true; + } + Message natsMsg = (Message) msg; + return StringUtil.isBlank(natsMsg.getSubject()) || natsMsg.isStatusMessage(); + } + + static Optional createIncomingSpan(Message message, Connection connection) { + if (message.getHeaders() == null) { + return Optional.empty(); + } + + ContextCarrier contextCarrier = new ContextCarrier(); + CarrierItem next = contextCarrier.items(); + while (next.hasNext()) { + next = next.next(); + if (StringUtil.isNotEmpty(next.getHeadKey())) { + next.setHeadValue(message.getHeaders().getFirst(next.getHeadKey())); + } + } + AbstractSpan span = ContextManager.createEntrySpan("Nats/Sub/" + message.getSubject(), contextCarrier); + addCommonTag(span, message, connection.getConnectedUrl()); + return Optional.of(span); + } + + static void injectCarrier(Message message) { + ContextCarrier contextCarrier = new ContextCarrier(); + ContextManager.inject(contextCarrier); + CarrierItem next = contextCarrier.items(); + while (next.hasNext()) { + next = next.next(); + if (StringUtil.isNotEmpty(next.getHeadKey()) + && StringUtil.isNotEmpty(next.getHeadValue())) { + message.getHeaders().add(next.getHeadKey(), next.getHeadValue()); + } + } + } + + static void addCommonTag(AbstractSpan span, Message message, String connectionUrl) { + Optional.ofNullable(message.getReplyTo()).ifPresent(v -> span.tag(new StringTag(REPLY_TO), v)); + Optional.ofNullable(message.getSID()).ifPresent(v -> span.tag(new StringTag(SID), v)); + span.setComponent(ComponentsDefine.NATS); + SpanLayer.asMQ(span); + Tags.URL.set(span, connectionUrl); + if (message.getStatus() != null) { + int code = message.getStatus().getCode(); + String statusMsg = message.getStatus().getMessage(); + span.tag(new IntegerTag(MSG_STATE), String.valueOf(code)); + if (StringUtil.isNotBlank(statusMsg)) { + span.tag(new StringTag(MSG), statusMsg); + } + if (code != 0) { + span.errorOccurred(); + } + } + } + + static MessageHandler buildTraceMsgHandler(MessageHandler msgHandler, Connection connection) { + if (msgHandler == null) { + return null; + } + return msg -> { + if (skipTrace(msg) || msg.getHeaders() == null) { + msgHandler.onMessage(msg); + return; + } + try { + NatsCommons.createIncomingSpan(msg, connection); + msgHandler.onMessage(msg); + ContextManager.stopSpan(); + } catch (Exception e) { + if (ContextManager.isActive()) { + AbstractSpan span = ContextManager.activeSpan().log(e).errorOccurred(); + ContextManager.stopSpan(span); + } + } + }; + + } +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsJetStreamConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsJetStreamConstructorInterceptor.java new file mode 100644 index 0000000000..dd60a6b168 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsJetStreamConstructorInterceptor.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client; + +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; + +public class NatsJetStreamConstructorInterceptor implements InstanceConstructorInterceptor { + + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable { + // Hold NatsConnection + objInst.setSkyWalkingDynamicField(allArguments[0]); + + } +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsMessageInterceptor.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsMessageInterceptor.java new file mode 100644 index 0000000000..128ba969b5 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsMessageInterceptor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client; + +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +public class NatsMessageInterceptor implements InstanceMethodsAroundInterceptor { + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + + } +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsSubscriptionConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsSubscriptionConstructorInterceptor.java new file mode 100644 index 0000000000..0236bfebc2 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsSubscriptionConstructorInterceptor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client; + +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; + +public class NatsSubscriptionConstructorInterceptor implements InstanceConstructorInterceptor { + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable { + // Hold NatsConnection + objInst.setSkyWalkingDynamicField(allArguments[3]); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/SubscriptionNextMsgInterceptor.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/SubscriptionNextMsgInterceptor.java new file mode 100644 index 0000000000..6470fa57ec --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/SubscriptionNextMsgInterceptor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client; + +import io.nats.client.Connection; +import io.nats.client.Message; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +import static org.apache.skywalking.apm.plugin.nats.client.NatsCommons.createIncomingSpan; + +public class SubscriptionNextMsgInterceptor implements InstanceMethodsAroundInterceptor { + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + if (ret == null) { + return null; + } + Message msg = (Message) ret; + // Close the span immediately + createIncomingSpan(msg, (Connection) objInst.getSkyWalkingDynamicField()) + .ifPresent(ContextManager::stopSpan); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + //do nothing since we can't capture any message + } +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterConstructorInterceptor.java new file mode 100644 index 0000000000..baa4f9b093 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterConstructorInterceptor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client; + +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; + +public class WriterConstructorInterceptor implements InstanceConstructorInterceptor { + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable { + // Hold io.nats.client.Connection + objInst.setSkyWalkingDynamicField(allArguments[0]); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterQueueInterceptor.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterQueueInterceptor.java new file mode 100644 index 0000000000..35bf6f4d17 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterQueueInterceptor.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client; + +import io.nats.client.Connection; +import io.nats.client.Message; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.skywalking.apm.plugin.nats.client.NatsCommons.addCommonTag; +import static org.apache.skywalking.apm.plugin.nats.client.NatsCommons.injectCarrier; +import static org.apache.skywalking.apm.plugin.nats.client.NatsCommons.skipTrace; + +public class WriterQueueInterceptor implements InstanceMethodsAroundInterceptor { + + private static final ILog LOGGER = LogManager.getLogger(WriterQueueInterceptor.class); + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + if (skipTrace(allArguments[0])) { + return; + } + String connection = ((Connection) objInst.getSkyWalkingDynamicField()).getConnectedUrl(); + Message message = (Message) allArguments[0]; + if (NatsClientPluginConfig.Plugin.NatsClient.ENABLE_FULL_TRACE) { + EnhancedInstance enhancedMsg = (EnhancedInstance) allArguments[0]; + AbstractSpan span = ContextManager.createLocalSpan("Nats/Pub/Enqueue/" + message.getSubject()); + addCommonTag(span, message, connection); + enhancedMsg.setSkyWalkingDynamicField(ContextManager.capture()); + } else { + AbstractSpan span = ContextManager.createExitSpan("Nats/Pub/" + message.getSubject(), connection); + addCommonTag(span, message, connection); + injectCarrier(message); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + Message message = (Message) allArguments[0]; + if (skipTrace(message)) { + return ret; + } + AbstractSpan span = ContextManager.activeSpan(); + if (!(Boolean) ret) { + Map eventMap = new HashMap(); + eventMap.put("enqueue", "failed"); + span.errorOccurred().log(System.currentTimeMillis(), eventMap); + } + + ContextManager.stopSpan(span); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + AbstractSpan span = ContextManager.activeSpan().errorOccurred().log(t); + ContextManager.stopSpan(span); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterSendMessageBatchInterceptor.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterSendMessageBatchInterceptor.java new file mode 100644 index 0000000000..a39bee067f --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterSendMessageBatchInterceptor.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client; + +import io.nats.client.Connection; +import io.nats.client.Message; +import io.nats.client.impl.NatsMessage; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Optional; + +import static org.apache.skywalking.apm.plugin.nats.client.NatsCommons.addCommonTag; +import static org.apache.skywalking.apm.plugin.nats.client.NatsCommons.injectCarrier; +import static org.apache.skywalking.apm.plugin.nats.client.NatsCommons.skipTrace; + +public class WriterSendMessageBatchInterceptor implements InstanceMethodsAroundInterceptor { + + private static final ILog LOGGER = LogManager.getLogger(NatsCommons.class); + + private static final Field NEXT_FIELD; + + static { + Field field; + try { + field = NatsMessage.class.getDeclaredField("next"); + field.setAccessible(true); + } catch (NoSuchFieldException e) { + field = null; + } + NEXT_FIELD = field; + } + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + String connection = ((Connection) objInst.getSkyWalkingDynamicField()).getConnectedUrl(); + Object next = allArguments[0]; + while (next != null) { + if (!skipTrace(next)) { + Message message = (Message) next; + EnhancedInstance enhanced = (EnhancedInstance) next; + AbstractSpan span = ContextManager.createExitSpan("Nats/Pub/" + message.getSubject(), connection); + addCommonTag(span, message, connection); + Optional.ofNullable(enhanced.getSkyWalkingDynamicField()) + .ifPresent(snapshot -> ContextManager.continued((ContextSnapshot) snapshot)); + injectCarrier(message); + LOGGER.info("global traceId -> {}", ContextManager.getGlobalTraceId()); + //escape from message's lifecycle ahead of time always correct + enhanced.setSkyWalkingDynamicField(null); + } + next = next(next); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + while (ContextManager.isActive()) { + AbstractSpan abstractSpan = ContextManager.activeSpan(); + ContextManager.stopSpan(abstractSpan); + } + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + while (ContextManager.isActive()) { + AbstractSpan span = ContextManager.activeSpan(); + span.log(t); + span.errorOccurred(); + ContextManager.stopSpan(span); + } + Object next = allArguments[0]; + try { + while (next != null) { + if (!skipTrace(next)) { + EnhancedInstance enhanced = (EnhancedInstance) next; + //escape from message's lifecycle ahead of time always correct + enhanced.setSkyWalkingDynamicField(null); + } + next = next(next); + } + } catch (IllegalAccessException e) { + LOGGER.warn("nats client plugin error", e); + } + + } + + private NatsMessage next(Object message) throws IllegalAccessException { + if (NEXT_FIELD == null) { + return null; + } + if (!(message instanceof NatsMessage)) { + return null; + } + return (NatsMessage) NEXT_FIELD.get(message); + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionInstrumentation.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionInstrumentation.java new file mode 100644 index 0000000000..6ec872b4ef --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionInstrumentation.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +public class NatsConnectionInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "io.nats.client.impl.NatsConnection"; + + private static final String CREATE_DISPATCHER_INTERCEPTOR = "org.apache.skywalking.apm.plugin.nats.client.CreateDispatcherInterceptor"; + private static final String REPLY_MSG_INTERCEPTOR = "org.apache.skywalking.apm.plugin.nats.client.DeliverReplyInterceptor"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[]{ + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("createDispatcher").and(takesArguments(1)); + } + + @Override + public String getMethodsInterceptor() { + return CREATE_DISPATCHER_INTERCEPTOR; + } + + @Override + public boolean isOverrideArgs() { + return true; + } + }, + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("deliverReply").and(takesArguments(1)); + } + + @Override + public String getMethodsInterceptor() { + return REPLY_MSG_INTERCEPTOR; + } + + @Override + public boolean isOverrideArgs() { + return true; + } + } + }; + } +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionWriterInstrumentation.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionWriterInstrumentation.java new file mode 100644 index 0000000000..1ea2435bfc --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionWriterInstrumentation.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.nats.client.NatsClientPluginConfig; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +public class NatsConnectionWriterInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "io.nats.client.impl.NatsConnectionWriter"; + private static final String PUBLISH_INTERCEPTOR_CLASS_NAME = "org.apache.skywalking.apm.plugin.nats.client.WriterSendMessageBatchInterceptor"; + private static final String CONSTRUCTOR_INTERCEPTOR = "org.apache.skywalking.apm.plugin.nats.client.WriterConstructorInterceptor"; + private static final String QUEUE_INTERCEPTOR_CLASS_NAME = "org.apache.skywalking.apm.plugin.nats.client.WriterQueueInterceptor"; + + private static final InstanceMethodsInterceptPoint QUEUE_INTERCEPTOR = new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("queue").and(takesArguments(1)); + } + + @Override + public String getMethodsInterceptor() { + return QUEUE_INTERCEPTOR_CLASS_NAME; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + }; + + private static final InstanceMethodsInterceptPoint SEND_MSG_INTERCEPTOR = new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("sendMessageBatch").and(takesArguments(3)); + } + + @Override + public String getMethodsInterceptor() { + return PUBLISH_INTERCEPTOR_CLASS_NAME; + } + + @Override + public boolean isOverrideArgs() { + return true; + } + }; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[]{ + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return takesArguments(1); + } + + @Override + public String getConstructorInterceptor() { + return CONSTRUCTOR_INTERCEPTOR; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + if (NatsClientPluginConfig.Plugin.NatsClient.ENABLE_FULL_TRACE) { + return new InstanceMethodsInterceptPoint[]{QUEUE_INTERCEPTOR, SEND_MSG_INTERCEPTOR}; + } + return new InstanceMethodsInterceptPoint[]{QUEUE_INTERCEPTOR}; + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsJetStreamInstrumentation.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsJetStreamInstrumentation.java new file mode 100644 index 0000000000..7bed065abe --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsJetStreamInstrumentation.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +public class NatsJetStreamInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "io.nats.client.impl.NatsJetStream"; + + private static final String CREATE_SUB_INTERCEPTOR = "org.apache.skywalking.apm.plugin.nats.client.CreateSubInterceptor"; + private static final String CONSTRUCTOR_INTERCEPTOR = "org.apache.skywalking.apm.plugin.nats.client.NatsJetStreamConstructorInterceptor"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[]{ + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return takesArguments(2); + } + + @Override + public String getConstructorInterceptor() { + return CONSTRUCTOR_INTERCEPTOR; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[]{ + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("createSubscription").and(takesArguments(7)); + } + + @Override + public String getMethodsInterceptor() { + return CREATE_SUB_INTERCEPTOR; + } + + @Override + public boolean isOverrideArgs() { + return true; + } + } + }; + } + +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsMessageInstrumentation.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsMessageInstrumentation.java new file mode 100644 index 0000000000..7fbc2f14be --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsMessageInstrumentation.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.nats.client.NatsClientPluginConfig; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/* + * If ENABLE_FULL_TRACE set to true , we need enhance NatsMessage for adding Skywalking dynamic field , that used to hold ContextSnapshot + * See WriterQueueInterceptor + * See WriterSendMessageBatchInterceptor + * See org.apache.skywalking.apm.plugin.nats.client.NatsPluginConfig + * + * BTW , ACK is done by publishing a message , So we needn't enhance ACK method + */ +public class NatsMessageInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "io.nats.client.impl.NatsMessage"; + private static final String PUBLISH_INTERCEPTOR = "org.apache.skywalking.apm.plugin.nats.client.NatsMessageInterceptor"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + if (NatsClientPluginConfig.Plugin.NatsClient.ENABLE_FULL_TRACE) { + return new InstanceMethodsInterceptPoint[]{ + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("toString").and(takesArguments(0)); + } + + @Override + public String getMethodsInterceptor() { + return PUBLISH_INTERCEPTOR; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } + return new InstanceMethodsInterceptPoint[0]; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsSubscriptionInstrumentation.java b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsSubscriptionInstrumentation.java new file mode 100644 index 0000000000..1a35e7f2c3 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsSubscriptionInstrumentation.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.nats.client.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +public class NatsSubscriptionInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "io.nats.client.impl.NatsSubscription"; + private static final String NEXT_MSG_INTERCEPTOR = "org.apache.skywalking.apm.plugin.nats.client.SubscriptionNextMsgInterceptor"; + private static final String CONSTRUCTOR_INTERCEPTOR = "org.apache.skywalking.apm.plugin.nats.client.NatsSubscriptionConstructorInterceptor"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[]{ + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return takesArguments(5); + } + + @Override + public String getConstructorInterceptor() { + return CONSTRUCTOR_INTERCEPTOR; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[]{ + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("nextMessageInternal").and(takesArguments(1)); + } + + @Override + public String getMethodsInterceptor() { + return NEXT_MSG_INTERCEPTOR; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/resources/skywalking-plugin.def new file mode 100644 index 0000000000..955e56f04a --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/nats-client-plugin/src/main/resources/skywalking-plugin.def @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +nats-client=org.apache.skywalking.apm.plugin.nats.client.define.NatsMessageInstrumentation +nats-client=org.apache.skywalking.apm.plugin.nats.client.define.NatsConnectionInstrumentation +nats-client=org.apache.skywalking.apm.plugin.nats.client.define.NatsConnectionWriterInstrumentation +nats-client=org.apache.skywalking.apm.plugin.nats.client.define.NatsSubscriptionInstrumentation +nats-client=org.apache.skywalking.apm.plugin.nats.client.define.NatsJetStreamInstrumentation \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml index 5381626931..512c879cd0 100644 --- a/apm-sniffer/apm-sdk-plugin/pom.xml +++ b/apm-sniffer/apm-sdk-plugin/pom.xml @@ -123,6 +123,7 @@ guava-eventbus-plugin hutool-plugins micronaut-plugins + nats-client-plugin pom diff --git a/docs/en/setup/service-agent/java-agent/Plugin-list.md b/docs/en/setup/service-agent/java-agent/Plugin-list.md index 60b9c2d106..75e72ec915 100644 --- a/docs/en/setup/service-agent/java-agent/Plugin-list.md +++ b/docs/en/setup/service-agent/java-agent/Plugin-list.md @@ -143,3 +143,4 @@ - hutool-http-5.x - micronaut-http-client-3.2.x-3.6.x - micronaut-http-server-3.2.x-3.6.x +- nats-client diff --git a/pom.xml b/pom.xml index 3273392e3b..8a3957e101 100755 --- a/pom.xml +++ b/pom.xml @@ -393,6 +393,7 @@ ${project.build.testSourceDirectory} scenarios/okhttp-scenario scenarios/spring-4.3.x-scenario +scenarios/nats-client-scenario **/*.properties, diff --git a/test/plugin/scenarios/nats-client-scenario/bin/startup.sh b/test/plugin/scenarios/nats-client-scenario/bin/startup.sh new file mode 100644 index 0000000000..171cc43a99 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/bin/startup.sh @@ -0,0 +1,31 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +home="$(cd "$(dirname $0)"; pwd)" + +java -jar ${agent_opts} -Dserver.port=8080 \ + -Dskywalking.plugin.natsclient.enable_full_trace=true \ + -Dskywalking.agent.service_name=scenario-8080- \ + -Dnats.server=nats-server \ + ${home}/../libs/nats-client-scenario.jar & + +java -jar ${agent_opts} -Dserver.port=8081 \ + -Dskywalking.plugin.natsclient.enable_full_trace=false \ + -Dskywalking.agent.service_name=scenario-8081- \ + -Dnats.server=nats-server \ + ${home}/../libs/nats-client-scenario.jar & diff --git a/test/plugin/scenarios/nats-client-scenario/config/expectedData.yaml b/test/plugin/scenarios/nats-client-scenario/config/expectedData.yaml new file mode 100644 index 0000000000..eda4b2de89 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/config/expectedData.yaml @@ -0,0 +1,1214 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +segmentItems: + - serviceName: scenario-8081- + segmentSize: ge 15 + segments: + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.STREAM.INFO.scenario-8081-test-stream-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.STREAM.CREATE.scenario-8081-test-stream-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.CONSUMER.CREATE.scenario-8081-test-stream-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.STREAM.INFO.scenario-8081-test-stream-4 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.STREAM.CREATE.scenario-8081-test-stream-4 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.STREAM.NAMES + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.CONSUMER.INFO.scenario-8081-test-stream-4.scenario-8081-test-stream-4-durable + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.CONSUMER.DURABLE.CREATE.scenario-8081-test-stream-4.scenario-8081-test-stream-4-durable + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.CONSUMER.MSG.NEXT.scenario-8081-test-stream-4.scenario-8081-test-stream-4-durable + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Sub/scenario-8081-subject-1 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: sid, value: '1'} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: 'GET:/nats/start/remote', networkAddress: 'nats://nats-server:4222', + refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8081-, + traceId: not null} + - segmentId: not null + spans: + - operationName: not null + operationId: 0 + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Sub/scenario-8081-subject-2 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: sid, value: '1'} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: 'GET:/nats/start/remote', networkAddress: 'nats://nats-server:4222', + refType: CrossProcess, parentSpanId: 2, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8081-, + traceId: not null} + - segmentId: not null + spans: + - operationName: not null + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: sid, value: '1'} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Sub/scenario-8081-subject-2, networkAddress: 'nats://nats-server:4222', + refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8081-, + traceId: not null} + - segmentId: not null + spans: + - operationName: not null + operationId: 0 + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Sub/scenario-8081-subject-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: sid, value: '2'} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: 'GET:/nats/start/remote', networkAddress: 'nats://nats-server:4222', + refType: CrossProcess, parentSpanId: 4, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8081-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Sub/scenario-8081-subject-4 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: sid, value: '2'} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: 'GET:/nats/start/remote', networkAddress: 'nats://nats-server:4222', + refType: CrossProcess, parentSpanId: 6, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8081-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/scenario-8081-subject-1 + operationId: 0 + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Pub/scenario-8081-subject-2 + operationId: 0 + parentSpanId: 0 + spanId: 2 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Pub/$JS.API.STREAM.INFO.scenario-8081-test-stream-3 + operationId: 0 + parentSpanId: 0 + spanId: 3 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Pub/scenario-8081-subject-3 + operationId: 0 + parentSpanId: 0 + spanId: 4 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Pub/$JS.API.STREAM.INFO.scenario-8081-test-stream-4 + operationId: 0 + parentSpanId: 0 + spanId: 5 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Pub/scenario-8081-subject-4 + operationId: 0 + parentSpanId: 0 + spanId: 6 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - operationName: GET:/nats/start/remote + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: gt 0 + endTime: gt 0 + componentId: 1 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: url, value: 'http://localhost:8081/nats/start/remote'} + - {key: http.method, value: GET} + - {key: http.status_code, value: '200'} + - serviceName: scenario-8080- + segmentSize: ge 32 + segments: + - segmentId: not null + spans: + - operationName: Nats/Pub/Enqueue/$JS.API.STREAM.INFO.scenario-8080-test-stream-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.STREAM.INFO.scenario-8080-test-stream-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/Enqueue/$JS.API.STREAM.INFO.scenario-8080-test-stream-3, + networkAddress: '', refType: CrossThread, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/Enqueue/$JS.API.CONSUMER.CREATE.scenario-8080-test-stream-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.STREAM.CREATE.scenario-8080-test-stream-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/Enqueue/$JS.API.STREAM.CREATE.scenario-8080-test-stream-3, + networkAddress: '', refType: CrossThread, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/Enqueue/$JS.API.STREAM.CREATE.scenario-8080-test-stream-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.CONSUMER.CREATE.scenario-8080-test-stream-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/Enqueue/$JS.API.CONSUMER.CREATE.scenario-8080-test-stream-3, + networkAddress: '', refType: CrossThread, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/Enqueue/$JS.API.STREAM.INFO.scenario-8080-test-stream-4 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.STREAM.INFO.scenario-8080-test-stream-4 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/Enqueue/$JS.API.STREAM.INFO.scenario-8080-test-stream-4, + networkAddress: '', refType: CrossThread, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/Enqueue/$JS.API.STREAM.CREATE.scenario-8080-test-stream-4 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.STREAM.CREATE.scenario-8080-test-stream-4 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/Enqueue/$JS.API.STREAM.CREATE.scenario-8080-test-stream-4, + networkAddress: '', refType: CrossThread, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.STREAM.NAMES + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/Enqueue/$JS.API.STREAM.NAMES, networkAddress: '', + refType: CrossThread, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/Enqueue/$JS.API.CONSUMER.DURABLE.CREATE.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/Enqueue/$JS.API.STREAM.NAMES + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.CONSUMER.INFO.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/Enqueue/$JS.API.CONSUMER.INFO.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable, + networkAddress: '', refType: CrossThread, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/Enqueue/$JS.API.CONSUMER.INFO.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.CONSUMER.DURABLE.CREATE.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/Enqueue/$JS.API.CONSUMER.DURABLE.CREATE.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable, + networkAddress: '', refType: CrossThread, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.CONSUMER.MSG.NEXT.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/Enqueue/$JS.API.CONSUMER.MSG.NEXT.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable, + networkAddress: '', refType: CrossThread, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/Enqueue/$JS.API.CONSUMER.MSG.NEXT.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - segmentId: not null + spans: + - operationName: HEAD:/nats/check + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: gt 0 + endTime: gt 0 + componentId: 1 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: url, value: 'http://localhost:8080/nats/check'} + - {key: http.method, value: HEAD} + - {key: http.status_code, value: '200'} + - segmentId: not null + spans: + - operationName: Nats/Pub/scenario-8080-subject-1 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: 'GET:/nats/start', networkAddress: '', refType: CrossThread, + parentSpanId: 1, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Sub/scenario-8080-subject-1 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: sid, value: '1'} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/scenario-8080-subject-1, networkAddress: 'nats://nats-server:4222', + refType: CrossProcess, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/scenario-8080-subject-2 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: 'GET:/nats/start', networkAddress: '', refType: CrossThread, + parentSpanId: 2, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: not null + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Sub/scenario-8080-subject-2, networkAddress: '', refType: CrossThread, + parentSpanId: 1, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: not null + operationId: 0 + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Sub/scenario-8080-subject-2 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: sid, value: '1'} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/scenario-8080-subject-2, networkAddress: 'nats://nats-server:4222', + refType: CrossProcess, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: not null + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: sid, value: '1'} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: not null, + networkAddress: 'nats://nats-server:4222', refType: CrossProcess, parentSpanId: 0, + parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.STREAM.INFO.scenario-8080-test-stream-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: 'GET:/nats/start', networkAddress: '', refType: CrossThread, + parentSpanId: 3, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: not null + operationId: 0 + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Sub/scenario-8080-subject-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: sid, value: '2'} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/scenario-8080-subject-3, networkAddress: 'nats://nats-server:4222', + refType: CrossProcess, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: not null + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Sub/scenario-8080-subject-3, networkAddress: '', refType: CrossThread, + parentSpanId: 1, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/scenario-8080-subject-3 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: 'GET:/nats/start', networkAddress: '', refType: CrossThread, + parentSpanId: 4, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/scenario-8080-subject-4 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: 'GET:/nats/start', networkAddress: '', refType: CrossThread, + parentSpanId: 6, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Sub/scenario-8080-subject-4 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: sid, value: '2'} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: Nats/Pub/scenario-8080-subject-4, networkAddress: 'nats://nats-server:4222', + refType: CrossProcess, parentSpanId: 0, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} + - segmentId: not null + spans: + - operationName: Nats/Pub/Enqueue/scenario-8080-subject-1 + operationId: 0 + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Pub/Enqueue/scenario-8080-subject-2 + operationId: 0 + parentSpanId: 0 + spanId: 2 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Pub/Enqueue/$JS.API.STREAM.INFO.scenario-8080-test-stream-3 + operationId: 0 + parentSpanId: 0 + spanId: 3 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Pub/Enqueue/scenario-8080-subject-3 + operationId: 0 + parentSpanId: 0 + spanId: 4 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Pub/Enqueue/$JS.API.STREAM.INFO.scenario-8080-test-stream-4 + operationId: 0 + parentSpanId: 0 + spanId: 5 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - operationName: Nats/Pub/Enqueue/scenario-8080-subject-4 + operationId: 0 + parentSpanId: 0 + spanId: 6 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + - operationName: GET:/nats/start + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: gt 0 + endTime: gt 0 + componentId: 1 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: url, value: 'http://localhost:8080/nats/start'} + - {key: http.method, value: GET} + - {key: http.status_code, value: '200'} + - segmentId: not null + spans: + - operationName: Nats/Pub/$JS.API.STREAM.INFO.scenario-8080-test-stream-4 + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 132 + isError: false + spanType: Exit + peer: nats://nats-server:4222 + skipAnalysis: false + tags: + - {key: reply_to, value: not null} + - {key: url, value: 'nats://nats-server:4222'} + refs: + - {parentEndpoint: 'GET:/nats/start', networkAddress: '', refType: CrossThread, + parentSpanId: 5, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: scenario-8080-, + traceId: not null} \ No newline at end of file diff --git a/test/plugin/scenarios/nats-client-scenario/configuration.yml b/test/plugin/scenarios/nats-client-scenario/configuration.yml new file mode 100644 index 0000000000..48b10fdb91 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/configuration.yml @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +type: jvm +entryService: http://localhost:8080/nats/start +healthCheck: http://localhost:8080/nats/check +startScript: ./bin/startup.sh +depends_on: + - nats-server +dependencies: + nats-server: + image: nats:2.8.4-alpine3.15 + hostname: nats-server + environment: + - nats.server=nats-server + expose: + - "4222" + entrypoint: + - nats-server + - "--auth" + - "abcdefgh" + - "--jetstream" \ No newline at end of file diff --git a/test/plugin/scenarios/nats-client-scenario/pom.xml b/test/plugin/scenarios/nats-client-scenario/pom.xml new file mode 100644 index 0000000000..9f1804687e --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/pom.xml @@ -0,0 +1,126 @@ + + + + + org.apache.skywalking.apm.testcase + nats-client-scenario + 1.0.0 + jar + + 4.0.0 + + + UTF-8 + 8 + 8 + 2.6.0 + 2.7.3 + + + skywalking-nats-scenario + + + + + org.springframework.boot + spring-boot-dependencies + ${spring.boot.version} + pom + import + + + + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-web + + + + io.nats + jnats + ${test.framework.version} + + + + org.projectlombok + lombok + provided + + + + + + + sonatype releases + https://oss.sonatype.org/content/repositories/releases + + true + + + + + + nats-client-scenario + + + org.springframework.boot + spring-boot-maven-plugin + 2.7.3 + + + + repackage + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + assemble + package + + single + + + + src/main/assembly/assembly.xml + + ./target/ + + + + + + + + \ No newline at end of file diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/assembly/assembly.xml b/test/plugin/scenarios/nats-client-scenario/src/main/assembly/assembly.xml new file mode 100644 index 0000000000..f9e30f5e37 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/assembly/assembly.xml @@ -0,0 +1,41 @@ + + + + + zip + + + + + ./bin + 0775 + + + + + + ${project.build.directory}/nats-client-scenario.jar + ./libs + 0775 + + + diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/Application.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/Application.java new file mode 100644 index 0000000000..54de1f2df3 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/Application.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + +} diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/controller/StartController.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/controller/StartController.java new file mode 100644 index 0000000000..851802a875 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/controller/StartController.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.controller; + +import org.apache.skywalking.apm.testcase.nats.client.publisher.JetStreamPublisher; +import org.apache.skywalking.apm.testcase.nats.client.publisher.JetStreamPublisherFetcher; +import org.apache.skywalking.apm.testcase.nats.client.publisher.NormalPublisher; +import org.apache.skywalking.apm.testcase.nats.client.publisher.ReqReplyPublisher; +import org.apache.skywalking.apm.testcase.nats.client.subscriber.JetStreamFetcherConsumer; +import org.apache.skywalking.apm.testcase.nats.client.subscriber.JetStreamHandlerConsumer; +import org.apache.skywalking.apm.testcase.nats.client.subscriber.NextMsgConsumer; +import org.apache.skywalking.apm.testcase.nats.client.subscriber.ReqReplyConsumer; +import org.apache.skywalking.apm.testcase.nats.client.work.StopSignal; +import org.apache.skywalking.apm.testcase.nats.client.work.WorkBuilder; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +@RestController +@RequestMapping("nats") +public class StartController { + + private final List works = new ArrayList<>(); + + private final StopSignal stopSignal = new StopSignal(); + + @GetMapping(value = "check") + public String check() throws Exception { + URL url = new URL("http://localhost:8081/nats/check/remote"); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.getResponseCode(); + connection.disconnect(); + return "success"; + } + + @GetMapping(value = "check/remote") + public String check2() throws Exception { + return "success"; + } + + @GetMapping(value = "start", produces = MediaType.TEXT_HTML_VALUE) + public String start() throws Exception { + CountDownLatch countDownLatch = new CountDownLatch(1); + new Thread(() -> { + requestRemote(); + countDownLatch.countDown(); + }).start(); + // test ENABLE_FULL_TRACE false + works.forEach(WorkBuilder.Work::publish); + countDownLatch.await(); + return "success"; + } + + @GetMapping(value = "start/remote", produces = MediaType.TEXT_HTML_VALUE) + public String remote() throws Exception { + works.forEach(WorkBuilder.Work::publish); + return "success"; + } + + // test ENABLE_FULL_TRACE false + private void requestRemote() { + try { + URL url = new URL("http://localhost:8081/nats/start/remote"); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.getResponseCode(); + connection.disconnect(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @PostConstruct + public void init() { + //normal connection + String server = System.getProperty("nats.server"); + String subjectPrefix = System.getProperty("skywalking.agent.service_name", ""); + if (server == null) { + throw new RuntimeException("missing property : nats.server"); + } +// // test normal message have pub enqueue no sub + works.add(new WorkBuilder("message-subject-1", subjectPrefix + "subject-1", server) + .build(new NormalPublisher(), new NextMsgConsumer(stopSignal))); +// +// // test request-reply message + works.add(new WorkBuilder("request-reply-subject-2", subjectPrefix + "subject-2", server) + .build(new ReqReplyPublisher(), new ReqReplyConsumer())); + +// // test stream message and handle message + works.add(new WorkBuilder("stream-subject-3", subjectPrefix + "subject-3", server) + .build(new JetStreamPublisher(subjectPrefix + "test-stream-3"), + new JetStreamHandlerConsumer(subjectPrefix + "test-stream-3"))); + + // test stream message and pull message + String stream = subjectPrefix + "test-stream-4"; + works.add(new WorkBuilder("request-stream-subject-4", subjectPrefix + "subject-4", server) + .build(new JetStreamPublisherFetcher(stream), new JetStreamFetcherConsumer(stream))); + + works.forEach(WorkBuilder.Work::subscribe); + } + + @PreDestroy + public void stop() { + stopSignal.stop(); + } + +} diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisher.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisher.java new file mode 100644 index 0000000000..5cd7033a80 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisher.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.publisher; + +import io.nats.client.Connection; +import io.nats.client.JetStream; +import io.nats.client.api.PublishAck; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.testcase.nats.client.work.StreamUtil; + +@Slf4j +public class JetStreamPublisher implements Publisher { + private String stream; + + public JetStreamPublisher(String stream) { + this.stream = stream; + } + + @Override + public void publish(Connection connection, String msg, String subject) { + try { + JetStream js = connection.jetStream(); + StreamUtil.initStream(connection, subject, this.stream); + log.info("send message : {} to {}:{}", msg, this.stream, subject); + PublishAck pa = js.publish(buildMsg(subject, msg)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} \ No newline at end of file diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisherFetcher.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisherFetcher.java new file mode 100644 index 0000000000..76f90990ec --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisherFetcher.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.publisher; + +import io.nats.client.Connection; +import io.nats.client.JetStream; +import io.nats.client.api.PublishAck; +import org.apache.skywalking.apm.testcase.nats.client.work.StreamUtil; + +public class JetStreamPublisherFetcher implements Publisher { + + private final String stream; + + public JetStreamPublisherFetcher(String stream) { + this.stream = stream; + } + + @Override + public void publish(Connection connection, String msg, String subject) { + + try { + JetStream js = connection.jetStream(); + StreamUtil.initStream(connection, subject, stream); + PublishAck pa = js.publish(buildMsg(subject, msg)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + +} diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/NormalPublisher.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/NormalPublisher.java new file mode 100644 index 0000000000..c1d6c325c9 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/NormalPublisher.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.publisher; + +import io.nats.client.Connection; +import lombok.extern.slf4j.Slf4j; + +import java.time.Duration; + +@Slf4j +public class NormalPublisher implements Publisher { + + public void publish(Connection connection, String msg, String subject) { + try { + connection.publish(buildMsg(subject, msg)); + log.info("send message : {} to {}", msg, subject); + connection.flush(Duration.ofSeconds(5)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} \ No newline at end of file diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/Publisher.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/Publisher.java new file mode 100644 index 0000000000..8eed818fad --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/Publisher.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.publisher; + +import io.nats.client.Connection; +import io.nats.client.impl.NatsMessage; + +import java.nio.charset.StandardCharsets; + +public interface Publisher { + + void publish(Connection connection, String msg, String subject); + + default NatsMessage buildMsg(String subject, String msg) { + return NatsMessage.builder() + .data(msg, StandardCharsets.UTF_8) + .subject(subject).build(); + + } +} diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/ReqReplyPublisher.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/ReqReplyPublisher.java new file mode 100644 index 0000000000..200a851e4d --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/ReqReplyPublisher.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.publisher; + +import io.nats.client.Connection; +import lombok.extern.slf4j.Slf4j; + +import java.time.Duration; + +@Slf4j +public class ReqReplyPublisher implements Publisher { + + @Override + public void publish(Connection connection, String msg, String subject) { + try { + log.info("send message : {} to {}", msg, subject); + connection.request(buildMsg(subject, msg), Duration.ofMinutes(5)); + log.info("receive reply message : {} from {}'reply", msg, subject); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/Consumer.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/Consumer.java new file mode 100644 index 0000000000..42c901b7d1 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/Consumer.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.subscriber; + +import io.nats.client.Connection; + +public interface Consumer { + //should block util execute totally , and only subscribe once + void subscribe(Connection connection, String subject); +} diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamFetcherConsumer.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamFetcherConsumer.java new file mode 100644 index 0000000000..4b1ef70c4a --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamFetcherConsumer.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.subscriber; + +import io.nats.client.Connection; +import io.nats.client.JetStream; +import io.nats.client.JetStreamSubscription; +import io.nats.client.Message; +import io.nats.client.PullSubscribeOptions; +import io.nats.client.api.ConsumerConfiguration; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.testcase.nats.client.work.StreamUtil; + +import java.time.Duration; +import java.util.List; + +@Slf4j +public class JetStreamFetcherConsumer implements Consumer { + + private final String stream; + + public JetStreamFetcherConsumer(String stream) { + this.stream = stream; + } + + @Override + public void subscribe(Connection connection, String subject) { + new Thread(() -> { + try { + ConsumerConfiguration cc = ConsumerConfiguration.builder() + .ackWait(Duration.ofMillis(100)) + .build(); + PullSubscribeOptions pullOptions = PullSubscribeOptions.builder() + .durable(stream + "-durable") + .configuration(cc) + .build(); + StreamUtil.initStream(connection, subject, stream); + JetStream js = connection.jetStream(); + JetStreamSubscription subscribe = js.subscribe(subject, pullOptions); + List messages = subscribe.fetch(1, Duration.ofHours(1)); + if (messages != null) { + messages.forEach(msg -> log.info("received message : {} ", msg)); + } + } catch (Exception e) { + e.printStackTrace(); + } + }).start(); + + } + +} diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamHandlerConsumer.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamHandlerConsumer.java new file mode 100644 index 0000000000..a8481dba4f --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamHandlerConsumer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.subscriber; + +import io.nats.client.Connection; +import io.nats.client.JetStream; +import io.nats.client.MessageHandler; +import io.nats.client.PushSubscribeOptions; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.testcase.nats.client.work.StreamUtil; + +@Slf4j +public class JetStreamHandlerConsumer implements Consumer { + + private String stream; + + private PushSubscribeOptions so; + + public JetStreamHandlerConsumer(String stream) { + this.stream = stream; + this.so = PushSubscribeOptions.builder() + .stream(this.stream) + .build(); + } + + @Override + public void subscribe(Connection connection, String subject) { + try { + JetStream js = connection.jetStream(); + StreamUtil.initStream(connection, subject, this.stream); + MessageHandler handler = msg -> { + log.info("receive : {}, from :{} ,and will ack", subject, msg); + msg.ack(); + }; + js.subscribe(subject, connection.createDispatcher(), handler, true, so); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } +} \ No newline at end of file diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/NextMsgConsumer.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/NextMsgConsumer.java new file mode 100644 index 0000000000..831278f488 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/NextMsgConsumer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.subscriber; + +import io.nats.client.Connection; +import io.nats.client.Message; +import io.nats.client.Subscription; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.testcase.nats.client.work.StopSignal; + +import java.time.Duration; + +@Slf4j +public class NextMsgConsumer implements Consumer { + + private final StopSignal stopSignal; + + public NextMsgConsumer(StopSignal stopSignal) { + this.stopSignal = stopSignal; + } + + @Override + public void subscribe(Connection connection, String subject) { + Subscription sub = connection.subscribe(subject); + new Thread(() -> { + while (!stopSignal.stopped()) { + try { + Message msg = sub.nextMessage(Duration.ofMinutes(50)); + if (msg != null) { + msg.ack(); + log.info("receive : {}, from :{} ", subject, msg); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }).start(); + } + +} \ No newline at end of file diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/ReqReplyConsumer.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/ReqReplyConsumer.java new file mode 100644 index 0000000000..6a10e81f29 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/ReqReplyConsumer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.subscriber; + +import io.nats.client.Connection; +import io.nats.client.Dispatcher; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class ReqReplyConsumer implements Consumer { + + @Override + public void subscribe(Connection connection, String subject) { + + Dispatcher d = connection.createDispatcher(msg -> { + log.info("receive : {}, from :{} and will reply", subject, msg); + connection.publish(msg.getReplyTo(), "Have received msg".getBytes(StandardCharsets.UTF_8)); + try { + connection.flush(Duration.ofSeconds(5)); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + }); + d.subscribe(subject); + } + +} diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StopSignal.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StopSignal.java new file mode 100644 index 0000000000..fc73dfe66b --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StopSignal.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.work; + +public class StopSignal { + private volatile int stop = 1; + + public void stop() { + this.stop = 0; + } + + public boolean stopped() { + return this.stop == 0; + } + +} diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StreamUtil.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StreamUtil.java new file mode 100644 index 0000000000..9a10c9c3db --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StreamUtil.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.work; + +import io.nats.client.Connection; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamManagement; +import io.nats.client.api.StorageType; +import io.nats.client.api.StreamConfiguration; +import io.nats.client.api.StreamInfo; + +import java.util.List; + +public class StreamUtil { + + public static void initStream(Connection connection, String subject, String stream) throws Exception { + + JetStreamManagement jetStreamManagement = connection.jetStreamManagement(); + StreamInfo streamInfo; + try { + streamInfo = jetStreamManagement.getStreamInfo(stream); + } catch (JetStreamApiException e) { + streamInfo = null; + } + + if (streamInfo == null) { + StreamConfiguration sc = StreamConfiguration.builder() + .name(stream) + .storageType(StorageType.Memory) + .subjects(subject) + .build(); + jetStreamManagement.addStream(sc); + } else { + List subjects = streamInfo.getConfiguration().getSubjects(); + if (!subjects.contains(subject)) { + subjects.add(subject); + jetStreamManagement.updateStream(streamInfo.getConfiguration()); + } + } + } + +} diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/TrackedConnection.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/TrackedConnection.java new file mode 100644 index 0000000000..31b4ee0c1f --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/TrackedConnection.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.work; + +import io.nats.client.Connection; +import io.nats.client.ErrorListener; +import io.nats.client.Nats; +import io.nats.client.Options; +import lombok.extern.slf4j.Slf4j; +import java.io.IOException; +import java.time.Duration; + +@Slf4j +public class TrackedConnection { + + public static Connection newConnection(String url) throws IOException, InterruptedException { + return Nats.connect(createOptions(url)); + } + + public static Options createOptions(String server) { + Options.Builder builder = new Options.Builder(). + server(server). + connectionTimeout(Duration.ofSeconds(5)). + pingInterval(Duration.ofSeconds(10)). + reconnectWait(Duration.ofSeconds(1)). + maxReconnects(-1). + traceConnection() + .token("abcdefgh".toCharArray()); + + builder = builder.connectionListener((conn, type) -> log.info("Status change " + type)); + builder = builder.errorListener(new ErrorListener() { + @Override + public void exceptionOccurred(Connection conn, Exception exp) { + log.info("ATS connection exception occurred"); + exp.printStackTrace(); + } + + @Override + public void errorOccurred(Connection conn, String error) { + log.info("NATS connection error occurred " + error); + } + }); + + return builder.build(); + } +} diff --git a/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/WorkBuilder.java b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/WorkBuilder.java new file mode 100644 index 0000000000..dd85f98526 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/WorkBuilder.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.testcase.nats.client.work; + +import io.nats.client.Connection; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.testcase.nats.client.publisher.Publisher; +import org.apache.skywalking.apm.testcase.nats.client.subscriber.Consumer; + +@Slf4j +public class WorkBuilder { + private final String message; + private final String subject; + private final String url; + + public WorkBuilder(String message, String subject, String url) { + this.message = message; + this.subject = subject; + this.url = url; + } + + public Work build(Publisher publisher, Consumer consumer) { + + return new Work() { + @Override + public void subscribe() { + Connection consumerCon = WorkBuilder.this.createConnection(); + consumer.subscribe(consumerCon, WorkBuilder.this.subject); + } + + @Override + public void publish() { + Connection connection = WorkBuilder.this.createConnection(); + publisher.publish(connection, message, WorkBuilder.this.subject); + } + }; + } + + private Connection createConnection() { + Connection connection; + try { + connection = TrackedConnection.newConnection(WorkBuilder.this.url); + } catch (Exception e) { + throw new RuntimeException(e); + } + return connection; + } + + public interface Work { + // first subscribe + void subscribe(); + + // then publish message + void publish(); + } + +} diff --git a/test/plugin/scenarios/nats-client-scenario/support-version.list b/test/plugin/scenarios/nats-client-scenario/support-version.list new file mode 100644 index 0000000000..ce53e5e005 --- /dev/null +++ b/test/plugin/scenarios/nats-client-scenario/support-version.list @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +2.15.6 +2.15.4 +2.15.0 +2.14.2 +2.14.0