Skip to content

Commit

Permalink
Merge pull request #93 from newrelic/spring-webflux
Browse files Browse the repository at this point in the history
Spring webflux
  • Loading branch information
tspring committed Oct 29, 2020
2 parents 7c44439 + 1867992 commit f412e6f
Show file tree
Hide file tree
Showing 31 changed files with 572 additions and 219 deletions.
21 changes: 0 additions & 21 deletions instrumentation/netty-reactor-0.6.0/build.gradle

This file was deleted.

17 changes: 17 additions & 0 deletions instrumentation/netty-reactor-0.7.0/NOTICE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
This product contains a modified part of OpenTelemetry:

* License:

Copyright 2019 The OpenTelemetry Authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions and limitations under
the License.

* Homepage: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/master/LICENSE
31 changes: 31 additions & 0 deletions instrumentation/netty-reactor-0.7.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
dependencies {
implementation(project(":agent-bridge"))
implementation("io.projectreactor.ipc:reactor-netty:0.7.0.RELEASE")
}

sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8

compileTestJava {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

compileJava.options.bootstrapClasspath = null

jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.netty-reactor-0.7.0' }
}

verifyInstrumentation {
// Capped for the compatibility docs plugin
passesOnly 'io.projectreactor.ipc:reactor-netty:[0.7.0.RELEASE,0.7.15.RELEASE]'

// New Versions 0.8.0+ moved here
fails 'io.projectreactor.netty:reactor-netty:[0.8.0.RELEASE,)'
}

site {
title 'Netty Reactor'
type 'Appserver'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.netty;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.Trace;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

import java.util.function.BiFunction;
import java.util.function.Function;

// Based on OpenTelemetry code
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/master/instrumentation-core/reactor-3.1/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java
public class TokenLinkingSubscriber<T> implements CoreSubscriber<T> {
private final Token token;
private final Subscriber<? super T> subscriber;
private Context context;

public TokenLinkingSubscriber(Subscriber<? super T> subscriber, Context ctx) {
this.subscriber = subscriber;
this.context = ctx;
// newrelic-token is added by spring-webflux-5.1 instrumentation
this.token = ctx.getOrDefault("newrelic-token", null);
}

@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
}

@Override
public void onNext(T o) {
withNRToken(() -> subscriber.onNext(o));
}

@Override
public void onError(Throwable throwable) {
withNRError(() -> subscriber.onError(throwable), throwable);
}

@Override
public void onComplete() {
subscriber.onComplete();
}

@Override
public Context currentContext() {
return context;
}

@Trace(async = true, excludeFromTransactionTrace = true)
private void withNRToken(Runnable runnable) {
if (token != null && AgentBridge.getAgent().getTransaction(false) == null) {
token.link();
}
runnable.run();
}

@Trace(async = true, excludeFromTransactionTrace = true)
private void withNRError(Runnable runnable, Throwable throwable) {
if (token != null && token.isActive()) {
token.linkAndExpire();
NewRelic.noticeError(throwable);
}
runnable.run();
}

public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tokenLift() {
return Operators.lift(new TokenLifter<>());
}

private static class TokenLifter<T>
implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

public TokenLifter() {
}

@Override
public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? super T> sub) {
// if Flux/Mono #just, #empty, #error
if (publisher instanceof Fuseable.ScalarCallable) {
return sub;
}
Token token = sub.currentContext().getOrDefault("newrelic-token", null);
if (token != null ) {
return new TokenLinkingSubscriber<>(sub, sub.currentContext());
}
return sub;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package reactor.core.publisher;

import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;

import java.util.concurrent.atomic.AtomicBoolean;

@Weave(originalName = "reactor.core.publisher.Hooks")
public abstract class Hooks_Instrumentation {
@NewField
public static AtomicBoolean instrumented = new AtomicBoolean(false);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,22 @@
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import io.netty.TokenLinkingSubscriber;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerContext_Instrumentation;
import io.netty.handler.codec.http.HttpRequest;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Hooks_Instrumentation;

import static io.netty.TokenLinkingSubscriber.tokenLift;

@Weave(type = MatchType.BaseClass, originalName = "reactor.ipc.netty.http.server.HttpServerHandler")
class HttpServerHandler_Instrumentation {

public void channelRead(ChannelHandlerContext_Instrumentation ctx, Object msg) {
if (!Hooks_Instrumentation.instrumented.getAndSet(true)) {
Hooks.onEachOperator(TokenLinkingSubscriber.class.getName(), tokenLift());
}
Weaver.callOriginal();
if (msg instanceof HttpRequest) {
if (ctx.pipeline().reactiveLayerToken == null) {
Expand Down
17 changes: 17 additions & 0 deletions instrumentation/netty-reactor-0.8.0/NOTICE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
This product contains a modified part of OpenTelemetry:

* License:

Copyright 2019 The OpenTelemetry Authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions and limitations under
the License.

* Homepage: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/master/LICENSE
10 changes: 10 additions & 0 deletions instrumentation/netty-reactor-0.8.0/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ dependencies {
implementation("io.projectreactor.netty:reactor-netty:0.8.0.RELEASE")
}

sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8

compileTestJava {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

compileJava.options.bootstrapClasspath = null

jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.netty-reactor-0.8.0' }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.nr.instrumentation.reactor.netty;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.Trace;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

import java.util.function.BiFunction;
import java.util.function.Function;

// Based on OpenTelemetry code
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/master/instrumentation-core/reactor-3.1/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java
public class TokenLinkingSubscriber<T> implements CoreSubscriber<T> {
private final Token token;
private final Subscriber<? super T> subscriber;
private Context context;

public TokenLinkingSubscriber(Subscriber<? super T> subscriber, Context ctx) {
this.subscriber = subscriber;
this.context = ctx;
// newrelic-token is added by spring-webflux-5.1 instrumentation
this.token = ctx.getOrDefault("newrelic-token", null);
}

@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
}

@Override
public void onNext(T o) {
withNRToken(() -> subscriber.onNext(o));
}

@Override
public void onError(Throwable throwable) {
withNRError(() -> subscriber.onError(throwable), throwable);
}

@Override
public void onComplete() {
subscriber.onComplete();
}

@Override
public Context currentContext() {
return context;
}

@Trace(async = true, excludeFromTransactionTrace = true)
private void withNRToken(Runnable runnable) {
if (token != null && AgentBridge.getAgent().getTransaction(false) == null) {
token.link();
}
runnable.run();
}

@Trace(async = true, excludeFromTransactionTrace = true)
private void withNRError(Runnable runnable, Throwable throwable) {
if (token != null && token.isActive()) {
token.linkAndExpire();
NewRelic.noticeError(throwable);
}
runnable.run();
}

public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tokenLift() {
return Operators.lift(new TokenLifter<>());
}

private static class TokenLifter<T>
implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

public TokenLifter() {
}

@Override
public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? super T> sub) {
// if Flux/Mono #just, #empty, #error
if (publisher instanceof Fuseable.ScalarCallable) {
return sub;
}
Token token = sub.currentContext().getOrDefault("newrelic-token", null);
if (token != null ) {
return new TokenLinkingSubscriber<>(sub, sub.currentContext());
}
return sub;
}
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit f412e6f

Please sign in to comment.