Skip to content

Commit

Permalink
Merge pull request #1561 from mysteriumnetwork/feature/sync-policy-rules
Browse files Browse the repository at this point in the history
Synchronise access policies with TrustOracle
  • Loading branch information
Waldz committed Jan 29, 2020
2 parents d40f0e5 + 85e71e7 commit 0c45ddb
Show file tree
Hide file tree
Showing 21 changed files with 737 additions and 171 deletions.
8 changes: 4 additions & 4 deletions cmd/commands/cli/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (c *cliApp) serviceStart(providerID, serviceType string, args ...string) {
}

ap := tequilapi_client.AccessPoliciesRequest{
IDs: sharedOpts.AccessPolicies,
IDs: sharedOpts.AccessPolicyList,
}

service, err := c.tequilapi.ServiceStart(providerID, serviceType, opts, ap)
Expand Down Expand Up @@ -665,7 +665,7 @@ func newAutocompleter(tequilapi *tequilapi_client.Client, proposals []tequilapi_
)
}

func parseStartFlags(serviceType string, args ...string) (service.Options, config.Options, error) {
func parseStartFlags(serviceType string, args ...string) (service.Options, config.ServicesOptions, error) {
var flags []cli.Flag
config.RegisterFlagsServiceShared(&flags)
config.RegisterFlagsServiceOpenvpn(&flags)
Expand All @@ -677,7 +677,7 @@ func parseStartFlags(serviceType string, args ...string) (service.Options, confi
}

if err := set.Parse(args); err != nil {
return nil, config.Options{}, err
return nil, config.ServicesOptions{}, err
}

ctx := cli.NewContext(nil, set, nil)
Expand All @@ -694,5 +694,5 @@ func parseStartFlags(serviceType string, args ...string) (service.Options, confi
return openvpn_service.GetOptions(), services.SharedConfiguredOptions(), nil
}

return nil, config.Options{}, errors.New("service type not found")
return nil, config.ServicesOptions{}, errors.New("service type not found")
}
2 changes: 1 addition & 1 deletion cmd/commands/service/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewCommand(licenseCommandName string) *cli.Command {
tequilapi: client.NewClient(nodeOptions.TequilapiAddress, nodeOptions.TequilapiPort),
errorChannel: quit,
ap: client.AccessPoliciesRequest{
IDs: services.SharedConfiguredOptions().AccessPolicies,
IDs: services.SharedConfiguredOptions().AccessPolicyList,
},
}

Expand Down
14 changes: 11 additions & 3 deletions cmd/di.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/mysteriumnetwork/node/core/location"
"github.com/mysteriumnetwork/node/core/node"
nodevent "github.com/mysteriumnetwork/node/core/node/event"
"github.com/mysteriumnetwork/node/core/policy"
"github.com/mysteriumnetwork/node/core/quality"
"github.com/mysteriumnetwork/node/core/service"
"github.com/mysteriumnetwork/node/core/state"
Expand All @@ -67,6 +68,7 @@ import (
"github.com/mysteriumnetwork/node/nat/traversal"
"github.com/mysteriumnetwork/node/nat/upnp"
"github.com/mysteriumnetwork/node/requests"
"github.com/mysteriumnetwork/node/services"
service_noop "github.com/mysteriumnetwork/node/services/noop"
service_openvpn "github.com/mysteriumnetwork/node/services/openvpn"
"github.com/mysteriumnetwork/node/services/openvpn/discovery/dto"
Expand Down Expand Up @@ -124,6 +126,8 @@ type Dependencies struct {
IPResolver ip.Resolver
LocationResolver *location.Cache

PolicyRepository *policy.Repository

StatisticsTracker *statistics.SessionStatisticsTracker
StatisticsReporter *statistics.SessionStatisticsReporter
SessionStorage *consumer_session.Storage
Expand Down Expand Up @@ -217,7 +221,7 @@ func (di *Dependencies) Bootstrap(nodeOptions node.Options) error {
return err
}

if err := di.bootstrapServices(nodeOptions); err != nil {
if err := di.bootstrapServices(nodeOptions, services.SharedConfiguredOptions()); err != nil {
return err
}

Expand Down Expand Up @@ -337,6 +341,10 @@ func (di *Dependencies) Shutdown() (err error) {
}
}

if di.PolicyRepository != nil {
di.PolicyRepository.Stop()
}

if di.NATService != nil {
if err := di.NATService.Disable(); err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -541,10 +549,10 @@ func (di *Dependencies) bootstrapTequilapi(nodeOptions node.Options, listener ne
tequilapi_endpoints.AddRoutesForConnectionSessions(router, di.SessionStorage)
tequilapi_endpoints.AddRoutesForConnectionLocation(router, di.ConnectionManager, di.IPResolver, di.LocationResolver, di.LocationResolver)
tequilapi_endpoints.AddRoutesForProposals(router, di.ProposalRepository, di.QualityClient)
tequilapi_endpoints.AddRoutesForService(router, di.ServicesManager, serviceTypesRequestParser, nodeOptions.AccessPolicyEndpointAddress)
tequilapi_endpoints.AddRoutesForService(router, di.ServicesManager, serviceTypesRequestParser)
tequilapi_endpoints.AddRoutesForServiceSessions(router, di.StateKeeper)
tequilapi_endpoints.AddRoutesForPayout(router, di.IdentityManager, di.SignerFactory, di.MysteriumAPI)
tequilapi_endpoints.AddRoutesForAccessPolicies(di.HTTPClient, router, nodeOptions.AccessPolicyEndpointAddress)
tequilapi_endpoints.AddRoutesForAccessPolicies(di.HTTPClient, router, services.SharedConfiguredOptions().AccessPolicyAddress)
tequilapi_endpoints.AddRoutesForNAT(router, di.StateKeeper.GetState)
tequilapi_endpoints.AddRoutesForSSE(router, di.SSEHandler)
tequilapi_endpoints.AddRoutesForTransactor(router, di.Transactor, di.AccountantPromiseSettler)
Expand Down
37 changes: 14 additions & 23 deletions cmd/di_desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/mysteriumnetwork/node/communication"
nats_dialog "github.com/mysteriumnetwork/node/communication/nats/dialog"
"github.com/mysteriumnetwork/node/config"
"github.com/mysteriumnetwork/node/core/location"
"github.com/mysteriumnetwork/node/core/node"
"github.com/mysteriumnetwork/node/core/policy"
"github.com/mysteriumnetwork/node/core/port"
"github.com/mysteriumnetwork/node/core/service"
"github.com/mysteriumnetwork/node/identity"
Expand All @@ -53,8 +55,8 @@ import (
)

// bootstrapServices loads all the components required for running services
func (di *Dependencies) bootstrapServices(nodeOptions node.Options) error {
err := di.bootstrapServiceComponents(nodeOptions)
func (di *Dependencies) bootstrapServices(nodeOptions node.Options, servicesOptions config.ServicesOptions) error {
err := di.bootstrapServiceComponents(nodeOptions, servicesOptions)
if err != nil {
return errors.Wrap(err, "service bootstrap failed")
}
Expand Down Expand Up @@ -212,39 +214,28 @@ func (di *Dependencies) bootstrapAccountantPromiseSettler(nodeOptions node.Optio
}

// bootstrapServiceComponents initiates ServicesManager dependency
func (di *Dependencies) bootstrapServiceComponents(nodeOptions node.Options) error {
func (di *Dependencies) bootstrapServiceComponents(nodeOptions node.Options, servicesOptions config.ServicesOptions) error {
di.NATService = nat.NewService()
if err := di.NATService.Enable(); err != nil {
log.Warn().Err(err).Msg("Failed to enable NAT forwarding")
}
di.ServiceRegistry = service.NewRegistry()
storage := session.NewEventBasedStorage(di.EventBus, session.NewStorageMemory())
di.ServiceSessionStorage = storage

err := storage.Subscribe()
if err != nil {
return errors.Wrap(err, "could not bootstrap service components")
storage := session.NewEventBasedStorage(di.EventBus, session.NewStorageMemory())
if err := storage.Subscribe(); err != nil {
return errors.Wrap(err, "could not subscribe session to node events")
}
di.ServiceSessionStorage = storage

newDialogWaiter := func(providerID identity.Identity, serviceType string, allowedIDs []identity.Identity) (communication.DialogWaiter, error) {
allowedIdentityValidator := func(peerID identity.Identity) error {
if len(allowedIDs) == 0 {
return nil
}

for _, id := range allowedIDs {
if peerID.Address == id.Address {
return nil
}
}
return errors.New("identity is not allowed")
}
di.PolicyRepository = policy.NewRepository(di.HTTPClient, servicesOptions.AccessPolicyAddress, servicesOptions.AccessPolicyFetchInterval)
di.PolicyRepository.Start()

newDialogWaiter := func(providerID identity.Identity, serviceType string, policies *[]market.AccessPolicy) (communication.DialogWaiter, error) {
return nats_dialog.NewDialogWaiter(
di.BrokerConnection,
fmt.Sprintf("%v.%v", providerID.Address, serviceType),
di.SignerFactory(providerID),
allowedIdentityValidator,
policy.ValidateAllowedIdentity(di.PolicyRepository, policies),
), nil
}
newDialogHandler := func(proposal market.ServiceProposal, configProvider session.ConfigProvider, serviceID string) (communication.DialogHandler, error) {
Expand Down Expand Up @@ -279,7 +270,7 @@ func (di *Dependencies) bootstrapServiceComponents(nodeOptions node.Options) err
newDialogHandler,
di.DiscoveryFactory,
di.EventBus,
di.HTTPClient,
di.PolicyRepository,
)

serviceCleaner := service.Cleaner{SessionStorage: di.ServiceSessionStorage}
Expand Down
17 changes: 12 additions & 5 deletions communication/nats/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,18 @@ func (receiver *receiverNATS) ReceiveUnsubscribe(endpoint communication.MessageE
defer receiver.mu.Unlock()

messageTopic := receiver.messageTopic + string(endpoint)
if subscription, found := receiver.subs[messageTopic]; found {
if err := subscription.Unsubscribe(); err != nil {
log.Error().Err(err).Msg("Failed to unsubscribe from topic: " + messageTopic)
}
log.Info().Msg("Unsubscribed from " + messageTopic)
subscription, found := receiver.subs[messageTopic]
if !found {
log.Error().Msg("Unknown topic to unsubscribe: " + messageTopic)
return
}

if err := subscription.Unsubscribe(); err != nil {
log.Error().Err(err).Msg("Failed to unsubscribe from topic: " + messageTopic)
return
}

log.Info().Msg("Unsubscribed from " + messageTopic)
}

func (receiver *receiverNATS) Unsubscribe() {
Expand All @@ -100,6 +106,7 @@ func (receiver *receiverNATS) Unsubscribe() {
for topic, s := range receiver.subs {
if err := s.Unsubscribe(); err != nil {
log.Error().Err(err).Msg("Failed to unsubscribe from topic: " + topic)
return
}
log.Info().Msg("Unsubscribed from " + topic)
}
Expand Down
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestConfig_ParseStringSliceFlag(t *testing.T) {
// given
sliceFlag := cli.StringSliceFlag{
Name: "discovery.type",
Usage: `Proposal discovery adapter(s) separated by comma Options: { "api", "broker", "api,broker" }`,
Usage: `Proposal discovery adapter(s) separated by comma. Options: { "api", "broker", "api,broker" }`,
Value: tc.defaults,
}
cfg := NewConfig()
Expand Down
8 changes: 0 additions & 8 deletions config/flags_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ var (
Usage: "URL of Mysterium API",
Value: metadata.DefaultNetwork.MysteriumAPIAddress,
}
// FlagAccessPolicyAddress Trust oracle URL for retrieving access policies.
FlagAccessPolicyAddress = cli.StringFlag{
Name: "access-policy-address",
Usage: "URL of trust oracle endpoint for retrieving lists of access policies",
Value: metadata.DefaultNetwork.AccessPolicyOracleAddress,
}
// FlagBrokerAddress message broker URI.
FlagBrokerAddress = cli.StringFlag{
Name: "broker-address",
Expand Down Expand Up @@ -82,7 +76,6 @@ func RegisterFlagsNetwork(flags *[]cli.Flag) {
&FlagBrokerAddress,
&FlagEtherRPC,
&FlagQualityOracleAddress,
&FlagAccessPolicyAddress,
)
}

Expand All @@ -91,7 +84,6 @@ func ParseFlagsNetwork(ctx *cli.Context) {
Current.ParseBoolFlag(ctx, FlagTestnet)
Current.ParseBoolFlag(ctx, FlagLocalnet)
Current.ParseStringFlag(ctx, FlagAPIAddress)
Current.ParseStringFlag(ctx, FlagAccessPolicyAddress)
Current.ParseStringFlag(ctx, FlagBrokerAddress)
Current.ParseStringFlag(ctx, FlagEtherRPC)
Current.ParseStringFlag(ctx, FlagQualityOracleAddress)
Expand Down
39 changes: 30 additions & 9 deletions config/flags_service_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
package config

import (
"time"

"github.com/mysteriumnetwork/node/metadata"
"github.com/urfave/cli/v2"
)

// Options describes options shared among multiple services
type Options struct {
AccessPolicies []string
ShaperEnabled bool
// ServicesOptions describes options shared among multiple services
type ServicesOptions struct {
AccessPolicyAddress string
AccessPolicyList []string
AccessPolicyFetchInterval time.Duration
ShaperEnabled bool
}

var (
Expand All @@ -45,12 +50,24 @@ var (
Name: "agreed-terms-and-conditions",
Usage: "Agree with terms & conditions",
}
// FlagAccessPolicies a comma-separated list of access policies that determines allowed identities to use the service.
FlagAccessPolicies = cli.StringFlag{
// FlagAccessPolicyAddress Trust oracle URL for retrieving access policies.
FlagAccessPolicyAddress = cli.StringFlag{
Name: "access-policy.address",
Usage: "URL of trust oracle endpoint for retrieving lists of access policies",
Value: metadata.DefaultNetwork.AccessPolicyOracleAddress,
}
// FlagAccessPolicyList a comma-separated list of access policies that determines allowed identities to use the service.
FlagAccessPolicyList = cli.StringFlag{
Name: "access-policy.list",
Usage: "Comma separated list that determines the allowed identities on our service.",
Usage: "Comma separated list that determines the access policies applied to provide service.",
Value: "",
}
// FlagAccessPolicyFetchInterval policy list fetch interval.
FlagAccessPolicyFetchInterval = cli.DurationFlag{
Name: "access-policy.fetch",
Usage: `Proposal fetch interval { "30s", "3m", "1h20m30s" }`,
Value: 10 * time.Minute,
}
// FlagShaperEnabled enables bandwidth limitation.
FlagShaperEnabled = cli.BoolFlag{
Name: "shaper.enabled",
Expand All @@ -64,7 +81,9 @@ func RegisterFlagsServiceShared(flags *[]cli.Flag) {
&FlagIdentity,
&FlagIdentityPassphrase,
&FlagAgreedTermsConditions,
&FlagAccessPolicies,
&FlagAccessPolicyAddress,
&FlagAccessPolicyList,
&FlagAccessPolicyFetchInterval,
&FlagShaperEnabled,
)
}
Expand All @@ -74,6 +93,8 @@ func ParseFlagsServiceShared(ctx *cli.Context) {
Current.ParseStringFlag(ctx, FlagIdentity)
Current.ParseStringFlag(ctx, FlagIdentityPassphrase)
Current.ParseBoolFlag(ctx, FlagAgreedTermsConditions)
Current.ParseStringFlag(ctx, FlagAccessPolicies)
Current.ParseStringFlag(ctx, FlagAccessPolicyAddress)
Current.ParseStringFlag(ctx, FlagAccessPolicyList)
Current.ParseDurationFlag(ctx, FlagAccessPolicyFetchInterval)
Current.ParseBoolFlag(ctx, FlagShaperEnabled)
}
15 changes: 7 additions & 8 deletions core/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,13 @@ func GetOptions() *Options {
},
LogOptions: *GetLogOptions(),
OptionsNetwork: OptionsNetwork{
Testnet: config.GetBool(config.FlagTestnet),
Localnet: config.GetBool(config.FlagLocalnet),
ExperimentNATPunching: config.GetBool(config.FlagNATPunching),
MysteriumAPIAddress: config.GetString(config.FlagAPIAddress),
AccessPolicyEndpointAddress: config.GetString(config.FlagAccessPolicyAddress),
BrokerAddress: config.GetString(config.FlagBrokerAddress),
EtherClientRPC: config.GetString(config.FlagEtherRPC),
QualityOracle: config.GetString(config.FlagQualityOracleAddress),
Testnet: config.GetBool(config.FlagTestnet),
Localnet: config.GetBool(config.FlagLocalnet),
ExperimentNATPunching: config.GetBool(config.FlagNATPunching),
MysteriumAPIAddress: config.GetString(config.FlagAPIAddress),
BrokerAddress: config.GetString(config.FlagBrokerAddress),
EtherClientRPC: config.GetString(config.FlagEtherRPC),
QualityOracle: config.GetString(config.FlagQualityOracleAddress),
},
Discovery: *GetDiscoveryOptions(),
MMN: OptionsMMN{
Expand Down
5 changes: 2 additions & 3 deletions core/node/options_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ type OptionsNetwork struct {

ExperimentNATPunching bool

MysteriumAPIAddress string
AccessPolicyEndpointAddress string
BrokerAddress string
MysteriumAPIAddress string
BrokerAddress string

EtherClientRPC string

Expand Down

0 comments on commit 0c45ddb

Please sign in to comment.