Skip to content

Commit

Permalink
Merge bdf194f into ab7ac48
Browse files Browse the repository at this point in the history
  • Loading branch information
malafeev committed Jun 7, 2019
2 parents ab7ac48 + bdf194f commit 8ba65e8
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 7 deletions.
10 changes: 9 additions & 1 deletion opentracing-redis-lettuce/pom.xml
Expand Up @@ -14,7 +14,9 @@
the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>opentracing-redis-parent</artifactId>
<groupId>io.opentracing.contrib</groupId>
Expand Down Expand Up @@ -45,6 +47,12 @@
<artifactId>embedded-redis</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Expand Up @@ -77,8 +77,8 @@
public class TracingRedisAsyncCommands<K, V> implements RedisAsyncCommands<K, V> {

private final RedisAsyncCommands<K, V> commands;
private final TracingHelper helper;
private final TracingConfiguration tracingConfiguration;
final TracingHelper helper;
final TracingConfiguration tracingConfiguration;

/**
* @param commands redis async commands
Expand Down Expand Up @@ -3094,7 +3094,7 @@ public RedisFuture<String> unwatch() {
return prepareRedisFuture(commands.unwatch(), span);
}

private <V> RedisFuture<V> prepareRedisFuture(RedisFuture<V> future, Span span) {
<V> RedisFuture<V> prepareRedisFuture(RedisFuture<V> future, Span span) {
return continueScopeSpan(setCompleteAction(future, span));
}

Expand Down
Expand Up @@ -75,8 +75,8 @@
public class TracingRedisCommands<K, V> implements RedisCommands<K, V> {

private final RedisCommands<K, V> commands;
private final TracingHelper helper;
private final TracingConfiguration tracingConfiguration;
final TracingHelper helper;
final TracingConfiguration tracingConfiguration;

public TracingRedisCommands(RedisCommands<K, V> commands,
TracingConfiguration tracingConfiguration) {
Expand Down
@@ -0,0 +1,67 @@
/*
* Copyright 2017-2019 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentracing.contrib.redis.lettuce;

import io.lettuce.core.RedisFuture;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.opentracing.Span;
import io.opentracing.contrib.redis.common.TracingConfiguration;
import java.util.Arrays;

public class TracingRedisPubSubAsyncCommands<K, V> extends
TracingRedisAsyncCommands<K, V> implements RedisPubSubAsyncCommands<K, V> {
private final RedisPubSubAsyncCommands<K, V> commands;

public TracingRedisPubSubAsyncCommands(
RedisPubSubAsyncCommands<K, V> commands,
TracingConfiguration tracingConfiguration) {
super(commands, tracingConfiguration);
this.commands = commands;
}

@Override
public RedisFuture<Void> psubscribe(K... patterns) {
final Span span = helper.buildSpan("psubscribe");
span.setTag("patterns", Arrays.toString(patterns));
return prepareRedisFuture(commands.psubscribe(patterns), span);
}

@Override
public RedisFuture<Void> punsubscribe(K... patterns) {
final Span span = helper.buildSpan("punsubscribe");
span.setTag("patterns", Arrays.toString(patterns));
return prepareRedisFuture(commands.punsubscribe(patterns), span);
}

@Override
public RedisFuture<Void> subscribe(K... channels) {
final Span span = helper.buildSpan("subscribe");
span.setTag("channels", Arrays.toString(channels));
return prepareRedisFuture(commands.subscribe(channels), span);
}

@Override
public RedisFuture<Void> unsubscribe(K... channels) {
final Span span = helper.buildSpan("unsubscribe");
span.setTag("channels", Arrays.toString(channels));
return prepareRedisFuture(commands.unsubscribe(channels), span);
}

@Override
public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
return new TracingStatefulRedisPubSubConnection<>(commands.getStatefulConnection(),
tracingConfiguration);
}
}
@@ -0,0 +1,65 @@
/*
* Copyright 2017-2019 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentracing.contrib.redis.lettuce;

import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.opentracing.Span;
import io.opentracing.contrib.redis.common.TracingConfiguration;
import java.util.Arrays;

public class TracingRedisPubSubCommands<K, V> extends TracingRedisCommands<K, V> implements
RedisPubSubCommands<K, V> {
private final RedisPubSubCommands<K, V> commands;

public TracingRedisPubSubCommands(RedisPubSubCommands<K, V> commands,
TracingConfiguration tracingConfiguration) {
super(commands, tracingConfiguration);
this.commands = commands;
}

@Override
public void psubscribe(K... patterns) {
final Span span = helper.buildSpan("psubscribe");
span.setTag("patterns", Arrays.toString(patterns));
helper.decorate(span, () -> commands.psubscribe(patterns));
}

@Override
public void punsubscribe(K... patterns) {
final Span span = helper.buildSpan("punsubscribe");
span.setTag("patterns", Arrays.toString(patterns));
helper.decorate(span, () -> commands.punsubscribe(patterns));
}

@Override
public void subscribe(K... channels) {
final Span span = helper.buildSpan("subscribe");
span.setTag("channels", Arrays.toString(channels));
helper.decorate(span, () -> commands.subscribe(channels));
}

@Override
public void unsubscribe(K... channels) {
final Span span = helper.buildSpan("unsubscribe");
span.setTag("channels", Arrays.toString(channels));
helper.decorate(span, () -> commands.unsubscribe(channels));
}

@Override
public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
return new TracingStatefulRedisPubSubConnection<>(commands.getStatefulConnection(),
tracingConfiguration);
}
}
@@ -0,0 +1,92 @@
/*
* Copyright 2017-2019 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentracing.contrib.redis.lettuce;

import static io.opentracing.contrib.redis.common.TracingHelper.nullable;

import io.lettuce.core.pubsub.RedisPubSubListener;
import io.opentracing.Span;
import io.opentracing.contrib.redis.common.TracingConfiguration;
import io.opentracing.contrib.redis.common.TracingHelper;

public class TracingRedisPubSubListener<K, V> implements RedisPubSubListener<K, V> {

private final RedisPubSubListener<K, V> listener;
private final TracingHelper helper;

public TracingRedisPubSubListener(RedisPubSubListener<K, V> listener,
TracingConfiguration tracingConfiguration) {
this.listener = listener;
this.helper = new TracingHelper(tracingConfiguration);
}

@Override
public void message(K channel, V message) {
final Span span = helper.buildSpan("message");
span.setTag("channel", nullable(channel));
span.setTag("message", nullable(message));
helper.decorate(span, () -> listener.message(channel, message));
}

@Override
public void message(K pattern, K channel, V message) {
final Span span = helper.buildSpan("message");
span.setTag("pattern", nullable(pattern));
span.setTag("channel", nullable(channel));
span.setTag("message", nullable(message));
helper.decorate(span, () -> listener.message(pattern, channel, message));
}

@Override
public void subscribed(K channel, long count) {
final Span span = helper.buildSpan("subscribed");
span.setTag("channel", nullable(channel));
span.setTag("count", count);
helper.decorate(span, () -> listener.subscribed(channel, count));
}

@Override
public void psubscribed(K pattern, long count) {
final Span span = helper.buildSpan("psubscribed");
span.setTag("pattern", nullable(pattern));
span.setTag("count", count);
helper.decorate(span, () -> listener.psubscribed(pattern, count));
}

@Override
public void unsubscribed(K channel, long count) {
final Span span = helper.buildSpan("unsubscribed");
span.setTag("channel", nullable(channel));
span.setTag("count", count);
helper.decorate(span, () -> listener.unsubscribed(channel, count));
}

@Override
public void punsubscribed(K pattern, long count) {
final Span span = helper.buildSpan("punsubscribed");
span.setTag("pattern", nullable(pattern));
span.setTag("count", count);
helper.decorate(span, () -> listener.punsubscribed(pattern, count));
}

@Override
public boolean equals(Object obj) {
return listener.equals(obj);
}

@Override
public int hashCode() {
return listener.hashCode();
}
}
Expand Up @@ -29,7 +29,7 @@
public class TracingStatefulRedisConnection<K, V> implements StatefulRedisConnection<K, V> {

private final StatefulRedisConnection<K, V> connection;
private final TracingConfiguration tracingConfiguration;
final TracingConfiguration tracingConfiguration;

/**
* @param connection redis connection
Expand Down
@@ -0,0 +1,60 @@
/*
* Copyright 2017-2019 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentracing.contrib.redis.lettuce;

import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.opentracing.contrib.redis.common.TracingConfiguration;

public class TracingStatefulRedisPubSubConnection<K, V> extends
TracingStatefulRedisConnection<K, V> implements
StatefulRedisPubSubConnection<K, V> {

private final StatefulRedisPubSubConnection<K, V> connection;

public TracingStatefulRedisPubSubConnection(
StatefulRedisPubSubConnection<K, V> connection,
TracingConfiguration tracingConfiguration) {
super(connection, tracingConfiguration);
this.connection = connection;
}

@Override
public RedisPubSubCommands<K, V> sync() {
return new TracingRedisPubSubCommands<>(connection.sync(), tracingConfiguration);
}

@Override
public RedisPubSubAsyncCommands<K, V> async() {
return new TracingRedisPubSubAsyncCommands<>(connection.async(), tracingConfiguration);
}

@Override
public RedisPubSubReactiveCommands<K, V> reactive() {
return connection.reactive();
}

@Override
public void addListener(RedisPubSubListener<K, V> listener) {
connection.addListener(new TracingRedisPubSubListener<>(listener, tracingConfiguration));
}

@Override
public void removeListener(RedisPubSubListener<K, V> listener) {
connection.removeListener(listener);
}
}

0 comments on commit 8ba65e8

Please sign in to comment.