# 第四章 部署

部署是将我们的模型用于实际应用的关键一步。部署的方式有很多，本章我们主要讨论两种常用的方式：定时任务和API。

## 定时任务

定时任务就是按一定规定时间或者间隔一定时间运行我们的程序，例如每天早上9点运行，每小时运行一次等。这种方式比较适合批量处理，例如数据清洗，或者训练模型等。    
接下来我们将介绍2个中方法，一种是在`Linux`系统下，使用`crontab`；另一种方法是使用`apscheduler`库。

### crontab

`crontab`可以用于设置周期性被执行的指令，其常用的命令如下：  

- `crontab -e`：编辑crontab
- `crontab -l`：列出crontab列表
- `crontab -r`：删除crontab

`crontab`的设置格式为`分 时 日 月 星期 要运行的命令`。例如，我们每天早上8点自动运行`main.py`的文件：

~~~bash
0 8 * * * python /home/user/project/main.py
~~~

`crontab`是`Linux`下常用的命令之一，使用非常简单，相关资料也比较多，本文在这里就不在赘述了，有兴趣的朋友可以搜索查阅相关资料。

### apscheduler

`apscheduler`是Python下的一个定时任务框架，在这里我们做下简单介绍。

`apscheduler`中最主要的配置就是调度器了。这里常用的主要有2种调度器

- BlockingScheduler：在调用start函数会阻塞当前线程，不能立即返回。适合调度程序是进程中唯一运行的进程。
- BackgroundScheduler：调用start函数后主线程不会阻塞。适合调度可以在后台运行的进程。

其调度方式中，一般常用的也是2中

- interval：调度间隔性的任务。
- cron：调度可以设置具体时间，与上一小节中的`crontab`比较像，但是更为精细，例如可以配置到秒级。

例如我们配置一个间隔3秒输出当前时间的脚本

In [1]:
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler


# 定义一个要运行的任务
def echo_time():
    print('now is : {}'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))


# 定义一个调度器
scheduler = BlockingScheduler()
# 设置间隔为3秒
scheduler.add_job(echo_time, 'interval', seconds=3)
try:
    # 启动调度器
    scheduler.start()
except KeyboardInterrupt:
    print('Keyboard interrupt...')
except SystemExit:
    print('System exit...')
except Exception as e:
    print('Error: {}'.format(e))
finally:
    # 关闭调度器
    scheduler.shutdown()

now is : 2020-02-26 15:39:40
now is : 2020-02-26 15:39:43
now is : 2020-02-26 15:39:46
Keyboard interrupt...


`apscheduler`使用起来也比较方便，定义一个需要的调度器，然后通过`add_job`加入要运行的任务，配置其运行的策略，这里间隔3秒，随后启动调度器，最后在停止后，关闭调度器。  

例如，下面我们定一个需求，基于第三章的建模实现，现在我们每天早上8点，自动读入数据进行建模。

~~~python
from apscheduler.schedulers.blocking import BlockingScheduler
import pandas as pd

from model.pfm_model import Modeler
from preprocess.preprocessor import Preprocessor


# 定义一个建模任务
def build_model():
    data = pd.read_excel('pfm_data.xlsx')

    x = data.drop(columns='Attrition')
    y = data['Attrition']

    preprocessor = Preprocessor()
    x = preprocessor.preprocess_data(data=x)
    preprocessor.save_transformer('preprocessor.pickle')

    modeler = Modeler(x, y)
    modeler.smote()
    modeler.determine_hyper_params()
    modeler.train_model()
    modeler.save_model('logistics.model')

# 定义一个调度器
scheduler = BlockingScheduler()
# 设置间隔为3秒
scheduler.add_job(build_model, 'cron', hour=8)
try:
    # 启动调度器
    scheduler.start()
except KeyboardInterrupt:
    print('Keyboard interrupt...')
except SystemExit:
    print('System exit...')
except Exception as e:
    print('Error: {}'.format(e))
finally:
    # 关闭调度器
    scheduler.shutdown()
~~~

随后，我们可以将这个脚本保存为main_job.py，然后可以在`Linux`命令行中，在后台运行该脚本。

~~~bash
python main_job.py &
~~~



`apscheduler`还有许多高级功能，我们在这里就不再赘述了，有兴趣的话，可以查阅其官方文档和相关资料。

## API

对外提供API也是常用的部署方式，外部应用调用API获取相应的结果，其好处我们已经在第一章中`API`小节讨论过了。API方式一般比较适用于实时预测的场景，例如由上层应用通过API的形式调用预测结果。  
Python中有好几个框架支持API，我们在这里将会使用`flask`来实现。`flask`是Python库中一个轻量级的web框架，使用也较为方便。  
下面我们将进入`Landing`小节，用`flask`来构建一个简单的API服务，来实现使用上一章中训练好的模型通过API的形式对外提供预测服务。`flask`作为一个流行的web框架，有更多强大的功能，有兴趣的话，可以查阅更多的相关资料。

## Landing

本章的`Landing`部分我们将会搭建一个API服务，具体的代码可以参考`code/chp4`

要对外提供一个API服务，我们第一步应当是和调用方定义我们的接口，和调用方约定通过什么方法调用（`GET`、`POST`请求），传什么样的数据格式等问题。  
这里我们可以与调用方约定接受`POST`请求，使用`JSON`进行交互，然后关键的就是定义具体数据格式了。数据格式往往不是唯一的，可以有很多种方式，例如按列的方式，按行的方式等，我们在这里约定如下格式：

~~~JSON
{
    'data': [
        {
            'EmployeeNumber': 999,
            'Age': 37,
            'BusinessTravel': 'Travel_Rarely',
            'Department': 'Sales',
            'DistanceFromHome': 12,
            'Education': 4,
            'EducationField': 'Medical',
            'Gender': 'Female',
            'JobSatisfaction': 4,
            'MonthlyIncome': 9998,
            'NumCompaniesWorked': 4,
            'Over18': 'Y',
            'OverTime': 'No',
            'StandardHours': 80,
            'TotalWorkingYears': 12,
            'YearsAtCompany': 4
        },
        {
            'EmployeeNumber': 1001,
            'Age': 27,
            'BusinessTravel': 'Travel_Rarely',
            'Department': 'Sales',
            'DistanceFromHome': 25,
            'Education': 2,
            'EducationField': 'Medical',
            'Gender': 'Male',
            'JobSatisfaction': 2,
            'MonthlyIncome': 1200,
            'NumCompaniesWorked': 2,
            'Over18': 'Y',
            'OverTime': 'No',
            'StandardHours': 80,
            'TotalWorkingYears': 2,
            'YearsAtCompany': 1
        }
    ]
}
~~~

`data`字段中定义一个`list`，每个元素则是一个`dict`，定义各个字段的值。  
在确定好接口形式后，我们便可以开始实现我们的API了。

### 后端

我们需要先定义一个后端的功能。API首先需要读取模型，同时，也要读取预处理器，因为上游请求预测时，会传过来原始的数据，而不是经过预处理后的数据。随后使用读入的模型来实现预测方法。我们将这两个方法封装到一个类中。

In [None]:
import pickle

# 引入第二章完成的preprocessor模块
from preprocess.preprocessor import Preprocessor


class Predictor:
    def __init__(self, preprocessor_path, model_path):
        '''

        :param preprocessor_path: 预处理器路径
        :param model_path: 模型路径
        '''
        self.preprocessor_path = preprocessor_path
        self.model_path = model_path

        self.preprocessor = None
        self.model = None

    def load_model(self):
        '''
        读取模型和预处理器
        :return:
        '''
        # 读取预处理器
        self.preprocessor = Preprocessor()
        self.preprocessor.load_transformer(self.preprocessor_path)

        # 读取模型
        with open(self.model_path, 'wb') as f:
            print('loading model from {}'.format(self.model_path))
            self.model = pickle.load(self.model_path)

    def predict(self, data):
        '''
        进行预测
        :param data: 数据
        :return:
        '''
        data = self.preprocessor.preprocess_data(data, train_or_test=1)
        result = self.model.predict(data)
        return result

需要注意的是，`predict`方法中的`data`参数应当是一个`DataFrame`，因此在写相应API时，应当注意转换。  

### API接口

接下来我们可以定义API方法了。在`flask`中，需要先定义如何解析参数，我们可以使用`reqparse`模块中的`RequestParser`，然后根据约定的数据格式来定义参数解析类。

In [None]:
from flask_restful import Resource, reqparse
import pandas as pd

from backend.predictor import Predictor

args = reqparse.RequestParser()
# data字段后是list，在这里action需要使用append，类型则定义为dict
args.add_argument('data', type=dict, action='append', required=True)

将之前写好的`Predictor`类先实例化，并且读入模型。

In [None]:
predictor = Predictor('preprocessor.pickle', 'logistics.model')
predictor.load_model()

定义一个接口，继承`Resource`，并且按照之前所说的约定，实现`post`方法。

In [None]:
class Predict(Resource):
    def post(self):
        # 解析参数
        data = args.parse_args()
        # 将数据转为DataFrame
        data = pd.DataFrame(data['data'])
        
        result = predictor.predict(data)
        return result.tolist()

我们也可以定义个模型刷新的接口，其作用是更新了模型文件后，我们可以调用这个接口，使其重新读取模型文件，这样就不用重新启动服务才能更换新的模型了。这里我们使用`GET`方法。

In [None]:
class Reload(Resource):
    def get(self):
        predictor.load_model()

        return 'message: success to reload model'

### 启动API服务

最后我们需要启动我们的API服务，对外提供服务。`flask`框架中，我们需要初始化一个`Flask`类，然后添加API和其访问路径即可。

In [None]:
from flask import Flask
from flask_restful import Api

from apis.predict_api import Predict

app = Flask(__name__)
api = Api(app)

# 预测接口
api.add_resource(Predict, '/predict')
# 模型刷新接口
api.add_resource(Reload, '/reload')

if __name__ == '__main__':
    app.run()

### 调用API服务

在启动服务后，可以调用预测服务了。一般我们会将API部署在`Linux`服务器上，外部调用者可以通过你的ip进行请求。  
这里我们的服务是在本地启动的，因此，可以请求`http://127.0.0.1:5000/predict`来调用预测服务。

In [2]:
import json
import requests


# 服务地址
predict_url = 'http://127.0.0.1:5000/predict'
# 待预测的数据
data = {
    'data': [
        {
            'EmployeeNumber': 999,
            'Age': 37,
            'BusinessTravel': 'Travel_Rarely',
            'Department': 'Sales',
            'DistanceFromHome': 12,
            'Education': 4,
            'EducationField': 'Medical',
            'Gender': 'Female',
            'JobSatisfaction': 4,
            'MonthlyIncome': 9998,
            'NumCompaniesWorked': 4,
            'Over18': 'Y',
            'OverTime': 'No',
            'StandardHours': 80,
            'TotalWorkingYears': 12,
            'YearsAtCompany': 4
        },
        {
            'EmployeeNumber': 1001,
            'Age': 27,
            'BusinessTravel': 'Travel_Rarely',
            'Department': 'Sales',
            'DistanceFromHome': 25,
            'Education': 2,
            'EducationField': 'Medical',
            'Gender': 'Male',
            'JobSatisfaction': 2,
            'MonthlyIncome': 1200,
            'NumCompaniesWorked': 2,
            'Over18': 'Y',
            'OverTime': 'No',
            'StandardHours': 80,
            'TotalWorkingYears': 2,
            'YearsAtCompany': 1
        }
    ]
}

# 调用API服务
response = requests.post(predict_url, json=data)
if response.status_code != 200:
    print('error code @{}, content: {}'.format(response.status_code, response.content))
else:
    results = json.loads(response.content)
    print('predict results: {}'.format(results))

predict results: [0, 1]


我们可以看到返回的预测的结果分别是0和1，表示不会离职和会离职。我们再调用一下重新加载模型的接口。

In [3]:
reload_url = 'http://127.0.0.1:5000/reload'
response = requests.get(reload_url)
print(response.content.decode('utf-8'))

"message: success to reload model"



返回的结果提示我们已经成功重新加载了模型。

## 小结

本章主要讨论了模型完成，我们如何进行部署，主要讲述了定时任何和API两种方式。部署完成后，我们的模型便能开始实际应用了。  
本章讲述的还是比较浅显，在实践中，还需要考虑响应时间，机器资源消耗情况（如CPU消耗，内存消耗）等各方面情况，但是，希望通过我们这一章节，能够使大家能大致了解模型最终是如何被应用起来的。