Skip to content

Commit

Permalink
Filter Peers Properly With Connection Gater (#6251)
Browse files Browse the repository at this point in the history
* add new filtering
* add new tests
* gaz
* shay's review
* Merge branch 'master' into fixFiltering
  • Loading branch information
nisdas committed Jun 14, 2020
1 parent 933ab68 commit 1dfeb64
Show file tree
Hide file tree
Showing 6 changed files with 327 additions and 107 deletions.
3 changes: 3 additions & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_library(
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_multiformats_go_multiaddr_net//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
Expand All @@ -95,6 +96,7 @@ go_test(
srcs = [
"addr_factory_test.go",
"broadcaster_test.go",
"connection_gater_test.go",
"dial_relay_node_test.go",
"discovery_test.go",
"fork_test.go",
Expand All @@ -115,6 +117,7 @@ go_test(
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/testing:go_default_library",
Expand Down
69 changes: 64 additions & 5 deletions beacon-chain/p2p/connection_gater.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package p2p

import (
"net"

"github.com/libp2p/go-libp2p-core/control"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
filter "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/sirupsen/logrus"
)

Expand All @@ -15,8 +19,8 @@ func (s *Service) InterceptPeerDial(p peer.ID) (allow bool) {

// InterceptAddrDial tests whether we're permitted to dial the specified
// multiaddr for the given peer.
func (s *Service) InterceptAddrDial(peer.ID, multiaddr.Multiaddr) (allow bool) {
return true
func (s *Service) InterceptAddrDial(_ peer.ID, m multiaddr.Multiaddr) (allow bool) {
return filterConnections(s.addrFilter, m)
}

// InterceptAccept tests whether an incipient inbound connection is allowed.
Expand All @@ -26,16 +30,71 @@ func (s *Service) InterceptAccept(n network.ConnMultiaddrs) (allow bool) {
"reason": "at peer limit"}).Trace("Not accepting inbound dial")
return false
}
return true
return filterConnections(s.addrFilter, n.RemoteMultiaddr())
}

// InterceptSecured tests whether a given connection, now authenticated,
// is allowed.
func (s *Service) InterceptSecured(network.Direction, peer.ID, network.ConnMultiaddrs) (allow bool) {
func (s *Service) InterceptSecured(_ network.Direction, _ peer.ID, n network.ConnMultiaddrs) (allow bool) {
return true
}

// InterceptUpgraded tests whether a fully capable connection is allowed.
func (s *Service) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) {
func (s *Service) InterceptUpgraded(n network.Conn) (allow bool, reason control.DisconnectReason) {
return true, 0
}

// configureFilter looks at the provided allow lists and
// deny lists to appropriately create a filter.
func configureFilter(cfg *Config) (*filter.Filters, error) {
addrFilter := filter.NewFilters()
// Configure from provided allow list in the config.
if cfg.AllowListCIDR != "" {
_, ipnet, err := net.ParseCIDR(cfg.AllowListCIDR)
if err != nil {
return nil, err
}
addrFilter.AddFilter(*ipnet, filter.ActionAccept)
}
// Configure from provided deny list in the config.
if len(cfg.DenyListCIDR) > 0 {
for _, cidr := range cfg.DenyListCIDR {
_, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
return nil, err
}
addrFilter.AddFilter(*ipnet, filter.ActionDeny)
}
}
return addrFilter, nil
}

// filterConnections checks the appropriate ip subnets from our
// filter and decides what to do with them. By default libp2p
// accepts all incoming dials, so if we have an allow list
// we will reject all inbound dials except for those in the
// appropriate ip subnets.
func filterConnections(f *filter.Filters, a filter.Multiaddr) bool {
acceptedNets := f.FiltersForAction(filter.ActionAccept)
restrictConns := len(acceptedNets) != 0

// If we have an allow list added in, we by default reject all
// connection attempts except for those coming in from the
// appropriate ip subnets.
if restrictConns {
ip, err := manet.ToIP(a)
if err != nil {
log.Tracef("Multiaddress has invalid ip: %v", err)
return false
}
found := false
for _, ipnet := range acceptedNets {
if ipnet.Contains(ip) {
found = true
break
}
}
return found
}
return !f.AddrBlocked(a)
}
253 changes: 253 additions & 0 deletions beacon-chain/p2p/connection_gater_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package p2p

import (
"context"
"fmt"
"testing"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
)

func TestPeer_AtMaxLimit(t *testing.T) {
// create host and remote peer
ipAddr, pkey := createAddrAndPrivKey(t)
ipAddr2, pkey2 := createAddrAndPrivKey(t)

listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000))
if err != nil {
t.Fatalf("Failed to p2p listen: %v", err)
}
s := &Service{}
s.peers = peers.NewStatus(3)
s.cfg = &Config{MaxPeers: 0}
s.addrFilter, err = configureFilter(&Config{})
if err != nil {
t.Fatal(err)
}
h1, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey), libp2p.ListenAddrs(listen), libp2p.ConnectionGater(s)}...)
if err != nil {
t.Fatal(err)
}
s.host = h1
defer func() {
err := h1.Close()
if err != nil {
t.Fatal(err)
}
}()

// create alternate host
listen, err = multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr2, 3000))
if err != nil {
t.Fatalf("Failed to p2p listen: %v", err)
}
h2, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey2), libp2p.ListenAddrs(listen)}...)
if err != nil {
t.Fatal(err)
}
defer func() {
err := h2.Close()
if err != nil {
t.Fatal(err)
}
}()
multiAddress, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ipAddr, 2000, h1.ID()))
addrInfo, err := peer.AddrInfoFromP2pAddr(multiAddress)
if err != nil {
t.Fatal(err)
}
err = h2.Connect(context.Background(), *addrInfo)
if err == nil {
t.Error("Wanted connection to fail with max peer")
}
}

func TestPeer_BelowMaxLimit(t *testing.T) {
// create host and remote peer
ipAddr, pkey := createAddrAndPrivKey(t)
ipAddr2, pkey2 := createAddrAndPrivKey(t)

listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000))
if err != nil {
t.Fatalf("Failed to p2p listen: %v", err)
}
s := &Service{}
s.peers = peers.NewStatus(3)
s.cfg = &Config{MaxPeers: 1}
s.addrFilter, err = configureFilter(&Config{})
if err != nil {
t.Fatal(err)
}
h1, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey), libp2p.ListenAddrs(listen), libp2p.ConnectionGater(s)}...)
if err != nil {
t.Fatal(err)
}
s.host = h1
defer func() {
err := h1.Close()
if err != nil {
t.Fatal(err)
}
}()

// create alternate host
listen, err = multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr2, 3000))
if err != nil {
t.Fatalf("Failed to p2p listen: %v", err)
}
h2, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey2), libp2p.ListenAddrs(listen)}...)
if err != nil {
t.Fatal(err)
}
defer func() {
err := h2.Close()
if err != nil {
t.Fatal(err)
}
}()
multiAddress, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ipAddr, 2000, h1.ID()))
addrInfo, err := peer.AddrInfoFromP2pAddr(multiAddress)
if err != nil {
t.Fatal(err)
}
err = h2.Connect(context.Background(), *addrInfo)
if err != nil {
t.Errorf("Wanted connection to succeed: %v", err)
}
}

func TestPeerAllowList(t *testing.T) {
// create host with allow list
ipAddr, pkey := createAddrAndPrivKey(t)
ipAddr2, pkey2 := createAddrAndPrivKey(t)

// use unattainable subnet, which will lead to
// peer rejecting all peers, except for those
// from that subnet.
cidr := "202.35.89.12/16"

listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000))
if err != nil {
t.Fatalf("Failed to p2p listen: %v", err)
}
s := &Service{}
s.addrFilter, err = configureFilter(&Config{AllowListCIDR: cidr})
if err != nil {
t.Fatal(err)
}
h1, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey), libp2p.ListenAddrs(listen), libp2p.ConnectionGater(s)}...)
if err != nil {
t.Fatal(err)
}
s.host = h1
defer func() {
err := h1.Close()
if err != nil {
t.Fatal(err)
}
}()

// create alternate host
listen, err = multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr2, 3000))
if err != nil {
t.Fatalf("Failed to p2p listen: %v", err)
}
h2, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey2), libp2p.ListenAddrs(listen)}...)
if err != nil {
t.Fatal(err)
}
defer func() {
err := h2.Close()
if err != nil {
t.Fatal(err)
}
}()
multiAddress, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ipAddr2, 3000, h2.ID()))
addrInfo, err := peer.AddrInfoFromP2pAddr(multiAddress)
if err != nil {
t.Fatal(err)
}
err = h1.Connect(context.Background(), *addrInfo)
if err == nil {
t.Error("Wanted connection to fail with allow list")
}
}

func TestPeerDenyList(t *testing.T) {
// create host with deny list
ipAddr, pkey := createAddrAndPrivKey(t)
ipAddr2, pkey2 := createAddrAndPrivKey(t)

mask := ipAddr2.DefaultMask()
ones, _ := mask.Size()
maskedIP := ipAddr2.Mask(mask)
cidr := maskedIP.String() + fmt.Sprintf("/%d", ones)

listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000))
if err != nil {
t.Fatalf("Failed to p2p listen: %v", err)
}
s := &Service{}
s.addrFilter, err = configureFilter(&Config{DenyListCIDR: []string{cidr}})
if err != nil {
t.Fatal(err)
}
h1, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey), libp2p.ListenAddrs(listen), libp2p.ConnectionGater(s)}...)
if err != nil {
t.Fatal(err)
}
s.host = h1
defer func() {
err := h1.Close()
if err != nil {
t.Fatal(err)
}
}()

// create alternate host
listen, err = multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr2, 3000))
if err != nil {
t.Fatalf("Failed to p2p listen: %v", err)
}
h2, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey2), libp2p.ListenAddrs(listen)}...)
if err != nil {
t.Fatal(err)
}
defer func() {
err := h2.Close()
if err != nil {
t.Fatal(err)
}
}()
multiAddress, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ipAddr2, 3000, h2.ID()))
addrInfo, err := peer.AddrInfoFromP2pAddr(multiAddress)
if err != nil {
t.Fatal(err)
}
err = h1.Connect(context.Background(), *addrInfo)
if err == nil {
t.Error("Wanted connection to fail with deny list")
}
}

func TestService_InterceptAddrDial_Allow(t *testing.T) {
s := &Service{}
var err error
cidr := "212.67.89.112/16"
s.addrFilter, err = configureFilter(&Config{AllowListCIDR: cidr})
if err != nil {
t.Fatal(err)
}
ip := "212.67.10.122"
multiAddress, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ip, 3000))
if err != nil {
t.Fatal(err)
}
valid := s.InterceptAddrDial("", multiAddress)
if !valid {
t.Errorf("Expected multiaddress with ip %s to not be rejected with an allow cidr mask of %s", ip, cidr)
}
}

0 comments on commit 1dfeb64

Please sign in to comment.