In [62]:
# test encoded weather datasets (visually)
%load_ext autoreload
%autoreload 2
import numpy as np
import random
from wseqrecorder.wseqrecord import WSeqRecord
import itertools
import os
import re
from typing import List

def sort_weatherfiles(files: List[os.DirEntry]) -> List[os.DirEntry]:
    """Order directory entries by the integers embedded in their file names.

    The part of each name before the first "." is split on "-" and "_", every
    piece is converted to ``int``, and the resulting tuple is used as sort key.
    For a name such as ``195001010600-195501010000_33.npz`` (from
    CMIP6/MPI-ESM/1.40625deg_equally_np_all_levels/train/) the key is
    ``(195001010600, 195501010000, 33)``.

    TODO: check with Jayesh that this ordering is the intended time order.

    Args:
        files (List[os.DirEntry]): entries whose ``name`` follows the pattern above.

    Returns:
        List[os.DirEntry]: a new list, sorted ascending by the numeric key.

    Raises:
        ValueError: if a piece of a file name cannot be converted to ``int``.
    """

    def _numeric_key(entry):
        # Only the text before the first dot carries the numbers.
        stem = entry.name.split(".")[0]
        return tuple(int(piece) for piece in re.split("-|_", stem))

    # Sort a copy so the caller's sequence is left untouched.
    ordered = list(files)
    ordered.sort(key=_numeric_key)
    return ordered

# Root of the mounted storage; both directories below are derived from it.
dataset_mount_dir = "/datadrive/weatherdatastorage2/datasets"
# Location of the WSeqRecord encoding and of the original .npz dataset it is compared with.
wsrecord_dir = f"{dataset_mount_dir}/CMIP6/MPI-ESM/wseqrecord/1.40625deg_equally_np_all_levels/train/"
wdataset_dir = f"{dataset_mount_dir}/CMIP6/MPI-ESM/1.40625deg_equally_np_all_levels/train/"

# Used by the cells below to map a global frame index to (file, frame-in-file)
# and to bound the range of frame indices that are checked.
num_frames_per_file = 146
num_wdataset_files = 100
wsrecord = WSeqRecord.load_record_from_dict(wsrecord_dir)

# Keep regular files only, then order them by the numbers in their names.
files_dirs = os.scandir(wdataset_dir)
files = [entry for entry in files_dirs if entry.is_file()]
files = sort_weatherfiles(files)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [63]:

# utils to read wdataset
def frame_from_wdataset(nth_frame:int, num_frames_per_file:int) -> dict:
    """Read one frame straight from the original .npz dataset.

    The global frame index is split into a file index and an index inside that
    file; the file is taken from the module-level, already sorted ``files`` list.

    Args:
        nth_frame (int): global index of the frame across all dataset files.
        num_frames_per_file (int): number of frames stored in each file.

    Returns:
        dict: maps every array name in the .npz file to that array's
        ``nth_frame_in_file``-th entry.

    Raises:
        AssertionError: if the frame falls in a file at or beyond index
            ``num_wdataset_files``.
    """
    nth_file, nth_frame_in_file = divmod(nth_frame, num_frames_per_file)
    # Bound by the notebook-level constant instead of a repeated magic number.
    assert nth_file < num_wdataset_files, (
        f"frame {nth_frame} maps to file {nth_file}, "
        f"but only {num_wdataset_files} files are expected"
    )
    file = files[nth_file]
    print(f"file that contains {nth_frame}-th frame is the {nth_file}-th file in dataset named {file}")
    # np.load keeps the .npz archive open until closed; the context manager
    # releases the file handle once the requested slices have been read.
    with np.load(file) as data:
        return {key: data[key][nth_frame_in_file] for key in data.files}

In [60]:
# Inspect the metadata entry stored at index 32 of the loaded record.
print(wsrecord.metadata[32])

{'frame_idx': 32, 'file_idx': 0, 'bytes_offset': 171966464, 't2m': {'is_none': False, 'dtype': dtype('float32'), 'shape': (1, 128, 256), 'bytes_offset': 0, 'nbytes': 131072}, 'u10': {'is_none': False, 'dtype': dtype('float32'), 'shape': (1, 128, 256), 'bytes_offset': 131072, 'nbytes': 131072}, 'v10': {'is_none': False, 'dtype': dtype('float32'), 'shape': (1, 128, 256), 'bytes_offset': 262144, 'nbytes': 131072}, 'z_50': {'is_none': False, 'dtype': dtype('float32'), 'shape': (1, 128, 256), 'bytes_offset': 393216, 'nbytes': 131072}, 'z_250': {'is_none': False, 'dtype': dtype('float32'), 'shape': (1, 128, 256), 'bytes_offset': 524288, 'nbytes': 131072}, 'z_500': {'is_none': False, 'dtype': dtype('float32'), 'shape': (1, 128, 256), 'bytes_offset': 655360, 'nbytes': 131072}, 'z_600': {'is_none': False, 'dtype': dtype('float32'), 'shape': (1, 128, 256), 'bytes_offset': 786432, 'nbytes': 131072}, 'z_700': {'is_none': False, 'dtype': dtype('float32'), 'shape': (1, 128, 256), 'bytes_offset': 917

In [64]:
# Load an arbitrary nth frame from the record and compare it against the
# same frame read from the original .npz dataset.

features = ["t2m","u10","v10"]
target_features = ["z_50", "z_250", "z_500"]  # not used in this cell

nth_frame = 110 #random.randint(0, num_frames_per_file * num_wdataset_files -1)

# islice(it, n, n + 1) yields exactly the n-th item. The previous stop bound
# (total - 1) made the last valid frame index raise StopIteration.
frame_wsrecord = next(itertools.islice(wsrecord.iterate_frames(features), nth_frame, nth_frame + 1))
frame_wdataset = frame_from_wdataset(nth_frame, num_frames_per_file)

# Every feature returned by the record must match the original array exactly.
for key in frame_wsrecord:
    np.testing.assert_equal(frame_wsrecord[key], frame_wdataset[key], err_msg = "", verbose=True)


0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
file that contains 110-th frame is the 0-th file in dataset named <DirEntry '185001010600-185501010000_0.npz'>


In [72]:
# Load an arbitrary nth (input frame, target frame) pair from the record and
# compare both frames against the original .npz dataset.

features = ["t2m","u10","v10"]
target_features = ["z_50", "z_250", "z_500"]

nth_frame = 168 # random.randint(0, num_frames_per_file * num_wdataset_files -2)

# islice(it, n, n + 1) yields exactly the n-th pair. The previous stop bound
# (total - 2) made the largest index the commented-out randint can produce
# raise StopIteration.
frame_wsrecord, target_frame_wsrecord, lookahead_steps = next(itertools.islice(wsrecord.iterate_frame_pairs(features, target_features, 28), nth_frame, nth_frame + 1))
frame_wdataset = frame_from_wdataset(nth_frame, num_frames_per_file)
# The target frame sits lookahead_steps frames after the input frame.
target_frame_wdataset = frame_from_wdataset(nth_frame + lookahead_steps, num_frames_per_file)
for key in frame_wsrecord:
    np.testing.assert_equal(frame_wsrecord[key], frame_wdataset[key], err_msg = "", verbose=True)
for key in target_frame_wsrecord:
    np.testing.assert_equal(target_frame_wsrecord[key], target_frame_wdataset[key], err_msg="", verbose=True)

file that contains 168-th frame is the 1-th file in dataset named <DirEntry '185001010600-185501010000_1.npz'>
file that contains 189-th frame is the 1-th file in dataset named <DirEntry '185001010600-185501010000_1.npz'>


In [24]:
# Sanity check: unpickle the "record.dict" file in the record directory
# directly and list its top-level keys.
# NOTE(review): pickle.load can execute arbitrary code while loading; only
# open record files that come from a trusted source.
import pickle
file_path = os.path.join(wsrecord_dir, "record.dict")
with open(file_path, mode="rb") as f:
    obj_dict = pickle.load(f)
print(obj_dict.keys())

dict_keys(['recorddir', 'features_written', 'num_bytes', 'idx_range_of_files', 'file_desc', 'metadata', 'frame_idx', 'num_frames', 'num_files'])


In [25]:
# Show the 'num_files' entry of the unpickled record dictionary.
print(obj_dict['num_files'])

157


In [1]:
from typing import List, Tuple
def distribute_loads(works: int, num_processes: int) -> List[Tuple[int, int]]:
    """Given the overall works and number of processes, allocate evenly the loads that each process should take.

    Every process receives ``works // num_processes`` items and the remainder is
    handed out one item each to the trailing processes, so load sizes differ by
    at most one and the ranges always tile ``[0, works)`` exactly.

    Args:
        works (int): amount of over all work
        num_processes (int): number of processes available

    Returns:
        List[Tuple[int, int]]: half-open ``(start, end)`` index range of the work
        each process is responsible for, in process order.

    Raises:
        AssertionError: if ``num_processes`` is not positive or exceeds ``works``.
    """
    assert num_processes > 0, "The number of processes must be positive."
    assert works >= num_processes, "The amount of works is less than number of processes."
    # Rounding works / num_processes can overshoot: 11 works on 7 processes
    # used to give (10, 12) and (12, 11). Integer division plus an explicit
    # remainder cannot.
    base_load, extra = divmod(works, num_processes)
    first_heavy = num_processes - extra  # processes from this index on take one extra item
    ans = []
    start = 0
    for i in range(num_processes):
        end = start + base_load + (1 if i >= first_heavy else 0)
        ans.append((start, end))
        start = end
    return ans

In [3]:
# Example: split 4 work items across 3 processes.
distribute_loads(4, 3)

[(0, 1), (1, 2), (2, 4)]