In [1]:
import os
import math
import datetime
import joblib
import logging

from tqdm import tqdm

import numpy as np
import pandas as pd

import paddle
from paddle.io import Dataset, DataLoader
import paddle.nn.functional as F

In [2]:
paddle.enable_static()

In [3]:
class StaticModel:
    def __init__(self):
        self.cost = None
        self.infer_target_var = None
        self._init_hyper_parameters()

    def _init_hyper_parameters(self):        
        self.sparse_feature_number = 88000000
        self.sparse_feature_dim = 9
        self.sparse_inputs_slots = 28
        self.learning_rate = 0.001
        self.fc_sizes = [512, 256, 128, 32]
        
    def create_feeds(self, is_infer=False):
        sparse_input_ids = [
            paddle.static.data(
                name="C" + str(i), shape=[None, 1], lod_level=1, dtype="int64")
            for i in range(1, self.sparse_inputs_slots)
        ]

        label = paddle.static.data(
            name="label", shape=[None, 1], dtype="int64"
        )

        feeds_list = [label] + sparse_input_ids
        
        return feeds_list

    def net(self, _input, is_infer=False):
        self.log_key = _input[0]
        self.label_input = _input[1]
        self.sparse_inputs = _input[2:self.sparse_inputs_slots]
        sparse_number = self.sparse_inputs_slots - 2

        def embedding_layer(_input):
            emb = paddle.fluid.layers.embedding(
                input=_input,
                is_sparse=True,
                is_distributed=False,
                size=[
                    self.sparse_feature_number, self.sparse_feature_dim
                ],
                param_attr=paddle.fluid.ParamAttr(
                    name="SparseFeatFactors",
                    initializer=paddle.fluid.initializer.Uniform()
                )
            )
            
            emb_sum = paddle.fluid.layers.sequence_pool(
                input=emb, 
                pool_type='sum'
            )
            
            return emb_sum

        sparse_embs = list(map(embedding_layer, self.sparse_inputs))

        dnn_model = StaticDNNLayer(
            self.sparse_feature_number, 
            self.sparse_feature_dim,
            sparse_number, 
            self.fc_sizes
        )

        pred = dnn_model.forward(sparse_embs)

        predict_2d = paddle.concat(x=[1 - pred, pred], axis=1)

        auc, batch_auc_var, _ = paddle.static.auc(
            input=predict_2d,
            label=self.label_input,
            slide_steps=0
        )
        self.inference_target_var = auc
        
        if is_infer:
            fetch_dict = {'log_key': self.log_key,'pred': pred}
            return fetch_dict

        cost = paddle.nn.functional.log_loss(
            input=pred, 
            label=paddle.cast(
                self.label_input, 
                dtype="float32"
            )
        )
        
        avg_cost = paddle.mean(x=cost)
        self._cost = avg_cost
        fetch_dict = {'cost': avg_cost, 'auc': auc}
        
        return fetch_dict

    def create_optimizer(self, strategy=None):
        optimizer = paddle.optimizer.Adam(
            learning_rate=self.learning_rate, 
            lazy_mode=True
        )
        
        if strategy != None:
            import paddle.distributed.fleet as fleet
            optimizer = fleet.distributed_optimizer(optimizer, strategy)
            
        optimizer.minimize(self._cost)

    def infer_net(self, _input):
        return self.net(_input, is_infer=True)

In [4]:
class StaticDNNLayer(paddle.nn.Layer):
    def __init__(self, sparse_feature_number, sparse_feature_dim, num_field, layer_sizes):
        super().__init__()
        
        self.sparse_feature_number = sparse_feature_number
        self.sparse_feature_dim = sparse_feature_dim
        self.num_field = num_field
        self.layer_sizes = layer_sizes

        sizes = [sparse_feature_dim * num_field 
                 ] + self.layer_sizes + [1]
        acts = ["relu" for _ in range(len(self.layer_sizes))] + [None]
        
        self._mlp_layers = []
        
        for i in range(len(layer_sizes) + 1):
            linear = paddle.nn.Linear(
                in_features=sizes[i],
                out_features=sizes[i + 1],
                weight_attr=paddle.ParamAttr(
                    initializer=paddle.nn.initializer.Normal(
                        std=1.0 / math.sqrt(sizes[i])
                    )
                )
            )
            
            self.add_sublayer('linear_%d' % i, linear)
            self._mlp_layers.append(linear)
            
            if acts[i] == 'relu':
                act = paddle.nn.ReLU()
                self.add_sublayer('act_%d' % i, act)
                self._mlp_layers.append(act)

    def forward(self, sparse_embs):
        y_dnn = paddle.concat(x=sparse_embs, axis=1)
        
        for n_layer in self._mlp_layers:
            y_dnn = n_layer(y_dnn)

        predict = F.sigmoid(y_dnn)
        
        return predict

In [5]:
def get_filepath(dir_path, list_name):
    for file in os.listdir(dir_path):
        file_path = os.path.join(dir_path, file)
        
        if os.path.isdir(file_path):
            get_filepath(file_path, list_name)
        else:
            list_name.append(file_path)
    
    return list_name

In [6]:
def get_file_list(data_path):
    assert os.path.exists(data_path)
    list_name = []
    file_list = get_filepath(data_path, list_name)

    print("File list: {}".format(file_list))
    
    return file_list

In [7]:
def get_reader(input_var):
    reader_type = "QueueDataset"
    train_data_path = "../data/data205411/2023-cvr-contest-data/train_data"
    
    assert train_data_path != ""

    assert reader_type in [
        "QueueDataset"
    ]
    
    file_list = get_file_list(train_data_path)
    print("train file_list: {}".format(file_list))
    
    reader_instance = Queue(input_var, file_list)
    return reader_instance.get_reader(), file_list

In [8]:
class Queue:
    def __init__(self, input_var, file_list):
        assert isinstance(input_var, list)
        assert len(file_list) > 0

        self.input_var = input_var
        self.file_list = file_list

        self.pipe_command = "python3 queuedataset_reader.py"
        self.train_reader = "criteo_reader"
        
        assert self.pipe_command != None
                        
        print("pipe_command is: {}".format(self.pipe_command))
        
        self.batch_size = 128
        assert self.batch_size >= 1
        
        self.thread_num = 1
        print("dataset init thread_num:", self.thread_num)
        assert self.thread_num >= 1

    def get_reader(self):
        print("Get Train Dataset")
        dataset = paddle.distributed.QueueDataset()
        dataset.init(
            use_var=self.input_var,
            pipe_command=self.pipe_command,
            batch_size=self.batch_size,
            thread_num=self.thread_num
        )
        print("dataset get_reader thread_num:", self.thread_num)
        dataset.set_filelist(self.file_list)
        return dataset

    def get_infer_reader(self):
        print("Get Infer Dataset")
        dataset = paddle.distributed.QueueDataset()
        self.infer_batch_size = 128
        self.infer_thread_num = self.thread_num
        dataset.init(
            use_var=self.input_var,
            pipe_command=self.pipe_command,
            batch_size=self.infer_batch_size,
            thread_num=self.infer_thread_num
        )
        print("dataset get_infer_reader thread_num:", self.infer_thread_num)
        dataset.set_filelist(self.file_list)
        return dataset

In [9]:
def dataset_train(epoch_id, dataset, fetch_vars, exe):
    fetch_info = [
        "Epoch {} Var {}".format(epoch_id, var_name) for var_name in fetch_vars
    ]
    
    fetch_vars = [var for _, var in fetch_vars.items()]
    print_interval = 50
    
    exe.train_from_dataset(
        program=paddle.static.default_main_program(),
        dataset=dataset,
        fetch_list=fetch_vars,
        fetch_info=fetch_info,
        print_period=print_interval,
        debug=False
    )

In [10]:
static_model_class = StaticModel()

In [11]:
input_data = static_model_class.create_feeds()
input_data

[var label : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C1 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C2 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C3 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C4 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C5 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C6 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C7 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C8 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C9 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C10 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C11 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C12 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C13 : LOD_TENSOR.shape(-1, 1).dtype(int64).stop_gradient(True),
 var C14 : LOD_TENSOR.shape(-1, 1).dtype(

In [12]:
input_data_names = [data.name for data in input_data]
input_data_names

['label',
 'C1',
 'C2',
 'C3',
 'C4',
 'C5',
 'C6',
 'C7',
 'C8',
 'C9',
 'C10',
 'C11',
 'C12',
 'C13',
 'C14',
 'C15',
 'C16',
 'C17',
 'C18',
 'C19',
 'C20',
 'C21',
 'C22',
 'C23',
 'C24',
 'C25',
 'C26',
 'C27']

In [13]:
fetch_vars = static_model_class.net(input_data)

In [14]:
fetch_vars

{'cost': var mean_0.tmp_0 : LOD_TENSOR.shape(1,).dtype(float32).stop_gradient(False),
 'auc': var auc_0.tmp_0 : LOD_TENSOR.shape(1,).dtype(int64).stop_gradient(False)}

In [15]:
place = paddle.set_device("cpu")

In [16]:
static_model_class.create_optimizer()

In [17]:
exe = paddle.static.Executor(place)

In [18]:
exe.run(paddle.static.default_startup_program())

[]

In [19]:
last_epoch_id = -1
step_num = 0

In [20]:
dataset, file_list = get_reader(input_data)

File list: ['../data/data205411/2023-cvr-contest-data/train_data/file_27.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_26.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_18.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_24.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_30.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_25.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_19.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_21.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_09.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_08.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_20.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_22.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_23.txt', '../data/data205411/2023-cvr-contest-data/train_data/file_12.txt', '../data/data205411/2023-cvr-contest-data/train_da

In [21]:
for epoch_id in range(last_epoch_id + 1, 1):
    fetch_batch_var = dataset_train(epoch_id, dataset, fetch_vars, exe)

device worker program id: 4875610480


In [22]:
fetch_batch_var