diff --git a/engine/api/cache/redis.go b/engine/api/cache/redis.go index 505934d985..12c7ece09c 100644 --- a/engine/api/cache/redis.go +++ b/engine/api/cache/redis.go @@ -186,13 +186,14 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val } elemChan := make(chan string) + var elemChanClosed bool var once sync.Once go func() { - ticker := time.NewTicker(200 * time.Millisecond).C + ticker := time.NewTicker(250 * time.Millisecond).C for { select { case <-ticker: - res, err := s.Client.RPop(queueName).Result() + res, err := s.Client.BRPop(200*time.Millisecond, queueName).Result() if err == redis.Nil { continue } @@ -200,12 +201,12 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val time.Sleep(1 * time.Second) continue } - if len(res) != 2 { - continue + if err == nil && len(res) == 2 && !elemChanClosed { + elemChan <- res[1] } - elemChan <- res case <-c.Done(): once.Do(func() { + elemChanClosed = true close(elemChan) }) return @@ -219,6 +220,7 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val json.Unmarshal(b, value) } once.Do(func() { + elemChanClosed = true close(elemChan) }) }