1.2. Создание пайплайна для обработки признаков

Принцип: берем список отобранных ранее полей из features, загружаем датафреймы, считываем данные из файла features.csv порциями по 100 тыс. строк, пропускаем полученные DataFrame'-ы через pipeline (заполнение пропусков), делаем left join через pd.DataFrame.merge и удаляем null-строки

In [1]:
import pandas as pd, numpy as np, luigi, dill

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import FeatureUnion, make_pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from copy import deepcopy

#создаем класс выбора столбцов pipeline
class ColumnSelector(BaseEstimator, TransformerMixin):
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        assert isinstance(X, pd.DataFrame)

        try:
            return X[self.columns]
        except KeyError:
            cols_error = list(set(self.columns) - set(X.columns))
            raise KeyError("DataFrame не содердит следующие колонки: %s" % cols_error)




In [2]:
#загружаем поля
with open('cls.dat','rb') as f:
    f_ok = dill.load(f)

f_binary = f_ok[0]
f_categorical = f_ok[1]
f_numeric = f_ok[2]
f_ok = f_ok[0] + f_ok[1] + f_ok[2]

#Извлечем данные о пользователях
data_train = pd.read_csv('data_train.csv')
data_test = pd.read_csv('data_test.csv')

#Первый столбец в датафреймах - просто номер строки. Уберем его:
data_train = data_train[data_train.columns[1:]]
data_test = data_test[data_test.columns[1:]]

Все как на прошлом шаге.

ВНИМАНИЕ: SimpleImputer ТРЕБУЕТ ПО КРАЙНЕЙ МЕРЕ ОДНОГО МАССИВА НА ВХОДЕ. А ИНАЧЕ ОШИБКА
ValueError: at least one array or dtype is required

Поэтому, поскольку у нас нет категориальных признаков, мы не добавляем соответствующий трансформер

In [3]:
#создаем pipeline
f_prep_pipeline = make_pipeline(
    ColumnSelector(columns=f_ok),
    FeatureUnion(transformer_list=[
        ("numeric_features", make_pipeline(
            ColumnSelector(f_numeric),
            SimpleImputer(strategy="mean"),
            StandardScaler()
        )),
        #("categorical_features", make_pipeline(
        #    ColumnSelector(f_categorical),
        #    SimpleImputer(strategy="most_frequent"),
        #    OneHotEncoder(handle_unknown='ignore')
        #)),
        ("boolean_features", make_pipeline(
            ColumnSelector(f_binary),
        ))
    ])
    #ConcatenatorClear(columns=['id','buy_time'],X1 = data_train)
)
f_prep_pipeline.steps

[('columnselector',
  ColumnSelector(columns=['252', '25', '80', '14', '50', '27', '158', '178',
                          '183', '175', '1', '141', '74', '219', '124', '133',
                          '151', '37', '68', '16', '60', '251', '47', '97', '46',
                          '174', '91', '32', '109', '132', ...])),
 ('featureunion',
  FeatureUnion(transformer_list=[('numeric_features',
                                  Pipeline(steps=[('columnselector',
                                                   ColumnSelector(columns=['25',
                                                                           '80',
                                                                           '14',
                                                                           '50',
                                                                           '27',
                                                                           '158',
                                               

In [4]:
#Черновик
'''
class ConcatenatorClear(BaseEstimator, TransformerMixin):
    
    # Выполняет left join двух датафреймов
    # Датафреймы должны содержать обязательные колонки, по которым
    # будет производиться left join
    
    def __init__(self, columns, X1):
        self.columns = columns
        
        assert isinstance(X1, pd.DataFrame)
        
        if len(set(self.columns) - set(X1))==0:
            self.X1 = X1
        else:
            raise KeyError("ConcatenatorClear.init(): X1 содержитне все колонки из списка: %s" % self.columns)

    def fit(self, X, y=None):
        return self
    
    def transform(self, X2):
        assert isinstance(X2, pd.DataFrame) #проблема: X2 почему-то приходит типа sparse matrix вместо pd.DataFrame
        
        try:
            return self.X1.merge(X2, how='left', on=self.columns)
        except:
            raise KeyError("Второй DataFrame не содержит необходимые поля: %s" % self.columns)
'''

'\nclass ConcatenatorClear(BaseEstimator, TransformerMixin):\n    \n    # Выполняет left join двух датафреймов\n    # Датафреймы должны содержать обязательные колонки, по которым\n    # будет производиться left join\n    \n    def __init__(self, columns, X1):\n        self.columns = columns\n        \n        assert isinstance(X1, pd.DataFrame)\n        \n        if len(set(self.columns) - set(X1))==0:\n            self.X1 = X1\n        else:\n            raise KeyError("ConcatenatorClear.init(): X1 содержитне все колонки из списка: %s" % self.columns)\n\n    def fit(self, X, y=None):\n        return self\n    \n    def transform(self, X2):\n        assert isinstance(X2, pd.DataFrame) #проблема: X2 почему-то приходит типа sparse matrix вместо pd.DataFrame\n        \n        try:\n            return self.X1.merge(X2, how=\'left\', on=self.columns)\n        except:\n            raise KeyError("Второй DataFrame не содержит необходимые поля: %s" % self.columns)\n'

Настроим параметры цикла создания объединенных данных:

In [5]:
portion_n = 1           #начальная порция
portion_end = 100       #конечная порция
portion_strs = 100000   #число строк в порции

Запустим цикл:

In [7]:
for i in range(portion_n,portion_end):

    print('Portion {}... '.format(i),end='')
    
    #Теперь извлечем признаки пользователей (первые 100000 строк):
    if i==1:    
        features = pd.read_csv('features.csv',nrows=portion_strs,delimiter='\t')
        features_columns = list(features.columns) #названия столбцов таблицы из исходного файла
    else:
        features = pd.read_csv('features.csv', nrows=portion_strs, skiprows=(i-1)*portion_strs+1, delimiter='\t', header=0, names=features_columns)
    
    features = features[features.columns[1:]]
    
    #отдельно обработаем признак с номером 252: он бинарный, но иногда попадаются
    #выбросы. Из-за этого кол-во столбцов после pipeline получается не всегда одним и тем же. Удалим соответствующие строки
    ind_incor = features.loc[((features['252']!=0) & (features['252']!=1))].index
    features.drop(ind_incor,inplace=True)
    
    features_left = features[['id','buy_time']] #левая часть features
    
    try:
        features = f_prep_pipeline.fit_transform(features)
    except:
        print('Данные закончились')
        break
    
    n_features = features.shape[1]
    n_features
    
    #преобразование sparse-матрицы в DataFrame:
    #features = features.todense()
    features = pd.DataFrame(features)
    features = pd.concat((features_left,features),axis=1,ignore_index=True)
    
    #делаем left join таблиц data_train и features2
    features.columns = ['id','buy_time'] + list(features.columns)[:-2]
    data_train2 = data_train.merge(features, how='left', on=['id','buy_time'])
    data_test2 = data_test.merge(features, how='left', on=['id','buy_time'])
    
    #Ранее мы определили, что в data_train пропусков нет. Поэтому сейчас для большинства
    #строк нет признаков. Поэтому удалим все строки, которые содержат nan-значения:
    data_train2.dropna(inplace=True)
    data_test2.dropna(inplace=True)
        
    if i==1:
        #data_train_all = deepcopy(data_train2)
        data_train2.to_csv('data_train_all.csv',mode='w',header=True,index=False)
        data_test2.to_csv('data_test_all.csv',mode='w',header=True,index=False)
    else:
        #data_train_all = pd.concat((data_train_all,data_train2),axis=0,ignore_index=True)
        data_train2.to_csv('data_train_all.csv',mode='a',header=False,index=False)
        data_test2.to_csv('data_test_all.csv',mode='a',header=False,index=False)
    
    print('ended')

Portion 1... ended
Portion 2... ended
Portion 3... ended
Portion 4... ended
Portion 5... ended
Portion 6... ended
Portion 7... ended
Portion 8... ended
Portion 9... ended
Portion 10... ended
Portion 11... ended
Portion 12... ended
Portion 13... ended
Portion 14... ended
Portion 15... ended
Portion 16... ended
Portion 17... ended
Portion 18... ended
Portion 19... ended
Portion 20... ended
Portion 21... ended
Portion 22... ended
Portion 23... ended
Portion 24... ended
Portion 25... ended
Portion 26... ended
Portion 27... ended
Portion 28... ended
Portion 29... ended
Portion 30... ended
Portion 31... ended
Portion 32... ended
Portion 33... ended
Portion 34... ended
Portion 35... ended
Portion 36... ended
Portion 37... ended
Portion 38... ended
Portion 39... ended
Portion 40... ended
Portion 41... ended
Portion 42... ended
Portion 43... ended
Portion 44... ended
Portion 45... ended
Portion 46... ended
Portion 47... Данные закончились


Получается, мы ошиблись с размером данных - вместо 11 млн. строк в файле features.csv их кол-во составляет чуть более 4,6 млн.