From c5725d86dfab65cb93c171b655dfd51a06d97930 Mon Sep 17 00:00:00 2001 From: segabriel Date: Sat, 21 Sep 2019 10:54:39 +0300 Subject: [PATCH] Fixed memory leak in jmx bean --- .../membership/MembershipProtocolImpl.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index e7ec2d7d..3e8622bc 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; @@ -43,7 +44,6 @@ import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; -import reactor.core.publisher.ReplayProcessor; import reactor.core.scheduler.Scheduler; public final class MembershipProtocolImpl implements MembershipProtocol { @@ -742,15 +742,22 @@ public static class JmxMonitorMBean extends AbstractMonitorMBean implements Moni public static final int REMOVED_MEMBERS_HISTORY_SIZE = 42; private final MembershipProtocolImpl membershipProtocol; - private final ReplayProcessor removedMembersHistory; + private final List removedMembersHistory; private JmxMonitorMBean(MembershipProtocolImpl membershipProtocol) { this.membershipProtocol = membershipProtocol; - this.removedMembersHistory = ReplayProcessor.create(REMOVED_MEMBERS_HISTORY_SIZE); + this.removedMembersHistory = new CopyOnWriteArrayList<>(); + membershipProtocol .listen() .filter(MembershipEvent::isRemoved) - .subscribe(removedMembersHistory); + .subscribe( + event -> { + removedMembersHistory.add(event); + if (removedMembersHistory.size() > REMOVED_MEMBERS_HISTORY_SIZE) { + removedMembersHistory.remove(0); + } + }); } @Override @@ -782,9 +789,9 @@ public String getSuspectedMembersAsString() { @Override public List getDeadMembers() { - List deadMembers = new ArrayList<>(); - removedMembersHistory.map(MembershipEvent::toString).subscribe(deadMembers::add); - return deadMembers; + return removedMembersHistory.stream() + .map(MembershipEvent::toString) + .collect(Collectors.toList()); } @Override