Skip to content

Commit

Permalink
Expose proposal storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Waldz committed Feb 28, 2020
1 parent 7295754 commit 81fb0fc
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
3 changes: 2 additions & 1 deletion cmd/di_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func (di *Dependencies) bootstrapDiscoveryComponents(options node.OptionsDiscove
case node.DiscoveryTypeBroker:
discoveryRegistry.AddRegistry(brokerdiscovery.NewRegistry(di.BrokerConnection))

brokerRepository := brokerdiscovery.NewRepository(di.BrokerConnection, di.EventBus, options.PingInterval+time.Second, 1*time.Second)
storage := brokerdiscovery.NewStorage(di.EventBus)
brokerRepository := brokerdiscovery.NewRepository(di.BrokerConnection, storage, options.PingInterval+time.Second, 1*time.Second)
if options.FetchEnabled {
di.DiscoveryWorker = brokerRepository
if err := di.DiscoveryWorker.Start(); err != nil {
Expand Down
5 changes: 2 additions & 3 deletions core/discovery/brokerdiscovery/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/mysteriumnetwork/node/communication"
"github.com/mysteriumnetwork/node/communication/nats"
"github.com/mysteriumnetwork/node/core/discovery/proposal"
"github.com/mysteriumnetwork/node/eventbus"
"github.com/mysteriumnetwork/node/market"
)

Expand All @@ -45,12 +44,12 @@ type Repository struct {
// NewRepository constructs a new proposal repository (backed by the broker).
func NewRepository(
connection nats.Connection,
eventPublisher eventbus.Publisher,
storage *ProposalStorage,
proposalTimeoutInterval time.Duration,
proposalCheckInterval time.Duration,
) *Repository {
return &Repository{
storage: NewStorage(eventPublisher),
storage: storage,
receiver: nats.NewReceiver(connection, communication.NewCodecJSON(), "*"),
timeoutInterval: proposalTimeoutInterval,

Expand Down
10 changes: 5 additions & 5 deletions core/discovery/brokerdiscovery/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func Test_Subscriber_StartSyncsNewProposals(t *testing.T) {
connection := nats.StartConnectionMock()
defer connection.Close()

repo := NewRepository(connection, eventbus.New(), 10*time.Millisecond, 1*time.Millisecond)
repo := NewRepository(connection, NewStorage(eventbus.New()), 10*time.Millisecond, 1*time.Millisecond)
err := repo.Start()
defer repo.Stop()
assert.NoError(t, err)
Expand All @@ -89,7 +89,7 @@ func Test_Subscriber_SkipUnsupportedProposal(t *testing.T) {
connection := nats.StartConnectionMock()
defer connection.Close()

repo := NewRepository(connection, eventbus.New(), 10*time.Millisecond, 1*time.Millisecond)
repo := NewRepository(connection, NewStorage(eventbus.New()), 10*time.Millisecond, 1*time.Millisecond)
err := repo.Start()
defer repo.Stop()
assert.NoError(t, err)
Expand All @@ -107,7 +107,7 @@ func Test_Subscriber_StartSyncsIdleProposals(t *testing.T) {
connection := nats.StartConnectionMock()
defer connection.Close()

repo := NewRepository(connection, eventbus.New(), 10*time.Millisecond, 1*time.Millisecond)
repo := NewRepository(connection, NewStorage(eventbus.New()), 10*time.Millisecond, 1*time.Millisecond)
err := repo.Start()
defer repo.Stop()
assert.NoError(t, err)
Expand All @@ -122,7 +122,7 @@ func Test_Subscriber_StartSyncsHealthyProposals(t *testing.T) {
connection := nats.StartConnectionMock()
defer connection.Close()

repo := NewRepository(connection, eventbus.New(), 10*time.Millisecond, 1*time.Millisecond)
repo := NewRepository(connection, NewStorage(eventbus.New()), 10*time.Millisecond, 1*time.Millisecond)
err := repo.Start()
defer repo.Stop()
assert.NoError(t, err)
Expand All @@ -143,7 +143,7 @@ func Test_Subscriber_StartSyncsStoppedProposals(t *testing.T) {
connection := nats.StartConnectionMock()
defer connection.Close()

repo := NewRepository(connection, eventbus.New(), 10*time.Millisecond, 1*time.Millisecond)
repo := NewRepository(connection, NewStorage(eventbus.New()), 10*time.Millisecond, 1*time.Millisecond)
repo.storage.AddProposal(proposalFirst, proposalSecond)
err := repo.Start()
defer repo.Stop()
Expand Down

0 comments on commit 81fb0fc

Please sign in to comment.