# Cython应用——以DNN算法为例（1）: 矩阵计算

python作为一种动态类型的语言，运行效率不理想。其优化手段之一是使用cython，结合C的静态类型来优化已知类型的python代码。这里以DNN这一最简单机器学习算法为例，通过使用cython对其效率瓶颈的逐步优化，体验cython的一些进阶用法。

## 1. 使用python快速实现KNN算法

以下使用python简单实现一个KNN算法(忽略了一些变量合法性的确认)，KNN没有算法没有训练过程，因此其核心为预测函数case_pred的实现，通过调用numpy和heapq两个扩展包，每一步计算都可以通过一行代码实现，简明直观，开发效率较高。

In [1]:
import numpy as np
import heapq


class KNN:
    def __init__(self, train_x, train_y, k):
        """
        train_x, train_y: train data
        k: k value of knn
        """
        self.k = k
        self.train_x = train_x
        self.train_y = train_y

        self.n_case, self.n_dim = train_x.shape

    def case_pred(self, case_x):
        """单个case预测
        """
        distance_arr = self._calc_distance(case_x)  # 计算单个case与所有训练数据的欧式距离
        nearest_index = self._n_nearest_index(distance_arr)  # 距离最近的N个case的下标
        nearest_label = self.train_y[nearest_index]  # 距离最近的N个case的label
        case_y = np.bincount(nearest_label).argmax()  # 取最多的label作为预测值
        return case_y


    def _calc_distance(self, case):
        """计算case与所有训练数据的距离
        """
        # numpy的broadcase功能使其可以通过一行代码完成
        return np.sqrt(np.square(self.train_x - case).sum(axis=1))


    def _n_nearest_index(self, dist_arr):
        """ 获取top_n个最小值的下标
        """
        return heapq.nsmallest(self.k, range(self.n_case), lambda i: dist_arr[i])


以下生成100万条100维的数组作为训练数据，来测试上述代码的效率。可以看到预测一个case用时1.58秒。

In [2]:
# 生成实验数据
train_x = np.random.random([1000000, 100])  # 100万条100维的训练数据
train_y = np.random.randint(0, 2, 1000000)  # 1abel为0/1二分类
pred_x = np.random.random(100)  # 待预测的case

In [3]:
# 测试计算时间
knn_model = KNN(train_x, train_y, 9)
%timeit knn_model.case_pred(pred_x)

1.3 s ± 93.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


以下使用cProfile测试上述代码各个函数调用次数及其耗时，以发现代码的效率瓶颈和改进方向:
1. 第一列显示耗时最长的函数为'calc_distance'，该函数用来计算待预测case的特征向量与所有训练数据特征向量的欧氏距离
2. 其次为nsmallest，该函数在'n_nearest_index'被调用用来计算距离最近的k个case的下标

In [4]:
import cProfile
import pstats

cProfile.runctx("knn_model.case_pred(pred_x)", globals(), locals(), "Profile.prof")
s = pstats.Stats("Profile.prof")
s.strip_dirs().sort_stats("time").print_stats()

Sun Jul  5 09:23:33 2020    Profile.prof

         1000120 function calls in 1.446 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.922    0.922    1.042    1.042 <ipython-input-1-40056ac10278>:27(_calc_distance)
        1    0.258    0.258    0.403    0.403 heapq.py:461(nsmallest)
  1000000    0.145    0.000    0.145    0.000 <ipython-input-1-40056ac10278>:37(<lambda>)
        1    0.120    0.120    0.120    0.120 {method 'reduce' of 'numpy.ufunc' objects}
        1    0.000    0.000    1.446    1.446 <string>:1(<module>)
      100    0.000    0.000    0.000    0.000 {built-in method _heapq._heapreplace_max}
        1    0.000    0.000    1.446    1.446 {built-in method builtins.exec}
        1    0.000    0.000    0.000    0.000 <__array_function__ internals>:2(bincount)
        1    0.000    0.000    1.445    1.445 <ipython-input-1-40056ac10278>:17(case_pred)
        1    0.000    0.000    0.120    0.120 _m

<pstats.Stats at 0x7fa9e2417dd0>

## 2. calc_distance函数的优化

calc_distance函数涉及到的矩阵运算用numpy实现，numpy本身已经是底层经过优化的矩阵运算库，这里尝试用cython改写该部分函数，看看能不能继续实现性能的提升。

In [5]:
# 用numpy实现一个原始版本，作为效率提升的比较基准
def calc_distance_py(mat, case):
    """计算case与所有训练数据的距离
    """
    return np.sqrt(np.square(mat - case).sum(axis=1))

###  （1）通过cython高效率实现numpy的多维数组计算

cython中也可以如同一般python程序完全一样地使用numpy，但是这种方法无法明显提升运算效率，因为cython能快速调用numpy的ndarray（多维数据）数据结构，其具体使用方法可见官方文档(https://cython.readthedocs.io/en/latest/src/tutorial/numpy.html )，以下代码可以重点留意以下几点：
1. jupyter notebook中可以加载cython插件，其中第一行在jupyter魔法命令%%cython后可指定一些编译选项，-a能够生成的html标示出和python发生交互的代码，这些部分的代码通常是拖累代码性能的关键，需要尽量避免。
2. 需要通过"cimport numpy as cnp"，调用numpy模块的编译信息。可以看到代码中在用cdef声明变量类型时，使用的都是cnp模块中的数据类型（一般numpy的数据类型后加'_t'后缀）。这些类型适用于代码的编译
3. 注意代码中的numpy多维数组的变量声明格式为cnp.ndarray[cnp.float_t, ndim=2], 其中指定了数组的数据类型和维度值。如果仅声明为cnp.ndarray，数组的索引仍然会以python方式进行，降低运行效率。
4. 函数前的两个装饰器关闭了索引的相应功能以提升代码效率，相应的也要注意代码中索引值的正确性。
5. 虽然numpy中有sqrt等函数，但是最好通过cimport掉用C写成的函数，防止增加与python的交互而拉低性能。
6. 原始numpy的广播功能等在cython中也会增加与python的互动从而降低性能，所以相应的矩阵计算要写会循环模式。

In [6]:
# 为jupyter notebook加载cython插件
%load_ext cython

In [7]:
%%cython -a

import cython
import numpy as np
cimport numpy as cnp  # 调用numpy模块的cython版本，以提供编译时需要的信息
from libc.math cimport sqrt  # 使用c写成的函数以降低与python的交互量

@cython.boundscheck(False) # 关闭索引值是否越界的检查
@cython.wraparound(False)  # 关闭索引值为负的功能
def calc_distance_np(cnp.ndarray[cnp.float_t, ndim=2] mat, cnp.ndarray[cnp.float_t, ndim=1] vec):
    cdef int n_case = mat.shape[0]
    cdef int n_col = mat.shape[1]
    cdef size_t i, j
    cdef cnp.float_t diff, sqr, val
    
    # 注意ndarray的变量声明格式，这样能形实现对numpy多维数组的高效索引
    cdef cnp.ndarray[cnp.float_t, ndim=1] ret = np.empty(n_case, dtype=np.float)
    for i in range(n_case):
        val = 0
        for j in range(n_col):
            diff = mat[i, j] - vec[j]
            sqr = diff * diff
            val += sqr
        val = sqrt(val)
        ret[i] = val

    return ret

以下测试优化后的效果，可以发现性能与原始版本相比有了明显的提升

In [8]:
mat = np.random.random([10000, 100])
vec = np.random.random(100)
%timeit calc_distance_py(mat, vec)
%timeit calc_distance_np(mat, vec)

3.59 ms ± 348 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
1.15 ms ± 41.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


### （2）typed memoryview

Typed memoryview是cython另外一个可以进行numpy多维数组操作的功能。实际上不光numpy多维数组，我们知道多维数组本质上是一块连续的内存空间和一个相应的索引方案，其存储方法与c语言的多维数组buffer一致。除此之外，python的bytes和array.array的数据类型都是如此，都支持python的buffer协议，python提供的memoryview函数可以对这一类支持buffer协议的数据进行访问。而cython中typed memoryview，则可以避免发生python交互实现对buffer更高效地访问。


cython的官方文档中对typed memoryview和旧版buffer访问方式进行了比较，指出其有以下三点优势：
1. 语法更加简明，比如二维数组的类型声明可以直接写float[:, :]，相比上以部分代码的类型声明更加简练。
2. 通常不需要GIL
3. 速度更快

从下面的改写来说，除了第一个优点，后两个优点并没有实现，效率上也没有提升。cython教程中指出，当buffer对象是python对象时，不能使用nogil。

In [11]:
%%cython  -a

import cython
import numpy as np
from libc.math cimport sqrt


@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False)  # turn off negative index wrapping for entire function
def calc_distance_mv(double[:, ::1] mat, double[:] vec):
    cdef int n_case = mat.shape[0]
    cdef int n_col = mat.shape[1]
    cdef size_t i, j
    cdef double diff, sqr, val
    
    ret = np.empty(n_case, dtype=np.double)
    cdef double[:] _ret = ret
    for i in range(n_case):
        val = 0
        for j in range(n_col):
            diff = mat[i, j] - vec[j]
            sqr = diff * diff
            val += sqr
        val = sqrt(val)
        _ret[i] = val
    
    return ret

In [12]:
mat = np.random.random([10000, 100])
vec = np.random.random(100)
%timeit calc_distance_py(mat, vec)
%timeit calc_distance_mv(mat, vec)

3.68 ms ± 244 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
1.06 ms ± 1.64 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


### （3） 并行化

cython可以通过cython.parallel模块实现并行化，下面代码的改写中重点关注以下：
1. 通过cython.parallel的prange函数，实现循环的并行化。
2. 要实现并行化，需要释放GIL（with nogil ...）
3. 并行化后的cython代码在编译时需要增加OpenMP链接相关的参数(jupyter中写在第一行魔法命令%%cython后)

In [13]:
%env CC=gcc-9

env: CC=gcc-9


In [14]:
%%cython --compile-args=-fopenmp --link-args=-fopenmp -a

# cython: language_level=3

import cython
from cython.parallel import prange, parallel
cimport openmp
import numpy as np
from libc.math cimport sqrt


@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False)  # turn off negative index wrapping for entire function
def calc_distance_pl(double[:, ::1] mat, double[:] vec, int n_threads):
    cdef int n_case = mat.shape[0]
    cdef int n_col = mat.shape[1]
    cdef int i, j
    cdef double diff, sqr, val
    
    ret = np.empty(n_case, dtype=np.double)
    cdef double[:] _ret = ret
    
    with nogil, parallel(num_threads=n_threads):
        for i in prange(n_case):
            val = 0
            for j in range(n_col):
                diff = mat[i, j] - vec[j]
                sqr = diff * diff
                val += sqr
            val = sqrt(val)
            _ret[i] = val
    
    return ret

以下是以上三个版本的综合比较，经过cython改写的版本比普通python版本效率提升了三倍多，实现并行化后程序运行时间进一步缩短，是一个还不错的结果。

In [15]:
# speed test
mat = np.random.random([10000, 100])
vec = np.random.random(100)
%timeit calc_distance_py(mat, vec)
%timeit calc_distance_mv(mat, vec)
%timeit calc_distance_pl(mat, vec, 2)

3.22 ms ± 28.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
1.05 ms ± 3.24 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
538 µs ± 2.78 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


### （4）KNN改写

将以上改写后的cython并行版本替换KNN中的相应函数，再次使用cProfile查看各函数的效率，目前calc_distance性能已经得到大幅优化，现在效率瓶颈发生在nsmallest函数上，这是下一步优化的函数。

In [16]:
import numpy as np
import heapq


class KNN_cython:
    def __init__(self, train_x, train_y, k):
        """
        train_x, train_y: train data
        k: k value of knn
        """
        self.k = k
        self.train_x = train_x
        self.train_y = train_y

        self.n_case, self.n_dim = train_x.shape

    def case_pred(self, case_x):
        """单个case预测
        """
        distance_arr = self._calc_distance(case_x)  # 计算单个case与所有训练数据的欧式距离
        nearest_index = self._n_nearest_index(distance_arr)  # 距离最近的N个case的下标
        nearest_label = self.train_y[nearest_index]  # 距离最近的N个case的label
        case_y = np.bincount(nearest_label).argmax()  # 取最多的label作为预测值
        return case_y


    def _calc_distance(self, case):
        """计算case与所有训练数据的距离
        """
        # numpy的broadcase功能使其可以通过一行代码完成
        return calc_distance_pl(self.train_x, case, 2)


    def _n_nearest_index(self, dist_arr):
        """ 获取top_n个最小值的下标
        """
        return heapq.nsmallest(self.k, range(self.n_case), lambda i: dist_arr[i])


In [18]:
knn_model_cython = KNN_cython(train_x, train_y, 9)
%timeit knn_model_cython.case_pred(pred_x)

336 ms ± 28.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [19]:
import cProfile
import pstats

cProfile.runctx("knn_model_cython.case_pred(pred_x)", globals(), locals(), "Profile.prof")
s = pstats.Stats("Profile.prof")
s.strip_dirs().sort_stats("time").print_stats()

Sun Jul  5 09:29:14 2020    Profile.prof

         1000118 function calls in 0.459 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.238    0.238    0.371    0.371 heapq.py:461(nsmallest)
  1000000    0.133    0.000    0.133    0.000 <ipython-input-16-fdfb5abade37>:37(<lambda>)
        1    0.087    0.087    0.087    0.087 {_cython_magic_bde84a3f043e0fb310aa1dcb6e380daa.calc_distance_pl}
        1    0.000    0.000    0.459    0.459 <string>:1(<module>)
      100    0.000    0.000    0.000    0.000 {built-in method _heapq._heapreplace_max}
        1    0.000    0.000    0.459    0.459 <ipython-input-16-fdfb5abade37>:17(case_pred)
        1    0.000    0.000    0.459    0.459 {built-in method builtins.exec}
        1    0.000    0.000    0.371    0.371 <ipython-input-16-fdfb5abade37>:34(_n_nearest_index)
        1    0.000    0.000    0.000    0.000 {built-in method numpy.core._multiarray_umath.implement_array_f

<pstats.Stats at 0x7fa91eb85190>