Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scouter.agent.java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.35</version>
<version>3.3.8.RELEASE</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import scouter.agent.Logger;
import scouter.agent.util.Tuple;

import java.lang.reflect.Method;

/**
* @author Gun Lee (gunlee01@gmail.com) on 2020/08/08
*/
Expand All @@ -31,7 +33,8 @@ public class ScouterOptimizableOperatorProxy {
public static boolean accessible = false;
public static boolean first = true;

public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxScanDepth) {
public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxScanDepth, boolean isReactor34,
Method isCheckpoint) {
try {
if (!accessible && first) {
try {
Expand All @@ -54,12 +57,14 @@ public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxSca
}
if (closeAssembly instanceof MonoOnAssembly) {
FluxOnAssembly.AssemblySnapshot snapshot = ((MonoOnAssembly) closeAssembly).stacktrace;
if (snapshot != null && snapshot.isCheckpoint()) {
boolean cp = isReactor34 ? (Boolean) isCheckpoint.invoke(snapshot) : snapshot.checkpointed;
if (snapshot != null && cp) {
return new Tuple.StringLongPair(snapshot.cached, snapshot.hashCode());
}
} else if (closeAssembly instanceof FluxOnAssembly) {
FluxOnAssembly.AssemblySnapshot snapshot = ((FluxOnAssembly) closeAssembly).snapshotStack;
if (snapshot != null && snapshot.isCheckpoint()) {
boolean cp = isReactor34 ? (Boolean) isCheckpoint.invoke(snapshot) : snapshot.checkpointed;
if (snapshot != null && cp) {
return new Tuple.StringLongPair(snapshot.cached, snapshot.hashCode());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ public interface IReactiveSupport {

String dumpScannable(TraceContext traceContext, TraceContext.TimedScannable timedScannable, long now);

boolean isReactor34();

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public Object monoCoroutineContextHook(Object coroutineContext, TraceContext tra
public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannable timedScannable, long now) {
return null;
}

@Override
public boolean isReactor34() {
return false;
}
};

public static IReactiveSupport create(ClassLoader parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,33 @@
import scouter.lang.step.ParameterizedMessageStep;
import scouter.util.StringUtil;

import java.lang.reflect.Method;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

public class ReactiveSupport implements IReactiveSupport {

static Configure configure = Configure.getInstance();
private Method subscriberContextMethod;
private static Method isCheckpoint;
private static boolean isReactor34;

public ReactiveSupport() {
isReactor34 = ReactiveSupportUtils.isSupportReactor34();
try {
if (isReactor34) {
subscriberContextMethod = Mono.class.getMethod("contextWrite", Function.class);
Class<?> assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot");
isCheckpoint = assemblySnapshotClass.getDeclaredMethod("isCheckpoint");
isCheckpoint.setAccessible(true);
} else {
subscriberContextMethod = Mono.class.getMethod("subscriberContext", Function.class);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Object subscriptOnContext(Object mono0, final TraceContext traceContext) {
Expand All @@ -58,12 +78,17 @@ public Object subscriptOnContext(Object mono0, final TraceContext traceContext)
}
Mono<?> mono = (Mono<?>) mono0;
traceContext.isReactiveTxidMarked = true;
return mono.contextWrite(new Function<Context, Context>() {

Mono<?> monoChain;
Function<Context, Context> func = new Function<Context, Context>() {
@Override
public Context apply(Context context) {
return context.put(TraceContext.class, traceContext);
}
}).doOnSuccess(new Consumer<Object>() {
};

monoChain = (Mono<?>) subscriberContextMethod.invoke(mono, func);
return monoChain.doOnSuccess(new Consumer<Object>() {
@Override
public void accept(Object o) {
TraceMain.endHttpService(new TraceMain.Stat(traceContext), null);
Expand Down Expand Up @@ -164,7 +189,7 @@ public TxidLifter(CoreSubscriber<T> coreSubscriber, Scannable scannable, Publish
this.traceContext = traceContext;

Tuple.StringLongPair checkpointPair = ScouterOptimizableOperatorProxy
.nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth);
.nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth, isReactor34, isCheckpoint);
checkpointDesc = checkpointPair.aString;

Integer parentDepth = context.getOrDefault(SubscribeDepth.class, 0);
Expand Down Expand Up @@ -321,4 +346,9 @@ public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannab
ScouterOptimizableOperatorProxy.appendSources4Dump(scannable, builder, configure.profile_reactor_checkpoint_search_depth);
return builder.toString();
}

@Override
public boolean isReactor34() {
return isReactor34;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package scouter.xtra.reactive;

import reactor.core.publisher.Mono;
import scouter.agent.Logger;

import java.util.function.Function;

/**
* @author Gun Lee (gunlee01@gmail.com) on 2/21/24
*/
public class ReactiveSupportUtils {

public static boolean isSupportReactor34() {
try {
Class<?> assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot");
assemblySnapshotClass.getDeclaredMethod("isCheckpoint");

Class<Mono> monoClass = Mono.class;
Class<?>[] parameterTypes = new Class<?>[]{Function.class};
monoClass.getMethod("contextWrite", parameterTypes);

return true;

} catch (ClassNotFoundException | NoSuchMethodException e) {
e.printStackTrace();
Logger.println("R301", e.getMessage());
return false;
} catch (Exception e) {
e.printStackTrace();
Logger.println("R302", e.getMessage(), e);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,33 @@
import scouter.lang.step.ParameterizedMessageStep;
import scouter.util.StringUtil;

import java.lang.reflect.Method;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

public class ReactiveSupportWithCoroutine implements IReactiveSupport {

static Configure configure = Configure.getInstance();
private Method subscriberContextMethod;
private static Method isCheckpoint;
private static boolean isReactor34;

public ReactiveSupportWithCoroutine() {
isReactor34 = ReactiveSupportUtils.isSupportReactor34();
try {
if (isReactor34) {
subscriberContextMethod = Mono.class.getMethod("contextWrite", Function.class);
Class<?> assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot");
isCheckpoint = assemblySnapshotClass.getDeclaredMethod("isCheckpoint");
isCheckpoint.setAccessible(true);
} else {
subscriberContextMethod = Mono.class.getMethod("subscriberContext", Function.class);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Object subscriptOnContext(Object mono0, final TraceContext traceContext) {
Expand All @@ -61,12 +81,16 @@ public Object subscriptOnContext(Object mono0, final TraceContext traceContext)
}
Mono<?> mono = (Mono<?>) mono0;
traceContext.isReactiveTxidMarked = true;
return mono.contextWrite(new Function<Context, Context>() {
Mono<?> monoChain;
Function<Context, Context> func = new Function<Context, Context>() {
@Override
public Context apply(Context context) {
return context.put(TraceContext.class, traceContext);
}
}).doOnSuccess(new Consumer<Object>() {
};

monoChain = (Mono<?>) subscriberContextMethod.invoke(mono, func);
return monoChain.doOnSuccess(new Consumer<Object>() {
@Override
public void accept(Object o) {
TraceMain.endHttpService(new TraceMain.Stat(traceContext), null);
Expand Down Expand Up @@ -161,6 +185,7 @@ public static class TxidLifter<T> implements SpanSubscription<T>, Scannable {
private final String checkpointDesc;
private final Integer depth;
private Subscription orgSubs;
private boolean isReactor34;

private enum ReactorCheckPointType {
ON_SUBSCRIBE,
Expand All @@ -176,9 +201,10 @@ public TxidLifter(CoreSubscriber<T> coreSubscriber, Scannable scannable, Publish
this.scannable = scannable;
this.publisher = publisher;
this.traceContext = traceContext;
this.isReactor34 = isReactor34;

Tuple.StringLongPair checkpointPair = ScouterOptimizableOperatorProxy
.nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth);
.nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth, isReactor34, isCheckpoint);
checkpointDesc = checkpointPair.aString;

Integer parentDepth = context.getOrDefault(SubscribeDepth.class, 0);
Expand Down Expand Up @@ -335,4 +361,9 @@ public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannab
ScouterOptimizableOperatorProxy.appendSources4Dump(scannable, builder, configure.profile_reactor_checkpoint_search_depth);
return builder.toString();
}

@Override
public boolean isReactor34() {
return isReactor34;
}
}