# Qaio
### Асинхронная очередь задач и планировщик асинхронных сопрограмм в Python

## Асинхронная очередь задач : проблема

Разработка очереди задач, которая была бы
- Персистентной, отказоустойчивой
- Легко развертываемой
- Быстрой


Направления работы :
- Улучшение имевшегося наброска библиотеки на Redis
- Реализация очереди на Tarantool


## Интерфейс очереди задач

In [8]:
import aioredis
import aioredisqueue

redis = await aioredis.create_redis(('localhost', 6379), db=0)
queue = aioredisqueue.queue.Queue(redis, key_prefix='test_queue')

await queue.put('Some payload string')
task = await queue.get()
await task.ack()
task.payload

'Some payload string'

## Redis : обзор возможностей и проблем
- Key-value хранилище
- Типы данных : списки
- Изоляция транзакций : multi/exec и Lua scripting

## Redis : реализованный функционал
### Сериализация

### Доступные по умолчанию методы : raw, pickle, json

In [9]:
queue = aioredisqueue.queue.Queue(redis, serialization_method='pickle')
await queue.put({'a': 1, '2': ['b', [13]]})
task = await queue.get()
await task.ack()
task.payload

{'a': 1, '2': ['b', [13]]}

In [10]:
await queue.put(b'binary string', serialization_method='raw')
task = await queue.get()
await task.ack()
task.payload

b'binary string'

### Возможность регистрировать пользовательские методы

In [12]:
import json
from aioredisqueue.serializers import register

register('my_serializer', json.loads, lambda payload: payload)

await queue.put('{"a": 42}', serialization_method='my_serializer')
task = await queue.get()
await task.ack()
task.payload['a']

42

## Background requeueing
Улучшена очистка очереди от взятых, но не завершенных по таймауту задач

# Tarantool

- NoSQL хранилище и сервер приложений на Lua
- Lua - основной скриптовый язык
- Много готовых Lua модули от разработчиков, в том числе модуль очереди
- Асинхронность поддерживается на уровне чтения из базы данных

Проблемы, с которыми пришлось столкнуться
- Малое внимание от разработчиков к экосистеме проекта
- Отсутствие официального асинхронного коннектора для Python
- Ошибки в неофициальных коннекторе и библиотеке для очередей от разработчика Tarantool
- Сложность конфигурации инстансов Tarantool для этой задачи

# Перспективы
- Использование нового типа данных Redis - streams
- Реализация в Redis настраиваемых таймаутов для задач
- Использование библиотек для Tarantool от сторонних разработчиков или написание своих

# Планировщик асинхронных сопрограмм (корутин)

- Асинхронная обработка запросов упирается в вычисления с ограниченной скоростью работы
- Ограничение количества и планировка вызовов корутин
- Троттлинг запросов в nginx : leaky bucket + delay

![Alt text](nginx.png)

## Статический эмулятор шедулера
As simple as this :

In [82]:
from aioleakybucket import static

zones = {'login': {'size': 1000, 'rate': 5}}
resources = {'/login': {'zone': 'login', 'burst': 12, 'delay': 8}}
limiter = static.RequestLimiter(zones, resources)

request_times = [0.1, 0.2, 0.3, 0.4, 0.42, 0.6, 0.7, 0.73, 0.8, 0.82, 0.9, 1.1, 1.2, 1.3, 1.5, 1.6, 
                 1.7, 1.85, 1.9, 2.1, 2.3, 2.4, 2.5, 2.7, 2.8]
requests = [(time, 0, '/login') for time in request_times]

for request in requests:
    result = limiter.get_request_delay(request)
    print(request, " => ", result)

(0.1, 0, '/login')  =>  (0.1, 0, True, 0.0, 1)
(0.2, 0, '/login')  =>  (0.2, 0, True, 0.0, 2)
(0.3, 0, '/login')  =>  (0.3, 0, True, 0.0, 3)
(0.4, 0, '/login')  =>  (0.4, 0, True, 0.0, 3)
(0.42, 0, '/login')  =>  (0.42, 0, True, 0.0, 4)
(0.6, 0, '/login')  =>  (0.6, 0, True, 0.0, 4)
(0.7, 0, '/login')  =>  (0.7, 0, True, 0.0, 5)
(0.73, 0, '/login')  =>  (0.73, 0, True, 0.0, 5)
(0.8, 0, '/login')  =>  (0.8, 0, True, 0.0, 6)
(0.82, 0, '/login')  =>  (0.82, 0, True, 0.0, 7)
(0.9, 0, '/login')  =>  (0.9, 0, True, 0.0, 7)
(1.1, 0, '/login')  =>  (1.1, 0, True, 0.0, 7)
(1.2, 0, '/login')  =>  (1.2, 0, True, 0.0, 8)
(1.3, 0, '/login')  =>  (1.3, 0, True, 0.0, 8)
(1.5, 0, '/login')  =>  (1.5, 0, True, 0.0, 8)
(1.6, 0, '/login')  =>  (1.6, 0, True, 0.19999999999999996, 9)
(1.7, 0, '/login')  =>  (1.7, 0, True, 0.30000000000000004, 9)
(1.85, 0, '/login')  =>  (1.85, 0, True, 0.3500000000000001, 10)
(1.9, 0, '/login')  =>  (1.9, 0, True, 0.5000000000000004, 10)
(2.1, 0, '/login')  =>  (2.1, 0, Tr

In [96]:
import utils

request_intervals = [
    (0, 3, 5),
    (3, 5, 8),
    (5, 6, 7),
    (6, 10, 4),
    (10, 12, 3),
    (12, 13, 7),
    (13, 14, 12),
    (14, 18, 4)
]
request_times = utils.combine_lists(*[utils.generate_times(*interval) for interval in 
                                      request_intervals])

zones = {'login': {'size': 1000, 'rate': 5}}
resources = {'/login': {'zone': 'login', 'burst': 12, 'delay': 8}}
limiter = static.RequestLimiter(zones, resources)
(fails, success_rate) = utils.try_requests(limiter, [(time, 0, '/login') for time in request_times])
print(fails, success_rate)

0 1.0


In [226]:
request_times = utils.generate_times(0, 1000, 8)
zones = {'login': {'size': 1000, 'rate': 5}}
resources = {'/login': {'zone': 'login', 'burst': 12, 'delay': 8}}
limiter = static.RequestLimiter(zones, resources)
(fails, success_rate) = utils.try_requests(limiter, [(time, 0, '/login') for time in request_times])
print(success_rate)

0.626375


In [111]:
import random

In [220]:
sample = random.choices(
    [0,   1,    2,   3,    39,    40,    83,    94],
    [60, 16, 10, 10, 1,  1,  1,  1],
    k=6000
)
request_times = utils.generate_times_from_sample(sample)
zones = {'login': {'size': 1000, 'rate': 5}}
resources = {'/login': {'zone': 'login', 'burst': 100, 'delay': 10}}
limiter = static.RequestLimiter(zones, resources)
(fails, success_rate) = utils.try_requests(limiter, [(time, 0, '/login') for time in request_times])
print(fails, success_rate)

0 1.0


# Декораторы для корутин

In [None]:
from aioleakybucket.dynamic
zones = {'login': {'size': 1000, 'rate': 5}}
dynamic.config(zones)

@dynamic.limit_calls('ip', 'login', 100, 8)
async def target_function(ip):
    # Time-consuming calculation
    await queue.put({'a': 1, '2': ['b', [13]]})
    task = await queue.get()
    await task.ack()
    return task.payload

# Спасибо за внимание!