""" Application that pulls data from the EUMETSAT API and saves to a zarr file"""
import glob
import os
import random
import tempfile
from typing import Optional
import click
import pandas as pd
import structlog
import satip
from satip import utils
from satip.eumetsat import EUMETSATDownloadManager
log = structlog.stdlib.get_logger()
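
# Example invocation (hypothetical values; every option below can also be supplied
# via the environment variable named in its `envvar`):
#   python app.py --api-key "$API_KEY" --api-secret "$API_SECRET" \
#       --save-dir /path/to/zarr --history "60 minutes"
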
@click.command()
@click.option(
    "--api-key",
    default=None,
    envvar="API_KEY",
    help="The API key for the EUMETSAT Data Center",
    type=click.STRING,
)
@click.option(
    "--api-secret",
    default=None,
    envvar="API_SECRET",
    help="The API secret for the EUMETSAT Data Center",
    type=click.STRING,
)
@click.option(
    "--save-dir",
    default="./",
    envvar="SAVE_DIR",
    help="Where to save the zarr files",
    type=click.STRING,
)
@click.option(
    "--save-dir-native",
    default="./raw",
    envvar="SAVE_DIR_NATIVE",
    help="Where to save the native files",
    type=click.STRING,
)
@click.option(
    "--history",
    default="60 minutes",
    envvar="HISTORY",
    help="How much history to save, as a pandas-parsable time string (e.g. '60 minutes')",
    type=click.STRING,
)
@click.option(
    "--db-url",
    default=None,
    envvar="DB_URL",
    help="URL of the database to update once this has run",
    type=click.STRING,
)
@click.option(
    "--use-rescaler",
    default=False,
    envvar="USE_RESCALER",
    help="Whether to rescale the data to between 0 and 1",
    type=click.BOOL,
)
@click.option(
    "--start-time",
    envvar="START_TIME",
    default=pd.Timestamp.utcnow().isoformat(timespec="minutes").split("+")[0],
    help="Start time, defaults to the current UTC time",
    type=click.STRING,
)
@click.option(
    "--cleanup",
    envvar="CLEANUP",
    default=False,
    help="Run the Data Tailor cleanup and exit",
    type=click.BOOL,
)
@click.option(
    "--use-backup",
    envvar="USE_BACKUP",
    default=False,
    help="If True, skip the RSS imagery and use the 15-minute data instead",
    type=click.BOOL,
)
@click.option(
    "--maximum-n-datasets",
    envvar="MAXIMUM_N_DATASETS",
    default=-1,
    help="Maximum number of datasets to load; the default (-1) gets them all",
    type=click.INT,
)
def run(
    api_key,
    api_secret,
    save_dir,
    save_dir_native,
    history,
    db_url: Optional[str] = None,
    use_rescaler: bool = False,
    start_time: str = pd.Timestamp.utcnow().isoformat(timespec="minutes").split("+")[0],
    cleanup: bool = False,
    use_backup: bool = False,
    maximum_n_datasets: int = -1,
):
    """Run the main application.

    Args:
        api_key: API key for EUMETSAT
        api_secret: API secret for EUMETSAT
        save_dir: Save directory
        save_dir_native: Where the native files are saved
        history: How much history to fetch (e.g. '60 minutes')
        db_url: URL of the database
        use_rescaler: Whether to rescale the data to between 0 and 1
        start_time: Start time in UTC ISO format
        cleanup: Run the Data Tailor cleanup and exit
        use_backup: Use the 15-minute data rather than RSS
        maximum_n_datasets: Maximum number of datasets to load; the default gets them all
    """
    utils.setupLogging()

    try:
        if save_dir != "./":
            log.info("Checking if save_dir directory exists")
            if utils.check_path_is_exists_and_directory(save_dir):
                log.info("save_dir directory exists, continuing execution")
        # Don't check if save_dir_native exists, as it is created by the download manager
        # and is only used if the Data Tailor service is used
        log.info(
            f'Running application and saving to "{save_dir}"',
            version=satip.__version__,
            memory=utils.get_memory(),
        )
        # 1. Get data from the API and download native files
        with tempfile.TemporaryDirectory() as tmpdir:
            download_manager = EUMETSATDownloadManager(
                user_key=api_key,
                user_secret=api_secret,
                data_dir=tmpdir,
                native_file_dir=save_dir_native,
            )
            if cleanup:
                log.debug("Running Data Tailor Cleanup", memory=utils.get_memory())
                download_manager.cleanup_datatailor()
                return

            start_date = pd.Timestamp(start_time, tz="UTC") - pd.Timedelta(history)
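            # Worked example (hypothetical values): a start_time of "2023-06-01T12:00"
            # with history "60 minutes" gives a start_date of 2023-06-01 11:00:00+00:00,
            # so the fetch window below covers the most recent hour.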
            log.info(
                f"Fetching datasets for {start_date} - {start_time}", memory=utils.get_memory()
            )
            datasets = download_manager.identify_available_datasets(
                start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"),
                end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H:%M:%S"),
            )
            # Check if any RSS imagery is available; if not, fall back to 15-minutely data
            if (len(datasets) == 0) or use_backup:
                log.warning(
                    f"No RSS imagery available or using backup ({use_backup=}), "
                    f"falling back to 15-minutely data",
                    memory=utils.get_memory(),
                )
                datasets = download_manager.identify_available_datasets(
                    start_date=start_date.strftime("%Y-%m-%d-%H:%M:%S"),
                    end_date=pd.Timestamp(start_time, tz="UTC").strftime("%Y-%m-%d-%H:%M:%S"),
                    product_id="EO:EUM:DAT:MSG:HRSEVIRI",
                )
                use_backup = True
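            # Note on product IDs: the default product requested above is the 5-minutely
            # Rapid Scanning Service imagery ("EO:EUM:DAT:MSG:MSG15-RSS", see the download
            # step below), while "EO:EUM:DAT:MSG:HRSEVIRI" is the 15-minutely full-disc
            # SEVIRI product used as the backup.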
            # Filter out the ones that already exist;
            # if both final files don't exist, make sure the whole process runs
            datasets = utils.filter_dataset_ids_on_current_files(datasets, save_dir)
            log.info(
                f"Files to download after filtering: {len(datasets)}", memory=utils.get_memory()
            )
            if len(datasets) == 0:
                log.info("No files to download, exiting", memory=utils.get_memory())
                updated_data = False
            else:
                if maximum_n_datasets != -1:
                    log.debug(
                        f"Only going to get at most {maximum_n_datasets} datasets",
                        memory=utils.get_memory(),
                    )
                    datasets = datasets[0:maximum_n_datasets]
                random.shuffle(datasets)  # Shuffle so subsequent runs might download different data
                updated_data = True
                if use_backup:
                    # Check before downloading each tailored dataset, as it can take a while
                    for dset in datasets:
                        dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
                        if len(dset) > 0:
                            download_manager.download_tailored_datasets(
                                dset,
                                product_id="EO:EUM:DAT:MSG:HRSEVIRI",
                            )
                else:
                    # Check before downloading each dataset, as it can take a while
                    for dset in datasets:
                        dset = utils.filter_dataset_ids_on_current_files([dset], save_dir)
                        if len(dset) > 0:
                            download_manager.download_datasets(
                                dset,
                                product_id="EO:EUM:DAT:MSG:MSG15-RSS",
                            )
                # 2. Load nat files to one Xarray Dataset
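                # As the globs below assume: direct downloads arrive as ".nat" files,
                # while the Data Tailor output for the backup product is matched
                # by "*HRSEVIRI*".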
                native_files = (
                    list(glob.glob(os.path.join(tmpdir, "*.nat")))
                    if not use_backup
                    else list(glob.glob(os.path.join(tmpdir, "*HRSEVIRI*")))
                )
                log.debug(
                    "Saving native files to Zarr: " + str(native_files),
                    memory=utils.get_memory(),
                )
                # Save to S3
                utils.save_native_to_zarr(
                    native_files,
                    save_dir=save_dir,
                    use_rescaler=use_rescaler,
                    using_backup=use_backup,
                )
                # Move files into and out of "latest"
                utils.move_older_files_to_different_location(
                    save_dir=save_dir, history_time=(start_date - pd.Timedelta("30 min"))
                )
            if not utils.check_both_final_files_exists(save_dir=save_dir, using_backup=use_backup):
                updated_data = True

            if updated_data:
                # Collate files into a single NetCDF file
                utils.collate_files_into_latest(save_dir=save_dir, using_backup=use_backup)
                log.debug("Collated files", memory=utils.get_memory())

        log.info("Finished running application", memory=utils.get_memory())
    except Exception as e:
        log.error(f"Error caught during run: {e}", exc_info=True)
        raise e
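
# A quick way to exercise this command in tests is click's CliRunner (a sketch;
# it assumes valid EUMETSAT credentials in the environment and will hit the real API):
#   from click.testing import CliRunner
#   result = CliRunner().invoke(run, ["--maximum-n-datasets", "1"])
#   assert result.exit_code == 0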
if __name__ == "__main__":
    run()