Skip to content

Commit

Permalink
Add X-chain networking implementation (ava-labs#2731)
Browse files Browse the repository at this point in the history
Co-authored-by: Dan Laine <daniel.laine@avalabs.org>
Co-authored-by: Chloe <chloe.nguyen@avalabs.org>
  • Loading branch information
3 people authored Mar 16, 2023
1 parent ff53418 commit 8ceb523
Show file tree
Hide file tree
Showing 3 changed files with 683 additions and 0 deletions.
138 changes: 138 additions & 0 deletions vms/avm/network/atomic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package network

import (
"context"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils"
)

var _ Atomic = (*atomic)(nil)

type Atomic interface {
common.AppHandler

Set(common.AppHandler)
}

type atomic struct {
handler utils.Atomic[common.AppHandler]
}

func NewAtomic(h common.AppHandler) Atomic {
a := &atomic{}
a.handler.Set(h)
return a
}

func (a *atomic) CrossChainAppRequest(
ctx context.Context,
chainID ids.ID,
requestID uint32,
deadline time.Time,
msg []byte,
) error {
h := a.handler.Get()
return h.CrossChainAppRequest(
ctx,
chainID,
requestID,
deadline,
msg,
)
}

func (a *atomic) CrossChainAppRequestFailed(
ctx context.Context,
chainID ids.ID,
requestID uint32,
) error {
h := a.handler.Get()
return h.CrossChainAppRequestFailed(
ctx,
chainID,
requestID,
)
}

func (a *atomic) CrossChainAppResponse(
ctx context.Context,
chainID ids.ID,
requestID uint32,
msg []byte,
) error {
h := a.handler.Get()
return h.CrossChainAppResponse(
ctx,
chainID,
requestID,
msg,
)
}

func (a *atomic) AppRequest(
ctx context.Context,
nodeID ids.NodeID,
requestID uint32,
deadline time.Time,
msg []byte,
) error {
h := a.handler.Get()
return h.AppRequest(
ctx,
nodeID,
requestID,
deadline,
msg,
)
}

func (a *atomic) AppRequestFailed(
ctx context.Context,
nodeID ids.NodeID,
requestID uint32,
) error {
h := a.handler.Get()
return h.AppRequestFailed(
ctx,
nodeID,
requestID,
)
}

func (a *atomic) AppResponse(
ctx context.Context,
nodeID ids.NodeID,
requestID uint32,
msg []byte,
) error {
h := a.handler.Get()
return h.AppResponse(
ctx,
nodeID,
requestID,
msg,
)
}

func (a *atomic) AppGossip(
ctx context.Context,
nodeID ids.NodeID,
msg []byte,
) error {
h := a.handler.Get()
return h.AppGossip(
ctx,
nodeID,
msg,
)
}

func (a *atomic) Set(h common.AppHandler) {
a.handler.Set(h)
}
202 changes: 202 additions & 0 deletions vms/avm/network/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package network

import (
"context"
"sync"

"go.uber.org/zap"

"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/vms/avm/blocks/executor"
"github.com/ava-labs/avalanchego/vms/avm/txs"
"github.com/ava-labs/avalanchego/vms/avm/txs/mempool"
"github.com/ava-labs/avalanchego/vms/components/message"
)

// We allow [recentTxsCacheSize] to be fairly large because we only store hashes
// in the cache, not entire transactions.
const recentTxsCacheSize = 512

var _ Network = (*network)(nil)

type Network interface {
common.AppHandler

// IssueTx verifies the transaction at the currently preferred state, adds
// it to the mempool, and gossips it to the network.
//
// Invariant: Assumes the context lock is held.
IssueTx(context.Context, *txs.Tx) error
}

type network struct {
// We embed a noop handler for all unhandled messages
common.AppHandler

ctx *snow.Context
parser txs.Parser
manager executor.Manager
mempool mempool.Mempool
appSender common.AppSender

// gossip related attributes
recentTxsLock sync.Mutex
recentTxs *cache.LRU[ids.ID, struct{}]
}

func New(
ctx *snow.Context,
parser txs.Parser,
manager executor.Manager,
mempool mempool.Mempool,
appSender common.AppSender,
) Network {
return &network{
AppHandler: common.NewNoOpAppHandler(ctx.Log),

ctx: ctx,
parser: parser,
manager: manager,
mempool: mempool,
appSender: appSender,

recentTxs: &cache.LRU[ids.ID, struct{}]{
Size: recentTxsCacheSize,
},
}
}

func (n *network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []byte) error {
n.ctx.Log.Debug("called AppGossip message handler",
zap.Stringer("nodeID", nodeID),
zap.Int("messageLen", len(msgBytes)),
)

msgIntf, err := message.Parse(msgBytes)
if err != nil {
n.ctx.Log.Debug("dropping AppGossip message",
zap.String("reason", "failed to parse message"),
)
return nil
}

msg, ok := msgIntf.(*message.Tx)
if !ok {
n.ctx.Log.Debug("dropping unexpected message",
zap.Stringer("nodeID", nodeID),
)
return nil
}

tx, err := n.parser.ParseTx(msg.Tx)
if err != nil {
n.ctx.Log.Verbo("received invalid tx",
zap.Stringer("nodeID", nodeID),
zap.Binary("tx", msg.Tx),
zap.Error(err),
)
return nil
}

// We need to grab the context lock here to avoid racy behavior with
// transaction verification + mempool modifications.
n.ctx.Lock.Lock()
err = n.issueTx(tx)
n.ctx.Lock.Unlock()
if err == nil {
txID := tx.ID()
n.gossipTx(ctx, txID, msgBytes)
}
return nil
}

func (n *network) IssueTx(ctx context.Context, tx *txs.Tx) error {
if err := n.issueTx(tx); err != nil {
return err
}

txBytes := tx.Bytes()
msg := &message.Tx{
Tx: txBytes,
}
msgBytes, err := message.Build(msg)
if err != nil {
return err
}

txID := tx.ID()
n.gossipTx(ctx, txID, msgBytes)
return nil
}

// returns nil if the tx is in the mempool
func (n *network) issueTx(tx *txs.Tx) error {
txID := tx.ID()
if n.mempool.Has(txID) {
// The tx is already in the mempool
return nil
}

if reason := n.mempool.GetDropReason(txID); reason != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
return reason
}

// Verify the tx at the currently preferred state
if err := n.manager.VerifyTx(tx); err != nil {
n.ctx.Log.Debug("tx failed verification",
zap.Stringer("txID", txID),
zap.Error(err),
)

n.mempool.MarkDropped(txID, err)
return err
}

if err := n.mempool.Add(tx); err != nil {
n.ctx.Log.Debug("tx failed to be added to the mempool",
zap.Stringer("txID", txID),
zap.Error(err),
)

n.mempool.MarkDropped(txID, err)
return err
}

n.mempool.RequestBuildBlock()
return nil
}

func (n *network) gossipTx(ctx context.Context, txID ids.ID, msgBytes []byte) {
// This lock is just to ensure there isn't racy behavior between checking if
// the tx was gossiped and marking the tx as gossiped.
n.recentTxsLock.Lock()
_, has := n.recentTxs.Get(txID)
n.recentTxs.Put(txID, struct{}{})
n.recentTxsLock.Unlock()

// Don't gossip a transaction if it has been recently gossiped.
if has {
return
}

n.ctx.Log.Debug("gossiping tx",
zap.Stringer("txID", txID),
)

if err := n.appSender.SendAppGossip(ctx, msgBytes); err != nil {
n.ctx.Log.Error("failed to gossip tx",
zap.Stringer("txID", txID),
zap.Error(err),
)
}
}
Loading

0 comments on commit 8ceb523

Please sign in to comment.