Skip to content

Commit

Permalink
Merge pull request #917 from mysteriumnetwork/master
Browse files Browse the repository at this point in the history
0.7.1
  • Loading branch information
soffokl committed Apr 26, 2019
2 parents 5ce9c6c + 38a630d commit 85b311d
Show file tree
Hide file tree
Showing 19 changed files with 508 additions and 68 deletions.
3 changes: 3 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 25 additions & 9 deletions cmd/commands/cli/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (c *cliApp) handleActions(line string) {
{"help", c.help},
{"status", c.status},
{"healthcheck", c.healthcheck},
{"nat", c.natStatus},
{"ip", c.ip},
{"disconnect", c.disconnect},
{"stop", c.stopClient},
Expand All @@ -153,15 +154,15 @@ func (c *cliApp) handleActions(line string) {
command string
handler func(argsString string)
}{
{command: "connect", handler: c.connect},
{command: "unlock", handler: c.unlock},
{command: "identities", handler: c.identities},
{command: "payout", handler: c.payout},
{command: "version", handler: c.version},
{command: "license", handler: c.license},
{command: "registration", handler: c.registration},
{command: "proposals", handler: c.proposals},
{command: "service", handler: c.service},
{"connect", c.connect},
{"unlock", c.unlock},
{"identities", c.identities},
{"payout", c.payout},
{"version", c.version},
{"license", c.license},
{"registration", c.registration},
{"proposals", c.proposals},
{"service", c.service},
}

for _, cmd := range staticCmds {
Expand Down Expand Up @@ -449,6 +450,20 @@ func (c *cliApp) healthcheck() {
info(buildString)
}

func (c *cliApp) natStatus() {
status, err := c.tequilapi.NATStatus()
if err != nil {
warn("Failed to retrieve NAT traversal status:", err)
return
}

if status.Error == "" {
infof("NAT traversal status: %q\n", status.Status)
} else {
infof("NAT traversal status: %q (error: %q)\n", status.Status, status.Error)
}
}

func (c *cliApp) proposals(filter string) {
proposals := c.fetchProposals()
c.fetchedProposals = proposals
Expand Down Expand Up @@ -673,6 +688,7 @@ func newAutocompleter(tequilapi *tequilapi_client.Client, proposals []tequilapi_
),
readline.PcItem("status"),
readline.PcItem("healthcheck"),
readline.PcItem("nat"),
readline.PcItem("proposals"),
readline.PcItem("ip"),
readline.PcItem("disconnect"),
Expand Down
44 changes: 33 additions & 11 deletions cmd/di.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/mysteriumnetwork/node/metrics"
"github.com/mysteriumnetwork/node/money"
"github.com/mysteriumnetwork/node/nat"
"github.com/mysteriumnetwork/node/nat/event"
"github.com/mysteriumnetwork/node/nat/mapping"
"github.com/mysteriumnetwork/node/nat/traversal"
"github.com/mysteriumnetwork/node/nat/traversal/config"
Expand Down Expand Up @@ -95,14 +96,20 @@ type NatPinger interface {

// NatEventTracker is responsible for tracking NAT events
type NatEventTracker interface {
ConsumeNATEvent(event traversal.Event)
LastEvent() *traversal.Event
WaitForEvent() traversal.Event
ConsumeNATEvent(event event.Event)
LastEvent() *event.Event
WaitForEvent() event.Event
}

// NatEventSender is responsible for sending NAT events to metrics server
type NatEventSender interface {
ConsumeNATEvent(event traversal.Event)
ConsumeNATEvent(event event.Event)
}

// NATStatusTracker tracks status of NAT traversal by consuming NAT events
type NATStatusTracker interface {
Status() nat.Status
ConsumeNATEvent(event event.Event)
}

// Dependencies is DI container for top level components which is reused in several places
Expand Down Expand Up @@ -140,9 +147,10 @@ type Dependencies struct {
ServiceRegistry *service.Registry
ServiceSessionStorage *session.StorageMemory

NATPinger NatPinger
NATTracker NatEventTracker
NATEventSender NatEventSender
NATPinger NatPinger
NATTracker NatEventTracker
NATEventSender NatEventSender
NATStatusTracker NATStatusTracker

PortPool *port.Pool

Expand Down Expand Up @@ -299,11 +307,15 @@ func (di *Dependencies) subscribeEventConsumers() error {
}

// NAT events
err = di.EventBus.Subscribe(traversal.EventTopic, di.NATEventSender.ConsumeNATEvent)
err = di.EventBus.Subscribe(event.Topic, di.NATEventSender.ConsumeNATEvent)
if err != nil {
return err
}
err = di.EventBus.Subscribe(event.Topic, di.NATTracker.ConsumeNATEvent)
if err != nil {
return err
}
return di.EventBus.Subscribe(traversal.EventTopic, di.NATTracker.ConsumeNATEvent)
return di.EventBus.Subscribe(event.Topic, di.NATStatusTracker.ConsumeNATEvent)
}

func (di *Dependencies) bootstrapNodeComponents(nodeOptions node.Options) {
Expand Down Expand Up @@ -344,6 +356,7 @@ func (di *Dependencies) bootstrapNodeComponents(nodeOptions node.Options) {
tequilapi_endpoints.AddRoutesForServiceSessions(router, di.ServiceSessionStorage)
tequilapi_endpoints.AddRoutesForPayout(router, di.IdentityManager, di.SignerFactory, di.MysteriumAPI)
tequilapi_endpoints.AddRoutesForAccessPolicies(router, nodeOptions.AccessPolicyEndpointAddress)
tequilapi_endpoints.AddRoutesForNAT(router, di.NATStatusTracker.Status)
identity_registry.AddIdentityRegistrationEndpoint(router, di.IdentityRegistration, di.IdentityRegistry)

corsPolicy := tequilapi.NewMysteriumCorsPolicy()
Expand Down Expand Up @@ -485,8 +498,7 @@ func (di *Dependencies) bootstrapMetrics(options node.Options) {
}

func (di *Dependencies) bootstrapNATComponents(options node.Options) {
di.NATTracker = traversal.NewEventsTracker()
di.NATEventSender = traversal.NewEventsSender(di.MetricsSender, di.IPResolver.GetPublicIP)
di.NATTracker = event.NewTracker()
if options.ExperimentNATPunching {
di.NATPinger = traversal.NewPingerFactory(
di.NATTracker,
Expand All @@ -499,4 +511,14 @@ func (di *Dependencies) bootstrapNATComponents(options node.Options) {
} else {
di.NATPinger = &traversal.NoopPinger{}
}

di.NATEventSender = event.NewSender(di.MetricsSender, di.IPResolver.GetPublicIP)

var lastStageName string
if options.ExperimentNATPunching {
lastStageName = traversal.StageName
} else {
lastStageName = mapping.StageName
}
di.NATStatusTracker = nat.NewStatusTracker(lastStageName)
}
31 changes: 29 additions & 2 deletions mobile/mysterium/openvpn_connection_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@
package mysterium

import (
"errors"
"sync"

log "github.com/cihub/seelog"
"github.com/mysteriumnetwork/go-openvpn/openvpn3"
"github.com/mysteriumnetwork/node/cmd"
"github.com/mysteriumnetwork/node/consumer"
"github.com/mysteriumnetwork/node/core/connection"
"github.com/mysteriumnetwork/node/core/ip"
"github.com/mysteriumnetwork/node/identity"
"github.com/mysteriumnetwork/node/nat/traversal"
"github.com/mysteriumnetwork/node/services/openvpn"
"github.com/mysteriumnetwork/node/services/openvpn/session"
"github.com/pkg/errors"
)

type openvpn3SessionFactory func(connection.ConnectOptions) (*openvpn3.Session, error)
Expand All @@ -37,6 +40,8 @@ var errSessionWrapperNotStarted = errors.New("session wrapper not started")
type sessionWrapper struct {
session *openvpn3.Session
createSession openvpn3SessionFactory
natPinger cmd.NatPinger
ipResolver ip.Resolver
}

func (wrapper *sessionWrapper) Start(options connection.ConnectOptions) error {
Expand All @@ -63,7 +68,23 @@ func (wrapper *sessionWrapper) Wait() error {
}

func (wrapper *sessionWrapper) GetConfig() (connection.ConsumerConfig, error) {
return nil, nil
ip, err := wrapper.ipResolver.GetPublicIP()
if err != nil {
return nil, errors.Wrap(err, "failed to get consumer config")
}

switch wrapper.natPinger.(type) {
case *traversal.NoopPinger:
log.Infof("noop pinger detected, returning nil client config.")
return nil, nil
}

return &openvpn.ConsumerConfig{
// TODO: since GetConfig is executed before Start we cannot access VPNConfig structure yet
// TODO skip sending port here, since provider generates port for consumer in VPNConfig
Port: 50221,
IP: &ip,
}, nil
}

func channelToCallbacks(stateChannel connection.StateChannel, statisticsChannel connection.StatisticsChannel) openvpn3.MobileSessionCallbacks {
Expand Down Expand Up @@ -156,6 +177,8 @@ type OpenvpnConnectionFactory struct {
sessionTracker *sessionTracker
signerFactory identity.SignerFactory
tunnelSetup Openvpn3TunnelSetup
natPinger cmd.NatPinger
ipResolver ip.Resolver
}

// Create creates a new openvpn connection
Expand Down Expand Up @@ -194,6 +217,8 @@ func (ocf *OpenvpnConnectionFactory) Create(stateChannel connection.StateChannel
}
return &sessionWrapper{
createSession: sessionFactory,
natPinger: ocf.natPinger,
ipResolver: ocf.ipResolver,
}, nil
}

Expand All @@ -206,6 +231,8 @@ func (mobNode *MobileNode) OverrideOpenvpnConnection(tunnelSetup Openvpn3TunnelS
sessionTracker: st,
signerFactory: mobNode.di.SignerFactory,
tunnelSetup: tunnelSetup,
natPinger: mobNode.di.NATPinger,
ipResolver: mobNode.di.IPResolver,
}
mobNode.di.EventBus.Subscribe(connection.StateEventTopic, st.handleState)

Expand Down
24 changes: 12 additions & 12 deletions nat/traversal/events_sender.go → nat/event/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package traversal
package event

import (
log "github.com/cihub/seelog"
)

const eventsSenderLogPrefix = "[traversal-events-sender] "
const senderLogPrefix = "[traversal-events-sender] "

// EventsSender allows subscribing to NAT events and sends them to server
type EventsSender struct {
// Sender allows subscribing to NAT events and sends them to server
type Sender struct {
metricsSender metricsSender
ipResolver ipResolver
lastIp string
Expand All @@ -38,16 +38,16 @@ type metricsSender interface {

type ipResolver func() (string, error)

// NewEventsSender returns a new instance of events sender
func NewEventsSender(metricsSender metricsSender, ipResolver ipResolver) *EventsSender {
return &EventsSender{metricsSender: metricsSender, ipResolver: ipResolver, lastIp: ""}
// NewSender returns a new instance of events sender
func NewSender(metricsSender metricsSender, ipResolver ipResolver) *Sender {
return &Sender{metricsSender: metricsSender, ipResolver: ipResolver, lastIp: ""}
}

// ConsumeNATEvent sends received event to server
func (es *EventsSender) ConsumeNATEvent(event Event) {
func (es *Sender) ConsumeNATEvent(event Event) {
publicIP, err := es.ipResolver()
if err != nil {
log.Warnf(eventsSenderLogPrefix, "resolving public ip failed: ", err)
log.Warnf(senderLogPrefix, "resolving public ip failed: ", err)
return
}
if publicIP == es.lastIp && es.matchesLastEvent(event) {
Expand All @@ -56,22 +56,22 @@ func (es *EventsSender) ConsumeNATEvent(event Event) {

err = es.sendNATEvent(event)
if err != nil {
log.Warnf(eventsSenderLogPrefix, "sending event failed: ", err)
log.Warnf(senderLogPrefix, "sending event failed: ", err)
}

es.lastIp = publicIP
es.lastEvent = &event
}

func (es *EventsSender) sendNATEvent(event Event) error {
func (es *Sender) sendNATEvent(event Event) error {
if event.Successful {
return es.metricsSender.SendNATMappingSuccessEvent(event.Stage)
}

return es.metricsSender.SendNATMappingFailEvent(event.Stage, event.Error)
}

func (es *EventsSender) matchesLastEvent(event Event) bool {
func (es *Sender) matchesLastEvent(event Event) bool {
if es.lastEvent == nil {
return false
}
Expand Down
16 changes: 8 additions & 8 deletions nat/traversal/events_sender_test.go → nat/event/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package traversal
package event

import (
"errors"
Expand Down Expand Up @@ -60,7 +60,7 @@ func (resolver *mockIPResolver) GetPublicIP() (string, error) {
func Test_EventsSender_ConsumeNATEvent_SendsSuccessEvent(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := mockIPResolver{mockIp: "1st ip"}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

sender.ConsumeNATEvent(Event{Stage: "hole_punching", Successful: true})

Expand All @@ -71,7 +71,7 @@ func Test_EventsSender_ConsumeNATEvent_SendsSuccessEvent(t *testing.T) {
func Test_EventsSender_ConsumeNATEvent_WithSameIp_DoesNotSendSuccessEventAgain(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := mockIPResolver{mockIp: "1st ip"}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

sender.ConsumeNATEvent(Event{Successful: true})

Expand All @@ -84,7 +84,7 @@ func Test_EventsSender_ConsumeNATEvent_WithSameIp_DoesNotSendSuccessEventAgain(t
func Test_EventsSender_ConsumeNATEvent_WithDifferentIP_SendsSuccessEventAgain(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := &mockIPResolver{mockIp: "1st ip"}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

sender.ConsumeNATEvent(Event{Successful: true})

Expand All @@ -98,7 +98,7 @@ func Test_EventsSender_ConsumeNATEvent_WithDifferentIP_SendsSuccessEventAgain(t
func Test_EventsSender_ConsumeNATEvent_WhenIPResolverFails_DoesNotSendEvent(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := &mockIPResolver{mockErr: errors.New("mock error")}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

sender.ConsumeNATEvent(Event{Successful: true})

Expand All @@ -108,7 +108,7 @@ func Test_EventsSender_ConsumeNATEvent_WhenIPResolverFails_DoesNotSendEvent(t *t
func Test_EventsSender_ConsumeNATEvent_SendsFailureEvent(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := mockIPResolver{mockIp: "1st ip"}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

testErr := errors.New("test error")
sender.ConsumeNATEvent(Event{Stage: "hole_punching", Successful: false, Error: testErr})
Expand All @@ -120,7 +120,7 @@ func Test_EventsSender_ConsumeNATEvent_SendsFailureEvent(t *testing.T) {
func Test_EventsSender_ConsumeNATEvent_WithFailuresOfDifferentStages_SendsBothEvents(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := &mockIPResolver{mockIp: "1st ip"}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

testErr1 := errors.New("test error 1")
sender.ConsumeNATEvent(Event{Successful: false, Error: testErr1, Stage: "test 1"})
Expand All @@ -133,7 +133,7 @@ func Test_EventsSender_ConsumeNATEvent_WithFailuresOfDifferentStages_SendsBothEv
func Test_EventsSender_ConsumeNATEvent_WithSuccessAndFailureOnSameIp_SendsBothEvents(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := &mockIPResolver{mockIp: "1st ip"}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

sender.ConsumeNATEvent(Event{Successful: true})
testErr := errors.New("test error")
Expand Down
Loading

0 comments on commit 85b311d

Please sign in to comment.