From bdf194f4b92696f0a165d48e3a1f8a444ea8b29c Mon Sep 17 00:00:00 2001 From: Sergei Malafeev Date: Sat, 8 Jun 2019 00:09:05 +0800 Subject: [PATCH] #38 add lettuce pubsub Signed-off-by: Sergei Malafeev --- opentracing-redis-lettuce/pom.xml | 10 +- .../lettuce/TracingRedisAsyncCommands.java | 6 +- .../redis/lettuce/TracingRedisCommands.java | 4 +- .../TracingRedisPubSubAsyncCommands.java | 67 ++++++++++++++ .../lettuce/TracingRedisPubSubCommands.java | 65 +++++++++++++ .../lettuce/TracingRedisPubSubListener.java | 92 +++++++++++++++++++ .../TracingStatefulRedisConnection.java | 2 +- .../TracingStatefulRedisPubSubConnection.java | 60 ++++++++++++ .../redis/lettuce/TracingLettuceTest.java | 33 +++++++ 9 files changed, 332 insertions(+), 7 deletions(-) create mode 100644 opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisPubSubAsyncCommands.java create mode 100644 opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisPubSubCommands.java create mode 100644 opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisPubSubListener.java create mode 100644 opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingStatefulRedisPubSubConnection.java diff --git a/opentracing-redis-lettuce/pom.xml b/opentracing-redis-lettuce/pom.xml index d7701ff..9510d1c 100644 --- a/opentracing-redis-lettuce/pom.xml +++ b/opentracing-redis-lettuce/pom.xml @@ -14,7 +14,9 @@ the License. --> - + opentracing-redis-parent io.opentracing.contrib @@ -45,6 +47,12 @@ embedded-redis test + + + org.awaitility + awaitility + test + \ No newline at end of file diff --git a/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisAsyncCommands.java b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisAsyncCommands.java index b7e230b..83cce2c 100644 --- a/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisAsyncCommands.java +++ b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisAsyncCommands.java @@ -77,8 +77,8 @@ public class TracingRedisAsyncCommands implements RedisAsyncCommands { private final RedisAsyncCommands commands; - private final TracingHelper helper; - private final TracingConfiguration tracingConfiguration; + final TracingHelper helper; + final TracingConfiguration tracingConfiguration; /** * @param commands redis async commands @@ -3094,7 +3094,7 @@ public RedisFuture unwatch() { return prepareRedisFuture(commands.unwatch(), span); } - private RedisFuture prepareRedisFuture(RedisFuture future, Span span) { + RedisFuture prepareRedisFuture(RedisFuture future, Span span) { return continueScopeSpan(setCompleteAction(future, span)); } diff --git a/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisCommands.java b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisCommands.java index b07f597..99e8fe9 100644 --- a/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisCommands.java +++ b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisCommands.java @@ -75,8 +75,8 @@ public class TracingRedisCommands implements RedisCommands { private final RedisCommands commands; - private final TracingHelper helper; - private final TracingConfiguration tracingConfiguration; + final TracingHelper helper; + final TracingConfiguration tracingConfiguration; public TracingRedisCommands(RedisCommands commands, TracingConfiguration tracingConfiguration) { diff --git a/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisPubSubAsyncCommands.java b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisPubSubAsyncCommands.java new file mode 100644 index 0000000..b4a7ae3 --- /dev/null +++ b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisPubSubAsyncCommands.java @@ -0,0 +1,67 @@ +/* + * Copyright 2017-2019 The OpenTracing Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.opentracing.contrib.redis.lettuce; + +import io.lettuce.core.RedisFuture; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; +import io.opentracing.Span; +import io.opentracing.contrib.redis.common.TracingConfiguration; +import java.util.Arrays; + +public class TracingRedisPubSubAsyncCommands extends + TracingRedisAsyncCommands implements RedisPubSubAsyncCommands { + private final RedisPubSubAsyncCommands commands; + + public TracingRedisPubSubAsyncCommands( + RedisPubSubAsyncCommands commands, + TracingConfiguration tracingConfiguration) { + super(commands, tracingConfiguration); + this.commands = commands; + } + + @Override + public RedisFuture psubscribe(K... patterns) { + final Span span = helper.buildSpan("psubscribe"); + span.setTag("patterns", Arrays.toString(patterns)); + return prepareRedisFuture(commands.psubscribe(patterns), span); + } + + @Override + public RedisFuture punsubscribe(K... patterns) { + final Span span = helper.buildSpan("punsubscribe"); + span.setTag("patterns", Arrays.toString(patterns)); + return prepareRedisFuture(commands.punsubscribe(patterns), span); + } + + @Override + public RedisFuture subscribe(K... channels) { + final Span span = helper.buildSpan("subscribe"); + span.setTag("channels", Arrays.toString(channels)); + return prepareRedisFuture(commands.subscribe(channels), span); + } + + @Override + public RedisFuture unsubscribe(K... channels) { + final Span span = helper.buildSpan("unsubscribe"); + span.setTag("channels", Arrays.toString(channels)); + return prepareRedisFuture(commands.unsubscribe(channels), span); + } + + @Override + public StatefulRedisPubSubConnection getStatefulConnection() { + return new TracingStatefulRedisPubSubConnection<>(commands.getStatefulConnection(), + tracingConfiguration); + } +} diff --git a/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisPubSubCommands.java b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisPubSubCommands.java new file mode 100644 index 0000000..9d209da --- /dev/null +++ b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisPubSubCommands.java @@ -0,0 +1,65 @@ +/* + * Copyright 2017-2019 The OpenTracing Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.opentracing.contrib.redis.lettuce; + +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; +import io.opentracing.Span; +import io.opentracing.contrib.redis.common.TracingConfiguration; +import java.util.Arrays; + +public class TracingRedisPubSubCommands extends TracingRedisCommands implements + RedisPubSubCommands { + private final RedisPubSubCommands commands; + + public TracingRedisPubSubCommands(RedisPubSubCommands commands, + TracingConfiguration tracingConfiguration) { + super(commands, tracingConfiguration); + this.commands = commands; + } + + @Override + public void psubscribe(K... patterns) { + final Span span = helper.buildSpan("psubscribe"); + span.setTag("patterns", Arrays.toString(patterns)); + helper.decorate(span, () -> commands.psubscribe(patterns)); + } + + @Override + public void punsubscribe(K... patterns) { + final Span span = helper.buildSpan("punsubscribe"); + span.setTag("patterns", Arrays.toString(patterns)); + helper.decorate(span, () -> commands.punsubscribe(patterns)); + } + + @Override + public void subscribe(K... channels) { + final Span span = helper.buildSpan("subscribe"); + span.setTag("channels", Arrays.toString(channels)); + helper.decorate(span, () -> commands.subscribe(channels)); + } + + @Override + public void unsubscribe(K... channels) { + final Span span = helper.buildSpan("unsubscribe"); + span.setTag("channels", Arrays.toString(channels)); + helper.decorate(span, () -> commands.unsubscribe(channels)); + } + + @Override + public StatefulRedisPubSubConnection getStatefulConnection() { + return new TracingStatefulRedisPubSubConnection<>(commands.getStatefulConnection(), + tracingConfiguration); + } +} diff --git a/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisPubSubListener.java b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisPubSubListener.java new file mode 100644 index 0000000..385e910 --- /dev/null +++ b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingRedisPubSubListener.java @@ -0,0 +1,92 @@ +/* + * Copyright 2017-2019 The OpenTracing Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.opentracing.contrib.redis.lettuce; + +import static io.opentracing.contrib.redis.common.TracingHelper.nullable; + +import io.lettuce.core.pubsub.RedisPubSubListener; +import io.opentracing.Span; +import io.opentracing.contrib.redis.common.TracingConfiguration; +import io.opentracing.contrib.redis.common.TracingHelper; + +public class TracingRedisPubSubListener implements RedisPubSubListener { + + private final RedisPubSubListener listener; + private final TracingHelper helper; + + public TracingRedisPubSubListener(RedisPubSubListener listener, + TracingConfiguration tracingConfiguration) { + this.listener = listener; + this.helper = new TracingHelper(tracingConfiguration); + } + + @Override + public void message(K channel, V message) { + final Span span = helper.buildSpan("message"); + span.setTag("channel", nullable(channel)); + span.setTag("message", nullable(message)); + helper.decorate(span, () -> listener.message(channel, message)); + } + + @Override + public void message(K pattern, K channel, V message) { + final Span span = helper.buildSpan("message"); + span.setTag("pattern", nullable(pattern)); + span.setTag("channel", nullable(channel)); + span.setTag("message", nullable(message)); + helper.decorate(span, () -> listener.message(pattern, channel, message)); + } + + @Override + public void subscribed(K channel, long count) { + final Span span = helper.buildSpan("subscribed"); + span.setTag("channel", nullable(channel)); + span.setTag("count", count); + helper.decorate(span, () -> listener.subscribed(channel, count)); + } + + @Override + public void psubscribed(K pattern, long count) { + final Span span = helper.buildSpan("psubscribed"); + span.setTag("pattern", nullable(pattern)); + span.setTag("count", count); + helper.decorate(span, () -> listener.psubscribed(pattern, count)); + } + + @Override + public void unsubscribed(K channel, long count) { + final Span span = helper.buildSpan("unsubscribed"); + span.setTag("channel", nullable(channel)); + span.setTag("count", count); + helper.decorate(span, () -> listener.unsubscribed(channel, count)); + } + + @Override + public void punsubscribed(K pattern, long count) { + final Span span = helper.buildSpan("punsubscribed"); + span.setTag("pattern", nullable(pattern)); + span.setTag("count", count); + helper.decorate(span, () -> listener.punsubscribed(pattern, count)); + } + + @Override + public boolean equals(Object obj) { + return listener.equals(obj); + } + + @Override + public int hashCode() { + return listener.hashCode(); + } +} diff --git a/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingStatefulRedisConnection.java b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingStatefulRedisConnection.java index 8af513c..59c7530 100644 --- a/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingStatefulRedisConnection.java +++ b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingStatefulRedisConnection.java @@ -29,7 +29,7 @@ public class TracingStatefulRedisConnection implements StatefulRedisConnection { private final StatefulRedisConnection connection; - private final TracingConfiguration tracingConfiguration; + final TracingConfiguration tracingConfiguration; /** * @param connection redis connection diff --git a/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingStatefulRedisPubSubConnection.java b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingStatefulRedisPubSubConnection.java new file mode 100644 index 0000000..6693a76 --- /dev/null +++ b/opentracing-redis-lettuce/src/main/java/io/opentracing/contrib/redis/lettuce/TracingStatefulRedisPubSubConnection.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017-2019 The OpenTracing Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.opentracing.contrib.redis.lettuce; + +import io.lettuce.core.pubsub.RedisPubSubListener; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; +import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; +import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; +import io.opentracing.contrib.redis.common.TracingConfiguration; + +public class TracingStatefulRedisPubSubConnection extends + TracingStatefulRedisConnection implements + StatefulRedisPubSubConnection { + + private final StatefulRedisPubSubConnection connection; + + public TracingStatefulRedisPubSubConnection( + StatefulRedisPubSubConnection connection, + TracingConfiguration tracingConfiguration) { + super(connection, tracingConfiguration); + this.connection = connection; + } + + @Override + public RedisPubSubCommands sync() { + return new TracingRedisPubSubCommands<>(connection.sync(), tracingConfiguration); + } + + @Override + public RedisPubSubAsyncCommands async() { + return new TracingRedisPubSubAsyncCommands<>(connection.async(), tracingConfiguration); + } + + @Override + public RedisPubSubReactiveCommands reactive() { + return connection.reactive(); + } + + @Override + public void addListener(RedisPubSubListener listener) { + connection.addListener(new TracingRedisPubSubListener<>(listener, tracingConfiguration)); + } + + @Override + public void removeListener(RedisPubSubListener listener) { + connection.removeListener(listener); + } +} diff --git a/opentracing-redis-lettuce/src/test/java/io/opentracing/contrib/redis/lettuce/TracingLettuceTest.java b/opentracing-redis-lettuce/src/test/java/io/opentracing/contrib/redis/lettuce/TracingLettuceTest.java index a709621..98646e8 100644 --- a/opentracing-redis-lettuce/src/test/java/io/opentracing/contrib/redis/lettuce/TracingLettuceTest.java +++ b/opentracing-redis-lettuce/src/test/java/io/opentracing/contrib/redis/lettuce/TracingLettuceTest.java @@ -13,6 +13,8 @@ */ package io.opentracing.contrib.redis.lettuce; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; @@ -20,12 +22,16 @@ import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.pubsub.RedisPubSubAdapter; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.contrib.redis.common.TracingConfiguration; import io.opentracing.mock.MockSpan; import io.opentracing.mock.MockTracer; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; @@ -53,6 +59,29 @@ public void after() { } } + @Test + public void pubSub() { + RedisClient client = RedisClient.create("redis://localhost"); + StatefulRedisPubSubConnection connection = + new TracingStatefulRedisPubSubConnection<>(client.connectPubSub(), + new TracingConfiguration.Builder(mockTracer).build()); + + connection.addListener(new RedisPubSubAdapter<>()); + + RedisPubSubCommands commands = connection.sync(); + commands.subscribe("channel"); + + final RedisCommands commands2 = client.connect().sync(); + commands2.publish("channel", "msg"); + + client.shutdown(); + + await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(), equalTo(3)); + + List spans = mockTracer.finishedSpans(); + assertEquals(3, spans.size()); + } + @Test public void sync() { RedisClient client = RedisClient.create("redis://localhost"); @@ -121,4 +150,8 @@ public void async_continue_span() throws Exception { List spans = mockTracer.finishedSpans(); assertEquals(2, spans.size()); } + + private Callable reportedSpansSize() { + return () -> mockTracer.finishedSpans().size(); + } }