Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
credit refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
kiranrg committed May 5, 2017
1 parent 62fa669 commit febd42b
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions services/outputhost/replicaconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ func (conn *replicaConnection) writeCreditsPump() {
defer conn.waitWG.Done()
defer conn.call.Done()

var localCredits int32 // credits accumulated
var creditRefresh time.Time

sendCreditsToStore := func(credits int32) (err error) {

//conn.logger.WithField(`Credits`,cFlow.GetCredits()).Debug(`Sending credits!!`)
Expand All @@ -334,9 +337,13 @@ func (conn *replicaConnection) writeCreditsPump() {

err = conn.call.Flush()
conn.sentCreds += credits
localCredits -= credits
creditRefresh = time.Now()
return
}

localCredits = conn.initialCredits

// send initial credits to the store.. so that we can actually start reading
conn.logger.WithField(`initialCredits`, conn.initialCredits).Info(`writeCreditsPump: sending initial credits to store`)
if err := sendCreditsToStore(conn.initialCredits); err != nil {
Expand All @@ -354,31 +361,28 @@ func (conn *replicaConnection) writeCreditsPump() {
creditBatchSize = minCreditBatchSize
}

var numMsgsRead int32
creditRequestTicker := time.NewTicker(creditRequestTimeout)
defer creditRequestTicker.Stop()

// Start the write pump
for {
// listen for credits only if we satisfy the batch size
if numMsgsRead > creditBatchSize {
if localCredits > creditBatchSize {
select {
case credits := <-conn.creditNotifyCh: // this is the common credit channel from redelivery cache
if err := sendCreditsToStore(credits); err != nil {
return
}
numMsgsRead -= credits
case credits := <-conn.localCreditCh: // this is the local credits channel only for this connection
if err := sendCreditsToStore(credits); err != nil {
return
}
numMsgsRead -= credits
case msgsRead := <-conn.readMsgsCh:
numMsgsRead += msgsRead
localCredits += msgsRead
case <-creditRequestTicker.C:
// if we didn't get any credits for a long time and we are starving for
// credits, then request some credits
if numMsgsRead >= conn.sentCreds {
if time.Since(creditRefresh) > creditRequestTimeout {
conn.logger.Warn("writeCreditsPump: starving for credits, requesting more")
conn.extCache.requestCredits()
}
Expand All @@ -389,14 +393,13 @@ func (conn *replicaConnection) writeCreditsPump() {
} else {
select {
case msgsRead := <-conn.readMsgsCh:
numMsgsRead += msgsRead
localCredits += msgsRead
// we should listen on the local credits ch, just in case we requested for some credits
// above and we are just getting it now
case credits := <-conn.localCreditCh:
if err := sendCreditsToStore(credits); err != nil {
return
}
numMsgsRead -= credits
case <-conn.closeChannel:
conn.logger.Info("writeCreditsPump: connection closed")
return
Expand Down

0 comments on commit febd42b

Please sign in to comment.