# Transports & Protocols #

The [`asyncio` event loop](https://docs.python.org/3/library/asyncio-eventloop.html#opening-network-connections) offers three ways of managing asynchronous network connections:

* at the lowest level, directly doing byte-oriented I/O on [socket objects](https://docs.python.org/3/library/asyncio-eventloop.html#working-with-socket-objects-directly).
* at a higher level, but still doing byte-oriented I/O, via its [streams](https://docs.python.org/3/library/asyncio-stream.html) mechanism.
* at a more advanced level, via [*Transports and Protocols*](https://docs.python.org/3/library/asyncio-protocol.html); these allow you to define custom message formats. The documentation says this offers the highest performance, but it can take more work to set up.

Note that none of these offers an asynchronous equivalent of [`socket.connect()`](https://docs.python.org/3/library/socket.html#socket.socket.connect).

The third one is the one we will be talking about here. The two main object classes involved are
* *Transports* — manage the transmission and reception of bytes of data over the connection.
* *Protocols* — manage the interpretation of _incoming_ bytes as meaningful message data.

The two concepts are closely tied together. You cannot meaningfully instantiate a transport without giving it a protocol to handle the data. Conversely, the transport will call the protocol’s [`connection_made()`](https://docs.python.org/3/library/asyncio-protocol.html#asyncio.BaseProtocol.connection_made) method once communication with the other end is functional, which the latter can use to save a reference to the transport object that it can use for *sending* data.

How a protocol communicates with the calling program — notifying it of incoming decoded messages, and accepting outgoing messages to be encoded and sent out via the transport — are not specified as part of the base transport and protocol classes. These mechanisms can be defined in whatever way the protocol implementation sees fit. For example, the actual encoding of outgoing messages might be done via some code elsewhere in the calling program, which then calls the transport to send it directly, without involving the protocol object at all.

## Example Protocol ##

To try to make things clearer, let us start by implementing a custom Protocol. This will be a very simple one, which just sends and receives decimal integers, one to a line.

In [None]:
import sys
import socket
import asyncio

In [None]:
class NumberStream(asyncio.Protocol) :
    "parses the incoming data stream into a sequence of integers, one to a line." \
    " Also provides the send_number() method, which will send an integer in the same" \
    " format."

    def __init__(self) :
        super().__init__()
        self.curline = ""
        self.numbers = []
        self.awaiting = None
        self.eof = False
        self.transport = None
    #end __init__

    # Methods defined by Protocol interface:
    # (I implement just the bare minimum of these, enough for the
    # example to work)

    def connection_made(self, transport) :
        # Save transport object for use to send outgoing messages.
        self.transport = transport
    #end connection_made

    def connection_lost(self, exc) :
        self.transport = None
    #end connection_lost

    def data_received(self, data) :
        sys.stderr.write("NumberStream %s: data received: %s\n" % (repr(self), repr(data))) # debug
        data = self.curline + data.decode()
        while True :
            data = data.split("\n", 1)
            if len(data) == 1 :
                self.curline, = data
                break
            #end if
            curnum, data = data
            self.numbers.append(int(curnum))
            if self.awaiting != None and not self.awaiting.done() :
                # wake up any waiters
                self.awaiting.set_result(None)
            #end if
        #end while
    #end data_received

    def eof_received(self) :
        sys.stderr.write("NumberStream %s: eof received\n" % repr(self)) # debug
        self.eof = True
        if self.awaiting != None :
            self.awaiting.set_exception(StopAsyncIteration("transport EOF"))
        #end if
    #end eof_received

    # Application-specific methods for caller use:

    async def get_next_number(self) :
        "for caller’s use to retrieve incoming decoded numbers."
        while True :
            if len(self.numbers) != 0 :
                result = self.numbers.pop(0)
                break
            #end if
            if self.eof :
                raise StopAsyncIteration("transport EOF")
            #end if
            awaiting = None
            if self.awaiting == None :
                awaiting = asyncio.get_running_loop().create_future()
                self.awaiting = awaiting
            #end if
            await self.awaiting
            if awaiting == self.awaiting :
                self.awaiting = None # I created it, I get rid of it
            #end if
        #end while
        return result
    #end get_next_number

    def send_number(self, num) :
        "for caller’s use to encode and send outgoing numbers."
        data = (str(num) + "\n").encode()
        self.transport.write(data)
    #end send_number

#end NumberStream

To start with, we will use a standard transport type: client and server will communicate via an `AF_UNIX` socket with the following name. Note that the leading null means the name exists in Linux’s [abstract socket namespace](https://manpages.debian.org/bullseye/manpages/unix.7.en.html#Abstract_sockets), so no actual filename needs to be created.

In [None]:
NUMBER_SERVER = b"\x00Number Server"

Here is the code for the server task. All this server does is receive a number, add 1 to it, and send the result back.

Note that the [`create_unix_server`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.create_unix_server) call returns a [`Server`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.Server) object, which is not actually very useful. For example, it provides no (public) way to retrieve the associated protocol instance. To get this, you must provide a protocol factory which does its own instantiation of the protocol class and stashes the result somewhere before returning a copy of it.

Note also that the protocol object is not created immediately, so it is returned via a future that the main loop can await.

In [None]:
async def run_server() :

    number_stream = asyncio.get_running_loop().create_future()

    def create_number_stream() :
        sys.stderr.write("server creating number stream\n") # debug
        proto = NumberStream()
        number_stream.set_result(proto)
        return proto
    #end create_number_stream

    server = await asyncio.get_running_loop().create_unix_server \
      (
        protocol_factory = create_number_stream,
        path = NUMBER_SERVER
      )
    number_stream = await number_stream
    while not number_stream.eof :
        try :
            num = await number_stream.get_next_number()
        except StopAsyncIteration :
            break
        #end try
        sys.stderr.write("Server got number %d, sending back %d\n" % (num, num + 1))
        number_stream.send_number(num + 1)
    #end while
    server.close()
#end run_server


Here is a client task that opens a connection to the server, sends it some numbers, gets the results back, and closes the connection.

In [None]:
async def run_client() :
    conn, proto = await asyncio.get_running_loop().create_unix_connection \
      (
        protocol_factory = NumberStream,
        path = NUMBER_SERVER
      )
    for request in (12345, 54321, 98765) :
        proto.send_number(request)
        response = await proto.get_next_number()
        sys.stderr.write("Client sent %d, got back %d from server\n" % (request, response))
        await asyncio.sleep(2)
    #end for
    sys.stderr.write("client closing\n") # debug
    conn.close()
#end run_client


Finally, create the actual tasks, and give them a chance to run.

In [None]:
server_task = asyncio.create_task(run_server())
client_task = asyncio.create_task(run_client())

await asyncio.gather(server_task, client_task)

# Example Transport #

Next, let us try implementing our own transport. Rather than bother with any actual network connection, we will create a simple in-memory pipe which just copies data between its two ends.

In [None]:
class MemPipeTransport(asyncio.Transport) :
    "One end of a simple in-memory pipe transport. Do not instantiate" \
    " directly; use the create_pair() classmethod to create a linked pair" \
    " of these."

    def __init__(self) :
        self.protocol = None
        self.peer = None
    #end __init__

    @classmethod
    def create_pair(celf) :
        "creates a pair of MemPipeTransport objects such that anything" \
        " written to one can be read by the other."
        pipe1 = celf()
        pipe2 = celf()
        pipe1.peer = pipe2
        pipe2.peer = pipe1
        return pipe1, pipe2
    #end create_pair

    # Minimal set of Transport methods implemented, just enough
    # to get the example working:

    def set_protocol(self, protocol) :
        if self.protocol != None :
            self.protocol.connection_lost(None)
        #end if
        self.protocol = protocol
        protocol.connection_made(self)
    #end set_protocol

    def write(self, data) :
        self.peer.protocol.data_received(data)
    #end write

    def close(self) :
        if self.protocol != None :
            self.protocol.eof_received()
            self.protocol = None
        #end if
        if self.peer != None :
            self.peer.peer = None # avoid endless recursion
            self.peer.close()
            self.peer = None
        #end if
    #end close

#end MemPipeTransport

There are no generic `asyncio` methods for setting up connections using custom transports, so we will do all the transport and protocol setup ourselves.

In [None]:
client_pipe, server_pipe = MemPipeTransport.create_pair()
client_proto = NumberStream()
server_proto = NumberStream()
client_pipe.set_protocol(client_proto)
server_pipe.set_protocol(server_proto)

Here are sample server and client tasks to make use of our custom transport connection:

In [None]:
async def run_pipe_server() :
    while not server_proto.eof :
        try :
            num = await server_proto.get_next_number()
        except StopAsyncIteration :
            break
        #end try
        sys.stderr.write("Server got number %d, sending back %d\n" % (num, num + 1))
        server_proto.send_number(num + 1)
    #end while
#end run_pipe_server

async def run_pipe_client() :
    for request in (12345, 54321, 98765) :
        client_proto.send_number(request)
        response = await client_proto.get_next_number()
        sys.stderr.write("Client sent %d, got back %d from server\n" % (request, response))
        await asyncio.sleep(2)
    #end for
    client_pipe.close()
#end run_pipe_client

Let us run them and see what happens:

In [None]:
pipe_server_task = asyncio.create_task(run_pipe_server())
pipe_client_task = asyncio.create_task(run_pipe_client())
await asyncio.gather(pipe_server_task, pipe_client_task)

# Conclusion #

Don’t think of the Transports+Protocols system as some way to freely “mix and match” different protocols with different transports. Things are not really that general. Bytestream-based protocols are going to assume some kind of reliable bytestream-based transport. And the standard calls for setting up connections assume transports based on POSIX sockets (or [pipes](https://docs.python.org/3/library/asyncio-protocol.html#subprocess-protocols)) anyway. You can step a bit beyond this if you do more work yourself. I think you are going to implement custom protocols more often than you are going to implement custom transports.