## 1. Libraries

In [1]:
import time

import numpy as np
import torch
import torch.utils.data as torchdata

from multiprocessing import cpu_count


torch.__version__

'1.4.0'

## 2. DataLoaderで`num_workers > 0`を設定した時の挙動を確認する

### 2.1 Dataset定義

オプションをつけることでわざと遅くする機能を持たせます。

In [2]:
class DatasetWrapper(torchdata.Dataset):
    def __init__(self, slow=False, wait=0.01):
        self.slow = slow
        self.wait = wait
        
    def __len__(self):
        return 128
    
    def __getitem__(self, idx):
        if self.slow:
            time.sleep(self.wait)
        return np.random.random(10)
    
    
def create_loader(slow=False, wait=0.01, num_workers=0):
    # parameters
    batch_size = 32
    shuffle = False
    pin_memory = False
    loader = torchdata.DataLoader(
        DatasetWrapper(slow=slow, wait=wait),
        num_workers=num_workers,
        batch_size=batch_size,
        shuffle=shuffle,
        pin_memory=pin_memory)
    return loader

### 2.2 `num_workers > 0`の場合の挙動を確認する

`num_workers == 0`の場合と`num_workers > 0`の場合でデータのロードの仕方がどう変化するかを見てみましょう。

In [8]:
# num_workers == 0の場合
loader = create_loader(slow=True)
time_from_last_batch = time.time()

for i, _ in enumerate(loader):
    batch_end = time.time()
    elapsed = batch_end - time_from_last_batch
    print(f"elapsed time for batch: {i + 1} is {elapsed:.5f} s")
    time_from_last_batch = batch_end

elapsed time for batch: 1 is 0.35927 s
elapsed time for batch: 2 is 0.38517 s
elapsed time for batch: 3 is 0.38399 s
elapsed time for batch: 4 is 0.37444 s


In [9]:
# num_workers > 0の場合
loader = create_loader(slow=True, num_workers=1)
time_from_last_batch = time.time()

for i, _ in enumerate(loader):
    batch_end = time.time()
    elapsed = batch_end - time_from_last_batch
    print(f"elapsed time for batch: {i + 1} is {elapsed:.5f} s")
    time_from_last_batch = batch_end

elapsed time for batch: 1 is 0.39021 s
elapsed time for batch: 2 is 0.38094 s
elapsed time for batch: 3 is 0.36544 s
elapsed time for batch: 4 is 0.36317 s


In [10]:
loader = create_loader(slow=True, num_workers=2)
time_from_last_batch = time.time()

for i, _ in enumerate(loader):
    batch_end = time.time()
    elapsed = batch_end - time_from_last_batch
    print(f"elapsed time for batch: {i + 1} is {elapsed:.5f} s")
    time_from_last_batch = batch_end

elapsed time for batch: 1 is 0.38794 s
elapsed time for batch: 2 is 0.00779 s
elapsed time for batch: 3 is 0.36236 s
elapsed time for batch: 4 is 0.00942 s


In [11]:
loader = create_loader(slow=True, num_workers=3)
time_from_last_batch = time.time()

for i, _ in enumerate(loader):
    batch_end = time.time()
    elapsed = batch_end - time_from_last_batch
    print(f"elapsed time for batch: {i + 1} is {elapsed:.5f} s")
    time_from_last_batch = batch_end

elapsed time for batch: 1 is 0.40101 s
elapsed time for batch: 2 is 0.00495 s
elapsed time for batch: 3 is 0.00020 s
elapsed time for batch: 4 is 0.36986 s


In [12]:
loader = create_loader(slow=True, num_workers=4)
time_from_last_batch = time.time()

for i, _ in enumerate(loader):
    batch_end = time.time()
    elapsed = batch_end - time_from_last_batch
    print(f"elapsed time for batch: {i + 1} is {elapsed:.5f} s")
    time_from_last_batch = batch_end

elapsed time for batch: 1 is 0.40932 s
elapsed time for batch: 2 is 0.00212 s
elapsed time for batch: 3 is 0.00112 s
elapsed time for batch: 4 is 0.00104 s


上の例でわかるように、`num_workers`が2以上に設定されている場合、`DataLoader`の挙動としては最初の`n`バッチ分(`n`は指定したworker数)をまず取りにいき、それらが揃った段階で`for`ブロックの中の処理が走ります。`for`ブロックの中の処理が走っている最中も手が空いたworkerはバッチを次の`n`バッチ分を確保していますが、`for`ブロックの中の処理が十分に速いとすでに確保していてプールしてあったバッチをすぐに消費してしまうため、待ちが発生しworkerが次の`n`バッチ分を揃えるまで処理が止まります。

また、`num_workers > 0`とした場合には最初の`n`バッチの用意をする時間に、subprocessを立ち上げるオーバーヘッドが加わります。この効果は、比較的小さいので、上のコードを再実行する場合は何回か試してみて確認してみてください。

## 3. `DataLoader`で`num_workers > 0`を設定するといいのはどんな時か?

### 3.1 挙動を元に考察

まず、`num_workers`を2以上に設定することでバッチを用意するのにかかる時間を$\frac{1}{num\_workers}$にすることができます。しかしこれは速度向上の下限であり、`for`ブロック内の処理が比較的長い場合はバッチを用意するのにかかる時間の短縮効果はさらに高まります。

具体的には$t_{batch}$をworkerがバッチを一つ用意するのにかかる時間、$t_{for}$を`for`ブロック内のコードが一バッチ分実行されるのにかかる時間とした場合に
$$
t_{batch} < t_{for}
$$

が成り立てば、上の例にあるような`for`ブロックの中の処理がプールしてあるバッチを消費しきってしまってメインプロセスが待ちに入るという状態を経ないため最も効用が高くなります。

### num_workes > 0に指定して効果検証

In [4]:
print("cpuの数:", cpu_count())
fastloader = torchdata.DataLoader(
    DatasetWrapper(), 
    num_workers=0, 
    batch_size=batch_size,
    shuffle=shuffle,
    pin_memory=pin_memory)

fastloader_multi = torchdata.DataLoader(
    DatasetWrapper(), 
    num_workers=cpu_count() // 2, 
    batch_size=batch_size,
    shuffle=shuffle,
    pin_memory=pin_memory)

slowloader = torchdata.DataLoader(
    DatasetWrapper(slow=True, wait=0.01), 
    num_workers=0, 
    batch_size=batch_size,
    shuffle=shuffle,
    pin_memory=pin_memory)

slowloader_multi = torchdata.DataLoader(
    DatasetWrapper(slow=True, wait=0.01), 
    num_workers=cpu_count() // 2, 
    batch_size=batch_size,
    shuffle=shuffle,
    pin_memory=pin_memory)

cpuの数: 8


#### `DataLoader`のロード時間 < `for`文内の場合

In [5]:
%%time
for batch in fastloader:
    time.sleep(1.0)

CPU times: user 4.31 ms, sys: 1.77 ms, total: 6.09 ms
Wall time: 4.01 s


In [6]:
%%time
for batch in fastloader_multi:
    time.sleep(1.0)

CPU times: user 14.3 ms, sys: 18.9 ms, total: 33.1 ms
Wall time: 4.16 s


#### `DataLoader`のロード時間 > `for`文内の場合

In [7]:
%%time
for batch in slowloader:
    time.sleep(1.0)

CPU times: user 13.3 ms, sys: 4.77 ms, total: 18.1 ms
Wall time: 5.52 s


In [8]:
%%time
for batch in slowloader_multi:
    time.sleep(1.0)

CPU times: user 13.2 ms, sys: 20.3 ms, total: 33.5 ms
Wall time: 4.4 s
