# Начало

Моя предметная область - гео-данные. Подготовим датасет с каггла

In [1]:
import kagglehub
import psycopg2
import pandas as pd

import csv
from io import StringIO

from time import sleep, time
from threading import Thread

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents/data
PATH = kagglehub.dataset_download('sobhanmoosavi/us-accidents')

In [3]:
data = pd.read_csv(f'{PATH}/US_Accidents_March23.csv')

print(f'Columns of the dataset: {data.columns}')
data = data[['Start_Lat', 'Start_Lng', 'State', 'Description', 'Severity']].rename(columns={
    'Start_Lat': 'lat',
    'Start_Lng': 'lng',
    'State': 'state',
    'Description': 'description',
    'Severity': 'sev',
})
print(f'New columns of the dataset: {data.columns}')
# for brin index
data.sort_values('lat', inplace=True, ascending=True)
data.head()

Columns of the dataset: Index(['ID', 'Source', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat',
       'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi)', 'Description',
       'Street', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
       'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
       'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
       'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
       'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
       'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
       'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
       'Astronomical_Twilight'],
      dtype='object')
New columns of the dataset: Index(['lat', 'lng', 'state', 'description', 'sev'], dtype='object')


Unnamed: 0,lat,lng,state,description,sev
540549,24.5548,-81.780472,FL,Main roadway closed due to crash on Flagler Av...,2
2107213,24.555269,-81.803993,FL,Lane blocked due to accident on Whitehead St a...,2
2002499,24.5574,-81.805573,FL,Lane blocked due to accident on Whitehead St a...,2
774094,24.559731,-81.783669,FL,Accident on US-1 Roosevelt Blvd at FL-C5A 1st St.,2
2126069,24.55987,-81.793983,FL,Lane blocked due to accident on Southard St at...,2


In [4]:
data.describe()

Unnamed: 0,lat,lng,sev
count,7728394.0,7728394.0,7728394.0
mean,36.20119,-94.70255,2.212384
std,5.076079,17.39176,0.4875313
min,24.5548,-124.6238,1.0
25%,33.39963,-117.2194,2.0
50%,35.82397,-87.76662,2.0
75%,40.08496,-80.35368,2.0
max,49.0022,-67.11317,4.0


In [5]:
def get_db_connection():
    return psycopg2.connect(
        dbname='db',
        user='username',
        password='password',
        host='localhost'
    )


conn = get_db_connection()
cursor = conn.cursor()

In [6]:
# drop everything
cursor.execute('''
    DROP SCHEMA public CASCADE;
    CREATE SCHEMA public;
    GRANT ALL ON SCHEMA public TO username;
    GRANT ALL ON SCHEMA public TO public;
''')
conn.commit()

cursor.execute('''
    CREATE TABLE data (
        id SERIAL PRIMARY KEY,
        lat FLOAT,
        lng FLOAT,
        state TEXT,
        description TEXT,
        sev INTEGER
    )
''')
conn.commit()

output = StringIO()
data.to_csv(
    output, sep='\t', header=False, index=False, quoting=csv.QUOTE_NONE, escapechar='\\'
)
output.seek(0)

cursor.copy_expert(
    'COPY data (lat, lng, state, description, sev) FROM STDIN', output
)
conn.commit()

# 1.1: Типы индексов и их использование на практике

In [7]:
def time_filter_sev(sev: int) -> tuple[float, str]:
    s = time()

    cursor.execute('''
        EXPLAIN ANALYZE
        SELECT * FROM data WHERE sev > %s
    ''', (sev,))
    plan = '\t\t' + '\n\t\t'.join(row[0] for row in cursor.fetchall())

    e = time()

    return (e - s, plan)


def time_filter_lat(lat_range: tuple[float, float]) -> tuple[float, str]:
    s = time()

    cursor.execute('''
        EXPLAIN ANALYZE
        SELECT * FROM data WHERE lat BETWEEN %s AND %s
    ''', lat_range)
    plan = '\t\t' + '\n\t\t'.join(row[0] for row in cursor.fetchall())

    e = time()

    return (e - s, plan)


def time_filter_description(word: str) -> tuple[float, str]:
    s = time()

    cursor.execute(f'''
        EXPLAIN ANALYZE
        SELECT * FROM data WHERE description LIKE %s
    ''', (f'*{word}*',))
    plan = '\t\t' + '\n\t\t'.join(row[0] for row in cursor.fetchall())

    e = time()

    return (e - s, plan)

## Запросы без индексов

In [8]:
(t_sev_no_index, p_sev_no_index) = time_filter_sev(2)
(t_lat_no_index, p_lat_no_index) = time_filter_lat((28.5, 29.3))
(t_description_no_index, p_description_no_index) = time_filter_description('crash')

print('Время запросов и план запроса без индексов:')
print(f'\tБез BTREE: {t_sev_no_index} сек, план:\n{p_sev_no_index}')
print(f'\tБез BRIN: {t_lat_no_index} сек, план:\n{p_lat_no_index}')
print(
    f'\tБез GIN: {t_description_no_index} сек, план:\n{p_description_no_index}')

Время запросов и план запроса без индексов:
	Без BTREE: 0.808307409286499 сек, план:
		Seq Scan on data  (cost=0.00..236450.62 rows=2942497 width=88) (actual time=2.339..763.952 rows=1504047 loops=1)
		  Filter: (sev > 2)
		  Rows Removed by Filter: 6224347
		Planning Time: 0.094 ms
		JIT:
		  Functions: 2
		  Options: Inlining false, Optimization false, Expressions true, Deforming true
		  Timing: Generation 0.167 ms, Inlining 0.000 ms, Optimization 0.178 ms, Emission 2.051 ms, Total 2.395 ms
		Execution Time: 806.931 ms
	Без BRIN: 0.17760944366455078 сек, план:
		Gather  (cost=1000.00..186692.51 rows=44137 width=88) (actual time=27.106..174.450 rows=113840 loops=1)
		  Workers Planned: 2
		  Workers Launched: 2
		  ->  Parallel Seq Scan on data  (cost=0.00..181278.81 rows=18390 width=88) (actual time=12.249..152.507 rows=37947 loops=3)
		        Filter: ((lat >= '28.5'::double precision) AND (lat <= '29.3'::double precision))
		        Rows Removed by Filter: 2538185
		Planning Time:

Добавим индексы

In [9]:
cursor.execute('''
    CREATE INDEX idx_btree_sev ON data USING BTREE (sev)
''')
conn.commit()

cursor.execute('''
    CREATE INDEX idx_brin_lat ON data USING BRIN (lat)
''')
conn.commit()

cursor.execute('''
    CREATE EXTENSION pg_trgm;
    ALTER EXTENSION pg_trgm SET SCHEMA public;
    CREATE INDEX idx_gin_description ON data USING GIN (description gin_trgm_ops);
''')
conn.commit()

## Запросы с индексами

In [10]:
(t_sev_index, p_sev_index) = time_filter_sev(2)
(t_lat_index, p_lat_index) = time_filter_lat((28.8, 29.3))
(t_description_index, p_description_index) = time_filter_description('crash')

print('Время запросов и план запроса c индексами:')
print(f'\tBTREE: {t_sev_index} сек, план:\n{p_sev_index}')
print(f'\tBRIN: {t_lat_index} сек, план:\n{p_lat_index}')
print(
    f'\tGIN: {t_description_index} сек, план:\n{p_description_index}')

Время запросов и план запроса c индексами:
	BTREE: 0.5499238967895508 сек, план:
		Seq Scan on data  (cost=0.00..222711.92 rows=2576131 width=88) (actual time=1.573..523.913 rows=1504047 loops=1)
		  Filter: (sev > 2)
		  Rows Removed by Filter: 6224347
		Planning Time: 0.166 ms
		JIT:
		  Functions: 2
		  Options: Inlining false, Optimization false, Expressions true, Deforming true
		  Timing: Generation 0.215 ms, Inlining 0.000 ms, Optimization 0.143 ms, Emission 1.328 ms, Total 1.685 ms
		Execution Time: 548.802 ms
	BRIN: 0.17094087600708008 сек, план:
		Gather  (cost=1000.00..179273.66 rows=38642 width=88) (actual time=36.039..169.489 rows=28284 loops=1)
		  Workers Planned: 2
		  Workers Launched: 2
		  ->  Parallel Seq Scan on data  (cost=0.00..174409.46 rows=16101 width=88) (actual time=18.155..148.281 rows=9428 loops=3)
		        Filter: ((lat >= '28.8'::double precision) AND (lat <= '29.3'::double precision))
		        Rows Removed by Filter: 2566703
		Planning Time: 0.086 ms


In [11]:
print('Разница между запросами с индексами и без:')
print(f'\tBTREE: {t_sev_no_index - t_sev_index:.02} сек')
print(f'\tBRIN: {t_lat_no_index - t_lat_index:.02} сек')
print(f'\tGIN: {t_description_no_index - t_description_index:.02} сек')

Разница между запросами с индексами и без:
	BTREE: 0.26 сек
	BRIN: 0.0067 сек
	GIN: -0.24 сек


## Вывод
Даже на таком маленьком датасете, есть прирост к скорости выполнения запросов, с его увеличением ускорение будет тоже расти

Еще интересное наблюдение, если брать слишком большой рендж для колонки, индексированной с помощью BRIN, то БД не будет использовать индекс, так как быстрее будет искать как обычно

Чтобы этого избежать, можно настроить размер страницы индекса (сделать ее больше)

# 1.2: Транзакции в PostgreSQL: виды и использование на практике

_Для того, чтобы увидеть, аномалии, замените уровни изоляции на более слабые. Вы увидете в stdout `EPIC FAIL`_

## READ COMMITTED

In [12]:
def update_severity_read_committed(id: int, new_sev: int):
    try:
        cursor.execute('SET TRANSACTION ISOLATION LEVEL READ COMMITTED')
        cursor.execute('BEGIN')

        cursor.execute('SELECT sev FROM data WHERE id = %s FOR UPDATE', (id,))
        old_sev = cursor.fetchone()

        cursor.execute('UPDATE data SET sev = %s WHERE id = %s', (new_sev, id))

        cursor.execute('SELECT sev FROM data WHERE id = %s', (id,))
        updated_sev = cursor.fetchone()

        if updated_sev != old_sev:
            print('READ COMMITTED работает!')
        else:
            print('EPIC FAIL')

        cursor.execute('COMMIT')
    except Exception as e:
        conn.rollback()
        print('Ошибка:', e)


update_severity_read_committed(1, 10)

READ COMMITTED работает!


## REPEATABLE READ

In [13]:
def update_severity_repeatable_read(sev: int):
    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        cursor.execute('SET TRANSACTION ISOLATION LEVEL REPEATABLE READ')
        cursor.execute('BEGIN')

        cursor.execute('INSERT INTO data (sev) VALUES (%s)', (sev,))
        print(f't1: inserted sev = {sev}')
        cursor.execute('COMMIT')
        print(f't1: commit')

        sleep(2)

        # cleanup
        cursor.execute('BEGIN')
        cursor.execute('DELETE FROM data WHERE sev = %s', (sev,))
        print(f't1: deleted sev = {sev}')
        cursor.execute('COMMIT')
        print(f't1: commit')
    except Exception as e:
        conn.rollback()
        print('Ошибка:', e)


def check_repeatable_read(sev: int):
    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        cursor.execute('SET TRANSACTION ISOLATION LEVEL REPEATABLE READ')
        cursor.execute('BEGIN')
        sleep(1)

        cursor.execute('SELECT sev FROM data WHERE sev = %s', (sev,))
        res1 = cursor.fetchone()
        print(f't2: res1 = {res1}; sev = {sev}')

        sleep(2)

        cursor.execute('SELECT sev FROM data WHERE sev = %s', (sev,))
        res2 = cursor.fetchone()
        print(f't2: res2 = {res2}; sev = {sev}')
        if res2 == res1:
            print('REPEATABLE READ работает!')
        else:
            print('EPIC FAIL')

        cursor.execute('COMMIT')
    except Exception as e:
        conn.rollback()
        print('Ошибка:', e)


id = 1
sev = 500

t1 = Thread(target=lambda: update_severity_repeatable_read(sev))
t2 = Thread(target=lambda: check_repeatable_read(sev))

t1.start()
t2.start()

t1.join()
t2.join()

t1: inserted sev = 500
t1: commit
t2: res1 = (500,); sev = 500
t1: deleted sev = 500
t1: commit
t2: res2 = (500,); sev = 500
REPEATABLE READ работает!


## SERIALIZABLE

In [14]:
SEV_SUM = 17098187


def update_severity_serializable_1(id: int):
    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        cursor.execute('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE')
        cursor.execute('BEGIN')

        sleep(1)

        cursor.execute('SELECT sev FROM data WHERE id = %s', (id,))
        old_sev = cursor.fetchone()[0]
        cursor.execute('SELECT SUM(sev) FROM data')
        ssev = cursor.fetchone()[0]
        print(f't1: ssev = {ssev}')

        if ssev == SEV_SUM:
            cursor.execute(
                'UPDATE data SET sev = sev + 1 WHERE id = %s', (id,))
            print(f't1: updated at id = {id}')
        sleep(2)
        cursor.execute('COMMIT')
        print(f't1: commit')

        sleep(3)

        # cleanup
        cursor.execute('BEGIN')
        cursor.execute('UPDATE data SET sev = %s WHERE id = %s', (old_sev, id))
        cursor.execute('COMMIT')
    except Exception as e:
        conn.rollback()
        print('Ошибка:', e)


def update_severity_serializable_2(id: int):
    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        cursor.execute('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE')
        cursor.execute('BEGIN')

        sleep(2)

        cursor.execute('SELECT sev FROM data WHERE id = %s', (id,))
        old_sev = cursor.fetchone()[0]
        cursor.execute('SELECT SUM(sev) FROM data')
        ssev = cursor.fetchone()[0]
        print(f't2: ssev = {ssev}')

        if ssev == SEV_SUM:
            cursor.execute(
                'UPDATE data SET sev = sev + 1 WHERE id = %s', (id,))
            print(f't2: updated at id = {id}')
        sleep(2)
        cursor.execute('COMMIT')
        print(f't2: commit')

    except Exception as e:
        conn.rollback()
        print('Ошибка:', e)


id = 2
sev = 15

t1 = Thread(target=lambda: update_severity_serializable_1(id))
t2 = Thread(target=lambda: update_severity_serializable_2(id + 1))

t1.start()
t2.start()

t1.join()
t2.join()

cursor.execute('SELECT SUM(sev) FROM data')
conn.commit()

if cursor.fetchone()[0] != SEV_SUM:
    print('EPIC FAIL')
else:
    print('SERIALIZABLE работает!')

t1: ssev = 17098187
t1: updated at id = 2
t2: ssev = 17098187
t2: updated at id = 3
t1: commit
Ошибка: could not serialize access due to read/write dependencies among transactions
DETAIL:  Reason code: Canceled on identification as a pivot, during commit attempt.
HINT:  The transaction might succeed if retried.

SERIALIZABLE работает!


# 1.3: Использование расширений PostgreSQL для полнотекстового поиска и криптографических операций

## `pg_trgm`

`pg_trgm` -- расширение postgresql, позволяющее искать в тексте эффективно, строя три-граммы

n-граммы -- последовательности из n символов *(3 буквоцифры в случае `pg_trgm`)* в определенном порядке

Это расширение привносит в БД большой функционал работы с текстом, в том числе индексирование текста при помощь `GIN`

## `pg_bigm`

`pg_bigm` -- брат `pg_trgm`, которые считает би-граммы, суть та же, но немного отличается моментами. Например в запросах с ключевыми словами длиной 1-2 символа он быстрее, однако `pg_trgm` может искать с опечатками, а `pg_bigm` только по точным частичным совпадениям

In [15]:
(t1, p) = time_filter_description('cras')
print(f'Результаты pg_trgm:\n\t{t1} сек, план:\n{p}')

cursor.execute('''
    DROP INDEX IF EXISTS idx_gin_description
''')
cursor.execute('CREATE EXTENSION IF NOT EXISTS pg_bigm')
cursor.execute('''
    CREATE INDEX IF NOT EXISTS idx_bigm_description 
    ON data USING GIN (description gin_bigm_ops)
''')
conn.commit()

(t2, p) = time_filter_description('cras')
print(f'Результаты pg_bigm:\n\t{t2} сек, план:\n{p}')

print(f'\nРазница -- {abs(t2 - t1):.03} сек')

Результаты pg_trgm:
	0.037453413009643555 сек, план:
		Bitmap Heap Scan on data  (cost=443.48..83142.92 rows=38642 width=88) (actual time=36.817..36.818 rows=0 loops=1)
		  Recheck Cond: (description ~~ '*cras*'::text)
		  Rows Removed by Index Recheck: 26843
		  Heap Blocks: exact=7515
		  ->  Bitmap Index Scan on idx_gin_description  (cost=0.00..433.81 rows=38642 width=0) (actual time=25.896..25.896 rows=26843 loops=1)
		        Index Cond: (description ~~ '*cras*'::text)
		Planning Time: 0.104 ms
		Execution Time: 36.842 ms
Результаты pg_bigm:
	0.0007367134094238281 сек, план:
		Bitmap Heap Scan on data  (cost=751.48..83450.92 rows=38642 width=88) (actual time=0.138..0.139 rows=0 loops=1)
		  Recheck Cond: (description ~~ '*cras*'::text)
		  ->  Bitmap Index Scan on idx_bigm_description  (cost=0.00..741.81 rows=38642 width=0) (actual time=0.137..0.137 rows=0 loops=1)
		        Index Cond: (description ~~ '*cras*'::text)
		Planning Time: 0.165 ms
		Execution Time: 0.160 ms

Разница -

## `pgcrypto`

`pgcrypto` -- расширение, вносящее в БД криптографические средства

В моем курсовом проекте можно использовать это расширение для хеширования паролей аккаунтов

In [16]:
cursor.execute('CREATE EXTENSION IF NOT EXISTS pgcrypto')
cursor.execute('''
    CREATE TABLE IF NOT EXISTS users (
        id SERIAL PRIMARY KEY,
        username VARCHAR(50) UNIQUE NOT NULL,
        password_hash TEXT NOT NULL
    )
''')

cursor.execute('''
    INSERT INTO users (username, password_hash) 
    VALUES (%s, crypt(%s, gen_salt('bf')))
''', ('admin', 'qwerty123'))
conn.commit()