From 07c7251e1ffb72d78843e2b18f975b84fd1a6a16 Mon Sep 17 00:00:00 2001 From: brharrington Date: Mon, 6 Mar 2023 20:56:03 -0600 Subject: [PATCH] aggr: ensure publisher future completes (#400) Make the publisher queue size configurable. Adjust future to ensure it will always be completed even if the stream gets restarted. Now it will be completed when returned and treated as successful if it was enqueued. This avoids problems with the upstream hanging if the stream is restarted and some of the returned futures are never completed. --- .../src/main/resources/application.conf | 4 ++++ .../netflix/atlas/aggregator/AggrConfig.scala | 2 +- .../atlas/aggregator/AkkaPublisher.scala | 17 ++++++++++++----- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/atlas-aggregator/src/main/resources/application.conf b/atlas-aggregator/src/main/resources/application.conf index f52d7434..b01788c8 100644 --- a/atlas-aggregator/src/main/resources/application.conf +++ b/atlas-aggregator/src/main/resources/application.conf @@ -22,6 +22,10 @@ atlas.aggregator { } ] + publisher { + queue-size = 10000 + } + // Determines whether or not to include the aggregator node as a tag on counters. // If false it will use atlas.dstype=sum instead. include-aggr-tag = false diff --git a/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AggrConfig.scala b/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AggrConfig.scala index 5e3faa62..81fcb397 100644 --- a/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AggrConfig.scala +++ b/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AggrConfig.scala @@ -28,7 +28,7 @@ import com.netflix.spectator.impl.Cache import com.typesafe.config.Config class AggrConfig( - config: Config, + val config: Config, registry: Registry, system: ActorSystem ) extends AtlasConfig diff --git a/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AkkaPublisher.scala b/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AkkaPublisher.scala index dbe055c8..32d24cc2 100644 --- a/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AkkaPublisher.scala +++ b/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AkkaPublisher.scala @@ -76,6 +76,9 @@ class AkkaPublisher(registry: Registry, config: AggrConfig, implicit val system: private val ec: ExecutionContext = ThreadPools.fixedSize(registry, "PublishEncoding", encodingParallelism) + private val pubConfig = config.config.getConfig("atlas.aggregator.publisher") + private val queueSize = pubConfig.getInt("queue-size") + private val atlasClient = createClient("publisher-atlas") private val lwcClient = createClient("publisher-lwc") @@ -96,7 +99,7 @@ class AkkaPublisher(registry: Registry, config: AggrConfig, implicit val system: flow } StreamOps - .blockingQueue[RequestTuple](config.debugRegistry(), name, 1000) + .blockingQueue[RequestTuple](config.debugRegistry(), name, queueSize) .via(restartFlow) .toMat(Sink.ignore)(Keep.left) .run() @@ -106,14 +109,18 @@ class AkkaPublisher(registry: Registry, config: AggrConfig, implicit val system: override def publish(payload: PublishPayload): CompletableFuture[Void] = { val t = new RequestTuple(atlasUri, "publisher-atlas", payload) - atlasClient.offer(t) - t.future + if (atlasClient.offer(t)) + CompletableFuture.completedFuture(VoidInstance) + else + CompletableFuture.failedFuture(new IllegalStateException("failed to enqueue atlas request")) } override def publish(payload: EvalPayload): CompletableFuture[Void] = { val t = new RequestTuple(evalUri, "publisher-lwc", payload) - lwcClient.offer(t) - t.future + if (lwcClient.offer(t)) + CompletableFuture.completedFuture(VoidInstance) + else + CompletableFuture.failedFuture(new IllegalStateException("failed to enqueue lwc request")) } override def close(): Unit = {