Skip to content

Commit

Permalink
[#8472] Improves Coroutines plugin
Browse files Browse the repository at this point in the history
 1. Changes tracing target (Task.runSafely -> resumeWith)
    = RunSafely is a method that is executed only in case of a specific target, so it has been changed to a general method.
 2. Supports tracing threadName
 3. Supports tracing cancelling event
  • Loading branch information
koo-taejin authored and emeroad committed Jan 24, 2022
1 parent aecb94a commit d478253
Show file tree
Hide file tree
Showing 16 changed files with 653 additions and 453 deletions.
29 changes: 10 additions & 19 deletions agent/src/main/resources/profiles/local/pinpoint.config
Original file line number Diff line number Diff line change
Expand Up @@ -1256,22 +1256,13 @@ profiler.rocketmq.basePackage=
# v1.0.1 ~
###########################################################
profiler.kotlin.coroutines.enable=false
# Coroutine Example
# runBlocking(CoroutineName("pinpoint-coroutines")) {
# // has same name("pinpoint-coroutines") with parentContext
# launch {
# }
# // has name("async-pinpoint-coroutines")
# async(CoroutineName("async-pinpoint-coroutines")) { //
# }
# }
# 1. For trace everyone in the above situation
# profiler.kotlin.coroutines.name.include=pinpoint-coroutines,async-pinpoint-coroutines
# 2. For trace only pinpoint-coroutines context
# profiler.kotlin.coroutines.name.include=pinpoint-coroutines
# Note) Trace "async-pinpoint-coroutines" is impossible. This is because the above root connection does not exist.
#
# Only perfect string matching is supported. (If you do not include any value, it will not be tracked.)
# Comma separated list of coroutines name
# eg) profiler.kotlin.coroutines.name.include=CoroutineMyJob1,CoroutineMyJob2
profiler.kotlin.coroutines.name.include=

#Trace the name of the thread.
#This is important information to check whether the developer's intention and the behavior of the coroutine match.
#Recommend that you use it in the development environment and not in the production environment.
profiler.kotlin.coroutines.record.threadName=false

#Track cancellations and the propagation of cancellations.
#This is important information to check whether the developer's intention and the behavior of the coroutine match.
#Recommend that you use it in the development environment and not in the production environment.
profiler.kotlin.coroutines.record.cancel=false
29 changes: 10 additions & 19 deletions agent/src/main/resources/profiles/release/pinpoint.config
Original file line number Diff line number Diff line change
Expand Up @@ -1279,22 +1279,13 @@ profiler.rocketmq.basePackage=
# v1.0.1 ~
###########################################################
profiler.kotlin.coroutines.enable=false
## Coroutine Example
# runBlocking(CoroutineName("pinpoint-coroutines")) {
# // has same name("pinpoint-coroutines") with parentContext
# launch {
# }
# // has name("async-pinpoint-coroutines")
# async(CoroutineName("async-pinpoint-coroutines")) { //
# }
# }
# 1. For trace everyone in the above situation
# profiler.kotlin.coroutines.name.include=pinpoint-coroutines,async-pinpoint-coroutines
# 2. For trace only pinpoint-coroutines context
# profiler.kotlin.coroutines.name.include=pinpoint-coroutines
# Note) Trace "async-pinpoint-coroutines" is impossible. This is because the above root connection does not exist.
#
# Only perfect string matching is supported. (If you do not include any value, it will not be tracked.)
# Comma separated list of coroutines name
# eg) profiler.kotlin.coroutines.name.include=CoroutineMyJob1,CoroutineMyJob2
profiler.kotlin.coroutines.name.include=

#Trace the name of the thread.
#This is important information to check whether the developer's intention and the behavior of the coroutine match.
#Recommend that you use it in the development environment and not in the production environment.
profiler.kotlin.coroutines.record.threadName=false

#Track cancellations and the propagation of cancellations.
#This is important information to check whether the developer's intention and the behavior of the coroutine match.
#Recommend that you use it in the development environment and not in the production environment.
profiler.kotlin.coroutines.record.cancel=false
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 NAVER Corp.
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,9 @@
import com.navercorp.pinpoint.test.plugin.PinpointAgent;
import com.navercorp.pinpoint.test.plugin.PinpointConfig;
import com.navercorp.pinpoint.test.plugin.PinpointPluginTestSuite;

import kotlin.coroutines.CoroutineContext;
import kotlinx.coroutines.CoroutineDispatcher;
import kotlinx.coroutines.Dispatchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -49,97 +51,122 @@
public class CoroutinesIT {

private static final String DISPATCH_METHOD = ".dispatch(";
private static final String RUN_METHOD = ".runSafely(";
private static final String RESUME_WITH_METHOD = ".resumeWith(";
private static final String SCHEDULE_RESUME_METHOD = ".scheduleResumeAfterDelay(";
private static final String ASYNC_INVOCATION = "Asynchronous Invocation";

@Test
public void executeOneLaunchBlockTest() {
int minimumExpectedCount = 7;
int launchBlockCount = 1;
int expectedExecutedRunSafelyCount = 2;
public void executeRunBlockingWitoutContext() {
final boolean activeAsync = false;

// This test has 1 ~ 2 executed Async Invocation
// This test has 2 executed runSafely()
// This test has 1 ~ 4 executed Async Invocation
// This test has 4 executed runSafely()
CoroutinesLaunch coroutinesLaunch = new CoroutinesLaunch();
coroutinesLaunch.execute("pinpoint-test");
coroutinesLaunch.executeWithRunBlocking();

PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
List<String> executedMethod = verifier.getExecutedMethod();

AtomicInteger index = new AtomicInteger();

// dispatch runblocking
Assert.assertTrue(executedMethod.size() >= minimumExpectedCount);
assertFirstDispatch(executedMethod, index);
for (int i = 0; i < launchBlockCount; i++) {
// dispatch launch job
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(DISPATCH_METHOD));
}
// runBlocking(context) {
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(DISPATCH_METHOD));
// runBlocking(context) {
assertResumeWith(executedMethod, index, activeAsync);

final String[] executeActualMethods = Arrays.copyOfRange(executedMethod.toArray(new String[0]), index.get(), executedMethod.size());
Assert.assertTrue(assertExecutedCount(executeActualMethods, RUN_METHOD, expectedExecutedRunSafelyCount));
Assert.assertTrue(assertExecutedCount(executeActualMethods, ASYNC_INVOCATION, executeActualMethods.length - expectedExecutedRunSafelyCount));
// val job1 = async(CoroutineName("first")) {
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(DISPATCH_METHOD));
// val job2 = launch(CoroutineName("second")) {
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(DISPATCH_METHOD));

// delay(10L) // job1
assertResumeWithAndSchedule(executedMethod, index, activeAsync);

// delay(5L) // job2
assertResumeWithAndSchedule(executedMethod, index, activeAsync);

// println("Hello World 1") // job1
assertResumeWith(executedMethod, index, activeAsync);
// println("Hello World 2") // job2
assertResumeWith(executedMethod, index, activeAsync);
// println("Hello all of jobs") // rootjob
assertResumeWith(executedMethod, index, activeAsync);
}

@Test
public void executeTwoLaunchBlockTest() {
int minimumExpectedCount = 10;
int launchBlockCount = 2;
int expectedExecutedRunSafelyCount = 4;

public void executeRunBlocking() {
final boolean activeAsync = true;

// This test has 1 ~ 4 executed Async Invocation
// This test has 4 executed runSafely()
CoroutinesLaunch coroutinesLaunch = new CoroutinesLaunch();
coroutinesLaunch.execute2("pinpoint-test");
CoroutineDispatcher dispatcher = Dispatchers.getDefault();
coroutinesLaunch.executeWithRunBlocking((CoroutineContext) dispatcher);

PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();

verifier.awaitTraceCount(17, 10L, 1000L);

List<String> executedMethod = verifier.getExecutedMethod();

AtomicInteger index = new AtomicInteger();

// dispatch runblocking
Assert.assertTrue(executedMethod.size() >= minimumExpectedCount);
assertFirstDispatch(executedMethod, index);
for (int i = 0; i < launchBlockCount; i++) {
// dispatch launch job
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(DISPATCH_METHOD));
// runBlocking(context) {
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(DISPATCH_METHOD));
// runBlocking(context) {
assertResumeWith(executedMethod, index, activeAsync);

// val job1 = async(CoroutineName("first")) {
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(DISPATCH_METHOD));
// val job2 = launch(CoroutineName("second")) {
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(DISPATCH_METHOD));

// println("Hello all of jobs") // rootjob
assertResumeWith(executedMethod, index, activeAsync);

// delay(10L) // job1
assertResumeWithAndSchedule(executedMethod, index, activeAsync);

// delay(5L) // job2
assertResumeWithAndSchedule(executedMethod, index, activeAsync);

// println("Hello World 1") // job1
assertResumeWith(executedMethod, index, activeAsync);
// println("Hello World 2") // job2
assertResumeWith(executedMethod, index, activeAsync);
}

private void assertResumeWithAndSchedule(List<String> executedMethod, AtomicInteger index, boolean activeAsync) {
if (activeAsync) {
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(ASYNC_INVOCATION));
}
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(RESUME_WITH_METHOD));
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(SCHEDULE_RESUME_METHOD));
}

private void assertResumeWith(List<String> executedMethod, AtomicInteger index, boolean activeAsync) {
if (activeAsync) {
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(ASYNC_INVOCATION));
}
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(RESUME_WITH_METHOD));
}

final String[] executeActualMethods = Arrays.copyOfRange(executedMethod.toArray(new String[0]), index.get(), executedMethod.size());
Assert.assertTrue(assertExecutedCount(executeActualMethods, RUN_METHOD, expectedExecutedRunSafelyCount));
Assert.assertTrue(assertExecutedCount(executeActualMethods, ASYNC_INVOCATION, executeActualMethods.length - expectedExecutedRunSafelyCount));
private void assertRunblockingDispatch(List<String> executedMethod, AtomicInteger index) {
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(DISPATCH_METHOD));
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).equals(ASYNC_INVOCATION));
// run dispatchedContinuation
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(RESUME_WITH_METHOD));
}


private void assertFirstDispatch(List<String> executedMethod, AtomicInteger index) {
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(DISPATCH_METHOD));
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).equals(ASYNC_INVOCATION));
// run dispatchedContinuation
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(RUN_METHOD));
Assert.assertTrue(executedMethod.get(index.getAndIncrement()).contains(RESUME_WITH_METHOD));
}

private boolean assertExecutedCount(String[] executeActualMethod, String expectedActualMethod, int expectedCount) {
long count = Arrays.stream(executeActualMethod).filter(e -> e.contains(expectedActualMethod)).count();
return count == expectedCount;
}

@Test
public void executeCurrentThreadTest() {
int expectedCount = 2;

// This test has 0 executed Async Invocation
// This test has 0 executed runSafely()
CoroutinesLaunch coroutinesLaunch = new CoroutinesLaunch();
coroutinesLaunch.executeParentDispatcher("pinpoint-test");

// executes 2 times dispatch
PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();

List<String> executedMethod = verifier.getExecutedMethod();
Assert.assertEquals(expectedCount, executedMethod.size());
Assert.assertTrue(executedMethod.get(0).contains(DISPATCH_METHOD));
Assert.assertTrue(executedMethod.get(1).contains(DISPATCH_METHOD));
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 NAVER Corp.
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,45 +17,28 @@
package com.navercorp.pinpoint.plugin.kotlinx.coroutines

import kotlinx.coroutines.*
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* @author Taejin Koo
*/
class CoroutinesLaunch {

fun execute(coroutineName: String) {
runBlocking(CoroutineName(coroutineName) + Dispatchers.Default) {
execute0(coroutineName)
}
}

// Concurrently executes both sections
suspend fun execute0(firstName: String, secondName: String = UUID.randomUUID().toString()) =
coroutineScope { // this: CoroutineScope
val job = async(CoroutineName(firstName)) {
@JvmOverloads
fun executeWithRunBlocking(context: CoroutineContext = EmptyCoroutineContext) {
runBlocking(context) {
val job1 = async(CoroutineName("first")) {
delay(10L)
println("Hello World 1")
}
launch(CoroutineName(secondName)) {
val job2 = launch(CoroutineName("second")) {
delay(5L)
println("Hello World 2")
}
job.join()
println("Hello World")
}

fun execute2(coroutineName: String) {
runBlocking(CoroutineName(coroutineName) + Dispatchers.Default) {
execute0(coroutineName, coroutineName)
joinAll(job1, job2)
println("Hello all of jobs")
}
}

fun executeParentDispatcher(coroutineName: String) {
runBlocking(CoroutineName(coroutineName)) {
execute0(coroutineName)
}
}


}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
profiler.kotlin.coroutines.enable=true
profiler.kotlin.coroutines.name.include=pinpoint-test
profiler.kotlin.coroutines.record.threadName=false
profiler.kotlin.coroutines.record.cancel=false
Loading

0 comments on commit d478253

Please sign in to comment.