# `Практикум по программированию на языке Python`
<br>

## `Занятие 15: Методы повышения эффективности кода`
<br><br>

### Ноутбук подготовлен на основе материалов
### `Мурата Апишева (mel-lain@yandex.ru)`
**Источник:** https://github.com/MelLain/mipt-python
[Запись 2022](https://www.youtube.com/watch?v=IzejCnDXrxg&t=2334s&ab_channel=%D0%9A%D0%B0%D1%84%D0%B5%D0%B4%D1%80%D0%B0%D0%9C%D0%9C%D0%9F)

#### `Москва, 2025`

In [1]:
import warnings
warnings.filterwarnings('ignore')

### `Производительность Python`

- Быстродействие кода на Python существенно уступает коду, написанному на компилируемых языках<br><br>
- Но Python очень удобен и популярен, поэтому сделано много попыток его ускорения<br><br>
- Популярен подход с написанием основного кода на C/C++/CUDA и предоставлением пользователю удобного интерфейса к нему в Python (numpy, pycuda, библиотеки для глубинного обучения)<br><br>
- Однако часто хочется получать эффективный код прямо в Python

### `Возможные методы повышения производительности`

- Многопоточность и многопроцессорность
- Альтернативные реализации Python: Pypy, Jython, Iron Python
- Библиотека Numba
- Расширение Cython
- Фреймворки Hadoop/Spark

### `Процесс и поток`

- __Процесс__ - это единица приложения, работающая в операционной системе, имеет собственную изолированную память
- __Поток__ - вычислительная единица внутри процесса, работает в памяти процесса, имеет собственный код для выполнения<br><br>
- В процессе может быть >= 1 потока
- Одно ядро процессора в моменте времени может выполнять код одного потока
- Если потоков много, а ядро одно, то оно будет переключаться между ними, создавая иллюзию параллельности для пользователя
- Если и потоков, и ядер много, то потоки могут действительно выполняться параллельно
- Аналогичным образом работает параллелизм между процессами

### `Мотивация` 

Рассмотрим простенький пример с обкачкой статей Хабра, однако не будем это делать в лоб

In [2]:
import requests
import time 

from bs4 import BeautifulSoup # удобная библиотека для парсинга html
from multiprocessing.dummy import Pool as ThreadPool

In [3]:
base_url = "https://habr.com/ru/articles/"

def get_page(url, n_attempts=5, t_sleep=1, headers={}, **kwargs):
    for i in range(n_attempts):
        page = requests.get(url, headers=headers)
        if page.status_code == 200:
            return page
        time.sleep(t_sleep)
    print("DEBUG: url doesn't load", url)
    return None

def get_habr_artical(artical_number):
    
    page = get_page(f"{base_url}{artical_number}")
    if page:
        return page.content
    return None

In [4]:
%%time

page = get_page(f"{base_url}{1}")

CPU times: user 18.4 ms, sys: 14.8 ms, total: 33.2 ms
Wall time: 440 ms


In [5]:
%%time
N = 20

for i in range(1, N):
    get_habr_artical(i)

DEBUG: url doesn't load https://habr.com/ru/articles/3
DEBUG: url doesn't load https://habr.com/ru/articles/5
DEBUG: url doesn't load https://habr.com/ru/articles/15
DEBUG: url doesn't load https://habr.com/ru/articles/16
CPU times: user 883 ms, sys: 25.8 ms, total: 909 ms
Wall time: 38.9 s


In [8]:
%%time
N = 20

with ThreadPool(10) as pool:
    pages = pool.map(get_habr_artical, range(1, N))

DEBUG: url doesn't load https://habr.com/ru/articles/15
DEBUG: url doesn't load https://habr.com/ru/articles/16
DEBUG: url doesn't load https://habr.com/ru/articles/3
DEBUG: url doesn't load https://habr.com/ru/articles/5
CPU times: user 682 ms, sys: 83.2 ms, total: 765 ms
Wall time: 8.02 s


In [9]:
soups = [BeautifulSoup(page, 'lxml') for page in pages if page]
titles = [soup.title.text for soup in soups]

len(titles), titles

(15,
 ['Wiki-FAQ для Хабрахабра / Хабр',
  'Мы знаем много недоделок на сайте… но! / Хабр',
  'Маслов, Сокур и партнеры пиарят Google в России / Хабр',
  'Подкасты на Хабрахабре / Хабр',
  'Самопроизвольное разлогинивание / Хабр',
  'Неожиданная популярность ресурсов среди региональных аудиторий / Хабр',
  'А нужно ли удаление? / Хабр',
  'Пометки у новых комментариев / Хабр',
  'Stand-along cообщества против сообществ в рамках социальных сетей / Хабр',
  'Поиск по сайту / Хабр',
  'Рекомендации в профиле – что это такое? / Хабр',
  'Миграция чистых социальных сетей в направлении stand-along сообществ / Хабр',
  'Все домены Гугла / Хабр',
  'Теги — тэги — таги / Хабр',
  'RSS комментариев / Хабр'])

### `Создание потока в Python`

- Для работы с потоками в Python есть стандартный модуль threading
- Основным классом является `Thread` (поток)

In [1]:
import threading

Функциональное создание потока:

In [2]:
def thread_function(x):
    print(x ** 3)

thread = threading.Thread(target=thread_function, args=(10, ))
thread.start()
thread.join()

1000


- Потоку передается функция, которую он должен выполнить, и аргументы
- После запуска `start` он будет выполняться до выхода из функции или ошибки
- Запустить его второй раз не получится (RuntimeError)

### `Создание потока в Python`

- Поток также можно описать в виде класс-наследника `Thread`
- Класс должен определять метод `run`

In [4]:
class MyThread(threading.Thread):
    def __init__(self, x, power):
        threading.Thread.__init__(self)
        self.x = x
        self.power = power

    def thread_function(self):
        print(self.x ** self.power)

    def run(self):
        self.thread_function()

print_thread = MyThread(x=10, power=3)
print_thread.start()

1000


### `Потоки-демоны`

- Потоки могут выполняться в обычном и фоновом (daemon) режимах
- Потоки-демоны используются для выполнения фоновых задач (отправка ping-ов, очистка мусора), они имеют смысл только в процессе существования основного потока
- При своем завершении процесс будет дожидаться окончания работы обычных потоков
- Потоки-демоны будут автоматически уничтожены при завершении процесса
- Такие потоки можно создавать и дальше за ними не следить, обычные же потоки нужно явно завершать<br><br>

Создание потока в режиме демона:

In [5]:
thread = threading.Thread(target=thread_function, args=(10, ), daemon=True)

### `Основные методы и атрибуты класса Thread`

Методы:

- `start` - запуск потока
- `run` - выполняет код потока
- `join` - блокирует вызывающий поток до завершения потока, у которого вызван `join`
- `is_alive` - возвращает флаг незавершенности потока

Атрибуты:
- `name` - имя потока
- `daemon` - флаг потоком фоновый или обычный
- `ident` - идентификатор потока, пока не вызван `start` он `None`.

### `Пример запуска потока`

In [3]:
import time

def wait(s):
    print('wait started')
    time.sleep(s)
    print('wait finished')

thread = threading.Thread(target=wait, args=(10, ), name='Wait Thread')
thread.start()

print('Is alive: ', thread.is_alive())
print('Is daemon: ', thread.daemon)
print('Thread name: ', thread.name)
print('Thread identificator: ', thread.ident)
thread.join()
print('Is alive: ', thread.is_alive())

wait startedIs alive:  True
Is daemon:  False
Thread name:  Wait Thread
Thread identificator:  6273904640

wait finished
Is alive:  False


### `Доступ к разделяемым ресурсам`

- Потоки не имеют своей памяти и все работают с одной памятью родительского процесса
- Порядок выполнения потоков недетерминирован, в многопоточной среде несколько потоков могут одновременно получить доступ к одному ресурсу
- Без надлежащего контроля это может привести к ошибкам и падению процесса
- Потоки нельзя просто так останавливать извне, поэтому ограничения нужно наложить заранее, до запуска<br><br>

- Для контроля за доступом к ресурсам есть несколько инструментов:
    - Мьютекс (Mutex, Lock)
    - Рекурсивный мьютекс (RLock)
    - Семафор (Semaphore)
    - Событие (Event)
    - Условная переменная (Condition variable)
    - Барьер (Barrier)

### `Простая блокировка Lock`

- Mutex - это объект, который может в каждый момент времени быть занят не более чем одним потоком
- Все потоки проверяют его на занятость и, если он занят, ожидают освобождения

In [7]:
mutex = threading.Lock()

def thread_safe_function():
    mutex.acquire()
    # Critical section
    mutex.release()

Для избежания взаимных блокировок (deadlocks) освобождение mutex нужно делать всегда, в т.ч. при ошибках и исключениях

### `Пример использования Mutex`

Сравним потокобезопасный и непотокобезопасный коды в задаче инкремента разделяемого ресурса ([источник](https://geekbrains.ru/posts/python_multithreading_pt2))

In [6]:
protected_resource, unprotected_resource = 0, 0
mutex = threading.Lock()
NUM = 100000

In [7]:
def safe_plus():
    global protected_resource
    for i in range(NUM):
        with mutex:
            protected_resource += 1

def risky_plus():
    global unprotected_resource
    for i in range(NUM):
        unprotected_resource += 1

def safe_minus():
    global protected_resource
    for i in range(NUM):
        with mutex:
            protected_resource -= 1

def risky_minus():
    global unprotected_resource
    for i in range(NUM):
        unprotected_resource -= 1

### `Пример использования Mutex`

In [10]:
thread1 = threading.Thread(target=safe_plus)
thread2 = threading.Thread(target=safe_minus)
thread3 = threading.Thread(target=risky_plus)
thread4 = threading.Thread(target=risky_minus)

thread1.start(), thread2.start(), thread3.start(), thread4.start()
thread1.join(), thread2.join(), thread3.join(), thread4.join()

print (f'Threadsafe result: {protected_resource}')
print (f'Non-threadsafe result: {unprotected_resource}')

Threadsafe result: 0
Non-threadsafe result: 0


### `Прочие инструменты контроля доступа`

- Рекурсивный мьютекс - такой же, как и обычный, но один поток может многократно входить в критическую секцию<br><br>
- Семафор - дает доступ к ресурсу заданному числу потоков<br><br>
- Событие - блокирует ресурс до выполнения некоторого условия, о котором уведовляет один из потоков<br><br>
- Условная переменная - обертка типа Event вокруг других блокировок для блокирования по определенным событиям<br><br>
- Барьер - создает точку синхронизации для заданного числа потоков, пока они все не дойдут до него, никто дальше не выполняется

### `Проблемы с многопоточностью в Python`

- Существует несколько реализаций Python, наиболее популярная (стандартная) - CPython
- Исторически CPython был реализован с глобальной блокировкой потоков Global Interpreter Lock (GIL)

Что такое GIL:
- GIL - это блокировка, которая не позволяет в каждый момент времени выполняться не более чем одному потоку
- Для этого через равные промежутки времени (100 тиков) интерпретатор усыпляет работающий поток и дает возможность поработать один промежуток какому-то иному потоку
- Остановка происходит даже в том случае, если поток всего один

Зачем нужен GIL:
- Реализация менеджмента памяти и сборки мусора в CPython не является потокобезопасной
- Без него C-расширения языка, которые активно используются, не смогут нормально работать
- За счет отсутствия необходимости учета разных ссылок на объекты из разных потоков однопоточные приложения выполняются с GIL быстрее

Где еще есть GIL:
- В Jython и IronPython GIL нет
- В PyPy есть
- В Cython есть, но может выключаться на секцию кода

### `К чему приводит наличие GIL`

- Многопоточные приложения, ориентированные на использование вычислений на CPU (не I/O операций) не могут работать параллельно<br><br>
- Вычисления не только не становятся быстрее, но и сильно замедляются при увеличении числа потоков, поскольку каждое переключение требует большого числа системных вызовов<br><br>
- Замедление на многоядерном процессоре будет еще больше, чем на одноядерном, поскольку интерпретатор не синхронизирован с ОС (он хочет разместить все на одном ядре, а ОС пытается разложить потоки на разные ядра)<br><br>
- В интерпретаторе нет своего расписания очереди потоков, используется расписание ОС, это приводит к проблемам при обработке сигналов

### `Реакция на сигналы ОС`

- В Python сигналы могут обрабатываться только в главном потоке<br><br>
- При этом интерпретатор не может сам потребовать, чтобы на следующем промежутке включился главный поток<br><br>
- Он пытается переключать потоки с GIL не раз в 100 тиков, а каждый тик<br><br>
- В результате выполнение кода может замедлиться, и до перехода в главный поток Python не реагирует на сигналы<br><br>
- Помимо этого, внутри одного тика интерпретатор невозможно прервать<br><br>
- Тик длится не фиксированное время, а некоторый набор инструкций (по-умолчанию - 100 байткодов), в которые интерпретатор транслировал исходный код<br><br>

### `Пример проблем с GIL`

Однопоточный код:

In [11]:
def func(i):
    while i < 5e+7: i += 1

ts = time.time()
func(0)
print(f'Elapsed time: {round(time.time() - ts, 2)}')

Elapsed time: 2.62


Многопоточный код:

In [12]:
ts = time.time()
threads = []
for _ in range(2):
    thread = threading.Thread(target=func, args=(0, ))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

print(f'Elapsed time: {round(time.time() - ts, 2)}')

Elapsed time: 5.27


### `Когда многопоточный код полезен`

- Независимые и не интенсивные I/O-операции с файлами, запросами или БД, логгирование<br><br>
    - время ожидание завершения работы системных вызовов, связанных с I/O-операциями относительно велико
    - поток, ожидающий этого, не меняет ничего в состоянии интерпретатора до выполнения следующего фрагмента кода
    - значит на это время он может освободить GIL и дать другому потоку продолжить выполнение<br><br>

- Отрисовка GUI

Пример с логгированием:

In [13]:
flag = True
def print_message():
    while flag:
        time.sleep(1)
        print('I print this while func works')
    
log_thread = threading.Thread(target=print_message)
log_thread.start()
func(-5e+7)
print('Finished')
flag = False

I print this while func works
I print this while func works
I print this while func works
Finished


### `Будущее GIL`

- Проблема с GIL действительно есть, однако её важность для большинства пользователей преувеличена
- Попытки избавиться от GIL предпринимались многократно, но часто проваливались из-за требования неухудшения производительности для однопоточных приложений
- Но есть и успешные перспективные примеры (https://github.com/colesbury/nogil)
- Одна из возможных альтернатив - механизм подинтерпретаторов (subinterpreters, версия Python 3.9+)
- Пробуются разные идеи, основанные на изменении методов подсчёта ссылок, аллокации памяти и принципов работы виртуальной машины

### `Многопроцессорный параллелизм`

- Процессы работают в собственных адресных пространствах и имеют каждый свои данные, в т.ч. и GIL в случае Python
- Взаимодействие между ними можно организовать также, как и для любых других процессов: через файлы (в т.ч. вирутуальные) или обмен сообщениями (multiprocessing, MPI)<br><br>

- У каждого процесса есть `pid`, который можно получить вызовом `os.getpid()`
- Воспользуемся для примера модулем multiprocessing, все похоже на потоки:

In [1]:
def func(x):
    pass

In [3]:
import os
from multiprocessing import Process

# def func(number):
    # pass
    # result = number ** 2
    # print(f'func was called by process id {os.getpid()} with number {number}')

procs, numbers = [], [5, 10, 15, 20, 25]
    
for index, number in enumerate(numbers):
    proc = Process(target=func, args=(number,))
    procs.append(proc)
    proc.start()
    
for proc in procs:
    proc.join()

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/vladimirbogachev/.pyenv/versions/3.12.11/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vladimirbogachev/.pyenv/versions/3.12.11/lib/python3.12/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'func' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/vladimirbogachev/.pyenv/versions/3.12.11/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vladimirbogachev/.pyenv/versions/3.12.11/lib/python3.12/multiprocessing/spawn.py", line 132, in _main
  

### `Блокировки для процессов`

- `multiprocessing.Lock` реализован как объект семафора в ОС вне памяти процесса, и каждый процесс может получить к нему доступ и изменять его

In [15]:
from multiprocessing import Process, Lock

def func(lock, num):
    with lock:
        print(num)

lock = Lock()
for num in range(10):
    Process(target=func, args=(lock, num)).start()

0

12
3
4
5
6
7
8
9


### `Совместный доступ к объектам`

- Передавать параметры по ссылке бессмысленно - они копируются и сериализуются для передачи в подпроцесс
- Тем не менее, можно пользоваться блоками разделяемой памяти в ОЗУ - `multiprocessing.shared_memory`
- Это относительно низкоуровневый подход, на его основе реализованы более удобные `Value` и `Array`:

In [16]:
from multiprocessing import Value, Array

def func(number, array):
    number.value = 10
    for i in range(len(array)):
        array[i] = -array[i]

number = Value('d', 0.0)
array = Array('i', range(10))

p = Process(target=func, args=(number, array))
p.start()
p.join()

print(number.value)
print(array[:])

10.0
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
I print this while func works


### `Pypy Python`

- Реализация интерпретатора Python, написанная на фреймворке RPython (синтаксис аналогичен Python)<br><br>
- Использует just-in-time (JIT) компиляцию:
    - при запуске код анализируется, наиболее часто используемые фрагменты (как циклы) переводятся в машинный код
    - в таком виде они оптимизируются
    - при выполнении исходные фрагменты заменяются на оптимизированные<br><br>
- В ряде случаев JIT позволяет в разы ускорить выполнение кода на Python, как правило, код должен выполняться длительное время<br><br>
- Цена использования JIT - более высокое потребление памяти в сложных и долго работающих процессах<br><br>
- PyPy поддерживает все конструкции соответствующей версии языка Python, но работает только с частью библиотек и C-расширений<br><br>
- JIT - не компиляция, т.е. получить на выходе исполняемый бинарный файл не получится

### `Библиотека Numba`

- Инструмент для компиляции части кода в LLVM, переводит часть кода на Python и NumPy в машинный код<br><br>
- Далее эти фрагменты кода выполняются на CPU (или GPU) минуя интерпретатор<br><br>
- Охватывает не весь язык, но может помочь в каких-то высоконагруженных секциях кода, скорость получается сопоставимой с реализацией на C/C++<br><br>
- Numba использует JIT для ускорения отдельных функций в коде<br><br>
- Библитека представлена набором декораторов и позволяет
    - оставаться в рамках синтаксиса Python
    - работать с его стандартным интерпретатором CPython<br><br>
- Все, что нужно - поставить библиотеку и правильно расставить декораторы

### `Поддерживаемые возможности Python и NumPy`

- Ссылки на актуальные версии для [Python](https://numba.pydata.org/numba-doc/dev/reference/pysupported.html) и [NumPy](https://numba.pydata.org/numba-doc/dev/reference/numpysupported.html)

Основные поддерживаемые или частично поддерживаемые Numba возможности Python:

- условные операторы
- операторы циклов
- генераторы
- исключения
- менеджеры контекстов
- функции (\*\*kwargs не поддерживается)
- передача функций в качестве аргументов

Большинство стандартных типов поддерживается, строки тоже, хотя часть их методов реализуется не очень эффективно

### `Использование Numba`

__Декораторы функций для их ускорения__:
- `@jit` - Numba ускоряет все, что поддерживается до первой неподдерживаемой операции, после весь код будет выполняться интерпретатором (даже поддерживаемый)
- `@njit`(или `@jit(nopython=True)`) - Numba будет сообщать об ошибке при работе с неподдерживаемым кодом (по возможности его стоит переписать)<br><br>
- Из ускоренных функций допускается вызов только ускоренных функций
- Ускоренные функции можно вызывать из любых функций

In [4]:
def timed(method):
    import time
    def __timed(*args, **kw):
        time_start = time.time()
        result = method(*args, **kw)
        time_end = time.time()
        
        print('{}  {:.3f} ms'.format(method.__name__, (time_end - time_start) * 1000))
        return result

    return __timed

### `Пример использования Numba`

In [5]:
import numba
import numpy as np
from numba import jit

@timed
def dot_product(x, y):
    np.dot(x, y)

@timed
@jit(cache=True, nopython=True)
def dot_product_numba(x, y):
    np.dot(x, y)

Запуск до и после кэширования кода:

In [6]:
N = 1000
x, y = np.random.random((N, N)), np.random.random((N, N))

dot_product(x, y)
dot_product_numba(x, y)
dot_product_numba(x, y)

dot_product  15.575 ms
dot_product_numba  602.335 ms
dot_product_numba  5.010 ms


Оптимизация не всегда оказывается действенной, в каждом случае нужно проверять

### `Параллелельные вычисления в Numba`

- Параметр `parallel=True` декоратора приводит поиску в коде функции участки кода, которые можно выполнить параллельно, и запускает их на нескольких ядрах ([ссылка на документацию](https://numba.pydata.org/numba-doc/dev/user/parallel.html))
- Проще всего рассматривать на примере цикла

Обычный код с циклом:

In [7]:
@timed
def two_d_array_reduction_prod(n):
    result = 2 * np.ones((13, 17), np.int_)
    tmp = 2 * np.ones_like(result)

    for i in range(n):
        result *= tmp

two_d_array_reduction_prod(10000000)
two_d_array_reduction_prod(10000000)

two_d_array_reduction_prod  2251.127 ms
two_d_array_reduction_prod  2241.606 ms


### `Параллелельные вычисления в Numba`

Обычный код с numba и код с numba и параллелизацией цикла (`prange`):

In [8]:
@timed
@jit(cache=True, nopython=True)
def two_d_array_reduction_prod_numba(n):
    result = 2 * np.ones((13, 17), np.int_)
    tmp = 2 * np.ones_like(result)

    for i in range(n):
        result *= tmp

two_d_array_reduction_prod_numba(10000000)
two_d_array_reduction_prod_numba(10000000)

two_d_array_reduction_prod_numba  588.462 ms
two_d_array_reduction_prod_numba  383.593 ms


In [None]:
@timed
@jit(cache=True, nopython=True, parallel=True)
def two_d_array_reduction_prod_numba_parallel(n):
    result = 2 * np.ones((13, 17), np.int_)
    tmp = 2 * np.ones_like(result)

    for i in numba.prange(n):
        result *= tmp

two_d_array_reduction_prod_numba_parallel(10000000)
two_d_array_reduction_prod_numba_parallel(10000000)

two_d_array_reduction_prod_numba_parallel  790.460 ms
two_d_array_reduction_prod_numba_parallel  170.034 ms


## `Спасибо за внимание!`