diff --git a/p2p/host/peerstore/pstoreds/peerstore.go b/p2p/host/peerstore/pstoreds/peerstore.go index 7b218155ba..f4ef0c91f2 100644 --- a/p2p/host/peerstore/pstoreds/peerstore.go +++ b/p2p/host/peerstore/pstoreds/peerstore.go @@ -6,10 +6,6 @@ import ( "io" "time" - "github.com/libp2p/go-libp2p-core/event" - - "github.com/libp2p/go-libp2p-peerstore/pstoremanager" - "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" pstore "github.com/libp2p/go-libp2p-peerstore" @@ -27,13 +23,6 @@ type Options struct { // MaxProtocols is the maximum number of protocols we store for one peer. MaxProtocols int - // The EventBus that is used to subscribe to EvtPeerConnectednessChanged events. - // This allows the automatic clean up when a peer disconnect. - // This configuration option is optional. If no EventBus is set, it's the callers - // responsibility to call RemovePeer to ensure that memory consumption of the - // peerstore doesn't grow unboundedly. - EventBus event.Bus - // Sweep interval to purge expired addresses from the datastore. If this is a zero value, GC will not run // automatically, but it'll be available on demand via explicit calls. GCPurgeInterval time.Duration @@ -71,17 +60,12 @@ type pstoreds struct { *dsAddrBook *dsProtoBook *dsPeerMetadata - - manager *pstoremanager.PeerstoreManager } var _ peerstore.Peerstore = &pstoreds{} // NewPeerstore creates a peerstore backed by the provided persistent datastore. -// It is recommended to construct the peerstore with an event bus, using the WithEventBus option. -// In that case, the peerstore will automatically perform cleanups when a peer disconnects -// (see the pstoremanager package for details). -// If constructed without an event bus, it's the caller's responsibility to call RemovePeer to ensure +// It's the caller's responsibility to call RemovePeer to ensure // that memory consumption of the peerstore doesn't grow unboundedly. func NewPeerstore(ctx context.Context, store ds.Batching, opts Options) (*pstoreds, error) { addrBook, err := NewAddrBook(ctx, store, opts) @@ -104,22 +88,13 @@ func NewPeerstore(ctx context.Context, store ds.Batching, opts Options) (*pstore return nil, err } - ps := &pstoreds{ + return &pstoreds{ Metrics: pstore.NewMetrics(), dsKeyBook: keyBook, dsAddrBook: addrBook, dsPeerMetadata: peerMetadata, dsProtoBook: protoBook, - } - if opts.EventBus != nil { - manager, err := pstoremanager.NewPeerstoreManager(ps, opts.EventBus) - if err != nil { - ps.Close() - return nil, err - } - ps.manager = manager - } - return ps, nil + }, nil } // uniquePeerIds extracts and returns unique peer IDs from database keys. @@ -156,10 +131,6 @@ func uniquePeerIds(ds ds.Datastore, prefix ds.Key, extractor func(result query.R return ids, nil } -func (ps *pstoreds) Start() { - ps.manager.Start() -} - func (ps *pstoreds) Close() (err error) { var errs []error weakClose := func(name string, c interface{}) { @@ -169,9 +140,6 @@ func (ps *pstoreds) Close() (err error) { } } } - if ps.manager != nil { - weakClose("manager", ps.manager) - } weakClose("keybook", ps.dsKeyBook) weakClose("addressbook", ps.dsAddrBook) weakClose("protobook", ps.dsProtoBook) diff --git a/p2p/host/peerstore/pstoremanager/mock_peerstore_test.go b/p2p/host/peerstore/pstoremanager/mock_peerstore_test.go deleted file mode 100644 index c1414cce52..0000000000 --- a/p2p/host/peerstore/pstoremanager/mock_peerstore_test.go +++ /dev/null @@ -1,456 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/libp2p/go-libp2p-core/peerstore (interfaces: Peerstore) - -// Package pstoremanager_test is a generated GoMock package. -package pstoremanager_test - -import ( - context "context" - reflect "reflect" - time "time" - - gomock "github.com/golang/mock/gomock" - crypto "github.com/libp2p/go-libp2p-core/crypto" - peer "github.com/libp2p/go-libp2p-core/peer" - multiaddr "github.com/multiformats/go-multiaddr" -) - -// MockPeerstore is a mock of Peerstore interface. -type MockPeerstore struct { - ctrl *gomock.Controller - recorder *MockPeerstoreMockRecorder -} - -// MockPeerstoreMockRecorder is the mock recorder for MockPeerstore. -type MockPeerstoreMockRecorder struct { - mock *MockPeerstore -} - -// NewMockPeerstore creates a new mock instance. -func NewMockPeerstore(ctrl *gomock.Controller) *MockPeerstore { - mock := &MockPeerstore{ctrl: ctrl} - mock.recorder = &MockPeerstoreMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockPeerstore) EXPECT() *MockPeerstoreMockRecorder { - return m.recorder -} - -// AddAddr mocks base method. -func (m *MockPeerstore) AddAddr(arg0 peer.ID, arg1 multiaddr.Multiaddr, arg2 time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddAddr", arg0, arg1, arg2) -} - -// AddAddr indicates an expected call of AddAddr. -func (mr *MockPeerstoreMockRecorder) AddAddr(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAddr", reflect.TypeOf((*MockPeerstore)(nil).AddAddr), arg0, arg1, arg2) -} - -// AddAddrs mocks base method. -func (m *MockPeerstore) AddAddrs(arg0 peer.ID, arg1 []multiaddr.Multiaddr, arg2 time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddAddrs", arg0, arg1, arg2) -} - -// AddAddrs indicates an expected call of AddAddrs. -func (mr *MockPeerstoreMockRecorder) AddAddrs(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAddrs", reflect.TypeOf((*MockPeerstore)(nil).AddAddrs), arg0, arg1, arg2) -} - -// AddPrivKey mocks base method. -func (m *MockPeerstore) AddPrivKey(arg0 peer.ID, arg1 crypto.PrivKey) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddPrivKey", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// AddPrivKey indicates an expected call of AddPrivKey. -func (mr *MockPeerstoreMockRecorder) AddPrivKey(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPrivKey", reflect.TypeOf((*MockPeerstore)(nil).AddPrivKey), arg0, arg1) -} - -// AddProtocols mocks base method. -func (m *MockPeerstore) AddProtocols(arg0 peer.ID, arg1 ...string) error { - m.ctrl.T.Helper() - varargs := []interface{}{arg0} - for _, a := range arg1 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "AddProtocols", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// AddProtocols indicates an expected call of AddProtocols. -func (mr *MockPeerstoreMockRecorder) AddProtocols(arg0 interface{}, arg1 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0}, arg1...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddProtocols", reflect.TypeOf((*MockPeerstore)(nil).AddProtocols), varargs...) -} - -// AddPubKey mocks base method. -func (m *MockPeerstore) AddPubKey(arg0 peer.ID, arg1 crypto.PubKey) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddPubKey", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// AddPubKey indicates an expected call of AddPubKey. -func (mr *MockPeerstoreMockRecorder) AddPubKey(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPubKey", reflect.TypeOf((*MockPeerstore)(nil).AddPubKey), arg0, arg1) -} - -// AddrStream mocks base method. -func (m *MockPeerstore) AddrStream(arg0 context.Context, arg1 peer.ID) <-chan multiaddr.Multiaddr { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddrStream", arg0, arg1) - ret0, _ := ret[0].(<-chan multiaddr.Multiaddr) - return ret0 -} - -// AddrStream indicates an expected call of AddrStream. -func (mr *MockPeerstoreMockRecorder) AddrStream(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddrStream", reflect.TypeOf((*MockPeerstore)(nil).AddrStream), arg0, arg1) -} - -// Addrs mocks base method. -func (m *MockPeerstore) Addrs(arg0 peer.ID) []multiaddr.Multiaddr { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Addrs", arg0) - ret0, _ := ret[0].([]multiaddr.Multiaddr) - return ret0 -} - -// Addrs indicates an expected call of Addrs. -func (mr *MockPeerstoreMockRecorder) Addrs(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Addrs", reflect.TypeOf((*MockPeerstore)(nil).Addrs), arg0) -} - -// ClearAddrs mocks base method. -func (m *MockPeerstore) ClearAddrs(arg0 peer.ID) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "ClearAddrs", arg0) -} - -// ClearAddrs indicates an expected call of ClearAddrs. -func (mr *MockPeerstoreMockRecorder) ClearAddrs(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearAddrs", reflect.TypeOf((*MockPeerstore)(nil).ClearAddrs), arg0) -} - -// Close mocks base method. -func (m *MockPeerstore) Close() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") - ret0, _ := ret[0].(error) - return ret0 -} - -// Close indicates an expected call of Close. -func (mr *MockPeerstoreMockRecorder) Close() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPeerstore)(nil).Close)) -} - -// FirstSupportedProtocol mocks base method. -func (m *MockPeerstore) FirstSupportedProtocol(arg0 peer.ID, arg1 ...string) (string, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0} - for _, a := range arg1 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "FirstSupportedProtocol", varargs...) - ret0, _ := ret[0].(string) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// FirstSupportedProtocol indicates an expected call of FirstSupportedProtocol. -func (mr *MockPeerstoreMockRecorder) FirstSupportedProtocol(arg0 interface{}, arg1 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0}, arg1...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FirstSupportedProtocol", reflect.TypeOf((*MockPeerstore)(nil).FirstSupportedProtocol), varargs...) -} - -// Get mocks base method. -func (m *MockPeerstore) Get(arg0 peer.ID, arg1 string) (interface{}, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Get", arg0, arg1) - ret0, _ := ret[0].(interface{}) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Get indicates an expected call of Get. -func (mr *MockPeerstoreMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockPeerstore)(nil).Get), arg0, arg1) -} - -// GetProtocols mocks base method. -func (m *MockPeerstore) GetProtocols(arg0 peer.ID) ([]string, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetProtocols", arg0) - ret0, _ := ret[0].([]string) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetProtocols indicates an expected call of GetProtocols. -func (mr *MockPeerstoreMockRecorder) GetProtocols(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProtocols", reflect.TypeOf((*MockPeerstore)(nil).GetProtocols), arg0) -} - -// LatencyEWMA mocks base method. -func (m *MockPeerstore) LatencyEWMA(arg0 peer.ID) time.Duration { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LatencyEWMA", arg0) - ret0, _ := ret[0].(time.Duration) - return ret0 -} - -// LatencyEWMA indicates an expected call of LatencyEWMA. -func (mr *MockPeerstoreMockRecorder) LatencyEWMA(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatencyEWMA", reflect.TypeOf((*MockPeerstore)(nil).LatencyEWMA), arg0) -} - -// PeerInfo mocks base method. -func (m *MockPeerstore) PeerInfo(arg0 peer.ID) peer.AddrInfo { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PeerInfo", arg0) - ret0, _ := ret[0].(peer.AddrInfo) - return ret0 -} - -// PeerInfo indicates an expected call of PeerInfo. -func (mr *MockPeerstoreMockRecorder) PeerInfo(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeerInfo", reflect.TypeOf((*MockPeerstore)(nil).PeerInfo), arg0) -} - -// Peers mocks base method. -func (m *MockPeerstore) Peers() peer.IDSlice { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Peers") - ret0, _ := ret[0].(peer.IDSlice) - return ret0 -} - -// Peers indicates an expected call of Peers. -func (mr *MockPeerstoreMockRecorder) Peers() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Peers", reflect.TypeOf((*MockPeerstore)(nil).Peers)) -} - -// PeersWithAddrs mocks base method. -func (m *MockPeerstore) PeersWithAddrs() peer.IDSlice { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PeersWithAddrs") - ret0, _ := ret[0].(peer.IDSlice) - return ret0 -} - -// PeersWithAddrs indicates an expected call of PeersWithAddrs. -func (mr *MockPeerstoreMockRecorder) PeersWithAddrs() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeersWithAddrs", reflect.TypeOf((*MockPeerstore)(nil).PeersWithAddrs)) -} - -// PeersWithKeys mocks base method. -func (m *MockPeerstore) PeersWithKeys() peer.IDSlice { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PeersWithKeys") - ret0, _ := ret[0].(peer.IDSlice) - return ret0 -} - -// PeersWithKeys indicates an expected call of PeersWithKeys. -func (mr *MockPeerstoreMockRecorder) PeersWithKeys() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeersWithKeys", reflect.TypeOf((*MockPeerstore)(nil).PeersWithKeys)) -} - -// PrivKey mocks base method. -func (m *MockPeerstore) PrivKey(arg0 peer.ID) crypto.PrivKey { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PrivKey", arg0) - ret0, _ := ret[0].(crypto.PrivKey) - return ret0 -} - -// PrivKey indicates an expected call of PrivKey. -func (mr *MockPeerstoreMockRecorder) PrivKey(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrivKey", reflect.TypeOf((*MockPeerstore)(nil).PrivKey), arg0) -} - -// PubKey mocks base method. -func (m *MockPeerstore) PubKey(arg0 peer.ID) crypto.PubKey { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PubKey", arg0) - ret0, _ := ret[0].(crypto.PubKey) - return ret0 -} - -// PubKey indicates an expected call of PubKey. -func (mr *MockPeerstoreMockRecorder) PubKey(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PubKey", reflect.TypeOf((*MockPeerstore)(nil).PubKey), arg0) -} - -// Put mocks base method. -func (m *MockPeerstore) Put(arg0 peer.ID, arg1 string, arg2 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Put", arg0, arg1, arg2) - ret0, _ := ret[0].(error) - return ret0 -} - -// Put indicates an expected call of Put. -func (mr *MockPeerstoreMockRecorder) Put(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockPeerstore)(nil).Put), arg0, arg1, arg2) -} - -// RecordLatency mocks base method. -func (m *MockPeerstore) RecordLatency(arg0 peer.ID, arg1 time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RecordLatency", arg0, arg1) -} - -// RecordLatency indicates an expected call of RecordLatency. -func (mr *MockPeerstoreMockRecorder) RecordLatency(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordLatency", reflect.TypeOf((*MockPeerstore)(nil).RecordLatency), arg0, arg1) -} - -// RemovePeer mocks base method. -func (m *MockPeerstore) RemovePeer(arg0 peer.ID) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RemovePeer", arg0) -} - -// RemovePeer indicates an expected call of RemovePeer. -func (mr *MockPeerstoreMockRecorder) RemovePeer(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemovePeer", reflect.TypeOf((*MockPeerstore)(nil).RemovePeer), arg0) -} - -// RemoveProtocols mocks base method. -func (m *MockPeerstore) RemoveProtocols(arg0 peer.ID, arg1 ...string) error { - m.ctrl.T.Helper() - varargs := []interface{}{arg0} - for _, a := range arg1 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "RemoveProtocols", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// RemoveProtocols indicates an expected call of RemoveProtocols. -func (mr *MockPeerstoreMockRecorder) RemoveProtocols(arg0 interface{}, arg1 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0}, arg1...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveProtocols", reflect.TypeOf((*MockPeerstore)(nil).RemoveProtocols), varargs...) -} - -// SetAddr mocks base method. -func (m *MockPeerstore) SetAddr(arg0 peer.ID, arg1 multiaddr.Multiaddr, arg2 time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetAddr", arg0, arg1, arg2) -} - -// SetAddr indicates an expected call of SetAddr. -func (mr *MockPeerstoreMockRecorder) SetAddr(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAddr", reflect.TypeOf((*MockPeerstore)(nil).SetAddr), arg0, arg1, arg2) -} - -// SetAddrs mocks base method. -func (m *MockPeerstore) SetAddrs(arg0 peer.ID, arg1 []multiaddr.Multiaddr, arg2 time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetAddrs", arg0, arg1, arg2) -} - -// SetAddrs indicates an expected call of SetAddrs. -func (mr *MockPeerstoreMockRecorder) SetAddrs(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAddrs", reflect.TypeOf((*MockPeerstore)(nil).SetAddrs), arg0, arg1, arg2) -} - -// SetProtocols mocks base method. -func (m *MockPeerstore) SetProtocols(arg0 peer.ID, arg1 ...string) error { - m.ctrl.T.Helper() - varargs := []interface{}{arg0} - for _, a := range arg1 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "SetProtocols", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// SetProtocols indicates an expected call of SetProtocols. -func (mr *MockPeerstoreMockRecorder) SetProtocols(arg0 interface{}, arg1 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0}, arg1...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetProtocols", reflect.TypeOf((*MockPeerstore)(nil).SetProtocols), varargs...) -} - -// Start mocks base method. -func (m *MockPeerstore) Start() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Start") -} - -// Start indicates an expected call of Start. -func (mr *MockPeerstoreMockRecorder) Start() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockPeerstore)(nil).Start)) -} - -// SupportsProtocols mocks base method. -func (m *MockPeerstore) SupportsProtocols(arg0 peer.ID, arg1 ...string) ([]string, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0} - for _, a := range arg1 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "SupportsProtocols", varargs...) - ret0, _ := ret[0].([]string) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// SupportsProtocols indicates an expected call of SupportsProtocols. -func (mr *MockPeerstoreMockRecorder) SupportsProtocols(arg0 interface{}, arg1 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0}, arg1...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SupportsProtocols", reflect.TypeOf((*MockPeerstore)(nil).SupportsProtocols), varargs...) -} - -// UpdateAddrs mocks base method. -func (m *MockPeerstore) UpdateAddrs(arg0 peer.ID, arg1, arg2 time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "UpdateAddrs", arg0, arg1, arg2) -} - -// UpdateAddrs indicates an expected call of UpdateAddrs. -func (mr *MockPeerstoreMockRecorder) UpdateAddrs(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAddrs", reflect.TypeOf((*MockPeerstore)(nil).UpdateAddrs), arg0, arg1, arg2) -} diff --git a/p2p/host/peerstore/pstoremanager/pstoremanager.go b/p2p/host/peerstore/pstoremanager/pstoremanager.go deleted file mode 100644 index f5d20dd69e..0000000000 --- a/p2p/host/peerstore/pstoremanager/pstoremanager.go +++ /dev/null @@ -1,116 +0,0 @@ -package pstoremanager - -import ( - "sync" - "time" - - "github.com/libp2p/go-libp2p-core/event" - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/peerstore" -) - -type Option func(*PeerstoreManager) error - -// WithGracePeriod sets the grace period. -// If a peer doesn't reconnect during the grace period, its data is removed. -// Default: 1 minute. -func WithGracePeriod(p time.Duration) Option { - return func(m *PeerstoreManager) error { - m.gracePeriod = p - return nil - } -} - -// WithCleanupInterval set the clean up interval. -// During a clean up run peers that disconnected before the grace period are removed. -// If unset, the interval is set to half the grace period. -func WithCleanupInterval(t time.Duration) Option { - return func(m *PeerstoreManager) error { - m.cleanupInterval = t - return nil - } -} - -type PeerstoreManager struct { - pstore peerstore.Peerstore - sub event.Subscription - - refCount sync.WaitGroup - - gracePeriod time.Duration - cleanupInterval time.Duration -} - -func NewPeerstoreManager(pstore peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*PeerstoreManager, error) { - sub, err := eventBus.Subscribe(&event.EvtPeerConnectednessChanged{}) - if err != nil { - return nil, err - } - m := &PeerstoreManager{ - pstore: pstore, - gracePeriod: time.Minute, - sub: sub, - } - for _, opt := range opts { - if err := opt(m); err != nil { - return nil, err - } - } - if m.cleanupInterval == 0 { - m.cleanupInterval = m.gracePeriod / 2 - } - return m, nil -} - -func (m *PeerstoreManager) Start() { - m.refCount.Add(1) - go m.background() -} - -func (m *PeerstoreManager) background() { - defer m.refCount.Done() - disconnected := make(map[peer.ID]time.Time) - - ticker := time.NewTicker(m.cleanupInterval) - defer ticker.Stop() - - defer func() { - for p := range disconnected { - m.pstore.RemovePeer(p) - } - }() - - for { - select { - case e, ok := <-m.sub.Out(): - if !ok { - return - } - ev := e.(event.EvtPeerConnectednessChanged) - p := ev.Peer - switch ev.Connectedness { - case network.NotConnected: - if _, ok := disconnected[p]; !ok { - disconnected[p] = time.Now() - } - case network.Connected: - // If we reconnect to the peer before we've cleared the information, keep it. - delete(disconnected, p) - } - case now := <-ticker.C: - for p, disconnectTime := range disconnected { - if disconnectTime.Add(m.gracePeriod).Before(now) { - m.pstore.RemovePeer(p) - delete(disconnected, p) - } - } - } - } -} - -func (m *PeerstoreManager) Close() error { - err := m.sub.Close() - m.refCount.Wait() - return err -} diff --git a/p2p/host/peerstore/pstoremanager/pstoremanager_test.go b/p2p/host/peerstore/pstoremanager/pstoremanager_test.go deleted file mode 100644 index 060cae9d1e..0000000000 --- a/p2p/host/peerstore/pstoremanager/pstoremanager_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package pstoremanager_test - -import ( - "testing" - "time" - - "github.com/libp2p/go-libp2p-peerstore/pstoremanager" - - "github.com/libp2p/go-libp2p-core/event" - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - - "github.com/libp2p/go-eventbus" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/require" -) - -//go:generate sh -c "mockgen -package pstoremanager_test -destination mock_peerstore_test.go github.com/libp2p/go-libp2p-core/peerstore Peerstore" - -func TestGracePeriod(t *testing.T) { - t.Parallel() - ctrl := gomock.NewController(t) - defer ctrl.Finish() - eventBus := eventbus.NewBus() - pstore := NewMockPeerstore(ctrl) - const gracePeriod = 250 * time.Millisecond - man, err := pstoremanager.NewPeerstoreManager(pstore, eventBus, pstoremanager.WithGracePeriod(gracePeriod)) - require.NoError(t, err) - defer man.Close() - man.Start() - - emitter, err := eventBus.Emitter(new(event.EvtPeerConnectednessChanged)) - require.NoError(t, err) - start := time.Now() - removed := make(chan struct{}) - pstore.EXPECT().RemovePeer(peer.ID("foobar")).DoAndReturn(func(p peer.ID) { - defer close(removed) - // make sure the call happened after the grace period - require.GreaterOrEqual(t, time.Since(start), gracePeriod) - require.LessOrEqual(t, time.Since(start), 3*gracePeriod) - }) - require.NoError(t, emitter.Emit(event.EvtPeerConnectednessChanged{ - Peer: "foobar", - Connectedness: network.NotConnected, - })) - <-removed -} - -func TestReconnect(t *testing.T) { - t.Parallel() - ctrl := gomock.NewController(t) - eventBus := eventbus.NewBus() - pstore := NewMockPeerstore(ctrl) - const gracePeriod = 200 * time.Millisecond - man, err := pstoremanager.NewPeerstoreManager(pstore, eventBus, pstoremanager.WithGracePeriod(gracePeriod)) - require.NoError(t, err) - defer man.Close() - man.Start() - - emitter, err := eventBus.Emitter(new(event.EvtPeerConnectednessChanged)) - require.NoError(t, err) - require.NoError(t, emitter.Emit(event.EvtPeerConnectednessChanged{ - Peer: "foobar", - Connectedness: network.NotConnected, - })) - require.NoError(t, emitter.Emit(event.EvtPeerConnectednessChanged{ - Peer: "foobar", - Connectedness: network.Connected, - })) - time.Sleep(gracePeriod * 3 / 2) - // There should have been no calls to RemovePeer. - ctrl.Finish() -} - -func TestClose(t *testing.T) { - t.Parallel() - ctrl := gomock.NewController(t) - defer ctrl.Finish() - eventBus := eventbus.NewBus() - pstore := NewMockPeerstore(ctrl) - const gracePeriod = time.Hour - man, err := pstoremanager.NewPeerstoreManager(pstore, eventBus, pstoremanager.WithGracePeriod(gracePeriod)) - require.NoError(t, err) - man.Start() - - emitter, err := eventBus.Emitter(new(event.EvtPeerConnectednessChanged)) - require.NoError(t, err) - require.NoError(t, emitter.Emit(event.EvtPeerConnectednessChanged{ - Peer: "foobar", - Connectedness: network.NotConnected, - })) - time.Sleep(10 * time.Millisecond) // make sure the event is sent before we close - done := make(chan struct{}) - pstore.EXPECT().RemovePeer(peer.ID("foobar")).Do(func(peer.ID) { close(done) }) - require.NoError(t, man.Close()) - <-done -} diff --git a/p2p/host/peerstore/pstoremem/inmem_test.go b/p2p/host/peerstore/pstoremem/inmem_test.go index 57474e0d4a..2d4b09132d 100644 --- a/p2p/host/peerstore/pstoremem/inmem_test.go +++ b/p2p/host/peerstore/pstoremem/inmem_test.go @@ -5,7 +5,6 @@ import ( pt "github.com/libp2p/go-libp2p-peerstore/test" - "github.com/libp2p/go-eventbus" pstore "github.com/libp2p/go-libp2p-core/peerstore" "github.com/stretchr/testify/require" @@ -21,7 +20,7 @@ func TestFuzzInMemoryPeerstore(t *testing.T) { // Just create and close a bunch of peerstores. If this leaks, we'll // catch it in the leak check below. for i := 0; i < 100; i++ { - ps, err := NewPeerstore(WithEventBus(eventbus.NewBus())) + ps, err := NewPeerstore() require.NoError(t, err) ps.Close() } @@ -29,7 +28,7 @@ func TestFuzzInMemoryPeerstore(t *testing.T) { func TestInMemoryPeerstore(t *testing.T) { pt.TestPeerstore(t, func() (pstore.Peerstore, func()) { - ps, err := NewPeerstore(WithEventBus(eventbus.NewBus())) + ps, err := NewPeerstore() require.NoError(t, err) return ps, func() { ps.Close() } }) @@ -37,7 +36,7 @@ func TestInMemoryPeerstore(t *testing.T) { func TestPeerstoreProtoStoreLimits(t *testing.T) { const limit = 10 - ps, err := NewPeerstore(WithEventBus(eventbus.NewBus()), WithMaxProtocols(limit)) + ps, err := NewPeerstore(WithMaxProtocols(limit)) require.NoError(t, err) defer ps.Close() pt.TestPeerstoreProtoStoreLimits(t, ps, limit) @@ -45,7 +44,7 @@ func TestPeerstoreProtoStoreLimits(t *testing.T) { func TestInMemoryAddrBook(t *testing.T) { pt.TestAddrBook(t, func() (pstore.AddrBook, func()) { - ps, err := NewPeerstore(WithEventBus(eventbus.NewBus())) + ps, err := NewPeerstore() require.NoError(t, err) return ps, func() { ps.Close() } }) @@ -53,7 +52,7 @@ func TestInMemoryAddrBook(t *testing.T) { func TestInMemoryKeyBook(t *testing.T) { pt.TestKeyBook(t, func() (pstore.KeyBook, func()) { - ps, err := NewPeerstore(WithEventBus(eventbus.NewBus())) + ps, err := NewPeerstore() require.NoError(t, err) return ps, func() { ps.Close() } }) @@ -61,7 +60,7 @@ func TestInMemoryKeyBook(t *testing.T) { func BenchmarkInMemoryPeerstore(b *testing.B) { pt.BenchmarkPeerstore(b, func() (pstore.Peerstore, func()) { - ps, err := NewPeerstore(WithEventBus(eventbus.NewBus())) + ps, err := NewPeerstore() require.NoError(b, err) return ps, func() { ps.Close() } }, "InMem") @@ -69,7 +68,7 @@ func BenchmarkInMemoryPeerstore(b *testing.B) { func BenchmarkInMemoryKeyBook(b *testing.B) { pt.BenchmarkKeyBook(b, func() (pstore.KeyBook, func()) { - ps, err := NewPeerstore(WithEventBus(eventbus.NewBus())) + ps, err := NewPeerstore() require.NoError(b, err) return ps, func() { ps.Close() } }) diff --git a/p2p/host/peerstore/pstoremem/peerstore.go b/p2p/host/peerstore/pstoremem/peerstore.go index b44430093a..2aefcf1803 100644 --- a/p2p/host/peerstore/pstoremem/peerstore.go +++ b/p2p/host/peerstore/pstoremem/peerstore.go @@ -1,17 +1,13 @@ package pstoremem import ( - "errors" "fmt" "io" - "github.com/libp2p/go-libp2p-core/event" - "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" pstore "github.com/libp2p/go-libp2p-peerstore" - "github.com/libp2p/go-libp2p-peerstore/pstoremanager" ) type pstoremem struct { @@ -21,45 +17,21 @@ type pstoremem struct { *memoryAddrBook *memoryProtoBook *memoryPeerMetadata - - eventBus event.Bus - manager *pstoremanager.PeerstoreManager } var _ peerstore.Peerstore = &pstoremem{} type Option interface{} -type PeerstoreOption func(*pstoremem) error - -// WithEventBus sets the eventBus that is used to subscribe to EvtPeerConnectednessChanged events. -// This allows the automatic clean up when a peer disconnect. -func WithEventBus(eventBus event.Bus) PeerstoreOption { - return func(ps *pstoremem) error { - ps.eventBus = eventBus - return nil - } -} // NewPeerstore creates an in-memory threadsafe collection of peers. -// It is recommended to construct the peerstore with an event bus, using the WithEventBus option. -// In that case, the peerstore will automatically perform cleanups when a peer disconnects -// (see the pstoremanager package for details). -// If constructed without an event bus, it's the caller's responsibility to call RemovePeer to ensure +// It's the caller's responsibility to call RemovePeer to ensure // that memory consumption of the peerstore doesn't grow unboundedly. func NewPeerstore(opts ...Option) (*pstoremem, error) { - var ( - protoBookOpts []ProtoBookOption - peerstoreOpts []PeerstoreOption - managerOpts []pstoremanager.Option - ) + var protoBookOpts []ProtoBookOption for _, opt := range opts { switch o := opt.(type) { - case PeerstoreOption: - peerstoreOpts = append(peerstoreOpts, o) case ProtoBookOption: protoBookOpts = append(protoBookOpts, o) - case pstoremanager.Option: - managerOpts = append(managerOpts, o) default: return nil, fmt.Errorf("unexpected peer store option: %v", o) } @@ -68,34 +40,13 @@ func NewPeerstore(opts ...Option) (*pstoremem, error) { if err != nil { return nil, err } - pstore := &pstoremem{ + return &pstoremem{ Metrics: pstore.NewMetrics(), memoryKeyBook: NewKeyBook(), memoryAddrBook: NewAddrBook(), memoryProtoBook: pb, memoryPeerMetadata: NewPeerMetadata(), - } - for _, opt := range peerstoreOpts { - if err := opt(pstore); err != nil { - return nil, err - } - } - if pstore.eventBus == nil && len(managerOpts) > 0 { - return nil, errors.New("peer store manager options set an event bus") - } - if pstore.eventBus != nil { - manager, err := pstoremanager.NewPeerstoreManager(pstore, pstore.eventBus, managerOpts...) - if err != nil { - pstore.Close() - return nil, err - } - pstore.manager = manager - } - return pstore, nil -} - -func (ps *pstoremem) Start() { - ps.manager.Start() + }, nil } func (ps *pstoremem) Close() (err error) { @@ -107,9 +58,6 @@ func (ps *pstoremem) Close() (err error) { } } } - if ps.manager != nil { - weakClose("manager", ps.manager) - } weakClose("keybook", ps.memoryKeyBook) weakClose("addressbook", ps.memoryAddrBook) weakClose("protobook", ps.memoryProtoBook) @@ -150,9 +98,6 @@ func (ps *pstoremem) PeerInfo(p peer.ID) peer.AddrInfo { // * the PeerMetadata // * the Metrics // It DOES NOT remove the peer from the AddrBook. -// It is only necessary to call this function if the peerstore was constructed without an event bus. -// If the peerstore was constructed with an event bus, peers are removed -// automatically when they disconnect (after a grace period). func (ps *pstoremem) RemovePeer(p peer.ID) { ps.memoryKeyBook.RemovePeer(p) ps.memoryProtoBook.RemovePeer(p)