Configurable max queue size #161

@wadejensen

It would be really great if analytics-java supported configuring the maximum number of in-flight messages. The current queues within analytics-java are unbounded. A limit would help prevent running out of memory under a high volume of messages if the Segment API, or our proxy to it, became unavailable.

At our company we send large volumes of messages to AWS Kinesis Firehose from our backends. I would like to route them through Segment instead. However, I'm not comfortable firing that volume of events through this client on all our backends without a method of circuit-breaking or dropping messages in the case of a downstream outage.

It's already technically possible to configure the client's networkExecutor to drop batches of messages beyond some threshold (example to follow in a comment). However, that configuration code is, well, a lot, and dropping entire batches in that scenario seems less than ideal.
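To give a rough idea of the shape of such a configuration, here is a minimal sketch using only JDK classes. It is not the example promised for the follow-up comment: the class name `DroppingNetworkExecutor`, the drop counter, and the capacity parameter are all hypothetical, and it assumes the resulting `ExecutorService` is what gets handed to the client as its networkExecutor.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch: a single-threaded executor whose work queue is bounded.
 * Once the queue is full, newly submitted tasks (batches) are silently dropped
 * and counted instead of accumulating in memory.
 */
final class DroppingNetworkExecutor {
    private final AtomicInteger dropped = new AtomicInteger();
    private final ThreadPoolExecutor executor;

    DroppingNetworkExecutor(int maxQueuedBatches) {
        this.executor = new ThreadPoolExecutor(
                1, 1,                                   // one worker thread, like a single-threaded executor
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(maxQueuedBatches), // bounded backlog of pending batches
                (task, pool) -> dropped.incrementAndGet()); // rejection handler: drop and count
    }

    /** Assumption: this is the executor that would be supplied as the client's networkExecutor. */
    ExecutorService executor() {
        return executor;
    }

    /** Number of tasks rejected so far because the queue was full (or the pool was shut down). */
    int droppedCount() {
        return dropped.get();
    }
}
```

Note that the unit being dropped here is a whole batch, which is exactly the drawback mentioned above.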

I took a run at writing something myself, but quickly realised that, because three threads are in play (the looperExecutor, the networkExecutor, and the flushScheduler), it's not as simple as placing a limit on the capacity of the initial messageQueue. The looper is busy-waiting to pull messages off, batch them, and submit them to the networkExecutor, so a bounded messageQueue alone would just move the backlog into the networkExecutor's queue.

I see a few ways to support this feature. However, when I played around with them, none made for a surgical, light-touch PR, and testing wasn't smooth, so I wanted to get a sense of the interest in accepting a PR for this feature and of which approach would be preferred.

Possible approaches:

  • Smallest change: allow a limit on the depth of messageQueue and use a SynchronousQueue for the networkExecutor. However, this would effectively block the looperExecutor on submission to the networkExecutor, which arguably defeats the purpose of a separate thread and of a configurable networkExecutor.
  • Allow a limit on the depth of messageQueue and also use a BlockingQueue with a very small capacity for the networkExecutor.
  • Allow a limit on the depth of messageQueue and also call SegmentService:upload directly from the looperExecutor, blocking the busy-waiting loop, i.e. merge the looperExecutor and networkExecutor.
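All three approaches share the same first step, a cap on the depth of messageQueue. A minimal sketch of that piece in isolation, using only JDK classes, might look like the following. The class `BoundedMessageQueue` and its method names are hypothetical and are not part of analytics-java; the sketch assumes that dropping the newest message when the queue is full is acceptable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of a bounded message queue: producers never block,
 * and messages that do not fit are dropped and counted.
 */
final class BoundedMessageQueue<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    BoundedMessageQueue(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Non-blocking enqueue: returns false (and counts a drop) when the queue is full. */
    boolean enqueue(T message) {
        boolean accepted = queue.offer(message);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    /** Removes up to maxBatchSize messages in FIFO order, as a looper thread might before uploading. */
    List<T> drainBatch(int maxBatchSize) {
        List<T> batch = new ArrayList<>(maxBatchSize);
        queue.drainTo(batch, maxBatchSize);
        return batch;
    }

    long droppedCount() {
        return dropped.get();
    }

    int size() {
        return queue.size();
    }
}
```

On its own this only bounds the first stage. The limit holds end to end only if whatever consumes `drainBatch` cannot itself build an unbounded backlog, which is the part the three options above handle differently.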

Everything I'm proposing here seems to be fighting against having a separate looper and networkExecutor thread. I'm keen to understand the motivation for the current design so I can work better within the spirit of it. Given that the default networkExecutor instance is single-threaded, I'm curious how many people use a larger thread pool that would take advantage of this decoupling.

Apologies for opening what is essentially a duplicate of #147. However, that issue has been inactive for a long time and proposes a PR that doesn't entirely resolve the issue.

Please let me know what changes you'd be open to 🙂