В формулировке заданий будет использоваться понятие **worker**. Это слово обозначает какую-то единицу параллельного выполнения, в случае питона это может быть **поток** или **процесс**, выбирайте то, что лучше будет подходить к конкретной задаче

В каждом задании нужно писать подробные аннотиции типов для:
1. Аргументов функций и классов
2. Возвращаемых значений
3. Классовых атрибутов (если такие есть)

В каждом задании нужно писать докстроки в определённом стиле (какой вам больше нравится) для всех функций, классов и методов

# Задание 1 (7 баллов)

В одном из заданий по ML от вас требовалось написать кастомную реализацию Random Forest. Её проблема состоит в том, что она работает медленно, так как использует всего один поток для работы. Добавление параллельного программирования в код позволит получить существенный прирост в скорости обучения и предсказаний.

В данном задании от вас требуется добавить возможность обучать случайный лес параллельно и использовать параллелизм для предсказаний. Для этого вам понадобится:
1. Добавить аргумент `n_jobs` в метод `fit`. `n_jobs` показывает количество worker'ов, используемых для распараллеливания
2. Добавить аргумент `n_jobs` в методы `predict` и `predict_proba`
3. Реализовать функционал по распараллеливанию в данных методах

В результате код `random_forest.fit(X, y, n_jobs=2)` и `random_forest.predict(X, y, n_jobs=2)` должен работать в ~1.5-2 раза быстрее, чем `random_forest.fit(X, y, n_jobs=1)` и `random_forest.predict(X, y, n_jobs=1)` соответственно

Если у вас по каким-то причинам нет кода случайного леса из ДЗ по ML, то вы можете написать его заново или попросить у однокурсника. *Детали* реализации ML части оцениваться не будут, НО, если вы поломаете логику работы алгоритма во время реализации параллелизма, то за это будут сниматься баллы

В задании можно использовать только модули из **стандартной библиотеки** питона, а также функции и классы из **sklearn** при помощи которых вы изначально писали лес

In [110]:
from sklearn.base import BaseEstimator
from sklearn.datasets import make_classification
import numpy as np #использовала в дз по мл, воть и оставила
from sklearn.tree import DecisionTreeClassifier
from concurrent.futures import ThreadPoolExecutor


class RandomForestClassifierCustom(BaseEstimator):
    
    def __init__(
        self, n_estimators=10, max_depth=None, max_features=None, random_state=100
    ):
        
    """
    Random Forest Classifier implementation.
    
    Parameters:
    -----------
    n_estimators : int, default=10
        The number of trees in the forest.
    max_depth : int or None, default=None
        The maximum depth of the tree. 
    max_features : int or None, default=None
        The number of features to consider when looking for the best split.
        None indicates that all features will be used.
    random_state : int, default=100
        Seed of the random number generator.
    """
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.max_features = max_features
        self.random_state = random_state

        self.trees = []
        self.feat_ids_by_tree = []

    def fit(self, X, y, n_jobs):
        
        """
        Build a forest of decision trees from the training set.

        Parameters:
        -----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            The training input samples.
        y : array-like of shape (n_samples,)
            The target values.
        n_jobs : int or None, default=None
            The number of threads to use for parallel processing.
            None indicates that all available CPUs will be used.
            
        Returns:
        --------
        self : The fitted estimator.
        """
            
        with ThreadPoolExecutor(n_jobs) as pool:
            
            self.classes_ = sorted(np.unique(y)) 
            works = []
            
            for i in range(self.n_estimators):

                np.random.seed(self.random_state + i)

                features_ids = np.random.choice(X.shape[1], self.max_features, replace=False)
                self.feat_ids_by_tree.append(features_ids)

                bootstrap_ids = np.random.choice(X.shape[0], X.shape[0], replace=True)
                X_bootstrap = X[bootstrap_ids, :][:, features_ids]
                y_bootstrap = y[bootstrap_ids]

                tree = DecisionTreeClassifier(
                    max_depth=self.max_depth,
                    max_features=self.max_features,
                    random_state=self.random_state,
                )
                works.append(pool.submit(tree.fit, X_bootstrap, y_bootstrap))
                self.trees.append(tree)

        return self  
    
    def predict_proba(self, X, n_jobs):
        
        """
        Predict class probabilities for X.

        Parameters:
        -----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            The training input samples.
        n_jobs : int or None, default=None
            The number of threads to use for parallel processing.
            None indicates that all available CPUs will be used.
            
        Returns:
        --------
        probas : array-like of shape (n_samples, n_classes)
            The class probabilities of the input samples.
        """
        
        probas = np.zeros((X.shape[0], len(self.classes_)))
        
        with ThreadPoolExecutor(n_jobs) as pool:
            
            works = [pool.submit(tree.predict_proba, X[:, self.feat_ids_by_tree[i]])
                        for i, tree in enumerate(self.trees)]
            for w in works:
                probas += w.result()
                 
        return probas / len(self.trees)
    
    def predict(self, X, n_jobs):
        
        """
        Predict class labels for samples in X.
        
        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            The input samples.
            
        n_jobs : int, default=None
            The number of jobs to run in parallel for predicting probabilities. 
            If None, then the number of jobs is set to the number of CPU cores.
        
        Returns
        -------
        y_pred : ndarray of shape (n_samples,)
            The predicted class labels for X.
        """
        
        probas = self.predict_proba(X, n_jobs)
        predictions = np.argmax(probas, axis=1)
        
        return predictions

X, y = make_classification(n_samples=100000)

In [111]:
random_forest = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [112]:
%%time

_ = random_forest.fit(X, y, n_jobs=1)

CPU times: user 5.33 s, sys: 52.1 ms, total: 5.39 s
Wall time: 5.28 s


In [113]:
%%time

preds_1 = random_forest.predict(X, n_jobs=1)

CPU times: user 232 ms, sys: 11.8 ms, total: 244 ms
Wall time: 217 ms


In [114]:
random_forest = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [115]:
%%time

_ = random_forest.fit(X, y, n_jobs=2)

CPU times: user 5.36 s, sys: 44.4 ms, total: 5.4 s
Wall time: 2.7 s


In [116]:
%%time

preds_2 = random_forest.predict(X, n_jobs=2)

CPU times: user 240 ms, sys: 13.1 ms, total: 253 ms
Wall time: 120 ms


In [117]:
(preds_1 == preds_2).all()   # Количество worker'ов не должно влиять на предсказания

True

#### Какие есть недостатки у вашей реализации параллельного Random Forest (если они есть)? Как это можно исправить? Опишите словами, можно без кода (+1 дополнительный балл)

Ответ пишите тут

# Задание 2 (9 баллов)

Напишите декоратор `memory_limit`, который позволит ограничивать использование памяти декорируемой функцией.

Декоратор должен принимать следующие аргументы:
1. `soft_limit` - "мягкий" лимит использования памяти. При превышении функцией этого лимита должен будет отображён **warning**
2. `hard_limit` - "жёсткий" лимит использования памяти. При превышении функцией этого лимита должно будет брошено исключение, а функция должна немедленно завершить свою работу
3. `poll_interval` - интервал времени (в секундах) между проверками использования памяти

Требования:
1. Потребление функцией памяти должно отслеживаться **во время выполнения функции**, а не после её завершения
2. **warning** при превышении `soft_limit` должен отображаться один раз, даже если функция переходила через этот лимит несколько раз
3. Если задать `soft_limit` или `hard_limit` как `None`, то соответствующий лимит должен быть отключён
4. Лимиты должны передаваться и отображаться в формате `<number>X`, где `X` - символ, обозначающий порядок единицы измерения памяти ("B", "K", "M", "G", "T", ...)
5. В тексте warning'ов и исключений должен быть указан текщий объём используемой памяти и величина превышенного лимита

В задании можно использовать только модули из **стандартной библиотеки** питона, можно писать вспомогательные функции и/или классы

В коде ниже для вас предопределены некоторые полезные функции, вы можете ими пользоваться, а можете не пользоваться

In [1]:
import multiprocessing
multiprocessing.set_start_method('fork')

In [8]:
import os
import psutil
import time
import warnings


def get_memory_usage(pid):    
    
    """
    Returns the current memory usage (in bytes) of a given process.

    Args:
    pid (int): The process ID for which to retrieve memory usage.

    Returns:
    int: The current memory usage (in bytes) of the process.
    """
    
    process = psutil.Process(pid)
    mem_info = process.memory_info()
    return mem_info.rss


def bytes_to_human_readable(n_bytes):
    
    """
    Converts a number of bytes into a human-readable format (e.g. 1.25GB).

    Args:
    n_bytes (int): The number of bytes to convert.

    Returns:
    str: The human-readable representation of the number of bytes.
    """
    
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for idx, s in enumerate(symbols):
        prefix[s] = 1 << (idx + 1) * 10
    for s in reversed(symbols):
        if n_bytes >= prefix[s]:
            value = float(n_bytes) / prefix[s]
            return f"{value:.2f}{s}"
    return f"{n_bytes}B"

def human_readable_to_bytes(size):
    
    """
    Converts a human-readable representation of a file size (e.g. '1.25GB') to its
    equivalent number of bytes.

    Args:
    size (str): The human-readable size string to convert.

    Returns:
    int: The number of bytes equivalent to the given size string.
    """

    num = float(size[:-1])
    factor = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4,
             'P': 1024**5, 'E': 1024**6, 'Z': 1024**7, 'Y': 1024**8}[size[-1]]
    
    return num * factor

def memory_limit(soft_limit=None, hard_limit=None, poll_interval=1):
    
    """
    A decorator that limits the memory usage of a decorated function.

    Args:
    soft_limit (str, optional): A string indicating the soft memory limit (e.g. '1.25GB').
        When the memory usage exceeds this limit, a warning will be raised.
    hard_limit (str, optional): A string indicating the hard memory limit (e.g. '2GB').
        When the memory usage exceeds this limit, a MemoryError will be raised.
    poll_interval (int, optional): The interval (in seconds) at which to poll the memory usage.

    Returns:
    function: A wrapper function that limits the memory usage of the decorated function.
    """
        
    def decorator(func):
        def wrapper(*args, **kwargs):
            
            if soft_limit is not None:
                soft_limit_bytes = human_readable_to_bytes(soft_limit)
            if hard_limit is not None:
                hard_limit_bytes = human_readable_to_bytes(hard_limit)
                
            memory_info_soft_limit = 0
            
            
            proc = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
            proc.start()
            pid = proc.pid
                
            while proc.is_alive():
                
                memory_usage = get_memory_usage(pid)
                
                if hard_limit is not None:
                    if memory_usage > hard_limit_bytes:
                        message = f"Memory usage exceeds hard limit of {hard_limit}. \
                        \n Current usage: {bytes_to_human_readable(memory_usage)}. \
                        \n Limit exceeded by {bytes_to_human_readable(memory_usage-hard_limit_bytes)}"
                        
                        raise MemoryError(message)
                    
                if soft_limit is not None:
                    if memory_usage > soft_limit_bytes:
                        if memory_usage > memory_info_soft_limit:
                            memory_info_soft_limit = memory_usage
                    
                time.sleep(poll_interval)
                
            proc.join()
            
            if soft_limit is not None:
                if memory_info_soft_limit != 0:
                    message = f"Memory usage exceeds soft limit of {soft_limit}. \
                            \n Peak usage: {bytes_to_human_readable(memory_info_soft_limit)}. \
                            \n Limit exceeded by {bytes_to_human_readable(memory_info_soft_limit-soft_limit_bytes)}"
                    warnings.warn(message, UserWarning)
            
            #return все не могу разобраться как сделать, чтобы возвращать с функции ее ретерн(( хелп 
            #я нашла, что можно с помощью queue, но тогда надо менять фукцию саму, ну ретерн, как я поняла
            #а вот без этого как
            
        return wrapper
    return decorator

In [9]:
@memory_limit(soft_limit="200M", hard_limit="1.5G", poll_interval=0.1)
def memory_increment():
    """
    Функция для тестирования
    
    В течение нескольких секунд достигает использования памяти 1.89G
    Потребление памяти и скорость накопления можно варьировать, изменяя код
    """
    lst = []
    for i in range(50000000):
        if i % 500000 == 0:
            time.sleep(0.1)
        lst.append(i)
    return lst

In [10]:
memory_increment()

MemoryError: Memory usage exceeds hard limit of 1.5G.                         
 Current usage: 1.51G.                         
 Limit exceeded by 12.46M

# Задание 3 (11 баллов)

Напишите функцию `parallel_map`. Это должна быть **универсальная** функция для распараллеливания, которая эффективно работает в любых условиях.

Функция должна принимать следующие аргументы:
1. `target_func` - целевая функция (обязательный аргумент)
2. `args_container` - контейнер с позиционными аргументами для `target_func` (по-умолчанию `None` - позиционные аргументы не передаются)
3. `kwargs_container` - контейнер с именованными аргументами для `target_func` (по-умолчанию `None` - именованные аргументы не передаются)
4. `n_jobs` - количество workers, которые будут использованы для выполнения (по-умолчанию `None` - количество логических ядер CPU в системе)

Функция должна работать аналогично `***PoolExecutor.map`, применяя функцию к переданному набору аргументов, но с некоторыми дополнениями и улучшениями
    
Поскольку мы пишем **универсальную** функцию, то нам нужно будет выполнить ряд требований, чтобы она могла логично и эффективно работать в большинстве ситуаций

1. `target_func` может принимать аргументы любого вида в любом количестве
2. Любые типы данных в `args_container`, кроме `tuple`, передаются в `target_func` как единственный позиционный аргумент. `tuple` распаковываются в несколько аргументов
3. Количество элементов в `args_container` должно совпадать с количеством элементов в `kwargs_container` и наоборот, также значение одного из них или обоих может быть равно `None`, в иных случаях должна кидаться ошибка (оба аргумента переданы, но размеры не совпадают)

4. Функция должна выполнять определённое количество параллельных вызовов `target_func`, это количество зависит от числа переданных аргументов и значения `n_jobs`. Сценарии могут быть следующие
    + `args_container=None`, `kwargs_container=None`, `n_jobs=None`. В таком случае функция `target_func` выполнится параллельно столько раз, сколько на вашем устройстве логических ядер CPU
    + `args_container=None`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **5** раз
    + `args_container=[1, 2, 3]`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **3** раза, несмотря на то, что `n_jobs=5` (так как есть всего 3 набора аргументов для которых нам нужно получить результат, а лишние worker'ы создавать не имеет смысла)
    + `args_container=None`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем именованные аргументы
    + `args_container=[1, 2, 3]`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем и позиционные, и именованные аргументы
    + `args_container=[1, 2, 3, 4]`, `kwargs_container=None`, `n_jobs=2`. В таком случае в каждый момент времени параллельно будет выполняться **не более 2** функций `target_func`, так как нам нужно выполнить её 4 раза, но у нас есть только 2 worker'а.
    + В подобных случаях (из примера выше) должно оптимизироваться время выполнения. Если эти 4 вызова выполняются за 5, 1, 2 и 1 секунды, то параллельное выполнение с `n_jobs=2` должно занять **5 секунд** (не 7 и тем более не 10)

5. `parallel_map` возвращает результаты выполнения `target_func` **в том же порядке**, в котором были переданы соответствующие аргументы
6. Работает с функциями, созданными внутри других функций

Для базового решения от вас не ожидается **сверххорошая** оптимизация по времени и памяти для всех возможных случаев. Однако за хорошо оптимизированную логику работы можно получить до **+3 дополнительных баллов**

Вы можете сделать класс вместо функции, если вам удобнее

В задании можно использовать только модули из **стандартной библиотеки** питона

Ниже приведены тестовые примеры по каждому из требований

In [164]:
from concurrent.futures import ThreadPoolExecutor

def parallel_map(target_func,
                 args_container=None,
                 kwargs_container=None,
                 n_jobs=None):
    
    """
    Apply a function to multiple arguments in parallel.

    Args:
        target_func: A function to apply to the arguments.
        args_container: A container of arguments for the function. Default is None.
        kwargs_container: A container of keyword arguments for the function. Default is None.
        n_jobs: The number of threads to use. If None, the number of threads is set to the number of CPUs
            on the machine. Default is None.

    Raises:
        ValueError: If `args_container` and `kwargs_container` have different lengths.

    Returns:
        A list of the results of applying the function to the arguments.
    """
    
    if args_container is None:
        args_len = 0
    else:
        args_len = len(args_container)
    
    if kwargs_container is None:
        kwargs_len = 0
    else:
        kwargs_len = len(kwargs_container)
        
    if args_len != 0 and kwargs_len != 0:
        
        if args_len != kwargs_len:
            raise ValueError("args_container and kwargs_container should have same lengths")    
        
    if n_jobs is None:
        n_jobs = multiprocessing.cpu_count()

    if n_jobs > max(args_len, kwargs_len) and max(args_len, kwargs_len) != 0:
        n_jobs = max(args_len, kwargs_len)

    with ThreadPoolExecutor(n_jobs) as pool:
        
        results = []
        
        for i in range(max(args_len, kwargs_len)):
            if args_len == 0:
                args = []
            else:
                args = args_container[i]
                
            if kwargs_len == 0:
                kwargs = {}
            else:
                kwargs = kwargs_container[i]
                
            if isinstance(args, tuple) or args == []:
                result = pool.submit(target_func, *args, **kwargs)
                results.append(result)
                
            else:
                result = pool.submit(target_func, args, **kwargs)
                results.append(result)
            
        if args_len == 0 and kwargs_len == 0:
            for i in range(n_jobs):
                result = pool.submit(target_func)
                results.append(result)

    results_sum = []
    for result in results:
        results_sum.append(result.result())

    return results_sum

In [120]:
import time


# Это только один пример тестовой функции, ваша parallel_map должна уметь эффективно работать с ЛЮБЫМИ функциями
# Поэтому обязательно протестируйте код на чём-нибудбь ещё
def test_func(x=1, s=2, a=1, b=1, c=1):
    time.sleep(s)
    return a*x**2 + b*x + c

In [130]:
%%time

# Пример 2.1
# Отдельные значения в args_container передаются в качестве позиционных аргументов
parallel_map(test_func, args_container=[1, 2.0, 3j-1, 4])   # Здесь происходят параллельные вызовы: test_func(1) test_func(2.0) test_func(3j-1) test_func(4)

CPU times: user 2.89 ms, sys: 2.33 ms, total: 5.22 ms
Wall time: 2.01 s


[3, 7.0, (-8-3j), 21]

In [131]:
%%time

# Пример 2.2
# Элементы типа tuple в args_container распаковываются в качестве позиционных аргументов
parallel_map(test_func, [(1, 1), (2.0, 2), (3j-1, 3), 4])    # Здесь происходят параллельные вызовы: test_func(1, 1) test_func(2.0, 2) test_func(3j-1, 3) test_func(4)

CPU times: user 2.79 ms, sys: 2.04 ms, total: 4.83 ms
Wall time: 3.01 s


[3, 7.0, (-8-3j), 21]

In [132]:
%%time

# Пример 3.1
# Возможна одновременная передача args_container и kwargs_container, но количества элементов в них должны быть равны
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

# Здесь происходят параллельные вызовы: test_func(1, s=3) test_func(2, s=3) test_func(3, s=3) test_func(4, s=3)

CPU times: user 3.04 ms, sys: 2.31 ms, total: 5.35 ms
Wall time: 3.01 s


[3, 7, 13, 21]

In [133]:
%%time

# Пример 3.2
# args_container может быть None, а kwargs_container задан явно
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

CPU times: user 2.93 ms, sys: 2.29 ms, total: 5.23 ms
Wall time: 3.01 s


[3, 3, 3, 3]

In [134]:
%%time

# Пример 3.3
# kwargs_container может быть None, а args_container задан явно
parallel_map(test_func,
             args_container=[1, 2, 3, 4])

CPU times: user 5.63 ms, sys: 4.52 ms, total: 10.1 ms
Wall time: 2 s


[3, 7, 13, 21]

In [148]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
parallel_map(test_func)

CPU times: user 3.68 ms, sys: 3.04 ms, total: 6.73 ms
Wall time: 2.01 s


[3, 3, 3, 3, 3, 3, 3, 3]

In [149]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
parallel_map(test_func)

CPU times: user 3.72 ms, sys: 3.23 ms, total: 6.96 ms
Wall time: 2.01 s


[3, 3, 3, 3, 3, 3, 3, 3]

In [150]:
%%time

# Пример 3.5
# При несовпадении количеств позиционных и именованных аргументов кидается ошибка
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}])

ValueError: args_container and kwargs_container should have same lengths

In [151]:
%%time

# Пример 4.1
# Если функция не имеет обязательных аргументов и аргумент n_jobs не был передан, то она выполняется параллельно столько раз, сколько ваш CPU имеет логических ядер
# В моём случае это 24, у вас может быть больше или меньше
parallel_map(test_func)

CPU times: user 3.87 ms, sys: 3.01 ms, total: 6.88 ms
Wall time: 2.01 s


[3, 3, 3, 3, 3, 3, 3, 3]

In [165]:
%%time

# Пример 4.2
# Если функция не имеет обязательных аргументов и передан только аргумент n_jobs, то она выполняется параллельно n_jobs раз
parallel_map(test_func, n_jobs=2)

CPU times: user 1.76 ms, sys: 1.85 ms, total: 3.61 ms
Wall time: 2.01 s


[3, 3]

In [166]:
%%time

# Пример 4.3
# Если аргументов для target_func указано МЕНЬШЕ, чем n_jobs, то используется такое же количество worker'ов, сколько было передано аргументов
parallel_map(test_func,
             args_container=[1, 2, 3],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 3.47 ms, sys: 2.89 ms, total: 6.36 ms
Wall time: 2.01 s


[3, 7, 13]

In [167]:
%%time

# Пример 4.4
# Аналогичный предыдущему случай, но с именованными аргументами
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 2.35 ms, sys: 1.78 ms, total: 4.13 ms
Wall time: 3 s


[3, 3, 3]

In [168]:
%%time

# Пример 4.5
# Комбинация примеров 4.3 и 4.4 (переданы и позиционные и именованные аргументы)
parallel_map(test_func,
             args_container=[1, 2, 3],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 3.04 ms, sys: 2.56 ms, total: 5.6 ms
Wall time: 3.01 s


[3, 7, 13]

In [169]:
%%time

# Пример 4.6
# Если аргументов для target_func указано БОЛЬШЕ, чем n_jobs, то используется n_jobs worker'ов
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=None,
             n_jobs=2)   # Здесь используется 2 worker'a

CPU times: user 2.65 ms, sys: 2.06 ms, total: 4.71 ms
Wall time: 4.01 s


[3, 7, 13, 21]

In [170]:
%%time

# Пример 4.7
# Время выполнения оптимизируется, данный код должен отрабатывать за 5 секунд
parallel_map(test_func,
             kwargs_container=[{"s": 5}, {"s": 1}, {"s": 2}, {"s": 1}],
             n_jobs=2)

CPU times: user 2.29 ms, sys: 1.57 ms, total: 3.86 ms
Wall time: 5 s


[3, 3, 3, 3]

In [171]:
def test_func2(string, sleep_time=1):
    time.sleep(sleep_time)
    return string

# Пример 5
# Результаты возвращаются в том же порядке, в котором были переданы соответствующие аргументы вне зависимости от того, когда завершился worker
arguments = ["first", "second", "third", "fourth", "fifth"]
parallel_map(test_func2,
             args_container=arguments,
             kwargs_container=[{"sleep_time": 5}, {"sleep_time": 4}, {"sleep_time": 3}, {"sleep_time": 2}, {"sleep_time": 1}])

['first', 'second', 'third', 'fourth', 'fifth']

In [172]:
%%time


def test_func3():
    def inner_test_func(sleep_time):
        time.sleep(sleep_time)
    return parallel_map(inner_test_func, args_container=[1, 2, 3])

# Пример 6
# Работает с функциями, созданными внутри других функций
test_func3()

CPU times: user 3.08 ms, sys: 2.56 ms, total: 5.65 ms
Wall time: 3 s


[None, None, None]