Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronise access policies with TrustOracle #1561

Merged
merged 15 commits into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/commands/cli/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (c *cliApp) serviceStart(providerID, serviceType string, args ...string) {
}

ap := tequilapi_client.AccessPoliciesRequest{
IDs: sharedOpts.AccessPolicies,
IDs: sharedOpts.AccessPolicyList,
}

service, err := c.tequilapi.ServiceStart(providerID, serviceType, opts, ap)
Expand Down Expand Up @@ -665,7 +665,7 @@ func newAutocompleter(tequilapi *tequilapi_client.Client, proposals []tequilapi_
)
}

func parseStartFlags(serviceType string, args ...string) (service.Options, config.Options, error) {
func parseStartFlags(serviceType string, args ...string) (service.Options, config.ServicesOptions, error) {
var flags []cli.Flag
config.RegisterFlagsServiceShared(&flags)
config.RegisterFlagsServiceOpenvpn(&flags)
Expand All @@ -677,7 +677,7 @@ func parseStartFlags(serviceType string, args ...string) (service.Options, confi
}

if err := set.Parse(args); err != nil {
return nil, config.Options{}, err
return nil, config.ServicesOptions{}, err
}

ctx := cli.NewContext(nil, set, nil)
Expand All @@ -694,5 +694,5 @@ func parseStartFlags(serviceType string, args ...string) (service.Options, confi
return openvpn_service.GetOptions(), services.SharedConfiguredOptions(), nil
}

return nil, config.Options{}, errors.New("service type not found")
return nil, config.ServicesOptions{}, errors.New("service type not found")
}
2 changes: 1 addition & 1 deletion cmd/commands/service/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewCommand(licenseCommandName string) *cli.Command {
tequilapi: client.NewClient(nodeOptions.TequilapiAddress, nodeOptions.TequilapiPort),
errorChannel: quit,
ap: client.AccessPoliciesRequest{
IDs: services.SharedConfiguredOptions().AccessPolicies,
IDs: services.SharedConfiguredOptions().AccessPolicyList,
},
}

Expand Down
14 changes: 11 additions & 3 deletions cmd/di.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/mysteriumnetwork/node/core/location"
"github.com/mysteriumnetwork/node/core/node"
nodevent "github.com/mysteriumnetwork/node/core/node/event"
"github.com/mysteriumnetwork/node/core/policy"
"github.com/mysteriumnetwork/node/core/quality"
"github.com/mysteriumnetwork/node/core/service"
"github.com/mysteriumnetwork/node/core/state"
Expand All @@ -67,6 +68,7 @@ import (
"github.com/mysteriumnetwork/node/nat/traversal"
"github.com/mysteriumnetwork/node/nat/upnp"
"github.com/mysteriumnetwork/node/requests"
"github.com/mysteriumnetwork/node/services"
service_noop "github.com/mysteriumnetwork/node/services/noop"
service_openvpn "github.com/mysteriumnetwork/node/services/openvpn"
"github.com/mysteriumnetwork/node/services/openvpn/discovery/dto"
Expand Down Expand Up @@ -124,6 +126,8 @@ type Dependencies struct {
IPResolver ip.Resolver
LocationResolver *location.Cache

PolicyRepository *policy.Repository

StatisticsTracker *statistics.SessionStatisticsTracker
StatisticsReporter *statistics.SessionStatisticsReporter
SessionStorage *consumer_session.Storage
Expand Down Expand Up @@ -217,7 +221,7 @@ func (di *Dependencies) Bootstrap(nodeOptions node.Options) error {
return err
}

if err := di.bootstrapServices(nodeOptions); err != nil {
if err := di.bootstrapServices(nodeOptions, services.SharedConfiguredOptions()); err != nil {
return err
}

Expand Down Expand Up @@ -337,6 +341,10 @@ func (di *Dependencies) Shutdown() (err error) {
}
}

if di.PolicyRepository != nil {
di.PolicyRepository.Stop()
}

if di.NATService != nil {
if err := di.NATService.Disable(); err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -541,10 +549,10 @@ func (di *Dependencies) bootstrapTequilapi(nodeOptions node.Options, listener ne
tequilapi_endpoints.AddRoutesForConnectionSessions(router, di.SessionStorage)
tequilapi_endpoints.AddRoutesForConnectionLocation(router, di.ConnectionManager, di.IPResolver, di.LocationResolver, di.LocationResolver)
tequilapi_endpoints.AddRoutesForProposals(router, di.ProposalRepository, di.QualityClient)
tequilapi_endpoints.AddRoutesForService(router, di.ServicesManager, serviceTypesRequestParser, nodeOptions.AccessPolicyEndpointAddress)
tequilapi_endpoints.AddRoutesForService(router, di.ServicesManager, serviceTypesRequestParser)
tequilapi_endpoints.AddRoutesForServiceSessions(router, di.StateKeeper)
tequilapi_endpoints.AddRoutesForPayout(router, di.IdentityManager, di.SignerFactory, di.MysteriumAPI)
tequilapi_endpoints.AddRoutesForAccessPolicies(di.HTTPClient, router, nodeOptions.AccessPolicyEndpointAddress)
tequilapi_endpoints.AddRoutesForAccessPolicies(di.HTTPClient, router, services.SharedConfiguredOptions().AccessPolicyAddress)
tequilapi_endpoints.AddRoutesForNAT(router, di.StateKeeper.GetState)
tequilapi_endpoints.AddRoutesForSSE(router, di.SSEHandler)
tequilapi_endpoints.AddRoutesForTransactor(router, di.Transactor, di.AccountantPromiseSettler)
Expand Down
37 changes: 14 additions & 23 deletions cmd/di_desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/mysteriumnetwork/node/communication"
nats_dialog "github.com/mysteriumnetwork/node/communication/nats/dialog"
"github.com/mysteriumnetwork/node/config"
"github.com/mysteriumnetwork/node/core/location"
"github.com/mysteriumnetwork/node/core/node"
"github.com/mysteriumnetwork/node/core/policy"
"github.com/mysteriumnetwork/node/core/port"
"github.com/mysteriumnetwork/node/core/service"
"github.com/mysteriumnetwork/node/identity"
Expand All @@ -53,8 +55,8 @@ import (
)

// bootstrapServices loads all the components required for running services
func (di *Dependencies) bootstrapServices(nodeOptions node.Options) error {
err := di.bootstrapServiceComponents(nodeOptions)
func (di *Dependencies) bootstrapServices(nodeOptions node.Options, servicesOptions config.ServicesOptions) error {
err := di.bootstrapServiceComponents(nodeOptions, servicesOptions)
if err != nil {
return errors.Wrap(err, "service bootstrap failed")
}
Expand Down Expand Up @@ -212,39 +214,28 @@ func (di *Dependencies) bootstrapAccountantPromiseSettler(nodeOptions node.Optio
}

// bootstrapServiceComponents initiates ServicesManager dependency
func (di *Dependencies) bootstrapServiceComponents(nodeOptions node.Options) error {
func (di *Dependencies) bootstrapServiceComponents(nodeOptions node.Options, servicesOptions config.ServicesOptions) error {
di.NATService = nat.NewService()
if err := di.NATService.Enable(); err != nil {
log.Warn().Err(err).Msg("Failed to enable NAT forwarding")
}
di.ServiceRegistry = service.NewRegistry()
storage := session.NewEventBasedStorage(di.EventBus, session.NewStorageMemory())
di.ServiceSessionStorage = storage

err := storage.Subscribe()
if err != nil {
return errors.Wrap(err, "could not bootstrap service components")
storage := session.NewEventBasedStorage(di.EventBus, session.NewStorageMemory())
if err := storage.Subscribe(); err != nil {
return errors.Wrap(err, "could not subscribe session to node events")
}
di.ServiceSessionStorage = storage

newDialogWaiter := func(providerID identity.Identity, serviceType string, allowedIDs []identity.Identity) (communication.DialogWaiter, error) {
allowedIdentityValidator := func(peerID identity.Identity) error {
if len(allowedIDs) == 0 {
return nil
}

for _, id := range allowedIDs {
if peerID.Address == id.Address {
return nil
}
}
return errors.New("identity is not allowed")
}
di.PolicyRepository = policy.NewRepository(di.HTTPClient, servicesOptions.AccessPolicyAddress, servicesOptions.AccessPolicyFetchInterval)
di.PolicyRepository.Start()

newDialogWaiter := func(providerID identity.Identity, serviceType string, policies *[]market.AccessPolicy) (communication.DialogWaiter, error) {
return nats_dialog.NewDialogWaiter(
di.BrokerConnection,
fmt.Sprintf("%v.%v", providerID.Address, serviceType),
di.SignerFactory(providerID),
allowedIdentityValidator,
policy.ValidateAllowedIdentity(di.PolicyRepository, policies),
), nil
}
newDialogHandler := func(proposal market.ServiceProposal, configProvider session.ConfigProvider, serviceID string) (communication.DialogHandler, error) {
Expand Down Expand Up @@ -279,7 +270,7 @@ func (di *Dependencies) bootstrapServiceComponents(nodeOptions node.Options) err
newDialogHandler,
di.DiscoveryFactory,
di.EventBus,
di.HTTPClient,
di.PolicyRepository,
)

serviceCleaner := service.Cleaner{SessionStorage: di.ServiceSessionStorage}
Expand Down
17 changes: 12 additions & 5 deletions communication/nats/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,18 @@ func (receiver *receiverNATS) ReceiveUnsubscribe(endpoint communication.MessageE
defer receiver.mu.Unlock()

messageTopic := receiver.messageTopic + string(endpoint)
if subscription, found := receiver.subs[messageTopic]; found {
if err := subscription.Unsubscribe(); err != nil {
log.Error().Err(err).Msg("Failed to unsubscribe from topic: " + messageTopic)
}
log.Info().Msg("Unsubscribed from " + messageTopic)
subscription, found := receiver.subs[messageTopic]
if !found {
log.Error().Msg("Unknown topic to unsubscribe: " + messageTopic)
return
}

if err := subscription.Unsubscribe(); err != nil {
log.Error().Err(err).Msg("Failed to unsubscribe from topic: " + messageTopic)
return
}

log.Info().Msg("Unsubscribed from " + messageTopic)
}

func (receiver *receiverNATS) Unsubscribe() {
Expand All @@ -100,6 +106,7 @@ func (receiver *receiverNATS) Unsubscribe() {
for topic, s := range receiver.subs {
if err := s.Unsubscribe(); err != nil {
log.Error().Err(err).Msg("Failed to unsubscribe from topic: " + topic)
return
}
log.Info().Msg("Unsubscribed from " + topic)
}
Expand Down
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestConfig_ParseStringSliceFlag(t *testing.T) {
// given
sliceFlag := cli.StringSliceFlag{
Name: "discovery.type",
Usage: `Proposal discovery adapter(s) separated by comma Options: { "api", "broker", "api,broker" }`,
Usage: `Proposal discovery adapter(s) separated by comma. Options: { "api", "broker", "api,broker" }`,
Value: tc.defaults,
}
cfg := NewConfig()
Expand Down
8 changes: 0 additions & 8 deletions config/flags_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ var (
Usage: "URL of Mysterium API",
Value: metadata.DefaultNetwork.MysteriumAPIAddress,
}
// FlagAccessPolicyAddress Trust oracle URL for retrieving access policies.
FlagAccessPolicyAddress = cli.StringFlag{
Name: "access-policy-address",
Usage: "URL of trust oracle endpoint for retrieving lists of access policies",
Value: metadata.DefaultNetwork.AccessPolicyOracleAddress,
}
// FlagBrokerAddress message broker URI.
FlagBrokerAddress = cli.StringFlag{
Name: "broker-address",
Expand Down Expand Up @@ -82,7 +76,6 @@ func RegisterFlagsNetwork(flags *[]cli.Flag) {
&FlagBrokerAddress,
&FlagEtherRPC,
&FlagQualityOracleAddress,
&FlagAccessPolicyAddress,
)
}

Expand All @@ -91,7 +84,6 @@ func ParseFlagsNetwork(ctx *cli.Context) {
Current.ParseBoolFlag(ctx, FlagTestnet)
Current.ParseBoolFlag(ctx, FlagLocalnet)
Current.ParseStringFlag(ctx, FlagAPIAddress)
Current.ParseStringFlag(ctx, FlagAccessPolicyAddress)
Current.ParseStringFlag(ctx, FlagBrokerAddress)
Current.ParseStringFlag(ctx, FlagEtherRPC)
Current.ParseStringFlag(ctx, FlagQualityOracleAddress)
Expand Down
39 changes: 30 additions & 9 deletions config/flags_service_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
package config

import (
"time"

"github.com/mysteriumnetwork/node/metadata"
"github.com/urfave/cli/v2"
)

// Options describes options shared among multiple services
type Options struct {
AccessPolicies []string
ShaperEnabled bool
// ServicesOptions describes options shared among multiple services
type ServicesOptions struct {
AccessPolicyAddress string
AccessPolicyList []string
AccessPolicyFetchInterval time.Duration
ShaperEnabled bool
}

var (
Expand All @@ -45,12 +50,24 @@ var (
Name: "agreed-terms-and-conditions",
Usage: "Agree with terms & conditions",
}
// FlagAccessPolicies a comma-separated list of access policies that determines allowed identities to use the service.
FlagAccessPolicies = cli.StringFlag{
// FlagAccessPolicyAddress Trust oracle URL for retrieving access policies.
FlagAccessPolicyAddress = cli.StringFlag{
Name: "access-policy.address",
Usage: "URL of trust oracle endpoint for retrieving lists of access policies",
Value: metadata.DefaultNetwork.AccessPolicyOracleAddress,
}
// FlagAccessPolicyList a comma-separated list of access policies that determines allowed identities to use the service.
FlagAccessPolicyList = cli.StringFlag{
Name: "access-policy.list",
Usage: "Comma separated list that determines the allowed identities on our service.",
Usage: "Comma separated list that determines the access policies applied to provide service.",
Value: "",
}
// FlagAccessPolicyFetchInterval policy list fetch interval.
FlagAccessPolicyFetchInterval = cli.DurationFlag{
Name: "access-policy.fetch",
Usage: `Proposal fetch interval { "30s", "3m", "1h20m30s" }`,
Value: 10 * time.Minute,
}
// FlagShaperEnabled enables bandwidth limitation.
FlagShaperEnabled = cli.BoolFlag{
Name: "shaper.enabled",
Expand All @@ -64,7 +81,9 @@ func RegisterFlagsServiceShared(flags *[]cli.Flag) {
&FlagIdentity,
&FlagIdentityPassphrase,
&FlagAgreedTermsConditions,
&FlagAccessPolicies,
&FlagAccessPolicyAddress,
&FlagAccessPolicyList,
&FlagAccessPolicyFetchInterval,
&FlagShaperEnabled,
)
}
Expand All @@ -74,6 +93,8 @@ func ParseFlagsServiceShared(ctx *cli.Context) {
Current.ParseStringFlag(ctx, FlagIdentity)
Current.ParseStringFlag(ctx, FlagIdentityPassphrase)
Current.ParseBoolFlag(ctx, FlagAgreedTermsConditions)
Current.ParseStringFlag(ctx, FlagAccessPolicies)
Current.ParseStringFlag(ctx, FlagAccessPolicyAddress)
Current.ParseStringFlag(ctx, FlagAccessPolicyList)
Current.ParseDurationFlag(ctx, FlagAccessPolicyFetchInterval)
Current.ParseBoolFlag(ctx, FlagShaperEnabled)
}
15 changes: 7 additions & 8 deletions core/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,13 @@ func GetOptions() *Options {
},
LogOptions: *GetLogOptions(),
OptionsNetwork: OptionsNetwork{
Testnet: config.GetBool(config.FlagTestnet),
Localnet: config.GetBool(config.FlagLocalnet),
ExperimentNATPunching: config.GetBool(config.FlagNATPunching),
MysteriumAPIAddress: config.GetString(config.FlagAPIAddress),
AccessPolicyEndpointAddress: config.GetString(config.FlagAccessPolicyAddress),
BrokerAddress: config.GetString(config.FlagBrokerAddress),
EtherClientRPC: config.GetString(config.FlagEtherRPC),
QualityOracle: config.GetString(config.FlagQualityOracleAddress),
Testnet: config.GetBool(config.FlagTestnet),
Localnet: config.GetBool(config.FlagLocalnet),
ExperimentNATPunching: config.GetBool(config.FlagNATPunching),
MysteriumAPIAddress: config.GetString(config.FlagAPIAddress),
BrokerAddress: config.GetString(config.FlagBrokerAddress),
EtherClientRPC: config.GetString(config.FlagEtherRPC),
QualityOracle: config.GetString(config.FlagQualityOracleAddress),
},
Discovery: *GetDiscoveryOptions(),
MMN: OptionsMMN{
Expand Down
5 changes: 2 additions & 3 deletions core/node/options_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ type OptionsNetwork struct {

ExperimentNATPunching bool

MysteriumAPIAddress string
AccessPolicyEndpointAddress string
BrokerAddress string
MysteriumAPIAddress string
BrokerAddress string

EtherClientRPC string

Expand Down