Merge pull request #24 from platinummonkey/patterns
add patterns for common pool pattern
platinummonkey committed Dec 27, 2019
2 parents 86be4ef + ab99a52 commit b4dce7d
Showing 8 changed files with 566 additions and 8 deletions.
18 changes: 10 additions & 8 deletions core/limit.go
@@ -22,7 +22,7 @@ type MeasurementInterface interface {

 // SampleWindow represents the details of the current sample window
 type SampleWindow interface {
-	// StartTimeNanoseoncds returns the epoch start time in nanoseconds.
+	// StartTimeNanoseconds returns the epoch start time in nanoseconds.
 	StartTimeNanoseconds() int64
 	// CandidateRTTNanoseconds returns the candidate RTT in the sample window. This is traditionally the minimum rtt.
 	CandidateRTTNanoseconds() int64
@@ -45,14 +45,16 @@ type Limit interface {
 	EstimatedLimit() int

 	// NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.
-	// @consumer the callback
+	//
+	// consumer - the callback
 	NotifyOnChange(consumer LimitChangeListener)

 	// OnSample the concurrency limit using a new rtt sample.
-	// @startTime in epoch nanoseconds
-	// @rtt round trip time of sample
-	// @inFlight in flight observed count during the sample
-	// @didDrop true if there was a timeout
+	//
+	// startTime - in epoch nanoseconds
+	// rtt - round trip time of sample
+	// inFlight - in flight observed count during the sample
+	// didDrop - true if there was a timeout
 	OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)
 }
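
For illustration, a minimal sketch of feeding one sample into a Limit (not part of this commit; it assumes a core.Limit value lim and hypothetical doWork and inFlight names):

func recordSample(lim core.Limit, inFlight int, doWork func() (timedOut bool)) {
	start := time.Now()
	timedOut := doWork()
	// startTime is epoch nanoseconds; rtt is this sample's round trip time in nanoseconds.
	lim.OnSample(start.UnixNano(), time.Since(start).Nanoseconds(), inFlight, timedOut)
}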

@@ -74,10 +76,10 @@ type Listener interface {

 // and must also release the returned listener when the operation completes. Releasing the Listener
 // may trigger an update to the concurrency limit based on error rate or latency measurement.
 type Limiter interface {
-	// Acquire a token from the limiter. Returns an Optional.empty() if the limit has been exceeded.
+	// Acquire a token from the limiter. Returns a nil listener if the limit has been exceeded.
 	// If acquired the caller must call one of the Listener methods when the operation has been completed to release
 	// the count.
 	//
-	// context Context for the request. The context is used by advanced strategies such as LookupPartitionStrategy.
+	// context - Context for the request. The context is used by advanced strategies such as LookupPartitionStrategy.
 	Acquire(ctx context.Context) (listener Listener, ok bool)
 }
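
The Acquire contract above pairs every granted token with exactly one Listener callback, as the examples in this commit do. A minimal sketch of that pattern (not part of this commit; it assumes a core.Limiter value lim and a caller-supplied doWork helper, plus the standard errors package):

func guardedCall(ctx context.Context, lim core.Limiter, doWork func(context.Context) error) error {
	listener, ok := lim.Acquire(ctx)
	if !ok {
		// limit exceeded: no token was granted; release a non-nil listener as dropped.
		if listener != nil {
			listener.OnDropped()
		}
		return errors.New("limit exceeded")
	}
	if err := doWork(ctx); err != nil {
		listener.OnDropped() // release and count the failure against the limit
		return err
	}
	listener.OnSuccess() // release and record the latency sample
	return nil
}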
129 changes: 129 additions & 0 deletions examples/example_concurrent_loading/main.go
@@ -0,0 +1,129 @@
package main

import (
	"context"
	"fmt"
	"log"
	"math"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/platinummonkey/go-concurrency-limits/core"
	"github.com/platinummonkey/go-concurrency-limits/limit"
	"github.com/platinummonkey/go-concurrency-limits/limiter"
	"github.com/platinummonkey/go-concurrency-limits/strategy"
)

type contextKey string

const testContextKey contextKey = "jobID"

type resource struct {
	counter *int64
}

func (r *resource) poll(ctx context.Context) (bool, error) {
	currentCount := atomic.AddInt64(r.counter, 1)
	id := ctx.Value(testContextKey).(int)
	log.Printf("request started for id=%d currentCount=%d\n", id, currentCount)
	numKeys := rand.Int63n(1000)

	// fake a scan; every key lookup adds non-linear time
	scanTime := time.Duration(numKeys)*time.Nanosecond + time.Duration(int64(math.Exp(float64(numKeys)/100.0)))*time.Millisecond
	scanTimeMillis := scanTime.Milliseconds()

	// sleep some time
	time.Sleep(scanTime)
	currentCount = atomic.AddInt64(r.counter, -1)
	log.Printf("request succeeded for id=%d currentCount=%d scanTime=%d ms\n", id, currentCount, scanTimeMillis)
	return true, nil
}

type protectedResource struct {
	external *resource
	guard    core.Limiter
}

func (r *protectedResource) poll(ctx context.Context) (bool, error) {
	id := ctx.Value(testContextKey).(int)
	log.Printf("guarded request started for id=%d\n", id)
	token, ok := r.guard.Acquire(ctx)
	if !ok {
		// short circuit; no need to try
		log.Printf("guarded request short circuited for id=%d\n", id)
		if token != nil {
			token.OnDropped()
		}
		return false, fmt.Errorf("short circuited request id=%d", id)
	}

	// try to make the request
	_, err := r.external.poll(ctx)
	if err != nil {
		token.OnDropped()
		log.Printf("guarded request failed for id=%d err=%v\n", id, err)
		return false, fmt.Errorf("request failed err=%v", err)
	}
	token.OnSuccess()
	log.Printf("guarded request succeeded for id=%d\n", id)
	return true, nil
}

func main() {
	l := 1000
	limitStrategy := strategy.NewSimpleStrategy(l)
	logger := limit.BuiltinLimitLogger{}
	defaultLimiter, err := limiter.NewDefaultLimiter(
		limit.NewFixedLimit(
			"initializer_limiter",
			l,
			core.EmptyMetricRegistryInstance,
		),
		int64(time.Millisecond*250),
		int64(time.Millisecond*500),
		int64(time.Millisecond*10),
		100,
		limitStrategy,
		logger,
		core.EmptyMetricRegistryInstance,
	)
	if err != nil {
		log.Fatalf("Error creating limiter err=%v\n", err)
	}
	externalResourceLimiter := limiter.NewBlockingLimiter(defaultLimiter, time.Second, logger)

	initialCount := int64(0)
	fakeExternalResource := &resource{
		counter: &initialCount,
	}

	guardedResource := &protectedResource{
		external: fakeExternalResource,
		guard:    externalResourceLimiter,
	}
	atomic.StoreInt64(fakeExternalResource.counter, 0)

	endOfExampleTimer := time.NewTimer(time.Second * 10)
	wg := sync.WaitGroup{}

	// spin up 10*l consumers, each issuing 5 polls
	wg.Add(10 * l)
	for i := 0; i < 10*l; i++ {
		go func(c int) {
			defer wg.Done()
			for j := 0; j < 5; j++ {
				ctx := context.WithValue(context.Background(), testContextKey, c+j)
				guardedResource.poll(ctx)
			}
		}(i)
	}

	<-endOfExampleTimer.C
	log.Printf("Waiting for goroutines to finish...")
	wg.Wait()
}
56 changes: 56 additions & 0 deletions patterns/example_fixed_pool_test.go
@@ -0,0 +1,56 @@
package patterns

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/platinummonkey/go-concurrency-limits/limit"
)

func ExampleFixedPool() {
	var JobKey = "job_id"

	l := 1000 // limit to 1000 concurrent requests.
	// create a new pool
	pool, err := NewFixedPool(
		"protected_resource_pool",
		l,
		100,
		time.Millisecond*250,
		time.Millisecond*500,
		time.Millisecond*10,
		time.Second,
		limit.BuiltinLimitLogger{},
		nil,
	)
	if err != nil {
		panic(err)
	}

	wg := sync.WaitGroup{}
	wg.Add(l * 3)
	// spawn 3000 concurrent requests that would normally be too much load for the protected resource.
	for i := 0; i < l*3; i++ {
		go func(c int) {
			defer wg.Done()
			ctx := context.WithValue(context.Background(), JobKey, c)
			// this will block until timeout or a token is acquired.
			listener, ok := pool.Acquire(ctx)
			if !ok {
				log.Printf("was not able to acquire lock for id=%d\n", c)
				return
			}
			log.Printf("acquired lock for id=%d\n", c)
			// do something...
			time.Sleep(time.Millisecond * 10)
			listener.OnSuccess()
			log.Printf("released lock for id=%d\n", c)
		}(i)
	}

	// wait for completion
	wg.Wait()
	log.Println("Finished")
}
75 changes: 75 additions & 0 deletions patterns/example_generic_pool_test.go
@@ -0,0 +1,75 @@
package patterns

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/platinummonkey/go-concurrency-limits/limit"
	"github.com/platinummonkey/go-concurrency-limits/limiter"
	"github.com/platinummonkey/go-concurrency-limits/strategy"
)

func ExamplePool() {
	var JobKey = "job_id"

	l := 1000 // start with an adjustable limit of 1000 concurrent requests.
	delegateLimit := limit.NewDefaultAIMLimit(
		"aimd_limiter",
		nil,
	)
	// wrap with a default limiter and simple strategy;
	// you can of course get much more sophisticated here.
	delegateLimiter, err := limiter.NewDefaultLimiter(
		delegateLimit,
		(time.Millisecond * 250).Nanoseconds(),
		(time.Millisecond * 500).Nanoseconds(),
		(time.Millisecond * 10).Nanoseconds(),
		100,
		strategy.NewSimpleStrategy(l),
		limit.BuiltinLimitLogger{},
		nil,
	)
	if err != nil {
		panic(err)
	}

	// create a new pool
	pool, err := NewPool(
		delegateLimiter,
		false,
		0,
		time.Second,
		limit.BuiltinLimitLogger{},
		nil,
	)
	if err != nil {
		panic(err)
	}

	wg := sync.WaitGroup{}
	wg.Add(l * 3)
	// spawn 3000 concurrent requests that would normally be too much load for the protected resource.
	for i := 0; i < l*3; i++ {
		go func(c int) {
			defer wg.Done()
			ctx := context.WithValue(context.Background(), JobKey, c)
			// this will block until timeout or a token is acquired.
			listener, ok := pool.Acquire(ctx)
			if !ok {
				log.Printf("was not able to acquire lock for id=%d\n", c)
				return
			}
			log.Printf("acquired lock for id=%d\n", c)
			// do something...
			time.Sleep(time.Millisecond * 10)
			listener.OnSuccess()
			log.Printf("released lock for id=%d\n", c)
		}(i)
	}

	// wait for completion
	wg.Wait()
	log.Println("Finished")
}
57 changes: 57 additions & 0 deletions patterns/example_lifo_fixed_pool_test.go
@@ -0,0 +1,57 @@
package patterns

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/platinummonkey/go-concurrency-limits/limit"
)

func ExampleLIFOFixedPool() {
	var JobKey = "job_id"

	l := 1000 // limit to 1000 concurrent requests.
	// create a new pool
	pool, err := NewLIFOFixedPool(
		"protected_resource_pool",
		l,
		100,
		time.Millisecond*250,
		time.Millisecond*500,
		time.Millisecond*10,
		3*l,
		time.Second,
		limit.BuiltinLimitLogger{},
		nil,
	)
	if err != nil {
		panic(err)
	}

	wg := sync.WaitGroup{}
	wg.Add(l * 3)
	// spawn 3000 concurrent requests that would normally be too much load for the protected resource.
	for i := 0; i < l*3; i++ {
		go func(c int) {
			defer wg.Done()
			ctx := context.WithValue(context.Background(), JobKey, c)
			// this will block until timeout or a token is acquired.
			listener, ok := pool.Acquire(ctx)
			if !ok {
				log.Printf("was not able to acquire lock for id=%d\n", c)
				return
			}
			log.Printf("acquired lock for id=%d\n", c)
			// do something...
			time.Sleep(time.Millisecond * 10)
			listener.OnSuccess()
			log.Printf("released lock for id=%d\n", c)
		}(i)
	}

	// wait for completion
	wg.Wait()
	log.Println("Finished")
}
