Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup teardown #1

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 76 additions & 38 deletions bucket.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package ioshape

import (
"fmt"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -27,18 +27,22 @@ type Bucket struct {
ticker *time.Ticker
ticks int64
stopCh chan struct{}
stopped int32
stopMu sync.Mutex
stopped bool
doneCh chan struct{}
tokenRequests chan *bucketTokenRequest
tokenReturns chan *bucketTokenReturn
}

// NewBucket returns a new Bucket.
func NewBucket() (bu *Bucket) {
bu = &Bucket{}
bu.ticker = time.NewTicker(time.Second / freq)
bu.stopCh = make(chan struct{}, 1)
bu.tokenRequests = make(chan *bucketTokenRequest)
bu.tokenReturns = make(chan *bucketTokenReturn)
bu = &Bucket{
ticker: time.NewTicker(time.Second / freq),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
tokenRequests: make(chan *bucketTokenRequest),
tokenReturns: make(chan *bucketTokenReturn),
}
go bu.timer()
return
}
Expand All @@ -51,21 +55,15 @@ func NewBucketRate(rate int64) (bu *Bucket) {
}

func (bu *Bucket) timer() {
defer close(bu.doneCh)
var n, k, b, m int64
tokenRequests := bu.tokenRequests
var pendingRequest *bucketTokenRequest
for {
select {
case <-bu.stopCh:
atomic.StoreInt32(&bu.stopped, 1)
time.Sleep(10 * time.Millisecond)
for ok := true; ok; {
select {
case tokenRequest := <-bu.tokenRequests:
tokenRequest.callback <- tokenRequest.count
default:
ok = false
}
}
return

case <-bu.ticker.C:
bu.setMu.RLock()
n = bu.n
Expand All @@ -84,19 +82,24 @@ func (bu *Bucket) timer() {
if bu.ticks > freq {
bu.ticks = 0
}
case tokenRequest := <-bu.tokenRequests:
count := tokenRequest.count
if count > bu.tokens {
count = bu.tokens
}
if count > m {
count = m

if pendingRequest != nil {
done := bu.handleReaquest(pendingRequest, m)
if !done {
fmt.Println("Warning: This should not happen")
pendingRequest.callback <- pendingRequest.count
}
tokenRequests = bu.tokenRequests
pendingRequest = nil
}
if tokenRequest.priority > int((priorityScale*bu.ticks/freq)%priorityScale) {
count = 0

case tokenRequest := <-tokenRequests:
done := bu.handleReaquest(tokenRequest, m)
if !done {
tokenRequests = nil
pendingRequest = tokenRequest
}
tokenRequest.callback <- count
bu.tokens -= count

case tokenReturn := <-bu.tokenReturns:
count := tokenReturn.count
bu.tokens += count
Expand All @@ -107,14 +110,37 @@ func (bu *Bucket) timer() {
}
}

// handleReaquest may only be called from the timer loop
func (bu *Bucket) handleReaquest(r *bucketTokenRequest, m int64) bool {
count := r.count
if count > bu.tokens {
count = bu.tokens
}
if count > m {
count = m
}
if count == 0 {
return false
}
if r.priority > int((priorityScale*bu.ticks/freq)%priorityScale) {
count = 0
}
r.callback <- count
bu.tokens -= count
return true
}

// Stop turns off a bucket. After Stop, bucket won't shape traffic. Stop
// must be call to free resources, after the bucket doesn't be needing.
func (bu *Bucket) Stop() {
bu.ticker.Stop()
select {
case bu.stopCh <- struct{}{}:
default:
bu.stopMu.Lock()
defer bu.stopMu.Unlock()
if bu.stopped {
return
}
bu.ticker.Stop()
close(bu.stopCh)
bu.stopped = true
}

// Set sets buckets rate and burst in bytes per second. The burst should be
Expand Down Expand Up @@ -144,17 +170,29 @@ func (bu *Bucket) SetRate(rate int64) {

func (bu *Bucket) getTokens(count int64, priority int) int64 {
callback := make(chan int64)
if count > 0 && bu.stopped == 0 {
bu.tokenRequests <- &bucketTokenRequest{
if count > 0 {
select {
case bu.tokenRequests <- &bucketTokenRequest{
count: count,
callback: callback,
priority: priority}
return <-callback
priority: priority}:
select {
case c := <-callback:
return c
case <-bu.doneCh:
return count
}
case <-bu.doneCh:
return count
}
}
return count
}

func (bu *Bucket) giveTokens(count int64) {
bu.tokenReturns <- &bucketTokenReturn{
count: count}
select {
case bu.tokenReturns <- &bucketTokenReturn{
count: count}:
case <-bu.doneCh:
}
}
4 changes: 4 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ type Reader struct {
Pr int // priority
}

func (rr *Reader) Close() error {
return nil
}

// Read reads from R by b.
func (rr *Reader) Read(p []byte) (n int, err error) {
if rr.B == nil {
Expand Down