-
Notifications
You must be signed in to change notification settings - Fork 0
/
lr_nnmodule.py
139 lines (126 loc) · 4.58 KB
/
lr_nnmodule.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import torch
import torch.nn as nn
from torchmetrics.functional import auroc
from veloce.util import pprint_results
from veloce.environ import environ_validate
from veloce.optimizer import FTRL
from veloce import NeuralNetTrainer
from veloce.preprocessing import DataLoader
"""
A native nn.Module use-case of Veloce
What we have in the file:
1. A Native PyTorch nn.Module.
2. Use Veloce's DataLoader to load dataset, define features and do
preprocessing. This component is leveraged by _Ray Data_.
3. Use NeuralNetTrainer to launch a data scientist-friendly training
lifecycle. This component is leveraged by _Ray Train_.
"""
class LR(nn.Module):
    """Logistic regression over dense and sparse (embedded) features.

    Each sparse feature gets its own embedding table; the embeddings are
    concatenated and summed into a per-sample sparse logit.  The dense
    columns are projected through a single learned weight vector.  The two
    logits are added and passed through ``output_fn`` (sigmoid by default),
    yielding a ``(batch, 1)`` probability-like output.
    """

    def __init__(
        self,
        dense_defs,
        sparse_defs,
        seed=1024,
        init_std=1e-4,
        device="cpu",
        output_fn=torch.sigmoid,
    ):
        """Build the embedding tables and the dense weight vector.

        Args:
            dense_defs: iterable of dense feature defs; each must expose
                ``dimension`` and, at forward time, ``column_idx``.
            sparse_defs: iterable of sparse feature defs; each must expose
                ``name``, ``vocabulary_size``, ``embedding_dim`` and, at
                forward time, ``column_idx``.
            seed: manual torch seed applied before init, so two instances
                built with the same defs have identical weights.
            init_std: std-dev of the normal initialization for all weights.
            device: torch device the module is moved to.
            output_fn: final activation applied to the summed logits.
        """
        super().__init__()  # modern form of super(LR, self).__init__()
        torch.manual_seed(seed)
        self.sparse_defs = sparse_defs
        self.dense_defs = dense_defs
        self.output_fn = output_fn
        self.device = device
        # Sparse side: one embedding table per sparse feature.  The int()
        # casts guard against float-valued defs — the example defs shown in
        # this file carry e.g. vocabulary_size=557.0, embedding_dim=1.0.
        self.embedding_layer = nn.ModuleDict(
            {
                feat.name: nn.Embedding(
                    num_embeddings=int(feat.vocabulary_size),
                    embedding_dim=int(feat.embedding_dim),
                    sparse=False,
                )
                for feat in self.sparse_defs
            }
        )
        for table in self.embedding_layer.values():
            nn.init.normal_(table.weight, mean=0, std=init_std)
        self.embedding_layer = self.embedding_layer.to(self.device)
        # Dense side: one (total_dense_dim, 1) weight vector.  torch.empty
        # replaces the legacy uninitialized torch.Tensor(...) constructor.
        dense_dim = int(sum(fc.dimension for fc in self.dense_defs))
        self.weight = nn.Parameter(
            torch.empty(dense_dim, 1, device=self.device)
        )
        nn.init.normal_(self.weight, mean=0, std=init_std)
        # Move every registered parameter/buffer to the target device.
        self.to(self.device)

    def forward(self, X):
        """Score a batch.

        Args:
            X: float tensor of shape (batch, n_columns) holding dense values
                and integer-coded sparse ids side by side, addressed by each
                feature def's ``column_idx``.

        Returns:
            Tensor of shape (batch, 1) after ``output_fn``.
        """
        # Sparse logit: embed each sparse id column, concatenate along the
        # embedding axis, then sum to a single value per sample.
        sparse_embeds = [
            self.embedding_layer[feat.name](
                X[:, int(feat.column_idx) : int(feat.column_idx) + 1].long()
            )
            for feat in self.sparse_defs
        ]
        sparse_embeds_cat = torch.cat(sparse_embeds, dim=-1)
        sparse_logit = torch.sum(sparse_embeds_cat, dim=-1, keepdim=False)
        # Dense logit: concatenate the dense columns and apply the linear
        # projection.
        dense_values = [
            X[:, int(feat.column_idx) : int(feat.column_idx) + int(feat.dimension)]
            for feat in self.dense_defs
        ]
        dense_logit = torch.cat(dense_values, dim=-1).matmul(self.weight)
        # Combine both logits and squash through the output activation.
        return self.output_fn(dense_logit + sparse_logit)
def train_lr_dist(num_workers=2, use_gpu=False, rand_seed=2021):
    """Run a distributed LR training job on the mini Criteo sample.

    Args:
        num_workers: number of Ray Train workers to launch.
        use_gpu: whether each worker is given a GPU.
        rand_seed: seed forwarded to the LR module for reproducible init.
    """
    # Describe the dataset schema: one label column, 13 dense columns
    # (I1..I13) and 26 sparse columns (C1..C26).
    loader = DataLoader("examples/dataset/ctr/criteo_mini.txt")
    loader = (
        loader.set_label_column(label_name="label")
        .set_dense_features(feat_names=[f"I{i}" for i in range(1, 14)])
        .set_sparse_features(
            feat_names=[f"C{i}" for i in range(1, 27)], embedding_dim=1
        )
        # this order should follow the data file
        .set_features_order(order=("dense", "sparse"))
    )
    # Materialize the splits plus the feature definitions the module needs.
    # dense_defs entries look like:
    #   {'name': 'I1', 'dimension': 1.0, 'dtype': 'float32',
    #    'column_idx': 1.0, 'feat_type': 'DenseFeat'}, ...
    # sparse_defs entries look like:
    #   {'name': 'C1', 'vocabulary_size': 557.0, 'embedding_dim': 1.0,
    #    'dtype': 'int32', 'group_name': 'default_group',
    #    'column_idx': 14.0, 'feat_type': 'SparseFeat'}, ...
    splits = loader.split()
    dense_feature_defs = loader.dense_defs
    sparse_feature_defs = loader.sparse_defs
    dataset_options = loader.gen_torch_dataset_options()
    # Hand everything to the trainer and launch the training lifecycle.
    trainer = NeuralNetTrainer(
        # module and dataset configs
        module=LR,
        module_params={
            "dense_defs": dense_feature_defs,
            "sparse_defs": sparse_feature_defs,
            "seed": rand_seed,
        },
        dataset=splits,
        dataset_options=dataset_options,
        # trainer configs
        epochs=20,
        batch_size=512,
        loss_fn=nn.BCELoss(),
        optimizer=FTRL,
        metric_fns=[auroc],
        num_workers=num_workers,
        use_gpu=use_gpu,
        callbacks=["json", "tbx"],
    )
    pprint_results(trainer.run())
if __name__ == "__main__":
    # NOTE(review): 1 + 2 presumably means one driver CPU plus one CPU per
    # worker (num_workers=2 below) — confirm against environ_validate.
    environ_validate(n_cpus=1 + 2)
    # Launch the distributed training run defined above.
    train_lr_dist(num_workers=2)