# Chapter 4
## WebDatasetのベンチマーク

このチャプターでは、大量のnumpy配列ファイルのデータセットをWebDataset形式で作成し、個々のファイルを読み込む通常の形式のデータセットとパフォーマンスの比較をします。

特に大規模データセットをクラウド上のストレージに置く場合に違いが顕著になります。

このチャプターは比較的大きなデータセット（〜10GB）を扱うため、データの作成やS3への事前のアップロードに時間・コストがかかる場合があります。SageMaker notebookからの実行をお勧めします。また、データセットの容量や個数は環境に応じて適宜調整してください。

In [None]:
#pip install webdataset

In [None]:
import time
import numpy as np
import os
import shutil
import glob
import webdataset as wds
import tqdm
import boto3
import torch
import pandas as pd
import datetime

大規模データセット用のnumpy.arrayファイルとシャードされたtarファイルを格納するフォルダ名を指定し、初期化します。

In [None]:
arr_dir = "npy_large"
shard_dir = "shard_large"

In [None]:
if os.path.exists(arr_dir):
    print("Deleting exitsting array directory. ")
    shutil.rmtree(arr_dir)
print("(re)creating new array directory")
os.makedirs(arr_dir)

if os.path.exists(shard_dir):
    print("Deleting exitsting shard directory. ")
    shutil.rmtree(shard_dir)
print("(re)creating new shard directory")
os.makedirs(shard_dir)

numpy配列形式のデータセットを擬似的に作成します。

入力データ`（X）`は倍精度の二次元配列、教師データ`(y)`は、倍精度のnumpy arrayです。
各要素の値はサンプル名と同様にしています。

作成した`X`および`y`をそれぞれ`.input.npy`, `.output.npy`の形式で保存します。



In [None]:
    X = np.ones((64,64)) * 0
    y = np.array(0)

In [None]:
X

In [None]:
y

In [None]:
num_samples = 100000 # number of sample files
X_shape = (64,64)

recipe_fn = "npy_recipe"
npy_recipe = open(recipe_fn, "w")

for i in tqdm.tqdm(range(num_samples)):
    # create input array
    X = np.ones(X_shape) * i
    # create outpu array
    y = np.array(i)
    
    # set basename for npy input/output files
    arr_fn = "arr_%06d" % i
    # set npy filenames
    X_name = f"{arr_fn}.input.npy"
    y_name = f"{arr_fn}.output.npy"
    # save array
    np.save(os.path.join(arr_dir, X_name), X)
    np.save(os.path.join(arr_dir, y_name), y)
    
    # write sample information onto recipe file
    npy_recipe.write(f"{X_name}\tfile:{os.path.join(arr_dir, X_name)}\n")
    npy_recipe.write(f"{y_name}\tfile:{os.path.join(arr_dir, y_name)}\n")

npy_recipe.close()

 

保存した2種類のnumpy arrayファイル (`.input.npy`,`.output.npy`)からtarファイルを作成します。

一つのtarファイルにまとめるのではなく、サンプル数ごとにシャードされたtarファイルに分割します。
WebDatasetのTarWriterクラスを使うと、tarファイルを簡単に作ることができます。

ここではarr_000000.input.npyとarr_000000.output.npyを例にとります。
これらサンプルをWebDatasetに読み込めるためのtarファイルに保存するためには以下のような辞書形式のオブジェクト`sample`を作成します。


`"__key__"`: 拡張子なしのベースネーム <BR>
`"{入力ファイルの拡張子}"`:入力ファイルのバイトストリームデータ <BR>
`"{出力ファイルの拡張子}"`:出力ファイルのバイトストリームデータ <BR>

今回の場合、入力・出力ともnumpyファイル`.npy`であるため、両者を区別するために、拡張子の前に識別子を付け加えています。画像ファイルとjsonファイルのような場合は、


`"__key__"`:sample_0000<BR>
`".jpeg"`: jpegファイルのバイトストリームデータ<BR>
`".json"`:jsonファイルのバイトストリームデータ<BR>
    
といった形式にしてください。

ここでは、`sample_per_shard`変数によって一つのシャード(tarファイル)中のサンプルの数を指定しています。

In [None]:
fullnames = glob.glob(os.path.join(arr_dir, "*.input.npy"))
fullnames.sort()

sample_per_shard = 1000
i = 0
while i*sample_per_shard < len(fullnames):
    print(f"creating {i}-th tar file...")
    sink = wds.TarWriter(f"{shard_dir}/npy_webdataset-%04d.tar" % i, encoder=False)
    fullnames_per_shard = fullnames[(i*sample_per_shard):((i+1)*sample_per_shard)]
    for fullname in fullnames_per_shard:
        fullname_wo_ext = fullname.split(".")[0]
        basename_wo_ext = os.path.basename(fullname_wo_ext)
    
        #print(basename_wo_ext)
        with open(f"{fullname_wo_ext}.input.npy", "rb") as stream:
            inp = stream.read()
        with open(f"{fullname_wo_ext}.output.npy", "rb") as stream:
            out = stream.read()        
        sample = {
            "__key__": basename_wo_ext,
            "input.npy": inp,
            "output.npy": out
        }
        sink.write(sample)
    sink.close()
    i +=1
print("Finished!!!")

### S3へのアップロード

In [None]:
パフォーマンスを比較するため、tarファイルに固めたデータと、元のnpyデータの両方をS3にアップロードします。

In [None]:
bucket = "put your bucket name"
prefix = "put your prefix"

まず、作成したtarファイルをS3上にアップロードします。


In [None]:
!aws s3 sync --delete {shard_dir} s3://{bucket}/{prefix}/{shard_dir}/

次に、元のnpyデータをS3上にアップロードします。<BR>
**この作業は場合によっては1時間以上かかる場合があります。**
時間がかかりすぎる場合はサンプル数を調節してください。

In [None]:
!aws s3 sync --delete {arr_dir} s3://{bucket}/{prefix}/{arr_dir}/

### WebDatasetのパフォーマンス比較

作成したtarファイルからWebDatasetを作成します。作成したtarファイルの数に応じて、urlの`{0000..0099}`の部分を調整してください。

In [None]:
url = f"s3://{bucket}/{prefix}/{shard_dir}/" + "npy_webdataset-{0000..0099}.tar"
url = f"pipe:aws s3 cp {url} - || true"
print(url)

In [None]:
shard_dir

In [None]:
import os
os.cpu_count()

### バッチデータのフィード

#### case1: num_batch=1; シャッフルなし

In [None]:
def print_time():
    return datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%dT%H:%M:%S.%fZ')

In [None]:
print(f"start: {print_time()}")
webdataset = wds.WebDataset(url, shardshuffle=False, cache_dir=None).decode()
webdataset = webdataset.to_tuple("input.npy", "output.npy")
webloader = wds.WebLoader(webdataset, num_workers=0)
print(f"end: {print_time()}")


In [None]:
start = time.time()
print(f"start: {print_time()}")

for i, (inp, out) in enumerate(webloader):
    #print(out)
    print('.', end='')
    pass
end = time.time()
print(f"total time: {(end-start)} sec")
print(f"end: {print_time()}")



#### case 2: num_batch = 100; シャッフルなし

In [None]:
print(f"start: {print_time()}")
webdataset = wds.WebDataset(url, shardshuffle=False, cache_dir=None).decode()
webdataset = webdataset.to_tuple("input.npy", "output.npy").batched(100)
webloader = wds.WebLoader(webdataset, num_workers=0)
print(f"end: {print_time()}")


In [None]:
start = time.time()
print(f"start: {print_time()}")

for i, (inp, out) in enumerate(webloader):
    #print(out)
    print('.', end='')
    pass
end = time.time()
print(f"total time: {(end-start)} sec")
print(f"end: {print_time()}")



In [None]:
!aws s3 cp s3://taturabe-dataset/dataset/webdataset/shard_large/npy_webdataset-0099.tar tmp.tar

In [None]:
athena = boto3.client('athena', region_name='us-east-1')

query =f'''
select

    eventtime,
    eventname,
    json_extract_scalar(requestparameters, '$.bucketName') as bucketName,
    json_extract_scalar(requestparameters, '$.key') as key

from 
    cloudtrail_logs_aws_cloudtrail_logs_820974724107_798abb34
WHERE eventName='GetObject'
    AND eventTime BETWEEN '{start}' and '{end}'
ORDER BY eventtime DESC
'''

query_exec = athena.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': 'default'
},
ResultConfiguration={
'OutputLocation': 's3://taturabe-dataset/tmp/'
}
)
    
query_id = query_exec['QueryExecutionId']


state=None
while(True):
    if state=='SUCCEEDED':
        print("query succeeded")
        break
    elif state=='FAILED':
        print("query failed!!")
        break
    time.sleep(1)
    print(".", end="")

    query_exec = athena.get_query_execution(
                                QueryExecutionId=query_id
                                )
    state = query_exec['QueryExecution']['Status']['State']

queryres = athena.get_query_results(
                        QueryExecutionId=query_id
                        )
csv_path=query_exec['QueryExecution']['ResultConfiguration']['OutputLocation']
query_df =pd.read_csv(csv_path)

In [None]:
query_df

In [None]:
query

In [None]:
for i in range(100):
    time.sleep(1)
    print(i)

In [None]:
pd.read_sql(res)

In [None]:
status = athena.get_query_execution(
QueryExecutionId=query_id
)



In [None]:
webdataset = wds.WebDataset(url, shardshuffle=False, cache_dir=None).decode()
webdataset = webdataset.to_tuple("input.npy", "output.npy").batched(32)
webloader = wds.WebLoader(webdataset, num_workers=os.cpu_count()-1)
start = time.time()
for i, (inp, out) in enumerate(webloader):
    #print(out)
    print('.', end='')
    pass


end = time.time()
print("total time: ", (end-start))

In [None]:
# s3のprefix中のフォルダを検索する
def get_all_s3_objects(s3, **base_kwargs):
    continuation_token = None
    while True:
        list_kwargs = dict(MaxKeys=1000, **base_kwargs)
        if continuation_token:
            list_kwargs['ContinuationToken'] = continuation_token
        response = s3.list_objects_v2(**list_kwargs)
        yield from response.get('Contents', [])
        if not response.get('IsTruncated'):  # At the end of the list?
            break
        continuation_token = response.get('NextContinuationToken')

In [None]:
s3_client = boto3.client('s3')

contents = list(get_all_s3_objects(boto3.client('s3'), Bucket=bucket, Prefix=os.path.join(prefix, arr_dir)))

input_arr_path = [c['Key'] for c in contents if c['Key'][-9:] == "input.npy"]
basename_path = [i.split(".input.npy")[0] for i in input_arr_path] 

In [None]:
len(basename_path)

In [None]:
class S3Dataset(torch.utils.data.Dataset):
    def __init__(self, input_images_path_list, client, bucket):        
        self.input_images_path_list = input_images_path_list
        self.client = client
        self.bucket = bucket
      
    def __len__(self):
        return len(self.input_images_path_list)
    
    def __getitem__(self, idx):
        # download .npy array from S3
        obj = self.client.get_object(Bucket=self.bucket, Key=self.input_images_path_list[idx] + ".input.npy")
        input_arr = np.frombuffer( obj['Body'].read(), dtype='float64', offset=128)
        input_arr = input_arr.reshape(64,64)
    
    
        obj = self.client.get_object(Bucket=self.bucket, Key=(self.input_images_path_list[idx]) + ".output.npy")
        output_arr = np.frombuffer( obj['Body'].read(), dtype='float64', offset=128)
        output_arr = output_arr.reshape(1)        

        input_tensor = torch.from_numpy(input_arr)
        output_tensor = torch.from_numpy(output_arr)

        #### 要修正
        # data typeの変更 float16は未対応
        input_tensor = input_tensor.type(torch.float32)
        output_tensor = output_tensor.type(torch.float32)


        return input_tensor, output_tensor, self.input_images_path_list[idx]
 

In [None]:
s3dataset = S3Dataset(basename_path, s3_client, bucket)
s3loader =  torch.utils.data.DataLoader(s3dataset, batch_size=32, 
                                        shuffle=False, num_workers=os.cpu_count()-1)


In [None]:
basename_path

In [None]:
start = time.time()
for i, (inp,out,path) in enumerate(s3loader):
    print(path)
    #print('.', end='')
end = time.time()
print("total time: ", (end-start))

In [None]:
a=1
print(a)

In [None]:
inp