#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Interface to TROPOMI L2 Reader.
The TROPOspheric Monitoring Instrument (TROPOMI) is the satellite instrument
on board the Copernicus Sentinel-5 Precursor satellite. It measures key
atmospheric trace gasses, such as ozone, nitrogen oxides, sulfur dioxide,
carbon monoxide, methane, and formaldehyde.
Level 2 data products are available via the Copernicus Open Access Hub.
For more information visit the following URL:
http://www.tropomi.eu/data-products/level-2-products
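
Example usage (a minimal sketch; the filename pattern and the
``nitrogen_dioxide_total_column`` dataset name are assumptions that depend
on the product files at hand)::

    from glob import glob
    from satpy import Scene

    scn = Scene(filenames=glob('S5P_OFFL_L2__NO2____*.nc'),
                reader='tropomi_l2')
    scn.load(['nitrogen_dioxide_total_column'])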
"""
import logging

import dask.array as da
import numpy as np
import xarray as xr

from satpy import CHUNK_SIZE
from satpy.readers.netcdf_utils import NetCDF4FileHandler, netCDF4

logger = logging.getLogger(__name__)


class TROPOMIL2FileHandler(NetCDF4FileHandler):
    """File handler for TROPOMI L2 netCDF files."""

    @property
def start_time(self):
"""Get start time."""
return self.filename_info['start_time']

    @property
def end_time(self):
"""Get end time."""
return self.filename_info.get('end_time', self.start_time)

    @property
def platform_shortname(self):
"""Get platform shortname."""
return self.filename_info['platform_shortname']

    @property
def sensor(self):
"""Get sensor."""
res = self['/attr/sensor']
if isinstance(res, np.ndarray):
return str(res.astype(str)).lower()
return res.lower()

    @property
def sensor_names(self):
"""Get sensor set."""
return {self.sensor}

    def available_datasets(self, configured_datasets=None):
"""Automatically determine datasets provided by this file."""
logger.debug("Available_datasets begin...")
# Determine shape of the geolocation data (lat/lon)
lat_shape = None
for var_name, _val in self.file_content.items():
# Could probably avoid this hardcoding, will think on it
            if var_name == 'PRODUCT/latitude':
lat_shape = self[var_name + "/shape"]
break
handled_variables = set()
# update previously configured datasets
logger.debug("Starting previously configured variables loop...")
        # if bounds exist, we can assemble them later
bounds_exist = 'latitude_bounds' in self and 'longitude_bounds' in self
for is_avail, ds_info in (configured_datasets or []):
# some other file handler knows how to load this
if is_avail is not None:
yield is_avail, ds_info
var_name = ds_info.get('file_key', ds_info['name'])
# logger.debug("Evaluating previously configured variable: %s", var_name)
matches = self.file_type_matches(ds_info['file_type'])
# we can confidently say that we can provide this dataset and can
# provide more info
assembled = var_name in ['assembled_lat_bounds', 'assembled_lon_bounds']
if (matches and var_name in self) or (assembled and bounds_exist):
logger.debug("Handling previously configured variable: %s", var_name)
if not assembled:
# Because assembled variables and bounds use the same file_key,
# we need to omit file_key once.
handled_variables.add(var_name)
new_info = ds_info.copy() # don't mess up the above yielded
yield True, new_info
elif is_avail is None:
# if we didn't know how to handle this dataset and no one else did
# then we should keep it going down the chain
yield is_avail, ds_info
yield from self._iterate_over_dataset_contents(handled_variables, lat_shape)

    def _iterate_over_dataset_contents(self, handled_variables, shape):
        """Iterate over dataset contents.

        This is where we dynamically add new datasets. We sift through all
        groups and variables, looking for data whose shape matches that of
        the geolocation data.
        """
for var_name, val in self.file_content.items():
# Only evaluate variables
if isinstance(val, netCDF4.Variable):
logger.debug("Evaluating new variable: %s", var_name)
var_shape = self[var_name + "/shape"]
logger.debug("Dims:{}".format(var_shape))
if shape == var_shape[:len(shape)]:
logger.debug("Found valid additional dataset: %s", var_name)
# Skip anything we have already configured
if var_name in handled_variables:
logger.debug("Already handled, skipping: %s", var_name)
continue
handled_variables.add(var_name)
                    var_name_no_path = var_name[var_name.rindex('/') + 1:]
logger.debug("Using short name of: %s", var_name_no_path)
# Create new ds_info object
if var_name_no_path in ['latitude_bounds', 'longitude_bounds']:
coordinates = []
else:
coordinates = ['longitude', 'latitude']
new_info = {
'name': var_name_no_path,
'file_key': var_name,
'coordinates': coordinates,
'file_type': self.filetype_info['file_type'],
}
yield True, new_info

    def get_metadata(self, data, ds_info):
"""Get metadata."""
metadata = {}
metadata.update(data.attrs)
metadata.update(ds_info)
metadata.update({
'platform_shortname': self.platform_shortname,
'sensor': self.sensor,
'start_time': self.start_time,
'end_time': self.end_time,
})
return metadata

    def _rename_dims(self, data_arr):
"""Normalize dimension names with the rest of Satpy."""
dims_dict = {}
if 'ground_pixel' in data_arr.dims:
dims_dict['ground_pixel'] = 'x'
if 'scanline' in data_arr.dims:
dims_dict['scanline'] = 'y'
return data_arr.rename(dims_dict)

    def prepare_geo(self, bounds_data):
        """Prepare lat/lon bounds for pcolormesh.

        lat/lon bounds are ordered in the following way::

            3----2
            |    |
            0----1

        Extend longitudes and latitudes with one element to support
        "pcolormesh"::

            (X[i+1, j], Y[i+1, j])       (X[i+1, j+1], Y[i+1, j+1])
                                +--------+
                                | C[i,j] |
                                +--------+
               (X[i, j], Y[i, j])       (X[i, j+1], Y[i, j+1])
        """
# Create the left array
left = np.vstack([bounds_data[:, :, 0], bounds_data[-1:, :, 3]])
# Create the right array
right = np.vstack([bounds_data[:, -1:, 1], bounds_data[-1:, -1:, 2]])
# Stack horizontally
dest = np.hstack([left, right])
# Convert to DataArray
dask_dest = da.from_array(dest, chunks=CHUNK_SIZE)
dest = xr.DataArray(dask_dest,
dims=('y_bounds', 'x_bounds'),
attrs=bounds_data.attrs
)
return dest

    def get_dataset(self, ds_id, ds_info):
"""Get dataset."""
logger.debug("Getting data for: %s", ds_id['name'])
file_key = ds_info.get('file_key', ds_id['name'])
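        # NetCDF4FileHandler.__getitem__ returns the variable as an
        # xarray.DataArray with the on-file attributes attached.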
data = self[file_key]
data.attrs = self.get_metadata(data, ds_info)
fill_value = data.attrs.get('_FillValue', np.float32(np.nan))
data = data.squeeze()
# preserve integer data types if possible
if np.issubdtype(data.dtype, np.integer):
new_fill = fill_value
else:
new_fill = np.float32(np.nan)
data.attrs.pop('_FillValue', None)
good_mask = data != fill_value
scale_factor = data.attrs.get('scale_factor')
add_offset = data.attrs.get('add_offset')
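        # CF-style unpacking: physical value = stored value * scale_factor
        # + add_offset (either attribute may be absent in a given product)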
        if scale_factor is not None:
            data = data * scale_factor + (add_offset if add_offset is not None else 0)
data = data.where(good_mask, new_fill)
data = self._rename_dims(data)
        # drop leftover index coordinates that are not usable as geolocation
drop_list = ['y', 'x', 'layer', 'vertices']
coords_exist = [coord for coord in drop_list if coord in data.coords]
if coords_exist:
data = data.drop_vars(coords_exist)
if ds_id['name'] in ['assembled_lat_bounds', 'assembled_lon_bounds']:
data = self.prepare_geo(data)
return data