Skip to content

Commit

Permalink
Refactor ClientService into an explicit state machine.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomprince committed Nov 10, 2016
1 parent 1155239 commit 2c82ec7
Show file tree
Hide file tree
Showing 2 changed files with 235 additions and 82 deletions.
316 changes: 234 additions & 82 deletions src/twisted/application/internet.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
CancelledError, gatherResults, Deferred, succeed, fail
)

from automat import MethodicalMachine


def _maybeGlobalReactor(maybeReactor):
Expand Down Expand Up @@ -531,6 +532,232 @@ def _noop():
"""


class _ClientMachine(object):

_machine = MethodicalMachine()

def __init__(self, endpoint, factory, retryPolicy, clock, log):
self._endpoint = endpoint
self._failedAttempts = 0
self._stopped = False
self._factory = factory
self._timeoutForAttempt = retryPolicy
self._clock = clock
self._stopRetry = _noop
self._connectionInProgress = succeed(None)
self._loseConnection = _noop

self._currentConnection = None
self._awaitingConnected = []

self._stopWaiters = []
self._log = log

@_machine.state(initial=True)
def _init(self):
"""
The service is not running.
"""

@_machine.state()
def _stopped(self):
pass

@_machine.state()
def _connecting(self):
"""
The service has started connecting.
"""

@_machine.state()
def _waiting(self):
"""
The service is waiting for the reconnecting period
before reconnecting.
"""

@_machine.state()
def _connected(self):
"""
The service is connected.
"""

@_machine.state()
def _disconnecting(self):
"""
The service is disconnecting after being asked to shutdown.
"""

@_machine.state()
def _disconnecting_restart(self):
"""
The service is disconnecting and has been asked to restart.
"""

@_machine.input()
def _clientConnect(self, protocol):
self._failedAttempts = 0

@_machine.output()
def _notifyWaiters(self, protocol):
self._loseConnection = protocol.transport.loseConnection
self._currentConnection = protocol._protocol
self._unawait(self._currentConnection)

@_machine.input()
def _clientDisconnect(self):
pass

@_machine.output()
def _do_clientDisconnect(self):
self._currentConnection = None
self._loseConnection = _noop


@_machine.input()
def _retry(self):
pass

@_machine.input()
def _connectionFailed(self):
pass

@_machine.output()
def _do_retry(self):
self._failedAttempts += 1
delay = self._timeoutForAttempt(self._failedAttempts)
self._log.info("Scheduling retry {attempt} to connect {endpoint} "
"in {delay} seconds.", attempt=self._failedAttempts,
endpoint=self._endpoint, delay=delay)
self._stopRetry = self._clock.callLater(delay, self._reconnect).cancel



@_machine.input()
def _reconnect(self):
"""
"""

@_machine.input()
def start(self):
"""
Start this L{ClientService}, initiating the connection retry loop.
"""
self._failedAttempts = 0

@_machine.output()
def _connectNow(self):
factoryProxy = _DisconnectFactory(self._factory, lambda _: self._clientDisconnect())

self._stopRetry = _noop
self._connectionInProgress = (self._endpoint.connect(factoryProxy)
.addCallback(self._clientConnect)
.addErrback(lambda _: self._connectionFailed()))

@_machine.input()
def stop(self):
"""
"""

@_machine.output()
def _do_stopService(self):
self._stopRetry()
self._stopRetry = _noop
print self._connectionFailed
self._connectionInProgress.cancel()
self._loseConnection()
self._currentConnection = None


@_machine.output()
def _cancelConnectWaiters(self):
self._stopped = True
self._unawait(Failure(CancelledError()))

@_machine.output()
def _finishStopping(self):
self._stopWaiters, waiting = [], self._stopWaiters
for w in waiting:
w.callback(None)

@_machine.output()
def _waitForStop(self):
self._stopWaiters.append(Deferred())
return self._stopWaiters[-1]


@_machine.input()
def _disconnect(self):
pass

@_machine.input()
def whenConnected(self):
pass

@_machine.output()
def currentConnection(self):
return succeed(self._currentConnection)

@_machine.output()
def noConnection(self):
return fail(CancelledError())

@_machine.output()
def awaitingConnection(self):
result = Deferred()
self._awaitingConnected.append(result)
return result

def _unawait(self, value):
"""
Fire all outstanding L{ClientService.whenConnected} L{Deferred}s.
@param value: the value to fire the L{Deferred}s with.
"""
self._awaitingConnected, waiting = [], self._awaitingConnected
for w in waiting:
w.callback(value)


next = lambda _: list(_)[0]
_connecting.upon(_connectionFailed, enter=_waiting, outputs=[_do_retry])
_connecting.upon(start, enter=_connecting, outputs=[])
_connecting.upon(stop, enter=_disconnecting, outputs=[_waitForStop, _do_stopService], collector=next)
_connecting.upon(_clientConnect, enter=_connected, outputs=[_notifyWaiters])
_connecting.upon(whenConnected, enter=_connecting, outputs=[awaitingConnection], collector=next)

_connected.upon(stop, enter=_disconnecting, outputs=[_waitForStop, _do_stopService], collector=next)
_connected.upon(_clientDisconnect, enter=_waiting, outputs=[_do_clientDisconnect, _do_retry])
_connected.upon(whenConnected, enter=_connected, outputs=[currentConnection], collector=next)

_init.upon(_retry, enter=_init, outputs=[])
_init.upon(start, enter=_connecting, outputs=[_connectNow])
_init.upon(stop, enter=_init, outputs=[])
_init.upon(whenConnected, enter=_init, outputs=[awaitingConnection], collector=next)

_stopped.upon(_retry, enter=_stopped, outputs=[])
_stopped.upon(start, enter=_connecting, outputs=[_connectNow])
_stopped.upon(stop, enter=_stopped, outputs=[])
_stopped.upon(whenConnected, enter=_stopped, outputs=[noConnection], collector=next)

_waiting.upon(_reconnect, enter=_connecting, outputs=[_connectNow])
_waiting.upon(stop, enter=_disconnecting, outputs=[_waitForStop, _do_stopService], collector=next)
_waiting.upon(whenConnected, enter=_waiting, outputs=[awaitingConnection], collector=next)

_disconnecting.upon(start, enter=_disconnecting_restart, outputs=[])
_disconnecting.upon(_clientDisconnect, enter=_stopped, outputs=[_cancelConnectWaiters, _finishStopping, _do_clientDisconnect])
_disconnecting.upon(_connectionFailed, enter=_stopped, outputs=[_cancelConnectWaiters, _finishStopping])
_disconnecting.upon(whenConnected, enter=_disconnecting, outputs=[noConnection], collector=next)

_disconnecting_restart.upon(stop, enter=_disconnecting, outputs=[])
_disconnecting_restart.upon(_clientDisconnect, enter=_connecting, outputs=[_finishStopping, _connectNow])
_disconnecting_restart.upon(_connectionFailed, enter=_stopped, outputs=[_finishStopping, _connectNow])
_disconnecting_restart.upon(whenConnected, enter=_disconnecting_restart, outputs=[awaitingConnection], collector=next)
del next





class ClientService(service.Service, object):
"""
Expand All @@ -542,7 +769,6 @@ class ClientService(service.Service, object):
"""

_log = Logger()

def __init__(self, endpoint, factory, retryPolicy=None, clock=None):
"""
@param endpoint: A L{stream client endpoint
Expand All @@ -567,19 +793,10 @@ def __init__(self, endpoint, factory, retryPolicy=None, clock=None):
clock = _maybeGlobalReactor(clock)
retryPolicy = _defaultPolicy if retryPolicy is None else retryPolicy

self._endpoint = endpoint
self._failedAttempts = 0
self._stopped = False
self._factory = factory
self._timeoutForAttempt = retryPolicy
self._clock = clock
self._stopRetry = _noop
self._lostDeferred = succeed(None)
self._connectionInProgress = succeed(None)
self._loseConnection = _noop

self._currentConnection = None
self._awaitingConnected = []
self._machine = _ClientMachine(
endpoint, factory, retryPolicy, clock,
log=self._log,
)


def whenConnected(self):
Expand All @@ -592,25 +809,7 @@ def whenConnected(self):
@rtype: L{Deferred} firing with L{IProtocol} or failing with
L{CancelledError} the service is stopped.
"""
if self._currentConnection is not None:
return succeed(self._currentConnection)
elif self._stopped:
return fail(CancelledError())
else:
result = Deferred()
self._awaitingConnected.append(result)
return result


def _unawait(self, value):
"""
Fire all outstanding L{ClientService.whenConnected} L{Deferred}s.
@param value: the value to fire the L{Deferred}s with.
"""
self._awaitingConnected, waiting = [], self._awaitingConnected
for w in waiting:
w.callback(value)
return self._machine.whenConnected()


def startService(self):
Expand All @@ -621,42 +820,7 @@ def startService(self):
self._log.warn("Duplicate ClientService.startService {log_source}")
return
super(ClientService, self).startService()
self._failedAttempts = 0

def connectNow():
thisLostDeferred = Deferred()

def clientConnect(protocol):
self._failedAttempts = 0
self._loseConnection = protocol.transport.loseConnection
self._lostDeferred = thisLostDeferred
self._currentConnection = protocol._protocol
self._unawait(self._currentConnection)

def clientDisconnect(reason):
self._currentConnection = None
self._loseConnection = _noop
thisLostDeferred.callback(None)
retry(reason)

factoryProxy = _DisconnectFactory(self._factory, clientDisconnect)

self._stopRetry = _noop
self._connectionInProgress = (self._endpoint.connect(factoryProxy)
.addCallback(clientConnect)
.addErrback(retry))

def retry(failure):
if not self.running:
return
self._failedAttempts += 1
delay = self._timeoutForAttempt(self._failedAttempts)
self._log.info("Scheduling retry {attempt} to connect {endpoint} "
"in {delay} seconds.", attempt=self._failedAttempts,
endpoint=self._endpoint, delay=delay)
self._stopRetry = self._clock.callLater(delay, connectNow).cancel

connectNow()
self._machine.start()


def stopService(self):
Expand All @@ -667,19 +831,7 @@ def stopService(self):
closed and all in-progress connection attempts halted.
"""
super(ClientService, self).stopService()
self._stopRetry()
self._stopRetry = _noop
self._connectionInProgress.cancel()
self._loseConnection()
self._currentConnection = None
def finishStopping(result):
if not self.running:
self._stopped = True
self._unawait(Failure(CancelledError()))
return None
return (gatherResults([self._connectionInProgress, self._lostDeferred])
.addBoth(finishStopping))

return self._machine.stop()


__all__ = (['TimerService', 'CooperatorService', 'MulticastServer',
Expand Down
1 change: 1 addition & 0 deletions src/twisted/python/_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class my_build_ext(build_ext_twisted):

requirements.append("constantly >= 15.1")
requirements.append("incremental >= 16.10.1")
requirements.append("Automat > 0.2.1")

arguments.update(dict(
packages=find_packages("src"),
Expand Down

0 comments on commit 2c82ec7

Please sign in to comment.