Skip to content

Commit

Permalink
use atomic operations instead of event loop handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ross-pure committed Dec 6, 2021
1 parent 0e8e090 commit 17a441b
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 37 deletions.
5 changes: 3 additions & 2 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package process

import (
"fmt"
"sync/atomic"

"github.com/renproject/id"
"github.com/renproject/surge"
Expand Down Expand Up @@ -315,7 +316,7 @@ func (p *Process) StartRound(round Round) {
// sequence. We do not have special methods dedicated to change the current
// Round, or changing the current Step to Proposing, because StartRound is
// the only location where this logic happens.
p.CurrentRound = round
atomic.StoreInt64((*int64)(&p.CurrentRound), int64(round))
p.CurrentStep = Proposing

// If we are not the proposer, then we trigger the propose timeout.
Expand Down Expand Up @@ -707,7 +708,7 @@ func (p *Process) tryCommitUponSufficientPrecommits(round Round) {
if scheduler != nil {
p.scheduler = scheduler
}
p.CurrentHeight++
atomic.AddInt64((*int64)(&p.CurrentHeight), 1)

// Reset lockedRound, lockedValue, validRound, and validValue to initial
// values.
Expand Down
35 changes: 5 additions & 30 deletions replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package replica

import (
"context"
"sync/atomic"

"github.com/renproject/hyperdrive/mq"
"github.com/renproject/hyperdrive/process"
Expand Down Expand Up @@ -142,12 +143,6 @@ func (replica *Replica) Run(ctx context.Context) {
replica.procsAllowed[sig] = true
}
}
case getState:
m.responder <- getStateResponse{
height: replica.proc.CurrentHeight,
round: replica.proc.CurrentRound,
step: replica.proc.CurrentStep,
}
}
}

Expand Down Expand Up @@ -240,32 +235,12 @@ func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Heigh
}
}

type getState struct {
responder chan getStateResponse
}

type getStateResponse struct {
height process.Height
round process.Round
step process.Step
func (replica Replica) Height() process.Height {
return process.Height(atomic.LoadInt64((*int64)(&replica.proc.State.CurrentHeight)))
}

// State returns the current height, round and step of the underlying process.
func (replica Replica) State(ctx context.Context) (process.Height, process.Round, process.Step, error) {
responder := make(chan getStateResponse, 1)

select {
case <-ctx.Done():
return process.Height(0), process.Round(0), process.Step(0), ctx.Err()
case replica.mch <- getState{responder}:
}

select {
case <-ctx.Done():
return process.Height(0), process.Round(0), process.Step(0), ctx.Err()
case response := <-responder:
return response.height, response.round, response.step, nil
}
func (replica Replica) Round() process.Round {
return process.Round(atomic.LoadInt64((*int64)(&replica.proc.State.CurrentRound)))
}

func (replica *Replica) filterHeight(height process.Height) bool {
Expand Down
6 changes: 1 addition & 5 deletions replica/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,11 +825,7 @@ var _ = Describe("Replica", func() {

// if its not yet killed
if _, isKilled := killedReplicas[uint8(id)]; !isKilled {
var err error
intermTargetHeight, _, _, err = replicas[id].State(context.Background())
if err != nil {
panic(err)
}
intermTargetHeight = replicas[id].Height()

// kill the replica
killedReplicas[uint8(id)] = true
Expand Down

0 comments on commit 17a441b

Please sign in to comment.