Skip to content

Commit

Permalink
Добавил логирование таймаута обработки событий в OutboxConsumers
Browse files Browse the repository at this point in the history
  • Loading branch information
klwxsrx committed Apr 6, 2024
1 parent 23b846d commit 66c997f
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ SQL_CONNECTION_TIMEOUT=5m
PULSAR_ADDRESS=127.0.0.1:6650
PULSAR_CONNECTION_TIMEOUT=10m

GOOSE_SERVICE_URL="http://goose-service/"
GOOSE_SERVICE_URL=http://goose-service/
2 changes: 1 addition & 1 deletion internal/duck/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

var ErrDuckNotFound = errors.New("duck not found")

type API interface { // TODO: change duck to another domain with usage: auth, http-client, producing messages, consuming messages
type API interface { // TODO: change duck to another domain with usage: auth, http-client, producing events and tasks, consuming messages
Create(ctx context.Context, name string) (uuid.UUID, error)
SetActive(ctx context.Context, id uuid.UUID, isActive bool) error
}
20 changes: 13 additions & 7 deletions pkg/message/outboxconsumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
)

const (
consumingMessagesBatchSize = 100
consumingMessagesBatchSize = 100
messageIDLogField = "messageID"

defaultConsumerProcessingInterval = time.Second
defaultMessageConsumingTimeout = time.Minute
)
Expand Down Expand Up @@ -206,11 +208,11 @@ func (c *outboxConsumer) Ack(msg *ConsumerMessage) {
Log(msg.Context, c.loggerErrorLevel, "failed to delete acked message")
}

c.releaseConsumingMessage(msg.Message.ID)
c.releaseConsumingMessage(msg.Context, msg.Message.ID, "")
}

func (c *outboxConsumer) Nack(msg *ConsumerMessage) {
c.releaseConsumingMessage(msg.Message.ID)
c.releaseConsumingMessage(msg.Context, msg.Message.ID, "")

c.metrics.
WithLabel("topic", msg.Message.Topic).
Expand Down Expand Up @@ -295,11 +297,11 @@ func (c *outboxConsumer) processOutboxMessagesImpl(ctx context.Context) (allProc

select {
case c.consumerCh <- &ConsumerMessage{Context: ctx, Message: msg}:
c.logger.WithField("messageID", msg.ID).Log(ctx, c.loggerInfoLevel, "outbox message sent to handler")
c.logger.WithField(messageIDLogField, msg.ID).Log(ctx, c.loggerInfoLevel, "outbox message sent to handler")
c.addConsumingMessage(msg.ID)
go func() { // TODO: test message timeout
go func() {
<-time.After(c.msgConsumingTimeout)
c.releaseConsumingMessage(msg.ID)
c.releaseConsumingMessage(ctx, msg.ID, "outbox message got handler timeout")
}()
case <-ctx.Done():
return true, nil
Expand All @@ -316,10 +318,14 @@ func (c *outboxConsumer) addConsumingMessage(msgID uuid.UUID) {
c.consumingMessages[msgID] = struct{}{}
}

func (c *outboxConsumer) releaseConsumingMessage(msgID uuid.UUID) {
func (c *outboxConsumer) releaseConsumingMessage(ctx context.Context, msgID uuid.UUID, logMsg string) {
c.consumingCond.L.Lock()
defer c.consumingCond.L.Unlock()

if _, ok := c.consumingMessages[msgID]; ok && logMsg != "" {
c.logger.WithField(messageIDLogField, msgID).Log(ctx, c.loggerInfoLevel, logMsg)
}

delete(c.consumingMessages, msgID)
if len(c.consumingMessages) == 0 {
c.consumingCond.Signal()
Expand Down

0 comments on commit 66c997f

Please sign in to comment.