Skip to content

Commit

Permalink
feat: opt-in Swarm.ResourceMgr (go-libp2p v0.18) (#8680)
Browse files Browse the repository at this point in the history
* update go-libp2p to v0.18.0

* initialize the resource manager

* add resource manager stats/limit commands

* load limit file when building resource manager

* log absent limit file

* write rcmgr to file when IPFS_DEBUG_RCMGR is set

* fix: mark swarm limit|stats as experimental

* feat(cfg): opt-in Swarm.ResourceMgr

This ensures we can safely test the resource manager without impacting
default behavior.

- Resource manager is disabled by default
    - Default for Swarm.ResourceMgr.Enabled is false for now
- Swarm.ResourceMgr.Limits allows user to tweak limits per specific
  scope in a way that is persisted across restarts
- 'ipfs swarm limit system' outputs human-readable json
- 'ipfs swarm limit system new-limits.json' sets new runtime limits
  (but does not change Swarm.ResourceMgr.Limits in the config)

Conventions to make libp2p devs' lives easier:
- 'IPFS_RCMGR=1 ipfs daemon' overrides the config and enables resource manager
- 'limit.json' overrides implicit defaults from libp2p (if present)

* docs(config): small tweaks

* fix: skip libp2p.ResourceManager if disabled

This ensures 'ipfs swarm limit|stats' work only when enabled.

* fix: use NullResourceManager when disabled

This reverts commit b19f7c9.
after clarification feedback from
#8680 (comment)

* style: rename IPFS_RCMGR to LIBP2P_RCMGR

preexisting libp2p toggles use LIBP2P_ prefix

* test: Swarm.ResourceMgr

* fix: location of opt-in limit.json and rcmgr.json.gz

Places these files inside of IPFS_PATH

* Update docs/config.md

* feat: expose rcmgr metrics when enabled (#8785)

* add metrics for the resource manager
* export protocol and service name in Prometheus metrics
* fix: expose rcmgr metrics only when enabled

Co-authored-by: Marcin Rataj <lidel@lidel.org>

* refactor: rcmgr_metrics.go

* refactor: rcmgr_defaults.go

This file defines implicit limit defaults used when Swarm.ResourceMgr.Enabled is true.

We keep a vendored copy to ensure go-ipfs is not impacted when go-libp2p
decides to change defaults in any of its future releases.

* refactor: adjustedDefaultLimits

Cleans up the way we initialize defaults and adds a fix for the case
when the connection manager runs with high limits.

It also hides `Swarm.ResourceMgr.Limits` until we have a better
understanding of what syntax makes sense.

* chore: cleanup after a review

* fix: restore go-ipld-prime v0.14.2

* fix: restore go-ds-flatfs v0.5.1

Co-authored-by: Lucas Molas <schomatis@gmail.com>
Co-authored-by: Marcin Rataj <lidel@lidel.org>
  • Loading branch information
3 people committed Apr 8, 2022
1 parent 7871a0b commit 514411b
Show file tree
Hide file tree
Showing 24 changed files with 1,439 additions and 109 deletions.
59 changes: 59 additions & 0 deletions config/swarm.go
Expand Up @@ -49,6 +49,9 @@ type SwarmConfig struct {

// ConnMgr configures the connection manager.
ConnMgr ConnMgr

// ResourceMgr configures the libp2p Network Resource Manager
ResourceMgr ResourceMgr
}

type RelayClient struct {
Expand Down Expand Up @@ -129,3 +132,59 @@ type ConnMgr struct {
HighWater int
GracePeriod string
}

// ResourceMgr defines configuration options for the libp2p Network Resource Manager.
// See <https://github.com/libp2p/go-libp2p-resource-manager#readme>.
//
// Only the on/off switch is exposed here; per-scope limits are deliberately
// left out of the config for now (see the TODO inside the struct).
type ResourceMgr struct {
// Enabled turns the Network Resource Manager feature on or off.
// It is a Flag rather than a plain bool and is omitted from JSON when empty.
Enabled Flag `json:",omitempty"`

/* TODO: decide if and how we want to expose limits in our config
Limits *ResourceMgrScopeConfig `json:",omitempty"` */
}

// Names and name prefixes of the resource manager scopes. The "svc:",
// "proto:" and "peer:" prefixes are followed by a service name, protocol
// name, or peer respectively (e.g. "svc:<service>").
const (
// ResourceMgrSystemScope is the name of the system-wide aggregate scope.
ResourceMgrSystemScope = "system"
// ResourceMgrTransientScope is the name of the transient scope.
ResourceMgrTransientScope = "transient"
// ResourceMgrServiceScopePrefix prefixes the name of a per-service scope.
ResourceMgrServiceScopePrefix = "svc:"
// ResourceMgrProtocolScopePrefix prefixes the name of a per-protocol scope.
ResourceMgrProtocolScopePrefix = "proto:"
// ResourceMgrPeerScopePrefix prefixes the name of a per-peer scope.
ResourceMgrPeerScopePrefix = "peer:"
)

/* TODO: decide if and how we want to expose limits in our config
type ResourceMgrLimitsConfig struct {
System *ResourceMgrScopeConfig `json:",omitempty"`
Transient *ResourceMgrScopeConfig `json:",omitempty"`
ServiceDefault *ResourceMgrScopeConfig `json:",omitempty"`
ServicePeerDefault *ResourceMgrScopeConfig `json:",omitempty"`
Service map[string]ResourceMgrScopeConfig `json:",omitempty"`
ServicePeer map[string]ResourceMgrScopeConfig `json:",omitempty"`
ProtocolDefault *ResourceMgrScopeConfig `json:",omitempty"`
ProtocolPeerDefault *ResourceMgrScopeConfig `json:",omitempty"`
Protocol map[string]ResourceMgrScopeConfig `json:",omitempty"`
ProtocolPeer map[string]ResourceMgrScopeConfig `json:",omitempty"`
PeerDefault *ResourceMgrScopeConfig `json:",omitempty"`
Peer map[string]ResourceMgrScopeConfig `json:",omitempty"`
Conn *ResourceMgrScopeConfig `json:",omitempty"`
Stream *ResourceMgrScopeConfig `json:",omitempty"`
}
*/

// ResourceMgrScopeConfig is the libp2p Network Resource Manager
// configuration for a single scope.
//
// The memory-related fields are omitted from JSON when they hold their zero
// value; the stream, connection and FD counters are always serialized.
type ResourceMgrScopeConfig struct {
	// Dynamic selects which of the memory fields below are meant to be set.
	Dynamic bool `json:",omitempty"`

	// Memory is set if Dynamic is false.
	Memory int64 `json:",omitempty"`

	// MemoryFraction is set if Dynamic is true.
	// NOTE(review): MinMemory and MaxMemory presumably belong to the same
	// dynamic mode -- confirm against the code that consumes this struct.
	MemoryFraction float64 `json:",omitempty"`
	MinMemory      int64   `json:",omitempty"`
	MaxMemory      int64   `json:",omitempty"`

	// Stream limits: total, inbound and outbound.
	Streams         int
	StreamsInbound  int
	StreamsOutbound int

	// Connection limits: total, inbound and outbound.
	Conns         int
	ConnsInbound  int
	ConnsOutbound int

	// FD is the FD limit for the scope.
	FD int
}
2 changes: 2 additions & 0 deletions core/commands/commands_test.go
Expand Up @@ -237,11 +237,13 @@ func TestCommands(t *testing.T) {
"/swarm/filters",
"/swarm/filters/add",
"/swarm/filters/rm",
"/swarm/limit",
"/swarm/peers",
"/swarm/peering",
"/swarm/peering/add",
"/swarm/peering/ls",
"/swarm/peering/rm",
"/swarm/stats",
"/tar",
"/tar/add",
"/tar/cat",
Expand Down
20 changes: 11 additions & 9 deletions core/commands/config.go
Expand Up @@ -215,18 +215,20 @@ NOTE: For security reasons, this command will omit your private key and remote s
return cmds.EmitOnce(res, &cfg)
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *map[string]interface{}) error {
buf, err := config.HumanOutput(out)
if err != nil {
return err
}
buf = append(buf, byte('\n'))
_, err = w.Write(buf)
return err
}),
cmds.Text: HumanJSONEncoder,
},
}

// HumanJSONEncoder is a text encoder for commands whose output is a generic
// JSON object (*map[string]interface{}). It renders the value with
// config.HumanOutput and writes the result followed by a single newline.
var HumanJSONEncoder = cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *map[string]interface{}) error {
	rendered, err := config.HumanOutput(out)
	if err != nil {
		return err
	}
	// Terminate the output with a newline and write it in one call.
	_, err = w.Write(append(rendered, '\n'))
	return err
})

// Scrubs value and returns error if missing
func scrubValue(m map[string]interface{}, key []string) (map[string]interface{}, error) {
return scrubMapInternal(m, key, false)
Expand Down
145 changes: 139 additions & 6 deletions core/commands/swarm.go
@@ -1,7 +1,9 @@
package commands

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -10,15 +12,17 @@ import (
"sync"
"time"

commands "github.com/ipfs/go-ipfs/commands"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
repo "github.com/ipfs/go-ipfs/repo"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
files "github.com/ipfs/go-ipfs-files"
"github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/config"
"github.com/ipfs/go-ipfs/core/commands/cmdenv"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/fsrepo"

cmds "github.com/ipfs/go-ipfs-cmds"
config "github.com/ipfs/go-ipfs/config"
inet "github.com/libp2p/go-libp2p-core/network"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
mamask "github.com/whyrusleeping/multiaddr-filter"
Expand Down Expand Up @@ -52,6 +56,8 @@ ipfs peers in the internet.
"filters": swarmFiltersCmd,
"peers": swarmPeersCmd,
"peering": swarmPeeringCmd,
"stats": swarmStatsCmd, // libp2p Network Resource Manager
"limit": swarmLimitCmd, // libp2p Network Resource Manager
},
}

Expand Down Expand Up @@ -304,6 +310,133 @@ var swarmPeersCmd = &cmds.Command{
Type: connInfos{},
}

// swarmStatsCmd implements 'ipfs swarm stats <scope>': it asks the node's
// libp2p resource manager for the usage of the given scope and emits the
// result as JSON.
//
// The command fails with libp2p.NoResourceMgrError when the node has no
// resource manager, and with an error when the number of string arguments
// is not exactly one.
var swarmStatsCmd = &cmds.Command{
	Status: cmds.Experimental,
	Helptext: cmds.HelpText{
		Tagline: "Report resource usage for a scope.",
		LongDescription: `Report resource usage for a scope.
The scope can be one of the following:
- system -- reports the system aggregate resource usage.
- transient -- reports the transient resource usage.
- svc:<service> -- reports the resource usage of a specific service.
- proto:<proto> -- reports the resource usage of a specific protocol.
- peer:<peer> -- reports the resource usage of a specific peer.
- all -- reports the resource usage for all currently active scopes.
The output of this command is JSON.
`},
	Arguments: []cmds.Argument{
		cmds.StringArg("scope", true, false, "scope of the stat report"),
	},
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		nd, err := cmdenv.GetNode(env)
		if err != nil {
			return err
		}

		rcmgr := nd.ResourceManager
		if rcmgr == nil {
			return libp2p.NoResourceMgrError
		}

		if len(req.Arguments) != 1 {
			return fmt.Errorf("must specify exactly one scope")
		}

		stats, err := libp2p.NetStat(rcmgr, req.Arguments[0])
		if err != nil {
			return err
		}

		// Serialize up front and emit the buffer holding the encoded JSON.
		var out bytes.Buffer
		if err := json.NewEncoder(&out).Encode(stats); err != nil {
			return err
		}
		return cmds.EmitOnce(res, &out)
	},
	Encoders: cmds.EncoderMap{
		cmds.Text: HumanJSONEncoder,
	},
}

// swarmLimitCmd implements 'ipfs swarm limit <scope> [limit.json]'.
//
// With only a scope it reads the current limit for that scope from the
// node's libp2p resource manager and emits it as JSON. When a file (or
// stdin) is supplied as well, its content is decoded as a
// config.ResourceMgrScopeConfig and applied to the scope at runtime; in that
// case nothing is emitted and the result of libp2p.NetSetLimit is returned.
//
// The command fails with libp2p.NoResourceMgrError when the node has no
// resource manager.
//
// NOTE(review): the help text points users at Swarm.ResourceMgr.Limits, but
// that field is currently commented out of config.ResourceMgr -- revisit the
// wording once the config syntax is decided.
var swarmLimitCmd = &cmds.Command{
	Status: cmds.Experimental,
	Helptext: cmds.HelpText{
		Tagline: "Get or set resource limits for a scope.",
		LongDescription: `Get or set resource limits for a scope.
The scope can be one of the following:
- system -- limits for the system aggregate resource usage.
- transient -- limits for the transient resource usage.
- svc:<service> -- limits for the resource usage of a specific service.
- proto:<proto> -- limits for the resource usage of a specific protocol.
- peer:<peer> -- limits for the resource usage of a specific peer.
The output of this command is JSON.
It is possible to use this command to inspect and tweak limits at runtime:
$ ipfs swarm limit system > limit.json
$ vi limit.json
$ ipfs swarm limit system limit.json
Changes made via command line are discarded on node shutdown.
For permanent limits set Swarm.ResourceMgr.Limits in the $IPFS_PATH/config file.
`},
	Arguments: []cmds.Argument{
		cmds.StringArg("scope", true, false, "scope of the limit"),
		cmds.FileArg("limit.json", false, false, "limits to be set").EnableStdin(),
	},
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		node, err := cmdenv.GetNode(env)
		if err != nil {
			return err
		}

		if node.ResourceManager == nil {
			return libp2p.NoResourceMgrError
		}

		// Guard the index below instead of relying solely on the framework's
		// argument validation (mirrors the check in swarmStatsCmd).
		if len(req.Arguments) == 0 {
			return errors.New("must specify a scope")
		}
		scope := req.Arguments[0]

		// set scope limit to new values (when limit.json is passed as a second arg)
		if req.Files != nil {
			var newLimit config.ResourceMgrScopeConfig
			it := req.Files.Entries()
			if it.Next() {
				file := files.FileFromEntry(it)
				if file == nil {
					return errors.New("expected a JSON file")
				}
				if err := json.NewDecoder(file).Decode(&newLimit); err != nil {
					// Keep the decoder's error so the user can see what is
					// wrong with the supplied JSON.
					return fmt.Errorf("failed to decode JSON as ResourceMgrScopeConfig: %w", err)
				}
				return libp2p.NetSetLimit(node.ResourceManager, scope, newLimit)
			}
			// No entry was produced: report an iteration error if there was
			// one, otherwise fall through and behave like a plain "get".
			if err := it.Err(); err != nil {
				return fmt.Errorf("error opening limit JSON file: %w", err)
			}
		}

		// get scope limit
		result, err := libp2p.NetLimit(node.ResourceManager, scope)
		if err != nil {
			return err
		}

		b := new(bytes.Buffer)
		if err := json.NewEncoder(b).Encode(result); err != nil {
			return err
		}
		return cmds.EmitOnce(res, b)
	},
	Encoders: cmds.EncoderMap{
		cmds.Text: HumanJSONEncoder,
	},
}

type streamInfo struct {
Protocol string
}
Expand Down
24 changes: 13 additions & 11 deletions core/core.go
Expand Up @@ -30,6 +30,7 @@ import (
ic "github.com/libp2p/go-libp2p-core/crypto"
p2phost "github.com/libp2p/go-libp2p-core/host"
metrics "github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/network"
peer "github.com/libp2p/go-libp2p-core/peer"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
routing "github.com/libp2p/go-libp2p-core/routing"
Expand Down Expand Up @@ -85,17 +86,18 @@ type IpfsNode struct {
RecordValidator record.Validator

// Online
PeerHost p2phost.Host `optional:"true"` // the network host (server+client)
Peering *peering.PeeringService `optional:"true"`
Filters *ma.Filters `optional:"true"`
Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper
Routing routing.Routing `optional:"true"` // the routing system. recommend ipfs-dht
DNSResolver *madns.Resolver // the DNS resolver
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Provider provider.System // the value provider system
IpnsRepub *ipnsrp.Republisher `optional:"true"`
GraphExchange graphsync.GraphExchange `optional:"true"`
PeerHost p2phost.Host `optional:"true"` // the network host (server+client)
Peering *peering.PeeringService `optional:"true"`
Filters *ma.Filters `optional:"true"`
Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper
Routing routing.Routing `optional:"true"` // the routing system. recommend ipfs-dht
DNSResolver *madns.Resolver // the DNS resolver
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Provider provider.System // the value provider system
IpnsRepub *ipnsrp.Republisher `optional:"true"`
GraphExchange graphsync.GraphExchange `optional:"true"`
ResourceManager network.ResourceManager `optional:"true"`

PubSub *pubsub.PubSub `optional:"true"`
PSRouter *psrouter.PubsubValueStore `optional:"true"`
Expand Down
4 changes: 2 additions & 2 deletions core/coreapi/test/api_test.go
Expand Up @@ -23,7 +23,7 @@ import (
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/tests"
"github.com/libp2p/go-libp2p-core/crypto"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)

Expand All @@ -32,7 +32,7 @@ const testPeerID = "QmTFauExutTsy4XP6JbMFcw2Wa9645HJt2bTqL6qYDCKfe"
type NodeProvider struct{}

func (NodeProvider) MakeAPISwarm(ctx context.Context, fullIdentity bool, n int) ([]coreiface.CoreAPI, error) {
mn := mocknet.New(ctx)
mn := mocknet.New()

nodes := make([]*core.IpfsNode, n)
apis := make([]coreiface.CoreAPI, n)
Expand Down
6 changes: 2 additions & 4 deletions core/mock/mock.go
Expand Up @@ -26,12 +26,10 @@ import (

// NewMockNode constructs an IpfsNode for use in tests.
func NewMockNode() (*core.IpfsNode, error) {
ctx := context.Background()

// effectively offline, only peer in its network
return core.NewNode(ctx, &core.BuildCfg{
return core.NewNode(context.Background(), &core.BuildCfg{
Online: true,
Host: MockHostOption(mocknet.New(ctx)),
Host: MockHostOption(mocknet.New()),
})
}

Expand Down

0 comments on commit 514411b

Please sign in to comment.