New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/MYST-431-make-connect-method-cancelable #212
Changes from 10 commits
241fef9
5708306
4ab321a
2754806
ee0dc51
4b275ec
15d1814
25d4790
cf6eb2e
1e96d45
7be95b9
05a6368
9604e00
f1e895c
e791802
c7e5464
b131ee0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,44 +2,57 @@ package connection | |
|
||
import ( | ||
"errors" | ||
log "github.com/cihub/seelog" | ||
"github.com/mysterium/node/communication" | ||
"github.com/mysterium/node/identity" | ||
"github.com/mysterium/node/openvpn" | ||
"github.com/mysterium/node/openvpn/middlewares/client/bytescount" | ||
"github.com/mysterium/node/server" | ||
"github.com/mysterium/node/service_discovery/dto" | ||
"github.com/mysterium/node/session" | ||
"github.com/mysterium/node/utils" | ||
) | ||
|
||
const managerLogPrefix = "[connection-manager] " | ||
|
||
const channelClosed = openvpn.State("") | ||
|
||
var ( | ||
// ErrNoConnection error indicates that action applied to manager expects active connection (i.e. disconnect) | ||
ErrNoConnection = errors.New("no connection exists") | ||
// ErrAlreadyExists error indicates that aciton applieto to manager expects no active connection (i.e. connect) | ||
ErrAlreadyExists = errors.New("connection already exists") | ||
// ErrConnectionCancelled indicates that connection in progress was cancelled by request of api user | ||
ErrConnectionCancelled = errors.New("connection was cancelled") | ||
// ErrOpenvpnProcessDied indicates that Connect method didn't reach "Connected" phase due to openvpn error | ||
ErrOpenvpnProcessDied = errors.New("openvpn process died") | ||
) | ||
|
||
type connectionManager struct { | ||
//these are passed on creation | ||
mysteriumClient server.Client | ||
newDialogEstablisher DialogEstablisherCreator | ||
newVpnClient VpnClientCreator | ||
statsKeeper bytescount.SessionStatsKeeper | ||
mysteriumClient server.Client | ||
newDialog DialogCreator | ||
newVpnClient VpnClientCreator | ||
statsKeeper bytescount.SessionStatsKeeper | ||
//these are populated by Connect at runtime | ||
status ConnectionStatus | ||
dialog communication.Dialog | ||
openvpnClient openvpn.Client | ||
sessionID session.SessionID | ||
status ConnectionStatus | ||
closeAction func() | ||
} | ||
|
||
func warnOnClose() { | ||
log.Warn(managerLogPrefix, "WARNING! Trying to close when there is nothing to close. Possible bug or race condition") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't need to start each warning with "WARNING!" - There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed |
||
} | ||
|
||
// NewManager creates connection manager with given dependencies | ||
func NewManager(mysteriumClient server.Client, dialogEstablisherCreator DialogEstablisherCreator, | ||
func NewManager(mysteriumClient server.Client, dialogCreator DialogCreator, | ||
vpnClientCreator VpnClientCreator, statsKeeper bytescount.SessionStatsKeeper) *connectionManager { | ||
return &connectionManager{ | ||
mysteriumClient: mysteriumClient, | ||
newDialogEstablisher: dialogEstablisherCreator, | ||
newVpnClient: vpnClientCreator, | ||
statsKeeper: statsKeeper, | ||
status: statusNotConnected(), | ||
mysteriumClient: mysteriumClient, | ||
newDialog: dialogCreator, | ||
newVpnClient: vpnClientCreator, | ||
statsKeeper: statsKeeper, | ||
status: statusNotConnected(), | ||
closeAction: warnOnClose, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like cleanConnection more maybe. Renamed |
||
} | ||
} | ||
|
||
|
@@ -55,33 +68,72 @@ func (manager *connectionManager) Connect(consumerID, providerID identity.Identi | |
} | ||
}() | ||
|
||
proposal, err := manager.findProposalByProviderID(providerID) | ||
cancelable := utils.NewCancelable() | ||
manager.closeAction = utils.CallOnce(func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes there is a tiny possibility. But mutexes and locking would be better. Lets leave it to other PR |
||
log.Info(managerLogPrefix, "Closing active connection") | ||
manager.status = statusDisconnecting() | ||
cancelable.Cancel() | ||
}) | ||
|
||
val, err := cancelable. | ||
Request(func() (interface{}, error) { | ||
return manager.findProposalByProviderID(providerID) | ||
}). | ||
Call() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks very unusual to keep There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes that go syntax style. Otherwise - syntax error :) |
||
if err != nil { | ||
return err | ||
} | ||
proposal := val.(*dto.ServiceProposal) | ||
|
||
dialogEstablisher := manager.newDialogEstablisher(consumerID) | ||
manager.dialog, err = dialogEstablisher.EstablishDialog(providerID, proposal.ProviderContacts[0]) | ||
val, err = cancelable. | ||
Request(func() (interface{}, error) { | ||
return manager.newDialog(consumerID, providerID, proposal.ProviderContacts[0]) | ||
}). | ||
Cleanup(utils.SkipOnError(func(val interface{}) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This error skipping looks complicated:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now it's Cleanup(utils.InvokeOnSuccess(...)) because it's not a part of cancelable request, but a function decorator which invoces callback only if error == nil There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know pattern is applied, but still complicated There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. either if err == nil then do cleanup or function decorator. I would stay with decorator |
||
val.(communication.Dialog).Close() | ||
})). | ||
Call() | ||
if err != nil { | ||
return err | ||
} | ||
dialog := val.(communication.Dialog) | ||
|
||
vpnSession, err := session.RequestSessionCreate(manager.dialog, proposal.ID) | ||
val, err = cancelable. | ||
Request(func() (interface{}, error) { | ||
return session.RequestSessionCreate(dialog, proposal.ID) | ||
}). | ||
Call() | ||
if err != nil { | ||
dialog.Close() | ||
return err | ||
} | ||
manager.sessionID = vpnSession.ID | ||
vpnSession := val.(*session.SessionDto) | ||
|
||
manager.openvpnClient, err = manager.newVpnClient(*vpnSession, consumerID, providerID, manager.onVpnStatusUpdate) | ||
stateChannel := make(chan openvpn.State, 10) | ||
val, err = cancelable. | ||
Request(func() (interface{}, error) { | ||
return manager.startOpenvpnClient(*vpnSession, consumerID, providerID, stateChannel) | ||
}). | ||
Cleanup(utils.SkipOnError(func(val interface{}) { | ||
val.(openvpn.Client).Stop() | ||
})). | ||
Call() | ||
if err != nil { | ||
manager.dialog.Close() | ||
dialog.Close() | ||
return err | ||
} | ||
openvpnClient := val.(openvpn.Client) | ||
|
||
if err = manager.openvpnClient.Start(); err != nil { | ||
manager.dialog.Close() | ||
err = manager.waitForConnectedState(stateChannel, vpnSession.ID, cancelable.Cancelled) | ||
if err != nil { | ||
dialog.Close() | ||
openvpnClient.Stop() | ||
return err | ||
} | ||
|
||
go openvpnClientStopper(openvpnClient, cancelable.Cancelled) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can construct new closeAction to avoid exposing manager.closeAction = utils.CallOnce(func() {
log.Info(managerLogPrefix, "Closing active connection")
manager.status = statusDisconnecting()
openvpnClientStopper(openvpnClient)
}) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Refactored. However - discussable. Do we have a better readability? |
||
go openvpnClientWaiter(openvpnClient, dialog) | ||
go manager.consumeOpenvpnStates(stateChannel, vpnSession.ID) | ||
return nil | ||
} | ||
|
||
|
@@ -93,24 +145,10 @@ func (manager *connectionManager) Disconnect() error { | |
if manager.status.State == NotConnected { | ||
return ErrNoConnection | ||
} | ||
manager.status = statusDisconnecting() | ||
manager.openvpnClient.Stop() | ||
manager.closeAction() | ||
return nil | ||
} | ||
|
||
func (manager *connectionManager) onVpnStatusUpdate(vpnState openvpn.State) { | ||
switch vpnState { | ||
case openvpn.ConnectedState: | ||
manager.statsKeeper.MarkSessionStart() | ||
manager.status = statusConnected(manager.sessionID) | ||
case openvpn.ExitingState: | ||
manager.status = statusNotConnected() | ||
manager.statsKeeper.MarkSessionEnd() | ||
case openvpn.ReconnectingState: | ||
manager.status = statusReconnecting() | ||
} | ||
} | ||
|
||
// TODO this can be extraced as depencency later when node selection criteria will be clear | ||
func (manager *connectionManager) findProposalByProviderID(providerID identity.Identity) (*dto.ServiceProposal, error) { | ||
proposals, err := manager.mysteriumClient.FindProposals(providerID.Address) | ||
|
@@ -123,3 +161,83 @@ func (manager *connectionManager) findProposalByProviderID(providerID identity.I | |
} | ||
return &proposals[0], nil | ||
} | ||
|
||
func openvpnClientStopper(openvpnClient openvpn.Client, cancelRequest utils.CancelChannel) { | ||
<-cancelRequest | ||
log.Debug(managerLogPrefix, "Stopping openvpn client") | ||
err := openvpnClient.Stop() | ||
if err != nil { | ||
log.Error(managerLogPrefix, "Failed to stop openvpn client: ", err) | ||
} | ||
} | ||
|
||
func openvpnClientWaiter(openvpnClient openvpn.Client, dialog communication.Dialog) { | ||
defer dialog.Close() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use explicit close at the end. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
err := openvpnClient.Wait() | ||
if err != nil { | ||
log.Warn(managerLogPrefix, "Openvpn client exited with error: ", err) | ||
} else { | ||
log.Info(managerLogPrefix, "Openvpn client exited") | ||
} | ||
} | ||
|
||
func (manager *connectionManager) startOpenvpnClient(vpnSession session.SessionDto, consumerID, providerID identity.Identity, stateChannel chan openvpn.State) (openvpn.Client, error) { | ||
openvpnClient, err := manager.newVpnClient( | ||
vpnSession, | ||
consumerID, | ||
providerID, | ||
channelToStateCallbackAdapter(stateChannel), | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if err = openvpnClient.Start(); err != nil { | ||
return nil, err | ||
} | ||
|
||
return openvpnClient, nil | ||
} | ||
|
||
func (manager *connectionManager) waitForConnectedState(stateChannel <-chan openvpn.State, sessionID session.SessionID, cancelRequest utils.CancelChannel) error { | ||
|
||
for { | ||
select { | ||
case state := <-stateChannel: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very strange that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. read state and also check if channel was closed explicitly |
||
switch state { | ||
case openvpn.ConnectedState: | ||
manager.onStateChanged(state, sessionID) | ||
return nil | ||
case channelClosed: | ||
return ErrOpenvpnProcessDied | ||
default: | ||
manager.onStateChanged(state, sessionID) | ||
} | ||
case <-cancelRequest: | ||
return ErrConnectionCancelled | ||
} | ||
} | ||
} | ||
|
||
func (manager *connectionManager) consumeOpenvpnStates(stateChannel <-chan openvpn.State, sessionID session.SessionID) { | ||
func() { | ||
for state := range stateChannel { | ||
manager.onStateChanged(state, sessionID) | ||
} | ||
manager.status = statusNotConnected() | ||
log.Debug(managerLogPrefix, "State updater stopped") | ||
}() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unneeded func. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed |
||
|
||
} | ||
|
||
func (manager *connectionManager) onStateChanged(state openvpn.State, sessionID session.SessionID) { | ||
switch state { | ||
case openvpn.ConnectedState: | ||
manager.statsKeeper.MarkSessionStart() | ||
manager.status = statusConnected(sessionID) | ||
case openvpn.ExitingState: | ||
manager.statsKeeper.MarkSessionEnd() | ||
case openvpn.ReconnectingState: | ||
manager.status = statusReconnecting() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This private function should go after
NewManager
, which is public.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to the bottom