Skip to content

Commit

Permalink
Remove PendingTxs from the DAGVM interface (ava-labs#1641)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed Jun 22, 2023
1 parent cc73cd5 commit 2fc0d3b
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 168 deletions.
14 changes: 0 additions & 14 deletions snow/engine/avalanche/vertex/mock_vm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 4 additions & 17 deletions snow/engine/avalanche/vertex/test_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
)

var (
errPending = errors.New("unexpectedly called Pending")
errLinearize = errors.New("unexpectedly called Linearize")

_ LinearizableVM = (*TestVM)(nil)
Expand All @@ -24,18 +23,16 @@ var (
type TestVM struct {
block.TestVM

CantLinearize, CantPendingTxs, CantParse, CantGet bool
CantLinearize, CantParse, CantGet bool

LinearizeF func(context.Context, ids.ID) error
PendingTxsF func(context.Context) []snowstorm.Tx
ParseTxF func(context.Context, []byte) (snowstorm.Tx, error)
GetTxF func(context.Context, ids.ID) (snowstorm.Tx, error)
LinearizeF func(context.Context, ids.ID) error
ParseTxF func(context.Context, []byte) (snowstorm.Tx, error)
GetTxF func(context.Context, ids.ID) (snowstorm.Tx, error)
}

func (vm *TestVM) Default(cant bool) {
vm.TestVM.Default(cant)

vm.CantPendingTxs = cant
vm.CantParse = cant
vm.CantGet = cant
}
Expand All @@ -50,16 +47,6 @@ func (vm *TestVM) Linearize(ctx context.Context, stopVertexID ids.ID) error {
return errLinearize
}

func (vm *TestVM) PendingTxs(ctx context.Context) []snowstorm.Tx {
if vm.PendingTxsF != nil {
return vm.PendingTxsF(ctx)
}
if vm.CantPendingTxs && vm.T != nil {
require.FailNow(vm.T, errPending.Error())
}
return nil
}

func (vm *TestVM) ParseTx(ctx context.Context, b []byte) (snowstorm.Tx, error) {
if vm.ParseTxF != nil {
return vm.ParseTxF(ctx, b)
Expand Down
3 changes: 0 additions & 3 deletions snow/engine/avalanche/vertex/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ type DAGVM interface {
block.ChainVM
Getter

// Return any transactions that have not been sent to consensus yet
PendingTxs(ctx context.Context) []snowstorm.Tx

// Convert a stream of bytes to a transaction or return an error
ParseTx(ctx context.Context, txBytes []byte) (snowstorm.Tx, error)
}
Expand Down
153 changes: 36 additions & 117 deletions vms/avm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"reflect"
"time"

stdjson "encoding/json"

Expand All @@ -33,7 +32,6 @@ import (
"github.com/ava-labs/avalanchego/utils/json"
"github.com/ava-labs/avalanchego/utils/linkedhashmap"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/utils/timer"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
"github.com/ava-labs/avalanchego/utils/wrappers"
"github.com/ava-labs/avalanchego/version"
Expand All @@ -57,8 +55,6 @@ import (
)

const (
batchTimeout = time.Second
batchSize = 30
assetToFxCacheSize = 1024
txDeduplicatorSize = 8192
)
Expand Down Expand Up @@ -110,12 +106,6 @@ type VM struct {
// Asset ID --> Bit set with fx IDs the asset supports
assetToFxCache *cache.LRU[ids.ID, set.Bits64]

// Transaction issuing
timer *timer.Timer
batchTimeout time.Duration
txs []snowstorm.Tx
toEngine chan<- common.Message

baseDB database.Database
db *versiondb.Database

Expand Down Expand Up @@ -162,7 +152,7 @@ func (vm *VM) Initialize(
genesisBytes []byte,
_ []byte,
configBytes []byte,
toEngine chan<- common.Message,
_ chan<- common.Message,
fxs []*common.Fx,
appSender common.AppSender,
) error {
Expand Down Expand Up @@ -197,7 +187,6 @@ func (vm *VM) Initialize(

db := dbManager.Current().Database
vm.ctx = ctx
vm.toEngine = toEngine
vm.appSender = appSender
vm.baseDB = db
vm.db = versiondb.New(db)
Expand Down Expand Up @@ -248,15 +237,6 @@ func (vm *VM) Initialize(
return err
}

vm.timer = timer.NewTimer(func() {
ctx.Lock.Lock()
defer ctx.Lock.Unlock()

vm.FlushTxs()
})
go ctx.Log.RecoverAndPanic(vm.timer.Dispatch)
vm.batchTimeout = batchTimeout

vm.uniqueTxs = &cache.EvictableLRU[ids.ID, *UniqueTx]{
Size: txDeduplicatorSize,
}
Expand Down Expand Up @@ -339,16 +319,10 @@ func (vm *VM) SetState(_ context.Context, state snow.State) error {
}

func (vm *VM) Shutdown(context.Context) error {
if vm.timer == nil {
if vm.state == nil {
return nil
}

// There is a potential deadlock if the timer is about to execute a timeout.
// So, the lock must be released before stopping the timer.
vm.ctx.Lock.Unlock()
vm.timer.Stop()
vm.ctx.Lock.Lock()

errs := wrappers.Errs{}
errs.Add(
vm.state.Close(),
Expand Down Expand Up @@ -480,16 +454,30 @@ func (vm *VM) Linearize(_ context.Context, stopVertexID ids.ID, toEngine chan<-
return nil
}

func (vm *VM) PendingTxs(context.Context) []snowstorm.Tx {
vm.timer.Cancel()
func (vm *VM) ParseTx(_ context.Context, bytes []byte) (snowstorm.Tx, error) {
rawTx, err := vm.parser.ParseTx(bytes)
if err != nil {
return nil, err
}

txs := vm.txs
vm.txs = nil
return txs
}
tx := &UniqueTx{
TxCachedState: &TxCachedState{
Tx: rawTx,
},
vm: vm,
txID: rawTx.ID(),
}
if err := tx.SyntacticVerify(); err != nil {
return nil, err
}

func (vm *VM) ParseTx(_ context.Context, b []byte) (snowstorm.Tx, error) {
return vm.parseTx(b)
if tx.Status() == choices.Unknown {
vm.state.AddTx(tx.Tx)
tx.setStatus(choices.Processing)
return tx, vm.state.Commit()
}

return tx, nil
}

func (vm *VM) GetTx(_ context.Context, txID ids.ID) (snowstorm.Tx, error) {
Expand All @@ -513,60 +501,27 @@ func (vm *VM) GetTx(_ context.Context, txID ids.ID) (snowstorm.Tx, error) {
// either accepted or rejected with the appropriate status. This function will
// go out of scope when the transaction is removed from memory.
func (vm *VM) IssueTx(b []byte) (ids.ID, error) {
if !vm.bootstrapped {
if !vm.bootstrapped || vm.Builder == nil {
return ids.ID{}, errBootstrapping
}

// If the chain has been linearized, issue the tx to the network.
if vm.Builder != nil {
tx, err := vm.parser.ParseTx(b)
if err != nil {
vm.ctx.Log.Debug("failed to parse tx",
zap.Error(err),
)
return ids.ID{}, err
}

err = vm.network.IssueTx(context.TODO(), tx)
if err != nil {
vm.ctx.Log.Debug("failed to add tx to mempool",
zap.Error(err),
)
return ids.ID{}, err
}

return tx.ID(), nil
}

// TODO: After the chain is linearized, remove the following code.
tx, err := vm.parseTx(b)
tx, err := vm.parser.ParseTx(b)
if err != nil {
vm.ctx.Log.Debug("failed to parse tx",
zap.Error(err),
)
return ids.ID{}, err
}
if err := tx.verifyWithoutCacheWrites(); err != nil {

err = vm.network.IssueTx(context.TODO(), tx)
if err != nil {
vm.ctx.Log.Debug("failed to add tx to mempool",
zap.Error(err),
)
return ids.ID{}, err
}
vm.issueTx(tx)
return tx.ID(), nil
}

/*
******************************************************************************
********************************** Timer API *********************************
******************************************************************************
*/

// FlushTxs into consensus
func (vm *VM) FlushTxs() {
vm.timer.Cancel()
if len(vm.txs) != 0 {
select {
case vm.toEngine <- common.PendingTxs:
default:
vm.ctx.Log.Debug("dropping message to engine due to contention")
vm.timer.SetTimeoutIn(vm.batchTimeout)
}
}
return tx.ID(), nil
}

/*
Expand Down Expand Up @@ -638,42 +593,6 @@ func (vm *VM) initState(tx *txs.Tx) {
}
}

func (vm *VM) parseTx(bytes []byte) (*UniqueTx, error) {
rawTx, err := vm.parser.ParseTx(bytes)
if err != nil {
return nil, err
}

tx := &UniqueTx{
TxCachedState: &TxCachedState{
Tx: rawTx,
},
vm: vm,
txID: rawTx.ID(),
}
if err := tx.SyntacticVerify(); err != nil {
return nil, err
}

if tx.Status() == choices.Unknown {
vm.state.AddTx(tx.Tx)
tx.setStatus(choices.Processing)
return tx, vm.state.Commit()
}

return tx, nil
}

func (vm *VM) issueTx(tx snowstorm.Tx) {
vm.txs = append(vm.txs, tx)
switch {
case len(vm.txs) == batchSize:
vm.FlushTxs()
case len(vm.txs) == 1:
vm.timer.SetTimeoutIn(vm.batchTimeout)
}
}

// LoadUser returns:
// 1) The UTXOs that reference one or more addresses controlled by the given user
// 2) A keychain that contains this user's keys
Expand Down
2 changes: 0 additions & 2 deletions vms/metervm/vertex_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
)

type vertexMetrics struct {
pending,
parse,
parseErr,
get,
Expand All @@ -27,7 +26,6 @@ func (m *vertexMetrics) Initialize(
reg prometheus.Registerer,
) error {
errs := wrappers.Errs{}
m.pending = newAverager(namespace, "pending_txs", reg, &errs)
m.parse = newAverager(namespace, "parse_tx", reg, &errs)
m.parseErr = newAverager(namespace, "parse_tx_err", reg, &errs)
m.get = newAverager(namespace, "get_tx", reg, &errs)
Expand Down
8 changes: 0 additions & 8 deletions vms/metervm/vertex_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,6 @@ func (vm *vertexVM) Initialize(
)
}

func (vm *vertexVM) PendingTxs(ctx context.Context) []snowstorm.Tx {
start := vm.clock.Time()
txs := vm.LinearizableVMWithEngine.PendingTxs(ctx)
end := vm.clock.Time()
vm.vertexMetrics.pending.Observe(float64(end.Sub(start)))
return txs
}

func (vm *vertexVM) ParseTx(ctx context.Context, b []byte) (snowstorm.Tx, error) {
start := vm.clock.Time()
tx, err := vm.LinearizableVMWithEngine.ParseTx(ctx, b)
Expand Down
7 changes: 0 additions & 7 deletions vms/tracedvm/vertex_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,6 @@ func (vm *vertexVM) Initialize(
)
}

func (vm *vertexVM) PendingTxs(ctx context.Context) []snowstorm.Tx {
ctx, span := vm.tracer.Start(ctx, "vertexVM.PendingTxs")
defer span.End()

return vm.LinearizableVMWithEngine.PendingTxs(ctx)
}

func (vm *vertexVM) ParseTx(ctx context.Context, txBytes []byte) (snowstorm.Tx, error) {
ctx, span := vm.tracer.Start(ctx, "vertexVM.ParseTx", oteltrace.WithAttributes(
attribute.Int("txLen", len(txBytes)),
Expand Down

0 comments on commit 2fc0d3b

Please sign in to comment.