Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[watermill-kafka] Stress test fails in CI #175

Open
roblaszczak opened this issue Jan 28, 2020 · 1 comment
Open

[watermill-kafka] Stress test fails in CI #175

roblaszczak opened this issue Jan 28, 2020 · 1 comment
Labels

Comments

@roblaszczak
Copy link
Member

Because of too big load, Watermill Kafka tests are failing in CI. Locally the problem also occurs, until I will not set ulimit.

They are probably multiple solutions:

  • find a way to set ulimit in CI (I tried to do it in docker-compose config, but didn't help)
  • find, if we can fix something in the code in order to limit the load
  • reduce the number of parallel tests (not preferred)

Example failed build: https://circleci.com/gh/ThreeDotsLabs/watermill-kafka/235?utm_campaign=vcs-integration-link&utm_medium=referral&utm_source=github-build-link

@codingDr
Copy link

codingDr commented Nov 21, 2022

@roblaszczak @m110

I believe I have fixed the issue. I made some modifications and ran the stress tests locally, which passed. I would fork and submit a PR, but I have already forked for my own confluent-kafka-go implementation, so I will just post the code snippet here for you or someone else with permissions to the repo.

func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

	logFields := h.messageLogFields.Copy().Add(watermill.LogFields{
		"kafka_partition":      claim.Partition(),
		"kafka_initial_offset": claim.InitialOffset(),
	})

	for kafkaMsg := range claim.Messages() {
		h.logger.Debug("Message claimed", logFields)
		if err := h.messageHandler.processMessage(h.ctx, kafkaMsg, sess, logFields); err != nil {
			return err
		}
		select {
		case <-h.closing:
			h.logger.Debug("Subscriber is closing, stopping consumerGroupHandler", logFields)
			return nil

		case <-h.ctx.Done():
			h.logger.Debug("Ctx was cancelled, stopping consumerGroupHandler", logFields)
			return nil
		default:
			continue
		}
	}

	return nil
}

Hope this helps!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants