Skip to content

Commit

Permalink
Merge pull request #378 from mysteriumnetwork/cancelable-context
Browse files Browse the repository at this point in the history
Custom cancelable implementation replaced with context
  • Loading branch information
zolia committed Sep 19, 2018
2 parents 44d95da + 05c62ab commit 1d49785
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 315 deletions.
1 change: 1 addition & 0 deletions core/connection/interface.go
Expand Up @@ -20,6 +20,7 @@ package connection
import (
"github.com/mysteriumnetwork/go-openvpn/openvpn"
"github.com/mysteriumnetwork/go-openvpn/openvpn/middlewares/state"

"github.com/mysteriumnetwork/node/communication"
"github.com/mysteriumnetwork/node/identity"
"github.com/mysteriumnetwork/node/service_discovery/dto"
Expand Down
92 changes: 32 additions & 60 deletions core/connection/manager.go
Expand Up @@ -18,6 +18,7 @@
package connection

import (
"context"
"errors"
"sync"

Expand All @@ -30,7 +31,6 @@ import (
"github.com/mysteriumnetwork/node/server"
"github.com/mysteriumnetwork/node/service_discovery/dto"
"github.com/mysteriumnetwork/node/session"
"github.com/mysteriumnetwork/node/utils"
)

const managerLogPrefix = "[connection-manager] "
Expand All @@ -53,10 +53,10 @@ type connectionManager struct {
newVpnClient VpnClientCreator
statsKeeper stats.SessionStatsKeeper
//these are populated by Connect at runtime
ctx context.Context
mutex sync.RWMutex
status ConnectionStatus
cleanConnection func()

mutex sync.RWMutex
}

// NewManager creates connection manager with given dependencies
Expand All @@ -78,6 +78,7 @@ func (manager *connectionManager) Connect(consumerID, providerID identity.Identi
}

manager.mutex.Lock()
manager.ctx, manager.cleanConnection = context.WithCancel(context.Background())
manager.status = statusConnecting()
manager.mutex.Unlock()
defer func() {
Expand All @@ -89,76 +90,57 @@ func (manager *connectionManager) Connect(consumerID, providerID identity.Identi
}()

err = manager.startConnection(consumerID, providerID, options)
if err == utils.ErrRequestCancelled {
if err == context.Canceled {
return ErrConnectionCancelled
}
return err
}

func (manager *connectionManager) startConnection(consumerID, providerID identity.Identity, options ConnectOptions) (err error) {
cancelable := utils.NewCancelable()

manager.mutex.Lock()
manager.cleanConnection = utils.CallOnce(func() {
log.Info(managerLogPrefix, "Cancelling connection initiation")
manager.status = statusDisconnecting()
cancelable.Cancel()
})
cancelCtx := manager.cleanConnection
manager.mutex.Unlock()

val, err := cancelable.
NewRequest(func() (interface{}, error) {
return manager.findProposalByProviderID(providerID)
}).
Call()
var cancel []func()
defer func() {
manager.cleanConnection = func() {
manager.status = statusDisconnecting()
cancelCtx()
for _, f := range cancel {
f()
}
}
if err != nil {
log.Info(managerLogPrefix, "Cancelling connection initiation")
defer manager.cleanConnection()
}
}()

proposal, err := manager.findProposalByProviderID(providerID)
if err != nil {
return err
}
proposal := val.(*dto.ServiceProposal)

val, err = cancelable.
NewRequest(func() (interface{}, error) {
return manager.newDialog(consumerID, providerID, proposal.ProviderContacts[0])
}).
Cleanup(utils.InvokeOnSuccess(func(val interface{}) {
val.(communication.Dialog).Close()
})).
Call()

dialog, err := manager.newDialog(consumerID, providerID, proposal.ProviderContacts[0])
if err != nil {
return err
}
dialog := val.(communication.Dialog)
cancel = append(cancel, func() { dialog.Close() })

val, err = cancelable.
NewRequest(func() (interface{}, error) {
return session.RequestSessionCreate(dialog, proposal.ID)
}).
Call()
vpnSession, err := session.RequestSessionCreate(dialog, proposal.ID)
if err != nil {
dialog.Close()
return err
}
vpnSession := val.(*session.SessionDto)

stateChannel := make(chan openvpn.State, 10)
val, err = cancelable.
NewRequest(func() (interface{}, error) {
return manager.startOpenvpnClient(*vpnSession, consumerID, providerID, stateChannel, options)
}).
Cleanup(utils.InvokeOnSuccess(func(val interface{}) {
val.(openvpn.Process).Stop()
})).
Call()
openvpnClient, err := manager.startOpenvpnClient(*vpnSession, consumerID, providerID, stateChannel, options)
if err != nil {
dialog.Close()
return err
}
openvpnClient := val.(openvpn.Process)
cancel = append(cancel, openvpnClient.Stop)

err = manager.waitForConnectedState(stateChannel, vpnSession.ID, cancelable.Cancelled)
err = manager.waitForConnectedState(stateChannel, vpnSession.ID)
if err != nil {
dialog.Close()
openvpnClient.Stop()
return err
}

Expand All @@ -168,15 +150,6 @@ func (manager *connectionManager) startConnection(consumerID, providerID identit
firewall.NewKillSwitch().Enable()
}

manager.mutex.Lock()
manager.cleanConnection = func() {
log.Info(managerLogPrefix, "Closing active connection")
manager.status = statusDisconnecting()
openvpnClient.Stop()
log.Info(managerLogPrefix, "Openvpn client stop requested")
}
manager.mutex.Unlock()

go openvpnClientWaiter(openvpnClient, dialog)
go manager.consumeOpenvpnStates(stateChannel, vpnSession.ID)
return nil
Expand Down Expand Up @@ -246,8 +219,7 @@ func (manager *connectionManager) startOpenvpnClient(vpnSession session.SessionD
return openvpnClient, nil
}

func (manager *connectionManager) waitForConnectedState(stateChannel <-chan openvpn.State, sessionID session.SessionID, cancelRequest utils.CancelChannel) error {

func (manager *connectionManager) waitForConnectedState(stateChannel <-chan openvpn.State, sessionID session.SessionID) error {
for {
select {
case state, more := <-stateChannel:
Expand All @@ -262,8 +234,8 @@ func (manager *connectionManager) waitForConnectedState(stateChannel <-chan open
default:
manager.onStateChanged(state, sessionID)
}
case <-cancelRequest:
return utils.ErrRequestCancelled
case <-manager.ctx.Done():
return manager.ctx.Err()
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions discovery/discovery.go
Expand Up @@ -104,11 +104,11 @@ func (d *Discovery) stopLoop() {
log.Info(logPrefix, "stopping discovery loop..")
d.RLock()
if d.status == WaitingForRegistration {
d.RUnlock()
d.unsubscribe()
d.RLock()
}
d.RUnlock()

d.RLock()
if d.status == RegisterProposal || d.status == PingProposal {
d.RUnlock()
d.changeStatus(UnregisterProposal)
Expand Down
4 changes: 2 additions & 2 deletions discovery/discovery_fake.go
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/mysteriumnetwork/node/server"
)

// NewFakeDiscrovery creates fake discovery structure
func NewFakeDiscrovery() *Discovery {
// NewFakeDiscovery creates fake discovery structure
func NewFakeDiscovery() *Discovery {
return &Discovery{
statusChan: make(chan Status),
proposalAnnouncementStopped: &sync.WaitGroup{},
Expand Down
10 changes: 5 additions & 5 deletions discovery/discovery_test.go
Expand Up @@ -35,7 +35,7 @@ var (
)

func TestStartRegistersProposal(t *testing.T) {
d := NewFakeDiscrovery()
d := NewFakeDiscovery()
d.identityRegistry = &identity_registry.FakeRegistry{RegistrationEventExists: false, Registered: true}

d.Start(providerID, proposal)
Expand All @@ -45,7 +45,7 @@ func TestStartRegistersProposal(t *testing.T) {
}

func TestStartRegistersIdentitySuccessfully(t *testing.T) {
d := NewFakeDiscrovery()
d := NewFakeDiscovery()
d.identityRegistry = &identity_registry.FakeRegistry{RegistrationEventExists: true, Registered: false}

d.Start(providerID, proposal)
Expand All @@ -55,7 +55,7 @@ func TestStartRegistersIdentitySuccessfully(t *testing.T) {
}

func TestStartRegisterIdentityCancelled(t *testing.T) {
d := NewFakeDiscrovery()
d := NewFakeDiscovery()
d.identityRegistry = &identity_registry.FakeRegistry{RegistrationEventExists: false, Registered: false}

d.Start(providerID, proposal)
Expand All @@ -70,7 +70,7 @@ func TestStartRegisterIdentityCancelled(t *testing.T) {
}

func TestStartStopUnregisterProposal(t *testing.T) {
d := NewFakeDiscrovery()
d := NewFakeDiscovery()
d.identityRegistry = &identity_registry.FakeRegistry{RegistrationEventExists: false, Registered: true}

d.Start(providerID, proposal)
Expand All @@ -91,7 +91,7 @@ func observeStatus(d *Discovery, status Status) Status {
d.RUnlock()
return d.status
}
time.Sleep(10 * time.Millisecond)
d.RUnlock()
time.Sleep(10 * time.Millisecond)
}
}
12 changes: 6 additions & 6 deletions discovery/factory.go
Expand Up @@ -52,12 +52,12 @@ func NewService(
signerCreate identity.SignerFactory,
) *Discovery {
return &Discovery{
identityRegistry: identityRegistry,
identityRegistration: identityRegistration,
mysteriumClient: mysteriumClient,
signerCreate: signerCreate,
statusChan: make(chan Status),
status: StatusUndefined,
identityRegistry: identityRegistry,
identityRegistration: identityRegistration,
mysteriumClient: mysteriumClient,
signerCreate: signerCreate,
statusChan: make(chan Status),
status: StatusUndefined,
proposalAnnouncementStopped: &sync.WaitGroup{},
unsubscribe: func() {},
stop: func() {},
Expand Down
1 change: 0 additions & 1 deletion requests/request.go
Expand Up @@ -75,7 +75,6 @@ func encodeToJSON(value interface{}) ([]byte, error) {
}

func newRequest(method, apiURI, path string, body []byte) (*http.Request, error) {

fullUrl := fmt.Sprintf("%v/%v", apiURI, path)
req, err := http.NewRequest(method, fullUrl, bytes.NewBuffer(body))
if err != nil {
Expand Down

0 comments on commit 1d49785

Please sign in to comment.