[pkg/queue] Provide factory for consumers #2685

Closed
mx-psi opened this issue Dec 10, 2020 · 14 comments · Fixed by #2714

Comments

@mx-psi
Contributor

mx-psi commented Dec 10, 2020

Requirement - what kind of business use case are you trying to solve?

As stated in open-telemetry/opentelemetry-collector#2254, for performance reasons we want to be able to keep state for each consumer of a bounded queue, so that each consumer can reuse a struct that is costly to create.

Problem - what in Jaeger blocks you from solving the requirement?

It's not currently possible to keep state for each consumer of a bounded queue: the public interface of the bounded queue only accepts a single function, and every consumer runs that same function, so any state can only be shared by all consumers.

func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) {

Proposal - what do you suggest to solve the problem or improve the existing situation?

A possible solution would work like this.

  1. Define an interface like:
type Consumer interface {
  Consume(item interface{})
}
  2. Provide a new way to start a bounded queue, with signature:
func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consumer)

Reimplement consumer logic internally so that each consumer is built using the factory.

  3. Rewrite StartConsumers using this factory:
type statelessConsumer struct{
  consumefn func(item interface{}) 
}

func (c *statelessConsumer) Consume(item interface{}){
  c.consumefn(item)
}

func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) {
  c := &statelessConsumer{consumefn: consumer}
  q.StartConsumersWithFactory(num, func() Consumer {
    return c
  })
}
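A minimal, self-contained sketch of the proposal follows, to check that the pieces fit together. Only `Consumer`, `StartConsumersWithFactory`, `statelessConsumer` and `StartConsumers` come from the proposal; `boundedQueue`, `newBoundedQueue`, `Produce`, `Stop`, `countingConsumer` and `runDemo` are hypothetical simplifications, not Jaeger's actual `BoundedQueue`, which has more machinery.

```go
package main

import (
	"fmt"
	"sync"
)

// Consumer is the per-consumer interface proposed in the issue.
type Consumer interface {
	Consume(item interface{})
}

// boundedQueue is a hypothetical, simplified stand-in for Jaeger's
// BoundedQueue: a buffered channel drained by worker goroutines.
type boundedQueue struct {
	items chan interface{}
	wg    sync.WaitGroup
}

func newBoundedQueue(capacity int) *boundedQueue {
	return &boundedQueue{items: make(chan interface{}, capacity)}
}

// StartConsumersWithFactory starts num goroutines, each with its own
// Consumer built by factory, so state inside a Consumer is never shared.
func (q *boundedQueue) StartConsumersWithFactory(num int, factory func() Consumer) {
	for i := 0; i < num; i++ {
		c := factory() // fresh variable per iteration, captured by one goroutine
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for item := range q.items {
				c.Consume(item)
			}
		}()
	}
}

// statelessConsumer wraps a plain callback, as in the proposal.
type statelessConsumer struct {
	consumefn func(item interface{})
}

func (c *statelessConsumer) Consume(item interface{}) { c.consumefn(item) }

// StartConsumers keeps the callback-based API by returning the same
// stateless Consumer from every factory call.
func (q *boundedQueue) StartConsumers(num int, callback func(item interface{})) {
	c := &statelessConsumer{consumefn: callback}
	q.StartConsumersWithFactory(num, func() Consumer { return c })
}

// Produce enqueues an item and reports false if the queue is full.
func (q *boundedQueue) Produce(item interface{}) bool {
	select {
	case q.items <- item:
		return true
	default:
		return false
	}
}

// Stop closes the queue and waits until all consumers have drained it.
func (q *boundedQueue) Stop() {
	close(q.items)
	q.wg.Wait()
}

// countingConsumer keeps private state that only its own goroutine touches.
type countingConsumer struct{ count int }

func (c *countingConsumer) Consume(item interface{}) { c.count++ }

// runDemo returns how many items each factory-built consumer processed.
func runDemo(numConsumers, numItems int) []int {
	q := newBoundedQueue(numItems)
	var consumers []*countingConsumer
	q.StartConsumersWithFactory(numConsumers, func() Consumer {
		c := &countingConsumer{}
		consumers = append(consumers, c)
		return c
	})
	for i := 0; i < numItems; i++ {
		q.Produce(i)
	}
	q.Stop()
	counts := make([]int, len(consumers))
	for i, c := range consumers {
		counts[i] = c.count
	}
	return counts
}

func main() {
	counts := runDemo(4, 100)
	total := 0
	for _, n := range counts {
		total += n
	}
	fmt.Println(len(counts), total)
}
```

Running it prints `4 100`: the factory built four consumers and, between them, they drained all 100 items, each counting into its own unsynchronized field. The factory is called sequentially before the goroutines start, so appending to `consumers` needs no lock.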

Any open questions to address

I think this would keep backwards-compatible behavior with minimal or no impact on performance, but I am not a Go expert, so that should be double-checked.

mx-psi changed the title from [pkg/queue] to [pkg/queue] Provide factory for consumers on Dec 10, 2020
@jpkrohling
Contributor

Can you give some concrete examples of what you need to accomplish? It's a bit too abstract for me to tell whether this would be suitable for other consumers of this package.

@mx-psi
Contributor Author

mx-psi commented Dec 10, 2020

Can you give some concrete examples of what you need to accomplish?

Sure! I can try to explain our current use case if that helps.

When we process traces in the Datadog exporter, we use a struct called SublayerCalculator to get some information about each trace. Its ComputeSublayers method resets the struct (fills a number of byte slices with zeros) and then computes some information about the trace. We need to do this computation for every trace, and the struct is somewhat costly to create, so we can't just allocate a new one for each batch of traces.

Right now we use one SublayerCalculator (behind a mutex) for the whole exporter, so, whenever any consumer wants to compute the sublayers, it has to lock the mutex. The mutex contention reduces throughput of traces under high load conditions.
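To make the contention concrete, here is a sketch of the current arrangement. `sublayerCalculator` is a hypothetical stand-in with a 64 KiB scratch buffer (the real SublayerCalculator's fields are not shown in this thread), and `sharedExporter` and `runShared` are likewise hypothetical. Every consumer goroutine runs the same callback, so all of them serialize on one `sync.Mutex` while the buffer is zeroed and reused.

```go
package main

import (
	"fmt"
	"sync"
)

// sublayerCalculator is a hypothetical stand-in for SublayerCalculator:
// a scratch buffer that is costly to allocate and zeroed before each use.
type sublayerCalculator struct {
	scratch []byte
}

func newSublayerCalculator() *sublayerCalculator {
	return &sublayerCalculator{scratch: make([]byte, 1<<16)}
}

// compute resets the scratch buffer and returns a toy result
// (the number of bytes copied into it).
func (c *sublayerCalculator) compute(trace []byte) int {
	for i := range c.scratch {
		c.scratch[i] = 0
	}
	return copy(c.scratch, trace)
}

// sharedExporter mirrors the current setup: one calculator for the whole
// exporter, guarded by a mutex that every consumer goroutine must take.
type sharedExporter struct {
	mu         sync.Mutex
	calculator *sublayerCalculator
	processed  int
	bytes      int
}

// consume is the single callback every consumer runs; the lock serializes
// all consumers for the whole duration of compute.
func (e *sharedExporter) consume(item interface{}) {
	trace := item.([]byte)
	e.mu.Lock()
	defer e.mu.Unlock()
	e.bytes += e.calculator.compute(trace)
	e.processed++
}

// runShared drains traces with numConsumers goroutines sharing one calculator.
func runShared(numConsumers int, traces [][]byte) (processed, bytes int) {
	e := &sharedExporter{calculator: newSublayerCalculator()}
	items := make(chan interface{}, len(traces))
	for _, t := range traces {
		items <- t
	}
	close(items)
	var wg sync.WaitGroup
	for i := 0; i < numConsumers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range items {
				e.consume(item)
			}
		}()
	}
	wg.Wait()
	return e.processed, e.bytes
}

func main() {
	p, b := runShared(4, [][]byte{[]byte("abc"), []byte("hello"), []byte("0123456789")})
	fmt.Println(p, b)
}
```

It prints `3 18` (three traces, 3 + 5 + 10 bytes). The result is correct, but however many consumers are started, only one of them can be inside `compute` at a time.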

What we would want is for each consumer to have its own SublayerCalculator. That way each consumer could function independently and we could increase throughput under high load.

Other consumers of this package might want to do this for similar reasons: keeping some state per-consumer to avoid allocations or using a mutex.


edit: To be even more concrete, in our case we would have something like this (after going through all the layers in the exporterhelper)

type Consumer struct{
   calculator *SublayerCalculator
   exp *tracesExporter
}

func (c *Consumer) Consume(item interface{}) { // here item would be a concrete type like pdata.Trace
   c.exp.Consume(c.calculator, item)
}

func NewConsumerFactory(exp *tracesExporter) func() queue.Consumer {
  return func() queue.Consumer {
    return &Consumer{
       calculator: NewSublayerCalculator(),
       exp: exp,
    }
  }
}
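A runnable variant of the example above, under stated assumptions: `sublayerCalculator` and `tracesExporter` are hypothetical stubs (the real types live in the Datadog exporter and are not shown here), the struct is renamed `traceConsumer` so it can coexist with the `Consumer` interface in one file, and a plain goroutine loop stands in for `StartConsumersWithFactory`. Each goroutine gets its own calculator from the factory, so `compute` runs without a lock; only the shared byte counter is updated with `sync/atomic`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Consumer is the interface proposed in the issue.
type Consumer interface {
	Consume(item interface{})
}

// sublayerCalculator is a hypothetical stub with a costly scratch buffer.
type sublayerCalculator struct{ scratch []byte }

func newSublayerCalculator() *sublayerCalculator {
	return &sublayerCalculator{scratch: make([]byte, 1<<16)}
}

// compute zeroes the scratch buffer and returns a toy result.
func (c *sublayerCalculator) compute(trace []byte) int {
	for i := range c.scratch {
		c.scratch[i] = 0
	}
	return copy(c.scratch, trace)
}

// tracesExporter is a hypothetical stub that only aggregates a byte count,
// atomically, because consumers run concurrently.
type tracesExporter struct{ bytes int64 }

func (e *tracesExporter) consume(calc *sublayerCalculator, item interface{}) {
	n := calc.compute(item.([]byte))
	atomic.AddInt64(&e.bytes, int64(n))
}

// traceConsumer plays the role of the Consumer struct in mx-psi's example:
// each instance owns its calculator, so no mutex is needed around compute.
type traceConsumer struct {
	calculator *sublayerCalculator
	exp        *tracesExporter
}

func (c *traceConsumer) Consume(item interface{}) {
	c.exp.consume(c.calculator, item)
}

func newConsumerFactory(exp *tracesExporter) func() Consumer {
	return func() Consumer {
		return &traceConsumer{calculator: newSublayerCalculator(), exp: exp}
	}
}

// runPerConsumer starts one factory-built consumer per goroutine and reports
// the bytes processed and how many distinct calculators were created.
func runPerConsumer(numConsumers int, traces [][]byte) (int64, int) {
	exp := &tracesExporter{}
	factory := newConsumerFactory(exp)
	items := make(chan interface{}, len(traces))
	for _, t := range traces {
		items <- t
	}
	close(items)
	calculators := map[*sublayerCalculator]bool{}
	var wg sync.WaitGroup
	for i := 0; i < numConsumers; i++ {
		c := factory()
		calculators[c.(*traceConsumer).calculator] = true
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range items {
				c.Consume(item)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&exp.bytes), len(calculators)
}

func main() {
	b, n := runPerConsumer(4, [][]byte{[]byte("abc"), []byte("hello"), []byte("0123456789")})
	fmt.Println(b, n)
}
```

It prints `18 4`: the same 18 bytes as with a shared calculator, but spread over four independent calculators, one per consumer, that never contend for a lock.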

@yurishkuro
Member

I think I understand what you want to do with respect to the queue itself, but why do you need to make changes in Jaeger?

@mx-psi
Contributor Author

mx-psi commented Dec 10, 2020

@yurishkuro This package is used in the OpenTelemetry project (in particular, in the OpenTelemetry Collector). I am sorry I did not give more context in my second message; @jpkrohling also contributes to that project, so I skipped over some details.

@yurishkuro
Member

To clarify, my concern is that this creates a twice-removed dependency on an implementation detail in Jaeger. OTEL collector is once-removed, and from the ticket they don't need this enhancement, so it's only DD exporter which needs it, and as I understand it's not a core component in OTEL Collector, thus twice-removed. The /pkg does not extend any stability guarantees outside of Jaeger projects, I'm even surprised that OTEL is depending on it explicitly.

I think your proposal is reasonable and fairly minimal, and I don't have strong objections to extending the queue this way. It will need unit tests with comments about the external dependency, which will help a bit with stability against refactoring.

@mx-psi
Contributor Author

mx-psi commented Dec 10, 2020

The /pkg does not extend any stability guarantees outside of Jaeger projects, I'm even surprised that OTEL is depending on it explicitly.

Ah, I wasn't aware of that, thanks for the clarification. I assumed that since OTEL was using it, it was fine to use outside of Jaeger. If you are okay with keeping the dependency, I would be willing to open a PR for this with the appropriate tests and comments. Once it is released, we intend to use it in the OTEL Collector directly if a change is accepted there, and otherwise only in the Datadog exporter.

@yurishkuro
Member

@jaegertracing/jaeger-maintainers PTAL

@joe-elliott
Member

joe-elliott commented Dec 10, 2020

I generally think it's fine as long as @mx-psi is willing to take on the risk of depending on this package. In practice it probably won't change much, but we know how that goes. Perhaps it would be easier to fork the BoundedQueue if there will be more changes to come? It's not a ton of code and if you need a lot of specialized behavior that could be easier in the long run.

This particular change looks like it can be done in a safe way that has minimal impact to the Jaeger codebase so I think it's acceptable.

@objectiser
Contributor

One thing we need to consider is the changes that will occur for Jaeger v2, so it would be better not to depend on this pkg.

@mx-psi
Contributor Author

mx-psi commented Dec 11, 2020

I generally think it's fine as long as @mx-psi is willing to take on the risk of depending on this package. In practice it probably won't change much, but we know how that goes. Perhaps it would be easier to fork the BoundedQueue if there will be more changes to come? It's not a ton of code and if you need a lot of specialized behavior that could be easier in the long run.

I think we are fine with that. We don't plan on making further changes, and if the Jaeger project needs to make significant breaking changes to this package, we can always fork later.

One thing we need to consider is the changes that will occur for jaeger v2 - so better if it wasn't dependent upon this pkg.

@objectiser, note that this package is already a dependency of the OpenTelemetry Collector (see here). If that is a concern, maybe you should open an issue on the OTEL repo to remove the dependency, or at least to make the maintainers aware of the lack of stability guarantees.

@jpkrohling
Contributor

IMO, if we have something as part of pkg, Go conventions "dictate" that others can consume and expect semver compatibility. I wouldn't have added a dependency just because of this queue, but it's out there already.

For v2, we should place things like this in an internal package, preventing go mod from adding such dependency.

@yurishkuro
Member

The main reason we have avoided using internal packages is that Uber's internal builds are cherry-picking quite a lot of different components, and I definitely ran into issues trying to move some stuff under internal. So we kind of opened that can of worms.

@mx-psi
Contributor Author

mx-psi commented Dec 29, 2020

Hi, I am not sure what the outcome of this discussion is in the end. Are Jaeger maintainers okay with a PR for this issue? Should the OpenTelemetry Collector fork the queue package and I can discuss the changes there?

@yurishkuro
Member

Feel free to post a PR, I don't think it makes things worse given that we already have the dependency.

mx-psi added a commit to mx-psi/jaeger that referenced this issue Jan 5, 2021
This provides a way to keep state for each consumer of a bounded queue,
which is useful in certain performance-critical setups. Fixes jaegertracing#2685.

Signed-off-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>
yurishkuro pushed a commit that referenced this issue Jan 7, 2021
* [pkg/queue] Add `StartConsumersWithFactory` function

This provides a way to keep state for each consumer of a bounded queue,
which is useful in certain performance-critical setups. Fixes #2685.

Signed-off-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>

* Refactor bounded queue tests to extract common parts
The common part with the assertions was moved to a new `checkQueue`
function that is used by `TestBoundedQueue` and `TestBoundedQueueWithFactory`.

Signed-off-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>

* Use http.HandlerFunc pattern for getting a Consumer from a callback

Signed-off-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>

* Address review comment about unit tests

Refactor unit tests using a `helper` function that takes a function
to start consumers.

Signed-off-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>
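The commit message mentions using the `http.HandlerFunc` pattern to get a `Consumer` from a callback. A sketch of what that pattern looks like, assuming a function type named `ConsumerFunc` and a hypothetical helper `factoryFromCallback` (the exact names and shape in the merged PR may differ):

```go
package main

import "fmt"

// Consumer is the per-consumer interface from the proposal.
type Consumer interface {
	Consume(item interface{})
}

// ConsumerFunc lets an ordinary function satisfy Consumer, the same way
// net/http's HandlerFunc lets a function satisfy http.Handler.
type ConsumerFunc func(item interface{})

// Consume calls f(item).
func (f ConsumerFunc) Consume(item interface{}) { f(item) }

// factoryFromCallback (hypothetical name) turns a callback into a factory
// that hands every consumer the same stateless ConsumerFunc.
func factoryFromCallback(callback func(item interface{})) func() Consumer {
	c := ConsumerFunc(callback)
	return func() Consumer { return c }
}

func main() {
	var got []interface{}
	factory := factoryFromCallback(func(item interface{}) { got = append(got, item) })
	factory().Consume(1)
	factory().Consume("two")
	fmt.Println(got)
}
```

This prints `[1 two]`. Because `ConsumerFunc` is a function type with a method, a callback can be converted with `ConsumerFunc(callback)` instead of being wrapped in a dedicated struct such as `statelessConsumer`.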
bhiravabhatla pushed a commit to bhiravabhatla/jaeger that referenced this issue Jan 25, 2021
5 participants