Skip to content

Commit

Permalink
Merge branch 'trunk' into 9101-wdauchy-tcp_reuseport
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkowl committed Apr 27, 2017
2 parents 9c64e4e + 0aef336 commit 66929c6
Show file tree
Hide file tree
Showing 15 changed files with 386 additions and 43 deletions.
8 changes: 4 additions & 4 deletions docs/core/howto/application.rst
Expand Up @@ -104,10 +104,10 @@ Given a file named ``my.py`` with the code:

.. code-block:: python
from twisted.python.log import FileLogObserver
from twisted.logger import textFileLogObserver
def logger():
return FileLogObserver(open("/tmp/my.log", "w")).emit
return textFileLogObserver(open("/tmp/my.log", "w"))
Invoking ``twistd --logger my.logger ...`` will log to a file named ``/tmp/my.log`` (this simple example could easily be replaced with use of the ``--logfile`` parameter to twistd).
Expand All @@ -120,12 +120,12 @@ Here is an example of how to use :api:`twisted.python.logfile.DailyLogFile <Dail
.. code-block:: python
from twisted.application.service import Application
from twisted.python.log import ILogObserver, FileLogObserver
from twisted.logger import ILogObserver, textFileLogObserver
from twisted.python.logfile import DailyLogFile
application = Application("myapp")
logfile = DailyLogFile("my.log", "/tmp")
application.setComponent(ILogObserver, FileLogObserver(logfile).emit)
application.setComponent(ILogObserver, textFileLogObserver(logfile))
Invoking ``twistd -y my.tac`` will create a log file at ``/tmp/my.log``.
Expand Down
30 changes: 30 additions & 0 deletions docs/core/howto/endpoints.rst
Expand Up @@ -152,6 +152,36 @@ Keep in mind that you may need to wrap this up for your particular application,
For example, that little snippet is slightly oversimplified: at the time ``connectedNow`` is run, the bot hasn't authenticated or joined the channel yet, so its message will be refused.
A real-life IRC bot would need to have its own method for waiting until the connection is fully ready for chat before chatting.

Reporting an Initial Failure
----------------------------

Often times, a failure of the very first connection attempt is special.
It may indicate a problem that won't go away by just trying harder.
The service may be configured with the wrong hostname, or the user may not have an internet connection at all (perhaps they forgot to turn on their wifi adapter).

Applications can ask ``whenConnected`` to make their ``Deferred`` fail if the service makes one or more connection attempts in a row without success.
You can pass the ``failAfterFailures`` parameter into ``ClientService`` to set this threshold.

By calling ``whenConnected(failAfterFailures=1)`` when the service is first started (just before or just after ``startService``), your application will get notification of an initial connection failure.

Setting it to 1 makes it fail after a single connection failure.
Setting it to 2 means it will try once, wait a bit, try again, and then either fail or succeed depending upon the outcome of the second connection attempt.
You can use 3 or more too, if you're feeling particularly patient.
The default of ``None`` means it will wait forever for a successful connection.

Regardless of ``failAfterFailures``, the ``Deferred`` will always fail with :api:`twisted.internet.defer.CancelledError <CancelledError>` if the service is stopped before a connection is made.

.. code-block:: python
waitForConnection = myReconnectingService.whenConnected(failAfterFailures=1)
def connectedNow(clientForIRC):
clientForIRC.say("#bot-test", "hello, world!")
def failed(f):
print("initial connection failed: %s" % (f,))
# now you should stop the service and report the error upwards
waitForConnection.addCallbacks(connectedNow, failed)
Retry Policies
--------------

Expand Down
132 changes: 111 additions & 21 deletions src/twisted/application/internet.py
Expand Up @@ -555,6 +555,10 @@ def __init__(self, endpoint, factory, retryPolicy, clock, log):
@param log: The logger for the L{ClientService} instance this state
machine is associated to.
@type log: L{Logger}
@ivar _awaitingConnected: notifications to make when connection
succeeds, fails, or is cancelled
@type _awaitingConnected: list of (Deferred, count) tuples
"""
self._endpoint = endpoint
self._failedAttempts = 0
Expand Down Expand Up @@ -610,7 +614,7 @@ def _restarting(self):
@_machine.state()
def _stopped(self):
"""
The service has been stopped an is disconnected.
The service has been stopped and is disconnected.
"""

@_machine.input()
Expand All @@ -630,7 +634,7 @@ def _connect(self):
self._connectionInProgress = (
self._endpoint.connect(factoryProxy)
.addCallback(self._connectionMade)
.addErrback(lambda _: self._connectionFailed()))
.addErrback(self._connectionFailed))


@_machine.output()
Expand Down Expand Up @@ -715,16 +719,27 @@ def _notifyWaiters(self, protocol):


@_machine.input()
def _connectionFailed(self):
def _connectionFailed(self, f):
"""
The current connection attempt failed.
"""


@_machine.output()
def _wait(self):
"""
Schedule a retry attempt.
"""
self._doWait()

@_machine.output()
def _ignoreAndWait(self, f):
"""
Schedule a retry attempt, and ignore the Failure passed in.
"""
return self._doWait()

def _doWait(self):
self._failedAttempts += 1
delay = self._timeoutForAttempt(self._failedAttempts)
self._log.info("Scheduling retry {attempt} to connect {endpoint} "
Expand Down Expand Up @@ -761,31 +776,67 @@ def _cancelConnectWaiters(self):
"""
self._unawait(Failure(CancelledError()))

@_machine.output()
def _ignoreAndCancelConnectWaiters(self, f):
"""
Notify all pending requests for a connection that no more connections
are expected, after ignoring the Failure passed in.
"""
self._unawait(Failure(CancelledError()))


@_machine.output()
def _finishStopping(self):
"""
Notify all deferreds waiting on the service stopping.
"""
self._doFinishStopping()

@_machine.output()
def _ignoreAndFinishStopping(self, f):
"""
Notify all deferreds waiting on the service stopping, and ignore the
Failure passed in.
"""
self._doFinishStopping()

def _doFinishStopping(self):
self._stopWaiters, waiting = [], self._stopWaiters
for w in waiting:
w.callback(None)


@_machine.input()
def whenConnected(self):
def whenConnected(self, failAfterFailures=None):
"""
Retrieve the currently-connected L{Protocol}, or the next one to
connect.
@return: a Deferred that fires with a protocol produced by the factory
passed to C{__init__}
@rtype: L{Deferred} firing with L{IProtocol} or failing with
L{CancelledError} the service is stopped.
@param failAfterFailures: number of connection failures after which
the Deferred will deliver a Failure (None means the Deferred will
only fail if/when the service is stopped). Set this to 1 to make
the very first connection failure signal an error. Use 2 to
allow one failure but signal an error if the subsequent retry
then fails.
@type failAfterFailures: L{int} or None
@return: a Deferred that fires with a protocol produced by the
factory passed to C{__init__}
@rtype: L{Deferred} that may:
- fire with L{IProtocol}
- fail with L{CancelledError} when the service is stopped
- fail with e.g.
L{DNSLookupError<twisted.internet.error.DNSLookupError>} or
L{ConnectionRefusedError<twisted.internet.error.ConnectionRefusedError>}
when the number of consecutive failed connection attempts
equals the value of "failAfterFailures"
"""

@_machine.output()
def _currentConnection(self):
def _currentConnection(self, failAfterFailures=None):
"""
Return the currently connected protocol.
Expand All @@ -795,7 +846,7 @@ def _currentConnection(self):


@_machine.output()
def _noConnection(self):
def _noConnection(self, failAfterFailures=None):
"""
Notify the caller that no connection is expected.
Expand All @@ -805,14 +856,14 @@ def _noConnection(self):


@_machine.output()
def _awaitingConnection(self):
def _awaitingConnection(self, failAfterFailures=None):
"""
Return a deferred that will fire with the next connected protocol.
@return: L{Deferred} that will fire with the next connected protocol.
"""
result = Deferred()
self._awaitingConnected.append(result)
self._awaitingConnected.append((result, failAfterFailures))
return result


Expand All @@ -833,9 +884,30 @@ def _unawait(self, value):
@param value: the value to fire the L{Deferred}s with.
"""
self._awaitingConnected, waiting = [], self._awaitingConnected
for w in waiting:
for (w, remaining) in waiting:
w.callback(value)

@_machine.output()
def _deliverConnectionFailure(self, f):
"""
Deliver connection failures to any L{ClientService.whenConnected}
L{Deferred}s that have met their failAfterFailures threshold.
@param f: the Failure to fire the L{Deferred}s with.
"""
ready = []
notReady = []
for (w, remaining) in self._awaitingConnected:
if remaining is None:
notReady.append((w, remaining))
elif remaining <= 1:
ready.append(w)
else:
notReady.append((w, remaining-1))
self._awaitingConnected = notReady
for w in ready:
w.callback(f)

# State Transitions

_init.upon(start, enter=_connecting,
Expand All @@ -853,7 +925,7 @@ def _unawait(self, value):
_connecting.upon(_connectionMade, enter=_connected,
outputs=[_notifyWaiters])
_connecting.upon(_connectionFailed, enter=_waiting,
outputs=[_wait])
outputs=[_ignoreAndWait, _deliverConnectionFailure])

_waiting.upon(start, enter=_waiting,
outputs=[])
Expand Down Expand Up @@ -886,7 +958,8 @@ def _unawait(self, value):
# Note that this is triggered synchonously with the transition from
# _connecting
_disconnecting.upon(_connectionFailed, enter=_stopped,
outputs=[_cancelConnectWaiters, _finishStopping])
outputs=[_ignoreAndCancelConnectWaiters,
_ignoreAndFinishStopping])

_restarting.upon(start, enter=_restarting,
outputs=[])
Expand Down Expand Up @@ -966,17 +1039,34 @@ def __init__(self, endpoint, factory, retryPolicy=None, clock=None):
)


def whenConnected(self):
def whenConnected(self, failAfterFailures=None):
"""
Retrieve the currently-connected L{Protocol}, or the next one to
connect.
@return: a Deferred that fires with a protocol produced by the factory
passed to C{__init__}
@rtype: L{Deferred} firing with L{IProtocol} or failing with
L{CancelledError} the service is stopped.
@param failAfterFailures: number of connection failures after which
the Deferred will deliver a Failure (None means the Deferred will
only fail if/when the service is stopped). Set this to 1 to make
the very first connection failure signal an error. Use 2 to
allow one failure but signal an error if the subsequent retry
then fails.
@type failAfterFailures: L{int} or None
@return: a Deferred that fires with a protocol produced by the
factory passed to C{__init__}
@rtype: L{Deferred} that may:
- fire with L{IProtocol}
- fail with L{CancelledError} when the service is stopped
- fail with e.g.
L{DNSLookupError<twisted.internet.error.DNSLookupError>} or
L{ConnectionRefusedError<twisted.internet.error.ConnectionRefusedError>}
when the number of consecutive failed connection attempts
equals the value of "failAfterFailures"
"""
return self._machine.whenConnected()
return self._machine.whenConnected(failAfterFailures)


def startService(self):
Expand Down

0 comments on commit 66929c6

Please sign in to comment.