Большие объемы данных часто являются возникновением проблем у специалистов по машинному обучению.

В машинном обучение специалисты часто сталкиваются с большими объемами данных из-за чего алгоритмы могут работать достаточно долго. Давайте посмотрим на примере алгоритма Random Forest, как применение параллельных вычислений ускоряет время выполнения алгоритма.


Для начала давайте разберемся, можем ли мы вообще применять параллельные вычисления в алгоритме Random Forest?

Напомню, что алгоритм случайного леса строит множество деревьев на подмножествах из исходных данных и усредняет их результаты. Почему бы не строить деревья параллельно?

In [2]:
import time 
import multiprocessing
import numpy as np 
from sklearn.ensemble import RandomForestClassifier 

#Generate a matrix 50000x1000 
size = 1000
num_cores = multiprocessing.cpu_count()
X = np.random.random((size, 1000)) 
y = np.random.randint(2, size=size) 

for n_jobs in range(1, num_cores+1): 
    start_time = time.time() 
    rfc = RandomForestClassifier(n_jobs=n_jobs)
    rfc.fit(X, y)
    end_time = time.time()

    print(f'n_jobs: {n_jobs} | time fitting: {end_time - start_time:.4f}') 

n_jobs: 1 | time fitting: 1.7408
n_jobs: 2 | time fitting: 0.8706
n_jobs: 3 | time fitting: 0.5813
n_jobs: 4 | time fitting: 0.4711
n_jobs: 5 | time fitting: 0.4176
n_jobs: 6 | time fitting: 0.3516
n_jobs: 7 | time fitting: 0.2986
n_jobs: 8 | time fitting: 0.2756
n_jobs: 9 | time fitting: 0.2752
n_jobs: 10 | time fitting: 0.2581
n_jobs: 11 | time fitting: 0.2451
n_jobs: 12 | time fitting: 0.2346
n_jobs: 13 | time fitting: 0.2252
n_jobs: 14 | time fitting: 0.2382
n_jobs: 15 | time fitting: 0.2255
n_jobs: 16 | time fitting: 0.2105


Объяснение, почему параллельные вычисления могут не работать на небольших размерах
Накладные расходы на параллелизм: Параллельные вычисления требуют дополнительных ресурсов для управления потоками или процессами. Если объем данных небольшой, затраты на запуск и координацию параллельных задач могут превышать выгоду от их параллельной обработки.

Сложность данных: Если массивы небольшие, время, необходимое для их обработки, может быть намного меньше времени, затрачиваемого на создание и управление потоками. В таких случаях использование единственного потока может оказаться более эффективным.

Наличие GIL (Global Interpreter Lock): В Python существует GIL, который ограничивает одновременное выполнение потоков. Хотя это не столь критично для вычислительно интенсивных задач, как в случае с NumPy (где операции выполняются в C), он может влиять на производительность в некоторых сценариях.

Эффективность кэширования: Параллельные вычисления могут нарушить кэширование данных, так как различные потоки могут обращаться к памяти. Для небольших массивов кэширование может быть более эффективным при использовании последовательного выполнения.

Таким образом, параллельные вычисления более эффективны при работе с большими объемами данных, где их преимущества могут быть более заметными.

Попробуем написать простую функцию которая бездействует одну секунду и выполним ее 50 раз.

In [3]:

from time import sleep
from typing import List
from joblib import Parallel, delayed


def parallel(n_jobs=-1):
    """Parallel computing"""
    result = Parallel(
        n_jobs=n_jobs, backend="multiprocessing", verbose=5 * n_jobs
    )(delayed(sleep)(0.2) for _ in range(50))
    return result


print(parallel(n_jobs=1))
print(parallel(n_jobs=2))

[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    3.3s


[None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]


[Parallel(n_jobs=2)]: Using backend MultiprocessingBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    0.6s
[Parallel(n_jobs=2)]: Done   9 tasks      | elapsed:    1.2s
[Parallel(n_jobs=2)]: Done  14 tasks      | elapsed:    1.6s
[Parallel(n_jobs=2)]: Done  21 tasks      | elapsed:    2.4s
[Parallel(n_jobs=2)]: Done  28 tasks      | elapsed:    3.0s
[Parallel(n_jobs=2)]: Done  37 tasks      | elapsed:    4.0s
[Parallel(n_jobs=2)]: Done  46 tasks      | elapsed:    4.8s


[None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]


[Parallel(n_jobs=2)]: Done  50 out of  50 | elapsed:    5.2s finished


Можно ли ускорить чтение данных?
Ваш коллега занимается обработкой естественного языка. Однако алгоритм в текущей реализации работает медленно. Вооружившись новыми знаниями о параллельных вычислениях, любезно согласились ему помочь.

Вот его текущий код:

In [5]:
import re
from string import punctuation

import pandas as pd
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize


def clear_data(source_path: str, target_path: str):
    """Baseline process df

    Parameters
    ----------
    source_path : str
        Path to load dataframe from

    target_path : str
        Path to save dataframe to
    """
    data = pd.read_parquet(source_path)
    data = data.copy().dropna().reset_index(drop=True)

    lemmatizer = WordNetLemmatizer()

    cleaned_text_list = []
    for text in data["text"]:
        text = str(text)
        text = re.sub(r"https?://[^,\s]+,?", "", text)
        text = re.sub(r"@[^,\s]+,?", "", text)

        stop_words = stopwords.words("english")
        transform_text = text.translate(str.maketrans("", "", punctuation))
        transform_text = re.sub(" +", " ", transform_text)

        text_tokens = word_tokenize(transform_text)

        lemma_text = [
            lemmatizer.lemmatize(word.lower()) for word in text_tokens
        ]
        cleaned_text = " ".join(
            [str(word) for word in lemma_text if word not in stop_words]
        )
        cleaned_text_list.append(cleaned_text)

    data["cleaned_text"] = cleaned_text_list
    data.to_parquet(target_path)


# Задача
Вам необходимо реализовать функцию clear_data, которая может параллельно обработать тексты в датасете.

In [None]:
import re
from string import punctuation

import pandas as pd
from joblib import delayed
from joblib import Parallel
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize


def clear_data(source_path: str, target_path: str, n_jobs: int):
    """Parallel process dataframe

    Parameters
    ----------
    source_path : str
        Path to load dataframe from

    target_path : str
        Path to save dataframe to

    n_jobs : int
        Count of job to process
    """
    data = pd.read_parquet(source_path)
    data = data.copy().dropna().reset_index(drop=True)

    lemmatizer = WordNetLemmatizer()
    cleaned_text_list = []
    
    def clean_text(text: str) -> str:
        """Функция для очистки текста."""
        text = str(text)
        text = re.sub(r"https?://[^,\s]+,?", "", text)
        text = re.sub(r"@[^,\s]+,?", "", text)

        stop_words = stopwords.words("english")
        transform_text = text.translate(str.maketrans("", "", punctuation))
        transform_text = re.sub(" +", " ", transform_text)

        text_tokens = word_tokenize(transform_text)

        lemma_text = [
            lemmatizer.lemmatize(word.lower()) for word in text_tokens
        ]
        cleaned_text = " ".join(
            [str(word) for word in lemma_text if word not in stop_words]
        )
        cleaned_text_list.append(cleaned_text)
        return cleaned_text

    result = Parallel(
        n_jobs=n_jobs, backend="multiprocessing", verbose=5 * n_jobs
    )(delayed(clean_text)(text) for text in data["text"])
    
    data["cleaned_text"] = result
    data.to_parquet(target_path)
    

In [6]:
import re
from string import punctuation

import pandas as pd
from joblib import delayed
from joblib import Parallel
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize


def clean_text(text: str) -> str:
    """Функция для очистки текста."""
    lemmatizer = WordNetLemmatizer()

    # cleaned_text_list = []
    text = str(text)
    text = re.sub(r"https?://[^,\s]+,?", "", text)
    text = re.sub(r"@[^,\s]+,?", "", text)

    stop_words = stopwords.words("english")
    transform_text = text.translate(str.maketrans("", "", punctuation))
    transform_text = re.sub(" +", " ", transform_text)

    text_tokens = word_tokenize(transform_text)

    lemma_text = [
        lemmatizer.lemmatize(word.lower()) for word in text_tokens
    ]
    cleaned_text = " ".join(
        [str(word) for word in lemma_text if word not in stop_words]
    )
    # cleaned_text_list.append(cleaned_text)
    return cleaned_text


def clear_data(source_path: str, target_path: str, n_jobs: int):
    """Parallel process dataframe

    Parameters
    ----------
    source_path : str
        Path to load dataframe from

    target_path : str
        Path to save dataframe to

    n_jobs : int
        Count of job to process
    """
    data = pd.read_parquet(source_path)
    data = data.copy().dropna().reset_index(drop=True)

    result = Parallel(
        n_jobs=n_jobs, backend="multiprocessing", verbose=5 * n_jobs
    )(delayed(clean_text)(text) for text in data["text"])

    data["cleaned_text"] = result
    data.to_parquet(target_path)
    



In [None]:
import re
from string import punctuation

import pandas as pd
from joblib import delayed
from joblib import Parallel
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize


def process_text(text: str):
    """Process text"""
    lemmatizer = WordNetLemmatizer()

    text = str(text)
    text = re.sub(r"https?://[^,\s]+,?", "", text)
    text = re.sub(r"@[^,\s]+,?", "", text)

    stop_words = stopwords.words("english")
    transform_text = text.translate(str.maketrans("", "", punctuation))
    transform_text = re.sub(" +", " ", transform_text)

    text_tokens = word_tokenize(transform_text)

    lemma_text = [lemmatizer.lemmatize(word.lower()) for word in text_tokens]
    cleaned_text = " ".join(
        [str(word) for word in lemma_text if word not in stop_words]
    )
    return cleaned_text


def clear_data(source_path: str, target_path: str, n_jobs: int):
    """Parallel process dataframe

    Parameters
    ----------
    source_path : str
        Path to load dataframe from

    target_path : str
        Path to save dataframe to

    n_jobs : int
        Count of job to process
    """
    data = pd.read_parquet(source_path)
    data = data.copy().dropna().reset_index(drop=True)

    count_stop_words = Parallel(n_jobs=n_jobs, backend="multiprocessing")(
        delayed(process_text)(str(text)) for text in data["text"]
    )

    data["cleaned_text"] = count_stop_words
    data.to_parquet(target_path)
