Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updates versions and fixes memleak in UnboundedProcessor #1106

Merged
merged 4 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ subprojects {
apply plugin: 'com.github.sherter.google-java-format'
apply plugin: 'com.github.vlsi.gradle-extensions'

ext['reactor-bom.version'] = '2020.0.32'
ext['logback.version'] = '1.2.10'
ext['netty-bom.version'] = '4.1.93.Final'
ext['netty-boringssl.version'] = '2.0.61.Final'
ext['reactor-bom.version'] = '2020.0.39'
ext['logback.version'] = '1.3.14'
ext['netty-bom.version'] = '4.1.106.Final'
ext['netty-boringssl.version'] = '2.0.62.Final'
ext['hdrhistogram.version'] = '2.1.12'
ext['mockito.version'] = '4.11.0'
ext['slf4j.version'] = '1.7.36'
Expand Down
5 changes: 3 additions & 2 deletions rsocket-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ dependencies {
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'

jcstressImplementation(project(":rsocket-test"))
jcstressImplementation 'org.slf4j:slf4j-api'
jcstressImplementation "ch.qos.logback:logback-classic"
jcstressImplementation 'io.projectreactor:reactor-test'
}

jcstress {
mode = 'quick' //quick, default, tough
jcstressDependency = "org.openjdk.jcstress:jcstress-core:0.15"
mode = 'sanity' //sanity, quick, default, tough
jcstressDependency = "org.openjdk.jcstress:jcstress-core:0.16"
}

jar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.rsocket.core.StressSubscriber;
import io.rsocket.utils.FastLogger;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.Expect;
Expand All @@ -14,14 +17,17 @@
import org.openjdk.jcstress.infra.results.L_Result;
import reactor.core.Fuseable;
import reactor.core.publisher.Hooks;
import reactor.util.Logger;

public abstract class UnboundedProcessorStressTest {

static {
Hooks.onErrorDropped(t -> {});
}

final UnboundedProcessor unboundedProcessor = new UnboundedProcessor();
final Logger logger = new FastLogger(getClass().getName());

final UnboundedProcessor unboundedProcessor = new UnboundedProcessor(logger);

@JCStressTest
@Outcome(
Expand Down Expand Up @@ -145,6 +151,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -270,6 +278,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -375,6 +385,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -476,6 +488,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -578,6 +592,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -701,6 +717,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -781,6 +799,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -837,9 +857,15 @@ public void arbiter(LLL_Result r) {
+ stressSubscriber.onErrorCalls * 2
+ stressSubscriber.droppedErrors.size() * 3;

if (stressSubscriber.concurrentOnNext || stressSubscriber.concurrentOnComplete) {
throw new ConcurrentModificationException("boo");
}

stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -892,6 +918,8 @@ public void arbiter(LLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1107,6 +1135,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1238,6 +1268,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1390,6 +1422,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1522,6 +1556,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1587,6 +1623,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand Down Expand Up @@ -1652,6 +1690,8 @@ public void arbiter(LLLL_Result r) {
stressSubscriber.values.forEach(ByteBuf::release);

r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();

checkOutcomes(this, r.toString(), logger);
}
}

Expand All @@ -1678,6 +1718,16 @@ public void subscribe2() {
@Arbiter
public void arbiter(L_Result r) {
r.r1 = stressSubscriber1.onErrorCalls + stressSubscriber2.onErrorCalls;

checkOutcomes(this, r.toString(), logger);
}
}

static void checkOutcomes(Object instance, String result, Logger logger) {
if (Arrays.stream(instance.getClass().getDeclaredAnnotationsByType(Outcome.class))
.flatMap(o -> Arrays.stream(o.id()))
.noneMatch(s -> s.equalsIgnoreCase(result))) {
throw new RuntimeException(result + " " + logger);
}
}
}
137 changes: 137 additions & 0 deletions rsocket-core/src/jcstress/java/io/rsocket/utils/FastLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package io.rsocket.utils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import reactor.util.Logger;

/**
* Implementation of {@link Logger} which is based on the {@link ThreadLocal} based queue which
* collects all the events on the per-thread basis. </br> Such logger is designed to have all events
* stored during the stress-test run and then sorted and printed out once all the Threads completed
* execution (inside the {@link org.openjdk.jcstress.annotations.Arbiter} annotated method. </br>
* Note, this implementation only supports trace-level logs and ignores all others, it is intended
* to be used by {@link reactor.core.publisher.StateLogger}.
*/
public class FastLogger implements Logger {

final Map<Thread, List<String>> queues = new ConcurrentHashMap<>();

final ThreadLocal<List<String>> logsQueueLocal =
ThreadLocal.withInitial(
() -> {
final ArrayList<String> logs = new ArrayList<>(100);
queues.put(Thread.currentThread(), logs);
return logs;
});

private final String name;

public FastLogger(String name) {
this.name = name;
}

@Override
public String toString() {
return queues
.values()
.stream()
.flatMap(List::stream)
.sorted(
Comparator.comparingLong(
s -> {
Pattern pattern = Pattern.compile("\\[(.*?)]");
Matcher matcher = pattern.matcher(s);
matcher.find();
return Long.parseLong(matcher.group(1));
}))
.collect(Collectors.joining("\n"));
}

@Override
public String getName() {
return this.name;
}

@Override
public boolean isTraceEnabled() {
return true;
}

@Override
public void trace(String msg) {
logsQueueLocal.get().add(String.format("[%s] %s", System.nanoTime(), msg));
}

@Override
public void trace(String format, Object... arguments) {
trace(String.format(format, arguments));
}

@Override
public void trace(String msg, Throwable t) {
trace(String.format("%s, %s", msg, Arrays.toString(t.getStackTrace())));
}

@Override
public boolean isDebugEnabled() {
return false;
}

@Override
public void debug(String msg) {}

@Override
public void debug(String format, Object... arguments) {}

@Override
public void debug(String msg, Throwable t) {}

@Override
public boolean isInfoEnabled() {
return false;
}

@Override
public void info(String msg) {}

@Override
public void info(String format, Object... arguments) {}

@Override
public void info(String msg, Throwable t) {}

@Override
public boolean isWarnEnabled() {
return false;
}

@Override
public void warn(String msg) {}

@Override
public void warn(String format, Object... arguments) {}

@Override
public void warn(String msg, Throwable t) {}

@Override
public boolean isErrorEnabled() {
return false;
}

@Override
public void error(String msg) {}

@Override
public void error(String format, Object... arguments) {}

@Override
public void error(String msg, Throwable t) {}
}