From 8a1748f7b584f9e0ed0a4d8ddd5e77bdae953034 Mon Sep 17 00:00:00 2001 From: "ollie.j" Date: Mon, 13 May 2024 12:12:06 +0900 Subject: [PATCH] datasync: Robust kafkat_test --- datasync/chaindatafetcher/kafka/kafka_test.go | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/datasync/chaindatafetcher/kafka/kafka_test.go b/datasync/chaindatafetcher/kafka/kafka_test.go index be94863142..cd2d46881d 100644 --- a/datasync/chaindatafetcher/kafka/kafka_test.go +++ b/datasync/chaindatafetcher/kafka/kafka_test.go @@ -176,10 +176,15 @@ func (s *KafkaSuite) TestKafka_CreateAndDeleteTopic() { // deleted a topic successfully s.Nil(s.kfk.DeleteTopic(s.topic)) - topics, err := s.kfk.ListTopics() - if _, exist := topics[s.topic]; exist { - s.Fail("topic must not exist") + for i := 0; i < 10; i++ { + topics, err := s.kfk.ListTopics() + s.NoError(err) + if _, exist := topics[s.topic]; !exist { + return // success + } + time.Sleep(time.Second) } + s.Fail("topic must not exist") } type kafkaData struct { @@ -466,7 +471,7 @@ func (s *KafkaSuite) TestKafka_PubSubWithSegements_BufferOverflow() { }() // checkout the returned error is buffer overflow error - timeout := time.NewTimer(3 * time.Second) + timeout := time.NewTimer(5 * time.Second) select { case <-timeout.C: s.Fail("timeout") @@ -501,7 +506,7 @@ func (s *KafkaSuite) TestKafka_PubSubWithSegments_ErrCallBack() { }() // checkout the returned error is callback error - timeout := time.NewTimer(3 * time.Second) + timeout := time.NewTimer(5 * time.Second) select { case <-timeout.C: s.Fail("timeout") @@ -543,7 +548,7 @@ func (s *KafkaSuite) TestKafka_PubSubWithSegments_MessageTimeout() { }() // checkout the returned error is callback error - timeout := time.NewTimer(3 * time.Second) + timeout := time.NewTimer(5 * time.Second) select { case <-timeout.C: s.Fail("timeout") @@ -580,6 +585,5 @@ func (s *KafkaSuite) TestKafka_Consumer_AddTopicAndHandler_Error() { } func TestKafkaSuite(t *testing.T) { - // TODO: revive after CircleCI image fix - // suite.Run(t, new(KafkaSuite)) + suite.Run(t, new(KafkaSuite)) }