In [6]:
import pyarrow.parquet as pq
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
import os
import pandas as pd
import tqdm
import pyarrow as pa

import warnings
warnings.filterwarnings('ignore')

plt.style.use('fivethirtyeight')

In [7]:
# Задаем данные
target_path = "train_target.csv"
path = 'train_data'

target_df = pd.read_csv(target_path)

table = pq.read_table(path)
df = table.to_pandas()

print(f"Обработка файла завершена.")

Обработка файла завершена.


In [8]:
def read_parquet_dataset_from_local(path_to_dataset: str, start_from: int = 0,
                                    num_parts_to_read: int = 2, columns=None, verbose=False) -> pd.DataFrame:
    """
    читает num_parts_to_read партиций, преобразовывает их к pd.DataFrame и возвращает
    :param path_to_dataset: путь до директории с партициями
    :param start_from: номер партиции, с которой нужно начать чтение
    :param num_parts_to_read: количество партиций, которые требуется прочитать
    :param columns: список колонок, которые нужно прочитать из партиции
    :return: pd.DataFrame
    """

    res = []
    dataset_paths = sorted([os.path.join(path_to_dataset, filename) for filename in os.listdir(path_to_dataset)
                            if filename.startswith('train')])
    print(dataset_paths)

    start_from = max(0, start_from)
    chunks = dataset_paths[start_from: start_from + num_parts_to_read]
    if verbose:
        print('Reading chunks:\n')
        for chunk in chunks:
            print(chunk)
    for chunk_path in tqdm.tqdm_notebook(chunks, desc="Reading dataset with pandas"):
        print('chunk_path', chunk_path)
        chunk = pd.read_parquet(chunk_path,columns=columns)
        res.append(chunk)

    return pd.concat(res).reset_index(drop=True)

In [9]:
def prepare_transactions_dataset(path_to_dataset: str, num_parts_to_preprocess_at_once: int = 1, num_parts_total: int=50,
                                 save_to_path=None, verbose: bool=False):
    """
    возвращает готовый pd.DataFrame с признаками, на которых можно учить модель для целевой задачи
    path_to_dataset: str
        путь до датасета с партициями
    num_parts_to_preprocess_at_once: int
        количество партиций, которые будут одновременно держаться и обрабатываться в памяти
    num_parts_total: int
        общее количество партиций, которые нужно обработать
    save_to_path: str
        путь до папки, в которой будет сохранён каждый обработанный блок в .parquet-формате; если None, то не будет сохранён
    verbose: bool
        логирует каждую обрабатываемую часть данных
    """
    preprocessed_frames = []

    for step in tqdm.tqdm_notebook(range(0, num_parts_total, num_parts_to_preprocess_at_once),
                                   desc="Transforming transactions data"):
        df = read_parquet_dataset_from_local(path_to_dataset, step, num_parts_to_preprocess_at_once,
                                                             verbose=verbose)


        #здесь должен быть препроцессинг данных
        df['no_delinquencies'] = (df['is_zero_loans530'].astype(bool) & df['is_zero_loans3060'].astype(bool) &
                                  df['is_zero_loans90'].astype(bool) & df['is_zero_loans6090'].astype(bool) &
                                  df['is_zero_loans5'].astype(bool)).astype(int)

        df['total_delinquencies'] = (df['is_zero_loans530'] + df['is_zero_loans3060'] + df['is_zero_loans5'] +
                                     df['is_zero_loans90'] + df['is_zero_loans6090'])
        
        enc_loans_columns = ['enc_loans_account_holder_type', 'enc_loans_credit_status', 
                             'enc_loans_account_cur', 'enc_loans_credit_type', 'rn', 'id']
        filtered_columns = [col for col in df.columns if col.startswith('enc_paym_')]
        
        # Создание нового датафрейма с выбранными колонками
        new_df_to_ohe = df[enc_loans_columns + filtered_columns]
        for col in new_df_to_ohe.select_dtypes(include=['category']).columns:
            new_df_to_ohe[col] = new_df_to_ohe[col].astype('str')
        feature_columns = list(new_df_to_ohe.columns.values)
        feature_columns.remove("id")
        feature_columns.remove("rn")
        
        # Применяем ohe
        dummies = pd.get_dummies(new_df_to_ohe[feature_columns], columns=feature_columns)
        dummy_features = dummies.columns.values
        
        # Объединяем датафреймы 
        ohe_features = pd.concat([new_df_to_ohe, dummies], axis=1)
        ohe_features = ohe_features.drop(columns=feature_columns)
        
        # Группируем полученные данные по id и sum
        ohe_features.groupby("id")
        features = ohe_features.groupby("id")[dummy_features].sum().reset_index(drop=False)

        enc_loans_columns1 = ['enc_loans_account_holder_type', 'enc_loans_credit_status',
                              'enc_loans_account_cur', 'enc_loans_credit_type']
        
        # Определяем признаки для масштабирования
        df = df.drop(enc_loans_columns1, axis=1)
        df = df.drop(filtered_columns, axis=1)
        df_scale = df.drop('id', axis=1)
        
        cols_to_scale = df_scale.columns
        
        scaler = StandardScaler()
        scaler.fit(df[cols_to_scale])
        
        df[cols_to_scale] = scaler.transform(df[cols_to_scale])
        df = df.groupby('id').mean().reset_index()
        # Объединяем признаки с ohe и StandardScaler
        df = pd.merge(df, features, on='id', how='inner')
        
        #записываем подготовленные данные в файл
        if save_to_path:
            block_as_str = str(step)
            if len(block_as_str) == 1:
                block_as_str = '00' + block_as_str
            else:
                block_as_str = '0' + block_as_str
            df.to_parquet(os.path.join(save_to_path, f'processed_chunk_{block_as_str}.parquet'))

        preprocessed_frames.append(df)
    return pd.concat(preprocessed_frames, join='inner')

In [10]:
data = prepare_transactions_dataset(path, num_parts_to_preprocess_at_once=2, num_parts_total=12)

Transforming transactions data:   0%|          | 0/6 [00:00<?, ?it/s]

['train_data/train_data_0.pq', 'train_data/train_data_1.pq', 'train_data/train_data_10.pq', 'train_data/train_data_11.pq', 'train_data/train_data_2.pq', 'train_data/train_data_3.pq', 'train_data/train_data_4.pq', 'train_data/train_data_5.pq', 'train_data/train_data_6.pq', 'train_data/train_data_7.pq', 'train_data/train_data_8.pq', 'train_data/train_data_9.pq']


Reading dataset with pandas:   0%|          | 0/2 [00:00<?, ?it/s]

chunk_path train_data/train_data_0.pq
chunk_path train_data/train_data_1.pq
['train_data/train_data_0.pq', 'train_data/train_data_1.pq', 'train_data/train_data_10.pq', 'train_data/train_data_11.pq', 'train_data/train_data_2.pq', 'train_data/train_data_3.pq', 'train_data/train_data_4.pq', 'train_data/train_data_5.pq', 'train_data/train_data_6.pq', 'train_data/train_data_7.pq', 'train_data/train_data_8.pq', 'train_data/train_data_9.pq']


Reading dataset with pandas:   0%|          | 0/2 [00:00<?, ?it/s]

chunk_path train_data/train_data_10.pq
chunk_path train_data/train_data_11.pq
['train_data/train_data_0.pq', 'train_data/train_data_1.pq', 'train_data/train_data_10.pq', 'train_data/train_data_11.pq', 'train_data/train_data_2.pq', 'train_data/train_data_3.pq', 'train_data/train_data_4.pq', 'train_data/train_data_5.pq', 'train_data/train_data_6.pq', 'train_data/train_data_7.pq', 'train_data/train_data_8.pq', 'train_data/train_data_9.pq']


Reading dataset with pandas:   0%|          | 0/2 [00:00<?, ?it/s]

chunk_path train_data/train_data_2.pq
chunk_path train_data/train_data_3.pq
['train_data/train_data_0.pq', 'train_data/train_data_1.pq', 'train_data/train_data_10.pq', 'train_data/train_data_11.pq', 'train_data/train_data_2.pq', 'train_data/train_data_3.pq', 'train_data/train_data_4.pq', 'train_data/train_data_5.pq', 'train_data/train_data_6.pq', 'train_data/train_data_7.pq', 'train_data/train_data_8.pq', 'train_data/train_data_9.pq']


Reading dataset with pandas:   0%|          | 0/2 [00:00<?, ?it/s]

chunk_path train_data/train_data_4.pq
chunk_path train_data/train_data_5.pq
['train_data/train_data_0.pq', 'train_data/train_data_1.pq', 'train_data/train_data_10.pq', 'train_data/train_data_11.pq', 'train_data/train_data_2.pq', 'train_data/train_data_3.pq', 'train_data/train_data_4.pq', 'train_data/train_data_5.pq', 'train_data/train_data_6.pq', 'train_data/train_data_7.pq', 'train_data/train_data_8.pq', 'train_data/train_data_9.pq']


Reading dataset with pandas:   0%|          | 0/2 [00:00<?, ?it/s]

chunk_path train_data/train_data_6.pq
chunk_path train_data/train_data_7.pq
['train_data/train_data_0.pq', 'train_data/train_data_1.pq', 'train_data/train_data_10.pq', 'train_data/train_data_11.pq', 'train_data/train_data_2.pq', 'train_data/train_data_3.pq', 'train_data/train_data_4.pq', 'train_data/train_data_5.pq', 'train_data/train_data_6.pq', 'train_data/train_data_7.pq', 'train_data/train_data_8.pq', 'train_data/train_data_9.pq']


Reading dataset with pandas:   0%|          | 0/2 [00:00<?, ?it/s]

chunk_path train_data/train_data_8.pq
chunk_path train_data/train_data_9.pq


In [11]:
data.head()

Unnamed: 0,id,rn,pre_since_opened,pre_since_confirmed,pre_pterm,pre_fterm,pre_till_pclose,pre_till_fclose,pre_loans_credit_limit,pre_loans_next_pay_summ,...,enc_paym_22_2,enc_paym_22_3,enc_paym_23_0,enc_paym_23_1,enc_paym_23_2,enc_paym_23_3,enc_paym_24_1,enc_paym_24_2,enc_paym_24_3,enc_paym_24_4
0,0,-0.227671,-0.203621,-0.168349,-0.228292,-0.179451,0.842199,0.55344,-0.022794,0.511008,...,0,8,2,0,0,8,0,0,0,10
1,1,0.156628,0.375623,-0.159211,-0.315085,-0.083074,0.585842,-0.322131,-0.273967,-0.186453,...,0,11,3,0,0,11,3,0,0,11
2,2,-0.900194,-0.163016,0.485552,-0.247278,-0.516773,-0.392724,0.151151,-1.390292,-0.776346,...,0,2,0,1,0,2,0,0,0,3
3,3,0.252702,-0.395045,-0.22521,-0.133363,-0.111987,-0.354133,-0.053938,0.023172,0.146662,...,0,8,7,0,0,8,5,0,0,10
4,4,-1.092344,0.475065,0.130171,-0.816854,-0.067011,-1.164551,0.624432,0.390902,-1.019243,...,0,1,0,0,0,1,0,0,0,1


In [12]:
data.isna().sum().sum()

0

In [13]:
df = pd.merge(data, target_df[['id', 'flag']], on='id', how='left')
df = df.drop('id', axis=1)
print(f"Объединение файлов завершено.")
parquet_file_path = 'data_part_ohe_with_std.parquet'
# Преобразуем DataFrame в таблицу pyarrow
table = pa.Table.from_pandas(df)

# Сохраняем таблицу в формате Parquet
pq.write_table(table, parquet_file_path)
print(f"Сохранение файла завершено.")

Объединение файлов завершено.
Сохранение файла завершено.


In [15]:
df.total_delinquencies

0          0.343826
1         -0.839046
2         -0.941017
3          0.486586
4          0.486586
             ...   
2999995   -0.121921
2999996    0.465294
2999997    0.367425
2999998    0.269555
2999999    0.282605
Name: total_delinquencies, Length: 3000000, dtype: float64