-
Notifications
You must be signed in to change notification settings - Fork 945
Commit
Lettuce now stops LatencyStats when removing the object from latency collection scope. Stopping LatencyStats unregisters from the associated PauseDetector and removes listener resources so GC can remove the object.
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,7 +55,10 @@ public class DefaultCommandLatencyCollector implements CommandLatencyCollector { | |
private static final long MAX_LATENCY = TimeUnit.MINUTES.toNanos(5); | ||
|
||
private final CommandLatencyCollectorOptions options; | ||
private Map<CommandLatencyId, Latencies> latencyMetrics = new ConcurrentHashMap<>(CommandType.values().length); | ||
private final AtomicReference<Map<CommandLatencyId, Latencies>> latencyMetricsRef = new AtomicReference<>( | ||
createNewLatencyMap()); | ||
|
||
private volatile boolean stopped; | ||
|
||
public DefaultCommandLatencyCollector(CommandLatencyCollectorOptions options) { | ||
this.options = options; | ||
|
@@ -77,7 +80,7 @@ public void recordCommandLatency(SocketAddress local, SocketAddress remote, Prot | |
return; | ||
} | ||
|
||
Latencies latencies = latencyMetrics.computeIfAbsent(createId(local, remote, commandType), id -> { | ||
Latencies latencies = latencyMetricsRef.get().computeIfAbsent(createId(local, remote, commandType), id -> { | ||
|
||
PauseDetectorWrapper wrapper = PAUSE_DETECTOR.get(); | ||
if (wrapper == null) { | ||
|
@@ -108,28 +111,37 @@ private long rangify(long latency) { | |
|
||
@Override | ||
public boolean isEnabled() { | ||
return latencyMetrics != null && options.isEnabled(); | ||
return options.isEnabled() && !stopped; | ||
} | ||
|
||
@Override | ||
public void shutdown() { | ||
|
||
if (latencyMetrics != null) { | ||
latencyMetrics.clear(); | ||
latencyMetrics = null; | ||
stopped = true; | ||
|
||
Map<CommandLatencyId, Latencies> latenciesMap = latencyMetricsRef.get(); | ||
if (latencyMetricsRef.compareAndSet(latenciesMap, Collections.emptyMap())) { | ||
latenciesMap.values().forEach(Latencies::stop); | ||
} | ||
} | ||
|
||
@Override | ||
public Map<CommandLatencyId, CommandMetrics> retrieveMetrics() { | ||
|
||
Map<CommandLatencyId, Latencies> copy = new HashMap<>(latencyMetrics); | ||
Map<CommandLatencyId, Latencies> latenciesMap = latencyMetricsRef.get(); | ||
Map<CommandLatencyId, Latencies> metricsToUse; | ||
|
||
if (options.resetLatenciesAfterEvent()) { | ||
latencyMetrics.clear(); | ||
|
||
metricsToUse = latenciesMap; | ||
latencyMetricsRef.set(createNewLatencyMap()); | ||
|
||
metricsToUse.values().forEach(Latencies::stop); | ||
} else { | ||
metricsToUse = new HashMap<>(latenciesMap); | ||
} | ||
|
||
return getMetrics(copy); | ||
return getMetrics(metricsToUse); | ||
} | ||
|
||
private Map<CommandLatencyId, CommandMetrics> getMetrics(Map<CommandLatencyId, Latencies> latencyMetrics) { | ||
|
@@ -188,6 +200,10 @@ public static boolean isAvailable() { | |
return LATENCY_UTILS_AVAILABLE && HDR_UTILS_AVAILABLE; | ||
} | ||
|
||
private static ConcurrentHashMap<CommandLatencyId, Latencies> createNewLatencyMap() { | ||
return new ConcurrentHashMap<>(CommandType.values().length); | ||
} | ||
|
||
/** | ||
* Returns a disabled no-op {@link CommandLatencyCollector}. | ||
* | ||
|
@@ -234,6 +250,11 @@ public Histogram getFirstResponseHistogram() { | |
public Histogram getCompletionHistogram() { | ||
return completion.getIntervalHistogram(); | ||
} | ||
|
||
public void stop() { | ||
firstResponse.stop(); | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
mp911de
Author
Collaborator
|
||
completion.stop(); | ||
} | ||
} | ||
|
||
private static class CummulativeLatencies extends Latencies { | ||
|
@mp911de, I was looking into fixing this yesterday. I had similar approach to call LatencyStats.stop(). But that method has a side effect
public synchronized void stop() { pauseTracker.stop(); latencyStatsScheduledExecutor.shutdown(); }
it stops the pauseTracker which is good because it will remove the listener but it also does
latencyStatsScheduledExecutor.shutdown();
which is a static member. Then noone will start it back. But because pauseTracker is not accessible from outside i couldn't figure out how to do this cleanly.Are you sure that the
latencyStatsScheduledExecutor.shutdown();
will not cause a problem?