From 4ad2679afa47823d39bd76ef9c76c42ea37770e0 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 17 Jun 2019 18:58:36 -0300 Subject: [PATCH 1/2] remove AtomicGetOrElseUpdateOnTrieMap::atomicGetOrElseUpdate method in favor of TrieMap::getOrElseUpdate --- core/kamon-core/src/main/scala/kamon/Utilities.scala | 2 +- .../src/main/scala/kamon/metric/MetricRegistry.scala | 10 +++++----- core/kamon-core/src/main/scala/kamon/package.scala | 6 +----- .../src/main/scala/kamon/trace/AdaptiveSampler.scala | 6 +++--- .../akka/AkkaClusterShardingMetrics.scala | 6 +++--- .../scala/kamon/instrumentation/akka/AkkaMetrics.scala | 2 +- .../kamon/instrumentation/akka/AkkaRemoteMetrics.scala | 2 +- .../pekko/PekkoClusterShardingMetrics.scala | 6 +++--- .../kamon/instrumentation/pekko/PekkoMetrics.scala | 2 +- .../instrumentation/pekko/PekkoRemoteMetrics.scala | 2 +- 10 files changed, 20 insertions(+), 24 deletions(-) diff --git a/core/kamon-core/src/main/scala/kamon/Utilities.scala b/core/kamon-core/src/main/scala/kamon/Utilities.scala index 2c1efb07f..070be64ea 100644 --- a/core/kamon-core/src/main/scala/kamon/Utilities.scala +++ b/core/kamon-core/src/main/scala/kamon/Utilities.scala @@ -46,7 +46,7 @@ trait Utilities { self: Configuration => * empty. */ def filter(configKey: String): Filter = - _filters.atomicGetOrElseUpdate(configKey, Filter.from(configKey)) + _filters.getOrElseUpdate(configKey, Filter.from(configKey)) /** * Kamon's Clock implementation. diff --git a/core/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala b/core/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala index 23c175222..569066805 100644 --- a/core/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala +++ b/core/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala @@ -50,7 +50,7 @@ class MetricRegistry(config: Config, clock: Clock) { Metric.Counter = { val metric = validateInstrumentType[Metric.Counter] { - _metrics.atomicGetOrElseUpdate(name, _factory.counter(name, description, unit, autoUpdateInterval)) + _metrics.getOrElseUpdate(name, _factory.counter(name, description, unit, autoUpdateInterval)) } (name, Instrument.Type.Counter) checkDescription(metric.name, metric.description, description) @@ -66,7 +66,7 @@ class MetricRegistry(config: Config, clock: Clock) { Metric.Gauge = { val metric = validateInstrumentType[Metric.Gauge] { - _metrics.atomicGetOrElseUpdate(name, _factory.gauge(name, description, unit, autoUpdateInterval)) + _metrics.getOrElseUpdate(name, _factory.gauge(name, description, unit, autoUpdateInterval)) } (name, Instrument.Type.Gauge) checkDescription(metric.name, metric.description, description) @@ -82,7 +82,7 @@ class MetricRegistry(config: Config, clock: Clock) { autoUpdateInterval: Option[Duration]): Metric.Histogram = { val metric = validateInstrumentType[Metric.Histogram] { - _metrics.atomicGetOrElseUpdate(name, _factory.histogram(name, description, unit, dynamicRange, autoUpdateInterval)) + _metrics.getOrElseUpdate(name, _factory.histogram(name, description, unit, dynamicRange, autoUpdateInterval)) } (name, Instrument.Type.Histogram) checkDescription(metric.name, metric.description, description) @@ -98,7 +98,7 @@ class MetricRegistry(config: Config, clock: Clock) { def timer(name: String, description: Option[String], dynamicRange: Option[DynamicRange], autoUpdateInterval: Option[Duration]): Metric.Timer = { val metric = validateInstrumentType[Metric.Timer] { - _metrics.atomicGetOrElseUpdate(name, _factory.timer(name, description, Some(MeasurementUnit.time.nanoseconds), + _metrics.getOrElseUpdate(name, _factory.timer(name, description, Some(MeasurementUnit.time.nanoseconds), dynamicRange, autoUpdateInterval)) } (name, Instrument.Type.Timer) @@ -115,7 +115,7 @@ class MetricRegistry(config: Config, clock: Clock) { autoUpdateInterval: Option[Duration]): Metric.RangeSampler = { val metric = validateInstrumentType[Metric.RangeSampler] { - _metrics.atomicGetOrElseUpdate(name, _factory.rangeSampler(name, description, unit, dynamicRange, autoUpdateInterval)) + _metrics.getOrElseUpdate(name, _factory.rangeSampler(name, description, unit, dynamicRange, autoUpdateInterval)) } (name, Instrument.Type.RangeSampler) checkDescription(metric.name, metric.description, description) diff --git a/core/kamon-core/src/main/scala/kamon/package.scala b/core/kamon-core/src/main/scala/kamon/package.scala index 7ddc3f5a0..80f099566 100644 --- a/core/kamon-core/src/main/scala/kamon/package.scala +++ b/core/kamon-core/src/main/scala/kamon/package.scala @@ -64,14 +64,10 @@ package object kamon { /** - * Workaround to the non thread-safe [scala.collection.concurrent.TrieMap#getOrElseUpdate()] method. More details on - * why this is necessary can be found at [[https://issues.scala-lang.org/browse/SI-7943]]. + * Atomic variant of [scala.collection.concurrent.TrieMap#getOrElseUpdate()] method with cleanup and init functions. */ implicit class AtomicGetOrElseUpdateOnTrieMap[K, V](val trieMap: TrieMap[K, V]) extends AnyVal { - def atomicGetOrElseUpdate(key: K, op: => V): V = - atomicGetOrElseUpdate(key, op, { _: V => () }, { _: V => () }) - def atomicGetOrElseUpdate(key: K, op: => V, cleanup: V => Unit, init: V => Unit): V = trieMap.get(key) match { case Some(v) => v diff --git a/core/kamon-core/src/main/scala/kamon/trace/AdaptiveSampler.scala b/core/kamon-core/src/main/scala/kamon/trace/AdaptiveSampler.scala index 09eeffcce..feb7e35a7 100644 --- a/core/kamon-core/src/main/scala/kamon/trace/AdaptiveSampler.scala +++ b/core/kamon-core/src/main/scala/kamon/trace/AdaptiveSampler.scala @@ -48,13 +48,13 @@ class AdaptiveSampler extends Sampler { override def decide(operation: Sampler.Operation): SamplingDecision = { val operationName = operation.operationName() - val operationSampler = _samplers.get(operationName).getOrElse { + val operationSampler = _samplers.getOrElse(operationName, { // It might happen that the first time we see an operation under high concurrent throughput we will reach this // block more than once, but worse case effect is that we will rebalance the operation samplers more than once. - val sampler = _samplers.atomicGetOrElseUpdate(operationName, buildOperationSampler(operationName)) + val sampler = _samplers.getOrElseUpdate(operationName, buildOperationSampler(operationName)) rebalance() sampler - } + }) val decision = operationSampler.decide() if(decision == SamplingDecision.Sample) diff --git a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala index e6de342c4..c1c2140eb 100644 --- a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala +++ b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala @@ -50,10 +50,10 @@ object AkkaClusterShardingMetrics { private val _shardTelemetry = ShardingInstruments.shardTelemetry(system, typeName, shardHostedEntities, shardProcessedMessages) def hostedEntitiesPerShardCounter(shardID: String): AtomicLong = - _shardTelemetry.entitiesPerShard.atomicGetOrElseUpdate(shardID, new AtomicLong()) + _shardTelemetry.entitiesPerShard.getOrElseUpdate(shardID, new AtomicLong()) def processedMessagesPerShardCounter(shardID: String): AtomicLong = - _shardTelemetry.messagesPerShard.atomicGetOrElseUpdate(shardID, new AtomicLong()) + _shardTelemetry.messagesPerShard.getOrElseUpdate(shardID, new AtomicLong()) // We should only remove when the ShardRegion actor is terminated. override def remove(): Unit = { @@ -85,7 +85,7 @@ object AkkaClusterShardingMetrics { private val _shardTelemetryMap = TrieMap.empty[String, ShardTelemetry] private def shardTelemetry(system: String, typeName: String, shardEntities: Histogram, shardMessages: Histogram): ShardTelemetry = { - _shardTelemetryMap.atomicGetOrElseUpdate(shardTelemetryKey(system, typeName), { + _shardTelemetryMap.getOrElseUpdate(shardTelemetryKey(system, typeName), { val entitiesPerShard = TrieMap.empty[String, AtomicLong] val messagesPerShard = TrieMap.empty[String, AtomicLong] val samplingInterval = AkkaRemoteInstrumentation.settings().shardMetricsSampleInterval diff --git a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaMetrics.scala b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaMetrics.scala index c4e5f404b..bc8c8bfe2 100644 --- a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaMetrics.scala +++ b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaMetrics.scala @@ -181,7 +181,7 @@ object AkkaMetrics { ) def forSystem(name: String): ActorSystemInstruments = - _systemInstrumentsCache.atomicGetOrElseUpdate(name, new ActorSystemInstruments(TagSet.of("system", name))) + _systemInstrumentsCache.getOrElseUpdate(name, new ActorSystemInstruments(TagSet.of("system", name))) class ActorSystemInstruments(tags: TagSet) extends InstrumentGroup(tags) { val deadLetters = register(SystemDeadLetters) diff --git a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaRemoteMetrics.scala b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaRemoteMetrics.scala index 7247b1aad..e0ddb3f73 100644 --- a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaRemoteMetrics.scala +++ b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaRemoteMetrics.scala @@ -41,5 +41,5 @@ object AkkaRemoteMetrics { } def serializationInstruments(system: String): SerializationInstruments = - _serializationInstrumentsCache.atomicGetOrElseUpdate(system, new SerializationInstruments(system)) + _serializationInstrumentsCache.getOrElseUpdate(system, new SerializationInstruments(system)) } diff --git a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoClusterShardingMetrics.scala b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoClusterShardingMetrics.scala index 858dc9b9b..4cbf6b9ec 100644 --- a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoClusterShardingMetrics.scala +++ b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoClusterShardingMetrics.scala @@ -50,10 +50,10 @@ object PekkoClusterShardingMetrics { private val _shardTelemetry = ShardingInstruments.shardTelemetry(system, typeName, shardHostedEntities, shardProcessedMessages) def hostedEntitiesPerShardCounter(shardID: String): AtomicLong = - _shardTelemetry.entitiesPerShard.atomicGetOrElseUpdate(shardID, new AtomicLong()) + _shardTelemetry.entitiesPerShard.getOrElseUpdate(shardID, new AtomicLong()) def processedMessagesPerShardCounter(shardID: String): AtomicLong = - _shardTelemetry.messagesPerShard.atomicGetOrElseUpdate(shardID, new AtomicLong()) + _shardTelemetry.messagesPerShard.getOrElseUpdate(shardID, new AtomicLong()) // We should only remove when the ShardRegion actor is terminated. override def remove(): Unit = { @@ -85,7 +85,7 @@ object PekkoClusterShardingMetrics { private val _shardTelemetryMap = TrieMap.empty[String, ShardTelemetry] private def shardTelemetry(system: String, typeName: String, shardEntities: Histogram, shardMessages: Histogram): ShardTelemetry = { - _shardTelemetryMap.atomicGetOrElseUpdate(shardTelemetryKey(system, typeName), { + _shardTelemetryMap.getOrElseUpdate(shardTelemetryKey(system, typeName), { val entitiesPerShard = TrieMap.empty[String, AtomicLong] val messagesPerShard = TrieMap.empty[String, AtomicLong] val samplingInterval = PekkoRemoteInstrumentation.settings().shardMetricsSampleInterval diff --git a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoMetrics.scala b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoMetrics.scala index 69ae24fd2..b50a81ad7 100644 --- a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoMetrics.scala +++ b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoMetrics.scala @@ -181,7 +181,7 @@ object PekkoMetrics { ) def forSystem(name: String): ActorSystemInstruments = - _systemInstrumentsCache.atomicGetOrElseUpdate(name, new ActorSystemInstruments(TagSet.of("system", name))) + _systemInstrumentsCache.getOrElseUpdate(name, new ActorSystemInstruments(TagSet.of("system", name))) class ActorSystemInstruments(tags: TagSet) extends InstrumentGroup(tags) { val deadLetters = register(SystemDeadLetters) diff --git a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoRemoteMetrics.scala b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoRemoteMetrics.scala index dca48d5a9..9ea27cdca 100644 --- a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoRemoteMetrics.scala +++ b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoRemoteMetrics.scala @@ -41,5 +41,5 @@ object PekkoRemoteMetrics { } def serializationInstruments(system: String): SerializationInstruments = - _serializationInstrumentsCache.atomicGetOrElseUpdate(system, new SerializationInstruments(system)) + _serializationInstrumentsCache.getOrElseUpdate(system, new SerializationInstruments(system)) } From 1f2e11eb1ed62e30b157b96f55aa2c4c98c22c89 Mon Sep 17 00:00:00 2001 From: Hugh Simpson Date: Tue, 10 Oct 2023 19:07:06 +0100 Subject: [PATCH 2/2] fix rebase bugs --- .../kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala | 2 +- .../instrumentation/pekko/PekkoClusterShardingMetrics.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala index c1c2140eb..b3c4a5351 100644 --- a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala +++ b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala @@ -85,7 +85,7 @@ object AkkaClusterShardingMetrics { private val _shardTelemetryMap = TrieMap.empty[String, ShardTelemetry] private def shardTelemetry(system: String, typeName: String, shardEntities: Histogram, shardMessages: Histogram): ShardTelemetry = { - _shardTelemetryMap.getOrElseUpdate(shardTelemetryKey(system, typeName), { + _shardTelemetryMap.atomicGetOrElseUpdate(shardTelemetryKey(system, typeName), { val entitiesPerShard = TrieMap.empty[String, AtomicLong] val messagesPerShard = TrieMap.empty[String, AtomicLong] val samplingInterval = AkkaRemoteInstrumentation.settings().shardMetricsSampleInterval diff --git a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoClusterShardingMetrics.scala b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoClusterShardingMetrics.scala index 4cbf6b9ec..804b1e77c 100644 --- a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoClusterShardingMetrics.scala +++ b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoClusterShardingMetrics.scala @@ -85,7 +85,7 @@ object PekkoClusterShardingMetrics { private val _shardTelemetryMap = TrieMap.empty[String, ShardTelemetry] private def shardTelemetry(system: String, typeName: String, shardEntities: Histogram, shardMessages: Histogram): ShardTelemetry = { - _shardTelemetryMap.getOrElseUpdate(shardTelemetryKey(system, typeName), { + _shardTelemetryMap.atomicGetOrElseUpdate(shardTelemetryKey(system, typeName), { val entitiesPerShard = TrieMap.empty[String, AtomicLong] val messagesPerShard = TrieMap.empty[String, AtomicLong] val samplingInterval = PekkoRemoteInstrumentation.settings().shardMetricsSampleInterval