Skip to content

Commit

Permalink
aggr: ensure publisher future completes (Netflix-Skunkworks#400)
Browse files Browse the repository at this point in the history
Make the publisher queue size configurable. Adjust future
to ensure it will always be completed even if the stream
gets restarted. Now it will be completed when returned and
treated as successful if it was enqueued. This avoids
problems with the upstream hanging if the stream is restarted
and some of the returned futures are never completed.
  • Loading branch information
brharrington authored and manolama committed Oct 25, 2023
1 parent 406d36d commit 07c7251
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 6 deletions.
4 changes: 4 additions & 0 deletions atlas-aggregator/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ atlas.aggregator {
}
]

publisher {
queue-size = 10000
}

// Determines whether or not to include the aggregator node as a tag on counters.
// If false it will use atlas.dstype=sum instead.
include-aggr-tag = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.netflix.spectator.impl.Cache
import com.typesafe.config.Config

class AggrConfig(
config: Config,
val config: Config,
registry: Registry,
system: ActorSystem
) extends AtlasConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class AkkaPublisher(registry: Registry, config: AggrConfig, implicit val system:
private val ec: ExecutionContext =
ThreadPools.fixedSize(registry, "PublishEncoding", encodingParallelism)

private val pubConfig = config.config.getConfig("atlas.aggregator.publisher")
private val queueSize = pubConfig.getInt("queue-size")

private val atlasClient = createClient("publisher-atlas")
private val lwcClient = createClient("publisher-lwc")

Expand All @@ -96,7 +99,7 @@ class AkkaPublisher(registry: Registry, config: AggrConfig, implicit val system:
flow
}
StreamOps
.blockingQueue[RequestTuple](config.debugRegistry(), name, 1000)
.blockingQueue[RequestTuple](config.debugRegistry(), name, queueSize)
.via(restartFlow)
.toMat(Sink.ignore)(Keep.left)
.run()
Expand All @@ -106,14 +109,18 @@ class AkkaPublisher(registry: Registry, config: AggrConfig, implicit val system:

override def publish(payload: PublishPayload): CompletableFuture[Void] = {
val t = new RequestTuple(atlasUri, "publisher-atlas", payload)
atlasClient.offer(t)
t.future
if (atlasClient.offer(t))
CompletableFuture.completedFuture(VoidInstance)
else
CompletableFuture.failedFuture(new IllegalStateException("failed to enqueue atlas request"))
}

override def publish(payload: EvalPayload): CompletableFuture[Void] = {
val t = new RequestTuple(evalUri, "publisher-lwc", payload)
lwcClient.offer(t)
t.future
if (lwcClient.offer(t))
CompletableFuture.completedFuture(VoidInstance)
else
CompletableFuture.failedFuture(new IllegalStateException("failed to enqueue lwc request"))
}

override def close(): Unit = {
Expand Down

0 comments on commit 07c7251

Please sign in to comment.