Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/runtime/txpool: Limit outstanding transactions per sender #4665

Merged
merged 3 commits into from Apr 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changelog/4665.breaking.md
@@ -0,0 +1,4 @@
go/runtime: Mandate the use of runtime schedule control

Runtimes that do not support the schedule control feature will no longer work
with executor nodes.
1 change: 1 addition & 0 deletions .changelog/4665.feature.md
@@ -0,0 +1 @@
go/runtime/txpool: Limit outstanding transactions per sender
1 change: 1 addition & 0 deletions go/go.mod
Expand Up @@ -27,6 +27,7 @@ require (
github.com/dgraph-io/badger/v3 v3.2103.2
github.com/eapache/channels v1.1.0
github.com/fxamacker/cbor/v2 v2.4.0
github.com/gammazero/deque v0.1.1
github.com/go-kit/log v0.2.0
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4
Expand Down
2 changes: 2 additions & 0 deletions go/go.sum
Expand Up @@ -290,6 +290,8 @@ github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWp
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/fxamacker/cbor/v2 v2.4.0 h1:ri0ArlOR+5XunOP8CRUowT0pSJOwhW098ZCUyskZD88=
github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/gammazero/deque v0.1.1 h1:xRVkDuSvDmFuMGf3IquHuRc2jlL0+v/WpFCWaauzwbE=
github.com/gammazero/deque v0.1.1/go.mod h1:KQw7vFau1hHuM8xmI9RbgKFbAsQFWmBpqQ2KenFLk6M=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.5.0/go.mod h1:Nd6IXA8m5kNZdNEHMBd93KT+mdY3+bewLgRvmCsR2Do=
Expand Down
28 changes: 0 additions & 28 deletions go/runtime/host/helpers.go
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

beacon "github.com/oasisprotocol/oasis-core/go/beacon/api"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/errors"
consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
Expand Down Expand Up @@ -46,14 +45,6 @@ type RichRuntime interface {
method string,
args []byte,
) ([]byte, error)

// QueryBatchLimits requests the runtime to answer the batch limits query.
QueryBatchLimits(
ctx context.Context,
rb *block.Block,
lb *consensus.LightBlock,
epoch beacon.EpochTime,
) (map[transaction.Weight]uint64, error)
}

type richRuntime struct {
Expand Down Expand Up @@ -126,25 +117,6 @@ func (r *richRuntime) Query(
return resp.RuntimeQueryResponse.Data, nil
}

// Implements RichRuntime.
func (r *richRuntime) QueryBatchLimits(
ctx context.Context,
rb *block.Block,
lb *consensus.LightBlock,
epoch beacon.EpochTime,
) (map[transaction.Weight]uint64, error) {
resp, err := r.Query(ctx, rb, lb, epoch, 0, protocol.MethodQueryBatchWeightLimits, cbor.Marshal(nil))
if err != nil {
return nil, err
}

var weightLimits map[transaction.Weight]uint64
if err = cbor.Unmarshal(resp, &weightLimits); err != nil {
return nil, errors.WithContext(ErrInternal, fmt.Sprintf("malformed runtime response: %v", err))
}
return weightLimits, nil
}

// NewRichRuntime creates a new higher-level wrapper for a given runtime. It provides additional
// convenience functions for talking with a runtime.
func NewRichRuntime(rt Runtime) RichRuntime {
Expand Down
38 changes: 29 additions & 9 deletions go/runtime/host/mock/mock.go
Expand Up @@ -49,7 +49,11 @@ func (r *runtime) GetInfo(ctx context.Context) (rsp *protocol.RuntimeInfoRespons
return &protocol.RuntimeInfoResponse{
ProtocolVersion: version.RuntimeHostProtocol,
RuntimeVersion: version.MustFromString("0.0.0"),
Features: nil,
Features: &protocol.Features{
ScheduleControl: &protocol.FeatureScheduleControl{
InitialBatchSize: 100,
},
},
}, nil
}

Expand All @@ -73,10 +77,28 @@ func (r *runtime) Call(ctx context.Context, body *protocol.Body) (*protocol.Body
tree := transaction.NewTree(nil, emptyRoot)
defer tree.Close()

for i := 0; i < len(rq.Inputs); i++ {
// Generate input root.
var txHashes []hash.Hash
for _, tx := range rq.Inputs {
err := tree.AddTransaction(ctx, transaction.Transaction{
Input: rq.Inputs[0],
Output: rq.Inputs[0],
Input: tx,
}, tags)
if err != nil {
return nil, fmt.Errorf("(mock) failed to create I/O tree: %w", err)
}

txHashes = append(txHashes, hash.NewFromBytes(tx))
}
txInputWriteLog, txInputRoot, err := tree.Commit(ctx)
if err != nil {
return nil, fmt.Errorf("(mock) failed to create I/O tree: %w", err)
}

// Generate outputs.
for _, tx := range rq.Inputs {
err = tree.AddTransaction(ctx, transaction.Transaction{
Input: tx,
Output: tx,
}, tags)
if err != nil {
return nil, fmt.Errorf("(mock) failed to create I/O tree: %w", err)
Expand Down Expand Up @@ -104,6 +126,9 @@ func (r *runtime) Call(ctx context.Context, body *protocol.Body) (*protocol.Body
},
IOWriteLog: ioWriteLog,
},
TxHashes: txHashes,
TxInputRoot: txInputRoot,
TxInputWriteLog: txInputWriteLog,
// No RakSig in mock response.
}}, nil
case body.RuntimeCheckTxBatchRequest != nil:
Expand Down Expand Up @@ -135,11 +160,6 @@ func (r *runtime) Call(ctx context.Context, body *protocol.Body) (*protocol.Body
rq := body.RuntimeQueryRequest

switch rq.Method {
// Handle Batch Weight Limits request.
case protocol.MethodQueryBatchWeightLimits:
return &protocol.Body{RuntimeQueryResponse: &protocol.RuntimeQueryResponse{
Data: cbor.Marshal(map[transaction.Weight]uint64{}),
}}, nil
default:
return &protocol.Body{RuntimeQueryResponse: &protocol.RuntimeQueryResponse{
Data: cbor.Marshal(rq.Method + " world at:" + fmt.Sprintf("%d", rq.ConsensusBlock.Height)),
Expand Down
35 changes: 16 additions & 19 deletions go/runtime/host/protocol/types.go
Expand Up @@ -6,6 +6,7 @@ import (

beacon "github.com/oasisprotocol/oasis-core/go/beacon/api"
"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/errors"
Expand All @@ -20,9 +21,6 @@ import (
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
)

// MethodQueryBatchWeightLimits is the name of the runtime batch weight limits query method.
const MethodQueryBatchWeightLimits = "internal.BatchWeightLimits"

// NOTE: Bump RuntimeProtocol version in go/common/version if you
// change any of the structures below.

Expand Down Expand Up @@ -258,27 +256,22 @@ type CheckTxMetadata struct {
// Priority is the transaction's priority.
Priority uint64 `json:"priority,omitempty"`

// Weight are runtime specific transaction weights.
Weights map[transaction.Weight]uint64 `json:"weights,omitempty"`
// Sender is the unique identifier of the transaction sender.
Sender []byte `json:"sender,omitempty"`
// SenderSeq is the per-sender sequence number of the transaction.
SenderSeq uint64 `json:"sender_seq,omitempty"`

// Fields below are deprecated to avoid breaking protocol changes. They may be removed once
// all runtimes stop sending those fields.

Deprecated1 cbor.RawMessage `json:"weights,omitempty"`
}

// IsSuccess returns true if transaction execution was successful.
func (r *CheckTxResult) IsSuccess() bool {
return r.Error.Code == errors.CodeNoError
}

// ToCheckedTransaction creates CheckedTransaction from CheckTx result.
//
// Assumes a successful result.
func (r *CheckTxResult) ToCheckedTransaction(rawTx []byte) *transaction.CheckedTransaction {
switch r.Meta {
case nil:
return transaction.NewCheckedTransaction(rawTx, 0, nil)
default:
return transaction.NewCheckedTransaction(rawTx, r.Meta.Priority, r.Meta.Weights)
}
}

// RuntimeCheckTxBatchResponse is a worker check tx batch response message body.
type RuntimeCheckTxBatchResponse struct {
// Batch of CheckTx results corresponding to transactions passed on input.
Expand Down Expand Up @@ -352,8 +345,7 @@ type RuntimeExecuteTxBatchRequest struct {

// RuntimeExecuteTxBatchResponse is a worker execute tx batch response message body.
type RuntimeExecuteTxBatchResponse struct {
Batch ComputedBatch `json:"batch"`
BatchWeightLimits map[transaction.Weight]uint64 `json:"batch_weight_limits"`
Batch ComputedBatch `json:"batch"`

// TxHashes are the transaction hashes of the included batch.
TxHashes []hash.Hash `json:"tx_hashes,omitempty"`
Expand All @@ -364,6 +356,11 @@ type RuntimeExecuteTxBatchResponse struct {
TxInputRoot hash.Hash `json:"tx_input_root,omitempty"`
// TxInputWriteLog is the write log for generating transaction inputs.
TxInputWriteLog storage.WriteLog `json:"tx_input_write_log,omitempty"`

// Fields below are deprecated to avoid breaking protocol changes. They may be removed once
// all runtimes stop sending those fields.

Deprecated1 cbor.RawMessage `json:"batch_weight_limits,omitempty"`
}

// RuntimeKeyManagerPolicyUpdateRequest is a runtime key manager policy request
Expand Down
98 changes: 0 additions & 98 deletions go/runtime/transaction/transaction.go
Expand Up @@ -149,104 +149,6 @@ type outputArtifacts struct {
Output []byte
}

// Weight is the transaction weight type.
type Weight string

const (
// WeightConsensusMessages is the consensus messages weight key.
WeightConsensusMessages = Weight("consensus_messages")
// WeightSizeBytes is the transaction byte size weight key.
WeightSizeBytes = Weight("size_bytes")
// WeightCount is the transaction count weight key.
WeightCount = Weight("count")
)

// IsCustom returns if the weight is a custom runtime weight.
func (w Weight) IsCustom() bool {
switch w {
case WeightConsensusMessages, WeightSizeBytes, WeightCount:
return false
default:
return true
}
}

// CheckedTransaction is a checked transaction to be scheduled.
type CheckedTransaction struct {
// tx represents the raw binary transaction data.
tx []byte

// priority defines the transaction's priority as specified by the runtime
// in the CheckTx response.
priority uint64
// weights defines the transaction's runtime specific weights as specified
// in the CheckTx response.
weights map[Weight]uint64

hash hash.Hash
}

// String returns string representation of the raw transaction data.
func (t *CheckedTransaction) String() string {
return fmt.Sprintf("CheckedTransaction{hash: %v, priority: %v, weights: %v}", t.hash, t.priority, t.weights)
}

// RawCheckedTransactions creates a new CheckedTransactions from the raw bytes.
func RawCheckedTransaction(raw []byte) *CheckedTransaction {
return NewCheckedTransaction(raw, 0, nil)
}

// NewCheckedTransaction creates a new CheckedTransactions from the provided
// bytes, priority and weights.
func NewCheckedTransaction(tx []byte, priority uint64, weights map[Weight]uint64) *CheckedTransaction {
if weights == nil {
weights = make(map[Weight]uint64)
}
checkedTx := &CheckedTransaction{
tx: tx,
priority: priority,
weights: weights,
hash: hash.NewFromBytes(tx),
}
checkedTx.weights[WeightSizeBytes] = checkedTx.Size()
checkedTx.weights[WeightCount] = 1

return checkedTx
}

// Priority returns the transaction priority.
func (t *CheckedTransaction) Priority() uint64 {
return t.priority
}

// Weight returns the specific transaction weight.
func (t *CheckedTransaction) Weight(w Weight) uint64 {
return t.weights[w]
}

// Weights returns all transaction transaction weights.
//
// To avoid unnecessary allocations the internal weights map is returned.
// The caller should not modify the map.
func (t *CheckedTransaction) Weights() map[Weight]uint64 {
return t.weights
}

// Hash returns the hash of the transaction binary data.
func (t *CheckedTransaction) Hash() hash.Hash {
return t.hash
}

// Size returns the size (in bytes) of the raw transaction data.
func (t *CheckedTransaction) Size() uint64 {
return uint64(len(t.tx))
}

// Raw returns the raw transaction data.
func (t *CheckedTransaction) Raw() []byte {
return t.tx
}

// Transaction is an executed (or executing) transaction.
//
// This is the transaction representation used for convenience as a collection
Expand Down