/
deepspeed_dataloader.py
92 lines (77 loc) · 2.96 KB
/
deepspeed_dataloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
'''
Copyright 2019 The Microsoft DeepSpeed Team
'''
import torch
import logging
from torch.utils.data import DataLoader, RandomSampler
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
class DeepSpeedDataSource(object):
    """In-memory, line-oriented dataset.

    Reads every line of the given text files into a single flat list
    (whitespace-stripped), so downstream samplers/loaders can index it
    by position.
    """
    def __init__(self, filenames):
        """Load and strip all lines from *filenames* into memory.

        Args:
            filenames: iterable of text-file paths to read, in order.
        """
        all_lines = []
        for filename in filenames:
            # Lazy %-style args: the message is only formatted if the
            # record is actually emitted (avoids work when INFO is off).
            logging.info("Start reading file %s", filename)
            with open(filename, "r") as f:
                # tqdm wraps the file iterator for a progress bar;
                # the original bound an enumerate index it never used.
                for line in tqdm(f):
                    all_lines.append(line.strip())
        self.all_lines = all_lines
        # Cached length so __len__ is O(1).
        self.len = len(self.all_lines)
    def __getitem__(self, index):
        # Map-style access: samplers and torch DataLoader index datasets
        # by position, which the original class could not satisfy.
        return self.all_lines[index]
    def __len__(self):
        return self.len
class DeepSpeedDataLoader(object):
    """Thin wrapper around ``torch.utils.data.DataLoader``.

    Chooses a sampler automatically (DistributedSampler when running
    with a non-negative ``local_rank``, RandomSampler otherwise),
    scales the batch size by the local device count in the
    single-process case, and starts ``tput_timer`` on every ``next()``
    so throughput can be measured per batch.
    """
    def __init__(self,
                 dataset,
                 batch_size,
                 pin_memory,
                 local_rank,
                 tput_timer,
                 collate_fn=None,
                 num_local_io_workers=None,
                 data_sampler=None):
        """Configure (but do not yet build) the underlying DataLoader.

        Args:
            dataset: map-style dataset passed straight to DataLoader.
            batch_size: per-device batch size; scaled by the device
                count in the non-distributed branch.
            pin_memory: forwarded to DataLoader.
            local_rank: >= 0 means distributed mode (one device per
                process); < 0 means a single process driving all
                local devices.
            tput_timer: optional timer with a ``start()`` method,
                ticked once per batch in ``__next__``.
            collate_fn: optional custom collation; torch's default is
                used when None.
            num_local_io_workers: DataLoader worker count; defaults to
                2 workers per device.
            data_sampler: optional pre-built sampler overriding the
                automatic choice.
        """
        self.tput_timer = tput_timer
        if local_rank >= 0:
            # Distributed: each process owns exactly one device, and the
            # DistributedSampler shards the dataset across ranks.
            if data_sampler is None:
                data_sampler = DistributedSampler(dataset)
            device_count = 1
        else:
            if data_sampler is None:
                data_sampler = RandomSampler(dataset)
            # max(1, ...): on a CUDA-less host device_count() is 0, which
            # would scale batch_size to 0 and make DataLoader raise.
            device_count = max(1, torch.cuda.device_count())
        # No-op in the distributed branch (device_count == 1).
        batch_size *= device_count
        if num_local_io_workers is None:
            num_local_io_workers = 2 * device_count
        self.num_local_io_workers = num_local_io_workers
        self.data_sampler = data_sampler
        self.dataset = dataset
        self.collate_fn = collate_fn
        self.device_count = device_count
        # Stored once, post-scaling (the original assigned it twice).
        self.batch_size = batch_size
        self.pin_memory = pin_memory
        self.len = len(self.data_sampler)
        self.data = None
    def __iter__(self):
        # A fresh DataLoader per epoch so the sampler reshuffles.
        self._create_dataloader()
        return self
    def __len__(self):
        return self.len
    def __next__(self):
        if self.tput_timer:
            self.tput_timer.start()
        return next(self.data)
    def _create_dataloader(self):
        # Build the kwargs once; only pass collate_fn when the caller
        # supplied one so torch's default collation applies otherwise.
        kwargs = dict(batch_size=self.batch_size,
                      pin_memory=self.pin_memory,
                      sampler=self.data_sampler,
                      num_workers=self.num_local_io_workers)
        if self.collate_fn is not None:
            kwargs["collate_fn"] = self.collate_fn
        self.dataloader = DataLoader(self.dataset, **kwargs)
        self.data = iter(self.dataloader)
        return self.dataloader