import fnmatch
import os.path
import re
import tempfile
import typing
import warnings

import fsspec
import joblib
import pandas as pd
import pydantic
import toolz
from intake_esm.cat import (
    Aggregation,
    AggregationControl,
    Assets,
    Attribute,
    DataFormat,
    ESMCatalogModel,
)

INVALID_ASSET = 'INVALID_ASSET'
TRACEBACK = 'TRACEBACK'


def glob_to_regex(*, include_patterns, exclude_patterns):
    """Translate glob-style include/exclude patterns into a pair of alternation regexes."""
    include_regex = r'|'.join([fnmatch.translate(x) for x in include_patterns])
    exclude_regex = r'|'.join([fnmatch.translate(x) for x in exclude_patterns]) or r'$.'
    return include_regex, exclude_regex
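

# A minimal usage sketch for glob_to_regex (the patterns and paths below are
# illustrative placeholders):
#
#     include_regex, exclude_regex = glob_to_regex(
#         include_patterns=['*.nc'], exclude_patterns=['*/tmp/*']
#     )
#     re.match(include_regex, '/archive/tas_day.nc')      # matches -> included
#     re.match(exclude_regex, '/archive/tmp/scratch.nc')  # matches -> excluded
#
# Note that an empty exclude list falls back to r'$.', a regex that can never
# match, so nothing is excluded.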


class RootDirectory(pydantic.BaseModel):
    path: str
    depth: int = 0
    storage_options: typing.Dict[typing.Any, typing.Any] = pydantic.Field(default_factory=dict)
    exclude_regex: str = pydantic.Field(default_factory=str)
    include_regex: str = pydantic.Field(default_factory=str)

    def __hash__(self):
        return hash(f'{self.path}{self.raw_path}')

    @property
    def mapper(self):
        return fsspec.get_mapper(self.path, **self.storage_options)

    @property
    def protocol(self):
        protocol = self.mapper.fs.protocol
        if isinstance(protocol, (list, tuple)):
            protocol = protocol[0]
        return protocol

    @property
    def raw_path(self):
        return self.mapper.fs._strip_protocol(self.path)

    def walk(self):
        all_assets = []
        for root, dirs, files in self.mapper.fs.walk(self.raw_path, maxdepth=self.depth + 1):
            # exclude dirs
            dirs[:] = [os.path.join(root, directory) for directory in dirs]
            dirs[:] = [
                directory for directory in dirs if not re.match(self.exclude_regex, directory)
            ]
            if files:
                # exclude/include assets
                if self.protocol != 'file':
                    files = [f'{self.protocol}://{os.path.join(root, file)}' for file in files]
                else:
                    files = [os.path.join(root, file) for file in files]
                files = [file for file in files if not re.match(self.exclude_regex, file)]
                files = [file for file in files if re.match(self.include_regex, file)]
                all_assets.extend(files)

            # Look for zarr assets. This works for zarr stores created with consolidated metadata
            for directory in dirs:
                if self.mapper.fs.exists(f'{directory}/.zmetadata'):
                    path = (
                        f'{self.protocol}://{directory}' if self.protocol != 'file' else directory
                    )
                    all_assets.append(path)
        return all_assets
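

# A minimal usage sketch for RootDirectory (the path below is an illustrative
# placeholder): crawl two directory levels and collect NetCDF files.
#
#     root = RootDirectory(
#         path='/data/archive',
#         depth=2,
#         include_regex=fnmatch.translate('*.nc'),
#         exclude_regex=r'$.',  # never matches, i.e. exclude nothing
#     )
#     assets = root.walk()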


@pydantic.dataclasses.dataclass
class Builder:
    """Generates a catalog from a list of netCDF files or zarr stores.

    Parameters
    ----------
    paths : list of str
        List of paths to crawl for assets/files.
    storage_options : dict, optional
        Parameters passed to the backend file system, e.g. for Google Cloud
        Storage or Amazon S3.
    depth : int, optional
        Maximum depth to crawl for assets. Default is 0.
    exclude_patterns : list of str, optional
        List of glob patterns to exclude from crawling.
    include_patterns : list of str, optional
        List of glob patterns to include in crawling.
    joblib_parallel_kwargs : dict, optional
        Parameters passed to joblib.Parallel. Default is {}.
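
    Examples
    --------
    A minimal sketch; the root path, patterns, and parallel settings below are
    illustrative placeholders, and the package-level import is assumed::

        from ecgtools import Builder

        builder = Builder(
            paths=['/data/archive/cmip6'],
            depth=2,
            exclude_patterns=['*/files/*'],
            joblib_parallel_kwargs={'n_jobs': -1},
        )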
"""
paths: typing.List[str]
storage_options: typing.Dict[typing.Any, typing.Any] = None
depth: int = 0
exclude_patterns: typing.List[str] = None
include_patterns: typing.List[str] = None
joblib_parallel_kwargs: typing.Dict[str, typing.Any] = None
def __post_init_post_parse__(self):
self.storage_options = self.storage_options or {}
self.joblib_parallel_kwargs = self.joblib_parallel_kwargs or {}
self.exclude_patterns = self.exclude_patterns or []
self.include_patterns = self.include_patterns or []
# transform glob patterns to regular expressions
self.include_regex, self.exclude_regex = glob_to_regex(
include_patterns=self.include_patterns, exclude_patterns=self.exclude_patterns
)
self._root_dirs = [
RootDirectory(
path=path,
storage_options=self.storage_options,
depth=self.depth,
exclude_regex=self.exclude_regex,
include_regex=self.include_regex,
)
for path in self.paths
]
self.assets = None
self.invalid_assets = pd.DataFrame()
self.entries = None
self.df = pd.DataFrame()

    def get_assets(self):
        """Walk ``paths`` and collect the sorted, de-duplicated list of assets in ``.assets``."""
        assets = [directory.walk() for directory in self._root_dirs]
        self.assets = sorted(toolz.unique(toolz.concat(assets)))
        return self

    @pydantic.validate_arguments
    def parse(self, *, parsing_func: typing.Callable, parsing_func_kwargs: dict = None):
        """Parse the collected assets with ``parsing_func`` and store the results in ``.df``."""
        if not self.assets:
            raise ValueError('asset list provided is None. Please run `.get_assets()` first')
        parsing_func_kwargs = {} if parsing_func_kwargs is None else parsing_func_kwargs
        entries = joblib.Parallel(**self.joblib_parallel_kwargs)(
            joblib.delayed(parsing_func)(asset, **parsing_func_kwargs) for asset in self.assets
        )
        self.entries = entries
        self.df = pd.DataFrame(entries)
        return self

    def clean_dataframe(self):
        """Clean the dataframe by excluding invalid assets and removing duplicate entries."""
        if INVALID_ASSET in self.df.columns:
            invalid_assets = self.df[self.df[INVALID_ASSET].notnull()][[INVALID_ASSET, TRACEBACK]]
            df = self.df[self.df[INVALID_ASSET].isnull()].drop(columns=[INVALID_ASSET, TRACEBACK])
            self.invalid_assets = invalid_assets
            if not self.invalid_assets.empty:
                warnings.warn(
                    f'Unable to parse {len(self.invalid_assets)} assets. A list of these assets can be found in the `.invalid_assets` attribute.',
                    stacklevel=2,
                )
            self.df = df
        return self

    @pydantic.validate_arguments
    def build(
        self,
        *,
        parsing_func: typing.Callable,
        parsing_func_kwargs: dict = None,
        postprocess_func: typing.Callable = None,
        postprocess_func_kwargs: dict = None,
    ):
        """Builds a catalog from a list of netCDF files or zarr stores.

        Parameters
        ----------
        parsing_func : callable
            Function that parses the asset and returns a dictionary of metadata.
        parsing_func_kwargs : dict, optional
            Parameters passed to the parsing function. Default is {}.
        postprocess_func : callable, optional
            Function that post-processes the built dataframe and returns a pandas dataframe.
            Default is None.
        postprocess_func_kwargs : dict, optional
            Parameters passed to the post-processing function. Default is {}.

        Returns
        -------
        :py:class:`~ecgtools.Builder`
            The builder object.
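
        Examples
        --------
        A minimal sketch; ``parse_netcdf`` below is a hypothetical user-supplied
        parser, not something provided by this module::

            def parse_netcdf(path):
                # return a flat dictionary of metadata for one asset
                return {'path': path, 'variable': 'tas'}

            # ``builder`` is an existing Builder instance
            builder = builder.build(parsing_func=parse_netcdf)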
"""
self.get_assets().parse(
parsing_func=parsing_func, parsing_func_kwargs=parsing_func_kwargs
).clean_dataframe()
if postprocess_func:
postprocess_func_kwargs = postprocess_func_kwargs or {}
self.df = postprocess_func(self.df, **postprocess_func_kwargs)
return self

    @pydantic.validate_arguments
    def save(
        self,
        *,
        name: str,
        path_column_name: str,
        variable_column_name: str,
        data_format: DataFormat,
        groupby_attrs: typing.List[str] = None,
        aggregations: typing.List[Aggregation] = None,
        esmcat_version: str = '0.0.1',
        description: str = None,
        directory: str = None,
        catalog_type: str = 'file',
        to_csv_kwargs: dict = None,
        json_dump_kwargs: dict = None,
    ):
        """Persist catalog contents to files.

        Parameters
        ----------
        name : str
            The name of the file to save the catalog to.
        path_column_name : str
            The name of the column containing the path to the asset.
            Must be in the header of the CSV file.
        variable_column_name : str
            Name of the attribute column in the CSV file that contains the variable name.
        data_format : str
            The data format. Valid values are netcdf and zarr.
        groupby_attrs : list of str, optional
            Column names (attributes) that define data sets that can be aggregated.
            Default is None.
        aggregations : list of dict, optional
            List of aggregations to apply to query results. Default is None.
        esmcat_version : str, optional
            The ESM Catalog version the collection implements. Default is '0.0.1'.
        description : str, optional
            Detailed multi-line description to fully explain the collection.
            Default is None.
        directory : str, optional
            The directory to save the catalog to. If None, the current directory is used.
        catalog_type : str, optional
            The type of catalog to save, i.e. whether to store the catalog table as a
            dictionary in the JSON file or as a separate CSV file. Valid options are
            'dict' and 'file'. Default is 'file'.
        to_csv_kwargs : dict, optional
            Additional keyword arguments passed through to the :py:meth:`~pandas.DataFrame.to_csv` method.
        json_dump_kwargs : dict, optional
            Additional keyword arguments passed through to the :py:func:`~json.dump` function.

        Returns
        -------
        :py:class:`~ecgtools.Builder`
            The builder object.

        Notes
        -----
        See https://github.com/NCAR/esm-collection-spec/blob/master/collection-spec/collection-spec.md
        for more details.
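
        Examples
        --------
        A minimal sketch; the column names and aggregation below are
        illustrative and depend on what the parsing function produced::

            builder.save(
                name='my-catalog',
                path_column_name='path',
                variable_column_name='variable',
                data_format='netcdf',
                groupby_attrs=['model', 'experiment'],
                aggregations=[{'type': 'union', 'attribute_name': 'variable'}],
            )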
"""
for col in {variable_column_name, path_column_name}.union(set(groupby_attrs or [])):
assert col in self.df.columns, f'{col} must be a column in the dataframe.'
attributes = [Attribute(column_name=column, vocabulary='') for column in self.df.columns]
_aggregation_control = AggregationControl(
variable_column_name=variable_column_name,
groupby_attrs=groupby_attrs,
aggregations=aggregations,
)
cat = ESMCatalogModel(
esmcat_version=esmcat_version,
description=description,
attributes=attributes,
aggregation_control=_aggregation_control,
assets=Assets(column_name=path_column_name, format=data_format),
)
cat._df = self.df
cat.save(
name=name,
directory=directory,
catalog_type=catalog_type,
to_csv_kwargs=to_csv_kwargs,
json_dump_kwargs=json_dump_kwargs,
)
if not self.invalid_assets.empty:
invalid_assets_report_file = f'{tempfile.gettempdir()}/{name}_invalid_assets.csv'
warnings.warn(
f'Unable to parse {len(self.invalid_assets)} assets/files. A list of these assets can be found in {invalid_assets_report_file}.',
stacklevel=2,
)
self.invalid_assets.to_csv(invalid_assets_report_file, index=False)