Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bridge: refactor p2p logic into pkg/p2p #65

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 19 additions & 1 deletion bridge/cmd/guardiand/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"syscall"

eth_common "github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/zap"
"golang.org/x/sys/unix"

"github.com/certusone/wormhole/bridge/pkg/common"
"github.com/certusone/wormhole/bridge/pkg/devnet"
"github.com/certusone/wormhole/bridge/pkg/ethereum"
"github.com/certusone/wormhole/bridge/pkg/p2p"
"github.com/certusone/wormhole/bridge/pkg/processor"
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
solana "github.com/certusone/wormhole/bridge/pkg/solana"
Expand Down Expand Up @@ -182,11 +184,27 @@ func main() {
// VAAs to submit to Solana
solanaVaaC := make(chan *vaa.VAA)

// Load p2p private key
var priv crypto.PrivKey
if *unsafeDevMode {
idx, err := devnet.GetDevnetIndex()
if err != nil {
logger.Fatal("Failed to parse hostname - are we running in devnet?")
}
priv = devnet.DeterministicP2PPrivKeyByIndex(int64(idx))
} else {
priv, err = getOrCreateNodeKey(logger, *nodeKeyPath)
if err != nil {
logger.Fatal("Failed to load node key", zap.Error(err))
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test comment

// Run supervisor.
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
// TODO: use a dependency injection framework like wire?

if err := supervisor.Run(ctx, "p2p", p2p(obsvC, sendC)); err != nil {
if err := supervisor.Run(ctx, "p2p", p2p.Run(
obsvC, sendC, priv, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil {
return err
}

Expand Down
80 changes: 35 additions & 45 deletions bridge/cmd/guardiand/p2p.go → bridge/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package main
package p2p

import (
"context"
"fmt"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"strings"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"

"github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
Expand All @@ -21,33 +22,22 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/certusone/wormhole/bridge/pkg/devnet"
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/bridge/pkg/supervisor"
)

func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx context.Context) error {
func Run(obsvC chan *gossipv1.LockupObservation,
sendC chan []byte,
priv crypto.PrivKey,
port uint,
networkID string,
bootstrapPeers string,
nodeName string,
rootCtxCancel context.CancelFunc) func(ctx context.Context) error {

return func(ctx context.Context) (re error) {
logger := supervisor.Logger(ctx)

var priv crypto.PrivKey
var err error

if *unsafeDevMode {
idx, err2 := devnet.GetDevnetIndex()
if err2 != nil {
logger.Fatal("Failed to parse hostname - are we running in devnet?")
}
priv = devnet.DeterministicP2PPrivKeyByIndex(int64(idx))
} else {
priv, err = getOrCreateNodeKey(logger, *nodeKeyPath)
if err != nil {
return fmt.Errorf("failed to load node key: %w", err)
}
}

var idht *dht.IpfsDHT

h, err := libp2p.New(ctx,
// Use the keypair we generated
libp2p.Identity(priv),
Expand All @@ -57,8 +47,8 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
// Listen on QUIC only.
// TODO(leo): is this more or less stable than using both TCP and QUIC transports?
// https://github.com/libp2p/go-libp2p/issues/688
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", *p2pPort),
fmt.Sprintf("/ip6/::/udp/%d/quic", *p2pPort),
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port),
fmt.Sprintf("/ip6/::/udp/%d/quic", port),
),

// Enable TLS security as the only security protocol.
Expand All @@ -78,9 +68,9 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
// Let this host use the DHT to find other hosts
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
// TODO(leo): Persistent data store (i.e. address book)
idht, err = dht.New(ctx, h, dht.Mode(dht.ModeServer),
idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer),
// TODO(leo): This intentionally makes us incompatible with the global IPFS DHT
dht.ProtocolPrefix(protocol.ID("/"+*p2pNetworkID)),
dht.ProtocolPrefix(protocol.ID("/"+networkID)),
)
return idht, err
}),
Expand All @@ -96,9 +86,9 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
rootCtxCancel()
}()

logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", *p2pBootstrap))
logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", bootstrapPeers))

topic := fmt.Sprintf("%s/%s", *p2pNetworkID, "broadcast")
topic := fmt.Sprintf("%s/%s", networkID, "broadcast")

logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
ps, err := pubsub.NewGossipSub(ctx, h)
Expand All @@ -122,9 +112,9 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
// the service and have supervisor retry it.
successes := 0
// Are we a bootstrap node? If so, it's okay to not have any peers.
bootstrap_node := false
bootstrapNode := false

for _, addr := range strings.Split(*p2pBootstrap, ",") {
for _, addr := range strings.Split(bootstrapPeers, ",") {
if addr == "" {
continue
}
Expand All @@ -141,7 +131,7 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con

if pi.ID == h.ID() {
logger.Info("We're a bootstrap node")
bootstrap_node = true
bootstrapNode = true
continue
}

Expand All @@ -153,8 +143,8 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
}

// TODO: continually reconnect to bootstrap nodes?
if successes == 0 && !bootstrap_node {
return fmt.Errorf("Failed to connect to any bootstrap peer")
if successes == 0 && !bootstrapNode {
return fmt.Errorf("failed to connect to any bootstrap peer")
} else {
logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
}
Expand All @@ -174,7 +164,7 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
case <-tick.C:
msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_Heartbeat{
Heartbeat: &gossipv1.Heartbeat{
NodeName: *nodeName,
NodeName: nodeName,
Counter: ctr,
Timestamp: time.Now().UnixNano(),
}}}
Expand Down Expand Up @@ -207,43 +197,43 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
}()

for {
envl, err := sub.Next(ctx)
envelope, err := sub.Next(ctx)
if err != nil {
return fmt.Errorf("failed to receive pubsub message: %w", err)
}

var msg gossipv1.GossipMessage
err = proto.Unmarshal(envl.Data, &msg)
err = proto.Unmarshal(envelope.Data, &msg)
if err != nil {
logger.Info("received invalid message",
zap.String("data", string(envl.Data)),
zap.String("from", envl.GetFrom().String()))
zap.String("data", string(envelope.Data)),
zap.String("from", envelope.GetFrom().String()))
continue
}

if envl.GetFrom() == h.ID() {
if envelope.GetFrom() == h.ID() {
logger.Debug("received message from ourselves, ignoring",
zap.Any("payload", msg.Message))
continue
}

logger.Debug("received message",
zap.Any("payload", msg.Message),
zap.Binary("raw", envl.Data),
zap.String("from", envl.GetFrom().String()))
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))

switch m := msg.Message.(type) {
case *gossipv1.GossipMessage_Heartbeat:
logger.Info("heartbeat received",
zap.Any("value", m.Heartbeat),
zap.String("from", envl.GetFrom().String()))
zap.String("from", envelope.GetFrom().String()))
case *gossipv1.GossipMessage_LockupObservation:
obsvC <- m.LockupObservation
default:
logger.Warn("received unknown message type (running outdated software?)",
zap.Any("payload", msg.Message),
zap.Binary("raw", envl.Data),
zap.String("from", envl.GetFrom().String()))
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
}
}
}
Expand Down