Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turn on proposal synchronisation from Broker discovery #1479

Merged
merged 8 commits into from
Jan 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 30 additions & 23 deletions cmd/di.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
discovery_api "github.com/mysteriumnetwork/node/core/discovery/api"
discovery_broker "github.com/mysteriumnetwork/node/core/discovery/broker"
discovery_composite "github.com/mysteriumnetwork/node/core/discovery/composite"
discovery_noop "github.com/mysteriumnetwork/node/core/discovery/noop"
"github.com/mysteriumnetwork/node/core/ip"
"github.com/mysteriumnetwork/node/core/location"
"github.com/mysteriumnetwork/node/core/node"
Expand Down Expand Up @@ -129,10 +130,9 @@ type Dependencies struct {
IdentityRegistry identity_registry.IdentityRegistry
IdentitySelector identity_selector.Handler

DiscoveryFactory service.DiscoveryFactory
DiscoveryFinder *discovery.Finder
ProposalStorage *discovery.ProposalStorage
DiscoveryFetcherAPI discovery_api.Fetcher
DiscoveryFactory service.DiscoveryFactory
DiscoveryStorage *discovery.ProposalStorage
DiscoveryFinder discovery.ProposalFinder

QualityMetricsSender *quality.Sender
QualityClient *quality.MysteriumMORQA
Expand Down Expand Up @@ -263,7 +263,7 @@ func (di *Dependencies) Bootstrap(nodeOptions node.Options) error {
if err = di.subscribeEventConsumers(); err != nil {
return err
}
if err = di.DiscoveryFetcherAPI.Start(); err != nil {
if err = di.DiscoveryFinder.Start(); err != nil {
return err
}
if err := di.Node.Start(); err != nil {
Expand Down Expand Up @@ -364,8 +364,8 @@ func (di *Dependencies) Shutdown() (err error) {
errs = append(errs, err)
}
}
if di.DiscoveryFetcherAPI != nil {
di.DiscoveryFetcherAPI.Stop()
if di.DiscoveryFinder != nil {
di.DiscoveryFinder.Stop()
}
if di.Storage != nil {
if err := di.Storage.Close(); err != nil {
Expand Down Expand Up @@ -554,10 +554,10 @@ func (di *Dependencies) bootstrapTequilapi(nodeOptions node.Options, listener ne
tequilapi_endpoints.AddRouteForStop(router, utils.SoftKiller(di.Shutdown))
tequilapi_endpoints.AddRoutesForAuthentication(router, di.Authenticator, di.JWTAuthenticator)
tequilapi_endpoints.AddRoutesForIdentities(router, di.IdentityManager, di.IdentitySelector, di.IdentityRegistry, nodeOptions.Transactor.RegistryAddress, channelImplementation, di.ConsumerBalanceTracker.GetBalance)
tequilapi_endpoints.AddRoutesForConnection(router, di.ConnectionManager, di.StatisticsTracker, di.DiscoveryFinder, di.IdentityRegistry)
tequilapi_endpoints.AddRoutesForConnection(router, di.ConnectionManager, di.StatisticsTracker, di.DiscoveryStorage, di.IdentityRegistry)
tequilapi_endpoints.AddRoutesForConnectionSessions(router, di.SessionStorage)
tequilapi_endpoints.AddRoutesForConnectionLocation(router, di.ConnectionManager, di.IPResolver, di.LocationResolver, di.LocationResolver)
tequilapi_endpoints.AddRoutesForProposals(router, di.DiscoveryFinder, di.QualityClient)
tequilapi_endpoints.AddRoutesForProposals(router, di.DiscoveryStorage, di.QualityClient)
tequilapi_endpoints.AddRoutesForService(router, di.ServicesManager, serviceTypesRequestParser, nodeOptions.AccessPolicyEndpointAddress)
tequilapi_endpoints.AddRoutesForServiceSessions(router, di.StateKeeper)
tequilapi_endpoints.AddRoutesForPayout(router, di.IdentityManager, di.SignerFactory, di.MysteriumAPI)
Expand Down Expand Up @@ -726,35 +726,42 @@ func (di *Dependencies) bootstrapIdentityComponents(options node.Options) {
}

func (di *Dependencies) bootstrapDiscoveryComponents(options node.OptionsDiscovery) error {
registry := discovery_composite.NewRegistry()
di.DiscoveryStorage = discovery.NewStorage()

discoveryRegistry := discovery_composite.NewRegistry()
discoveryFinder := discovery_composite.NewFinder()
for _, discoveryType := range options.Types {
switch discoveryType {
case node.DiscoveryTypeAPI:
registry.AddRegistry(
discoveryRegistry.AddRegistry(
discovery_api.NewRegistry(di.MysteriumAPI),
)

if !options.ProposalFetcherEnabled {
discoveryFinder.AddFinder(discovery_noop.NewFinder())
} else {
discoveryFinder.AddFinder(
discovery_api.NewFinder(di.DiscoveryStorage, di.MysteriumAPI.Proposals, 30*time.Minute),
)
}
case node.DiscoveryTypeBroker:
registry.AddRegistry(
discoveryRegistry.AddRegistry(
discovery_broker.NewRegistry(di.BrokerConnection),
)
discoveryFinder.AddFinder(
// Proposals are pinged each 60 seconds, see `discovery.NewService()`
// So timeout proposals after 61 second (1 second inactivity tolerated)
discovery_broker.NewFinder(di.DiscoveryStorage, di.BrokerConnection, 61*time.Second, 1*time.Second),
)
default:
return errors.Errorf("unknown discovery adapter: %s", discoveryType)
}
}

di.DiscoveryFinder = discoveryFinder
di.DiscoveryFactory = func() service.Discovery {
return discovery.NewService(di.IdentityRegistry, registry, di.SignerFactory, di.EventBus)
return discovery.NewService(di.IdentityRegistry, discoveryRegistry, 60*time.Second, di.SignerFactory, di.EventBus)
}

di.ProposalStorage = discovery.NewStorage()
di.DiscoveryFinder = discovery.NewFinder(di.ProposalStorage)

if !options.ProposalFetcherEnabled {
di.DiscoveryFetcherAPI = discovery_api.NewNoopFetcher()
} else {
di.DiscoveryFetcherAPI = discovery_api.NewFetcher(di.ProposalStorage, di.MysteriumAPI.Proposals, 30*time.Second)
}

return nil
}

Expand Down
96 changes: 0 additions & 96 deletions core/discovery/api/fetcher.go

This file was deleted.

77 changes: 55 additions & 22 deletions core/discovery/api/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,73 @@
package api

import (
"time"

"github.com/mysteriumnetwork/node/core/discovery"
"github.com/mysteriumnetwork/node/market"
"github.com/mysteriumnetwork/node/market/mysterium"
"github.com/rs/zerolog/log"
)

// finderAPI implements ProposalFinder, which finds proposals from Mysterium API
// FetchCallback does real fetch of proposals through Mysterium API
type FetchCallback func() ([]market.ServiceProposal, error)

// finderAPI represents async proposal fetcher from Mysterium API
type finderAPI struct {
mysteriumAPI *mysterium.MysteriumAPI
fetch FetchCallback
fetchInterval time.Duration
fetchShutdown chan bool

proposalStorage *discovery.ProposalStorage
}

// NewFinder creates new instance of finderAPI
func NewFinder(mysteriumAPI *mysterium.MysteriumAPI) *finderAPI {
// NewFinder create instance of API finder
func NewFinder(proposalsStorage *discovery.ProposalStorage, callback FetchCallback, interval time.Duration) *finderAPI {
return &finderAPI{
mysteriumAPI: mysteriumAPI,
fetch: callback,
fetchInterval: interval,

proposalStorage: proposalsStorage,
}
}

// GetProposal fetches service proposal from discovery by exact ID
func (finder *finderAPI) GetProposal(id market.ProposalID) (*market.ServiceProposal, error) {
proposals, err := finder.mysteriumAPI.QueryProposals(mysterium.ProposalsQuery{
NodeKey: id.ProviderID,
ServiceType: id.ServiceType,
})
if err != nil {
return nil, err
}
if len(proposals) == 0 {
return nil, nil
}
// Start begins fetching proposals to storage
func (fa *finderAPI) Start() error {
go func() {
if err := fa.fetchDo(); err != nil {
log.Warn().Err(err).Msg("Initial proposal fetch failed, continuing")
}
}()

fa.fetchShutdown = make(chan bool, 1)
go fa.fetchLoop()

return &proposals[0], nil
return nil
}

// FindProposals fetches currently active service proposals from discovery by given filter
func (finder *finderAPI) FindProposals(filter discovery.ProposalFilter) ([]market.ServiceProposal, error) {
return finder.mysteriumAPI.QueryProposals(filter.ToAPIQuery())
// Stop ends fetching proposals to storage
func (fa *finderAPI) Stop() {
fa.fetchShutdown <- true
}

func (fa *finderAPI) fetchLoop() {
for {
select {
case <-fa.fetchShutdown:
break
case <-time.After(fa.fetchInterval):
_ = fa.fetchDo()
}
}
}

func (fa *finderAPI) fetchDo() error {
proposals, err := fa.fetch()
if err != nil {
log.Warn().Err(err).Msg("Failed to fetch proposals")
return err
}

log.Debug().Msgf("Proposals fetched: %d", len(proposals))
fa.proposalStorage.AddProposal(proposals...)
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func waitForInitialFetch() {

func Test_Fetcher_StartFetchesInitialProposals(t *testing.T) {
storage := discovery.NewStorage()
fetcher := NewFetcher(storage, proposalsCurrent.Fetch, time.Hour)
fetcher := NewFinder(storage, proposalsCurrent.Fetch, time.Hour)

proposalsCurrent.Mock(proposalFirst, proposalSecond)
err := fetcher.Start()
Expand All @@ -72,7 +72,7 @@ func Test_Fetcher_StartFetchesInitialProposals(t *testing.T) {

func Test_Fetcher_StartFetchesNewProposals(t *testing.T) {
storage := discovery.NewStorage()
fetcher := NewFetcher(storage, proposalsCurrent.Fetch, time.Millisecond)
fetcher := NewFinder(storage, proposalsCurrent.Fetch, time.Millisecond)

err := fetcher.Start()
defer fetcher.Stop()
Expand Down
12 changes: 6 additions & 6 deletions core/discovery/api/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ func NewRegistry(mysteriumAPI *mysterium.MysteriumAPI) *registryAPI {
}

// RegisterProposal registers service proposal to discovery service
func (registry *registryAPI) RegisterProposal(proposal market.ServiceProposal, signer identity.Signer) error {
return registry.mysteriumAPI.RegisterProposal(proposal, signer)
func (ra *registryAPI) RegisterProposal(proposal market.ServiceProposal, signer identity.Signer) error {
return ra.mysteriumAPI.RegisterProposal(proposal, signer)
}

// UnregisterProposal unregisters a service proposal when client disconnects
func (registry *registryAPI) UnregisterProposal(proposal market.ServiceProposal, signer identity.Signer) error {
return registry.mysteriumAPI.UnregisterProposal(proposal, signer)
func (ra *registryAPI) UnregisterProposal(proposal market.ServiceProposal, signer identity.Signer) error {
return ra.mysteriumAPI.UnregisterProposal(proposal, signer)
}

// PingProposal pings service proposal as being alive
func (registry *registryAPI) PingProposal(proposal market.ServiceProposal, signer identity.Signer) error {
return registry.mysteriumAPI.PingProposal(proposal, signer)
func (ra *registryAPI) PingProposal(proposal market.ServiceProposal, signer identity.Signer) error {
return ra.mysteriumAPI.PingProposal(proposal, signer)
}
Loading