Skip to content

Commit

Permalink
Merge pull request #4697 from carlaKC/4481-batchstartandblocks
Browse files Browse the repository at this point in the history
contractcourt: batch startup reads and block epoch notifications
  • Loading branch information
Roasbeef committed Nov 14, 2020
2 parents 7afb5b4 + 697dbf7 commit c024758
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 141 deletions.
106 changes: 66 additions & 40 deletions contractcourt/briefcase.go
Expand Up @@ -54,8 +54,10 @@ type ArbitratorLog interface {
// TODO(roasbeef): document on interface the errors expected to be
// returned

// CurrentState returns the current state of the ChannelArbitrator.
CurrentState() (ArbitratorState, error)
// CurrentState returns the current state of the ChannelArbitrator. It
// takes an optional database transaction, which will be used if it is
// non-nil, otherwise the lookup will be done in its own transaction.
CurrentState(tx kvdb.RTx) (ArbitratorState, error)

// CommitState persists, the current state of the chain attendant.
CommitState(ArbitratorState) error
Expand Down Expand Up @@ -96,8 +98,10 @@ type ArbitratorLog interface {
InsertConfirmedCommitSet(c *CommitSet) error

// FetchConfirmedCommitSet fetches the known confirmed active HTLC set
// from the database.
FetchConfirmedCommitSet() (*CommitSet, error)
// from the database. It takes an optional database transaction, which
// will be used if it is non-nil, otherwise the lookup will be done in
// its own transaction.
FetchConfirmedCommitSet(tx kvdb.RTx) (*CommitSet, error)

// FetchChainActions attempts to fetch the set of previously stored
// chain actions. We'll use this upon restart to properly advance our
Expand Down Expand Up @@ -412,34 +416,49 @@ func (b *boltArbitratorLog) writeResolver(contractBucket kvdb.RwBucket,
return contractBucket.Put(resKey, buf.Bytes())
}

// CurrentState returns the current state of the ChannelArbitrator.
// CurrentState returns the current state of the ChannelArbitrator. It takes an
// optional database transaction, which will be used if it is non-nil, otherwise
// the lookup will be done in its own transaction.
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) CurrentState() (ArbitratorState, error) {
var s ArbitratorState
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}
func (b *boltArbitratorLog) CurrentState(tx kvdb.RTx) (ArbitratorState, error) {
var (
s ArbitratorState
err error
)

stateBytes := scopeBucket.Get(stateKey)
if stateBytes == nil {
return nil
}
if tx != nil {
s, err = b.currentState(tx)
} else {
err = kvdb.View(b.db, func(tx kvdb.RTx) error {
s, err = b.currentState(tx)
return err
}, func() {
s = 0
})
}

s = ArbitratorState(stateBytes[0])
return nil
}, func() {
s = 0
})
if err != nil && err != errScopeBucketNoExist {
return s, err
}

return s, nil
}

func (b *boltArbitratorLog) currentState(tx kvdb.RTx) (ArbitratorState, error) {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return 0, errScopeBucketNoExist
}

stateBytes := scopeBucket.Get(stateKey)
if stateBytes == nil {
return 0, nil
}

return ArbitratorState(stateBytes[0]), nil
}

// CommitState persists, the current state of the chain attendant.
//
// NOTE: Part of the ContractResolver interface.
Expand Down Expand Up @@ -851,29 +870,20 @@ func (b *boltArbitratorLog) InsertConfirmedCommitSet(c *CommitSet) error {
}

// FetchConfirmedCommitSet fetches the known confirmed active HTLC set from the
// database.
// database. It takes an optional database transaction, which will be used if it
// is non-nil, otherwise the lookup will be done in its own transaction.
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) {
func (b *boltArbitratorLog) FetchConfirmedCommitSet(tx kvdb.RTx) (*CommitSet, error) {
if tx != nil {
return b.fetchConfirmedCommitSet(tx)
}

var c *CommitSet
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}

commitSetBytes := scopeBucket.Get(commitSetKey)
if commitSetBytes == nil {
return errNoCommitSet
}

commitSet, err := decodeCommitSet(bytes.NewReader(commitSetBytes))
if err != nil {
return err
}

c = commitSet
return nil
var err error
c, err = b.fetchConfirmedCommitSet(tx)
return err
}, func() {
c = nil
})
Expand All @@ -884,6 +894,22 @@ func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) {
return c, nil
}

func (b *boltArbitratorLog) fetchConfirmedCommitSet(tx kvdb.RTx) (*CommitSet,
error) {

scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return nil, errScopeBucketNoExist
}

commitSetBytes := scopeBucket.Get(commitSetKey)
if commitSetBytes == nil {
return nil, errNoCommitSet
}

return decodeCommitSet(bytes.NewReader(commitSetBytes))
}

// WipeHistory is to be called ONLY once *all* contracts have been fully
// resolved, and the channel closure if finalized. This method will delete all
// on-disk state within the persistent log.
Expand Down
12 changes: 6 additions & 6 deletions contractcourt/briefcase_test.go
Expand Up @@ -611,7 +611,7 @@ func TestStateMutation(t *testing.T) {
defer cleanUp()

// The default state of an arbitrator should be StateDefault.
arbState, err := testLog.CurrentState()
arbState, err := testLog.CurrentState(nil)
if err != nil {
t.Fatalf("unable to read arb state: %v", err)
}
Expand All @@ -625,7 +625,7 @@ func TestStateMutation(t *testing.T) {
if err := testLog.CommitState(StateFullyResolved); err != nil {
t.Fatalf("unable to write state: %v", err)
}
arbState, err = testLog.CurrentState()
arbState, err = testLog.CurrentState(nil)
if err != nil {
t.Fatalf("unable to read arb state: %v", err)
}
Expand All @@ -643,7 +643,7 @@ func TestStateMutation(t *testing.T) {

// If we try to query for the state again, we should get the default
// state again.
arbState, err = testLog.CurrentState()
arbState, err = testLog.CurrentState(nil)
if err != nil {
t.Fatalf("unable to query current state: %v", err)
}
Expand Down Expand Up @@ -687,11 +687,11 @@ func TestScopeIsolation(t *testing.T) {

// Querying each log, the states should be the prior one we set, and be
// disjoint.
log1State, err := testLog1.CurrentState()
log1State, err := testLog1.CurrentState(nil)
if err != nil {
t.Fatalf("unable to read arb state: %v", err)
}
log2State, err := testLog2.CurrentState()
log2State, err := testLog2.CurrentState(nil)
if err != nil {
t.Fatalf("unable to read arb state: %v", err)
}
Expand Down Expand Up @@ -752,7 +752,7 @@ func TestCommitSetStorage(t *testing.T) {
t.Fatalf("unable to write commit set: %v", err)
}

diskCommitSet, err := testLog.FetchConfirmedCommitSet()
diskCommitSet, err := testLog.FetchConfirmedCommitSet(nil)
if err != nil {
t.Fatalf("unable to read commit set: %v", err)
}
Expand Down

0 comments on commit c024758

Please sign in to comment.