Skip to content

Commit

Permalink
fix: fifo() and fifoWithBody() with canceled requests
Browse files Browse the repository at this point in the history
Both filters did not check for canceled context from request before semaphore.Acquire, see golang/go#63615

Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
  • Loading branch information
szuecs committed Oct 19, 2023
1 parent 4028656 commit f8abdfe
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 69 deletions.
3 changes: 0 additions & 3 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ const (

// BackendRatelimit is the key used in the state bag to configure backend ratelimit in proxy
BackendRatelimit = "backend:ratelimit"

// FifoWithBody
FifoWithBody = "fifo:body:func"
)

// FilterContext object providing state and information that is unique to a request.
Expand Down
49 changes: 13 additions & 36 deletions filters/scheduler/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,48 +11,31 @@ import (
"github.com/zalando/skipper/scheduler"
)

type fifoType int

const (
fifo fifoType = iota + 1
fifoWithBody
)

func (t fifoType) String() string {
switch t {
case fifo:
return filters.FifoName
case fifoWithBody:
return filters.FifoWithBodyName
}
return "unknown"
}

type (
fifoSpec struct {
typ fifoType
typ string
}
fifoFilter struct {
config scheduler.Config
queue *scheduler.FifoQueue
typ fifoType
typ string
}
)

func NewFifo() filters.Spec {
return &fifoSpec{
typ: fifo,
typ: filters.FifoName,
}
}

func NewFifoWithBody() filters.Spec {
return &fifoSpec{
typ: fifoWithBody,
typ: filters.FifoWithBodyName,
}
}

func (s *fifoSpec) Name() string {
return s.typ.String()
return s.typ
}

// CreateFilter creates a fifoFilter, that will use a semaphore based
Expand Down Expand Up @@ -157,25 +140,16 @@ func (f *fifoFilter) Request(ctx filters.FilterContext) {
}

// ok
pending, _ := ctx.StateBag()[f.typ.String()].([]func())
ctx.StateBag()[f.typ.String()] = append(pending, done)
pending, _ := ctx.StateBag()[f.typ].([]func())
ctx.StateBag()[f.typ] = append(pending, done)
}

// Response will decrease the number of inflight requests to release
// the concurrency reservation for the request.
func (f *fifoFilter) Response(ctx filters.FilterContext) {
g := f.createResponse(ctx)
switch f.typ {
case fifo:
g()
case fifoWithBody:
ctx.StateBag()[filters.FifoWithBody] = g
}
}

func (f *fifoFilter) createResponse(ctx filters.FilterContext) func() {
return func() {
pending, ok := ctx.StateBag()[f.typ.String()].([]func())
case filters.FifoName:
pending, ok := ctx.StateBag()[f.typ].([]func())
if !ok {
return
}
Expand All @@ -184,7 +158,10 @@ func (f *fifoFilter) createResponse(ctx filters.FilterContext) func() {
return
}
pending[last]()
ctx.StateBag()[f.typ.String()] = pending[:last]
ctx.StateBag()[f.typ] = pending[:last]

case filters.FifoWithBodyName:
// nothing to do here, handled in the proxy after copyStream()
}
}

Expand Down

0 comments on commit f8abdfe

Please sign in to comment.