Skip to content
This repository was archived by the owner on Jun 2, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
3deb84d
extend ECMWF for shetlands
dfulu Jan 16, 2024
c282633
optional list of dropouts
dfulu Feb 16, 2024
f9690ac
Merge branch 'config_bug' into ecmwf_shetlands
dfulu Feb 16, 2024
485f512
Merge branch 'main' into ecmwf_shetlands
dfulu Mar 11, 2024
7c6da30
Merge branch 'main' into ecmwf_shetlands
dfulu Apr 10, 2024
70c788b
add messy version of datapipe
dfulu Apr 15, 2024
08350f2
standardize pick functions
dfulu Apr 15, 2024
8e7c95f
update tests using pick functions
dfulu Apr 15, 2024
d30f234
refactor to remove code duplication
dfulu Apr 15, 2024
49991e8
Merge branch 'main' into pvnet_concurrent_datapipe
dfulu Apr 23, 2024
86eaf30
Merge branch 'pvnet_concurrent_datapipe' into ecmwf_shetlands
dfulu Apr 23, 2024
03ad0c2
Merge branch 'buffer_fix' into ecmwf_shetlands
dfulu May 29, 2024
207d84d
Merge branch 'main' into pvnet_concurrent_datapipe
dfulu May 30, 2024
d8b10e0
refactor gsp_all_pipeline
dfulu May 31, 2024
c9bf866
Merge branch 'pvnet_concurrent_datapipe' into ecmwf_shetlands
dfulu May 31, 2024
3fa9e40
sub in new pipeline
dfulu May 31, 2024
801b7a8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 31, 2024
4556973
fix tests
dfulu May 31, 2024
3029889
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 31, 2024
9a0dde1
linting
dfulu May 31, 2024
79da202
Merge branch 'pvnet_concurrent_datapipe' of https://github.com/opencl…
dfulu May 31, 2024
ab5b649
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 31, 2024
eb12fc7
linting
dfulu May 31, 2024
c60a4e0
Merge branch 'pvnet_concurrent_datapipe' into ecmwf_shetlands
dfulu Jun 3, 2024
0180c5c
update tests
dfulu Jun 3, 2024
4eb77a3
remove hack
dfulu Jun 3, 2024
d0b31f7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 3, 2024
427a951
refactor convert submodule + test fix
dfulu Jun 3, 2024
1b5cda1
fix test
dfulu Jun 3, 2024
8627885
Merge branch 'pvnet_concurrent_datapipe' of https://github.com/opencl…
dfulu Jun 3, 2024
91355b1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 3, 2024
50a6f9e
linting
dfulu Jun 3, 2024
5d1551b
fix
dfulu Jun 3, 2024
4a9fcb7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 3, 2024
fb1358c
update tests
dfulu Jun 3, 2024
3c5efd5
Merge branch 'pvnet_concurrent_datapipe' of https://github.com/opencl…
dfulu Jun 3, 2024
9afccec
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 3, 2024
7728511
docs
dfulu Jun 4, 2024
e79560b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ocf_datapipes/convert/numpy_batch/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
"""Conversion from Xarray to NumpyBatch"""
from .gsp import convert_gsp_to_numpy_batch
from .nwp import convert_nwp_to_numpy_batch
from .pv import convert_pv_to_numpy_batch
from .satellite import convert_satellite_to_numpy_batch
from .sensor import convert_sensor_to_numpy_batch
from .wind import convert_wind_to_numpy_batch
51 changes: 26 additions & 25 deletions ocf_datapipes/convert/numpy_batch/gsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,31 @@
logger = logging.getLogger(__name__)


def convert_gsp_to_numpy_batch(xr_data):
    """Build a NumpyBatch dict from a GSP xarray object.

    Args:
        xr_data: GSP xarray object. It must expose ``attrs["t0_idx"]``,
            ``gsp_id``, a ``time_utc`` dimension and the
            ``nominal_capacity_mwp`` / ``effective_capacity_mwp`` entries.

    Returns:
        Dict keyed by ``BatchKey`` with the raw values, t0 index, GSP ids,
        capacities and the times passed through ``datetime64_to_float``.
        The OSGB x/y keys are only present when the matching coordinate
        exists on ``xr_data``.
    """
    batch: NumpyBatch = {}
    batch[BatchKey.gsp] = xr_data.values
    batch[BatchKey.gsp_t0_idx] = xr_data.attrs["t0_idx"]
    batch[BatchKey.gsp_id] = xr_data.gsp_id.values

    # Capacities are read from the first time step only.
    nominal_capacity = xr_data.isel(time_utc=0)["nominal_capacity_mwp"]
    batch[BatchKey.gsp_nominal_capacity_mwp] = nominal_capacity.values
    effective_capacity = xr_data.isel(time_utc=0)["effective_capacity_mwp"]
    batch[BatchKey.gsp_effective_capacity_mwp] = effective_capacity.values

    batch[BatchKey.gsp_time_utc] = datetime64_to_float(xr_data["time_utc"].values)

    # Optional spatial coordinates: copied over only if they are present.
    coord_keys = (
        (BatchKey.gsp_y_osgb, "y_osgb"),
        (BatchKey.gsp_x_osgb, "x_osgb"),
    )
    batch.update(
        {
            key: xr_data[coord_name].values
            for key, coord_name in coord_keys
            if coord_name in xr_data.coords
        }
    )

    return batch


@functional_datapipe("convert_gsp_to_numpy_batch")
class ConvertGSPToNumpyBatchIterDataPipe(IterDataPipe):
"""Convert GSP Xarray to NumpyBatch"""
Expand All @@ -25,29 +50,5 @@ def __init__(self, source_datapipe: IterDataPipe):

def __iter__(self) -> NumpyBatch:
"""Convert from Xarray to NumpyBatch"""
logger.debug("Converting GSP to numpy to batch")
for xr_data in self.source_datapipe:
example: NumpyBatch = {
BatchKey.gsp: xr_data.values,
BatchKey.gsp_t0_idx: xr_data.attrs["t0_idx"],
BatchKey.gsp_id: xr_data.gsp_id.values,
BatchKey.gsp_nominal_capacity_mwp: xr_data.isel(time_utc=0)[
"nominal_capacity_mwp"
].values,
BatchKey.gsp_effective_capacity_mwp: (
xr_data.isel(time_utc=0)["effective_capacity_mwp"].values
),
BatchKey.gsp_time_utc: datetime64_to_float(xr_data["time_utc"].values),
}

# Coordinates
for batch_key, dataset_key in (
(BatchKey.gsp_y_osgb, "y_osgb"),
(BatchKey.gsp_x_osgb, "x_osgb"),
):
if dataset_key in xr_data.coords.keys():
values = xr_data[dataset_key].values
# Expand dims so AddFourierSpaceTime works!
example[batch_key] = values # np.expand_dims(values, axis=1)

yield example
yield convert_gsp_to_numpy_batch(xr_data)
49 changes: 26 additions & 23 deletions ocf_datapipes/convert/numpy_batch/nwp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,31 @@
from ocf_datapipes.utils.utils import datetime64_to_float


def convert_nwp_to_numpy_batch(xr_data):
    """Build an NWPNumpyBatch dict from an NWP xarray object.

    Args:
        xr_data: NWP xarray object. It must expose ``attrs["t0_idx"]``,
            ``channel``, ``init_time_utc`` and ``step``.

    Returns:
        Dict keyed by ``NWPBatchKey``. Steps are expressed as whole hours
        (int64) and init times are passed through ``datetime64_to_float``.
        The target-time and OSGB x/y keys are only present when the matching
        coordinate exists on ``xr_data``.
    """
    batch: NWPNumpyBatch = {}
    batch[NWPBatchKey.nwp] = xr_data.values
    batch[NWPBatchKey.nwp_t0_idx] = xr_data.attrs["t0_idx"]
    batch[NWPBatchKey.nwp_channel_names] = xr_data.channel.values
    batch[NWPBatchKey.nwp_init_time_utc] = datetime64_to_float(xr_data.init_time_utc.values)

    # Timedelta steps are divided by one hour and cast to integer hours.
    one_hour = np.timedelta64(1, "h")
    batch[NWPBatchKey.nwp_step] = (xr_data.step.values / one_hour).astype(np.int64)

    if "target_time_utc" in xr_data.coords:
        batch[NWPBatchKey.nwp_target_time_utc] = datetime64_to_float(
            xr_data.target_time_utc.values
        )

    # Optional spatial coordinates: copied over only if they are present.
    coord_keys = (
        (NWPBatchKey.nwp_y_osgb, "y_osgb"),
        (NWPBatchKey.nwp_x_osgb, "x_osgb"),
    )
    batch.update(
        {
            key: xr_data[coord_name].values
            for key, coord_name in coord_keys
            if coord_name in xr_data.coords
        }
    )

    return batch


@functional_datapipe("convert_nwp_to_numpy_batch")
class ConvertNWPToNumpyBatchIterDataPipe(IterDataPipe):
"""Convert NWP Xarray objects to NWPNumpyBatch"""
Expand All @@ -23,26 +48,4 @@ def __init__(self, source_datapipe: IterDataPipe):
def __iter__(self) -> NWPNumpyBatch:
"""Convert from Xarray to NWPBatchKey"""
for xr_data in self.source_datapipe:
example: NWPNumpyBatch = {
NWPBatchKey.nwp: xr_data.values,
NWPBatchKey.nwp_t0_idx: xr_data.attrs["t0_idx"],
}
if "target_time_utc" in xr_data.coords:
target_time = xr_data.target_time_utc.values
example[NWPBatchKey.nwp_target_time_utc] = datetime64_to_float(target_time)
example[NWPBatchKey.nwp_channel_names] = xr_data.channel.values
example[NWPBatchKey.nwp_step] = (xr_data.step.values / np.timedelta64(1, "h")).astype(
np.int64
)
example[NWPBatchKey.nwp_init_time_utc] = datetime64_to_float(
xr_data.init_time_utc.values
)

for batch_key, dataset_key in (
(NWPBatchKey.nwp_y_osgb, "y_osgb"),
(NWPBatchKey.nwp_x_osgb, "x_osgb"),
):
if dataset_key in xr_data.coords.keys():
example[batch_key] = xr_data[dataset_key].values

yield example
yield convert_nwp_to_numpy_batch(xr_data)
35 changes: 19 additions & 16 deletions ocf_datapipes/convert/numpy_batch/pv.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,23 @@
logger = logging.getLogger(__name__)


def convert_pv_to_numpy_batch(xr_data):
    """Build a NumpyBatch dict from a PV xarray object.

    Args:
        xr_data: PV xarray object. It must expose ``attrs["t0_idx"]`` and the
            ``ml_id``, ``pv_system_id``, ``observed_capacity_wp``,
            ``nominal_capacity_wp``, ``time_utc``, ``latitude`` and
            ``longitude`` entries.

    Returns:
        Dict keyed by ``BatchKey``. System ids are cast to float32 and times
        are passed through ``datetime64_to_float``; everything else is the
        underlying ``.values`` array unchanged.
    """
    batch: NumpyBatch = {}
    batch[BatchKey.pv] = xr_data.values
    batch[BatchKey.pv_t0_idx] = xr_data.attrs["t0_idx"]
    batch[BatchKey.pv_ml_id] = xr_data["ml_id"].values
    batch[BatchKey.pv_id] = xr_data["pv_system_id"].values.astype(np.float32)
    batch[BatchKey.pv_observed_capacity_wp] = xr_data["observed_capacity_wp"].values
    batch[BatchKey.pv_nominal_capacity_wp] = xr_data["nominal_capacity_wp"].values
    batch[BatchKey.pv_time_utc] = datetime64_to_float(xr_data["time_utc"].values)
    batch[BatchKey.pv_latitude] = xr_data["latitude"].values
    batch[BatchKey.pv_longitude] = xr_data["longitude"].values
    return batch


@functional_datapipe("convert_pv_to_numpy_batch")
class ConvertPVToNumpyBatchIterDataPipe(IterDataPipe):
"""Convert PV Xarray to NumpyBatch"""
Expand All @@ -25,20 +42,6 @@ def __init__(self, source_datapipe: IterDataPipe):
self.source_datapipe = source_datapipe

def __iter__(self) -> NumpyBatch:
"""Iterate and convert PV Xarray to NumpyBatch"""
"""Convert PV Xarray to NumpyBatch"""
for xr_data in self.source_datapipe:
logger.debug("Converting PV xarray to numpy example")

example: NumpyBatch = {
BatchKey.pv: xr_data.values,
BatchKey.pv_t0_idx: xr_data.attrs["t0_idx"],
BatchKey.pv_ml_id: xr_data["ml_id"].values,
BatchKey.pv_id: xr_data["pv_system_id"].values.astype(np.float32),
BatchKey.pv_observed_capacity_wp: (xr_data["observed_capacity_wp"].values),
BatchKey.pv_nominal_capacity_wp: (xr_data["nominal_capacity_wp"].values),
BatchKey.pv_time_utc: datetime64_to_float(xr_data["time_utc"].values),
BatchKey.pv_latitude: xr_data["latitude"].values,
BatchKey.pv_longitude: xr_data["longitude"].values,
}

yield example
yield convert_pv_to_numpy_batch(xr_data)
72 changes: 44 additions & 28 deletions ocf_datapipes/convert/numpy_batch/satellite.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,49 @@
from ocf_datapipes.utils.utils import datetime64_to_float


def _convert_satellite_to_numpy_batch(xr_data):
    """Build the NumpyBatch dict for non-HRV satellite data.

    Args:
        xr_data: Satellite xarray object exposing ``attrs["t0_idx"]``,
            ``time_utc``, ``y_geostationary`` and ``x_geostationary``.

    Returns:
        Dict keyed by the ``BatchKey.satellite_*`` members.
    """
    batch: NumpyBatch = {}
    batch[BatchKey.satellite_actual] = xr_data.values
    batch[BatchKey.satellite_t0_idx] = xr_data.attrs["t0_idx"]
    batch[BatchKey.satellite_time_utc] = datetime64_to_float(xr_data["time_utc"].values)

    # Geostationary coords are copied without a cast; per the original note
    # they are expected to be float32 already.
    batch[BatchKey.satellite_y_geostationary] = xr_data["y_geostationary"].values
    batch[BatchKey.satellite_x_geostationary] = xr_data["x_geostationary"].values

    return batch


def _convert_hrvsatellite_to_numpy_batch(xr_data):
    """Build the NumpyBatch dict for HRV satellite data.

    Args:
        xr_data: HRV satellite xarray object exposing ``attrs["t0_idx"]``,
            ``time_utc``, ``y_geostationary`` and ``x_geostationary``.

    Returns:
        Dict keyed by the ``BatchKey.hrvsatellite_*`` members.
    """
    batch: NumpyBatch = {}
    batch[BatchKey.hrvsatellite_actual] = xr_data.values
    batch[BatchKey.hrvsatellite_t0_idx] = xr_data.attrs["t0_idx"]
    batch[BatchKey.hrvsatellite_time_utc] = datetime64_to_float(xr_data["time_utc"].values)

    # Geostationary coords are copied without a cast; per the original note
    # they are expected to be float32 already.
    batch[BatchKey.hrvsatellite_y_geostationary] = xr_data["y_geostationary"].values
    batch[BatchKey.hrvsatellite_x_geostationary] = xr_data["x_geostationary"].values

    return batch


def convert_satellite_to_numpy_batch(xr_data, is_hrv=False):
    """Convert a satellite xarray object into a NumpyBatch dict.

    Args:
        xr_data: Satellite xarray object to convert.
        is_hrv: When truthy the HRV converter is used (``hrvsatellite_*``
            keys); otherwise the non-HRV converter (``satellite_*`` keys).

    Returns:
        The dict produced by the selected converter.
    """
    converter = (
        _convert_hrvsatellite_to_numpy_batch if is_hrv else _convert_satellite_to_numpy_batch
    )
    return converter(xr_data)


@functional_datapipe("convert_satellite_to_numpy_batch")
class ConvertSatelliteToNumpyBatchIterDataPipe(IterDataPipe):
"""Converts Xarray Satellite to NumpyBatch object"""
Expand All @@ -24,31 +67,4 @@ def __init__(self, source_datapipe: IterDataPipe, is_hrv: bool = False):
def __iter__(self) -> NumpyBatch:
"""Convert each example to a NumpyBatch object"""
for xr_data in self.source_datapipe:
if self.is_hrv:
example: NumpyBatch = {
BatchKey.hrvsatellite_actual: xr_data.values,
BatchKey.hrvsatellite_t0_idx: xr_data.attrs["t0_idx"],
BatchKey.hrvsatellite_time_utc: datetime64_to_float(xr_data["time_utc"].values),
}

for batch_key, dataset_key in (
(BatchKey.hrvsatellite_y_geostationary, "y_geostationary"),
(BatchKey.hrvsatellite_x_geostationary, "x_geostationary"),
):
# HRVSatellite coords are already float32.
example[batch_key] = xr_data[dataset_key].values
else:
example: NumpyBatch = {
BatchKey.satellite_actual: xr_data.values,
BatchKey.satellite_t0_idx: xr_data.attrs["t0_idx"],
BatchKey.satellite_time_utc: datetime64_to_float(xr_data["time_utc"].values),
}

for batch_key, dataset_key in (
(BatchKey.satellite_y_geostationary, "y_geostationary"),
(BatchKey.satellite_x_geostationary, "x_geostationary"),
):
# HRVSatellite coords are already float32.
example[batch_key] = xr_data[dataset_key].values

yield example
yield convert_satellite_to_numpy_batch(xr_data, self.is_hrv)
37 changes: 20 additions & 17 deletions ocf_datapipes/convert/numpy_batch/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,37 @@
logger = logging.getLogger(__name__)


def convert_sensor_to_numpy_batch(xr_data):
    """Build a NumpyBatch dict from a sensor xarray object.

    Args:
        xr_data: Sensor xarray object. It must expose ``attrs["t0_idx"]`` and
            the ``station_id``, ``time_utc``, ``latitude`` and ``longitude``
            entries.

    Returns:
        Dict keyed by ``BatchKey``. Station ids are cast to float32 and times
        are passed through ``datetime64_to_float``.
    """
    batch: NumpyBatch = {}
    batch[BatchKey.sensor] = xr_data.values
    batch[BatchKey.sensor_t0_idx] = xr_data.attrs["t0_idx"]
    batch[BatchKey.sensor_id] = xr_data["station_id"].values.astype(np.float32)
    # NOTE: the observed/nominal capacity keys are currently disabled for
    # sensors and therefore not populated:
    #   BatchKey.sensor_observed_capacity_wp <- xr_data["observed_capacity_wp"].values
    #   BatchKey.sensor_nominal_capacity_wp  <- xr_data["nominal_capacity_wp"].values
    batch[BatchKey.sensor_time_utc] = datetime64_to_float(xr_data["time_utc"].values)
    batch[BatchKey.sensor_latitude] = xr_data["latitude"].values
    batch[BatchKey.sensor_longitude] = xr_data["longitude"].values
    return batch


@functional_datapipe("convert_sensor_to_numpy_batch")
class ConvertSensorToNumpyBatchIterDataPipe(IterDataPipe):
"""Convert Sensor Xarray to NumpyBatch"""

def __init__(self, source_datapipe: IterDataPipe):
"""
Convert PV Xarray objects to NumpyBatch objects
Convert sensor Xarray objects to NumpyBatch objects

Args:
source_datapipe: Datapipe emitting PV Xarray objects
source_datapipe: Datapipe emitting sensor Xarray objects
"""
super().__init__()
self.source_datapipe = source_datapipe

def __iter__(self) -> NumpyBatch:
"""Iterate and convert PV Xarray to NumpyBatch"""
"""Iterate and convert sensor Xarray to NumpyBatch"""
for xr_data in self.source_datapipe:
logger.debug("Converting Sensor xarray to numpy example")

example: NumpyBatch = {
BatchKey.sensor: xr_data.values,
BatchKey.sensor_t0_idx: xr_data.attrs["t0_idx"],
BatchKey.sensor_id: xr_data["station_id"].values.astype(np.float32),
# BatchKey.sensor_observed_capacity_wp: (xr_data["observed_capacity_wp"].values),
# BatchKey.sensor_nominal_capacity_wp: (xr_data["nominal_capacity_wp"].values),
BatchKey.sensor_time_utc: datetime64_to_float(xr_data["time_utc"].values),
BatchKey.sensor_latitude: xr_data["latitude"].values,
BatchKey.sensor_longitude: xr_data["longitude"].values,
}

yield example
yield convert_sensor_to_numpy_batch(xr_data)
34 changes: 19 additions & 15 deletions ocf_datapipes/convert/numpy_batch/wind.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@
logger = logging.getLogger(__name__)


def convert_wind_to_numpy_batch(xr_data):
    """Build a NumpyBatch dict from a wind xarray object.

    Args:
        xr_data: Wind xarray object. It must expose ``attrs["t0_idx"]`` and
            the ``ml_id``, ``wind_system_id``, ``observed_capacity_mwp``,
            ``nominal_capacity_mwp``, ``time_utc``, ``latitude`` and
            ``longitude`` entries.

    Returns:
        Dict keyed by ``BatchKey``. System ids are cast to float32 and times
        are passed through ``datetime64_to_float``; everything else is the
        underlying ``.values`` array unchanged.
    """
    batch: NumpyBatch = {}
    batch[BatchKey.wind] = xr_data.values
    batch[BatchKey.wind_t0_idx] = xr_data.attrs["t0_idx"]
    batch[BatchKey.wind_ml_id] = xr_data["ml_id"].values
    batch[BatchKey.wind_id] = xr_data["wind_system_id"].values.astype(np.float32)
    batch[BatchKey.wind_observed_capacity_mwp] = xr_data["observed_capacity_mwp"].values
    batch[BatchKey.wind_nominal_capacity_mwp] = xr_data["nominal_capacity_mwp"].values
    batch[BatchKey.wind_time_utc] = datetime64_to_float(xr_data["time_utc"].values)
    batch[BatchKey.wind_latitude] = xr_data["latitude"].values
    batch[BatchKey.wind_longitude] = xr_data["longitude"].values
    return batch


@functional_datapipe("convert_wind_to_numpy_batch")
class ConvertWindToNumpyBatchIterDataPipe(IterDataPipe):
"""Convert Wind Xarray to NumpyBatch"""
Expand All @@ -27,18 +45,4 @@ def __init__(self, source_datapipe: IterDataPipe):
def __iter__(self) -> NumpyBatch:
"""Iterate and convert PV Xarray to NumpyBatch"""
for xr_data in self.source_datapipe:
logger.debug("Converting Wind xarray to numpy example")

example: NumpyBatch = {
BatchKey.wind: xr_data.values,
BatchKey.wind_t0_idx: xr_data.attrs["t0_idx"],
BatchKey.wind_ml_id: xr_data["ml_id"].values,
BatchKey.wind_id: xr_data["wind_system_id"].values.astype(np.float32),
BatchKey.wind_observed_capacity_mwp: (xr_data["observed_capacity_mwp"].values),
BatchKey.wind_nominal_capacity_mwp: (xr_data["nominal_capacity_mwp"].values),
BatchKey.wind_time_utc: datetime64_to_float(xr_data["time_utc"].values),
BatchKey.wind_latitude: xr_data["latitude"].values,
BatchKey.wind_longitude: xr_data["longitude"].values,
}

yield example
yield convert_wind_to_numpy_batch(xr_data)
Loading