Skip to content

Commit

Permalink
otelsarama: Fix WrapAsyncProducer race condition (#881)
Browse files Browse the repository at this point in the history
* otelsarama: Fix WrapAsyncProducer race condition

* Update producer.go
  • Loading branch information
pellared committed Jul 15, 2021
1 parent cfed0ec commit 4c90cea
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed

- Fix deadlocks and race conditions in `otelsarama.WrapAsyncProducer`.
The `messaging.message_id` and `messaging.kafka.partition` attributes are now not set if a message was not processed. (#754) (#755)
The `messaging.message_id` and `messaging.kafka.partition` attributes are now not set if a message was not processed. (#754) (#755) (#881)
- Fix `otelsarama.WrapAsyncProducer` so that the messages from the `Errors` channel contain the original `Metadata`. (#754)

## [0.21.0] - 2021-06-18
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,8 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts
span: span,
}

// Specific metadata with span id
// Remember metadata using span ID as a cache key
msg.Metadata = span.SpanContext().SpanID()

p.Input() <- msg
if saramaConfig.Producer.Return.Successes {
mtx.Lock()
producerMessageContexts[msg.Metadata] = mc
Expand All @@ -182,6 +180,8 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts
// be done.
mc.span.End()
}

p.Input() <- msg
}
}
}()
Expand Down

0 comments on commit 4c90cea

Please sign in to comment.