## Цель
- В этой записной книжке я оптимизировал некоторые методы предварительной обработки данных, которые вы можете найти здесь, мы увидим, как выполнять предварительную обработку с использованием ускорителя GPU. В частности, мы сравним GPU P100 и GPU T4 x2.

- СОВЕТ: никогда не выполняйте всю предварительную обработку на GPU (на Kaggle это ограничено), но также удобно использовать одновременно и CPU.

- Выполненные расчеты  выполнил некоторые стандартные расчеты для инженерии признаков: подсчет, последнее значение, количество уникальных значений

- Этот ноутбук представляет собой практическое руководство по оптимизации обработки данных, демонстрируя преимущества использования GPU. Особое внимание уделено сравнению производительности различных моделей GPU, что полезно для выбора подходящего оборудования для конкретных задач обработки данных.


In [10]:
!pip install pandas==1.3.3  # Пример установки совместимой версии Pandas
!pip install numpy==1.22.0  # Пример установки совместимой версии NumPy
# !pip install scipy --upgrade  # Обновление SciPy до последней версии



In [11]:
import gc  # Модуль для управления сборкой мусора в Python
import operator as op  # Модуль для работы с встроенными операторами Python
import numpy as np  # Библиотека для научных вычислений
import cupy as cp  # Библиотека для массивов на GPU, аналогичная NumPy
import pandas as pd  # Библиотека для анализа и обработки данных
from tqdm.auto import tqdm  # Инструмент для отображения прогресс-баров
import cudf  # Библиотека для работы с DataFrame на GPU
import time  # Модуль для работы со временем
import os  # Модуль для работы с операционной системой

import warnings 
warnings.filterwarnings('ignore')  # Отключение предупреждений для чистоты вывода



In [12]:
/
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

/kaggle/input/talkingdata-adtracking-fraud-detection/sample_submission.csv
/kaggle/input/talkingdata-adtracking-fraud-detection/train_sample.csv
/kaggle/input/talkingdata-adtracking-fraud-detection/test_supplement.csv
/kaggle/input/talkingdata-adtracking-fraud-detection/train.csv
/kaggle/input/talkingdata-adtracking-fraud-detection/test.csv


### GPU P100

In [13]:
import cudf

# Разделение нашего набора данных на N частей
num_parts = 4

def read_preprocess_divide(num_parts):
    """
    Чтение и предварительная обработка данных, а также их разделение на части.

    Параметры:
    num_parts (int): Количество частей, на которые необходимо разделить данные.

    Возвращает:
    DataFrame, общее количество строк, размер одной части.
    """
    # Желаемые столбцы
    columns = ['ip', 'channel', 'click_time']
    dtypes = {
        'ip': 'int32',
        'channel': 'int16',
        'click_time': 'datetime64[us]',
    }
    # Чтение данных с использованием cudf
    df = cudf.read_csv('../input/talkingdata-adtracking-fraud-detection/train.csv', usecols=columns, dtype=dtypes)
    all_rows = len(df)
    chunk = all_rows // num_parts
    # Сортировка набора данных по IP и сброс индекса
    df = df.sort_values(by=['ip', 'click_time']).reset_index(drop=True)
    return df, all_rows, chunk 

def window1(df):
    """
    Вычисление наиболее часто встречающегося значения (мода) и размера окна.

    Параметры:
    df (DataFrame): Набор данных для анализа.

    Возвращает:
    Размер окна.
    """
    # Вычисление моды и размера окна
    most_common = df['ip'].mode().values.tolist()[0]
    window = len(df[df['ip'] == most_common]) + 1
    return window

def feature_engineering(df, start, new_end):
    """
    Генерация признаков для заданного сегмента данных.

    Параметры:
    df (DataFrame): Исходный набор данных.
    start (int): Начальный индекс сегмента.
    new_end (int): Конечный индекс сегмента.

    Возвращает:
    DataFrame с новыми признаками.
    """
    if new_end is not None:
        end = new_end + 1
    else:
        end = None
    features = [c for c in list(df.columns) if c not in ['ip', 'click_time']]
    cat_function = ['count', 'last', 'nunique']    
    new_chunk = df[start:end].groupby('ip')[features].agg(cat_function)
    
#     # Проверка, является ли new_chunk.columns мультииндексом
#     if isinstance(new_chunk.columns, pd.MultiIndex):
#         # Обработка мультииндекса
#         new_chunk.columns = ['_'.join(col).strip() for col in new_chunk.columns.values]
#     else:
#         # Обычное присвоение имен столбцов
#         new_chunk.columns = ['_'.join(x) for x in new_chunk.columns]

#     new_chunk.reset_index(inplace=True)
    
    new_chunk.columns = ['_'.join(x) for x in new_chunk.columns]
    new_chunk.reset_index(inplace=True)
    diff_num_features = [f'diff_{col}' for col in features]
    df = df.to_pandas()
    ips = df[start:end]['ip'].values
    new_chunk_diff = df[start:end].groupby('ip')[features].diff().add_prefix('diff_')
    new_chunk_diff.insert(0, 'ip', ips)
    new_chunk_diff = cudf.DataFrame(new_chunk_diff)
    new_chunk_diff = new_chunk_diff.groupby('ip')[diff_num_features].agg(cat_function)
    new_chunk_diff.columns = ['_'.join(x) for x in new_chunk_diff.columns]
    new_chunk_diff.reset_index(inplace=True)
     # Объединение и сортировка результата
    new_chunk = new_chunk.merge(new_chunk_diff, how='inner', on='ip')
    new_chunk = new_chunk.sort_values(by=['ip']).reset_index(drop=True)
    return new_chunk

In [14]:
%%time

df, all_rows, chunk = read_preprocess_divide(num_parts)

CPU times: user 7.3 s, sys: 1.2 s, total: 8.5 s
Wall time: 8.44 s


Этот код особенно полезен при работе с большими наборами данных, когда требуется разделить данные на управляемые части и применить к каждой части сложную обработку или инженерию признаков

In [15]:
df.info()

<class 'cudf.core.dataframe.DataFrame'>
RangeIndex: 184903890 entries, 0 to 184903889
Data columns (total 3 columns):
 #   Column      Dtype
---  ------      -----
 0   ip          int32
 1   channel     int16
 2   click_time  datetime64[us]
dtypes: datetime64[us](1), int16(1), int32(1)
memory usage: 2.4 GB


In [16]:
%%time
# Функция для выбора безопасного окна строк
window = window1(df)

# Новый DataFrame для добавления результатов цикла for
new_df = cudf.DataFrame()

# Установка начального значения start в 0
start = 0
for p in range(0, num_parts):
    # Определение конечного индекса текущей части
    end = p * chunk + chunk

    if end < all_rows:
        # Выбор окна последних строк текущей части
        chunk_window = df[start:end].tail(window)

        # Нахождение второго с конца уникального значения IP
        second_last_unique = chunk_window['ip'].unique().values.tolist()[-2]

        # Нахождение индекса последней строки с этим IP
        new_end = chunk_window[chunk_window['ip'] == second_last_unique].tail(1).index[0]

        print(f"Обработка {(new_end + 1) - start} строк части N° {p + 1}")
        new_chunk = feature_engineering(df, start, new_end)
    else:
        # Обработка оставшихся строк, если достигнут конец DataFrame
        print(f"Обработка {all_rows - (new_end + 1)} строк части N° {p + 1}")
        new_chunk = feature_engineering(df, start, None)

    # Обновление начального индекса для следующей части
    start = new_end + 1

    # Добавление результатов в новый DataFrame
    new_df = new_df.append(new_chunk, ignore_index=True)


Обработка 46220468 строк части N° 1
Обработка 46231465 строк части N° 2
Обработка 46223993 строк части N° 3
Обработка 46227948 строк части N° 4
CPU times: user 2min 5s, sys: 15.3 s, total: 2min 20s
Wall time: 2min 19s


In [17]:
new_df

Unnamed: 0,ip,channel_count,channel_last,channel_nunique,diff_channel_count,diff_channel_last,diff_channel_nunique
0,1,47,113,15,46,0.0,28
1,5,24,205,13,23,104.0,20
2,6,1454,127,88,1453,0.0,467
3,9,4029,21,106,4028,-114.0,667
4,10,1180,466,97,1179,221.0,422
...,...,...,...,...,...,...,...
277390,364773,15,113,9,14,0.0,11
277391,364774,3,213,1,2,0.0,1
277392,364775,24,330,15,23,223.0,16
277393,364776,309,280,75,308,179.0,169


### GPU T4 x2


In [18]:
import cudf

def read_preprocess_divide(num_parts):
    """
    Читает и предварительно обрабатывает данные, разделяя их на заданное количество частей.

    Параметры:
    num_parts (int): Количество частей, на которые нужно разделить данные.

    Возвращает:
    Tuple[cudf.DataFrame, int, int]: DataFrame, общее количество строк и размер части.
    """
    # Определение столбцов, которые необходимо загрузить, и их типов данных
    columns = ['ip', 'channel', 'click_time']
    dtypes = {
        'ip': 'int32',
        'channel': 'int16',
        'click_time': 'datetime64[us]',
    }

    # Чтение данных с использованием cudf
    df = cudf.read_csv('../input/talkingdata-adtracking-fraud-detection/train.csv', usecols=columns, dtype=dtypes)
    all_rows = len(df)
    chunk = all_rows // num_parts  # Размер каждой части данных

    # Сортировка данных по IP и времени клика, сброс индекса для упорядочивания
    df = df.sort_values(by=['ip', 'click_time']).reset_index(drop=True)
    return df, all_rows, chunk 

def window(df):
    """
    Вычисляет размер окна на основе самого частого значения IP.

    Параметры:
    df (cudf.DataFrame): DataFrame, для которого необходимо вычислить размер окна.

    Возвращает:
    int: Размер окна.
    """
    # Нахождение самого частого значения IP
    most_common = df['ip'].mode().values.tolist()[0]
    # Вычисление размера окна как количество строк с самым частым IP плюс один
    window = len(df[df['ip'] == most_common]) + 1
    return window

def feature_engineering(df, start, new_end):
    """
    Выполняет инженерию признаков на части данных.

    Параметры:
    df (cudf.DataFrame): DataFrame для инженерии признаков.
    start (int): Начальный индекс части данных.
    new_end (int): Конечный индекс части данных.

    Возвращает:
    cudf.DataFrame: DataFrame с новыми признаками.
    """
    # Определение конечного индекса для обработки
    end = new_end + 1 if new_end is not None else None

    # Определение признаков для агрегации
    features = [c for c in list(df.columns) if c not in ['ip', 'click_time']]
    cat_function = ['count', 'last', 'nunique']    

    # Группировка и агрегация данных
    new_chunk = df[start:end].groupby('ip')[features].agg(cat_function)
    # Создание новых названий столбцов
    new_chunk.columns = ['_'.join(x) for x in new_chunk.columns]
    new_chunk.reset_index(inplace=True)

    # Вычисление разностей признаков
    diff_num_features = [f'diff_{col}' for col in features]
    df = df.to_pandas()
    ips = df[start:end]['ip'].values
    new_chunk_diff = df[start:end].groupby('ip')[features].diff().add_prefix('diff_')
    new_chunk_diff.insert(0, 'ip', ips)
    new_chunk_diff = cudf.DataFrame(new_chunk_diff)
    new_chunk_diff = new_chunk_diff.groupby('ip')[diff_num_features].agg(cat_function)
    new_chunk_diff.columns = ['_'.join(x) for x in new_chunk_diff.columns]
    new_chunk_diff.reset_index(inplace=True)

    # Объединение и сортировка результата
    new_chunk = new_chunk.merge(new_chunk_diff, how='inner', on='ip')
    new_chunk = new_chunk.sort_values(by=['ip']).reset_index(drop=True)
    return new_chunk


In [19]:
%%time
df, all_rows, chunk = read_preprocess_divide(num_parts)

CPU times: user 7.31 s, sys: 908 ms, total: 8.22 s
Wall time: 8.15 s


In [20]:
%%time
# Функция для выбора безопасного окна строк
window = window(df)

# Новый DataFrame для добавления результатов цикла for
new_df = cudf.DataFrame()

# Установка начального значения start в 0
start = 0
for p in range(0, num_parts):
    # Определение конечного индекса текущей части
    end = p * chunk + chunk

    if end < all_rows:
        # Выбор окна последних строк текущей части
        chunk_window = df[start:end].tail(window)

        # Нахождение второго с конца уникального значения IP
        second_last_unique = chunk_window['ip'].unique().values.tolist()[-2]

        # Нахождение индекса последней строки с этим IP
        new_end = chunk_window[chunk_window['ip'] == second_last_unique].tail(1).index[0]

        print(f"Обработка {(new_end + 1) - start} строк части N° {p + 1}")
        new_chunk = feature_engineering(df, start, new_end)
    else:
        # Обработка оставшихся строк, если достигнут конец DataFrame
        print(f"Обработка {all_rows - (new_end + 1)} строк части N° {p + 1}")
        new_chunk = feature_engineering(df, start, None)

    # Обновление начального индекса для следующей части
    start = new_end + 1

    # Добавление результатов в новый DataFrame
    new_df = new_df.append(new_chunk, ignore_index=True)


Обработка 46220468 строк части N° 1
Обработка 46231465 строк части N° 2
Обработка 46223993 строк части N° 3
Обработка 46227948 строк части N° 4
CPU times: user 2min 2s, sys: 15 s, total: 2min 17s
Wall time: 2min 16s


In [21]:
new_df

Unnamed: 0,ip,channel_count,channel_last,channel_nunique,diff_channel_count,diff_channel_last,diff_channel_nunique
0,1,47,113,15,46,0.0,28
1,5,24,205,13,23,104.0,20
2,6,1454,127,88,1453,0.0,467
3,9,4029,21,106,4028,-114.0,667
4,10,1180,466,97,1179,221.0,422
...,...,...,...,...,...,...,...
277390,364773,15,113,9,14,0.0,11
277391,364774,3,213,1,2,0.0,1
277392,364775,24,330,15,23,223.0,16
277393,364776,309,280,75,308,179.0,169


## РЕЗУЛЬТАТЫ GPU P100

Чтение, предварительная обработка и разделение заняли 1 минуту 8 секунд. Обработка с инженерией признаков заняла 2 минуты 17 секунд.

GPU T4 x2

Чтение, предварительная обработка и разделение заняли 1 минуту 34 секунды. Обработка с инженерией признаков заняла 2 минуты 28 секунд.

GPU P100, по-видимому, кажется более мощным, но давайте помнить, что GPU T4 x2 был разработан для обработки изображений и нейронных сетей. Мой совет для новичков - использовать только CPU и GPU P100 для табличных наборов данных и GPU T4 x2 для изображений.
