Skip to content

Commit

Permalink
Replace semaphore implementation with golang.org/x/sync/semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
DarthSim committed May 30, 2024
1 parent c1fc3fc commit 68d3894
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 61 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ require (
go.uber.org/automaxprocs v1.5.3
golang.org/x/image v0.15.0
golang.org/x/net v0.24.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.19.0
google.golang.org/api v0.176.1
google.golang.org/grpc v1.63.2
Expand Down Expand Up @@ -167,7 +168,6 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
Expand Down
27 changes: 14 additions & 13 deletions processing_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

log "github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"

"github.com/imgproxy/imgproxy/v3/config"
"github.com/imgproxy/imgproxy/v3/cookies"
Expand All @@ -25,24 +26,23 @@ import (
"github.com/imgproxy/imgproxy/v3/processing"
"github.com/imgproxy/imgproxy/v3/router"
"github.com/imgproxy/imgproxy/v3/security"
"github.com/imgproxy/imgproxy/v3/semaphore"
"github.com/imgproxy/imgproxy/v3/svg"
"github.com/imgproxy/imgproxy/v3/vips"
)

var (
queueSem *semaphore.Semaphore
processingSem *semaphore.Semaphore
queueSem *semaphore.Weighted
processingSem *semaphore.Weighted

headerVaryValue string
)

func initProcessingHandler() {
if config.RequestsQueueSize > 0 {
queueSem = semaphore.New(config.RequestsQueueSize + config.Workers)
queueSem = semaphore.NewWeighted(int64(config.RequestsQueueSize + config.Workers))
}

processingSem = semaphore.New(config.Workers)
processingSem = semaphore.NewWeighted(int64(config.Workers))

vary := make([]string, 0)

Expand Down Expand Up @@ -205,11 +205,11 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()

if queueSem != nil {
token, acquired := queueSem.TryAcquire()
acquired := queueSem.TryAcquire(1)
if !acquired {
panic(ierrors.New(429, "Too many requests", "Too many requests"))
}
defer token.Release()
defer queueSem.Release(1)
}

path := r.RequestURI
Expand Down Expand Up @@ -282,21 +282,22 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) {
}
}

// The heavy part start here, so we need to restrict worker number
var processingSemToken *semaphore.Token
// The heavy part starts here, so we need to restrict worker number
func() {
defer metrics.StartQueueSegment(ctx)()

var acquired bool
processingSemToken, acquired = processingSem.Acquire(ctx)
if !acquired {
err = processingSem.Acquire(ctx, 1)
if err != nil {
// We don't actually need to check timeout here,
// but it's an easy way to check if this is an actual timeout
// or the request was canceled
checkErr(ctx, "queue", router.CheckTimeout(ctx))
// We should never reach this line as err could be only ctx.Err()
// and we've already checked for it. But beter safe than sorry
sendErrAndPanic(ctx, "queue", err)
}
}()
defer processingSemToken.Release()
defer processingSem.Release(1)

stats.IncImagesInProgress()
defer stats.DecImagesInProgress()
Expand Down
47 changes: 0 additions & 47 deletions semaphore/semaphore.go

This file was deleted.

0 comments on commit 68d3894

Please sign in to comment.