In [1]:
import import_ipynb
from tower_layer import tower_layer
import os
from typing import List, Tuple, Any
import pandas as pd
import tensorflow as tf
from tensorflow import feature_column as fc

importing Jupyter notebook from tower_layer.ipynb


In [2]:
# 训练参数
model_dir = "model\\model_dir\\PLE" #Directory where model parameters, graph, etc are saved
output_dir = "model\\output_dir\\PLE" #Directory where pb file are saved

train_data = "E:/Deep Learning/dataset/wechat_bigdata/tfrecord/train.tfrecord" #Path to the train data
eval_data = "E:/Deep Learning/dataset/wechat_bigdata/tfrecord/test.tfrecord" #Path to the evaluation data
vocabulary_dir = "E:/Deep Learning/dataset/wechat_bigdata/vocabulary/" #Folder where the vocabulary file is stored
num_epochs = 10 #Epoch of training phase
train_steps = 10000 #Number of (global) training steps to perform
shuffle_buffer_size = 10000 #Dataset shuffle buffer size
num_parallel_readers = -1 #Number of parallel readers for training data
save_checkpoints_steps = 1000 #Save checkpoints every this many steps

# 模型参数
batch_size = 1024 #Training batch size
learning_rate = 0.005 #Learning rate
hidden_units = [512,256,128] #Comma-separated list of number of units in each hidden layer of the final output part
batch_norm = True, #Perform batch normalization (True or False)
dropout_rate = 0.1 #Dropout rate

expert_hidden_units = 256 #Expert module output dimension
num_tasks = 3 #Number of tasks, that's number of gates
task_names = ["read_comment","like","click_avatar"] #Comma-separated list of task names, each must be in keys of tfrecord file

num_extract_network = 1
num_experts_per_task = [5,5,5]
num_experts_in_shared = 10


In [3]:
# 返回一个 tuple 
def create_feature_columns() -> Tuple[list, list, list]:
    """
        生成模型输入特征和label
    Returns:
        dense_feature_columns (list): 连续特征的feature_columns
        category_feature_columns (list): 类别特征的feature_columns(包括序列特征)
        label_feature_columns (list): 因变量的feature_columns
    """

    dense_feature_columns, category_feature_columns, label_feature_columns = [], [], []

    # 连续特征
    videoplayseconds = fc.numeric_column('videoplayseconds', default_value=0.0)
    u_read_comment_7d_sum = fc.numeric_column('u_read_comment_7d_sum', default_value=0.0)
    u_like_7d_sum = fc.numeric_column('u_like_7d_sum', default_value=0.0)
    u_click_avatar_7d_sum = fc.numeric_column('u_click_avatar_7d_sum', default_value=0.0)
    u_forward_7d_sum = fc.numeric_column('u_forward_7d_sum', default_value=0.0)
    u_comment_7d_sum = fc.numeric_column('u_comment_7d_sum', default_value=0.0)
    u_follow_7d_sum = fc.numeric_column('u_follow_7d_sum', default_value=0.0)
    u_favorite_7d_sum = fc.numeric_column('u_favorite_7d_sum', default_value=0.0)

    i_read_comment_7d_sum = fc.numeric_column('i_read_comment_7d_sum', default_value=0.0)
    i_like_7d_sum = fc.numeric_column('i_like_7d_sum', default_value=0.0)
    i_click_avatar_7d_sum = fc.numeric_column('i_click_avatar_7d_sum', default_value=0.0)
    i_forward_7d_sum = fc.numeric_column('i_forward_7d_sum', default_value=0.0)
    i_comment_7d_sum = fc.numeric_column('i_comment_7d_sum', default_value=0.0)
    i_follow_7d_sum = fc.numeric_column('i_follow_7d_sum', default_value=0.0)
    i_favorite_7d_sum = fc.numeric_column('i_favorite_7d_sum', default_value=0.0)

    c_user_author_read_comment_7d_sum = fc.numeric_column('c_user_author_read_comment_7d_sum', default_value=0.0)

    dense_feature_columns += [videoplayseconds, u_read_comment_7d_sum, u_like_7d_sum, u_click_avatar_7d_sum,
                              u_forward_7d_sum, u_comment_7d_sum, u_follow_7d_sum, u_favorite_7d_sum,
                              i_read_comment_7d_sum, i_like_7d_sum, i_click_avatar_7d_sum, i_forward_7d_sum,
                              i_comment_7d_sum, i_follow_7d_sum, i_favorite_7d_sum,
                              c_user_author_read_comment_7d_sum]

    # 类别特征
    userid = fc.categorical_column_with_vocabulary_file('userid', os.path.join(vocabulary_dir, 'userid.txt'))
    feedid = fc.categorical_column_with_vocabulary_file('feedid', os.path.join(vocabulary_dir, 'feedid.txt'))
    device = fc.categorical_column_with_vocabulary_file('device', os.path.join(vocabulary_dir, 'device.txt'))
    authorid = fc.categorical_column_with_vocabulary_file('authorid', os.path.join(vocabulary_dir, 'authorid.txt'))
    bgm_song_id = fc.categorical_column_with_vocabulary_file('bgm_song_id', os.path.join(vocabulary_dir, 'bgm_song_id.txt'))
    bgm_singer_id = fc.categorical_column_with_vocabulary_file('bgm_singer_id', os.path.join(vocabulary_dir, 'bgm_singer_id.txt'))
    manual_tag_list = fc.categorical_column_with_vocabulary_file('manual_tag_list', os.path.join(vocabulary_dir, 'manual_tag_id.txt'))
    his_read_comment_7d_seq = fc.categorical_column_with_vocabulary_file('his_read_comment_7d_seq', os.path.join(vocabulary_dir, 'feedid.txt'))

    userid_emb = fc.embedding_column(userid, 16)
    feedid_emb = fc.embedding_column(feedid, 16, combiner='mean')
    device_emb = fc.embedding_column(device, 2)
    authorid_emb = fc.embedding_column(authorid, 4)
    bgm_song_id_emb = fc.embedding_column(bgm_song_id, 4)
    bgm_singer_id_emb = fc.embedding_column(bgm_singer_id, 4)
    manual_tag_id_emb = fc.embedding_column(manual_tag_list, 4, combiner='mean')

    category_feature_columns += [userid_emb, device_emb, authorid_emb, bgm_song_id_emb, bgm_singer_id_emb, manual_tag_id_emb]
    category_feature_columns += [feedid_emb]  # feedid_emb是list
    
    # label
    label_feature_columns += [fc.numeric_column(task_name, default_value=0.0) for task_name in task_names]
    
    return dense_feature_columns, category_feature_columns, label_feature_columns

In [4]:
def example_parser(serialized_example):
    """
        批量解析Example
    Args:
        serialized_example:

    Returns:
        features, labels
    """
    fea_columns = total_feature_columns
    label_columns = label_feature_columns
   
    feature_spec = tf.feature_column.make_parse_example_spec(fea_columns + label_columns)
    features = tf.io.parse_example(serialized_example, features=feature_spec)

    labels = {task_name: features.pop(task_name) for task_name in task_names}

    return features, labels

def train_input_fn(filepath, example_parser, batch_size, num_epochs, shuffle_buffer_size):
    """
        mmoe模型的input_fn
    Args:
        filepath (str): 训练集/验证集的路径
        example_parser (function): 解析example的函数
        batch_size (int): 每个batch样本大小
        num_epochs (int): 训练轮数
        shuffle_buffer_size (inr): shuffle时buffer的大小

    Returns:
        dataset
    """

    dataset = tf.data.TFRecordDataset(filepath)
    if shuffle_buffer_size > 0:
        dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(example_parser, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.prefetch(1)

    return dataset

def eval_input_fn(filepath, example_parser, batch_size):
    """
        mmoe模型的eval阶段input_fn
    Args:
        filepath (str): 训练集/验证集的路径
        example_parser (function): 解析example的函数
        batch_size (int): 每个batch样本大小

    Returns:
        dataset
    """

    dataset = tf.data.TFRecordDataset(filepath)
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(example_parser, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.prefetch(1)

    return dataset

In [5]:
import tensorflow as tf


def extraction_network(input, task_names, num_experts_per_task, num_experts_in_shared, expert_hidden_units, name):
    """
        实现PLE模型中的extraction_network模块
    Args:
        input (tf.Tensor):  输入, shape=(B, input_dim)
        task_names (list): 每个task的名字组合的list
        num_experts_per_task (list): 每个任务的task专家个数
        num_experts_in_shared (list): shared专家个数
        expert_hidden_units (int): 专家维度
        name (str): 传入tf.variable_scope的参数
    Returns:
        tf.Tensor, 输出, shape=(B, output_dim)
    """

    # 最终输出
    final_output = []

    # 所有task专家和shared专家
    all_experts = []

    with tf.compat.v1.variable_scope(name):
        # shared专家网络输出列表
        shared_specific_experts = [tf.compat.v1.layers.dense(input,
                                                   expert_hidden_units,
                                                   activation=tf.nn.relu,
                                                   name=f"shared_expert_{i}") for i in range(num_experts_in_shared)]
        # (B, expert_hidden_units) * num_experts_in_shared

        shared_specific_experts = [e[:, tf.newaxis, :] for e in shared_specific_experts]    # (B, 1, expert_hidden_units) * num_experts_in_shared
        shared_specific_experts = tf.concat(shared_specific_experts, axis=1)    # (B, num_experts_in_shared, expert_hidden_units)

        # 遍历每个任务的task专家数量, 与share专家组合
        for task_name, num_experts_in_task in zip(task_names, num_experts_per_task):
            # 每个任务的task专家网络输出列表
            task_specific_experts = [tf.compat.v1.layers.dense(input,
                                                     expert_hidden_units,
                                                     activation=tf.nn.relu,
                                                     name=f"task_specific_expert_{task_name}_{i}") for i in
                                                          range(num_experts_in_task)]
            # (B, expert_hidden_units) * num_experts_in_task

            task_specific_experts = [e[:, tf.newaxis, :] for e in task_specific_experts]  # (B, 1, expert_hidden_units) * num_experts_in_task
            task_specific_experts = tf.concat(task_specific_experts, axis=1)  # (B, num_experts_in_task, expert_hidden_units)
            # 放入all_experts内, 后面用
            all_experts.append(task_specific_experts)

            combined_experts = tf.concat([task_specific_experts, shared_specific_experts], axis=1)
            # (B, num_experts_in_task+num_experts_in_shared, expert_hidden_units)

            # 门输出
            gate = tf.compat.v1.layers.dense(input,
                                   num_experts_in_task+num_experts_in_shared,
                                   activation=tf.nn.softmax,
                                   use_bias=False,  # 论文中省略了bias
                                   name=f"gate_{task_name}")
            # (B, num_experts_in_task+num_experts_in_shared)

            gate = tf.expand_dims(gate, axis=-1)  # (B, num_experts_in_task+num_experts_in_shared, 1)
            # 组合专家网络输出和每一个任务相关的门输出做矩阵乘法
            task_output = tf.matmul(combined_experts, gate, transpose_a=True)  # (B, expert_hidden_units, 1)
            task_output = tf.squeeze(task_output, axis=-1)  # (B, expert_hidden_units)
            final_output.append(task_output)

        # 所有专家组合
        all_experts.append(shared_specific_experts)
        all_experts = tf.concat(all_experts, axis=1)    # (B, sum(num_experts_per_task)+num_experts_in_shared, expert_hidden_units)
        # 所有专家对应的门输出
        all_gate = tf.compat.v1.layers.dense(input,
                                   sum(num_experts_per_task)+num_experts_in_shared,
                                   activation=tf.nn.softmax,
                                   use_bias=False,  # 论文中省略了bias
                                   name=f"all_gate")
        # (B, sum(num_experts_per_task)+num_experts_in_shared)
        all_gate = tf.expand_dims(all_gate, axis=-1)    # (B, sum(num_experts_per_task)+num_experts_in_shared), 1)
        # 所有专家组合网络输出和对应门输出做矩阵乘法
        all_output = tf.matmul(all_experts, all_gate, transpose_a=True)  # (B, expert_hidden_units, 1)
        all_output = tf.squeeze(all_output, axis=-1)    # (B, expert_hidden_units)
        # 添加到最终输出
        final_output.append(all_output)

    return tf.add_n(final_output)   # (B, expert_hidden_units)

In [6]:
global total_feature_columns, label_feature_columns
dense_feature_columns, category_feature_columns, label_feature_columns = create_feature_columns()

total_feature_columns = dense_feature_columns + category_feature_columns

dataset = train_input_fn('E:/Deep Learning/dataset/wechat_bigdata/tfrecord/train.tfrecord',
                        example_parser,
                        batch_size,
                        num_epochs,
                        shuffle_buffer_size
                        )

one_element = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

one_element



INFO:tensorflow:vocabulary_size = 19626 in userid is inferred from the number of elements in the vocabulary_file E:/Deep Learning/dataset/wechat_bigdata/vocabulary/userid.txt.
INFO:tensorflow:vocabulary_size = 106444 in feedid is inferred from the number of elements in the vocabulary_file E:/Deep Learning/dataset/wechat_bigdata/vocabulary/feedid.txt.
INFO:tensorflow:vocabulary_size = 2 in device is inferred from the number of elements in the vocabulary_file E:/Deep Learning/dataset/wechat_bigdata/vocabulary/device.txt.
INFO:tensorflow:vocabulary_size = 18789 in authorid is inferred from the number of elements in the vocabulary_file E:/Deep Learning/dataset/wechat_bigdata/vocabulary/authorid.txt.
INFO:tensorflow:vocabulary_size = 25159 in bgm_song_id is inferred from the number of elements in the vocabulary_file E:/Deep Learning/dataset/wechat_bigdata/vocabulary/bgm_song_id.txt.
INFO:tensorflow:vocabulary_size = 17500 in bgm_singer_id is inferred from the number of elements in the vocab

({'authorid': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x1f3415562e0>,
  'bgm_singer_id': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x1f341556160>,
  'bgm_song_id': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x1f3415560a0>,
  'device': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x1f3415569d0>,
  'feedid': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x1f341556ee0>,
  'manual_tag_list': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x1f3415756d0>,
  'userid': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x1f3415753a0>,
  'c_user_author_read_comment_7d_sum': <tf.Tensor: shape=(1024, 1), dtype=float32, numpy=
  array([[0.],
         [0.],
         [0.],
         ...,
         [0.],
         [0.],
         [0.]], dtype=float32)>,
  'i_click_avatar_7d_sum': <tf.Tensor: shape=(1024, 1), dtype=float32, numpy=
  array([[1.3862944],
         [0.6931472],
         [0.6931472],
         ...

In [7]:

def ple_model_fn(features, labels, mode, params):
    """
    ple模型的model_fn
    Args:
        features (dict): input_fn的第一个返回值, 模型输入样本特征
        labels (dict): input_fn的第二个返回值, 样本标签
        mode: tf.estimator.ModeKeys
        params (dict): 模型超参数

    Returns:
        tf.estimator.EstimatorSpec
    """

    # 连续特征
    with tf.compat.v1.variable_scope("dense_input"):
        dense_input = tf.compat.v1.feature_column.input_layer(features, params["dense_feature_columns"])

    
    print(dense_input)
    # 类别特征
    with tf.compat.v1.variable_scope("category_input"):
        category_input = tf.compat.v1.feature_column.input_layer(features, params["category_feature_columns"])

    # concat all
    concat_all_input = tf.concat([dense_input, category_input], axis=-1)
    
    
    # extract network
    input = concat_all_input
    for i in range(params["num_extract_network"]):
        extract_network_output = extraction_network(input=input,
                                                    task_names=params["task_names"],
                                                    num_experts_per_task=params["num_experts_per_task"],
                                                    num_experts_in_shared=params["num_experts_in_shared"],
                                                    expert_hidden_units=params["expert_hidden_units"],
                                                    name=f"extract_network_{i}")
        input = extract_network_output

    # 进入task tower的最后一层
    task_tower_inputs = []   # 存放最后任务塔的输入
    # shared experts
    with tf.compat.v1.variable_scope("shared_experts_final"):
        # shared专家网络输出列表
        shared_specific_experts_final = [tf.compat.v1.layers.dense(input,
                                                         params["expert_hidden_units"],
                                                         activation=tf.nn.relu,
                                                         name=f"shared_expert_final_{i}") for i in range(params["num_experts_in_shared"])]
        # (B, expert_hidden_units) * num_experts_in_shared

        shared_specific_experts_final = [e[:, tf.newaxis, :] for e in
                                         shared_specific_experts_final]  # (B, 1, expert_hidden_units) * num_experts_in_shared
        shared_specific_experts_final = tf.concat(shared_specific_experts_final,
                                            axis=1)  # (B, num_experts_in_shared, expert_hidden_units)

    # task specific experts
    with tf.compat.v1.variable_scope("task_specific_experts_final"):
        # 每个任务的task专家网络输出列表
        for task_name, num_experts_in_task in zip(params["task_names"], params["num_experts_per_task"]):
            task_specific_experts_final = [tf.compat.v1.layers.dense(input,
                                                           params["expert_hidden_units"],
                                                           activation=tf.nn.relu,
                                                           name=f"task_specific_expert_final_{task_name}_{i}") for i in
                                                                range(num_experts_in_task)]
            # (B, expert_hidden_units) * num_experts_in_task

            task_specific_experts_final = [e[:, tf.newaxis, :] for e in task_specific_experts_final]  # (B, 1, expert_hidden_units) * num_experts_in_task
            task_specific_experts_final = tf.concat(task_specific_experts_final, axis=1)  # (B, num_experts_in_task, expert_hidden_units)

            # 与shared专家组合
            combined_experts = tf.concat([task_specific_experts_final, shared_specific_experts_final], axis=1)
            # (B, num_experts_in_task+num_experts_in_shared, expert_hidden_units)
            with tf.compat.v1.variable_scope("task_gate_final"):
                gate = tf.compat.v1.layers.dense(input,
                                       num_experts_in_task + params["num_experts_in_shared"],
                                       activation=tf.nn.softmax,
                                       use_bias=False,  # 论文中省略了bias
                                       name=f"gate_final_{task_name}")
                # (B, num_experts_in_task+num_experts_in_shared)
                gate = tf.expand_dims(gate, axis=-1)  # (B, num_experts_in_task+num_experts_in_shared, 1)
                # 组合专家网络输出和每一个任务相关的门输出做矩阵乘法, 结果作为任务tower的输入
                task_tower_input = tf.matmul(combined_experts, gate, transpose_a=True)  # (B, expert_hidden_units, 1)
                task_tower_input = tf.squeeze(task_tower_input, axis=-1)  # (B, expert_hidden_units)
                task_tower_inputs.append(task_tower_input)  # (B, expert_hidden_units) * num_task

    # 任务塔
    with tf.compat.v1.variable_scope("tower"):
        logit_list = [tower_layer(x=x,
                                  hidden_units=params["hidden_units"],
                                  mode=mode,
                                  batch_norm=params["batch_norm"],
                                  dropout_rate=params["dropout_rate"],
                                  name=task_name) for x, task_name in zip(task_tower_inputs, params["task_names"])]
        # (B, 1) * num_tasks

        
    # -----定义PREDICT阶段行为-----
    prediction_list = [tf.sigmoid(ple_output, name=f"prediction_{task_name}") for ple_output, task_name in zip(logit_list, params["task_names"])]

    
    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {
            f"{task_name}_probabilities": prediction for task_name, prediction in zip(task_names, prediction_list)
        }
        export_outputs = {
            'prediction': tf.estimator.export.PredictOutput(predictions)
        }
        return tf.estimator.EstimatorSpec(mode, predictions=predictions, export_outputs=export_outputs)
    # -----定义完毕-----

    losses = [tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=labels[task_name], logits=logit), name=f"loss_{task_name}")
              for logit, task_name in zip(logit_list, task_names)]
    total_loss = tf.add_n(losses)

    accuracy_list = [tf.compat.v1.metrics.accuracy(labels=labels[task_name], predictions=tf.compat.v1.to_float(tf.greater_equal(prediction, 0.5)))
                     for task_name, prediction in zip(task_names, prediction_list)]
    auc_list = [tf.compat.v1.metrics.auc(labels=labels[task_name], predictions=prediction)
                for task_name, prediction in zip(task_names, prediction_list)]

    # -----定义EVAL阶段行为-----
    auc_metrics = {f"eval_{task_name}_auc": auc for task_name, auc in zip(task_names, auc_list)}
    accuracy_metrics = {f"eval_{task_name}_accuracy": accuracy for task_name, accuracy in zip(task_names, accuracy_list)}
    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode, loss=total_loss, eval_metric_ops={**accuracy_metrics, **auc_metrics})
    # -----定义完毕-----

    optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=params["learning_rate"], beta1=0.9,
                                       beta2=0.999, epsilon=1e-8)
    update_ops = tf.compat.v1.get_collection(tf.compat.v1.GraphKeys.UPDATE_OPS)
    with tf.control_dependencies(update_ops):
        train_op = optimizer.minimize(loss=total_loss, global_step=tf.compat.v1.train.get_global_step())

    # -----定义TRAIN阶段行为-----
    assert mode == tf.estimator.ModeKeys.TRAIN

    # tensorboard收集
    for task_name, auc in zip(task_names, auc_list):
        tf.summary.scalar(f"train_{task_name}_auc", auc[1])
    for task_name, accuracy in zip(task_names, accuracy_list):
        tf.summary.scalar(f"train_{task_name}_accuracy", accuracy[1])

    # 训练log打印
    # 观测loss
    loss_log = {f"train_{task_name}_loss": loss for task_name, loss in zip(task_names, losses)}
    # 观测训练auc
    auc_log = {f"train_{task_name}_auc": auc[1] for task_name, auc in zip(task_names, auc_list)}
    # 观测gate输出
    # gate_log = {f"{task_name}_gate_expert_weight": gate for task_name, gate, in zip(task_names, gates)}

    loss_log_hook = tf.compat.v1.train.LoggingTensorHook(
        loss_log,
        every_n_iter=100
    )
    auc_log_hook = tf.compat.v1.train.LoggingTensorHook(
        auc_log,
        every_n_iter=100
    )
#     gate_log_hook = tf.compat.v1.train.LoggingTensorHook(
#         gate_log,
#         every_n_iter=100
#     )
    return tf.estimator.EstimatorSpec(mode, loss=total_loss, train_op=train_op,
                                      training_hooks=[loss_log_hook, auc_log_hook])
    # -----定义完毕-----


In [8]:

"""训练入口"""

global total_feature_columns, label_feature_columns
dense_feature_columns, category_feature_columns, label_feature_columns = create_feature_columns()
total_feature_columns = dense_feature_columns + category_feature_columns

params = {
             "dense_feature_columns": dense_feature_columns,
             "category_feature_columns": category_feature_columns,
             "hidden_units": hidden_units,
             "dropout_rate": dropout_rate,
             "batch_norm": batch_norm,
             "learning_rate": learning_rate,
             # "num_experts": num_experts,
             "num_tasks": num_tasks,
             "expert_hidden_units": expert_hidden_units,
             "task_names": task_names,
             "num_extract_network": num_extract_network,
             "num_experts_per_task": [int(x) for x in num_experts_per_task],
             "num_experts_in_shared": num_experts_in_shared,
         }
# print(params)

# 任务数要和任务名列表长度一致
assert params["num_tasks"] == len(params["task_names"]), "num_tasks must equals length of task_names"

INFO:tensorflow:vocabulary_size = 19626 in userid is inferred from the number of elements in the vocabulary_file E:/Deep Learning/dataset/wechat_bigdata/vocabulary/userid.txt.
INFO:tensorflow:vocabulary_size = 106444 in feedid is inferred from the number of elements in the vocabulary_file E:/Deep Learning/dataset/wechat_bigdata/vocabulary/feedid.txt.
INFO:tensorflow:vocabulary_size = 2 in device is inferred from the number of elements in the vocabulary_file E:/Deep Learning/dataset/wechat_bigdata/vocabulary/device.txt.
INFO:tensorflow:vocabulary_size = 18789 in authorid is inferred from the number of elements in the vocabulary_file E:/Deep Learning/dataset/wechat_bigdata/vocabulary/authorid.txt.
INFO:tensorflow:vocabulary_size = 25159 in bgm_song_id is inferred from the number of elements in the vocabulary_file E:/Deep Learning/dataset/wechat_bigdata/vocabulary/bgm_song_id.txt.
INFO:tensorflow:vocabulary_size = 17500 in bgm_singer_id is inferred from the number of elements in the vocab

In [9]:
estimator = tf.estimator.Estimator(
    model_fn=ple_model_fn,
    params=params,
    config=tf.estimator.RunConfig(model_dir=model_dir, save_checkpoints_steps=save_checkpoints_steps)
)

train_spec = tf.estimator.TrainSpec(
    input_fn=lambda: train_input_fn(filepath=train_data, example_parser=example_parser,
                                    batch_size=batch_size, num_epochs=num_epochs,
                                    shuffle_buffer_size=shuffle_buffer_size),
    max_steps=train_steps
)

feature_spec = tf.feature_column.make_parse_example_spec(total_feature_columns)
serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
exporters = [
    tf.estimator.BestExporter(
        name="best_exporter",
        serving_input_receiver_fn=serving_input_receiver_fn,
        exports_to_keep=5)
]

eval_spec = tf.estimator.EvalSpec(
    input_fn=lambda: eval_input_fn(filepath=eval_data, example_parser=example_parser,
                                   batch_size=batch_size),
    throttle_secs=600,
    steps=None,
    exporters=exporters
)

tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

INFO:tensorflow:Using config: {'_model_dir': 'model\\model_dir\\PLE', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': 1000, '_save_checkpoints_secs': None, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-di

  shared_specific_experts = [tf.compat.v1.layers.dense(input,
  return layer.apply(inputs)
  task_specific_experts = [tf.compat.v1.layers.dense(input,
  gate = tf.compat.v1.layers.dense(input,
  all_gate = tf.compat.v1.layers.dense(input,
  shared_specific_experts_final = [tf.compat.v1.layers.dense(input,
  task_specific_experts_final = [tf.compat.v1.layers.dense(input,
  gate = tf.compat.v1.layers.dense(input,


Instructions for updating:
Colocations handled automatically by placer.


  return layer.apply(inputs, training=training)
  return layer.apply(inputs, training=training)


Instructions for updating:
Use `tf.cast` instead.
Instructions for updating:
The value of AUC returned by this may race with the update so this is deprecated. Please use tf.keras.metrics.AUC instead.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into model\model_dir\PLE\model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 2.735826, step = 0
INFO:tensorflow:train_click_avatar_loss = 0.933015, train_like_loss = 0.9798341, train_read_comment_loss = 0.82297695
INFO:tensorflow:train_click_avatar_auc = 0.82681024, train_like_auc = 0.4157327, train_read_comment_auc = 0.6769942
INFO:tensorflow:global_step/sec: 3.3537
INFO:tensorflow:loss = 0.24683607, step = 100 (29.818 

INFO:tensorflow:loss = 0.2415678, step = 1200 (30.711 sec)
INFO:tensorflow:train_click_avatar_loss = 0.05115383, train_like_loss = 0.08327815, train_read_comment_loss = 0.10713582 (30.710 sec)
INFO:tensorflow:train_click_avatar_auc = 0.717052, train_like_auc = 0.7402016, train_read_comment_auc = 0.87692577 (30.711 sec)
INFO:tensorflow:global_step/sec: 3.30946
INFO:tensorflow:loss = 0.22800916, step = 1300 (30.216 sec)
INFO:tensorflow:train_click_avatar_loss = 0.03659078, train_like_loss = 0.06970894, train_read_comment_loss = 0.121709436 (30.217 sec)
INFO:tensorflow:train_click_avatar_auc = 0.72569597, train_like_auc = 0.7439571, train_read_comment_auc = 0.8759414 (30.217 sec)
INFO:tensorflow:global_step/sec: 3.30156
INFO:tensorflow:loss = 0.22917391, step = 1400 (30.289 sec)
INFO:tensorflow:train_click_avatar_loss = 0.031449564, train_like_loss = 0.10078922, train_read_comment_loss = 0.09693514 (30.288 sec)
INFO:tensorflow:train_click_avatar_auc = 0.733451, train_like_auc = 0.7552982,

INFO:tensorflow:Signatures INCLUDED in export for Regress: None
INFO:tensorflow:Signatures INCLUDED in export for Predict: ['prediction', 'serving_default']
INFO:tensorflow:Signatures INCLUDED in export for Train: None
INFO:tensorflow:Signatures INCLUDED in export for Eval: None
INFO:tensorflow:Restoring parameters from model\model_dir\PLE\model.ckpt-3000
INFO:tensorflow:Assets added to graph.
INFO:tensorflow:Assets written to: model\model_dir\PLE\export\best_exporter\temp-1680422913\assets
INFO:tensorflow:SavedModel written to: model\model_dir\PLE\export\best_exporter\temp-1680422913\saved_model.pb
INFO:tensorflow:global_step/sec: 1.20422
INFO:tensorflow:loss = 0.20974736, step = 3000 (83.044 sec)
INFO:tensorflow:train_click_avatar_loss = 0.0139308665, train_like_loss = 0.10329427, train_read_comment_loss = 0.092522226 (83.042 sec)
INFO:tensorflow:train_click_avatar_auc = 0.76506364, train_like_auc = 0.77761084, train_read_comment_auc = 0.89514816 (83.043 sec)
INFO:tensorflow:global_s

INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 5000...
INFO:tensorflow:Saving checkpoints for 5000 into model\model_dir\PLE\model.ckpt.
Instructions for updating:
Use standard file APIs to delete files with this prefix.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 5000...
INFO:tensorflow:Calling model_fn.
Tensor("dense_input/input_layer/concat:0", shape=(None, 16), dtype=float32)
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2023-04-02T16:19:00
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from model\model_dir\PLE\model.ckpt-5000
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Inference Time : 46.29241s
INFO:tensorflow:Finished evaluation at 2023-04-02-16:19:47
INFO:tensorflow:Saving dict for global step 5000: eval_click_avatar_accuracy = 0.9923781, eval_click_avatar_auc = 0.7712782, eval_like_accuracy = 0.97352535, eval_like_auc 

KeyboardInterrupt: 

In [10]:
# Evaluate Metrics.
metrics = estimator.evaluate(input_fn=lambda: eval_input_fn(filepath=eval_data, example_parser=example_parser,
                                                            batch_size=batch_size))


INFO:tensorflow:Calling model_fn.
Tensor("dense_input/input_layer/concat:0", shape=(None, 16), dtype=float32)


  shared_specific_experts = [tf.compat.v1.layers.dense(input,
  task_specific_experts = [tf.compat.v1.layers.dense(input,
  gate = tf.compat.v1.layers.dense(input,
  all_gate = tf.compat.v1.layers.dense(input,
  shared_specific_experts_final = [tf.compat.v1.layers.dense(input,
  task_specific_experts_final = [tf.compat.v1.layers.dense(input,
  gate = tf.compat.v1.layers.dense(input,


INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2023-04-02T19:57:43
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from model\model_dir\PLE\model.ckpt-5000
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Inference Time : 36.05905s
INFO:tensorflow:Finished evaluation at 2023-04-02-19:58:19
INFO:tensorflow:Saving dict for global step 5000: eval_click_avatar_accuracy = 0.9923781, eval_click_avatar_auc = 0.7712782, eval_like_accuracy = 0.97352535, eval_like_auc = 0.7854846, eval_read_comment_accuracy = 0.9669215, eval_read_comment_auc = 0.90731907, global_step = 5000, loss = 0.24259779
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 5000: model\model_dir\PLE\model.ckpt-5000


In [14]:
for key in sorted(metrics):
    print('%s: %s' % (key, metrics[key]))

results = estimator.predict(input_fn=lambda: eval_input_fn(filepath=eval_data, example_parser=example_parser,
                                                           batch_size=batch_size))


eval_click_avatar_accuracy: 0.9923781
eval_click_avatar_auc: 0.7712782
eval_like_accuracy: 0.97352535
eval_like_auc: 0.7854846
eval_read_comment_accuracy: 0.9669215
eval_read_comment_auc: 0.90731907
global_step: 5000
loss: 0.24259779
INFO:tensorflow:Calling model_fn.
Tensor("dense_input/input_layer/concat:0", shape=(None, 16), dtype=float32)


  shared_specific_experts = [tf.compat.v1.layers.dense(input,
  task_specific_experts = [tf.compat.v1.layers.dense(input,
  gate = tf.compat.v1.layers.dense(input,
  all_gate = tf.compat.v1.layers.dense(input,
  shared_specific_experts_final = [tf.compat.v1.layers.dense(input,
  task_specific_experts_final = [tf.compat.v1.layers.dense(input,
  gate = tf.compat.v1.layers.dense(input,


INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from model\model_dir\PLE\model.ckpt-5000
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
<generator object Estimator.predict at 0x000001F34A352740>


In [18]:

predicts_df = pd.DataFrame.from_dict(results)
print(type(predicts_df))
for task_name in params["task_names"]:
    predicts_df[f"{task_name}_probabilities"] = predicts_df[f"{task_name}_probabilities"].apply(lambda x: x[0])
# test_df = pd.read_csv("../../dataset/wechat_algo_data1/dataframe/test.csv")
# for task_name in params["task_names"]:
#     predicts_df[task_name] = test_df[task_name]
# predicts_df.to_csv("predictions.csv")
# print("after evaluate")

<class 'pandas.core.frame.DataFrame'>


KeyError: 'read_comment_probabilities'