🚦 Semaphore pattern implementation with timeout of lock/unlock operations based on channels.
Clone or download
Latest commit 41765da Oct 7, 2018
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
.github update checklist May 5, 2018
cmd/semaphore update readme Sep 22, 2018
makes update .github, makes, travis and scrutinizer configs May 5, 2018
.gitattributes fix issue #113: use runtime env vars in help command Oct 28, 2017
.gitignore update .github, makes, travis and scrutinizer configs May 5, 2018
.goreleaser.yml fix #136: add Go 1.10; fix #135: integrate with scrutinizer; fix #134:… Feb 19, 2018
.scrutinizer.yml update #144: extend exlude paths in scrutinizer config May 6, 2018
.travis.yml fix #144: fix builds on go 1.5 and 1.6 May 6, 2018
LICENSE fix #136: add Go 1.10; fix #135: integrate with scrutinizer; fix #134:… Feb 19, 2018
Makefile update .github, makes, travis and scrutinizer configs May 5, 2018
README.md Merge branch 'dev' Oct 7, 2018
channel.go fix #137: hold vendor dir, simplify travis and scrutinizer config Mar 6, 2018
channel_test.go fix #144: fix builds on go 1.5 and 1.6 May 6, 2018
context.go fix #137: hold vendor dir, simplify travis and scrutinizer config Mar 6, 2018
context_test.go fix #137: hold vendor dir, simplify travis and scrutinizer config Mar 6, 2018
default.go fix issue #88: complete worker pool example Aug 30, 2017
default_test.go fix issue #88: complete worker pool example Aug 30, 2017
example_helper_test.go fix issue #87: add interrupt example Aug 29, 2017
example_pool_workers_test.go issue #97: update README.md Oct 7, 2017
example_request_throughput_limit_test.go fix issue #88: complete worker pool example Aug 30, 2017
example_response_time_limit_test.go fix issue #87: add interrupt example Aug 29, 2017
example_semaphore_with_context_test.go fix issue #88: complete worker pool example Aug 30, 2017
example_user_rate_limit_test.go issue #97: update README.md Oct 7, 2017
semaphore.go update description Dec 21, 2017
semaphore_test.go issue #118: remove benchmarks from dev branch Nov 29, 2017

README.md

🚦 semaphore Tweet

Analytics Semaphore pattern implementation with timeout of lock/unlock operations based on channels.

Awesome Patreon Build Status Code Coverage Code Quality GoDoc Research License

Usage

Quick start

limiter := semaphore.New(1000)

http.HandleFunc("/", func(rw http.ResponseWriter, _ *http.Request) {
	if _, err := limiter.Acquire(semaphore.WithTimeout(time.Minute)); err != nil {
		http.Error(rw, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
		return
	}
	defer limiter.Release()
	// handle request
})

log.Fatal(http.ListenAndServe(":80", nil))

Console tool for command execution in parallel

This example shows how to execute many console commands in parallel.

$ semaphore create 2
$ semaphore add -- docker build
$ semaphore add -- vagrant up
$ semaphore add -- ansible-playbook
$ semaphore wait --timeout=1m --notify

asciicast

See more details here.

HTTP response time limitation

This example shows how to enforce a response-time SLA.

sla := 100 * time.Millisecond
sem := semaphore.New(1000)

// withTimeout answers with 504 if the work is not finished before the SLA deadline.
withTimeout := func(rw http.ResponseWriter, req *http.Request) {
	finished := make(chan struct{})
	deadline := semaphore.WithTimeout(sla)

	go func() {
		release, err := sem.Acquire(deadline)
		if err != nil {
			// no slot was obtained; the select below ends on the deadline
			return
		}
		// deferred calls run in reverse order: finished is closed first,
		// then the slot is released
		defer release()
		defer close(finished)

		// do some heavy work
	}()

	// wait for whichever happens first: the work completes or the deadline fires
	select {
	case <-finished:
		// send success response
	case <-deadline:
		http.Error(rw, "operation timeout", http.StatusGatewayTimeout)
	}
}

http.Handle("/do-with-timeout", http.HandlerFunc(withTimeout))

HTTP request throughput limitation

This example shows how to limit request throughput.

// limiter wraps handler with a semaphore of the given limit. A request that
// cannot acquire a slot before the timeout is answered with 429.
limiter := func(limit int, timeout time.Duration, handler http.HandlerFunc) http.HandlerFunc {
	throughput := semaphore.New(limit)

	limited := func(rw http.ResponseWriter, req *http.Request) {
		release, err := throughput.Acquire(semaphore.WithTimeout(timeout))
		if err != nil {
			http.Error(rw, err.Error(), http.StatusTooManyRequests)
			return
		}
		defer release()

		handler(rw, req)
	}
	return limited
}

work := func(rw http.ResponseWriter, req *http.Request) {
	// do some limited work
}
http.HandleFunc("/do-with-limit", limiter(1000, time.Minute, work))

HTTP personal rate limitation

This example shows how to create a user-specific rate limiter.

// LimiterForUser looks up the semaphore stored for user in the limiters map,
// reading the map while holding the mx read lock.
//
// NOTE(review): mx and limiters are declared outside this snippet; presumably
// a sync.RWMutex and a map keyed by User — confirm.
// NOTE(review): the "not found" branch is only a placeholder here, so as
// written the zero value of limiter is returned when the user has no entry.
// A complete implementation would presumably create and store a limiter for
// the user while holding the write lock — confirm against the full example.
func LimiterForUser(user User, cnf Config) semaphore.Semaphore {
	// fast path: read-only lookup under the read lock
	mx.RLock()
	limiter, ok := limiters[user]
	mx.RUnlock()
	if !ok {
		// slow path: the write lock is taken for the (elided) handling of a missing entry
		mx.Lock()
		// handle negative case
		mx.Unlock()
	}
	return limiter
}

// RateLimiter wraps handler with a per-user rate limit.
//
// For every request it takes a slot from the user's semaphore (waiting at
// most cnf.SLA) and keeps that slot occupied for the user's rate limit
// duration, taken from cnf.RateLimit or cnf.DefaultRateLimit when the user
// has no entry. If no slot is acquired in time, the request is answered with
// 504 and handler is not called.
func RateLimiter(cnf Config, handler http.HandlerFunc) http.HandlerFunc {
	return func(rw http.ResponseWriter, req *http.Request) {
		user, ok := // get user from request context

		limiter := LimiterForUser(user, cnf)
		release, err := limiter.Acquire(semaphore.WithTimeout(cnf.SLA))
		if err != nil {
			http.Error(rw, err.Error(), http.StatusGatewayTimeout)
			return
		}

		// hold the place for a required time
		// rate limit = semaphore capacity / rate limit time, e.g. 10 requests per second
		rl, ok := cnf.RateLimit[user]
		if !ok {
			rl = cnf.DefaultRateLimit
		}
		// Release the slot from a timer instead of sleeping in this goroutine:
		// the slot stays occupied for rl, while the request itself is served
		// synchronously below.
		time.AfterFunc(rl, func() { release() })

		// Serve the request in the handler's own goroutine. The ResponseWriter
		// must not be used after this function returns, so the wrapped handler
		// cannot run in a detached goroutine.
		handler.ServeHTTP(rw, req)
	}
}

// Register the handler wrapped by RateLimiter on the default ServeMux.
http.HandleFunc("/do-with-rate-limit", RateLimiter(cnf, func(rw http.ResponseWriter, req *http.Request) {
	// do some rate limited work
}))

Use context for cancellation

This example shows how to use context and semaphore together.

// deadliner wraps handler with a semaphore of the given limit. The slot is
// acquired against a context built from the request context and the timeout,
// and the same context is handed on to the wrapped handler.
deadliner := func(limit int, timeout time.Duration, handler http.HandlerFunc) http.HandlerFunc {
	throughput := semaphore.New(limit)

	guarded := func(rw http.ResponseWriter, req *http.Request) {
		deadline := semaphore.WithTimeout(timeout)
		ctx := semaphore.WithContext(req.Context(), deadline)

		release, err := throughput.Acquire(ctx.Done())
		if err != nil {
			http.Error(rw, err.Error(), http.StatusGatewayTimeout)
			return
		}
		defer release()

		handler(rw, req.WithContext(ctx))
	}
	return guarded
}

work := func(rw http.ResponseWriter, req *http.Request) {
	// do some limited work
}
http.HandleFunc("/do-with-deadline", deadliner(1000, time.Minute, work))

A pool of workers

This example shows how to create a pool of workers based on a semaphore.

// Pool is a pool of workers whose slots are handed out by a semaphore.
type Pool struct {
	// sem provides the release functions that workers are started with.
	sem  semaphore.Semaphore
	// work carries tasks to workers that are already running.
	work chan func()
}

// Schedule hands task to a worker that is already running or, when a
// semaphore slot is obtained first, starts a new worker for it.
// It blocks until one of the two happens.
func (p *Pool) Schedule(task func()) {
	select {
	case p.work <- task:
		// delay the task to already running workers
	case release, open := <-p.sem.Signal(nil):
		// open is always true in this case
		if !open {
			return
		}
		go p.worker(task, release)
	}
}

// worker runs the task it was started with and then every task received
// from p.work until that channel is closed. release is called on return.
func (p *Pool) worker(task func(), release semaphore.ReleaseFunc) {
	defer release()

	task()
	for next := range p.work {
		next()
	}
}

// New returns a Pool backed by a semaphore created with the given size and
// an unbuffered task channel.
func New(size int) *Pool {
	pool := new(Pool)
	pool.sem = semaphore.New(size)
	pool.work = make(chan func())
	return pool
}

// main creates a pool of size 2 and schedules tasks on it.
func main() {
	pool := New(2)
	pool.Schedule(func() { /* do some work */ })
	...
	pool.Schedule(func() { /* do some work */ })
}

Interrupt execution

sem := semaphore.New(runtime.GOMAXPROCS(0))

// combine a one-second timeout with the OS interrupt signal
byTimeout := semaphore.WithTimeout(time.Second)
bySignal := semaphore.WithSignal(os.Interrupt)
interrupter := semaphore.Multiplex(byTimeout, bySignal)

if _, err := sem.Acquire(interrupter); err == nil {
	panic("press Ctrl+C")
}
// successful interruption

Installation

$ go get github.com/kamilsk/semaphore
$ # or use mirror
$ egg bitbucket.org/kamilsk/semaphore

egg[1] is an extended go get.

Update

This library uses SemVer for versioning and is not BC-safe (backward compatibility is not guaranteed). Therefore, do not use go get -u to update it; use dep or a similar tool for this purpose.

[1] The project is still in prototyping.


Gitter @kamilsk @octolab

made with ❤️ by OctoLab