填充nans、infs和naive_outlier

In [1]:
import pandas as pd
import numpy as np
import joblib
from tsfresh.utilities.dataframe_functions import impute
import os
from joblib import Parallel, delayed

In [2]:
# def load_sensor(data_no, idx):
#     sensor = joblib.load('./sensors_id_sort/0%d/%d.lz4'%(data_no, idx))
#     return sensor

def gen_boxplot_bound(ser):
    """Compute box-plot style outlier fences for a series.

    Missing values are forward-filled first. The "quartiles" used here are
    the 0.1th and 99.9th percentiles of the filled values, and the fences
    sit 10 times their spread beyond each of them.

    Args:
        ser: pandas Series of numeric values.

    Returns:
        A tuple ``(up_bound, down_bound, filled)`` where ``filled`` is the
        forward-filled copy of ``ser``.
    """
    # ``fillna(method='pad')`` is deprecated in recent pandas releases;
    # ``ffill()`` is the equivalent spelling.
    filled = ser.ffill()
    values = filled.values.ravel()
    # Forward fill cannot repair NaNs at the start of the series. The
    # nan-aware percentile ignores them instead of turning both bounds
    # into NaN (which would silently disable outlier clipping).
    q1, q3 = np.nanpercentile(values, [0.1, 99.9])
    iqr = q3 - q1
    up_bound = q3 + 10 * iqr
    down_bound = q1 - 10 * iqr
    return up_bound, down_bound, filled
    
def naive_outliers_impute(df):
    """Clip naive outliers in every column to that column's fences.

    For each column the upper and lower bounds come from
    ``gen_boxplot_bound``; rows whose forward-filled value lies below the
    lower bound are set to the lower bound, and rows above the upper bound
    are set to the upper bound.

    The frame is modified in place and also returned.
    """
    for name in df.columns:
        upper, lower, filled = gen_boxplot_bound(df[name])

        # Both masks are derived from the filled series, not from ``df``,
        # so they do not depend on the order of the two assignments.
        too_low = filled < lower
        too_high = filled > upper

        df.loc[too_low, name] = lower
        df.loc[too_high, name] = upper
    return df

In [3]:
def clean_sensor(df):
    """Clean one sensor frame.

    Runs tsfresh ``impute`` on the frame (per the original note: NaNs are
    replaced by the median and infinities by the min/max), then clips
    naive outliers to their upper/lower bounds via
    ``naive_outliers_impute``.

    Returns the cleaned frame.
    """
    return naive_outliers_impute(impute(df))

def clean_sensor_parallel(train_no, csv_nos, opt_func, n_jobs=48):
    """Clean the sensor files of one PLC in parallel.

    Each file ``./sensors_id_sort/0<train_no>/<idx>.lz4`` for ``idx`` in
    ``1..csv_nos`` is loaded with joblib, passed through ``opt_func`` and
    written to ``./sensors_clean/0<train_no>/<idx>.lz4`` (lz4-compressed).

    Args:
        train_no: Number of the PLC whose sensors are processed.
        csv_nos: Number of sensor files belonging to that PLC.
        opt_func: Callable applied to each loaded sensor object; its
            return value is what gets saved.
        n_jobs: Number of joblib workers (defaults to 48, the previously
            hard-coded value).
    """
    input_dir = './sensors_id_sort/0%d/' % train_no
    output_dir = './sensors_clean/0%d/' % train_no

    # makedirs creates the parent directory as well and tolerates an
    # already existing target, avoiding the check-then-create race of
    # os.path.exists + os.mkdir.
    os.makedirs(output_dir, exist_ok=True)

    def basis_func(idx):
        # Load, transform and store a single sensor file.
        sensor = joblib.load(input_dir + '%d.lz4' % idx)
        cleaned = opt_func(sensor)
        joblib.dump(cleaned, output_dir + '%d.lz4' % idx, compress='lz4')

    Parallel(n_jobs=n_jobs, verbose=10)(
        delayed(basis_func)(i) for i in range(1, csv_nos + 1)
    )

In [4]:
# Clean sensor files 1..48 of PLC 1 with clean_sensor.
clean_sensor_parallel(1, 48, clean_sensor)

[Parallel(n_jobs=48)]: Using backend LokyBackend with 48 concurrent workers.
[Parallel(n_jobs=48)]: Done   3 out of  48 | elapsed:   53.1s remaining: 13.3min
[Parallel(n_jobs=48)]: Done   8 out of  48 | elapsed:  1.3min remaining:  6.5min
[Parallel(n_jobs=48)]: Done  13 out of  48 | elapsed:  1.8min remaining:  4.9min
[Parallel(n_jobs=48)]: Done  18 out of  48 | elapsed:  1.9min remaining:  3.2min
[Parallel(n_jobs=48)]: Done  23 out of  48 | elapsed:  2.0min remaining:  2.1min
[Parallel(n_jobs=48)]: Done  28 out of  48 | elapsed:  2.0min remaining:  1.4min
[Parallel(n_jobs=48)]: Done  33 out of  48 | elapsed:  2.0min remaining:   54.5s
[Parallel(n_jobs=48)]: Done  38 out of  48 | elapsed:  2.0min remaining:   31.6s
[Parallel(n_jobs=48)]: Done  43 out of  48 | elapsed:  2.0min remaining:   14.1s
[Parallel(n_jobs=48)]: Done  48 out of  48 | elapsed:  2.0min remaining:    0.0s
[Parallel(n_jobs=48)]: Done  48 out of  48 | elapsed:  2.0min finished


In [5]:
# Clean sensor files 1..48 of PLC 2 with clean_sensor.
clean_sensor_parallel(2, 48, clean_sensor)

[Parallel(n_jobs=48)]: Using backend LokyBackend with 48 concurrent workers.
[Parallel(n_jobs=48)]: Done   3 out of  48 | elapsed:  1.3min remaining: 19.8min
[Parallel(n_jobs=48)]: Done   8 out of  48 | elapsed:  1.9min remaining:  9.5min
[Parallel(n_jobs=48)]: Done  13 out of  48 | elapsed:  2.0min remaining:  5.4min
[Parallel(n_jobs=48)]: Done  18 out of  48 | elapsed:  2.1min remaining:  3.4min
[Parallel(n_jobs=48)]: Done  23 out of  48 | elapsed:  2.1min remaining:  2.3min
[Parallel(n_jobs=48)]: Done  28 out of  48 | elapsed:  2.1min remaining:  1.5min
[Parallel(n_jobs=48)]: Done  33 out of  48 | elapsed:  2.1min remaining:   57.0s
[Parallel(n_jobs=48)]: Done  38 out of  48 | elapsed:  2.1min remaining:   33.2s
[Parallel(n_jobs=48)]: Done  43 out of  48 | elapsed:  2.1min remaining:   14.8s
[Parallel(n_jobs=48)]: Done  48 out of  48 | elapsed:  2.1min remaining:    0.0s
[Parallel(n_jobs=48)]: Done  48 out of  48 | elapsed:  2.1min finished


In [6]:
# Clean sensor files 1..37 of PLC 3 with clean_sensor.
clean_sensor_parallel(3, 37, clean_sensor)

[Parallel(n_jobs=48)]: Using backend LokyBackend with 48 concurrent workers.
[Parallel(n_jobs=48)]: Done   2 out of  37 | elapsed:   43.5s remaining: 12.7min
[Parallel(n_jobs=48)]: Done   6 out of  37 | elapsed:  1.0min remaining:  5.4min
[Parallel(n_jobs=48)]: Done  10 out of  37 | elapsed:  1.4min remaining:  3.7min
[Parallel(n_jobs=48)]: Done  14 out of  37 | elapsed:  1.5min remaining:  2.5min
[Parallel(n_jobs=48)]: Done  18 out of  37 | elapsed:  1.5min remaining:  1.6min
[Parallel(n_jobs=48)]: Done  22 out of  37 | elapsed:  1.5min remaining:  1.1min
[Parallel(n_jobs=48)]: Done  26 out of  37 | elapsed:  1.6min remaining:   39.5s
[Parallel(n_jobs=48)]: Done  30 out of  37 | elapsed:  1.6min remaining:   21.8s
[Parallel(n_jobs=48)]: Done  34 out of  37 | elapsed:  1.6min remaining:    8.3s
[Parallel(n_jobs=48)]: Done  37 out of  37 | elapsed:  1.6min finished
