Skip to content
This repository was archived by the owner on Sep 11, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions nowcasting_dataset/data_sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ and the geospatial shape of each GSP region).
# data_source.py

General class used for making a data source. It has the following functions
- get_batch: gets a whole batch of data for that data source
- get_batch: gets a whole batch of data for that data source. The list of 'xr.Dataset' examples is converted to
one xr.Dataset by changing the coordinates to indexes, and then joining the examples along an extra dimension.
- datetime_index: gets all the available datetimes of the source
- get_example: gets one "example" (a single consecutive sequence). Each batch is made up of multiple examples.
Each example is a 'xr.Dataset'
- get_locations_for_batch: Samples the geospatial x,y location for each example in a batch. This is useful because,
typically, we want a single DataSource to dictate the geospatial locations of the examples (for example,
we want each example to be centered on the centroid of the grid supply point region). All the other
Expand All @@ -27,20 +29,19 @@ General class used for making a data source. It has the following functions
# datasource_output.py

General pydantic model of output of the data source. Contains the following methods
- to_numpy: changes all data points to numpy objects
- split: converts a batch to a list of items
- join: joins list of items to one
- to_xr_dataset: changes data items to xarrays and returns a dataset
- from_xr_dataset: loads from an xarray dataset
- select_time_period: subselect data, depending on a time period
- save_netcdf: save to netcdf file
- check_nan_and_inf: check if any values are nans or infinite
- check_dataset_greater_than_or_equal_to: check values are >= a value
- check_dataset_less_than_or_equal_to: check values are <= a value
- check_dataset_not_equal: check values are != a value
- check_data_var_dim: check the dimensions of a data variable

# <X> Data Source folder

Roughly each of the data source folders follows this pattern
- A class which defines how to load the data source, how to select for batches etc. This inherits from 'data_source.DataSource',
- A class which contains the output model of the data source, built from an xarray Dataset. This is the information used in the batches.
- A class which contains the output model of the data source, built from an xarray Dataset. This is the information used in the batches.
This inherits from 'datasource_output.DataSourceOutput'.
- A second class (pydantic) which moves the xarray Dataset to tensor fields. This will be used for training in ML models


# fake
Expand Down
17 changes: 12 additions & 5 deletions nowcasting_dataset/data_sources/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
from nowcasting_dataset import square
from nowcasting_dataset.consts import SPATIAL_AND_TEMPORAL_LOCATIONS_COLUMN_NAMES
from nowcasting_dataset.data_sources.datasource_output import DataSourceOutput
from nowcasting_dataset.dataset.xr_utils import join_list_dataset_to_batch_dataset, make_dim_index
from nowcasting_dataset.dataset.xr_utils import (
convert_coordinates_to_indexes_for_list_datasets,
join_list_dataset_to_batch_dataset,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -257,10 +260,10 @@ def get_batch(
examples = [future_example.result() for future_example in future_examples]

# Get the DataSource class, this could be one of the data sources like Sun
cls = examples[0].__class__
cls = self.get_data_model_for_batch()

# Set the coords to be indices before joining into a batch
examples = [make_dim_index(example) for example in examples]
examples = convert_coordinates_to_indexes_for_list_datasets(examples)

# join the examples together, and cast them to the cls, so that validation can occur
return cls(join_list_dataset_to_batch_dataset(examples))
Expand All @@ -271,6 +274,10 @@ def datetime_index(self) -> pd.DatetimeIndex:
# of a list of datetimes (e.g. for DatetimeDataSource).
raise NotImplementedError()

def get_data_model_for_batch(self):
"""Get the model that is used in the batch"""
raise NotImplementedError()

def get_contiguous_time_periods(self) -> pd.DataFrame:
"""Get all the time periods for which this DataSource has contiguous data.

Expand Down Expand Up @@ -378,7 +385,7 @@ def data(self):

def get_example(
self, t0_dt: pd.Timestamp, x_meters_center: Number, y_meters_center: Number
) -> DataSourceOutput:
) -> xr.Dataset:
"""
Get Example data

Expand Down Expand Up @@ -419,7 +426,7 @@ def get_example(
f"actual shape {selected_data.shape}"
)

return selected_data.load()
return selected_data.load().to_dataset(name="data")

def geospatial_border(self) -> List[Tuple[Number, Number]]:
"""
Expand Down
6 changes: 4 additions & 2 deletions nowcasting_dataset/data_sources/datasource_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ def check_nan_and_inf(self, data: xr.Dataset, variable_name: str = None):

if isnan(data).any():
message = f"Some {self.__class__.__name__} data values are NaNs"
message += f" ({variable_name})" if variable_name is not None else None
if variable_name is not None:
message += f" ({variable_name})"
logger.error(message)
raise Exception(message)

if isinf(data).any():
message = f"Some {self.__class__.__name__} data values are Infinite"
message += f" ({variable_name})" if variable_name is not None else None
if variable_name is not None:
message += f" ({variable_name})"
logger.error(message)
raise Exception(message)

Expand Down
65 changes: 43 additions & 22 deletions nowcasting_dataset/data_sources/fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

Wanted to keep this out of the testing frameworks, as other repos might want to use this
"""
from typing import List

import numpy as np
import pandas as pd
import xarray as xr
Expand All @@ -15,8 +17,8 @@
from nowcasting_dataset.data_sources.sun.sun_model import Sun
from nowcasting_dataset.data_sources.topographic.topographic_model import Topographic
from nowcasting_dataset.dataset.xr_utils import (
convert_data_array_to_dataset,
join_list_data_array_to_batch_dataset,
convert_coordinates_to_indexes,
convert_coordinates_to_indexes_for_list_datasets,
join_list_dataset_to_batch_dataset,
)

Expand All @@ -28,7 +30,7 @@ def gsp_fake(
):
"""Create fake data"""
# make batch of arrays
xr_arrays = [
xr_datasets = [
create_gsp_pv_dataset(
seq_length=seq_length_30,
freq="30T",
Expand All @@ -37,8 +39,11 @@ def gsp_fake(
for _ in range(batch_size)
]

# change dimensions to dimension indexes
xr_datasets = convert_coordinates_to_indexes_for_list_datasets(xr_datasets)

# make dataset
xr_dataset = join_list_dataset_to_batch_dataset(xr_arrays)
xr_dataset = join_list_dataset_to_batch_dataset(xr_datasets)

return GSP(xr_dataset)

Expand All @@ -47,6 +52,9 @@ def metadata_fake(batch_size):
"""Make a xr dataset"""
xr_arrays = [create_metadata_dataset() for _ in range(batch_size)]

# change to indexes
xr_arrays = [convert_coordinates_to_indexes(xr_array) for xr_array in xr_arrays]

# make dataset
xr_dataset = join_list_dataset_to_batch_dataset(xr_arrays)

Expand Down Expand Up @@ -81,7 +89,7 @@ def nwp_fake(
def pv_fake(batch_size, seq_length_5, n_pv_systems_per_batch):
"""Create fake data"""
# make batch of arrays
xr_arrays = [
xr_datasets = [
create_gsp_pv_dataset(
seq_length=seq_length_5,
freq="5T",
Expand All @@ -90,8 +98,11 @@ def pv_fake(batch_size, seq_length_5, n_pv_systems_per_batch):
for _ in range(batch_size)
]

# change dimensions to dimension indexes
xr_datasets = convert_coordinates_to_indexes_for_list_datasets(xr_datasets)

# make dataset
xr_dataset = join_list_dataset_to_batch_dataset(xr_arrays)
xr_dataset = join_list_dataset_to_batch_dataset(xr_datasets)

return PV(xr_dataset)

Expand Down Expand Up @@ -150,6 +161,7 @@ def topographic_fake(batch_size, image_size_pixels):
x=np.sort(np.random.randn(image_size_pixels)),
y=np.sort(np.random.randn(image_size_pixels))[::-1].copy(),
),
name="data",
)
for _ in range(batch_size)
]
Expand Down Expand Up @@ -184,6 +196,7 @@ def create_image_array(
)
),
coords=coords,
name="data",
) # Fake data for testing!
return image_data_array

Expand All @@ -197,7 +210,7 @@ def create_gsp_pv_dataset(
"""Create gsp or pv fake dataset"""
ALL_COORDS = {
"time": pd.date_range("2021-01-01", freq=freq, periods=seq_length),
"id": np.random.randint(low=0, high=1000, size=number_of_systems),
"id": np.random.choice(range(1000), number_of_systems, replace=False),
}
coords = [(dim, ALL_COORDS[dim]) for dim in dims]
data_array = xr.DataArray(
Expand All @@ -208,22 +221,20 @@ def create_gsp_pv_dataset(
coords=coords,
) # Fake data for testing!

data = convert_data_array_to_dataset(data_array)
data = data_array.to_dataset(name="data")

x_coords = xr.DataArray(
data=np.sort(np.random.randn(number_of_systems)),
dims=["id_index"],
coords=dict(
id_index=range(number_of_systems),
data=np.sort(
np.random.choice(range(2 * number_of_systems), number_of_systems, replace=False)
),
dims=["id"],
)

y_coords = xr.DataArray(
data=np.sort(np.random.randn(number_of_systems)),
dims=["id_index"],
coords=dict(
id_index=range(number_of_systems),
data=np.sort(
np.random.choice(range(2 * number_of_systems), number_of_systems, replace=False)
),
dims=["id"],
)

data["x_coords"] = x_coords
Expand Down Expand Up @@ -265,13 +276,14 @@ def create_sun_dataset(
coords=coords,
) # Fake data for testing!

data = convert_data_array_to_dataset(data_array)
sun = data.rename({"data": "elevation"})
sun["azimuth"] = data.data
sun = data_array.to_dataset(name="elevation")
sun["azimuth"] = sun.elevation

sun.__setitem__("azimuth", sun.azimuth.clip(min=0, max=360))
sun.__setitem__("elevation", sun.elevation.clip(min=-90, max=90))

sun = convert_coordinates_to_indexes(sun)

return sun


Expand All @@ -282,11 +294,11 @@ def create_metadata_dataset() -> xr.Dataset:
"data": pd.date_range("2021-01-01", freq="5T", periods=1) + pd.Timedelta("30T"),
}

data = convert_data_array_to_dataset(xr.DataArray.from_dict(d))
data = (xr.DataArray.from_dict(d)).to_dataset(name="data")

for v in ["x_meters_center", "y_meters_center", "object_at_center_label"]:
d: dict = {"dims": ("t0_dt",), "data": [np.random.randint(0, 1000)]}
d: xr.Dataset = convert_data_array_to_dataset(xr.DataArray.from_dict(d)).rename({"data": v})
d: xr.Dataset = (xr.DataArray.from_dict(d)).to_dataset(name=v)
data[v] = getattr(d, v)

return data
Expand All @@ -307,11 +319,20 @@ def create_datetime_dataset(
coords=coords,
) # Fake data

data = convert_data_array_to_dataset(data_array)
data = data_array.to_dataset()

ds = data.rename({"data": "day_of_year_cos"})
ds["day_of_year_sin"] = data.rename({"data": "day_of_year_sin"}).day_of_year_sin
ds["hour_of_day_cos"] = data.rename({"data": "hour_of_day_cos"}).hour_of_day_cos
ds["hour_of_day_sin"] = data.rename({"data": "hour_of_day_sin"}).hour_of_day_sin

return data


def join_list_data_array_to_batch_dataset(data_arrays: List[xr.DataArray]) -> xr.Dataset:
    """Join a list of xr.DataArrays into an xr.Dataset by concatenating on the example dim."""
    # Promote each DataArray to a Dataset and swap its coordinates for
    # plain integer indexes, so all examples share identical coords.
    indexed_datasets = [
        convert_coordinates_to_indexes(data_array.to_dataset()) for data_array in data_arrays
    ]
    return join_list_dataset_to_batch_dataset(indexed_datasets)
31 changes: 12 additions & 19 deletions nowcasting_dataset/data_sources/gsp/gsp_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from nowcasting_dataset.data_sources.data_source import ImageDataSource
from nowcasting_dataset.data_sources.gsp.eso import get_gsp_metadata_from_eso
from nowcasting_dataset.data_sources.gsp.gsp_model import GSP
from nowcasting_dataset.dataset.xr_utils import convert_data_array_to_dataset
from nowcasting_dataset.geospatial import lat_lon_to_osgb
from nowcasting_dataset.square import get_bounding_box_mask
from nowcasting_dataset.utils import scale_to_0_to_1
Expand Down Expand Up @@ -73,6 +72,10 @@ def sample_period_minutes(self) -> int:
"""Override the default sample minutes"""
return 30

def get_data_model_for_batch(self):
"""Get the model that is used in the batch"""
return GSP

def load(self):
"""
Load the meta data and load the GSP power data
Expand Down Expand Up @@ -153,7 +156,7 @@ def get_locations(self, t0_datetimes: pd.DatetimeIndex) -> Tuple[List[Number], L

def get_example(
self, t0_dt: pd.Timestamp, x_meters_center: Number, y_meters_center: Number
) -> GSP:
) -> xr.Dataset:
"""
Get data example from one time point (t0_dt) and for x and y coords.

Expand Down Expand Up @@ -201,41 +204,31 @@ def get_example(
da = xr.DataArray(
data=selected_gsp_power.values,
dims=["time", "id"],
coords=dict(
id=all_gsp_ids.values.astype(int),
time=selected_gsp_power.index.values,
),
)

# convert to dataset
gsp = convert_data_array_to_dataset(da)
gsp = da.to_dataset(name="data")

# add gsp x coords
gsp_x_coords = xr.DataArray(
data=gsp_x_coords.values,
dims=["id_index"],
coords=dict(
id_index=range(len(all_gsp_ids.values)),
),
dims=["id"],
)

gsp_y_coords = xr.DataArray(
data=gsp_y_coords.values,
dims=["id_index"],
coords=dict(
id_index=range(len(all_gsp_ids.values)),
),
dims=["id"],
)
gsp["x_coords"] = gsp_x_coords
gsp["y_coords"] = gsp_y_coords

# pad out so that there are always 32 gsp, fill with 0
pad_n = self.n_gsp_per_example - len(gsp.id_index)
gsp = gsp.pad(id_index=(0, pad_n), data=((0, 0), (0, pad_n)), constant_values=0)
pad_n = self.n_gsp_per_example - len(gsp.id)
gsp = gsp.pad(id=(0, pad_n), data=((0, 0), (0, pad_n)), constant_values=0)

gsp.__setitem__("id_index", range(self.n_gsp_per_example))
gsp.__setitem__("id", range(self.n_gsp_per_example))

return GSP(gsp)
return gsp

def _get_central_gsp_id(
self,
Expand Down
Loading