Skip to content

Commit

Permalink
feat(time/rate): add NewReorderBuffer as sugar grammar, shorthand for…
Browse files Browse the repository at this point in the history
… BurstLimiter with exactly only one token that allows instructions to be committed in-order.
  • Loading branch information
searKing committed May 7, 2024
1 parent 720f67a commit a4713fc
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 12 deletions.
67 changes: 58 additions & 9 deletions go/time/rate/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,55 @@ import (
"github.com/searKing/golang/go/time/rate"
)

func ExampleNewReorderBuffer() {
const n = 10
// See https://web.archive.org/web/20040724215416/http://lgjohn.okstate.edu/6253/lectures/reorder.pdf for more about Reorder buffer.
limiter := rate.NewReorderBuffer()
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()

var wg sync.WaitGroup

for i := 0; i < n; i++ {
i := i

// Allocate: The dispatch stage reserves space in the reorder buffer for instructions in program order.
r := limiter.Reserve(ctx)

wg.Add(1)
go func() {
defer wg.Done()
// Execute out of order
runtime.Gosched() // Increase probability of a race for out-of-order mock
//fmt.Printf("%03d Execute out of order\n", i)

defer r.PutToken()
//fmt.Printf("%03d Wait until in order\n", i)
// Wait: The complete stage must wait for instructions to finish execution.
err := r.Wait(ctx) // Commit in order
if err != nil {
fmt.Printf("err: %s\n", err.Error())
return
}
// Complete: Finished instructions are allowed to write results in order into the architected registers.
fmt.Printf("%03d Complete in order\n", i)
}()
}
wg.Wait()
// Output:
// 000 Complete in order
// 001 Complete in order
// 002 Complete in order
// 003 Complete in order
// 004 Complete in order
// 005 Complete in order
// 006 Complete in order
// 007 Complete in order
// 008 Complete in order
// 009 Complete in order

}

func ExampleNewFullBurstLimiter() {
const (
burst = 3
Expand All @@ -23,7 +72,7 @@ func ExampleNewFullBurstLimiter() {
limiter.PutToken()

for i := 0; ; i++ {
//fmt.Printf("%03d %s\n", i, time.Now().Format(time.RFC3339))
// fmt.Printf("%03d %s\n", i, time.Now().Format(time.RFC3339))
fmt.Printf("Wait %03d, tokens left: %d\n", i, limiter.Tokens())
err := limiter.Wait(ctx)
if err != nil {
Expand Down Expand Up @@ -88,7 +137,7 @@ func ExampleNewEmptyBurstLimiter() {
wg.Add(1)
go func() {
defer wg.Done()
//fmt.Printf("%03d %s\n", i, time.Now().Format(time.RFC3339))
// fmt.Printf("%03d %s\n", i, time.Now().Format(time.RFC3339))
mu.Lock()
fmt.Printf("Wait 1 Token, tokens left: %d\n", limiter.Tokens())
mu.Unlock()
Expand Down Expand Up @@ -162,7 +211,7 @@ func ExampleBurstLimiter_Reserve() {
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()

// expect dropped, as limiter is initialized with full tokens(3)
// expect dropped, as limiter is initialized with full tokens(1)
limiter.PutToken()

type Reservation struct {
Expand All @@ -175,8 +224,8 @@ func ExampleBurstLimiter_Reserve() {
var rs []*Reservation

for i := 0; i < n; i++ {
//fmt.Printf("%03d %s\n", i, time.Now().Format(time.RFC3339))
//fmt.Printf("Reserve %03d\n", i)
// fmt.Printf("%03d %s\n", i, time.Now().Format(time.RFC3339))
// fmt.Printf("Reserve %03d\n", i)
r := &Reservation{
index: i,
r: limiter.Reserve(ctx),
Expand All @@ -188,8 +237,8 @@ func ExampleBurstLimiter_Reserve() {
wg.Add(1)
go func() {
defer wg.Done()
//fmt.Printf("%03d %s\n", r.index, time.Now().Format(time.RFC3339))
//fmt.Printf("Wait %03d\n", r.index)
// fmt.Printf("%03d %s\n", r.index, time.Now().Format(time.RFC3339))
// fmt.Printf("Wait %03d\n", r.index)
err := r.r.Wait(ctx)
if err != nil {
mu.Lock()
Expand All @@ -209,8 +258,8 @@ func ExampleBurstLimiter_Reserve() {
wg.Add(1)
go func() {
defer wg.Done()
//fmt.Printf("%03d %s\n", r.index, time.Now().Format(time.RFC3339))
//fmt.Printf("Wait %03d\n", r.index)
// fmt.Printf("%03d %s\n", r.index, time.Now().Format(time.RFC3339))
// fmt.Printf("Wait %03d\n", r.index)
err := r.r.Wait(ctx)
if err != nil {
mu.Lock()
Expand Down
18 changes: 15 additions & 3 deletions go/time/rate/rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var expectTokensKey expectKeyType
// Reorder Buffer
// It allows instructions to be committed in-order.
// - Allocated by `Reserve` or `ReserveN` into account when allowing future events
// - Wait by `Wait` or `WaitN` blocks until lim permits n events to happen.
// - Wait by `Wait` or `WaitN` blocks until lim permits n events to happen
// - Allow and Wait Complete by `PutToken` or `PutTokenN`
// - Reserve Complete by `Cancel` of the Reservation self, GC Cancel supported
// See https://en.wikipedia.org/wiki/Re-order_buffer for more about Reorder buffer.
Expand Down Expand Up @@ -82,7 +82,7 @@ func (lim *BurstLimiter) Tokens() int {
return lim.tokens
}

// NewFullBurstLimiter returns a new BurstLimiter inited with full tokens that allows
// NewFullBurstLimiter returns a new BurstLimiter with full tokens that allows
// events up to burst b and permits bursts of at most b tokens.
func NewFullBurstLimiter(b int) *BurstLimiter {
return &BurstLimiter{
Expand All @@ -91,14 +91,26 @@ func NewFullBurstLimiter(b int) *BurstLimiter {
}
}

// NewEmptyBurstLimiter returns a new BurstLimiter inited with zero tokens that allows
// NewEmptyBurstLimiter returns a new BurstLimiter with zero tokens that allows
// events up to burst b and permits bursts of at most b tokens.
func NewEmptyBurstLimiter(b int) *BurstLimiter {
return &BurstLimiter{
burst: b,
}
}

// NewReorderBuffer returns a new BurstLimiter with exactly only one token that allows
// instructions to be committed in-order.
// - Allocated by `Reserve` into account when allowing future events
// - Wait by `Wait` blocks until lim permits n events to happen
// - Allow and Wait Complete by `PutToken`
// - Reserve Complete by `Cancel` of the Reservation self, GC Cancel supported
// See https://en.wikipedia.org/wiki/Re-order_buffer for more about Reorder buffer.
// See https://web.archive.org/web/20040724215416/http://lgjohn.okstate.edu/6253/lectures/reorder.pdf for more about Reorder buffer.
func NewReorderBuffer() *BurstLimiter {
return NewFullBurstLimiter(1)
}

// SetBurst sets a new burst size for the limiter.
func (lim *BurstLimiter) SetBurst(newBurst int) {
lim.mu.Lock()
Expand Down

0 comments on commit a4713fc

Please sign in to comment.