Skip to content

Commit

Permalink
fix: safe webhook concurrent map access (#2389)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Sep 6, 2022
1 parent 8ef6c39 commit b72f6be
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions gateway/webhook/webhook.go
Expand Up @@ -52,6 +52,7 @@ type batchWebhookT struct {
}

type HandleT struct {
requestQMu sync.RWMutex
requestQ map[string]chan *webhookT
batchRequestQ chan *batchWebhookT
netClient *retryablehttp.Client
Expand Down Expand Up @@ -179,7 +180,10 @@ func (webhook *HandleT) RequestHandler(w http.ResponseWriter, r *http.Request) {

done := make(chan transformerResponse)
req := webhookT{request: r, writer: w, done: done, sourceType: sourceDefName, writeKey: writeKey}
webhook.requestQ[sourceDefName] <- &req
webhook.requestQMu.RLock()
requestQ := webhook.requestQ[sourceDefName]
requestQ <- &req
webhook.requestQMu.RUnlock()

// Wait for batcher process to be done
resp := <-done
Expand All @@ -206,12 +210,12 @@ func (webhook *HandleT) RequestHandler(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write(payload)
}

func (webhook *HandleT) batchRequests(sourceDef string) {
func (webhook *HandleT) batchRequests(sourceDef string, requestQ chan *webhookT) {
reqBuffer := make([]*webhookT, 0)
timeout := time.After(webhookBatchTimeout)
for {
select {
case req, hasMore := <-webhook.requestQ[sourceDef]:
case req, hasMore := <-requestQ:
if !hasMore {
if len(reqBuffer) > 0 {
// If there are requests in the buffer, send them to the batcher
Expand Down Expand Up @@ -344,24 +348,30 @@ func (webhook *HandleT) enqueueInGateway(req *webhookT, payload []byte) string {
}

func (webhook *HandleT) Register(name string) {
webhook.requestQMu.Lock()
defer webhook.requestQMu.Unlock()
if _, ok := webhook.requestQ[name]; !ok {
webhook.requestQ[name] = make(chan *webhookT)
requestQ := make(chan *webhookT)
webhook.requestQ[name] = requestQ

webhook.batchRequestsWg.Add(1)
go (func() {
defer webhook.batchRequestsWg.Done()
webhook.batchRequests(name)
webhook.batchRequests(name, requestQ)
})()
}
}

func (webhook *HandleT) Shutdown() error {
webhook.backgroundCancel()
webhook.requestQMu.Lock()
defer webhook.requestQMu.Unlock()
for _, q := range webhook.requestQ {
close(q)
}
webhook.batchRequestsWg.Wait()
close(webhook.batchRequestQ)
webhook.requestQ = make(map[string](chan *webhookT))

return webhook.backgroundWait()
}
Expand Down

0 comments on commit b72f6be

Please sign in to comment.