In [1]:
import torch
from torch.utils.data import Dataset
import torch.utils.data as D
from torch import nn
import pandas as pd
import numpy as np
import copy
import os
from sklearn.metrics import roc_auc_score,log_loss
from tqdm import tqdm

In [2]:
#参数配置

config = {
    "train_csv":'train_noseq.csv',
    "valid_csv":'valid_noseq.csv',
    "test_csv":'test_noseq.csv',
    "sparse_cols":['user_id','item_id','item_type','dayofweek','is_workday','city','county',
                  'town','village','lbs_city','lbs_district','hardware_platform','hardware_ischarging',
                  'os_type','network_type','position'],
    "dense_cols" : ['item_expo_1d','item_expo_7d','item_expo_14d','item_expo_30d','item_clk_1d',
                   'item_clk_7d','item_clk_14d','item_clk_30d','use_duration'],
    "label_col":['click','scroll'],
    "num_task":2,
    "debug_mode" : True,
    "epoch" : 10,
    "batch_size" : 512,
    "num_workers":0,
    "lr" : 0.001,
    "device" : 0,
}

In [3]:
class MultiTaskDataset(Dataset):
    def __init__(self,config,df,enc_dict=None):
        self.config = config
        self.df = df
        self.enc_dict = enc_dict
        for idx, col in enumerate(self.config['label_col']):
            self.df = self.df.rename(columns={col :f'task{idx + 1}_label'})
        self.dense_cols = list(set(self.config['dense_cols']))
        self.sparse_cols = list(set(self.config['sparse_cols']))
        self.feature_name = self.dense_cols+self.sparse_cols

        #数据编码
        if self.enc_dict == None:
            self.get_enc_dict()
        self.enc_data()
    def get_enc_dict(self):
        #计算enc_dict
        self.enc_dict = dict(zip( list(self.dense_cols+self.sparse_cols),[dict() for _ in range(len(self.dense_cols+self.sparse_cols))]))
        for f in self.sparse_cols:
            self.df[f] = self.df[f].astype('str')
            map_dict = dict(zip(self.df[f].unique(), range(self.df[f].nunique())))
            self.enc_dict[f] = map_dict
            self.enc_dict[f]['vocab_size'] = self.df[f].nunique()+1

        for f in self.dense_cols:
            self.enc_dict[f]['min'] = self.df[f].min()
            self.enc_dict[f]['max'] = self.df[f].max()

        return self.enc_dict

    def enc_dense_data(self,col):
        return (self.df[col] - self.enc_dict[col]['min']) / (self.enc_dict[col]['max'] - self.enc_dict[col]['min'])

    def enc_sparse_data(self,col):
        return self.df[col].apply(lambda x : self.enc_dict[col].get(x,0))

    def enc_data(self):
        #使用enc_dict对数据进行编码
        self.enc_df = copy.deepcopy(self.df)

        for col in self.dense_cols:
            self.enc_df[col] = self.enc_dense_data(col)
        for col in self.sparse_cols:
            self.enc_df[col] = self.enc_sparse_data(col)

    def __getitem__(self, index):
        data = dict()
        for col in self.feature_name:
            if col in self.dense_cols:
                data[col] = torch.Tensor([self.enc_df[col].iloc[index]]).squeeze(-1)
            elif col in self.sparse_cols:
                data[col] = torch.Tensor([self.enc_df[col].iloc[index]]).long().squeeze(-1)
        for idx, col in enumerate(self.config['label_col']):
            data[f'task{idx + 1}_label'] = torch.Tensor([self.enc_df[f'task{idx + 1}_label'].iloc[index]]).squeeze(-1)
        return data

    def __len__(self):
        return len(self.enc_df)

In [4]:
train_df = pd.read_csv(config['train_csv'])
valid_df = pd.read_csv(config['valid_csv'])
test_df = pd.read_csv(config['test_csv'])

train_dataset = MultiTaskDataset(config,train_df)
enc_dict = train_dataset.get_enc_dict()
valid_dataset = MultiTaskDataset(config, valid_df,enc_dict=enc_dict)
test_dataset = MultiTaskDataset(config, test_df,enc_dict=enc_dict)

In [5]:
train_loader = D.DataLoader(train_dataset,batch_size=config['batch_size'],shuffle=True,num_workers=config['num_workers'])
valid_loader = D.DataLoader(valid_dataset,batch_size=config['batch_size'],shuffle=False,num_workers=config['num_workers'])
test_loader = D.DataLoader(test_dataset,batch_size=config['batch_size'],shuffle=False,num_workers=config['num_workers'])

In [6]:
train_dataset.__getitem__(0)

{'item_clk_7d': tensor(0.8888),
 'item_expo_30d': tensor(0.8498),
 'item_expo_1d': tensor(0.9155),
 'use_duration': tensor(0.0001),
 'item_clk_14d': tensor(0.8849),
 'item_clk_30d': tensor(0.8725),
 'item_expo_7d': tensor(0.8427),
 'item_expo_14d': tensor(0.8571),
 'item_clk_1d': tensor(0.9414),
 'item_id': tensor(0),
 'os_type': tensor(0),
 'town': tensor(0),
 'lbs_district': tensor(0),
 'hardware_ischarging': tensor(0),
 'network_type': tensor(0),
 'position': tensor(0),
 'lbs_city': tensor(0),
 'county': tensor(0),
 'city': tensor(0),
 'dayofweek': tensor(0),
 'is_workday': tensor(0),
 'item_type': tensor(0),
 'hardware_platform': tensor(0),
 'user_id': tensor(0),
 'village': tensor(0),
 'task1_label': tensor(0.),
 'task2_label': tensor(0.)}

In [7]:
#基本网络模块

#通用Emb
class EmbeddingLayer(nn.Module):
    def __init__(self,
                 enc_dict = None,
                 embedding_dim = None):
        super(EmbeddingLayer, self).__init__()
        self.enc_dict = enc_dict
        self.embedding_dim = embedding_dim
        self.embedding_layer = nn.ModuleDict()

        self.emb_feature = []

        for col in self.enc_dict.keys():
            if 'vocab_size' in self.enc_dict[col].keys():
                self.emb_feature.append(col)
                self.embedding_layer.update({col : nn.Embedding(
                    self.enc_dict[col]['vocab_size'],
                    self.embedding_dim,
                )})

    def forward(self, X):
        #对所有的sparse特征挨个进行embedding
        feature_emb_list = []
        for col in self.emb_feature:
            inp = X[col].long().view(-1, 1)
            feature_emb_list.append(self.embedding_layer[col](inp))
        return feature_emb_list
    

In [8]:
def set_device(gpu=-1):
    if gpu >= 0 and torch.cuda.is_available():
        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu)
        device = torch.device(f"cuda:{gpu}")
    else:
        device = torch.device("cpu")
    return device
    
def set_activation(activation):
    if isinstance(activation, str):
        if activation.lower() == "relu":
            return nn.ReLU()
        elif activation.lower() == "sigmoid":
            return nn.Sigmoid()
        elif activation.lower() == "tanh":
            return nn.Tanh()
        else:
            return getattr(nn, activation)()
    else:
        return activation
    
def get_dnn_input_dim(enc_dict,embedding_dim):
    num_sparse = 0
    num_dense = 0
    for col in enc_dict.keys():
        if 'min' in enc_dict[col].keys():
            num_dense+=1
        elif 'vocab_size' in enc_dict[col].keys():
            num_sparse+=1
    return num_sparse*embedding_dim+num_dense

def get_linear_input(enc_dict,data):
    res_data = []
    for col in enc_dict.keys():
        if 'min' in enc_dict[col].keys():
            res_data.append(data[col])
    res_data = torch.stack(res_data,axis=1)
    return res_data

def get_feature_num(enc_dict):
    num_sparse = 0
    num_dense = 0
    for col in enc_dict.keys():
        if 'min' in enc_dict[col].keys():
            num_dense+=1
        elif 'vocab_size' in enc_dict[col].keys():
            num_sparse+=1
    return num_sparse,num_dense

In [9]:
class MMOE(nn.Module):
    def __init__(self,
                 num_task=2,
                 n_expert=3,
                 embedding_dim=40,
                 mmoe_hidden_dim=128,
                 expert_activation=None,
                 hidden_dim=[128, 64],
                 dropouts=[0.2, 0.2],
                 enc_dict=None):
        super(MMOE, self).__init__()
        self.enc_dict = enc_dict
        self.num_task = num_task
        self.n_expert = n_expert
        self.mmoe_hidden_dim = mmoe_hidden_dim
        self.expert_activation = expert_activation
        self.hidden_dim = hidden_dim
        self.dropouts = dropouts
        self.embedding_dim = embedding_dim
        self.embedding_layer = EmbeddingLayer(enc_dict=self.enc_dict, embedding_dim=self.embedding_dim)

        self.num_sparse_fea, self.num_dense_fea = get_feature_num(self.enc_dict)

        hidden_size = self.num_sparse_fea * self.embedding_dim + self.num_dense_fea

        # experts
        self.experts = torch.nn.Parameter(torch.rand(hidden_size, mmoe_hidden_dim, n_expert), requires_grad=True)
        # self.experts.data.normal_(0, 1)
        self.experts_bias = torch.nn.Parameter(torch.rand(mmoe_hidden_dim, n_expert), requires_grad=True)
        # gates
        self.gates = [torch.nn.Parameter(torch.rand(hidden_size, n_expert), requires_grad=True) for _ in
                      range(num_task)]
        for gate in self.gates:
            gate.data.normal_(0, 1)
        self.gates_bias = [torch.nn.Parameter(torch.rand(n_expert), requires_grad=True) for _ in range(num_task)]

        for i in range(self.num_task):
            setattr(self, 'task_{}_dnn'.format(i + 1), nn.ModuleList()) #eval()
            hid_dim = [mmoe_hidden_dim] + hidden_dim
            for j in range(len(hid_dim) - 1):
                getattr(self, 'task_{}_dnn'.format(i + 1)).add_module('ctr_hidden_{}'.format(j),
                                                                      nn.Linear(hid_dim[j], hid_dim[j + 1]))
                getattr(self, 'task_{}_dnn'.format(i + 1)).add_module('ctr_batchnorm_{}'.format(j),
                                                                      nn.BatchNorm1d(hid_dim[j + 1]))
                getattr(self, 'task_{}_dnn'.format(i + 1)).add_module('ctr_dropout_{}'.format(j),
                                                                      nn.Dropout(dropouts[j]))
            getattr(self, 'task_{}_dnn'.format(i + 1)).add_module('task_last_layer', nn.Linear(hid_dim[-1], 1))
            getattr(self, 'task_{}_dnn'.format(i + 1)).add_module('task_sigmoid', nn.Sigmoid())

    def set_device(self, device):
        for i in range(self.num_task):
            self.gates[i] = self.gates[i].to(device)
            self.gates_bias[i] = self.gates_bias[i].to(device)
        print(f'Successfully set device:{device}')

    def forward(self, data):
        feature_embedding = self.embedding_layer(data)
        hidden = torch.stack(feature_embedding, 1).flatten(start_dim=1)

        dense_fea = get_linear_input(self.enc_dict, data) #hidden = self.num_sparse_fea * self.embedding_dim + self.num_dense_fea

        hidden = torch.cat([hidden, dense_fea], axis=-1) #batch,hidden

        # mmoe
        #矩阵乘法：torch.bmm(),torch.matmul(),torch.mm().....
        '''
        i:batch,j:hidden
        j:hidden,k:mmoe_hidden_size,l:n_expert
        '''
        experts_out = torch.einsum('ij, jkl -> ikl', hidden, self.experts)  # batch * mmoe_hidden_size * num_experts
        experts_out += self.experts_bias
        if self.expert_activation is not None:
            experts_out = self.expert_activation(experts_out)

                
        gates_out = list()
        for idx, gate in enumerate(self.gates):
            '''
            a:batch,b:hidden
            b:hidden,c:n_expert
            '''
            gate_out = torch.einsum('ab, bc -> ac', hidden, gate)  # batch * num_experts
            if self.gates_bias:
                gate_out += self.gates_bias[idx]
            gate_out = nn.Softmax(dim=-1)(gate_out)
            gates_out.append(gate_out)

        outs = list()
        for gate_output in gates_out:
            '''
            experts_out: batch,mmoe_hidden_size,num_experts
            gate_output: batch,num_experts -> batch,mmoe_hidden_size,num_experts
            '''
            expanded_gate_output = torch.unsqueeze(gate_output, 1)  # batch * 1 * num_experts
            weighted_expert_output = experts_out * expanded_gate_output.expand_as(
                experts_out)  # batch * mmoe_hidden_size * num_experts
            outs.append(torch.sum(weighted_expert_output, 2))  # batch * mmoe_hidden_size

        # task tower
        output_dict = dict()
        task_outputs = list()
        for i in range(self.num_task):
            x = outs[i]
            for mod in getattr(self, 'task_{}_dnn'.format(i + 1)):
                x = mod(x)
            task_outputs.append(x)
            output_dict[f'task{i + 1}_pred'] = x
        # get loss
        loss = self.loss(task_outputs, data)
        output_dict['loss'] = loss

        return output_dict

    def loss(self, task_outputs, data, weight=None):
        if weight == None:
            weight = np.ones(self.num_task) / self.num_task
        loss = 0
        for i in range(len(task_outputs)):
            loss += weight[i] * nn.functional.binary_cross_entropy(task_outputs[i].squeeze(-1)+1e-6,
                                                                   data[f'task{i + 1}_label'])

        return loss

In [10]:
def train_model(model, train_loader, optimizer, device, metric_list=['roc_auc_score','log_loss'], num_task =1):
    model.train()
    if num_task == 1:
        pred_list = []
        label_list = []
        pbar = tqdm(train_loader)
        for data in pbar:

            for key in data.keys():
                data[key] = data[key].to(device)

            output = model(data)
            pred = output['pred']
            loss = output['loss']

            loss.backward()
            optimizer.step()
            model.zero_grad()

            pred_list.extend(pred.squeeze(-1).cpu().detach().numpy())
            label_list.extend(data['label'].squeeze(-1).cpu().detach().numpy())
            pbar.set_description("Loss {}".format(loss))

        res_dict = dict()
        for metric in metric_list:
            if metric =='log_loss':
                res_dict[f'train_{metric}'] = log_loss(label_list,pred_list, eps=1e-7)
            else:
                res_dict[f'train_{metric}'] = eval(metric)(label_list,pred_list)

        return
    else:
        multi_task_pred_list = [[] for _ in range(num_task)]
        multi_task_label_list = [[] for _ in range(num_task)]
        pbar = tqdm(train_loader)
        for data in pbar:

            for key in data.keys():
                data[key] = data[key].to(device)

            output = model(data)
            loss = output['loss']

            loss.backward()
            optimizer.step()
            model.zero_grad()
            for i in range(num_task):
                multi_task_pred_list[i].extend(list(output[f'task{i + 1}_pred'].squeeze(-1).cpu().detach().numpy()))
                multi_task_label_list[i].extend(list(data[f'task{i + 1}_label'].squeeze(-1).cpu().detach().numpy()))
            pbar.set_description("Loss {}".format(loss))

        res_dict = dict()
        for i in range(num_task):
            for metric in metric_list:
                if metric == 'log_loss':
                    res_dict[f'train_task{i+1}_{metric}'] = log_loss(multi_task_label_list[i], multi_task_pred_list[i], eps=1e-7)
                else:
                    res_dict[f'train_task{i+1}_{metric}'] = eval(metric)(multi_task_label_list[i], multi_task_pred_list[i])
        return res_dict

def valid_model(model, valid_loader, device, metric_list=['roc_auc_score','log_loss'],num_task =1):
    model.eval()
    if num_task == 1:
        pred_list = []
        label_list = []
        for data in tqdm(valid_loader):

            for key in data.keys():
                data[key] = data[key].to(device)

            output = model(data)
            pred = output['pred']

            pred_list.extend(pred.squeeze(-1).cpu().detach().numpy())
            label_list.extend(data['label'].squeeze(-1).cpu().detach().numpy())

        res_dict = dict()
        for metric in metric_list:
            if metric =='log_loss':
                res_dict[f'valid_{metric}'] = log_loss(label_list,pred_list, eps=1e-7)
            else:
                res_dict[f'valid_{metric}'] = eval(metric)(label_list,pred_list)

        return res_dict
    else:
        multi_task_pred_list = [[] for _ in range(num_task)]
        multi_task_label_list = [[] for _ in range(num_task)]
        for data in valid_loader:

            for key in data.keys():
                data[key] = data[key].to(device)

            output = model(data)

            for i in range(num_task):
                multi_task_pred_list[i].extend(list(output[f'task{i + 1}_pred'].squeeze(-1).cpu().detach().numpy()))
                multi_task_label_list[i].extend(list(data[f'task{i + 1}_label'].squeeze(-1).cpu().detach().numpy()))

        res_dict = dict()
        for i in range(num_task):
            for metric in metric_list:
                if metric == 'log_loss':
                    res_dict[f'valid_task{i+1}_{metric}'] = log_loss(multi_task_label_list[i], multi_task_pred_list[i], eps=1e-7)
                else:
                    res_dict[f'valid_task{i+1}_{metric}'] = eval(metric)(multi_task_label_list[i], multi_task_pred_list[i])
        return res_dict

def test_model(model, test_loader, device, metric_list=['roc_auc_score','log_loss'],num_task =1):
    model.eval()
    if num_task == 1:
        pred_list = []
        label_list = []
        for data in tqdm(test_loader):

            for key in data.keys():
                data[key] = data[key].to(device)

            output = model(data)
            pred = output['pred']

            pred_list.extend(pred.squeeze(-1).cpu().detach().numpy())
            label_list.extend(data['label'].squeeze(-1).cpu().detach().numpy())

        res_dict = dict()
        for metric in metric_list:
            if metric == 'log_loss':
                res_dict[f'test_{metric}'] = log_loss(label_list, pred_list, eps=1e-7)
            else:
                res_dict[f'test_{metric}'] = eval(metric)(label_list, pred_list)

        return res_dict
    else:
        multi_task_pred_list = [[] for _ in range(num_task)]
        multi_task_label_list = [[] for _ in range(num_task)]
        for data in test_loader:

            for key in data.keys():
                data[key] = data[key].to(device)

            output = model(data)

            for i in range(num_task):
                multi_task_pred_list[i].extend(list(output[f'task{i + 1}_pred'].squeeze(-1).cpu().detach().numpy()))
                multi_task_label_list[i].extend(list(data[f'task{i + 1}_label'].squeeze(-1).cpu().detach().numpy()))

        res_dict = dict()
        for i in range(num_task):
            for metric in metric_list:
                if metric == 'log_loss':
                    res_dict[f'test_task{i + 1}_{metric}'] = log_loss(multi_task_label_list[i], multi_task_pred_list[i],
                                                                 eps=1e-7)
                else:
                    res_dict[f'test_task{i + 1}_{metric}'] = eval(metric)(multi_task_label_list[i], multi_task_pred_list[i])
        return res_dict


In [11]:
model = MMOE(enc_dict=enc_dict)

In [12]:
device = set_device(config['device'])
optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'], betas=(0.9, 0.999), eps=1e-08, weight_decay=0)
model = model.to(device)
#模型训练流程
for i in range(config['epoch']):
    #模型训练
    train_metirc = train_model(model,train_loader,optimizer=optimizer,device=device,num_task=config['num_task'])
    #模型验证
    valid_metric = valid_model(model,valid_loader,device,num_task=config['num_task'])

    print("Train Metric:")
    print(train_metirc)
    print("Valid Metric:")
#测试模型
test_metric = test_model(model,test_loader,device,num_task=config['num_task'])
print('Test Metric:')
print(test_metric)

Loss 0.21911370754241943: 100%|███████████████████████████████████████████| 363/363 [05:07<00:00,  1.18it/s]


Train Metric:
{'train_task1_roc_auc_score': 0.5690551256967591, 'train_task1_log_loss': 0.3429387164022583, 'train_task2_roc_auc_score': 0.5558025842083217, 'train_task2_log_loss': 0.4047207226744438}
Valid Metric:


Loss 0.22694231569766998: 100%|███████████████████████████████████████████| 363/363 [05:04<00:00,  1.19it/s]


Train Metric:
{'train_task1_roc_auc_score': 0.7201995323064578, 'train_task1_log_loss': 0.18646943025961882, 'train_task2_roc_auc_score': 0.643444685036402, 'train_task2_log_loss': 0.2702671524461413}
Valid Metric:


Loss 0.19703590869903564: 100%|███████████████████████████████████████████| 363/363 [04:53<00:00,  1.24it/s]
Loss 0.1906682252883911: 100%|████████████████████████████████████████████| 363/363 [05:07<00:00,  1.18it/s]


Train Metric:
{'train_task1_roc_auc_score': 0.773343736574327, 'train_task1_log_loss': 0.17231779247282358, 'train_task2_roc_auc_score': 0.686045363670525, 'train_task2_log_loss': 0.25854246215160065}
Valid Metric:


Loss 0.20725512504577637: 100%|███████████████████████████████████████████| 363/363 [05:05<00:00,  1.19it/s]


Train Metric:
{'train_task1_roc_auc_score': 0.7943071665175145, 'train_task1_log_loss': 0.16643851440382257, 'train_task2_roc_auc_score': 0.7080943582420599, 'train_task2_log_loss': 0.25378364326784864}
Valid Metric:


Loss 0.17220842838287354: 100%|███████████████████████████████████████████| 363/363 [05:06<00:00,  1.18it/s]


Train Metric:
{'train_task1_roc_auc_score': 0.8183920108029228, 'train_task1_log_loss': 0.16022485525541283, 'train_task2_roc_auc_score': 0.7387514433894964, 'train_task2_log_loss': 0.24665741245579187}
Valid Metric:


Loss 0.17191338539123535: 100%|███████████████████████████████████████████| 363/363 [04:59<00:00,  1.21it/s]


Train Metric:
{'train_task1_roc_auc_score': 0.8405782576902938, 'train_task1_log_loss': 0.15328210445466148, 'train_task2_roc_auc_score': 0.7694727938695708, 'train_task2_log_loss': 0.23870366522537825}
Valid Metric:


Loss 0.1915300041437149:  31%|█████████████▊                              | 114/363 [01:32<03:31,  1.18it/s]IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [13]:
test_metric

{'test_task1_roc_auc_score': 0.7683968055513677,
 'test_task1_log_loss': 0.1874662285723263,
 'test_task2_roc_auc_score': 0.662262847074074,
 'test_task2_log_loss': 0.29286973183338544}

In [None]:
#可能的开源～涉及rank模型以及多任务模型～有兴趣的私聊我
'''
train_loader,valid_laoder,test_loader, enc_dict = get_dataloader(train_df,valid_df,test_df,config)

model = MMOE(enc_dict=enc_dict,**config)

trainer = Trainer(model,config)

trainer.fit(train_loader,valid_loader,**config)
'''