## [Asynchronous I/O, event loop, and concurrency tools](https://pymotw.com/3/asyncio/index.html)

## Asynchronous Concurrency Concepts
**other concurrency models**: rely on the threading or process management of the language runtime or operating system to switch context

**asyncio**: the application explicitly handles context changes using an event loop and coroutines

**event loop** - the place where the application registers code to be run; the loop calls that code when the corresponding event occurs.

**coroutines** - special functions that give up control to their caller without losing their state; they are the mechanism for yielding control back to the event loop (implemented with generators in versions older than Python 3.5).

**Future** - a data structure representing the result of work that has not been completed yet.

**Task** - a subclass of Future that knows how to wrap and manage the execution of a coroutine; it can be scheduled with the event loop.
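
The relationship between Future and Task can be checked directly. This is a minimal sketch, assuming Python 3.7 or later (for `asyncio.run()`); the `work()` coroutine is a made-up placeholder, not part of the original notes.

```python
import asyncio


async def work():
    # Hypothetical stand-in for a coroutine doing real work.
    await asyncio.sleep(0)
    return 42


async def main():
    # Wrapping a coroutine in a Task schedules it on the running loop.
    task = asyncio.ensure_future(work())
    # A Task can be awaited like any other Future.
    value = await task
    return isinstance(task, asyncio.Future), task.done(), value


is_future, is_done, value = asyncio.run(main())
print(is_future, is_done, value)
```

Because a Task is a Future, anything that accepts a Future (`await`, `add_done_callback()`, `result()`) should also accept a Task.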

## Cooperative Multitasking with Coroutines
Coroutines
- are a language construct designed for concurrent operation.
- create a coroutine object when they are called.
- can then be run by the caller using the coroutine's `send()` method.
- keep their state while they are paused.
- resume where they left off the next time they are awakened.
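
The points above can be sketched without an event loop by driving a coroutine by hand with `send()`. This is an illustration only: the `pause()` helper and `accumulate()` coroutine are hypothetical names, and real applications normally let the event loop do this work.

```python
import types


@types.coroutine
def pause(marker):
    # A bare yield hands control back to whoever called send().
    received = yield marker
    return received


async def accumulate():
    total = 0  # local state survives every pause
    for step in range(3):
        total += await pause(step)
    return total


coro = accumulate()            # calling creates a coroutine object
yielded = [coro.send(None)]    # run until the first pause
yielded.append(coro.send(10))  # resume where it left off
yielded.append(coro.send(20))
try:
    coro.send(30)
except StopIteration as stop:
    # The coroutine's return value travels on StopIteration.
    result = stop.value

print(yielded, result)
```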

### Starting a Coroutine

In [1]:
# asyncio_coroutine.py
import asyncio


async def coroutine():
    print('in coroutine')


event_loop = asyncio.get_event_loop()
try:
    print('starting coroutine')
    coro = coroutine()
    event_loop.run_until_complete(coro)
finally:
    print('closing event loop')
    # event_loop.close() - skipped in jupyter

starting coroutine
in coroutine
closing event loop


### Returning Values from Coroutines

In [2]:
# asyncio_coroutine_return.py
import asyncio


async def coroutine():
    print('in coroutine')
    return 'result'


event_loop = asyncio.get_event_loop()
try:
    return_value = event_loop.run_until_complete(
        coroutine()
    )
    print('it returned: {!r}'.format(return_value))
finally:
    pass
    # event_loop.close() - skipped in jupyter

in coroutine
it returned: 'result'


### Chaining Coroutines
Decomposing a task into reusable parts: one coroutine can start another and wait for its result.

In [3]:
# asyncio_coroutine_chain.py
import asyncio


async def outer():
    print('in outer')
    print('waiting for result1')
    result1 = await phase1()
    print('waiting for result2')
    result2 = await phase2(result1)
    return (result1, result2)

async def phase1():
    print('in phase1')
    return 'result1'

async def phase2(arg):
    print('in phase2')
    return 'result2 derived from {}'.format(arg)

event_loop = asyncio.get_event_loop()
try:
    return_value = event_loop.run_until_complete(outer())
    print('return value: {!r}'.format(return_value))
finally:
    pass
    # event_loop.close() - skipped in jupyter

in outer
waiting for result1
in phase1
waiting for result2
in phase2
return value: ('result1', 'result2 derived from result1')


### Generators Instead of Coroutines
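Earlier versions of Python 3 can use generator functions with the `asyncio.coroutine()` decorator and `yield from`. The decorator was deprecated in Python 3.8 and removed in Python 3.11, so this example only runs on older interpreters.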

In [4]:
# asyncio_generator.py
import asyncio


@asyncio.coroutine
def outer():
    print('in outer')
    print('waiting for result1')
    result1 = yield from phase1()
    print('waiting for result2')
    result2 = yield from phase2(result1)
    return (result1, result2)


@asyncio.coroutine
def phase1():
    print('in phase1')
    return 'result1'


@asyncio.coroutine
def phase2(arg):
    print('in phase2')
    return 'result2 derived from {}'.format(arg)


event_loop = asyncio.get_event_loop()
try:
    return_value = event_loop.run_until_complete(outer())
    print('return value: {!r}'.format(return_value))
finally:
    pass
    # event_loop.close()

in outer
waiting for result1
in phase1
waiting for result2
in phase2
return value: ('result1', 'result2 derived from result1')


## Scheduling Calls to Regular Functions
Schedule calls to regular functions, either as soon as possible or based on the timer value kept in the loop.

In [5]:
# asyncio_call_soon.py
import asyncio
import functools


def callback(arg, *, kwarg='default'):
    print('callback invoked with {} and {}'.format(arg, kwarg))
    
async def main(loop):
    print('registering callbacks')
    loop.call_soon(callback, 1)
    wrapped = functools.partial(callback, kwarg='not default')
    loop.call_soon(wrapped, 2)
    
    await asyncio.sleep(0.1)
    
event_loop = asyncio.get_event_loop()
try:
    print('entering event loop')
    event_loop.run_until_complete(main(event_loop))
finally:
    print('closing event loop')
    # event_loop.close()

entering event loop
registering callbacks
callback invoked with 1 and default
callback invoked with 2 and not default
closing event loop


### Scheduling a Callback with a Delay

In [6]:
# asyncio_call_later.py

import asyncio

def callback(n):
    print('callback {} invoked'.format(n))
    
async def main(loop):
    print('registering callbacks')
    loop.call_later(0.2, callback, 1)
    loop.call_later(0.1, callback, 2)
    loop.call_soon(callback, 3)
    
    await asyncio.sleep(0.4)
    
event_loop = asyncio.get_event_loop()
try:
    print('entering event loop')
    event_loop.run_until_complete(main(event_loop))
finally:
    print('closing event loop')
    # event_loop.close()

entering event loop
registering callbacks
callback 3 invoked
callback 2 invoked
callback 1 invoked
closing event loop


### Scheduling a Callback for a Specific Time
Use the event loop's `time()` method to read the loop's internal clock. The event loop uses a monotonic clock, not wall-clock time, so the value of "now" never regresses.
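
As a small sketch of scheduling against the loop clock (assuming Python 3.7 or later for `asyncio.run()` and `asyncio.get_running_loop()`; the labels are made up), the callbacks below run in order of their target time, not in the order they were registered:

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    order = []
    start = loop.time()  # monotonic loop clock, unrelated to time.time()
    loop.call_at(start + 0.02, order.append, 'later')
    loop.call_at(start + 0.01, order.append, 'sooner')
    loop.call_soon(order.append, 'soon')
    # Keep the loop running long enough for every callback to fire.
    await asyncio.sleep(0.1)
    return order, loop.time() - start


order, elapsed = asyncio.run(main())
print(order, elapsed)
```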

In [7]:
# asyncio_call_at.py

import asyncio
import time


def callback(n, loop):
    print('callback {} invoked at {}'.format(n, loop.time()))
    
    
async def main(loop):
    now = loop.time()
    print('clock time: {}'.format(time.time()))
    print('loop  time: {}'.format(now))
    
    print('registering callbacks')
    loop.call_at(now + 0.2, callback, 1, loop)
    loop.call_at(now + 0.1, callback, 2, loop)
    loop.call_soon(callback, 3, loop)
    
    await asyncio.sleep(0)
    

event_loop = asyncio.get_event_loop()
try:
    print('entering event loop')
    event_loop.run_until_complete(main(event_loop))
finally:
    print('closing event loop')
    # event_loop.close()

entering event loop
clock time: 1488725101.1836066
loop  time: 52733.020541308
registering callbacks
callback 3 invoked at 52733.021640027
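closing event loop

Only callback 3 runs here: `main()` awaits `asyncio.sleep(0)`, so the loop stops before the `call_at()` callbacks are due. Callbacks 1 and 2 stay scheduled and fire the next time the loop runs, which is why they show up in the output of the next cell. Awaiting longer than 0.2 seconds would let all three run in this cell.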


## Producing Results Asynchronously
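A Future represents the result of work that has not been completed yet. The event loop can watch a Future's state, which lets one part of the application wait for another part to finish.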

### Waiting for a Future


In [8]:
# asyncio_future_event_loop.py
import asyncio


def mark_done(future, result):
    print('setting future result to {!r}'.format(result))
    future.set_result(result)

    
event_loop = asyncio.get_event_loop()
try:
    all_done = asyncio.Future()
    
    print('scheduling mark done')
    event_loop.call_soon(mark_done, all_done, 'the result')
    
    print('entering event loop')
    result = event_loop.run_until_complete(all_done)
    print('returning result: {!r}'.format(result))
finally:
    print('closing event loop')
    # event_loop.close()


print('future result: {!r}'.format(all_done.result()))

scheduling mark done
entering event loop
setting future result to 'the result'
callback 2 invoked at 52735.059753043
callback 1 invoked at 52735.059868977
returning result: 'the result'
closing event loop
future result: 'the result'


The result of the Future is returned by **`await`**.

In [9]:
# asyncio_future_await.py
import asyncio


def mark_done(future, result):
    print('setting future result to {!r}'.format(result))
    future.set_result(result)
    

async def main(loop):
    all_done = asyncio.Future()
    
    print('scheduling mark_done')
    loop.call_soon(mark_done, all_done, 'the result')
    
    result = await all_done
    print('returned result: {!r}'.format(result))
    

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    pass
    # event_loop.close()

scheduling mark_done
setting future result to 'the result'
returned result: 'the result'


A Future can invoke callbacks when it is completed.

In [10]:
# asyncio_future_callback.py

import asyncio
import functools


def callback(future, n):
    print('{}: future done: {}'.format(n, future.result()))
    
    
async def register_callbacks(all_done):
    print('registering callbacks on future')
    all_done.add_done_callback(functools.partial(callback, n=1))
    all_done.add_done_callback(functools.partial(callback, n=2))
    

async def main(all_done):
    await register_callbacks(all_done)
    print('setting result of future')
    all_done.set_result('the result')
    
    
event_loop = asyncio.get_event_loop()
try:
    all_done = asyncio.Future()
    event_loop.run_until_complete(main(all_done))
finally:
    pass
    #event_loop.close()

registering callbacks on future
setting result of future
1: future done: the result
2: future done: the result
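
One detail worth seeing in isolation: done callbacks are not called inside `set_result()`. They are scheduled on the loop and run, in registration order, once the current coroutine yields. A minimal sketch, assuming Python 3.7 or later; the event labels are illustrative only.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    events = []
    future.add_done_callback(lambda f: events.append(('first', f.result())))
    future.add_done_callback(lambda f: events.append(('second', f.result())))

    future.set_result('ready')
    # The callbacks have only been scheduled at this point.
    events.append(('after set_result', None))

    # Yield to the loop once so the scheduled callbacks can run.
    await asyncio.sleep(0)
    return events


events = asyncio.run(main())
print(events)
```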


## Executing Tasks Concurrently 
Tasks are subclasses of Future, so other coroutines can wait for them, and each has a result that can be retrieved after the task completes.

### Starting a Task 
`create_task()` wraps a coroutine in a Task. The resulting task runs as part of the concurrent operations managed by the event loop.

In [11]:
import asyncio


async def task_func():
    print('in task_func')
    return 'the result'


async def main(loop):
    print('creating task')
    task = loop.create_task(task_func())
    print('waiting for {!r}'.format(task))
    return_value = await task
    print('task completed {!r}'.format(task))
    print('return value: {!r}'.format(return_value))
    

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    pass
    # event_loop.close()

creating task
waiting for <Task pending coro=<task_func() running at <ipython-input-11-e37780e9485b>:4>>
in task_func
task completed <Task finished coro=<task_func() done, defined at <ipython-input-11-e37780e9485b>:4> result='the result'>
return value: 'the result'


### Canceling a Task

In [12]:
# asyncio_cancel_task.py

import asyncio


async def task_func():
    print('in task_func')
    return 'the result'


async def main(loop):
    print('creating task')
    task = loop.create_task(task_func())
    
    print('canceling task')
    task.cancel()
    
    print('canceled task {!r}'.format(task))
    try:
        await task
    except  asyncio.CancelledError:
        print('caught error from canceled task')
    else:
        print('task result: {!r}'.format(task.result()))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    pass
    # event_loop.close()

creating task
canceling task
canceled task <Task cancelling coro=<task_func() running at <ipython-input-12-cf37153fb0f9>:6>>
caught error from canceled task


If a task is canceled while it is waiting for another concurrent operation, the task is notified of its cancellation by having a CancelledError exception raised at the point where it is waiting.

In [13]:
# asyncio_cancel_task2.py
import asyncio


async def task_func():
    print('in task_func, sleeping')
    try:
        await asyncio.sleep(1)
    except asyncio.CancelledError:
        print('task_func was canceled')
        raise
    return 'the result'

    
def task_canceller(t):
    print('in task_canceller')
    t.cancel()
    print('canceled the task')


async def main(loop):
    print('creating task')
    task = loop.create_task(task_func())
    loop.call_soon(task_canceller, task)
    try:
        await task
    except asyncio.CancelledError:
        print('main() also sees task as canceled')
    

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    pass
    # event_loop.close()

creating task
in task_func, sleeping
in task_canceller
canceled the task
task_func was canceled
main() also sees task as canceled


### Creating Tasks from Coroutines

**`ensure_future()`** returns a Task tied to the execution of a coroutine.

In [14]:
# asyncio_ensure_future.py
import asyncio


async def wrapped():
    print('wrapped')
    return 'result'


async def inner(task):
    print('inner: starting')
    print('inner: waiting for {!r}'.format(task))
    result = await task
    print('inner: task returned {!r}'.format(task))
    
    
async def starter():
    print('starter: creating task')
    task = asyncio.ensure_future(wrapped())
    print('starter: waiting for inner')
    await inner(task)
    print('starter: inner returned')
    

event_loop = asyncio.get_event_loop()
try:
    print('entering event loop')
    result = event_loop.run_until_complete(starter())
finally:
    pass
    # event_loop.close()



entering event loop
starter: creating task
starter: waiting for inner
inner: starting
inner: waiting for <Task pending coro=<wrapped() running at <ipython-input-14-c267531fe386>:5>>
wrapped
inner: task returned <Task finished coro=<wrapped() done, defined at <ipython-input-14-c267531fe386>:5> result='result'>
starter: inner returned
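
A short sketch of what `ensure_future()` hands back, assuming Python 3.7 or later: a coroutine is wrapped in a new Task, while an object that is already a Future is returned unchanged.

```python
import asyncio


async def wrapped():
    return 'result'


async def main():
    loop = asyncio.get_running_loop()

    # A coroutine becomes a Task scheduled on the running loop.
    task = asyncio.ensure_future(wrapped())

    # An existing Future is passed through as the same object.
    future = loop.create_future()
    same_object = asyncio.ensure_future(future) is future
    future.set_result('done')

    return (isinstance(task, asyncio.Task), same_object,
            await task, await future)


is_task, same_object, task_value, future_value = asyncio.run(main())
print(is_task, same_object, task_value, future_value)
```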


## Composing Coroutines with Control Structures

### Waiting for Multiple Coroutines
It is often useful to divide one operation into many parts and execute them separately; `wait()` pauses one coroutine until the other background operations complete.

In [15]:
# asyncio_wait.py

import asyncio


async def phase(i):
    print('in phase {}'.format(i))
    await asyncio.sleep(0.1 * i)
    print('done with phase {}'.format(i))
    return 'phase {} result'.format(i)


async def main(num_phases):
    print('starting main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting for phases to complete')
    completed, pending = await asyncio.wait(phases)
    results = [t.result() for t in completed]
    print('results: {!r}'.format(results))
    
event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    pass
    # event_loop.close()

starting main
waiting for phases to complete
in phase 1
in phase 2
in phase 0
done with phase 0
done with phase 1
done with phase 2
results: ['phase 2 result', 'phase 0 result', 'phase 1 result']
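
Passing bare coroutines to `asyncio.wait()` was deprecated in Python 3.8 and is rejected from Python 3.11 on, so on current interpreters the coroutines need to be wrapped in tasks first. The sketch below assumes Python 3.7 or later. It also shows one way to get results in input order, since `wait()` returns unordered sets.

```python
import asyncio


async def phase(i):
    await asyncio.sleep(0.01 * i)
    return 'phase {} result'.format(i)


async def main(num_phases):
    # Wrap each coroutine in a Task before handing it to wait().
    tasks = [asyncio.ensure_future(phase(i)) for i in range(num_phases)]
    completed, pending = await asyncio.wait(tasks)
    # completed is a set; iterate the original list to keep input order.
    results = [t.result() for t in tasks]
    return results, len(completed), len(pending)


results, num_completed, num_pending = asyncio.run(main(3))
print(results)
```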


In [16]:
# asyncio_wait_timeout.py
import asyncio


async def phase(i):
    print('in phase {}'.format(i))
    try:
        await asyncio.sleep(0.1 * i)
    except asyncio.CancelledError:
        print('phase {} canceled'.format(i))
        raise
    else:
        print('done with phase {}'.format(i))
        return 'phase {} result'.format(i)


async def main(num_phases):
    print('starting main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting 0.1 for phases to complete')
    completed, pending = await asyncio.wait(phases, timeout=0.1)
    print('{} completed and {} pending'.format(
        len(completed), len(pending),
    ))
   
    if pending:
        print('canceling tasks')
        for t in pending:
            t.cancel()
    print('exiting main')


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    pass
    # event_loop.close()


starting main
waiting 0.1 for phases to complete
in phase 1
in phase 2
in phase 0
done with phase 0
1 completed and 2 pending
canceling tasks
exiting main
phase 1 canceled
phase 2 canceled
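
The timeout pattern above can be sketched for current Python versions as follows (assuming Python 3.7 or later; the delays are arbitrary). After canceling the pending tasks, the sketch awaits them once more so the cancellation is actually delivered before `main()` returns.

```python
import asyncio


async def phase(i):
    await asyncio.sleep(0.2 * i)
    return i


async def main():
    tasks = [asyncio.ensure_future(phase(i)) for i in range(3)]
    completed, pending = await asyncio.wait(tasks, timeout=0.1)

    for t in pending:
        t.cancel()
    # Let the canceled tasks finish unwinding; CancelledError is
    # collected as a result instead of being raised here.
    await asyncio.gather(*pending, return_exceptions=True)

    finished = sorted(t.result() for t in completed)
    return finished, [t.cancelled() for t in tasks]


finished, cancelled_flags = asyncio.run(main())
print(finished, cancelled_flags)
```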


### Gathering Results from Coroutines

In [17]:
# asyncio_gather.py
import asyncio


async def phase1():
    print('in phase1')
    await asyncio.sleep(2)
    print('done with phase1')
    return 'phase1 result'


async def phase2():
    print('in phase2')
    await asyncio.sleep(1)
    print('done with phase2')
    return 'phase2 result'


async def main():
    print('starting main')
    print('waiting for phases to complete')
    results = await asyncio.gather(
        phase1(),
        phase2(),
    )
    print('results: {!r}'.format(results))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main())
finally:
    pass
    # event_loop.close()

starting main
waiting for phases to complete
in phase2
in phase1
done with phase2
done with phase1
results: ['phase1 result', 'phase2 result']
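
As the output above suggests, `gather()` returns results in the order of its arguments, not in completion order. The sketch below (assuming Python 3.7 or later, with made-up coroutine names) also shows `return_exceptions=True`, which places a raised exception in the result list instead of propagating it.

```python
import asyncio


async def slow():
    await asyncio.sleep(0.05)
    return 'slow'


async def fast():
    return 'fast'


async def broken():
    raise ValueError('boom')


async def main():
    # fast() finishes first, but slow() was passed first.
    ordered = await asyncio.gather(slow(), fast())
    # The exception is returned as a value in its argument position.
    mixed = await asyncio.gather(fast(), broken(), return_exceptions=True)
    return ordered, mixed


ordered, mixed = asyncio.run(main())
print(ordered, mixed)
```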


### Handling Background Operations as They Finish

In [18]:
# asyncio_as_completed.py
import asyncio


async def phase(i):
    print('in phase {}'.format(i))
    await asyncio.sleep(0.5 - (0.1 * i))
    print('done with phase {}'.format(i))
    return 'phase {} result'.format(i)


async def main(num_phases):
    print('starting main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting for phases to complete')
    results = []
    for next_to_complete in asyncio.as_completed(phases):
        answer = await next_to_complete
        print('received answer {!r}'.format(answer))
        results.append(answer)
    print('results: {!r}'.format(results))
    return results


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    pass
    # event_loop.close()

starting main
waiting for phases to complete
in phase 2
in phase 1
in phase 0
done with phase 2
received answer 'phase 2 result'
done with phase 1
received answer 'phase 1 result'
done with phase 0
received answer 'phase 0 result'
results: ['phase 2 result', 'phase 1 result', 'phase 0 result']
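
A compact variant of the same idea, assuming Python 3.7 or later and arbitrary delays: `as_completed()` yields awaitables in the order the work finishes, regardless of the order of the input list.

```python
import asyncio


async def phase(i, delay):
    await asyncio.sleep(delay)
    return i


async def main():
    tasks = [
        asyncio.ensure_future(phase(0, 0.15)),
        asyncio.ensure_future(phase(1, 0.05)),
        asyncio.ensure_future(phase(2, 0.10)),
    ]
    order = []
    for next_to_complete in asyncio.as_completed(tasks):
        # Each await returns the result of whichever task finished next.
        order.append(await next_to_complete)
    return order


order = asyncio.run(main())
print(order)
```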


## Synchronization Primitives
- Although `asyncio` applications usually run as a single-threaded process, they are still built as concurrent applications.
- To support safe concurrency, asyncio includes implementations of some of the same low-level primitives found in the `threading` and `multiprocessing` modules.

### Locks

In [19]:
# asyncio_lock.py
import asyncio
import functools


def unlock(lock):
    print('callback releasing lock')
    lock.release()

    
async def coro1(lock):
    print('coro1 waiting for the lock')
    async with lock:
        print('coro1 acquired lock')
    print('coro1 released lock')
    

async def coro2(lock):
    print('coro2 waiting for the lock')
    await lock.acquire()
    try:
        print('coro2 acquired lock')
    finally:
        print('coro2 released lock')
        lock.release()
        

async def main(loop):
    # Create
    lock = asyncio.Lock()
    print('acquiring the lock before starting coroutines')
    await lock.acquire()
    print('lock acquired: {}'.format(lock.locked()))

    # Schedule a callback to unlock the lock
    loop.call_later(0.1, functools.partial(unlock, lock))
    
    # Run
    print('waiting for coroutines')
    await asyncio.wait([coro1(lock), coro2(lock)])
    

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    pass
    # event_loop.close()

acquiring the lock before starting coroutines
lock acquired: True
waiting for coroutines
coro1 waiting for the lock
coro2 waiting for the lock
callback releasing lock
coro1 acquired lock
coro1 released lock
coro2 acquired lock
coro2 released lock
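
Why a lock matters even in a single thread can be sketched with a shared counter. Each `await` is a point where another coroutine may run, so a read-modify-write sequence that spans an `await` can lose updates. The names below are hypothetical, and the sketch assumes Python 3.7 or later.

```python
import asyncio


async def unsafe_increment(state):
    current = state['value']
    await asyncio.sleep(0)  # other coroutines run here
    state['value'] = current + 1


async def safe_increment(state, lock):
    async with lock:  # only one coroutine at a time in this block
        current = state['value']
        await asyncio.sleep(0)
        state['value'] = current + 1


async def main():
    unsafe = {'value': 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(10)))

    safe = {'value': 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(10)))
    return unsafe['value'], safe['value']


unsafe_total, safe_total = asyncio.run(main())
print(unsafe_total, safe_total)
```

The unlocked version ends below 10 because several coroutines read the same stale value before any of them writes it back.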


### Events
`asyncio.Event` is based on `threading.Event` and is used to allow multiple consumers to wait for something to happen.

In [20]:
# asyncio_event.py

import asyncio
import functools


def set_event(event):
    print('setting event in callback')
    event.set()
    

async def coro1(event):
    print('coro1 waiting for event')
    await event.wait()
    print('coro1 triggered')
    
    
async def coro2(event):
    print('coro2 waiting for event')
    await event.wait()
    print('coro2 triggered')
    

async def main(loop):
    event = asyncio.Event()
    print('event start state: {}'.format(event.is_set()))
    
    loop.call_later(
        0.1,
        functools.partial(set_event, event)
    )
    
    await asyncio.wait([coro1(event), coro2(event)])
    print('event end state: {}'.format(event.is_set()))

    
event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    pass
    #event_loop.close()

event start state: False
coro1 waiting for event
coro2 waiting for event
setting event in callback
coro1 triggered
coro2 triggered
event end state: True


### Conditions
A `Condition` works similarly to an `Event`, except that rather than notifying all waiting coroutines, the number of waiters awakened is controlled with an argument to `notify()`.

In [21]:
# asyncio_condition.py

import asyncio

async def consumer(condition, n):
    async with condition:
        print('consumer {} is waiting'.format(n))
        await condition.wait()
        print('consumer {} is triggered'.format(n))
    print('ending consumer {}'.format(n))
    

async def manipulate_condition(condition):
    print('starting manipulate_condition')
    
    # pause to let consumers start
    await asyncio.sleep(0.1)
    
    for i in range(1, 3):
        async with condition:
            print('notifying {} consumers'.format(i))
            condition.notify(n=i)
        await asyncio.sleep(0.1)

    async with condition:
        print('notifying remaining consumers')
        condition.notify_all()
    
    print('ending manipulate_condition')
    
async def main(loop):
    # Create a condition
    condition = asyncio.Condition()
    
    # Set up tasks watching the condition
    consumers = [
        consumer(condition, i)
        for i in range(5)
    ]
    
    loop.create_task(manipulate_condition(condition))
    
    # Wait for the consumers to be done
    await asyncio.wait(consumers)
    
event_loop = asyncio.get_event_loop()
try:
    result = event_loop.run_until_complete(main(event_loop))
finally:
    pass
    # event_loop.close()

starting manipulate_condition
consumer 0 is waiting
consumer 2 is waiting
consumer 3 is waiting
consumer 1 is waiting
consumer 4 is waiting
notifying 1 consumers
consumer 0 is triggered
ending consumer 0
notifying 2 consumers
consumer 2 is triggered
ending consumer 2
consumer 3 is triggered
ending consumer 3
notifying remaining consumers
ending manipulate_condition
consumer 1 is triggered
ending consumer 1
consumer 4 is triggered
ending consumer 4
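
A Condition is often paired with a predicate on shared state. The sketch below uses `Condition.wait_for()`, which rechecks the predicate each time the waiter is notified. It assumes Python 3.7 or later; the `items` list and the coroutine names are illustrative only.

```python
import asyncio


async def consumer(condition, items, wanted):
    async with condition:
        # Sleeps until notified AND the predicate is true.
        await condition.wait_for(lambda: len(items) >= wanted)
        return list(items[:wanted])


async def producer(condition, items):
    for i in range(3):
        await asyncio.sleep(0.01)
        async with condition:
            items.append(i)
            condition.notify_all()


async def main():
    condition = asyncio.Condition()
    items = []
    waiter = asyncio.ensure_future(consumer(condition, items, 2))
    await producer(condition, items)
    return await waiter


received = asyncio.run(main())
print(received)
```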


### Queue

In [22]:
# asyncio_queue.py

import asyncio

async def consumer(n, q):
    print('consumer {}: starting'.format(n))
    while True:
        print('consumer {}: waiting for item'.format(n))
        item = await q.get()
        print('consumer {}: has item {}'.format(n, item))
        if item is None:
            q.task_done()
            break
        else:
            await asyncio.sleep(0.01 * item)
            q.task_done()
    print('consumer {}: ending'.format(n))
    

async def producer(q, num_workers):
    print('producer: starting')
    # Add some numbers to the queue to simulate jobs
    for i in range(num_workers * 3):
        await q.put(i)
        
    print('producer: adding stop signals to the queue')
    for i in range(num_workers):
        await q.put(None)
    print('producer: waiting for queue to empty')
    await q.join()
    print('producer: ending')


async def main(loop, num_consumers):
    q = asyncio.Queue(maxsize=num_consumers)
    
    # Schedule the consumer tasks
    consumers = [
        loop.create_task(consumer(i, q))
        for i in range(num_consumers)
    ]
    
    # Schedule the producer task
    prod = loop.create_task(producer(q, num_consumers))

    # Wait for all of the coroutines to finish.
    await asyncio.wait(consumers + [prod])

    
event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop, 2))
finally:
    pass
    # event_loop.close()

consumer 0: starting
consumer 0: waiting for item
consumer 1: starting
consumer 1: waiting for item
producer: starting
consumer 0: has item 0
consumer 1: has item 1
consumer 0: waiting for item
consumer 0: has item 2
consumer 1: waiting for item
consumer 1: has item 3
producer: adding stop signals to the queue
consumer 0: waiting for item
consumer 0: has item 4
consumer 1: waiting for item
consumer 1: has item 5
producer: waiting for queue to empty
consumer 0: waiting for item
consumer 0: has item None
consumer 0: ending
consumer 1: waiting for item
consumer 1: has item None
consumer 1: ending
producer: ending


## Asynchronous I/O with Protocol Class Abstractions
Sample Echo Server and Client

### Echo Server

In [None]:
# asyncio_echo_server_protocol.py

import asyncio
import logging
import sys

SERVER_ADDRESS = ('localhost', 10000)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()


# The protocol object's methods are invoked based on events associated with
# the server socket.
class EchoServer(asyncio.Protocol):
    # each new client connection triggers a call to `connection_made()`
    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.log = logging.getLogger(
            'EchoServer_{}_{}'.format(*self.address)
        )
        self.log.debug('connection accepted')

    # after a connection, when data is sent from the client to the server the
    # `data_received()` method of the protocol is invoked to pass the data in
    # for processing
    def data_received(self, data):
        self.log.debug('received {!r}'.format(data))
        self.transport.write(data)
        self.log.debug('sent {!r}'.format(data))

    # When an EOF is encountered, the `eof_received()` method is called
    def eof_received(self):
        self.log.debug('received EOF')
        if self.transport.can_write_eof():
            self.transport.write_eof()

    def connection_lost(self, error):
        if error:
            self.log.error('ERROR: {}'.format(error))
        else:
            self.log.debug('closing')
        super().connection_lost(error)


# Create the server and let the loop finish the coroutine before
# starting the real event loop.
factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)
log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))

try:
    # the event loop needs to be run in order to process events and handle
    # client requests.
    event_loop.run_forever()
finally:
    # When the event loop is stopped
    log.debug('closing server')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug('closing event loop')
    event_loop.close()

### Echo Client

In [None]:
#asyncio_echo_client_protocol.py


import asyncio
import functools
import logging
import sys

MESSAGES = [
    b'This is the message. ',
    b'It will be sent ',
    b'in parts.',
]
SERVER_ADDRESS = ('localhost', 10000)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)

log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

class EchoClient(asyncio.Protocol):
    # the class constructor accepts two args
    #  - a list of the messages to send
    #  - a Future instance to use to signal that the client has completed a
    #  cycle of work by receiving a response from the server
    def __init__(self, messages, future):
        super().__init__()
        self.messages = messages
        self.log = logging.getLogger('EchoClient')
        self.f = future

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.log.debug(
            'connecting to {} port {}'.format(*self.address)
        )
        # This could be transport.writelines() except that
        # would make it harder to show each part of the message
        # being sent.
        for msg in self.messages:
            transport.write(msg)
            self.log.debug('sending {!r}'.format(msg))
        if transport.can_write_eof():
            transport.write_eof()

    def data_received(self, data):
        self.log.debug('received {!r}'.format(data))

    # when either an end-of-file marker is received or the connection is closed
    # from the server's side, the local transport object is closed and the
    # future object is marked as done by sending a result
    def eof_received(self):
        self.log.debug('received EOF')
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)

    def connection_lost(self, exc):
        self.log.debug('server closed connection')
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)
        super().connection_lost(exc)

# Normally the protocol class is passed to the event loop to create the
# connection. In this case, the event loop has no facility for passing extra
# arguments to the protocol constructor, it is necessary to create a partial to
# wrap the client class and pass the list of messages to send and the Future
# instance.
client_completed = asyncio.Future()

client_factory = functools.partial(
    EchoClient,
    messages=MESSAGES,
    future=client_completed,
)

factory_coroutine = event_loop.create_connection(
    client_factory,
    *SERVER_ADDRESS,
)

log.debug('waiting for client to complete')
try:
    event_loop.run_until_complete(factory_coroutine)
    event_loop.run_until_complete(client_completed)
finally:
    log.debug('closing event loop')
    event_loop.close()
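
The server and client above are meant to run as two separate programs. For a quick check in a single process, both protocol classes can share one event loop. This is a reduced sketch rather than the code above: it assumes Python 3.7 or later, binds to port 0 so the operating system picks a free port, and uses the hypothetical class names `EchoServer` and `CollectingClient` with logging stripped out.

```python
import asyncio


class EchoServer(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # Send every chunk straight back to the peer.
        self.transport.write(data)

    def eof_received(self):
        # Mirror the client's EOF; returning None lets the transport close.
        if self.transport.can_write_eof():
            self.transport.write_eof()


class CollectingClient(asyncio.Protocol):
    def __init__(self, messages, done):
        self.messages = messages
        self.done = done  # Future resolved when the connection ends
        self.chunks = []

    def connection_made(self, transport):
        for msg in self.messages:
            transport.write(msg)
        if transport.can_write_eof():
            transport.write_eof()

    def data_received(self, data):
        self.chunks.append(data)

    def connection_lost(self, exc):
        if not self.done.done():
            self.done.set_result(b''.join(self.chunks))


async def main(messages):
    loop = asyncio.get_running_loop()
    server = await loop.create_server(EchoServer, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    done = loop.create_future()
    await loop.create_connection(
        lambda: CollectingClient(messages, done), '127.0.0.1', port)
    try:
        return await asyncio.wait_for(done, timeout=5)
    finally:
        server.close()
        await server.wait_closed()


reply = asyncio.run(main([
    b'This is the message. ',
    b'It will be sent ',
    b'in parts.',
]))
print(reply)
```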

## Asynchronous I/O Using Coroutines and Streams

### Echo Server

In [None]:
# asyncio_echo_server_coroutine.py

import asyncio
import logging
import sys

SERVER_ADDRESS = ('localhost', 10000)
logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

async def echo(reader, writer):
    address = writer.get_extra_info('peername')
    log = logging.getLogger('echo_{}_{}'.format(*address))
    log.debug('connection accepted')
    while True:
        data = await reader.read(128)

        if data:
            log.debug('received {!r}'.format(data))
            writer.write(data)
            log.debug('sent {!r}'.format(data))
        else:
            log.debug('closing')
            writer.close()
            return

factory = asyncio.start_server(echo, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)
log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))

# Enter the event loop permanently to handle all connections.
try:
    event_loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    log.debug('closing server')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug('closing event loop')
    event_loop.close()

### Echo Client
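
A client built on streams can use `asyncio.open_connection()`, which returns a reader and writer pair instead of requiring a protocol class. The sketch below is an assumption about how such a client might look, not the original notebook cell. It targets Python 3.7 or later, and to stay self-contained it starts a small echo server on an OS-assigned port in the same loop.

```python
import asyncio

MESSAGES = [
    b'This is the message. ',
    b'It will be sent ',
    b'in parts.',
]


async def echo(reader, writer):
    # Minimal stream-based echo handler used only for this sketch.
    while True:
        data = await reader.read(128)
        if not data:
            break
        writer.write(data)
        await writer.drain()
    writer.close()


async def echo_client(host, port, messages):
    reader, writer = await asyncio.open_connection(host, port)
    for msg in messages:
        writer.write(msg)
    if writer.can_write_eof():
        writer.write_eof()
    await writer.drain()

    # Read until the server closes its side of the connection.
    chunks = []
    while True:
        data = await reader.read(128)
        if not data:
            break
        chunks.append(data)
    writer.close()
    await writer.wait_closed()
    return b''.join(chunks)


async def main():
    server = await asyncio.start_server(echo, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    try:
        return await asyncio.wait_for(
            echo_client('127.0.0.1', port, MESSAGES), timeout=5)
    finally:
        server.close()
        await server.wait_closed()


reply = asyncio.run(main())
print(reply)
```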

The protocol-based client from the previous section (`asyncio_echo_client_protocol.py`) can be run unchanged against this server, since both servers echo bytes on the same TCP address.

## Using SSL (Skip)

## Interacting with Domain Name Service (Skip)