ebc_benchmarks_utils.py
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

import time
from typing import Dict, List, Optional, Tuple

import numpy as np
import torch
from torch.utils.data.dataset import IterableDataset

from torchrec.datasets.random import RandomRecDataset
from torchrec.datasets.utils import Batch
from torchrec.modules.embedding_configs import EmbeddingBagConfig


def get_random_dataset(
    batch_size: int,
    num_batches: int,
    num_dense_features: int,
    embedding_bag_configs: List[EmbeddingBagConfig],
    pooling_factors: Optional[Dict[str, int]] = None,
) -> IterableDataset[Batch]:
    """Build a RandomRecDataset whose sparse feature keys and hash sizes match the given embedding bag configs."""
    if pooling_factors is None:
        pooling_factors = {}

    keys = []
    ids_per_features = []
    hash_sizes = []
    for table in embedding_bag_configs:
        for feature_name in table.feature_names:
            keys.append(feature_name)
            # Fall back to a pooling factor of 64 when none is provided for this feature.
            ids_per_features.append(pooling_factors.get(feature_name, 64))
            hash_sizes.append(table.num_embeddings)

    return RandomRecDataset(
        keys=keys,
        batch_size=batch_size,
        hash_sizes=hash_sizes,
        ids_per_features=ids_per_features,
        num_dense=num_dense_features,
        num_batches=num_batches,
    )


def train_one_epoch(
    model: torch.nn.Module,
    optimizer: torch.optim.Optimizer,
    dataset: IterableDataset[Batch],
    device: torch.device,
) -> float:
    """Run one pass over the dataset with an explicit, non-fused optimizer and return the elapsed time in seconds."""
    start_time = time.perf_counter()
    for data in dataset:
        sparse_features = data.sparse_features.to(device)
        pooled_embeddings = model(sparse_features)
        optimizer.zero_grad()
        vals = []
        for _name, param in pooled_embeddings.to_dict().items():
            vals.append(param)
        # Use the sum of all pooled embeddings as a synthetic scalar loss to drive backward.
        torch.cat(vals, dim=1).sum().backward()
        optimizer.step()
    end_time = time.perf_counter()
    return end_time - start_time


def train_one_epoch_fused_optimizer(
    model: torch.nn.Module,
    dataset: IterableDataset[Batch],
    device: torch.device,
) -> float:
    """Run one pass over the dataset for a model with a fused optimizer and return the elapsed time in seconds.

    With a fused optimizer the embedding tables are updated during the backward
    pass, so no explicit optimizer.zero_grad() / optimizer.step() calls are needed.
    """
    start_time = time.perf_counter()
    for data in dataset:
        sparse_features = data.sparse_features.to(device)
        fused_pooled_embeddings = model(sparse_features)
        fused_vals = []
        for _name, param in fused_pooled_embeddings.to_dict().items():
            fused_vals.append(param)
        torch.cat(fused_vals, dim=1).sum().backward()
    end_time = time.perf_counter()
    return end_time - start_time


def train(
    model: torch.nn.Module,
    optimizer: Optional[torch.optim.Optimizer],
    dataset: IterableDataset[Batch],
    device: torch.device,
    epochs: int = 100,
) -> Tuple[float, float]:
    """Train for the given number of epochs and return the mean and standard deviation of per-epoch training time."""
    training_time = []
    for _ in range(epochs):
        if optimizer:
            training_time.append(train_one_epoch(model, optimizer, dataset, device))
        else:
            training_time.append(
                train_one_epoch_fused_optimizer(model, dataset, device)
            )
    return np.mean(training_time), np.std(training_time)
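

# Illustrative usage sketch (not part of the original file): a minimal benchmark
# driver, assuming torchrec's EmbeddingBagCollection as the model. Table names,
# feature names, and hyperparameters below are hypothetical placeholders.
if __name__ == "__main__":
    from torchrec.modules.embedding_modules import EmbeddingBagCollection

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Two small example tables, each backing one sparse feature.
    tables = [
        EmbeddingBagConfig(
            name="t1",
            embedding_dim=64,
            num_embeddings=10_000,
            feature_names=["f1"],
        ),
        EmbeddingBagConfig(
            name="t2",
            embedding_dim=64,
            num_embeddings=10_000,
            feature_names=["f2"],
        ),
    ]
    model = EmbeddingBagCollection(tables=tables, device=device)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.02)

    # Random dataset whose sparse feature keys/hash sizes match the tables above.
    dataset = get_random_dataset(
        batch_size=512,
        num_batches=10,
        num_dense_features=8,
        embedding_bag_configs=tables,
    )

    mean_time, std_time = train(model, optimizer, dataset, device, epochs=5)
    print(f"mean epoch time: {mean_time:.4f}s (std {std_time:.4f}s)")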