forked from SciTools/iris
-
Notifications
You must be signed in to change notification settings - Fork 0
/
um_files.py
212 lines (173 loc) · 6.84 KB
/
um_files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# Copyright Iris contributors
#
# This file is part of Iris and is released under the LGPL license.
# See COPYING and COPYING.LESSER in the root of the repository for full
# licensing details.
"""
Generate FF, PP and NetCDF files based on a minimal synthetic FF file.
NOTE: uses the Mule package, so depends on an environment with Mule installed.
"""
def _create_um_files(
    len_x: int, len_y: int, len_z: int, len_t: int, compress, save_paths: dict
) -> None:
    """
    Generate an FF object of given shape and compression, save to FF/PP/NetCDF.

    This is run externally
    (:func:`benchmarks.generate_data.run_function_elsewhere`), so all imports
    are self-contained and input parameters are simple types.

    Parameters
    ----------
    len_x, len_y, len_z, len_t : int
        Sizes of the x, y, vertical-level and time dimensions to generate.
    compress
        Truthy enables packing: ``int(compress)`` is written to each field's
        ``lbpack``, and the same value is passed as ``zlib=`` for NetCDF save.
    save_paths : dict
        Maps file-type name ("FF", "PP", "NetCDF") to a destination path;
        only the types present (and non-empty) are written.
    """
    from copy import deepcopy
    from datetime import datetime
    from tempfile import NamedTemporaryFile

    from mo_pack import compress_wgdos as mo_pack_compress
    from mule import ArrayDataProvider, Field3, FieldsFile
    from mule.pp import fields_to_pp_file
    import numpy as np

    from iris import load_cube
    from iris import save as save_cube

    def packing_patch(*compress_args, **compress_kwargs) -> bytes:
        """
        Force conversion from returned :class:`memoryview` to :class:`bytes`.

        Downstream uses of :func:`mo_pack.compress_wgdos` were written
        for the ``Python2`` behaviour, where the returned buffer had a
        different ``__len__`` value to the current :class:`memoryview`.
        Unable to fix directly in Mule, so monkey patching for now.
        """
        return mo_pack_compress(*compress_args, **compress_kwargs).tobytes()

    # Re-import the module object so the patch lands on the attribute that
    # Mule looks up at pack time. Must happen before any file is written.
    import mo_pack

    mo_pack.compress_wgdos = packing_patch

    ########

    # Minimal headers for a valid FieldsFile of the requested shape.
    template = {
        "fixed_length_header": {"dataset_type": 3, "grid_staggering": 3},
        "integer_constants": {
            "num_p_levels": len_z,
            "num_cols": len_x,
            "num_rows": len_y,
        },
        "real_constants": {},
        "level_dependent_constants": {"dims": (len_z + 1, None)},
    }
    new_ff = FieldsFile.from_template(deepcopy(template))

    # One shared payload array reused for every field.
    # NOTE(review): reshape is (len_x, len_y) while lbnpt=len_x (columns)
    # and lbrow=len_y (rows) below — for non-square grids confirm the
    # intended (rows, cols) orientation.
    data_array = np.arange(len_x * len_y).reshape(len_x, len_y)
    array_provider = ArrayDataProvider(data_array)

    def add_field(level_: int, time_step_: int) -> None:
        """
        Add a minimal field to the new :class:`~mule.FieldsFile`.

        Includes the minimum information to allow Mule saving and Iris
        loading, as well as incrementation for vertical levels and time
        steps to allow generation of z and t dimensions.
        """
        new_field = Field3.empty()
        # To correspond to the header-release 3 class used.
        new_field.lbrel = 3

        # Mule uses the first element of the lookup to test for
        #  unpopulated fields (and skips them), so the first element should
        #  be set to something. The year will do.
        new_field.raw[1] = datetime.now().year

        # Horizontal.
        new_field.lbcode = 1
        new_field.lbnpt = len_x
        new_field.lbrow = len_y
        new_field.bdx = new_ff.real_constants.col_spacing
        new_field.bdy = new_ff.real_constants.row_spacing
        # Zero points sit half a cell before the grid start.
        new_field.bzx = new_ff.real_constants.start_lon - 0.5 * new_field.bdx
        new_field.bzy = new_ff.real_constants.start_lat - 0.5 * new_field.bdy

        # Hemisphere.
        new_field.lbhem = 32
        # Processing.
        new_field.lbproc = 0

        # Vertical.
        # Hybrid height values by simulating sequences similar to those in a
        #  theta file.
        new_field.lbvc = 65
        if level_ == 0:
            # Surface level uses the conventional missing-level marker.
            new_field.lblev = 9999
        else:
            new_field.lblev = level_
        level_1 = level_ + 1
        six_rec = 20 / 3
        three_rec = six_rec / 2
        # Quadratic height sequence per level, with bounds in brsvd1.
        new_field.blev = level_1 ** 2 * six_rec - six_rec
        new_field.brsvd1 = (
            level_1 ** 2 * six_rec + (six_rec * level_1) - three_rec
        )
        # Sigma values decreasing from ~1 at the surface to 0 at the top;
        #  bhrlev is the same sequence shifted down (shift capped for len_z < 2).
        brsvd2_simulated = np.linspace(0.995, 0, len_z)
        shift = min(len_z, 2)
        bhrlev_simulated = np.concatenate(
            [np.ones(shift), brsvd2_simulated[:-shift]]
        )
        new_field.brsvd2 = brsvd2_simulated[level_]
        new_field.bhrlev = bhrlev_simulated[level_]

        # Time.
        # The time step is encoded as the year so each step is distinct;
        #  all finer-grained components are zeroed.
        new_field.lbtim = 11
        new_field.lbyr = time_step_
        for attr_name in ["lbmon", "lbdat", "lbhr", "lbmin", "lbsec"]:
            setattr(new_field, attr_name, 0)
        new_field.lbyrd = time_step_ + 1
        for attr_name in ["lbmond", "lbdatd", "lbhrd", "lbmind", "lbsecd"]:
            setattr(new_field, attr_name, 0)

        # Data and packing.
        new_field.lbuser1 = 1
        new_field.lbpack = int(compress)
        new_field.bacc = 0
        new_field.bmdi = -1
        new_field.lbext = 0
        new_field.set_data_provider(array_provider)

        new_ff.fields.append(new_field)

    # One field per (time step, level) pair; time steps are 1-based.
    for time_step in range(len_t):
        for level in range(len_z):
            add_field(level, time_step + 1)

    ff_path = save_paths.get("FF", None)
    pp_path = save_paths.get("PP", None)
    nc_path = save_paths.get("NetCDF", None)

    if ff_path:
        new_ff.to_file(ff_path)
    if pp_path:
        fields_to_pp_file(str(pp_path), new_ff.fields)
    if nc_path:
        temp_ff_path = None
        # Need an Iris Cube from the FF content.
        if ff_path:
            # Use the existing file.
            ff_cube = load_cube(ff_path)
        else:
            # Make a temporary file.
            temp_ff_path = NamedTemporaryFile()
            new_ff.to_file(temp_ff_path.name)
            ff_cube = load_cube(temp_ff_path.name)

        save_cube(ff_cube, nc_path, zlib=compress)
        if temp_ff_path:
            temp_ff_path.close()
# File-type name -> extension used for the saved benchmark file
# (FF files conventionally have no extension).
FILE_EXTENSIONS = {"FF": "", "PP": ".pp", "NetCDF": ".nc"}
def create_um_files(
    len_x: int,
    len_y: int,
    len_z: int,
    len_t: int,
    compress: bool,
    file_types: list,
) -> dict:
    """
    Generate FF-based FF / PP / NetCDF files with specified shape and compression.

    Saved to a directory for all files that shape. A dictionary of the saved
    paths is returned.

    Parameters
    ----------
    len_x, len_y, len_z, len_t : int
        Sizes of the x, y, vertical-level and time dimensions.
    compress : bool
        Whether the generated files are compressed/packed; also forms the
        file name stem so both variants can coexist in one directory.
    file_types : list
        Which of "FF", "PP", "NetCDF" to generate (keys of
        :data:`FILE_EXTENSIONS`).

    Returns
    -------
    dict
        Mapping of file-type name to the saved file path (as str).
    """
    # Self contained imports to avoid linting confusion with _create_um_files().
    from . import BENCHMARK_DATA, REUSE_DATA, run_function_elsewhere

    save_name_sections = ["UM", len_x, len_y, len_z, len_t]
    save_name = "_".join(str(section) for section in save_name_sections)
    save_dir = BENCHMARK_DATA / save_name
    # exist_ok avoids the TOCTOU race of is_dir()-then-mkdir() when several
    # benchmark processes generate the same shape concurrently.
    save_dir.mkdir(parents=True, exist_ok=True)

    save_paths = {}
    files_exist = True
    for file_type in file_types:
        file_ext = FILE_EXTENSIONS[file_type]
        # All files for one shape share the directory; the stem records the
        # compression setting, e.g. "True.pp" / "False.nc".
        save_path = (save_dir / f"{compress}").with_suffix(file_ext)
        files_exist = files_exist and save_path.is_file()
        save_paths[file_type] = str(save_path)
    # Only regenerate when reuse is disabled or any requested file is missing.
    if not REUSE_DATA or not files_exist:
        _ = run_function_elsewhere(
            _create_um_files, len_x, len_y, len_z, len_t, compress, save_paths
        )
    return save_paths