Skip to content

Commit

Permalink
fix(hooks): avoid to send same gerrit event twice or more (#4881)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored and richardlt committed Jan 13, 2020
1 parent 1eb063f commit bd70519
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
44 changes: 41 additions & 3 deletions engine/hooks/gerrit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package hooks
import (
"bufio"
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -227,7 +229,7 @@ func (s *Service) ComputeGerritStreamEvent(ctx context.Context, vcsServer string
}

// ListenGerritStreamEvent listen the gerrit event stream
func ListenGerritStreamEvent(ctx context.Context, v sdk.VCSConfiguration, gerritEventChan chan<- GerritEvent) {
func ListenGerritStreamEvent(ctx context.Context, store cache.Store, v sdk.VCSConfiguration, gerritEventChan chan<- GerritEvent) {
signer, err := ssh.ParsePrivateKey([]byte(v.Password))
if err != nil {
log.Error(ctx, "unable to read ssh key: %v", err)
Expand Down Expand Up @@ -272,6 +274,7 @@ func ListenGerritStreamEvent(ctx context.Context, v sdk.VCSConfiguration, gerrit
}
}()

lockKey := cache.Key("gerrit", "event", "lock")
tick := time.NewTicker(50 * time.Millisecond)
for {
select {
Expand All @@ -291,11 +294,46 @@ func ListenGerritStreamEvent(ctx context.Context, v sdk.VCSConfiguration, gerrit
continue
}
var event GerritEvent
if err := json.Unmarshal([]byte(line), &event); err != nil {
lineBytes := []byte(line)
if err := json.Unmarshal(lineBytes, &event); err != nil {
log.Error(ctx, "unable to read gerrit event %v: %s", err, line)
continue
}
gerritEventChan <- event

// Avoid that 2 hook uservice dispatch the same event
// Take the lock to dispatch an event
_, _ = store.Lock(lockKey, time.Minute, 100, 100)

// compute md5
hasher := md5.New()
hasher.Write(lineBytes) // nolint
md5 := hex.EncodeToString(hasher.Sum(nil))

// check if this event has already been dispatched
k := cache.Key("gerrit", "event", "id", md5)
var existString string
b, _ := store.Get(k, &existString)
if !b {
// Dispatch event
gerritEventChan <- event
_ = store.SetWithTTL(k, md5, 300)
}
cpt := 0
for {
if err := store.Unlock(lockKey); err == nil {
break
}
if cpt > 100 {
break
}
log.Warning(ctx, "gerrit> Cannot remove event lock. Retry in 100ms")
time.Sleep(100 * time.Millisecond)
cpt++
}
if cpt > 100 {
log.Error(ctx, "gerrit> Event lock cannot be removed.")
}
// release lock
}
}

Expand Down
3 changes: 1 addition & 2 deletions engine/hooks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,11 @@ func (s *Service) synchronizeTasks(ctx context.Context) error {
}

func (s *Service) initGerritStreamEvent(ctx context.Context, vcsName string, vcsConfig map[string]sdk.VCSConfiguration) {

// Create channel to store gerrit event
gerritEventChan := make(chan GerritEvent, 20)
// Listen to gerrit event stream
sdk.GoRoutine(ctx, "gerrit.EventStream."+vcsName, func(ctx context.Context) {
ListenGerritStreamEvent(ctx, vcsConfig[vcsName], gerritEventChan)
ListenGerritStreamEvent(ctx, s.Cache, vcsConfig[vcsName], gerritEventChan)
})
// Listen to gerrit event stream
sdk.GoRoutine(ctx, "gerrit.EventStreamCompute."+vcsName, func(ctx context.Context) {
Expand Down

0 comments on commit bd70519

Please sign in to comment.