### **Python - базы данных и миграции**

In [None]:
#vibo: запускаем Wireshark — анализатор сетевых протоколов 
#vibo: дать разрешение sudo chmod +x /usr/bin/dumpcap
#vibo: обновиться, запустить loopback
#vibo: отфильтровать по pgsql

**Клинет-серверный протокол PostgreSQL:** 
- Клиент -> cообщение (TCP socket) -> PostgreSQL
- Клиент <- cообщение (TCP socket) <- PostgreSQL

**Фазы клиент-серверного протокола PostgreSQL:**
- фаза авторизации (только сообщения про авторизацию)
- фаза сеанса (гоняем сообщения - запросы/ответы)
- завершение соединения

Два ***протокола*** выполнения запросов PostgreSQL:
- Simple Query (протокол простых запросов)
- Extended Query (протокол расширенных запросов)

#### **=== Simple Query (протокол простых запросов) ===**

1. Клиент -> Query: SELECT 1 -> PostgreSQL      (запрос данных)
2. Клиент <- RowDescription <- PostgreSQL       (описание возвращаемых данных)
3. Клиент <- DataRow <- PostgreSQL              (возврат данных, НЕ ДОЖИДАЯСЬ ДОПОЛНИТЕЛЬНЫХ КОМАНД ОТ КЛИЕНТА)
4. Клиент <- CommandComplete <- PostgreSQL      (сообщение о завершении передачи данных)
5. Клиент <- ReadyForQuery <- PostgreSQL        (готов к запросу)

**СООБЩЕНИЕ != ЗАПРОС** (сообщение может содержать несколько запросов)

**Особенности протокола простых запросов:**
- экранировать запрос необходимо клиенту;
- в одном сообщении можно передать несколько запросов, разделенных точкой с запятой;
- сервер возвращает все данные после того, как отправлен запрос, нет возможности получать их частями на уровне протокола, но можно читать их по одному из сокета на уровне ***драйвера*** или используя ***курсоры***;
- можно отправить несколько сообщений с запросами не дожидаясь ответа и затем обработать ответы в таком же порядке;
- этот протокол реализован **psycopg (2, 3)** и **aiopg**

**драйвер** - программа, реализующая протокол определенной базы данных.

**курсор** - объект в python, который позволяет выполнять запросы и получать результаты.

In [3]:
#vibo: выполним запрос generate_series, некий аналог range из python, только в postgres
import psycopg2


conn = psycopg2.connect("postgresql://postgres:hackme@localhost")

with conn.cursor() as cur:
    cur.execute("SELECT generate_series(%s, %s);", (0, 1))
    print(cur.fetchall())

[(0,), (1,)]


В клиенте видим, что запрос уже ушел с подставленными 0 и 1 -> SELECT generate_series(0, 1)

    Frame 363: 121 bytes on wire (968 bits), 121 bytes captured (968 bits) on interface lo, id 0
    Ethernet II, Src: 00:00:00_00:00:00 (00:00:00:00:00:00), Dst: 00:00:00_00:00:00 (00:00:00:00:00:00)
    Internet Protocol Version 6, Src: ::1, Dst: ::1
    Transmission Control Protocol, Src Port: 44406, Dst Port: 5432, Seq: 61, Ack: 409, Len: 35
    PostgreSQL
        Type: Simple query
        Length: 34
        Query: SELECT generate_series(0, 1);


In [4]:
#vibo: выполним несколько запросов в одном сообщении
import psycopg2


conn = psycopg2.connect("postgresql://postgres:hackme@localhost")

with conn.cursor() as cur:
    cur.execute("SELECT 1; SELECT 2;")
    print(cur.fetchall())

[(2,)]


Ответ запроса 1. потерялся, если хотим увидеть обе строки, то делаем запрос через оператор - **UNION**

In [6]:
#vibo: выполним несколько запросов в одном сообщении с оператором UNION
import psycopg2


conn = psycopg2.connect("postgresql://postgres:hackme@localhost")

with conn.cursor() as cur:
    cur.execute("SELECT 1 UNION SELECT 2;")
    print(cur.fetchall())

[(1,), (2,)]


In [8]:
#vibo: а если сделать несколько сообщений, не дожидаясь ответа
import psycopg2


conn = psycopg2.connect("postgresql://postgres:hackme@localhost")

with conn.cursor() as cur:
    cur.execute("SELECT pg_sleep(5); SELECT 1;")
    print(cur.fetchall())                             #vibo: если не выводить увидим только результат последнего запроса
    cur.execute("SELECT pg_sleep(5); SELECT 2;")
    print(cur.fetchall())

[(1,)]
[(2,)]


**psycopg2 не позволяет отправить сообщение 2 не дождавшись ответа на сообщение 1**

In [9]:
#vibo: ок, пишем свой драйвер, запросы ушли одновременно (чего мы и добивались), но на результат это не повлияло. Скорость ответа осталась такая же. 


**ИТОГО: ПРОТОКОЛ ПРОСТЫХ ЗАПРОСОВ**
- чтобы получить все данные из нескольких запросов в одном сообщении нужен UNION;
- в **psycopg2** есть интерфейс для экранирования данных, но работает он на стороне клиента;
- можно передавать сообщения с запросами не дожидаясь ответа на предыдущие, но это не дает никаких преимуществ по времени;
- если хочется выполнить несколько запросов параллельно - нужно несколько соединений.

In [15]:
#vibo: вот так можно прочитать базу данных из лекции по scooters
import psycopg2

conn = psycopg2.connect("postgres://postgres:hackme@localhost:5432/scooters")

with conn.cursor() as cur: 
    cur.execute("SELECT * FROM scooters")
    print(cur.fetchall())                             

[('58c5080f-6726-42f3-a997-f143ad984201', '(37.46829361638372,55.05982583193995)', None), ('8f484690-f92b-417e-9b68-2bd58f1ed700', '(37.05780740487775,55.683570543711426)', None), ('becd1d25-1ab8-4f89-a310-e8177a94093f', '(37.94867354172965,55.313234742670744)', 'df65a1d8-e3c5-452d-a5b0-4e8b8abd55d3'), ('917e9894-0822-4a33-9bfc-5121b937b637', '(37.14825826460845,55.00325180712231)', None), ('5f32966e-87c9-4725-bf18-3d8439044c11', '(37.14117043538411,55.131343164062876)', None), ('45bec7d8-b60e-4fc0-9752-c0b3f6d204ee', '(37.70299881637874,55.75460344872543)', None), ('5898c44f-bfb8-4b6f-a14a-36e5797828b1', '(37.0970577108166,55.644623924666305)', None), ('abfbb1f6-c9b5-47a5-9f4f-0da16763310b', '(37.238075893682485,55.157759477336185)', None), ('0533e457-40f9-468a-ba70-4f1903b55d6b', '(37.26313503696669,55.3111815759084)', None), ('aff866f5-6877-4eea-9c19-b90980dd0d65', '(37.446735940409766,55.702897019510004)', None)]


***ОЧЕНЬ ВАЖНО ПОНИМАТЬ!!!*** PostgreSQL не может параллельно (асинхронно) обрабатывать запросы. Когда есть соединение с PostgreSQL, мы работаем с ним синхронно или асинхронно в зависимости от выбора протокола. Разница заключается в том, что когда мы работаем **синхронно** - и дошли до извлечения данных наше приложение засыпает до тех пор пока к нему не пришли данные, а если приложение **асинхронное** - у нас засыпает не весь процесс (тред), а засыпает корутина, т.е. ваш код может продолжать работать дальше. Т.к. в асинхронном режиме не весь процесс засыпает, и нам никто не мешает создать еще одно соединение и во втором соединении начать выполнять еще одну команду и дожидаться, что PostgreSQL на нее скажет.

#### **=== Extended Query (протокол расширенных запросов) ===**

1. Запрос Query -> Команда **Parse** с запросом -> на стороне PostgreSQL появляется сущность "Prepared statement"
2. Командой "Descibe" мы можем увидеть, какие данные для нас подготовлены, что будет нам выдавать PostgreSQL ("PretranedDescription", "RowDescription")
3. Делаем команду **Bind**, на стороне PostgreSQL создается новая сущность Portal
4. Командой "Descibe" мы можем увидеть, типы данных ("RowDescription")
5. Выполняем команду **Execute** (в команде можем контролировать сколько строк хотим получить) -> PostgreSQL возвращает нам строки данных DataRow

**Протокол расширенных запросов:**
- можно не посылать запрос каждый раз, а переиспользовать ранее посланные;
- позволяет экранировать параметры средствами PostgreSQL;
- можно контролирвоать процесс получения данных от сервера (для этого не нкжно создавать отдельный курсор);
- в одном сообщении может быть толко один запрос;
- процесс выполнени запроса разбит на несколько шагов: **parse, bind, execute**;
- реализован в **asyncpg** и **psycopg3**

In [18]:
!pip install asyncpg

Collecting asyncpg
  Downloading asyncpg-0.26.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.7/2.7 MB[0m [31m160.1 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: asyncpg
Successfully installed asyncpg-0.26.0


In [14]:
#vibo: пример кода с расширенным протоколом запросов
import asyncio
import asyncpg

async def main():
    #vibo: подключаемся через asyncpg к PostgreSQL
    conn = await asyncpg.connect('postgresql://postgres:hackme@localhost')
    async with conn.transaction():
        cursor = await conn.cursor(
            #vibo: указываем отдельно строку, отдельно параметры
            'SELECT generate_series(0, $1) as id', 1
        )
        #vibo: по одной строке забираем из базы данных
        print(await cursor.fetch(1))    #[<Record id=0>]
        print(await cursor.fetch(1))    #[<Record id=1>]

In [15]:
#vibo: эту функцию можно запустить из jupyter так
await main()

[<Record id=0>]
[<Record id=1>]


In [13]:
#vibo: или через python
!python main1.py

[<Record id=0>]
[<Record id=1>]


Видим в wireshark следующее:

36	9.372794274	::1	::1	PGSQL	178	>P/D/H          #vibo: запрос Parse - Describe - Flush
37	9.373112400	::1	::1	PGSQL	130	<1/t/T          #vibo: овет Parse completion
38	9.373551040	::1	::1	PGSQL	154	>B/S            #vibo: запрос Bind
39	9.373650575	::1	::1	PGSQL	97	<2/Z            #vibo: ответ Bined completion
40	9.373765348	::1	::1	PGSQL	121	>E/S            #vibo: запрос1 Execute
41	9.373783438	::1	::1	PGSQL	112	<D/s/Z          #vibo: собственно Data row 1
42	9.373901571	::1	::1	PGSQL	121	>E/S            #vibo: запрос2 Execute
43	9.373911852	::1	::1	PGSQL	112	<D/s/Z          #vibo: собственно Data row 2
44	9.374088159	::1	::1	PGSQL	99	>Q              #vibo: Simple Query
45	9.374120895	::1	::1	PGSQL	104	<C/Z            #vibo: Ready Query 
46	9.374199691	::1	::1	PGSQL	91	>X              #vibo: Termination

Видим (36), что сразу отправляются три команды Parse - Describe - Flush

Frame 36: 178 bytes on wire (1424 bits), 178 bytes captured (1424 bits) on interface lo, id 0
Ethernet II, Src: 00:00:00_00:00:00 (00:00:00:00:00:00), Dst: 00:00:00_00:00:00 (00:00:00:00:00:00)
Internet Protocol Version 6, Src: ::1, Dst: ::1
Transmission Control Protocol, Src Port: 55784, Dst Port: 5432, Seq: 86, Ack: 409, Len: 92
PostgreSQL
    Type: Parse
    Length: 61
    Statement: __asyncpg_stmt_3__
    Query: SELECT generate_series(0, $1) as id
    Parameters: 0
PostgreSQL
    Type: Describe
    Length: 24
    Statement: __asyncpg_stmt_3__
PostgreSQL
    Type: Flush
    Length: 4

В ответе обращаем внимание на Type OID: 23 (по этому параметру asyncpg понимает, как ему данные мапить). Стандартные данные в PostgreSQL 23 = int4

Frame 37: 130 bytes on wire (1040 bits), 130 bytes captured (1040 bits) on interface lo, id 0
Ethernet II, Src: 00:00:00_00:00:00 (00:00:00:00:00:00), Dst: 00:00:00_00:00:00 (00:00:00:00:00:00)
Internet Protocol Version 6, Src: ::1, Dst: ::1
Transmission Control Protocol, Src Port: 5432, Dst Port: 55784, Seq: 409, Ack: 178, Len: 44
PostgreSQL
    Type: Parse completion
    Length: 4
PostgreSQL
    Type: Parameter description
    Length: 10
    Parameters: 1
        Type OID: 23
PostgreSQL
    Type: Row description
    Length: 27
    Field count: 1

**Параметры в DDL**

In [20]:
#vibo: создадим таблицу с какими-нибудь данными от пользователя и постараемся ее экранировать силами PostgreSQL
import asyncio
import asyncpg

async def main():
    #vibo: подключаемся через asyncpg к PostgreSQL
    conn = await asyncpg.connect(
        'postgresql://postgres:hackme@localhost'
    )

    await conn.execute(
        'CREATE TABLE $1(id SERIAL PRIMARY KEY)', 'mytable'
    )

    await conn.execute(
        'CREATE TABLE mytable($1 SERIAL PRIMARY KEY)', 'mycolumn'
    )

In [21]:
#vibo: эту функцию можно запустить из jupyter так
await main()

PostgresSyntaxError: syntax error at or near "$1"

**PostgresSyntaxError: syntax error at or near "$1" - НЕЛЬЗЯ ЭКРАНИРОВАТЬ DDL НА УРОНЕ ПРОТОКОЛА**, если хотим использовать запросы в которых есть пользовательский ввод, которому мы не доверяем - прийдется экранировать это на уровне клиента, на уровне протокола не получится.

**Почему DDL работает в SQLAlchemy 1.4?**

In [24]:
!pip install sqlalchemy

Collecting sqlalchemy
  Downloading SQLAlchemy-1.4.40-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m412.3 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: sqlalchemy
Successfully installed sqlalchemy-1.4.40


In [31]:
#vibo: используем SQLAlchemy 1.4+, т.к. в ней появился asyncpg
import asyncio
from sqlalchemy import Table, MetaData, Column, Integer
from sqlalchemy.ext.asyncio import create_async_engine

metadata = MetaData()
example = Table('example', metadata, Column('id', Integer()))

async def main():
    engine = create_async_engine(
        'postgresql+asyncpg://postgres:hackme@localhost'
    )
    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)

In [32]:
#vibo: эту функцию можно запустить из jupyter так
await main()

**ВСЕ РАБОТАЕТ! SQLAlchemy экранирует данные на стророне клиента** Запрос CREATE TABLE отправляется без всяких $1, он уже полностью сформирован. Alchemy сделала за нас всю работу. Магии в этом нет DDL экранировать в данном случае нельзя. Еще одна особенность использования данного протокола - это ораничение на количество переменных (32767 + 1 -> лимит, органичение). В  SQLAlchemy такие органичения отсутствуют.

**ИТОГО: ПРОТОКОЛ РАСШИРЕННЫХ ЗАПРОСОВ**
- протокол более сложный в реализации, но позволяет более эффективно работать с запросами, данными (как минимум экономим на команде parse);
- дает больше контроля над передачей данных на уровне протокола;
- состояние описывается несколькими сущностями;
- не позволяет экранировать DDL, нужно делать на клиенте (как в psycopg2);
- его нет в psycopg2, есть в **psycopg3** и в **libpq**.

**экранирование** - ограничение, накладываемое на данные от пользователя, например, чтобы в одном сообщении на регистрацию в базе данных не пришел запрос DROP TABLE.

#### **=== Д Р А Й В Е Р Ы ====**

**Синхронные**:
- **psycopg2, 3**
- pg8000
- PyGreSQL
- py-postgresql
- psycopg2cffi

**Ассинхронные**:
- **aiopg**
- **asyncpg**
- psycopg3

libpq - интерфейс PostgreSQL для программирования приложений на языке Си. Содержит набор функций, которые позволяют клиенту передавать запросы серверу PostgreSQL и принимать результаты этих запросов.

DBAPI 2.0 (PEP 249) - набор рекомендаций по реализации драйверов, предлагающий единый синтаксис и способ доступа к базе данных.

**psycopg2** (наиболее популярный синхронный драйвер с очень богатыми возможностями):
- используется в SQLAlchemy и Django
- реализован на Си (только CPython)
- использует libpq
- потокобезопасный (одно соединение в нескольких потоках)
- умеет экранировать DDL на клиенте
- DictCursor, RealDictCursor, NamedTupleCursor
- LoggingConnection, Logging Cursor
- COPY TO, COPY FROM
- LogicalReplicationConnection, PhysicalReplicationConnection и др.


**psycopg3**
- поддерживает оба протокола запросов в PosgreSQL
- может использоваться как в синхронном, так и асинхронном режиме
- поддерживается в SQLAlchemy 2.0 (она еще в разработке)
- использует libpq
- пулы соединений (пакет psycopg_pool)

**pg8000** (реализация синхронного драйвера на чистом python, но медленный)
- не требует libpq
- совместим с CPython, Jython, PyPy

**aiopg** (ассинхронная реализация psycopg2)

**asyncpg** (самый быстрый ассинхронный драйвер для CPython)
- использует бинарный протокол PostgreSQL, напрямую, без DBAPI
- только Prepared statement
- транзакции
- курсоры и частичная интеграция по данным
- пулы соединениц

#### **=== Как подключиться? ===**

**Docker**

docker run --rm \
    --detach \
    --publish 5432:5432 \
    --env POSTGRES_DB=test \
    --env POSTGRES_USER=user \
    --env POSTGRES_PASSWORD=hackme \
    postgres

**Локально**

In [5]:
import psycopg2

conn = psycopg2.connect('postgresql://postgres:hackme@localhost')
cur = conn.cursor()

cur.execute("SELECT 1")
data = cur.fetchone() # <class 'tuple'>: (1,)
print(data)

cur.close()
conn.close()

(1,)


Оъекты Connection и Cursor представляют интерфейс контекстного менеджера (если бы не это, то пришлось бы писать close)

In [7]:
import psycopg2

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
        data = cur.fetchone() # <class 'tuple'>: (1,)
        print(data)

(1,)


**Получаем несколько строк (все результаты запроса)**
Cursor.fetchall() возвращает все (оставшиеся) результаты запроса в виде кортежа, либо пустой список

In [9]:
import psycopg2

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM generate_series(1, 999)")
        data = cur.fetchall() # <class 'list'>: [(1,), (2,), (3,), (4,), ...]
        #print(data)

In [10]:
#vibo: несмотря на то, что мы просим одно значение PostgreSQL возвращает все
import psycopg2

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM generate_series(1, 999)")
        
        for row in cur:
            print(row) # <class 'tuple'>: (1,)
            break

(1,)


#### **=== К У Р С О Р Ы ===**

- позволяют выполнять запросы и получать результаты
- бывают client-side и server-side (именованные)
- в одном соединении может быть несколько курсоров одновременно, но только один курсор работает в один момент времени
- курсоры одного соединение не изолированы

**На стороне КЛИЕНТА:**
- conn.cursor()
- обычный объект в Python
- получает результаты запроса сразу целиком в память
- подходят дял работы с небольшим количеством данных
- неэффективно расходуют память

**На стороное СЕРВЕРА:**
- conn.cursor(name='somename')
- специальный объект в PostgreSQL, создается с помощью команды DECLARE
- может получать результаты запроса по частям
- может двигаться по результатам вперед-назад
- хороши для эффективной работы с большим количеством данных
- позволяют по кусочкам отрабатывать результаты сразу нескольких запросов

**Итерация по курсору на стороне сервера**

In [14]:
import psycopg2

#vibo: подключаемся к PostgreSQL
with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    #vibo: создаем именованный курсок example
    with conn.cursor(name='example') as cur:
        
        #vibo: изменяем количество объектов, которые курсор будет забирвть каждый раз (по умолчанию их 2000)
        # by default: 2000
        cur.itersize = 1

        #vibo: задаем range от 1 до 999
        cur.execute("SELECT * FROM generate_series(1, 999)")
        
        #vibo: берем первую строчку и прерываемся
        for row in cur:
            print(row) # <class 'tuple'>: (1,)
            break

(1,)


In [17]:
import psycopg2

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor(name='example') as cur:
        cur.itersize = 1
        cur.execute("SELECT * FROM generate_series(1, 5)")

        print(cur.fetchall()) # [(1,), (2,), (3,), (4,) (5,)]

        # cur.scroll (0, mode='absolute')
        cur.scroll(-6)

        print(cur.fetchall()) # [(1,), (2,), (3,), (4,) (5,)]

[(1,), (2,), (3,), (4,), (5,)]
[(1,), (2,), (3,), (4,), (5,)]


**DictCursor** (позволяет обращаться к контейнеру с данными по названию столбцов)

In [18]:
import json
import psycopg2
from psycopg2.extras import DictCursor

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor(cursor_factory=DictCursor) as cur:
        cur.execute("SELECT * FROM generate_series(1, 999) as col")
        row = cur.fetchone() # {psycopg2.extras.DictRow} [1]
        row[0] # {int} 1
        row['col'] # {int} 1
        json.dumps(row) # {str} "[1]"

**RealDictCursor** (основным преимуществом является простота получения данных в виде json)

In [21]:
import json
import psycopg2
from psycopg2.extras import RealDictCursor

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("SELECT * FROM generate_series(1, 999) as col")
        row = cur.fetchone() # {psycopg2.extras.RealDictRow} [1]
        row['col'] # {int} 1
        row[0] # KeyError
        json.dumps(row) # {str} "{'col": 1}"

KeyError: 0

**NamedTupleCursor** (вызов через точку)

In [22]:
import psycopg2
from psycopg2.extras import NamedTupleCursor

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor(cursor_factory=NamedTupleCursor) as cur:
        cur.execute("SELECT * FROM generate_series(1, 999) as col")
        row = cur.fetchone() # {psycopg2.extras.Record} [1]
        row[0] # {int} 1
        row.col # {int} 1

#### **=== Т Р А Н З А К Ц И И ===**

- psycopg2 создает транзакию перед выполнением первого запроса
- следующие запросы (в т.ч. других курсоров) выполняются в контексте одной транзакции
- чтобы изменения применились необходимо явно обратитсья **Connection.commit()**
- если какой-либо запрос завершается неудачно, транзакция будет прервана. Никакой другой запрос не будет выполнен до вызова метода **rollback()**
- в одном соединении можно выполнять несколько почледовательных транакций

In [24]:
import psycopg2

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor() as cur:
        # 1st transaction
        cur.execute(
            "CREATE TABLE users(id SERIAL PRIMARY KEY, name TEXT)"
        )
        conn.commit()

        # 2st transaction
        cur.execute(
            "INSERT INTO users(name) VALUES ('Elon Musk')"
        )
        conn.commit()

- многие драйверы (или абстаркции более высокого уровня) представляют контекстные менеджеры для работы с транакциями
- если контекстный менеджер завершаетс яуспешно - вызывается Connection.commit() и все изменения сохраняются
- если внутри контекстного менеджена происходит ошибка - вызывается Connection.rollback() и все изменения откладываются 

**Как сделать контекстный менеджер для транзакций**

In [28]:
from psycopg2.extensions import STATUS_IN_TRANSACTION

class TransactionCtx:
    def __init__(self, conn):
        self.conn = conn
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.conn.status == STATUS_IN_TRANSACTION:
            if exc_val:
                self.conn.rollback()
            else:
                self.conn.commit()


**Как испоьзовать контекстный менеджер для транзакций**

In [29]:
import psycopg2

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with TransactionCtx(conn) as transaction:
        with conn.cursor() as cur:
            cur.execute('SELECT 1')
            raise RuntimeError


RuntimeError: 

**Точки сохранения в транзакциях** (SAVEPOINT позволяет откатить все команды, выполненные после нее и восстановить состояние на момент утсановки этой точки)

In [None]:
with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor() as cur:
        cur.execute("CREATE TABLE users(id ...)")
        cur.execute("SAVEPOINT sp1")

        try:
            cur.execute("CREATE TABLE users(id ...)")
        except psycopg2.errors.DatabaseError:
            cur.execute("ROLLBACK TO SAVEPOINT sp1")
        
        conn.commit()

**RETURNING** (позволяет получать данные из модифицируемых строк в процессе их обработки в запросах INSERT, UPDATE, DELETE)


In [31]:
import psycopg2

query = "INSERT INTO users(name) VALUES ('John') RETURNING users.*"

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor() as cur:
        cur.execute(query)
        row = cur.fetchone() # <class 'tuple'>: (1, 'John')
        conn.commit()

In [32]:
import psycopg2

query = "DELETE FROM users RETURNING users.*"

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor() as cur:
        cur.execute(query)
        row = cur.fetchall() # <class 'list'>: (1, 'John')
        conn.commit()

**UPSERT: update или insert**
- указывает действие, выполняемое в случае нарушения ограничения уникальности или ограничения-исключения
- DO NOTHING - отмняет добавление строки
- DO UPDATE - производит изменение строки
- позволят выполнять операции (если нет - создать, если есть - обновить) атомарно за один запрос, можно совмещать с RETURNING
- не может дважды за один запрос обрабоать одну и ту же строку 

In [7]:
import psycopg2

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE domains (
                id SERIAL PRIMARY KEY, name TEXT UNIQUE, owner TEXT
            )
        """)
        conn.commit()


In [8]:
import psycopg2

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor() as cur:

        cur.execute("""
            INSERT INTO domains (name, owner) VALUES
            ('example.com', 'John Travolta')
        """)

        cur.execute("""
            INSERT INTO domains (name, owner) VALUES
            ('example.com', 'Angelina Jolie')
            ON CONFLICT (name) DO UPDATE SET owner = EXCLUDED.owner
            RETURNING domains.*
        """)

        # <class 'tuple'>: (1, 'example.com', 'Angelina Jolie')
        row = cur.fetchone()
        conn.commit()

In [9]:
conn = psycopg2.connect("postgresql://postgres:hackme@localhost")

with conn.cursor() as cur: 
    cur.execute("SELECT * FROM domains")
    print(cur.fetchall())   

[(1, 'example.com', 'Angelina Jolie')]


**SELECT FOR UPDATE** (блокирует выбранные строки для изменения; защищает от блокировки, изменения и удаленич другими транзакциями до заврешения текущей)


In [10]:
import psycopg2

with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT * FROM domains WHERE name = %s FOR UPDATE",
            ('example.com', )
        )

        domain_id, name, *_ = cur.fetchone()

In [13]:
with psycopg2.connect('postgresql://postgres:hackme@localhost') as conn:
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE domains SET owner = %(owner)s
            WHERE id = %(domain_id)s
            """,
            {
                'owner': 'New Company Ins',
                'domain_id': domain_id
            }
        )

        conn.commit()

**Сырые SQL запросы**

Недостатки:
- нельзя переиспользвать/расширять
- большие запросы сложно читать, писать во вложенных конструкциях из-за отступов
- нет инструментов для динамического построения запросов
- при миграциях нужно погрепать все сырые запросы в проекте

Достоинства:
- не требуется дополнительных библиотек
- нет накладных расходов на обработку

**Query builder**

Недостатки:
- за удобство приходится платить накладными расходами - временем на генерацию запросов и памятью

Достоинства:
- Python-синтаксис, легко разделять на части и читать
- запросы можно переиспользовать и расширять
- построение динамических запросов
- при изменениях достатоно статического анализа кода

#### **=== SQL Alchemy===**

- поддерживает огромное количество драйверов, баз данных, очень богатые возможности, при этом очень гибкая и дает честный единый интерфейс
- отличная документация, большое сообщество, активно разрабатывается
- предлагает ORM из коробки, а также много совместимых инструментов для разработки, миграций и тестирования
- не диктует каких-либо фреймворков и библиотек
- у нее очень крутой разработчик!

**ORM (Object-Relational Mapping)** - объектно-реляционное отображение, или преобразование - технология программирования,, которая связывает базы данных с концепциями объектно-ориентированных языков программирования, создавая "виртуальную объектную базу данных".

**Из чего состоит Alchemy:**

**1. Dialect** используется для общения с различными имплементациями DBAPI, драйверами и базами данных

По умолчанию из коробки доступны:
- PostgreSQL
- MySQL
- SQLite
- Oracle
- Microsoft SQL Server

**2. Engine** скрывает за собой пул подключений и диалект, которые работают непосредственно с модулями DBAPI (драйвером) и базой данных

In [2]:
from sqlalchemy import create_engine

#vibo: говорим engine подключайся сюда, он сам выбирает драйвер
engine = create_engine(
    'postgresql://postgres:hackme@localhost', echo=True
)
with engine.connect() as conn:

    # {sqlalchemy.engine.result.RowProxy} (1,)
    data = conn.execute('SELECT 1').fetchone()

2022-08-11 10:54:22,371 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2022-08-11 10:54:22,371 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-08-11 10:54:22,374 INFO sqlalchemy.engine.Engine select current_schema()
2022-08-11 10:54:22,375 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-08-11 10:54:22,377 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2022-08-11 10:54:22,378 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-08-11 10:54:22,379 INFO sqlalchemy.engine.Engine SELECT 1
2022-08-11 10:54:22,379 INFO sqlalchemy.engine.Engine [raw sql] {}


**3. MetaDate и Table**

- **MetaDate** - контейнер, который содержит информацию о схеме быз данных: таблицах, индексах, типах данных и т.д.
- **Table** - содержит описание таблиц. Для того, чтобы SQLAlchemy могла генерировать запросы ей требуется информация о таблицах (типы столбцов, индексы и т.п)

**Создание таблицы с помощью SQLAlchemy**

In [8]:
from sqlalchemy import MetaData, Table, Column, Integer, String

metadata = MetaData()
domains = Table(
    'domains',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String, unique=True),
    Column('owner', String),
)

engine = create_engine(
    'postgresql://postgres:hackme@localhost', echo=True
)
with engine.connect() as conn:
    metadata.create_all(engine)

2022-08-11 11:05:05,926 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2022-08-11 11:05:05,927 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-08-11 11:05:05,929 INFO sqlalchemy.engine.Engine select current_schema()
2022-08-11 11:05:05,930 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-08-11 11:05:05,931 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2022-08-11 11:05:05,931 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-08-11 11:05:05,934 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-08-11 11:05:05,935 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
2022-08-11 11:05:05,936 INFO sqlalchemy.engine.Engine [generated in 0.00072s] {'name': 'domains'}
2022-08-11 11:05:05,938 INFO sqlalchemy.engine.Engine COMMIT


**Добавление данных с помощью Alchemy**

In [10]:
with engine.begin() as conn:

    query = domains.insert().values([
        {'name': 'example.org', 'owner': 'Vim Diesel'},
        {'name': 'anoter.com', 'owner': 'Pg Dumpledore'}
    ]).returning(*domains.columns)

    # <clas 'list'>: [(1, 'example.org', 'Vim Die..]
    data = conn.execute(query).fetchall()

2022-08-11 11:18:21,074 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-08-11 11:18:21,076 INFO sqlalchemy.engine.Engine INSERT INTO domains (name, owner) VALUES (%(name_m0)s, %(owner_m0)s), (%(name_m1)s, %(owner_m1)s) RETURNING domains.id, domains.name, domains.owner
2022-08-11 11:18:21,077 INFO sqlalchemy.engine.Engine [no key 0.00078s] {'name_m0': 'example.org', 'owner_m0': 'Vim Diesel', 'name_m1': 'anoter.com', 'owner_m1': 'Pg Dumpledore'}
2022-08-11 11:18:21,082 INFO sqlalchemy.engine.Engine COMMIT


**Получение данных с помощью SQLAlchemy**

In [12]:
with engine.connect() as conn:
    
    query = domains.select()

    # <class 'list'>: [(1, 'example.org', 'Vim Die...]
    data = conn.execute(query).fetchall()

2022-08-11 11:20:43,916 INFO sqlalchemy.engine.Engine SELECT domains.id, domains.name, domains.owner 
FROM domains
2022-08-11 11:20:43,917 INFO sqlalchemy.engine.Engine [cached since 616.4s ago] {}


**Расширение запросов**

In [31]:
#vibo: пример 1.

def get_domains(is_deleted=False) -> Select:
    return domains.select().where(
        domains.c.is_deleted == is_deleted
    )

def get_expiring_domains() -> Select:
    date = datetime.now() + timedelta(days=2)
    return get_domains().where(domains.expire_at < date)

ModuleNotFoundError: No module named 'Select'

In [35]:
#vibo: пример 2.

def get_domains(is_deleted=False, filter_query=) -> Select:
    query = domains.select().where(
        domains.c.is_deleted == is_deleted
    )
    
    if filter_query:
        subq = filter_query.alias()
        query = select(query.columns).select_from(
            quert.jion(
                subq, subq.c.domain_id == query.c.domain_id
            )
        )
        
    return query

SyntaxError: invalid syntax (331547766.py, line 3)

**Query builder SQLAlchemy и асинхронный код**

- Core можно использовать в качестве генератора запросов с асинхронными драйверами (aiopg для psycopg2, asyncpgsa для asyncpg)
- в этом случае Engine не будет использоваться, SQLAlchemy будет только генерировать запросы, а ввыполняться они могут отдельным драйвером

#### **=== ORM - Object Relational Mapping ===**

- абстракция высокого уровня, позволяет работать с данными с использованием объектно-ориентированной парадигмы
- позволяет размещать всю логику (методы, константы), связанную с сущностью в класс

ORM Объект (Сессия) ---> Query builder --> SQL запросы

**Declarative: SQLAlchemy ORM**:
- с помощью метакласса создается атрибут класса Table, связывает поля объекта со столбцами таблицы (Mapper), добавляет ссылку на объект MetaData
- для синхронизации состояния объектов в базе данных и объектной модели используется специальный объект - Session
- предоставляет механизм кэширования создания объектов и запросов - Baked Queries (после 1.4/2.0 больше не требуется)

**Как описать сущность в Declarative**

In [26]:
from sqlalchemy.ext.declarative import declarative_base

#vibo: возвращаем базовый класс, от которого можем наследоваться
Base = declarative_base()

#vibo: описываем уже не таблицу, а класс
class Domain(Base):
    __tablename__ = 'domains'

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    owner = Column(String)

    def __repr__(self):
        return "<Domain(name='%s', owner='%s')>" %(
            self.name, self.owner
        )
    
    def get_zone(self) -> str:
        return self.name.split('.')[-1]

**Как работать с объектами ORM**

In [34]:
from sqlalchemy.orm import sessionmaker

print(Domain.__table__) # {Table} domains
print(Domain.metadata) # MetaData (bind=None)

engine = create_engine(
    'postgresql://postgres:hackme@localhost', echo=True
)

#vibo: это уже сессия Alchemy
Session = sessionmaker(bind=engine)
session = Session()

#vibo: создаем объект domain, добавляем его в сессию
domain = Domain(name='example1.com', owner='Ed Jones')
session.add(domain)
#vibo: комитим
session.commit()

domains
MetaData()
2022-08-11 14:28:09,320 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2022-08-11 14:28:09,321 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-08-11 14:28:09,324 INFO sqlalchemy.engine.Engine select current_schema()
2022-08-11 14:28:09,325 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-08-11 14:28:09,327 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2022-08-11 14:28:09,328 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-08-11 14:28:09,330 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-08-11 14:28:09,333 INFO sqlalchemy.engine.Engine INSERT INTO domains (name, owner) VALUES (%(name)s, %(owner)s) RETURNING domains.id
2022-08-11 14:28:09,334 INFO sqlalchemy.engine.Engine [generated in 0.00156s] {'name': 'example1.com', 'owner': 'Ed Jones'}
2022-08-11 14:28:09,337 INFO sqlalchemy.engine.Engine COMMIT


**В SQLAlchemy 1.4+ добавлена поддержка asyncpg:**
- асинхронная только работа с сокетом, но фактически логика внутри работы с данными не меняется
- можно асинхронно работать как в Core так и в ORM режимах
- провязка реализована с помощью greenlet (аналог coroutine)
- накладные расходы порядка 15%

#### **=== М И Г Р А Ц И И ===**

- некоторый код, который меняет структуру базы данных, а иногда и сами данные
- код, который откатывает изменения обратно
- процесс, при котором применяется код, меняющий базу данных

**Какими свойствами должны обладать миграции:**
- атомарность - миграция (или группа миграций) должна быть применена либо полностью, либо никак
- обратимость - миграции должны содержать код, который позволит вернуться к предыдущему состоянию
- упорядоченнотсь - должно быть понятно, в каком порядке нужно накатывать миграции, какую катить следующей

**Alembic** (стек Alchemy):
- построен поверх SQLAlchemy
- умеет генерировать код миграций, используя объект MetaData
- позволяет писатьочень сложную бизнесс-логику без каких-либо ограничений
- имеет два режима: online (можно выполнять запросы) и offline (генерирует SQL, который можно выполнить позже)

In [None]:
!pip install alembic

**Как начать использовать alembic**
- $ alembic init alembic
- после инициализации будут созданы:
- alembic/env.py
- alembic/script.py.mako
- alembic/versions
- alembic.ini

**script.py.mako - шаблон для миграций**

Как выглядит этот файл. Есть некий шаблон по которому будут осуществляться миграции. Шаблон состоит из идентификаторов. up_revision - индентификатор миграции, down_revision - индентификатор предыдущей миграции. И есть два метода, когда мы хотим накатить миграцию или откатить миграцию. Важн отметить, т.к. Alembic генерирует нам миграции, то в таблицах мы не указываем, как должны называться все индексы, разные ключи и т.д. Хорошим тоном считаетс указать, как Alchemy собирается это делать. 

In [None]:
'''$(massege)
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
'''
from alembic import op
import sqlalchemy as sa
${imports if importe else ""}

# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}

**Naming conventions**

In [37]:
convention = {
    'all_colunm_names': lambda constraint, table: '_'.json([
        column.name for column in constraint.columns.values()
    ]),
    'ix': 'ix__%(all_column_names)s',
    'uq': 'uq__%(table_name)s__%(all_column_names)s',
    'ck': 'ck__%(table_name)s__%(constraint_name)s',
    'fk': (
        'fk__%(table_name)s__',
        '%(all_column_names)s__',
        '%(referred_table_name)s'
    ),
    'pk': 'pk__%(table_name)s'
}

metadata = MetaData(naming_convention=convention)

Полученную metadate мы передаем в declarative base, от которого мы наследуемся

**Базовый класс для моделей**

In [None]:
@as_declarative(metadata=metadata)
class Base:
    @declared_attr
    def created_at(cls):
        return Column(DateTime(timezone=True),
                     server_default=text('clock_timestamp()'),
                     nullable=False)

**Подключение alembic в проект**

в файле alembic/env.py необходимо подключить объект MetaData, который содержит всю информацию о нашей схеме

In [40]:
# add your model's MetaDate object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None

**Генерация миграций**

In [None]:
!alembic revision --massage="Initial" --autogenerate

- получает существующую схему базы данных
- сравнивает с содержимым объекта MetaData
- генерирует новый файл DDL, котроый приводит базу в одинаковое состояние с объектом MetaData используя шаблон script.py.mako

**Миграция**

In [None]:
"""Initial

Revision ID: 7ca4000d1b18
Revises: 
Create DAte: 2019-10-04 15:39:47.168093

"""
from alembic import op
import sqlalchemy as sa

#revision identifiers, used by Alembic.
revision = '7ca4000d1b18'
down_revision = None
branch_labels = None
depends_on = None

In [None]:
def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        'domains',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(), nullable=False),
        sa.Column('owner', sa.String(), nullable=False),
        sa.Column('created_at', sa.DateTime(timezone=True),
                  server_default=sa.text('clock_timestamp()'),
                  nullable=False),
        sa.PrimaryKeyConstraint('id', name=op.f('pk__domains')),
        sa.UniqueConstrain('name', name=op.f('uq__domains_name'))
    )
    # ### end Alembic commands ###

In [None]:
def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('domains')
    # ### end Alembic commands ###

**Применение миграций**

Для применения миграций необходимо выполнить команду upgrade head

In [None]:
!alembic upgrade head

Готово, работы минимум. Вы описали модели, позвали команду генерации миграций, прокатили.

**Как и когда применять миграции**

- если приложение on-premises или доступ к нему ограничен - можно и нужно накатывать миграции автоматически, например при запуске приложения
- если в вас важный сервис, доступ к которому есть - накатывание миграций вручную дает больший контроль и позволяет оперативно реагировать на проблемы

**Как быть с необратимми изменениями - НЕ ДЕАЛАЙТЕ ЭТО СРАЗУ!!!**
- если вам большене нужна большая таблица или столбец, и вы хотите их удалить без возможности восстановить данные - не стоит их удалять сразу
- убрать все обращения к ресурсам в коде как будто их не существует
- столбцы/таблицы/типы данных стоит пометить специальным декторатором, который будет сообщать о доступе к ресурсам, которые должны быть удалены
- создать задачу в бэког на последующее удаление

**Batch operations (еще одна функциональность у Alimpic)**

SQLite поддерживает ограниченное подмножество ALTER TABLE
- переименовать таблицу
- переименовать существующий столбец
- добавить столбец

Alembic предоставляет batch-режим, который:
- создает новую таблицу на основе писания миграци, используя временное имя
- копирует данные в новую таблицу из существующей
- существующая таблица удаляется
- новая таблица переименовывается в существующее имя таблицы

**О чем стоит помнить**
- менять данные на стороне Python - медленно. Лучше писать запросы, которые Alembic может выполнтиь полностью на стороне Postgres
- если требуемые операции реализовать на стороне Postgres невозможно - используйте пагинацию
- добавление поля в postgres без значения по умолчанию - очень быстро. С умолчанием - очень медленно. Добавление столбца без умолчания + добавление умолчания работает в итоге быстрее
- создание индексов на таблицах и в режиме CONCURRENTLY может помочь избежать блокировок

**Зачем тестировать миграции**
- применение миграции в production всегда сопряжено с риском
- базы данных разработки и тестирования, как правило, меньше и чище. Данные в них лучше понимаются или, если все остальное не удается, объем данных достаточно мал для обработки человеком
- Production базы данных обычно огромны, стары и полны сюрпризов

**Из-за чего могут возникнуть проблемы**
- поврежденные данные, которые были написаны старыми версиями программного обеспечения и не очищены должным образом
- подразумеваемые зависимости в данных, о которых больше никто не знает
- люди непосредственно меняют базу данных без испоьзования назначенных инструментов
- ошибки в инструментах миграции схем
- ошибки в предположениях о том, как следует переностить данные

**Что мы можем проверить**
- запускается ли миграция в целом, нет ли опечаток
- не забыт ли downgrade
- не забыл ли разработчик удалить все типы данных, которые SQLAlchemy создает автоматически
- если во время миграции изменяются данные - то корректно ли они изменяются и правильно ли откатываются обратно
- соответствуют ли миграции (текущее состояние базы) описанным моделям

**Как тестировать - Stairway тест** (лесенка)
- простой, но эффективный метод проверить - полностью ли миграция откатила изменения. Можно добавить в проект 1 раз и забыть
- тест получает список всех миграций, и итерируетс по ним. Дял каждой он вызывает upgrade, downgrade и еще раз upgrade.

**Тестирование миграций, изменяющих данные**
- если в миграции не просто добавляется новый столбец или таблица, а каким-либо образом меняются существующие данные - то ошибка может иметь самые серьезные последствия
- обычно такие миграции кажутся чем-то простым, но по оптыту именно в них бывает очень много багов, которые очень сложно обнаружить
- дорогой в разработке, но очень надежный

**Как тестировать миграции, меняющие данные**
- применяются все миграции до тестируемой (не включительно)
- создается набор данных, который будет изменен тестируемой миграцией
- выполняется upgrade и проверяется, что все данные корреткно изменены
- выполняется downgrade до предыдущей миграции, проверяется, что все изменения были корректно отменены

**Тест на соответствие моделей миграцияи**
- позволяет проверить, не забыл ли разработчик меняя что-то в моделях или миграциях поменять во втором месте
- тест прогоняет все миграции, получает состояние базы данных с помощью MetaData reflection и ищет отличия (тем же механизмом, которым alembic генерирует миграции)

**Пара советов по тестированию**
- базу данных лучше создавать отдельную на каждый тест - это позволит запускать много тестов параллельно и все тесты будут максимально изолированы друг от друга
- лучше не создавать/удалять базу данных вручную - модуль sqlalchemy_utils справится с этим лучше
- пишите простые понятные фикстуры

#### **=== СОЕДИНЕНИЯ В PostgreSQL ===**

**Напишем простое приложение**

Приложение будет создавать соединение с Postgres, отправлять запросы и получать данные. Т.к. Python3 позволяет писать асинхронные приложения будем использовать **aiohttp** и **aiopg**. Для каждого соединения в Postgres создается отдельный процесс. Процесс польностью изолирован и в случае падения другие клиенты будут работать не заметив никаких проблем.

**Простое приложение: скелет**

In [None]:
#vibo: создаем init
async def init(app):
    #vibo: создаем соединение и приложение хапускается
    app['conn'] = ... # app['conn'] = ... и request.app['conn'] один и тот же объект
    yield

async def get_data(conn):
    ...

async def handle(request):
    #vibo: когда приходит запрос - вызываем метод get_data
    result = await get_data(request.app['conn']) # app['conn'] = ... и request.app['conn'] один и тот же объект
    #vibo: возвращаем ответ клиенту
    return aiohttp.web.json_response(result)

logging.basicConfig(level=logging.DEBUG)

#vibo: создаем объект Application
app = aiohttp.web.Application()
#vibo: добавляем ему route на корень
#vibo: если человек будет обращаться к этому корню - 
#vibo: будет вызываться функция handle
app.router.add_route('GET', '/', handle)
#vibo: добавляем приложению в cleaanup контекст функцию init
#vibo: это значит, что когда приложение будет запускаться оно выполнит все в init до yield,
#vibo: а когда приложение будет останавливаться оно выполнит оставшуюся часть
app.cleanup_ctx.append(init)
#vibo: после этого запускаем приложение на 8081-порту
aiohttp.web.run_app(app, port=8081)

**Простое приложение: реализация**

In [4]:
!pip install aiopg

Collecting aiopg
  Downloading aiopg-1.3.4-py3-none-any.whl (34 kB)
Collecting async-timeout<5.0,>=3.0
  Downloading async_timeout-4.0.2-py3-none-any.whl (5.8 kB)
Installing collected packages: async-timeout, aiopg
Successfully installed aiopg-1.3.4 async-timeout-4.0.2


In [25]:
!pip install aiohttp

Collecting aiohttp
  Downloading aiohttp-3.8.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m0m
Collecting multidict<7.0,>=4.5
  Downloading multidict-6.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (114 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m114.2/114.2 kB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting aiosignal>=1.1.2
  Downloading aiosignal-1.2.0-py3-none-any.whl (8.2 kB)
Collecting frozenlist>=1.1.1
  Downloading frozenlist-1.3.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (158 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m158.8/158.8 kB[0m [31m318.4 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting yarl<2.0,>=1.0
  Downloading yarl-1.8.1-cp39-cp39-manylin

In [16]:
#vibo: используем асинхронный контекстный менеджер

import aiopg
import psycopg2.extras

#vibo: подключаемся к Postgres
async def init(app):
    async with aiopg.connect(
        #vibo: предварительно надо создать бд
        'postgresql://postgres:hackme@localhost/mydb'
    ) as conn:
        #vibo: сохраняем соединение в объект приложения
        #vibo: запускается риложение, как-то обслуживаются клиенты
        app['conn'] = conn
        yield
        #vibo: выходим из yield
    #vibo: выходим из блока aiopg.connect, соединение разрывается

#vibo: реализация функции get_data
async def get_data(conn):
    #vibo: берем курсор соединения
    #vibo: курсор - это специальный объект, который в питоне предназначен для соединения и обработки данных
    #vibo: указываем aiopg, что берем не простой курсор, а реализующий dict
    async with conn.cursor(
        cursor_factory=psycopg2.extras.RealDictCursor
    ) as cur:
        #vibo: выполняем запрос range от 0 до 1000
        await cur.execute("SELECT generate_series(0, 1000) as id")
        return await cur.fetchall()

In [53]:
!python example1.py

DEBUG:asyncio:Using selector: EpollSelector
(Press CTRL+C to quit)
ERROR:aiohttp.server:Error handling request
Traceback (most recent call last):
  File "/home/vibo/vs_code/venv-vsc/lib/python3.9/site-packages/aiohttp/web_protocol.py", line 435, in _handle_request
    resp = await request_handler(request)
  File "/home/vibo/vs_code/venv-vsc/lib/python3.9/site-packages/aiohttp/web_app.py", line 504, in _handle
    resp = await handler(request)
  File "/home/vibo/vs_code/example1.py", line 24, in handle
    result = await get_data(request.app['conn'])
  File "/home/vibo/vs_code/example1.py", line 20, in get_data
    await cur.execute("SELECT generate_series(0, 1000) as id")
  File "/home/vibo/vs_code/venv-vsc/lib/python3.9/site-packages/aiopg/connection.py", line 416, in execute
    waiter = self._conn._create_waiter("cursor.execute")
  File "/home/vibo/vs_code/venv-vsc/lib/python3.9/site-packages/aiopg/connection.py", line 867, in _create_waiter
    raise RuntimeError(
RuntimeError: cur

Через Htop видим отдельный честный процесс Postgre...mybd 

**Проверим, что приложение работает**

In [None]:
import asyncio
import aiohttp

async def request(session):
    async with session.get('http://localhost:8081') as resp:
        return await resp.json()

async def main():
    async with aiohttp.ClientSession(raise_for_status=True) as session:
        return await request(session)

print(asyncio.run(main()))

In [46]:
!python example2.py

[{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}, {'id': 5}, {'id': 6}, {'id': 7}, {'id': 8}, {'id': 9}, {'id': 10}, {'id': 11}, {'id': 12}, {'id': 13}, {'id': 14}, {'id': 15}, {'id': 16}, {'id': 17}, {'id': 18}, {'id': 19}, {'id': 20}, {'id': 21}, {'id': 22}, {'id': 23}, {'id': 24}, {'id': 25}, {'id': 26}, {'id': 27}, {'id': 28}, {'id': 29}, {'id': 30}, {'id': 31}, {'id': 32}, {'id': 33}, {'id': 34}, {'id': 35}, {'id': 36}, {'id': 37}, {'id': 38}, {'id': 39}, {'id': 40}, {'id': 41}, {'id': 42}, {'id': 43}, {'id': 44}, {'id': 45}, {'id': 46}, {'id': 47}, {'id': 48}, {'id': 49}, {'id': 50}, {'id': 51}, {'id': 52}, {'id': 53}, {'id': 54}, {'id': 55}, {'id': 56}, {'id': 57}, {'id': 58}, {'id': 59}, {'id': 60}, {'id': 61}, {'id': 62}, {'id': 63}, {'id': 64}, {'id': 65}, {'id': 66}, {'id': 67}, {'id': 68}, {'id': 69}, {'id': 70}, {'id': 71}, {'id': 72}, {'id': 73}, {'id': 74}, {'id': 75}, {'id': 76}, {'id': 77}, {'id': 78}, {'id': 79}, {'id': 80}, {'id': 81}, {'id': 82}, {'id': 83}, {

**Сколько одновременных запросов выдержит приложение?** (ответ - одно, говорили выше, postgres не выполняет параллельные запросы в одном соединении, это защита драйвера). Если поробуем выполнить два одновременных запроса на первый запрос ответ - 200, на второй - 500 (ошибка) - called while another coroutine is already waiting for incoming data

In [None]:
import asyncio
import aiohttp

async def request(session):
    async with session.get('http://localhost:8081') as resp:
        return await resp.json()

async def main():
    async with aiohttp.ClientSession(raise_for_status=True) as session:
        coros = [request(session) for _ in range(2)]
        return await asyncio.gather(*coros)

print(await main())

RuntimeError: cursor.execute() called while another coroutine is already waiting for incoming data
INFO:aiohttp.access:127.0.0.1 [11/Aug/2022:17:48:19 +0000] "GET / HTTP/1.1" 500 244 "-" "Python/3.9 aiohttp/3.8.1"
INFO:aiohttp.access:::1 [11/Aug/2022:17:48:19 +0000] "GET / HTTP/1.1" 200 13064 "-" "Python/3.9 aiohttp/3.8.1"

**Подведем итоги:**
- это можно починить подключаясь к БД каждый запрос, или если отдельно брать блокировку на соединение в приложении
- пока соединение выполняет запрос, другой конкурентно выполнять нельзя (например позвать gather) 
- RPS(чтение) = Секунда/(время на 1 запрос); при 50ms/запрос - лучшим возможным результатом будет 20 RPS
- RPS на запись может быть ниже, в зависимости от требуемых блокировок
- вывод - приложению нужно больше соединений!

#### **RPS(чтение) = Секунда/(время на 1 запрос); при 50ms/запрос - лучшим возможным результатом будет 20 RPS**

In [47]:
RPS = 1 / (50 * 0.001) #vibo: мили сек
RPS

20.0

#### **=== ПУЛЫ СОЕДИНЕНИЙ ===**

**Соединения дешевле не закрывать**

**Много соединений в приложении. Проблемы?**
- открытвать соединения иногда дорого (терминирование SSL, создание нового процесса, и т.д.), но их не обязательно закрывать
- максимальное количество подключений к Postgres ограничивается директивой max_connections (по умолчанию 100), а также ограничено физическими ресурсами и др.
- на старте приложения скорее всего одного подключения не хватит, нужно при запуске создвать минимальное количество соединений
- пики обращений (необходимость иногда обрабатывать больше клиентов, чем есть соединений)

Приложение --> Пул или группа соединений (сообщение, сообщение) --> Postgres (процесс, процесс)
Приложение --> Пул или группа соединений (сообщение, сообщение) --> Postgres (процесс, процесс)

**Как устроен пул соединений**

In [55]:
#vibo: ЭТО ИГРУШЕЧНЫЙ ПУЛ, НЕ ДЛЯ PRODUCTION

import asyncio
import asyncpg

#vibo: класс
class Pool:
    #vibo: принимает в инит строчку для подключения бд
    #vibo: у него есть размер, т.е. количество соединений, которые нужно создать
    def __init__(self, dsn, size: int = 10):
        self.dsn = dsn
        self.size = size
        self.started = False
        #vibo: самое важно - асинхронная очередь
        self.conns = asyncio.Queue()
    
    #vibo: метод старт, вызываем и делаем сразу все соединения (10 штук)
    async def start(self):
        #vibo: создаем корутины в цикле
        coros = [asyncpg.connect(self.dsn) for _ in range(self.size)]
        #vibo: зовем им всем gather
        for conn in await asyncio.gather(*coros):
            #vibo: кладем все в очередь и говорим, что пул готов к работе
            await self.conns.put(conn)
        self.stareted = True
    
    #vibo: пробегаем по всем соединения, забираем информацию и закрываем
    async def close(self):
        while not self.conns.empty():
            conn = self.conns.get_nowait()
            await conn.close()

Как нам из него получить соединение, чтобы еще ни с кем за него не подраться...

**Метод acquire**

In [None]:
#vibo: это асинхронный контекстный менеджер, т.е. это такой декоратор, который позволяет эту функцию асинхронную сделать 
#vibo: не асинхронным генератором, а асинхронным контекстным менеджером
#vibo: когда будем в него входить - все выполнится до yield, когда выходить - все после yield 

import asyncio
import contextlib

class Pool:
    ...
    @contextlib.asynccontextmanager
    async def acquire(self) -> Connection:
        #vibo: если пул не был запущен
        if not self.started:
            #vibo: мы его запускаем
            await self.start()
        #vibo: получаем из очереди первое освободившееся соединение,
        #vibo: если соединения нет - зависаем, пока соединение не появится
        conn = await self.conns.get()
        try:
            yield conn
        finally:
            #vibo: после того, как операция завершена - возвращаем соединение, чтобы его могла вязть другая корутина
            #vibo: соответственно мы можем запускать много корутин, которые будут работать с меньшим количеством соединений,
            #vibo: за счет блокировок этой очереди
            await self.conns.put(conn)

**Как использовать наш игрушечный пул соединений**

In [None]:
import asyncio
import asyncpg
import contextlib

async def get_data(pool: Pool):
    async with pool.acquire() as conn:
        return await conn.fetch("SELECT generate_series(0, 1000) as id")

async def request(session):
    async with session.get('http://localhost:8081') as resp:
        return await resp.json()

async def main():
    #vibo: говорим, что мы хотим в пул только два соединения
    pool = Pool('postgresql://postgres:hackme@localhost', 2)
    #vibo: потом говорим, что хотим создать сразу пять корутин в нем для получения данных
    coros = [get_data(pool) for _ in range(5)]
    # {list: 5} [[<Record id=0>, ...], [...], [...], [...], [...]]
    #vibo: зовем gather и получаем все данные - это работает благодаря асинхронной очереди
    series = await asyncio.gather(*coros)
    await pool.close()

#asyncio.run(main())
await main()

**Пулы соединений: подводим итоги**
- экономят время на подключение к PostgreSQL (при запросе от клиента обработчик сразу плучает готовое к работе соединение из очереди)
- позволяют контролировать количество одновременных подключений, в лучшем случа использует минимальное необходимое количество соединений
- сглаживает резкие пики нагрузки
- RPS(чтение) = секунда / (время на 1 запрос * количество соединений) при 50 ms/запрос с 10 подклчениями лучшим возможным результатом будет 200 RPS

In [63]:
#vibo: раньше было 20 RPS на одно соединение, то на 10 будет 200 RPS

n = 10 #количество подключений
RPS = (1 / (50 * 0.001)) * n #vibo: мили сек
RPS

200.0

**Что происходит при наращивании количества инстансов приложения**

Становится все больше и больше соединений. Мы начинае видеть ошибки PostgreSQL. Пора увеличивать количество подключений (max_connections, по умолчанию 100).

**Что происходит  при наращивании количества подключений**
- из-за сетевых задержек и времени обработки результатов OLTP запросов (быстрые) приложениями соединения могут простаивать большую чатсь времени (для справкиесть еще OLAP - аналитические запросы, более тяжлые, более долго обрабатываются)
- всплески нагрузки требуют создавать пулы соединений с запасом, соединения простаивают
- масштабирование PostgreSQL (вертикальное или горизонтальное) с большим % простаивающих соединеий (и, соответсвенно, ресурсов) - дорого
- растут расходы на териминирование SSL у PostgreSQL и клиентов

**Очень большое количество соединений снижает производительность... уменьшаем количество соединений**

**Как лучше утилизировать подключения и сократить простаивающие**

#### **=== П У Л Е Р Ы ===**

**Прокси пулеры:**
- PgBouncer (самый популярный)
- Odyssey (более продвинутая верся PgBouncer)
- Pgpool II
- Crunchy-Proxy

**PgBouncer: режим работы**
- session pooling (режим сессии - это не сессии SQLAlchemy, а сессии с которыми работаем в PostgreSQL)
- transaction pooling (режим транзакции)
- statement pooling (режим стейтмент)

**Зачем нужны балансировщики** - если хотим обновить бд, в балансровщиках можно все сообщения поставить на паузу.

**PgBouncer и SSL** - мы поставили PgBouncer после PgBouncera, чтобы ты мог использовать пул пока ты используешь пул (с)

**PgBouncer: подведем итоги**
- позволяет сократить количество простаиващих подключений (и ресурсов) к PostgreSQL для мелких транзакций/запросов
- дает возможность перезагружать/обновлять PostgreSQL без отключения клиентов
- может потреблять только 1 ядро, плохо масштабируется (особенно с SSL), именно поэтому нам нужно много PgBouncer
- много уровневый каскад с SO_REUSEPORT справляется с SSL лучше, о дороже в поддержке и все рано есть проблемы

**Odyssey: пулер соединений от Яндекса**
- умеет утилизировать несколько CPU, что важно, когда начинается SSL (не нужно поддерживать каскад PgBouncer)
- позволяет указывать индивидуальный режим изоляции на базу и каждого отдельног пользователя (в PgBouncer жесткая конфигурация - режим работы баунсера один для всех пользователей и всех баз)
- умеет возвращать понятные ошибки
- умеет очень много всего, активно развиваетсч

#### **=== Р Е П Л И К А Ц И Я ===**

**Как масштабируются реляционные базы данных**
- вертикально (берем сервер помощнее)
- устанавливаем несколько серверов и реплицируем между ними данные

Например, два сервера Master и Replica, на Master идут запросы на запись, на Replica идут запросы на чтение

**Реплики бывают асинхронные и синхронные:**
- ***асинхронная репликация***: возможна небольшая задержка между подтвержением транзакции на ведущем сервере и появлением этих изменений на резервном; позволяет завершать транзакции быстрее, но в случае краха СУБД последние изменения могут быть потеряны; подходит дял OLAP, pg_dump и проч
- ***синхронная реаликация***: гарантирует, что подтвержденная сервером транзакция будет защищена, даже если сразу после этого произойдет крах мастера; для коротких транакций основной составляющей общего времени транзакции будет задержка синхронизации 


**Как выглядит обычный PostgreSQL кластер в Яндексе (Yandex Cloud)**
- мастер (DC1)
- синхронная реплика (DC2)
- асинхронная реплика (DC3)

**Можно ли автоматизировать переключение мастера и реплики**
- если какой-нибудь из хостов в кластере сломался (особенно масте) - сервис лежит, исправление руками занимает время
- когда кластеров больше 10.000, руками их не починишь, нужна автоматика
- если в случаее падения одного из хостов кластеров автоматически перестроится (кто-то станет новым мастером, а кто-то синхронной репликой), как приложению понять, кто реплика, а кто матсер?

**Как приложению отличить мастер и реплику**

In [64]:
import psycopg2

conn = psycopg2.connect(
    "postgresql://postgres:hackme@localhost"
)
with conn.cursor() as cur:
    cur.execute("SHOW transaction_read_only")
    print(cur.fetchone())

('off',)


Если не read_only ('off') значит мастер

**Что начет libpq и target_sessin)attrs**

In [65]:
import psycopg2

psycopg2.connect(
    'postgresql://postgres:hackme@localhost'
    "?target_session_attrs=read-write"
)

<connection object at 0x7f50ef2f6e00; dsn: 'user=postgres password=xxx host=localhost target_session_attrs=read-write', closed: 0>

**HASQL - запуск**

In [None]:
from halsql.psycopg3 import PoolManager

host = ",".join([
    "master-host:5432", "replica-host-1:5432", "replica-host-2:5432"
])
multihost_dsn - f"postgresql://user:password@{hosts}/dbname"

async def create_pool(dsn) -> PoolManager:
    pool = PoolManager(multihost_dsn)

    # Waiting for 1 master and 1 replica will be avilable
    await pool_manager.read(masters_count=1, replicas_count=1)
    return pool

**HASQL - получение соединения к мастеру**

In [None]:
async def do_writes():
    pool = await creat_pool(multihost_dsn)
    async with pool.acquire(read_only=False) as connection:
        ...

async def do_writes():
    pool = await creat_pool(multihost_dsn)
    async with pool.acquire_master() as connection:
        ...