diff --git a/nowcasting_dataset/config/gcp.yaml b/nowcasting_dataset/config/gcp.yaml
index 92574e07..4f5ded46 100644
--- a/nowcasting_dataset/config/gcp.yaml
+++ b/nowcasting_dataset/config/gcp.yaml
@@ -6,7 +6,7 @@ input_data:
   satellite_zarr_path: gs://solar-pv-nowcasting-data/satellite/EUMETSAT/SEVIRI_RSS/OSGB36/all_zarr_int16_single_timestep.zarr
   solar_pv_data_filename: gs://solar-pv-nowcasting-data/PV/PVOutput.org/UK_PV_timeseries_batch.nc
   solar_pv_metadata_filename: gs://solar-pv-nowcasting-data/PV/PVOutput.org/UK_PV_metadata.csv
-  gsp_zarr_path: gs://solar-pv-nowcasting-data/PV/PVOutput.org/PV/GSP/v0/pv_gsp.zarr
+  gsp_zarr_path: gs://solar-pv-nowcasting-data/PV/PVOutput.org/PV/GSP/v1/pv_gsp.zarr
   topographic_filename: gs://solar-pv-nowcasting-data/Topographic/europe_dem_1km_osgb.tif
 output_data:
   filepath: gs://solar-pv-nowcasting-data/prepared_ML_training_data/v6/
diff --git a/nowcasting_dataset/config/on_premises.yaml b/nowcasting_dataset/config/on_premises.yaml
index 984cc132..20cf9931 100644
--- a/nowcasting_dataset/config/on_premises.yaml
+++ b/nowcasting_dataset/config/on_premises.yaml
@@ -7,7 +7,7 @@ input_data:
   solar_pv_path:
   solar_pv_data_filename: /storage/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/raw/PV/PVOutput.org/UK_PV_timeseries_batch.nc
   solar_pv_metadata_filename: /storage/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/raw/PV/PVOutput.org/UK_PV_metadata.csv
-  gsp_zarr_path: /storage/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/raw/PV/GSP/v0/pv_gsp.zarr
+  gsp_zarr_path: /storage/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/raw/PV/GSP/v1/pv_gsp.zarr
 output_data:
   filepath: /storage/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/prepared_ML_training_data/v7/
 process:
diff --git a/nowcasting_dataset/data_sources/gsp/gsp_data_source.py b/nowcasting_dataset/data_sources/gsp/gsp_data_source.py
index c62a01ae..6b89e6c0 100644
--- a/nowcasting_dataset/data_sources/gsp/gsp_data_source.py
+++ b/nowcasting_dataset/data_sources/gsp/gsp_data_source.py
@@ -58,6 +58,8 @@ class GSPDataSource(ImageDataSource):
     get_center: bool = True
     # the maximum number of gsp's to be loaded for data sample
     n_gsp_per_example: int = DEFAULT_N_GSP_PER_EXAMPLE
+    # scale from zero to one
+    do_scale_0_to_1: bool = False
 
     def __post_init__(self, image_size_pixels: int, meters_per_pixel: int):
         """
@@ -96,7 +98,8 @@ def load(self):
         )
 
         # scale from 0 to 1
-        self.gsp_power = scale_to_0_to_1(self.gsp_power)
+        if self.do_scale_0_to_1:
+            self.gsp_power = scale_to_0_to_1(self.gsp_power)
 
         logger.debug(f"There are {len(self.gsp_power.columns)} GSP")
 
@@ -389,7 +392,16 @@ def load_solar_gsp_data(
     # Open data - it may be quicker to open byte file first, but decided just to keep it like this at the moment
     gsp_power = xr.open_dataset(filename, engine="zarr")
     gsp_power = gsp_power.sel(datetime_gmt=slice(start_dt, end_dt))
+
+    # only take generation data
+    gsp_power = gsp_power.generation_mw
+
+    # make dataframe with index datetime_gmt and columns of gsp_id
     gsp_power_df = gsp_power.to_dataframe()
+    gsp_power_df.reset_index(inplace=True)
+    gsp_power_df = gsp_power_df.pivot(
+        index="datetime_gmt", columns="gsp_id", values="generation_mw"
+    )
 
     # Save memory
     del gsp_power
diff --git a/nowcasting_dataset/data_sources/gsp/pvlive.py b/nowcasting_dataset/data_sources/gsp/pvlive.py
index 112d2f61..d68ba519 100644
--- a/nowcasting_dataset/data_sources/gsp/pvlive.py
+++ b/nowcasting_dataset/data_sources/gsp/pvlive.py
@@ -3,6 +3,10 @@
 import logging
 
 import pandas as pd
 from pvlive_api import PVLive
+from typing import Optional
+import pytz
+from tqdm import tqdm
+from concurrent import futures
 
 from nowcasting_dataset.data_sources.gsp.eso import get_list_of_gsp_ids
@@ -12,7 +16,7 @@
 
 
 def load_pv_gsp_raw_data_from_pvlive(
-    start: datetime, end: datetime, number_of_gsp: int = None
+    start: datetime, end: datetime, number_of_gsp: int = None, normalize_data: bool = True
 ) -> pd.DataFrame:
     """
     Load raw pv gsp data from pvlive. Note that each gsp is loaded separately. Also the data is loaded in 30 day chunks.
@@ -21,6 +25,7 @@
         start: the start date for gsp data to load
         end: the end date for gsp data to load
         number_of_gsp: The number of gsp to load. Note that on 2021-09-01 there were 338 to load.
+        normalize_data: Option to normalize the generation according to installed capacity
 
     Returns: Data frame of time series of gsp data. Shows PV data for each GSP from {start} to {end}
 
@@ -38,46 +43,59 @@
 
     gsp_data_df = []
     logger.debug(f"Will be getting data for {len(gsp_ids)} gsp ids")
 
     # loop over gsp ids
-    for gsp_id in gsp_ids:
-
-        one_gsp_data_df = []
-
-        # set the first chunk start and end times
-        start_chunk = first_start_chunk
-        end_chunk = first_end_chunk
-
-        # loop over 30 days chunks (nice to see progress instead of waiting a long time for one command - this might
-        # not be the fastest)
-        while start_chunk <= end:
-            logger.debug(f"Getting data for gsp id {gsp_id} from {start_chunk} to {end_chunk}")
-
-            one_gsp_data_df.append(
-                pvl.between(
+    # limit the total number of concurrent tasks to 4, so that we don't hit the pvlive api too much
+    future_tasks = []
+    with futures.ThreadPoolExecutor(max_workers=4) as executor:
+        for gsp_id in gsp_ids:
+
+            # set the first chunk start and end times
+            start_chunk = first_start_chunk
+            end_chunk = first_end_chunk
+
+            # loop over 30 day chunks (nice to see progress instead of waiting a long time for one command - this
+            # might not be the fastest)
+            while start_chunk <= end:
+                logger.debug(f"Getting data for gsp id {gsp_id} from {start_chunk} to {end_chunk}")
+
+                task = executor.submit(
+                    pvl.between,
                     start=start_chunk,
                     end=end_chunk,
                     entity_type="gsp",
                     entity_id=gsp_id,
-                    extra_fields="",
+                    extra_fields="installedcapacity_mwp",
                     dataframe=True,
                 )
-            )
-
-            # add 30 days to the chunk, to get the next chunk
-            start_chunk = start_chunk + CHUNK_DURATION
-            end_chunk = end_chunk + CHUNK_DURATION
+                future_tasks.append(task)
+
+                # add 30 days to the chunk, to get the next chunk
+                start_chunk = start_chunk + CHUNK_DURATION
+                end_chunk = end_chunk + CHUNK_DURATION
 
-            if end_chunk > end:
-                end_chunk = end
+                if end_chunk > end:
+                    end_chunk = end
 
-        # join together one gsp data, and sort
-        one_gsp_data_df = pd.concat(one_gsp_data_df)
-        one_gsp_data_df = one_gsp_data_df.sort_values(by=["gsp_id", "datetime_gmt"])
+    logger.debug("Getting results")
+    # Collect results from each thread.
+    for task in tqdm(future_tasks):
+        one_chunk_one_gsp_gsp_data_df = task.result()
 
-        # append to longer list
-        gsp_data_df.append(one_gsp_data_df)
+        if normalize_data:
+            one_chunk_one_gsp_gsp_data_df["generation_mw"] = (
+                one_chunk_one_gsp_gsp_data_df["generation_mw"]
+                / one_chunk_one_gsp_gsp_data_df["installedcapacity_mwp"]
+            )
 
+        # append to longer list
+        gsp_data_df.append(one_chunk_one_gsp_gsp_data_df)
+
+    # join together gsp data
     gsp_data_df = pd.concat(gsp_data_df)
 
+    # sort
+    gsp_data_df = gsp_data_df.sort_values(by=["gsp_id", "datetime_gmt"])
+
     # remove any extra data loaded
     gsp_data_df = gsp_data_df[gsp_data_df["datetime_gmt"] <= end]
@@ -88,3 +106,49 @@
     gsp_data_df["datetime_gmt"] = gsp_data_df["datetime_gmt"].dt.tz_localize(None)
 
     return gsp_data_df
+
+
+def get_installed_capacity(
+    start: Optional[datetime] = datetime(2021, 1, 1, tzinfo=pytz.utc),
+    maximum_number_of_gsp: Optional[int] = None,
+) -> pd.Series:
+    """
+    Get the installed capacity of each gsp
+
+    This can take ~30 seconds for getting the full list
+
+    Args:
+        start: optional datetime when the installed capacity is collected
+        maximum_number_of_gsp: Truncate list of GSPs to be no larger than this number of GSPs.
+            Set to None to disable truncation.
+
+    Returns: pd.Series of installed capacity indexed by gsp_id
+
+    """
+    logger.debug(f"Getting all installed capacity at {start}")
+
+    # get a list of gsp ids
+    gsp_ids = get_list_of_gsp_ids(maximum_number_of_gsp=maximum_number_of_gsp)
+
+    # set up the PVLive class, although here we are getting historic data
+    pvl = PVLive()
+
+    # loop over gsp_id to get installed capacity
+    data = []
+    for gsp_id in gsp_ids:
+        d = pvl.at_time(
+            start,
+            entity_type="gsp",
+            extra_fields="installedcapacity_mwp",
+            dataframe=True,
+            entity_id=gsp_id,
+        )
+        data.append(d)
+
+    # join data together
+    data_df = pd.concat(data)
+
+    # set gsp_id as index
+    data_df.set_index("gsp_id", inplace=True)
+
+    return data_df["installedcapacity_mwp"]
diff --git a/nowcasting_dataset/dataset/split/__init__.py b/nowcasting_dataset/dataset/split/__init__.py
new file mode 100644
index 00000000..c7eb89d1
--- /dev/null
+++ b/nowcasting_dataset/dataset/split/__init__.py
@@ -0,0 +1 @@
+""" split functions """
diff --git a/scripts/get_raw_pv_gsp_data.py b/scripts/get_raw_pv_gsp_data.py
index 5406d053..32b56c28 100755
--- a/scripts/get_raw_pv_gsp_data.py
+++ b/scripts/get_raw_pv_gsp_data.py
@@ -11,6 +11,7 @@
 import yaml
 import os
 import numcodecs
+import xarray as xr
 
 from nowcasting_dataset.data_sources.gsp.pvlive import load_pv_gsp_raw_data_from_pvlive
 from pathlib import Path
@@ -36,11 +37,21 @@
 data_df = load_pv_gsp_raw_data_from_pvlive(start=start, end=end)
 
 # pivot to index as datetime_gmt, and columns as gsp_id
-data_df = data_df.pivot(index="datetime_gmt", columns="gsp_id", values="generation_mw")
-data_df.columns = [str(col) for col in data_df.columns]
+data_generation = data_df.pivot(index="datetime_gmt", columns="gsp_id", values="generation_mw")
+data_generation.columns = [str(col) for col in data_generation.columns]
+data_generation_xarray = xr.DataArray(
+    data_generation, name="generation_mw", dims=["datetime_gmt", "gsp_id"]
+)
 
-# change to xarray
-data_xarray = data_df.to_xarray()
+data_capacity = data_df.pivot(
+    index="datetime_gmt", columns="gsp_id", values="installedcapacity_mwp"
+)
+data_capacity.columns = [str(col) for col in data_capacity.columns]
+data_capacity_xarray = xr.DataArray(
+    data_capacity, name="installedcapacity_mwp", dims=["datetime_gmt", "gsp_id"]
+)
+
+data_xarray = xr.merge([data_generation_xarray, data_capacity_xarray])
 
 # save config to file
 with open(os.path.join(LOCAL_TEMP_PATH, "configuration.yaml"), "w+") as f:
diff --git a/tests/data/gsp/test.zarr/.zmetadata b/tests/data/gsp/test.zarr/.zmetadata
index a6cf4ed1..ab697b80 100644
--- a/tests/data/gsp/test.zarr/.zmetadata
+++ b/tests/data/gsp/test.zarr/.zmetadata
@@ -4,384 +4,9 @@
     ".zgroup": {
         "zarr_format": 2
     },
-    "1/.zarray": {
-        "chunks": [
-            145
-        ],
-        "compressor": {
-            "blocksize": 0,
-            "clevel": 5,
-            "cname": "lz4",
-            "id": "blosc",
-            "shuffle": 1
-        },
-        "dtype": "
+    assert gsp_pv_df["generation_mw"].max() > 1
+
+    gsp_pv_df = load_pv_gsp_raw_data_from_pvlive(
+        start=start, end=end, number_of_gsp=1, normalize_data=True
+    )
+    assert gsp_pv_df["generation_mw"].max() <= 1
+
+
+def test_load_gsp_raw_data_from_pvlive_one_gsp():
+    """
+    Test that one gsp system data can be loaded
+    """
+
+    start = datetime(2019, 1, 1, tzinfo=pytz.utc)
+    end = datetime(2019, 3, 1, tzinfo=pytz.utc)
+
+    gsp_pv_df = load_pv_gsp_raw_data_from_pvlive(start=start, end=end, number_of_gsp=1)
+
+    assert isinstance(gsp_pv_df, pd.DataFrame)
+    assert len(gsp_pv_df) == (48 * 59 + 1)
+    # 31 days in january, 28 days in february, plus one for the first timestamp in march
+    assert "datetime_gmt" in gsp_pv_df.columns
+    assert "generation_mw" in gsp_pv_df.columns
+
+
+def test_load_gsp_raw_data_from_pvlive_many_gsp():
+    """
+    Test that data for many gsp systems can be loaded
+    """
+
+    start = datetime(2019, 1, 1, tzinfo=pytz.utc)
+    end = datetime(2019, 1, 2, tzinfo=pytz.utc)
+
+    gsp_pv_df = load_pv_gsp_raw_data_from_pvlive(start=start, end=end, number_of_gsp=10)
+
+    assert isinstance(gsp_pv_df, pd.DataFrame)
+    assert len(gsp_pv_df) == (48 + 1) * 10
+    assert "datetime_gmt" in gsp_pv_df.columns
+    assert "generation_mw" in gsp_pv_df.columns
+
+
+def test_get_installed_capacity():
+
+    installed_capacity = get_installed_capacity(maximum_number_of_gsp=10)
+
+    assert len(installed_capacity) == 10
+    assert "installedcapacity_mwp" == installed_capacity.name
+    assert installed_capacity.iloc[0] == 342.02623
+    assert installed_capacity.iloc[9] == 308.00432
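
For reference, a minimal usage sketch of the updated loader, mirroring the tests above (the date range and GSP count are illustrative; it assumes the pvlive_api package is installed and the PV_Live API is reachable):

from datetime import datetime

import pytz

from nowcasting_dataset.data_sources.gsp.pvlive import load_pv_gsp_raw_data_from_pvlive

# Fetch two months of half-hourly data for 10 GSPs. With normalize_data=True the
# generation_mw column is divided by installedcapacity_mwp, so values lie in [0, 1].
start = datetime(2019, 1, 1, tzinfo=pytz.utc)
end = datetime(2019, 3, 1, tzinfo=pytz.utc)
gsp_pv_df = load_pv_gsp_raw_data_from_pvlive(
    start=start, end=end, number_of_gsp=10, normalize_data=True
)
assert gsp_pv_df["generation_mw"].max() <= 1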
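
Similarly, a sketch of the new get_installed_capacity helper (the default timestamp of 2021-01-01 UTC comes from its signature; fetching the full list of ~338 GSPs takes roughly 30 seconds):

from nowcasting_dataset.data_sources.gsp.pvlive import get_installed_capacity

# Returns a pd.Series of installedcapacity_mwp (MWp) indexed by gsp_id.
installed_capacity = get_installed_capacity(maximum_number_of_gsp=10)
print(installed_capacity.head())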
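
The long-to-wide pivot added to load_solar_gsp_data can be checked in isolation; a small sketch with made-up numbers (the column names match the zarr variables, the values are illustrative):

import pandas as pd

# Long format, as produced by gsp_power.to_dataframe() followed by reset_index()
long_df = pd.DataFrame(
    {
        "datetime_gmt": pd.to_datetime(
            ["2019-01-01 00:00", "2019-01-01 00:00", "2019-01-01 00:30", "2019-01-01 00:30"]
        ),
        "gsp_id": [1, 2, 1, 2],
        "generation_mw": [0.0, 0.1, 0.2, 0.3],
    }
)

# Wide format: one row per datetime_gmt, one column per gsp_id - the shape
# GSPDataSource works with downstream.
wide_df = long_df.pivot(index="datetime_gmt", columns="gsp_id", values="generation_mw")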