In [1]:
import numpy as np
import pandas as pd
from multiprocessing import Pool
from functools import partial

假设现在有一个DataFrame，它有`100000`个样本，需要对每一个样本去进行统计。在`pandas`中一般会考虑`apply`执行，但其是串行执行的，效率很低。

In [2]:
df = pd.DataFrame(np.random.randn(100000, 205))

In [3]:
def calculate_mean(sample):
    """
    对一个样本的所有时间步分别按步长计算区间的均值。

    Args:
        sample (pandas.Series): 一条样本
    """
    # 只取前200个值
    values = np.asarray(sample)[:-5]

    # 分别保存值与列名
    results = []
    names = []

    for step in [5, 10, 15]:
        groups = len(values) // step

        for group_id in range(groups - 1):
            sub_values = values[group_id * step: (group_id + 1) * step]
            results.append(np.mean(sub_values))
            names.append('step_{}_group_{}'.format(step, group_id))

    return pd.Series(results, index=names)

In [4]:
%%time
results1 = df.apply(calculate_mean, axis=1)

CPU times: user 1min 16s, sys: 431 ms, total: 1min 16s
Wall time: 1min 16s


下面使用`multiprocessing`，将多个`100000`个样本进行分割，使用多进程并行执行，这样能大大缩短执行时间，提升效率。

In [5]:
def parallelize(input_df, target_func, n_jobs=8):
    """
    并行执行的入口。
    """
    # 使用`array_split`进行切分
    sub_dfs = np.array_split(input_df, n_jobs)

    # 开启进程池
    pool = Pool(n_jobs)
    results = pool.map(target_func, sub_dfs)
    pool.close()
    pool.join()

    return pd.concat(results)
 
def run_for_sub(target_func, sub_df):
    """
    为子样本集进行apply操作
    """
    return sub_df.apply(target_func, axis=1)

In [6]:
%%time
# `calculate_mean`在这里将送入`run_for_sub`作为其第一个参数。
# 使用`partial`的好处是比较灵活，可以送入将任意函数作为参数送入`run_for_sub`。
results2 = parallelize(df, partial(run_for_sub, calculate_mean), 4)

CPU times: user 207 ms, sys: 192 ms, total: 399 ms
Wall time: 18.7 s


In [7]:
assert (results1 == results2).all().all()

可以看到两次结果并不存在差别，但使用并行执行时，效率得到了很大提升。