In [2]:
import tensorflow as tf
import pandas as pd
import numpy as np
import math
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import  MinMaxScaler
from sklearn.model_selection import train_test_split
import pickle
import gc

# Data 
criteo_data: label I1-I13 C1-C26

In [None]:
data = pd.read_csv('../../data/criteo_data/train.txt', sep='\t', header=None)
y = data[0]
X = data.drop([0], axis=1)

del data
gc.collect()

In [None]:
continuous_feature = list(range(1,14))
category_feature = list(range(14, 40))
X[continuous_feature] = X[continuous_feature].fillna(0)
X[category_feature] = X[category_feature].fillna('-1')
mms = MinMaxScaler(feature_range=(0, 1))
X[continuous_feature] = mms.fit_transform(X[continuous_feature])

In [None]:
X_trian, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.1)

In [None]:
class DataParse:
    def __init__(self, category_feature, continuous_feature, ignore_feature=[], feature_dict={}, feature_size=0, field_size=0):
        self.feature_dict = feature_dict
        self.feature_size = feature_size
        self.field_size = field_size
        self.ignore_feature = ignore_feature
        self.category_feature = category_feature
        self.continuous_feature = continuous_feature
        self.category_field_size = len(category_feature)
        self.continuous_size = len(continuous_feature)
    
    def FeatureDictionary(self, train, test):
        """
        目的是给每一个特征维度都进行编号。
        1. 对于离散特征，one-hot之后每一列都是一个新的特征维度(计算编号时，不算0)。所以，原来的一维度对应的是很多维度，编号也是不同的。
        2. 对于连续特征，原来的一维特征依旧是一维特征。
        返回一个feat_dict，用于根据原特征名称和特征取值 快速查询出 对应的特征编号。
        train: 原始训练集
        test:  原始测试集
        continuous_feature: 所有数值型特征
        ignore_feature: 所有忽略的特征. 除了数值型和忽略的，剩下的全部认为是离散型
        feat_dict, feat_size
             1. feat_size: one-hot之后总的特征维度。
             2. feat_dict是一个{}， key是特征string的col_name, value可能是编号（int），可能也是一个字典。
             如果原特征是连续特征： value就是int，表示对应的特征编号；
             如果原特征是离散特征：value就是dict，里面是根据离散特征的 实际取值 查询 该维度的特征编号。 因为离散特征one-hot之后，
             一个取值就是一个维度，而一个维度就对应一个编号。
        """
        df = pd.concat([train, test], axis=0)
        feat_dict = {}
        total_cnt = 0
        
        for col in df.columns:
            # 连续特征只有一个编号
            if col in self.continuous_feature:
                feat_dict[col] = total_cnt
                total_cnt = total_cnt + 1
            elif col in self.category_feature:
                unique_vals = df[col].unique()
                unique_cnt = df[col].nunique()
                feat_dict[col] = dict(zip(unique_vals, range(total_cnt, total_cnt + unique_cnt)))
                total_cnt = total_cnt + unique_cnt
        
        self.feature_size = total_cnt
        self.feature_dict = feat_dict
        print('feat_dict=', feat_dict)
        print('feature_size=', total_cnt)
    
    def parse(self, df):
        dfi = df.copy()
        dfv = df.copy()
        for col in dfi.columns:
            if col in self.ignore_feature:
                dfi.drop([col], axis=1, inplace=True)
                dfv.drop([col], axis=1, inplace=True)

            elif col in self.continuous_feature:  # 连续特征1个维度，对应1个编号，这个编号是一个定值
                dfi[col] = self.feature_dict[col]

            elif col in self.category_feature:  # 离散特征。不同取值对应不同的特征维度，编号也是不同的。
                dfi[col] = dfi[col].map(self.feature_dict[col])
                dfv[col] = 1.0

        feature_index = dfi.values.tolist()
        feature_val = dfv.values.tolist()
        self.field_size = len(feature_index[0])
        del dfi, dfv
        gc.collect()

        return feature_index, feature_val

In [None]:
dataParse = DataParse(continuous_feature=continuous_feature, category_feature=category_feature)
dataParse.FeatureDictionary(X_trian, X_valid)
train_feature_index, train_feature_val = dataParse.parse(X_trian)
valid_feature_index, valid_feature_val = dataParse.parse(X_valid)

In [None]:
print('feature_num', dataParse.feature_size)
print('field_num', dataParse.field_size)
print('category_field_size', dataParse.category_field_size)

### Data Persistent

In [None]:
y_train.to_csv('../../data/criteo_data/train_y.txt', header=None, index=False)
y_valid.to_csv('../../data/criteo_data/valid_y.txt', header=None, index=False)

In [None]:
train_feature_index = pd.DataFrame(train_feature_index)
train_feature_index.to_csv('../../data/criteo_data/train_index.txt', header=None, index=False, sep='\t')

train_feature_val = pd.DataFrame(train_feature_val)
train_feature_val.to_csv('../../data/criteo_data/train_value.txt', header=None, index=False, sep='\t')

valid_feature_index = pd.DataFrame(valid_feature_index)
valid_feature_index.to_csv('../../data/criteo_data/valid_index.txt', header=None, index=False, sep='\t')

valid_feature_val = pd.DataFrame(valid_feature_val)
valid_feature_val.to_csv('../../data/criteo_data/valid_value.txt', header=None, index=False, sep='\t')

# Model

In [3]:
BATCH_SIZE = 1024

In [4]:
def get_batch_dataset(label_path, idx_path, value_path):
    label = tf.data.TextLineDataset(label_path)
    idx = tf.data.TextLineDataset(idx_path)
    value = tf.data.TextLineDataset(value_path)

    label = label.map(lambda x: tf.strings.to_number(tf.strings.split(x, sep='\t')), num_parallel_calls=12)
    idx = idx.map(lambda x: tf.strings.to_number(tf.strings.split(x, sep='\t')), num_parallel_calls=12)
    value = value.map(lambda x: tf.strings.to_number(tf.strings.split(x, sep='\t')), num_parallel_calls=12)

    batch_dataset = tf.data.Dataset.zip((label, idx, value))
    batch_dataset = batch_dataset.shuffle(buffer_size=BATCH_SIZE*5)
    batch_dataset = batch_dataset.batch(BATCH_SIZE)
    batch_dataset = batch_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    return batch_dataset

In [44]:
class DeepFM_layer(tf.keras.Model):
    def __init__(self,
                 num_feat,
                 num_field,
                 embedding_size=10,
                 drop_keeps=[1,1,1],
                 deep_layer_sizes=[64,64,64],
                 reg_l1=0.01,
                 reg_l2=1e-5):
        super().__init__()
        self.num_feat = num_feat
        self.num_field = num_field
        self.embedding_size = embedding_size
        self.drop_keeps = drop_keeps
        self.deep_layer_sizes = deep_layer_sizes
        self.reg_l1 = reg_l1
        self.reg_l2 = reg_l2
        
        # Embedding layer
        self.embedding = tf.keras.layers.Embedding(num_feat, embedding_size)
        
        # FM part
        self.fm_weight = tf.keras.layers.Embedding(num_feat, 1)
        self.fm_bias = tf.Variable(tf.random.normal(shape=[1]))
        
        # Deep part
        for i in range(len(deep_layer_sizes)):
            setattr(self, 'dense_' + str(i), tf.keras.layers.Dense(deep_layer_sizes[i]))
            setattr(self, 'batchNorm_' + str(i), tf.keras.layers.BatchNormalization())
            setattr(self, 'activation_' + str(i), tf.keras.layers.Activation('relu'))
            setattr(self, 'dropout_' + str(i), tf.keras.layers.Dropout(drop_keeps[i]))
        
        self.fc = tf.keras.layers.Dense(1, activation=None)
    
    def call(self, fea_index, fea_value):
        fea_value = tf.expand_dims(fea_value, axis=-1)     # None * num_field * 1
        
        # FM first-order
        fm_weight = self.fm_weight(fea_index)              # num_field * 1
        fm_first = tf.reduce_sum(tf.multiply(fm_weight, fea_value), axis=2)   # None * num_field 
        # FM second-order
        embedding = self.embedding(fea_index)              # num_field * embedding_size
        embedding = tf.multiply(embedding, fea_value)      # None * num_field * embedding_size
        sum_square = tf.square(tf.reduce_sum(embedding, axis=1))  # None * embedding_size
        square_sum = tf.reduce_sum(tf.square(embedding), axis=1)  # None * embedding_size
        fm_second = 0.5 * tf.subtract(sum_square, square_sum)
        
        fm_out = tf.concat([fm_first, fm_second], axis=1) # None * (num_field + embedding_size)
        # Deep
        y_deep = tf.reshape(embedding, shape=(-1, self.num_field * self.embedding_size))
        for i in range(len(self.deep_layer_sizes)):
            y_deep = getattr(self, 'dense_' + str(i))(y_deep)
            y_deep = getattr(self, 'batchNorm_' + str(i))(y_deep)
            y_deep = getattr(self, 'activation_' + str(i))(y_deep)
            y_deep = getattr(self, 'dropout_' + str(i))(y_deep)
        
        #out
        concat_in = tf.concat([fm_out, y_deep], axis=1)
        out = self.fc(concat_in)
        
        return out

In [34]:
def cross_entropy_loss(y_true, y_pred):
    return tf.reduce_mean(tf.losses.binary_crossentropy(y_true, y_pred))

In [35]:
def train_one_step(model, optimizer, idx, value, label):
    with tf.GradientTape() as tape:
        output = model(idx, value)
        loss = cross_entropy_loss(y_true=label, y_pred=output)

        reg_loss = []
        for p in model.trainable_variables:
            reg_loss.append(tf.nn.l2_loss(p))
        reg_loss = tf.reduce_sum(tf.stack(reg_loss))
        loss = loss + model.reg_l2 * reg_loss

    grads = tape.gradient(loss, model.trainable_variables)
    grads = [tf.clip_by_norm(g, 100) for g in grads]
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.trainable_variables))
    return loss

In [36]:
def train_model(model, train_batch_dataset, optimizer, epoch):
    for batch_idx, (label, idx, value) in enumerate(train_batch_dataset):
        if len(label) == 0:
            break

        loss = train_one_step(model, optimizer, idx, value, label)

        if batch_idx % 100 == 0:
            print('Train Epoch:{}, Step:{}, Loss:{:.6f}'.format(epoch, batch_idx, loss.numpy()))

In [37]:
def test_model(model, test_batch_dataset):
    pred_y, true_y = [], []
    binaryloss = tf.keras.metrics.BinaryCrossentropy()
    for batch_idx, (label, idx, value) in enumerate(test_batch_dataset):
        if len(label) == 0:
            break

        output = model(idx, value)
        binaryloss.update_state(y_true=label, y_pred=output)
        pred_y.extend(list(output.numpy()))
        true_y.extend(list(label.numpy()))
    print('Roc AUC: %.5f' % roc_auc_score(y_true=np.array(true_y), y_score=np.array(pred_y)))
    print('LogLoss: %.5f' % binaryloss.result())

In [45]:
deep_fm = DeepFM_layer(num_feat=2605299, num_field=39, embedding_size=8,
                       drop_keeps=[0.5, 0.5, 0.5], deep_layer_sizes=[400,400,400],
                       reg_l1=0.01, reg_l2=1e-5)

In [38]:
train_label_path='../../data/criteo_data/train_y.txt'
train_idx_path='../../data/criteo_data/train_index.txt'
train_value_path='../../data/criteo_data/train_value.txt'

valid_label_path='../../data/criteo_data/valid_y.txt'
valid_idx_path='../../data/criteo_data/valid_index.txt'
valid_value_path='../../data/criteo_data/valid_value.txt'

In [39]:
train_batch_dataset = get_batch_dataset(train_label_path, train_idx_path, train_value_path)
test_batch_dataset = get_batch_dataset(valid_label_path, valid_idx_path, valid_value_path)

In [46]:
%%time
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
for epoch in range(5):
    train_model(deep_fm, train_batch_dataset, optimizer, epoch)

Train Epoch:0, Step:0, Loss:2.897695
Train Epoch:0, Step:100, Loss:0.511630
Train Epoch:0, Step:200, Loss:0.522743
Train Epoch:0, Step:300, Loss:0.493887
Train Epoch:0, Step:400, Loss:0.509999
Train Epoch:0, Step:500, Loss:0.476522
Train Epoch:0, Step:600, Loss:0.486773
Train Epoch:0, Step:700, Loss:0.492809
Train Epoch:0, Step:800, Loss:0.489716
Train Epoch:0, Step:900, Loss:0.496157
Train Epoch:0, Step:1000, Loss:0.530928
Train Epoch:0, Step:1100, Loss:0.478121
Train Epoch:0, Step:1200, Loss:0.516640
Train Epoch:0, Step:1300, Loss:0.486139
Train Epoch:0, Step:1400, Loss:0.479957
Train Epoch:0, Step:1500, Loss:0.516011
Train Epoch:0, Step:1600, Loss:0.456608
Train Epoch:0, Step:1700, Loss:0.482456
Train Epoch:1, Step:0, Loss:0.482918
Train Epoch:1, Step:100, Loss:0.480385
Train Epoch:1, Step:200, Loss:0.458799
Train Epoch:1, Step:300, Loss:0.501760
Train Epoch:1, Step:400, Loss:0.472458
Train Epoch:1, Step:500, Loss:0.502384
Train Epoch:1, Step:600, Loss:0.479668
Train Epoch:1, Step:7

In [47]:
test_model(deep_fm, test_batch_dataset)

Roc AUC: 0.73534
LogLoss: 0.81725
