# 因式分解机（Factorization Machines）教程

**Amazon Sagemaker内置分类以及回归算法之一**

---

1. [笔记本简介](#笔记本简介)
2. [基本设置](#基本设置)
   1. [数据加载](#数据加载)
   2. [数据预处理](#数据预处理)
   3. [数据上传](#数据上传)
3. [模型训练](#模型训练)
4. [模型推理](#模型推理)
5. [性能评估](#性能评估)
6. [资源回收](#资源回收)

# 笔记本简介
***
Amazon Sagemaker Factorization Machines 从算法原理上看是对的线性模型的升级，适合处理高维稀疏的数据，可以高效地衡量稀疏数据中两两特征值的交互程度，胜任回归和分类任务两种场景。需要注意的是在分类任务中，目前只支持二分类。但在实际的应用场景中，更多的是将该算法用于推荐，因为推荐场景下，人和物的二维矩阵数据通常都是稀疏的，刚好符合算法本身的特性。这一点从Amazon Personalize服务中也可以看出。早期Personalize版本中曾经将Factorization Machines作为Predefined Recipe之一存在。

本Notebook将使用Factorization Machines算法解决一个二分类的问题，数据集是由UCI机器学习存储库提供的公开可用的成人收入数据集。该数据集包括约48842名匿名成年人的人口统计信息，以及他们的收入分类(“>$ 50,000”或“<$ 50,000”)。本练习的目标是使用收入类别(incom_cat)作为目标变量，针对该数据训练FM模型。记录包含14个属性:
* age: continuous.
* workclass: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked.
* fnlwgt: continuous.
* education: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc, 9th, 7th-8th, 12th, Masters, 1st-4th, 10th, Doctorate, 5th-6th, Preschool.
* education-num: continuous.
* marital-status: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent, Married-AF-spouse.
* occupation: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners, Machine-op-inspct, Adm-clerical, Farming-fishing, Transport-moving, Priv-house-serv, Protective-serv, Armed-Forces.
* relationship: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried.
* race: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black.
* sex: Female, Male.
* capital-gain: continuous.
* capital-loss: continuous.
* hours-per-week: continuous.
* native-country: United-States, Cambodia, England, Puerto-Rico, Canada, Germany, Outlying-US(Guam-USVI-etc), India, Japan, Greece, South, China, Cuba, Iran, Honduras, Philippines, Italy, Poland, Jamaica, Vietnam, Mexico, Portugal, Ireland, France, Dominican-Republic, Laos, Ecuador, Taiwan, Haiti, Columbia, Hungary, Guatemala, Nicaragua, Scotland, Thailand, Yugoslavia, El-Salvador, Trinadad&Tobago, Peru, Hong, Holand-Netherlands.
* income_cat:  2 income categories, either “>50K” or “<=50K”.

# 基本设置

本次笔记本中涉及的数据量不大，所以采用的是ml.c4.xlarge机型来进行模型的训练。
我们从指定数据存储桶和IAM角色开始：

* 指定要用于训练和存储模型数据的S3桶名以及前缀。S3的位置应该与Amazon Sagemaker服务位于同一Region。
* IAM角色用于提供对数据的训练和托管访问。有关如何创建IAM角色，请参阅Samemaker文档。注意，如果notebook实例、训练和托管需要多个角色，请使用相应IAM角色的ARN字符串进行替换。

In [None]:
import boto3
import sagemaker
from sagemaker import get_execution_role

sess = sagemaker.Session()
role = get_execution_role()
bucket = sess.default_bucket()
region_name = boto3.Session().region_name
prefix = 'gcr_sagemaker_workshop/classification_regression/fm'

# 数据加载

本次使用的数据已经提前下载到notebook实例的本地目录，我们使用pandas将数据读入内存中进行预处理，之所以这么做的原因是数据量不大。如果需要处理的数据量巨大，应该使用诸如Amazon EMR、Amazon Athena或者Amazon Redshift等服务来做预处理。在数据预处理完成后，我们将把数据上传到S3对应的存储桶中供Sagemaker训练使用。

In [None]:
import numpy as np                                
import pandas as pd                               
import matplotlib.pyplot as plt   
from IPython.display import Image                 
from IPython.display import display               
from sklearn.datasets import dump_svmlight_file   
from time import gmtime, strftime   
import os
import re
import sys                                        
import math                                       
import json
import boto3

In [None]:
data = pd.read_csv('./data/knn_fm-adult.csv')
data.head()

# 数据预处理

原始的CSV文件中，每个字符类型字段的值都带有空格这一类的不可见字符，这里统一去掉。

In [None]:
data['workclass'] = data['workclass'].apply(lambda workclass: workclass.strip())
data['education'] = data['education'].apply(lambda education: education.strip())
data['marital_status'] = data['marital_status'].apply(lambda marital_status: marital_status.strip())
data['occupation'] = data['occupation'].apply(lambda occupation: occupation.strip())
data['relationship'] = data['relationship'].apply(lambda relationship: relationship.strip())
data['race'] = data['race'].apply(lambda race: race.strip())
data['sex'] = data['sex'].apply(lambda sex: sex.strip())
data['native_country'] = data['native_country'].apply(lambda native_country: native_country.strip())
data['income_cat'] = data['income_cat'].apply(lambda income_cat: income_cat.strip())

检查数据特征中存在多少空值，可以发现没有空值存在，不需要特别处理。有空值存在的情况下，通常直接丢弃或者采用平均值等进行插值操作。

In [None]:
data.isnull().sum()

通常我们会观察下样本在目标分类特征上的分布情况，以获得较好的分类模型。

In [None]:
plt.bar(['<=50K', '>50K'], data['income_cat'].value_counts())
plt.show()

处理目标特征，将‘<= 50K’以及‘>50K'转换为0和1这样模型好处理的数值类型

In [None]:
data['y'] = (data['income_cat'].isin(['>50K']))
data['y'] = data['y'].apply(lambda y: 1 if y == True else 0)
data = data.drop(['income_cat'], axis=1)

同理使用one hot encode对其他的字符类型特征进行处理

In [None]:
model_data = pd.get_dummies(data)

In [None]:
model_data.head()

使用corr函数对特征之间的相关性做分析，并输出目标特征‘y’的结果。世界对单身🐶是充满恶意的。

In [None]:
corr_matrix = model_data.corr()
corr_matrix["y"].sort_values(ascending=False)

Sagemaker FM算法接受train和test两个通道数据，这里将原数据集按照1:9的比例随机地分为train和test两个数据集合。

In [None]:
train_data, test_data = np.split(model_data.sample(frac=1, random_state=1729), [int(0.9 * len(model_data))])

# 数据上传

因为Sagemaker FM算法只能接受包含精度为Float32张量的 recordIO-protobuf 格式文件，所以需要做数据格式的转换，转换后将train和test数据集合上传到对应的S3桶中。

In [None]:
import io
import numpy as np
import sagemaker.amazon.common as smac
train_labels = np.array(train_data['y']).astype('float32')
train_vectors = np.array(train_data.drop(['y'], axis=1)).astype('float32')
train_buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(train_buf, train_vectors, train_labels)
train_buf.seek(0)

In [None]:
train_labels.shape

In [None]:
train_vectors.shape

In [None]:
test_labels = np.array(test_data['y']).astype('float32')
test_vectors = np.array(test_data.drop(['y'], axis=1)).astype('float32')
test_buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(test_buf, test_vectors, test_labels)
test_buf.seek(0)

In [None]:
test_labels.shape

In [None]:
test_vectors.shape

In [None]:
key = 'recordio-pb-data-train'
boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', key)).upload_fileobj(train_buf)
s3_train_data = 's3://{}/{}/train/{}'.format(bucket, prefix, key)
print('uploaded training data location: {}'.format(s3_train_data))

In [None]:
key = 'recordio-pb-data-test'
boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'test', key)).upload_fileobj(test_buf)
s3_test_data = 's3://{}/{}/test/{}'.format(bucket, prefix, key)
print('uploaded test data location: {}'.format(s3_test_data))

定义存储训练好模型的S3桶

In [None]:
output_location = 's3://{}/{}/output'.format(bucket, prefix)
print('training artifacts will be uploaded to: {}'.format(output_location))

# 模型训练

我们对数据进行预处理并以正确的格式存储在了S3桶中，下一步就是使用数据实际来训练模型。由于这个数据相对较小，所以它的运行结果并不能展现Sagemaker内置算法过人的性能，但是官方在上TB数据的训练集合上得到了较好的结果。

我们将使用Amazon SageMaker Python SDK启动模型训练，并通过日志输出监控整个训练过程，直到完成为止。在本例中，整个训练耗时大概7分钟，但是大部分时间都是消耗在底层资源准备上，真正用于模型训练的收费时长在一分钟左右。

首先，让我们指定包含Factorization Machines算法的容器URI。关于算法容器的更多细节可以在[AWS文档](https://docs-aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registration-paths.html)中找到。

In [None]:
from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(region_name, 'factorization-machines')
print(container)

In [None]:
fm = sagemaker.estimator.Estimator(container,
                                   role, 
                                   train_instance_count=1, 
                                   train_instance_type='ml.c4.xlarge',
                                   output_path=output_location,
                                   sagemaker_session=sess)

In [None]:
fm.set_hyperparameters(feature_dim=108,
                      predictor_type='binary_classifier',
                      epochs = 5,
                      mini_batch_size=1000,
                      num_factors=64)

In [None]:
fm.fit({'train': s3_train_data,'test': s3_test_data})

In [None]:
fm_predictor = fm.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')

# 模型推理

使用application/json作为推理请求的content_type，但是为了不必填充很多无意义的空值（虽然本次练习中没有空值数据，但是FM算法的常用场景中空值是普遍存在的），实现了新的序列化函数，直接取张量的一行，然后输出为json格式。

In [None]:
from sagemaker.predictor import json_deserializer

def fm_serializer(data):
    js = {'instances': []}
    for row in data:
        js['instances'].append({'features': row.tolist()})
    return json.dumps(js)

fm_predictor.content_type = 'application/json'
fm_predictor.serializer = fm_serializer
fm_predictor.deserializer = json_deserializer

In [None]:
fm_serializer(test_vectors)

In [None]:
result = fm_predictor.predict(test_vectors)
predictions = []
predictions += [r['predicted_label'] for r in result['predictions']]
predictions = np.array(predictions)

# 性能评估

输出混淆矩阵,其中行代表真实值，列代表预测值。可以看出结果和日志输出的模型在测试数据集合上的表现一致。

In [None]:
pd.crosstab(test_labels, predictions, rownames=['actuals'], colnames=['predictions'])

# 资源回收

删除相关模型推理的终端节点

In [None]:
sess.delete_endpoint(fm_predictor.endpoint)