Skip to content

Commit

Permalink
fix (api): code review #678
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin committed Jun 6, 2017
1 parent 04df761 commit 81ba2f2
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions engine/api/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,26 +186,27 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val
}

elemChan := make(chan string)
var elemChanClosed bool
var once sync.Once
go func() {
ticker := time.NewTicker(200 * time.Millisecond).C
ticker := time.NewTicker(250 * time.Millisecond).C
for {
select {
case <-ticker:
res, err := s.Client.RPop(queueName).Result()
res, err := s.Client.BRPop(200*time.Millisecond, queueName).Result()
if err == redis.Nil {
continue
}
if err == io.EOF {
time.Sleep(1 * time.Second)
continue
}
if len(res) != 2 {
continue
if err == nil && len(res) == 2 && !elemChanClosed {
elemChan <- res[1]
}
elemChan <- res
case <-c.Done():
once.Do(func() {
elemChanClosed = true
close(elemChan)
})
return
Expand All @@ -219,6 +220,7 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val
json.Unmarshal(b, value)
}
once.Do(func() {
elemChanClosed = true
close(elemChan)
})
}

0 comments on commit 81ba2f2

Please sign in to comment.