dilation: add half-close support #344
The upcoming "dilation" feature offers multiple "subchannels" all multiplexed over a single durable+abstract connection. Each subchannel behaves like a regular Twisted TCP endpoint.
As I'm rewriting the file-transfer protocol to use these subchannels, I'm discovering that it would be really handy if they were half-closeable. The original FTP (RFC 959, written 34 years ago) used one TCP connection per file and just read until the socket was closed, instead of sending a length ahead of time and then fetching exactly that number of bytes. This is also how HTTP/1.0 worked, before
For MW+Dilation transfers, I'm thinking we use one subchannel per file transfer. The first thing across the subchannel is a length-prefix and header, which tells the remote side:
Then the sender just streams the (maybe encoded) data, perhaps pausing when the receiver uses the producer/consumer API, and when they've sent the last byte, the sender half-closes the subchannel. The receiver observes the half-close and wraps things up, then sends back an ACK message with a hash of the received data. When the sender gets the ACK, they can check the hash to tell the user that the transfer succeeded.
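To make the flow above concrete, here is a minimal sketch that uses plain stdlib sockets in place of Dilation subchannels (an assumption made purely for illustration; the real subchannel API is Twisted-based). The JSON header with a single `estimated_size` field, the 4-byte length prefix, the SHA-256 ACK, and every function name are hypothetical choices, not the actual wire format.

```python
import hashlib
import json
import socket
import struct
import threading


def recv_exactly(sock, n):
    """Read exactly n bytes, or raise if the peer closes early."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError("peer closed before %d bytes arrived" % n)
        buf += chunk
    return buf


def send_file(sock, payload):
    """Send header and payload, half-close, then verify the ACK hash."""
    # Hypothetical header: a 4-byte big-endian length prefix plus JSON.
    header = json.dumps({"estimated_size": len(payload)}).encode("utf-8")
    sock.sendall(struct.pack(">I", len(header)) + header)
    sock.sendall(payload)
    # Half-close: we are done writing, but can still read the ACK.
    sock.shutdown(socket.SHUT_WR)
    ack = recv_exactly(sock, hashlib.sha256().digest_size)
    return ack == hashlib.sha256(payload).digest()


def receive_file(sock, results):
    """Read the header, consume data until EOF, then send back the hash."""
    (header_len,) = struct.unpack(">I", recv_exactly(sock, 4))
    header = json.loads(recv_exactly(sock, header_len).decode("utf-8"))
    hasher = hashlib.sha256()
    received = 0
    while True:
        chunk = sock.recv(65536)
        if not chunk:  # EOF: the sender half-closed its write side
            break
        hasher.update(chunk)
        received += len(chunk)
    # Our own write side is still open, so the ACK can go back.
    sock.sendall(hasher.digest())
    sock.close()
    results["header"] = header
    results["received"] = received


def demo(payload=b"hello dilation " * 1000):
    """Run sender and receiver over a connected socket pair."""
    tx, rx = socket.socketpair()
    results = {}
    receiver = threading.Thread(target=receive_file, args=(rx, results))
    receiver.start()
    ok = send_file(tx, payload)
    receiver.join()
    tx.close()
    return ok, results


if __name__ == "__main__":
    print(demo())
```

Note that the receiver never needs the exact wire length: the header's size is only an estimate, and end-of-data is signalled by the half-close alone.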
When sending a directory, the estimated size could be a
If we didn't have half-close, we'd need to be very precise about when the transfer is complete. For single files and no compression, we know ahead of time exactly how many bytes will be sent over the wire, so the header can specify that, and the receiver can hang up after receiving that many bytes. With compression, we'd either need to compress the file ahead of time to get the wire-size up front (expensive), or we'd use the uncompressed size from disk and have the recipient count the bytes coming out of the decompressor (and we'd have to flush the decompressor after every chunk, also expensive). With directories, we'd have the same dilemma.
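The "count bytes coming out of the decompressor" alternative can be sketched with the stdlib `zlib` module. This is only an illustration under assumptions: the thread does not say which compressor would be used, and the function names and chunk sizes are hypothetical. The sender has to sync-flush after every chunk, because otherwise the tail of the data can sit inside the compressor and the receiver would never reach the advertised count.

```python
import zlib


def sender_chunks(data, chunk_size=1024):
    """Yield compressed wire chunks, sync-flushing after each input chunk."""
    comp = zlib.compressobj()
    for i in range(0, len(data), chunk_size):
        wire = comp.compress(data[i:i + chunk_size])
        # Z_SYNC_FLUSH forces out everything consumed so far, so the
        # receiver can decode this chunk without waiting for more input.
        wire += comp.flush(zlib.Z_SYNC_FLUSH)
        yield wire


def receive_counting(wire_chunks, uncompressed_size):
    """Decompress until uncompressed_size bytes have come out, then stop."""
    decomp = zlib.decompressobj()
    pieces = []
    total = 0
    for wire in wire_chunks:
        piece = decomp.decompress(wire)
        pieces.append(piece)
        total += len(piece)
        if total >= uncompressed_size:
            break  # the receiver decides the transfer is complete
    return b"".join(pieces)


def wire_size_with_sync_flush(data, chunk_size=1024):
    """Total bytes on the wire when flushing after every chunk."""
    return sum(len(wire) for wire in sender_chunks(data, chunk_size))


def wire_size_single_stream(data):
    """Total bytes on the wire when compressing as one stream."""
    comp = zlib.compressobj()
    return len(comp.compress(data) + comp.flush())


if __name__ == "__main__":
    data = b"wormhole " * 4000
    assert receive_counting(sender_chunks(data), len(data)) == data
    print(wire_size_with_sync_flush(data, 256), wire_size_single_stream(data))
```

The per-chunk flush is where the extra cost comes from: each flush ends the current compressed block, so the wire size grows compared with compressing the whole stream at once.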
The API for this should match the standard Twisted interfaces. The ability to half-close is activated by attaching a Protocol which implements (
I'm still trying to figure out the instead-of vs in-addition-to questions. I'm looking at the original Twisted ticket for half-close (from 2003!) for hints, since the feature is somewhat underdocumented.
Testing with Twisted-19.2.1 and a localhost TCP socket tells me:
So it seems that
I'm not sure how much of this we should emulate. It feels like protocols should either be half-closeable (and only use
For now, I'm going to proceed as if that ideal is maintained, except I'm also going to call
For reference, here's the testing tool:
    from __future__ import print_function, unicode_literals
    import sys
    from twisted.python import usage
    from twisted.internet.endpoints import clientFromString, serverFromString
    from twisted.internet.defer import Deferred
    from twisted.internet.task import react, LoopingCall
    from twisted.internet.protocol import Protocol, Factory
    from twisted.internet.interfaces import IHalfCloseableProtocol
    from zope.interface import implementer
    from twisted.python import log
    log.startLogging(sys.stderr)

    class Options(usage.Options):
        def parseArgs(self, mode, *args):
            if mode not in ["tx", "rx"]:
                raise usage.UsageError("mode must be 'tx' or 'rx', not '%s'" % mode)
            self.mode = mode
            self.args = args

    @implementer(IHalfCloseableProtocol)
    class ReceiveProtocol(Protocol):
        def __init__(self):
            super(Protocol, self).__init__()
            self._rx = 0

        def connectionMade(self):
            print("RP.connectionMade")

        def dataReceived(self, data):
            old = self._rx
            self._rx += len(data)
            print("RP.dataReceived %d+%d=%d" %(old, len(data), self._rx))
            if self._rx == 2:
                print("RP.loseWriteConnection")
                self.transport.loseWriteConnection()
                #print("RP.loseConnection")
                #self.transport.loseConnection()

        def readConnectionLost(self):
            print("RP.readConnectionLost")

        def writeConnectionLost(self):
            print("RP.writeConnectionLost")

        def connectionLost(self, why=None):
            print("RP.connectionLost")

    @implementer(IHalfCloseableProtocol)
    class SendProtocol(Protocol):
        def __init__(self):
            super(Protocol, self).__init__()
            self._rx = 0
            self._tx = 0

        def connectionMade(self):
            print("SP.connectionMade")
            self.lc = LoopingCall(self.do_write)
            self.lc.start(1.0, False)

        def do_write(self):
            print("SP.write %s+1=%s" % (self._tx, self._tx+1))
            self._tx += 1
            self.transport.write(b"a")
            if self._tx == 6:
                print("SP.loseWriteConnection")
                self.transport.loseWriteConnection()
                #print("SP.loseConnection")
                #self.transport.loseConnection()

        def dataReceived(self, data):
            old = self._rx
            self._rx += len(data)
            print("SP.dataReceived %d+%d=%d" %(old, len(data), self._rx))

        def readConnectionLost(self):
            print("SP.readConnectionLost")

        def writeConnectionLost(self):
            print("SP.writeConnectionLost")

        def connectionLost(self, why=None):
            print("SP.connectionLost")

    class InstanceFactory(Factory):
        def __init__(self, instance):
            super(Factory, self).__init__()
            self.instance = instance

        def buildProtocol(self, addr):
            return self.instance

    def open(reactor, options):
        if options.mode == "tx":
            ep = clientFromString(reactor, "tcp:localhost:6777")
            ep.connect(InstanceFactory(SendProtocol()))
        else:
            ep = serverFromString(reactor, "tcp:6777")
            ep.listen(InstanceFactory(ReceiveProtocol()))
        return Deferred()

    def run():
        options = Options()
        options.parseOptions()
        return react(open, (options,))

    run()