2 changes: 1 addition & 1 deletion nowcasting_dataset/config/gcp.yaml
@@ -6,7 +6,7 @@ input_data:
satellite_zarr_path: gs://solar-pv-nowcasting-data/satellite/EUMETSAT/SEVIRI_RSS/OSGB36/all_zarr_int16_single_timestep.zarr
solar_pv_data_filename: gs://solar-pv-nowcasting-data/PV/PVOutput.org/UK_PV_timeseries_batch.nc
solar_pv_metadata_filename: gs://solar-pv-nowcasting-data/PV/PVOutput.org/UK_PV_metadata.csv
gsp_zarr_path: gs://solar-pv-nowcasting-data/PV/PVOutput.org/PV/GSP/v0/pv_gsp.zarr
gsp_zarr_path: gs://solar-pv-nowcasting-data/PV/PVOutput.org/PV/GSP/v1/pv_gsp.zarr
topographic_filename: gs://solar-pv-nowcasting-data/Topographic/europe_dem_1km_osgb.tif
output_data:
filepath: gs://solar-pv-nowcasting-data/prepared_ML_training_data/v6/
2 changes: 1 addition & 1 deletion nowcasting_dataset/config/on_premises.yaml
@@ -7,7 +7,7 @@ input_data:
solar_pv_path:
solar_pv_data_filename: /storage/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/raw/PV/PVOutput.org/UK_PV_timeseries_batch.nc
solar_pv_metadata_filename: /storage/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/raw/PV/PVOutput.org/UK_PV_metadata.csv
gsp_zarr_path: /storage/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/raw/PV/GSP/v0/pv_gsp.zarr
gsp_zarr_path: /storage/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/raw/PV/GSP/v1/pv_gsp.zarr
output_data:
filepath: /storage/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/prepared_ML_training_data/v7/
process:
14 changes: 13 additions & 1 deletion nowcasting_dataset/data_sources/gsp/gsp_data_source.py
@@ -58,6 +58,8 @@ class GSPDataSource(ImageDataSource):
get_center: bool = True
# the maximum number of GSPs to be loaded per data sample
n_gsp_per_example: int = DEFAULT_N_GSP_PER_EXAMPLE
# scale from zero to one
do_scale_0_to_1: bool = False

def __post_init__(self, image_size_pixels: int, meters_per_pixel: int):
"""
@@ -96,7 +98,8 @@ def load(self):
)

# scale from 0 to 1
self.gsp_power = scale_to_0_to_1(self.gsp_power)
if self.do_scale_0_to_1:
self.gsp_power = scale_to_0_to_1(self.gsp_power)

logger.debug(f"There are {len(self.gsp_power.columns)} GSP")
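For reference, a plausible stand-in for the scaling step that the new do_scale_0_to_1 flag makes optional; the repository's actual scale_to_0_to_1 helper is not shown in this diff and may differ:

import pandas as pd


def scale_to_0_to_1_sketch(gsp_power: pd.DataFrame) -> pd.DataFrame:
    """Column-wise min-max scaling of GSP power into [0, 1] (assumed behaviour)."""
    return (gsp_power - gsp_power.min()) / (gsp_power.max() - gsp_power.min())


# With do_scale_0_to_1=False (the new default) the raw generation values are kept,
# which matters now that normalization by installed capacity can happen upstream.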

@@ -389,7 +392,16 @@ def load_solar_gsp_data(
# Open data - it may be quicker to open byte file first, but decided just to keep it like this at the moment
gsp_power = xr.open_dataset(filename, engine="zarr")
gsp_power = gsp_power.sel(datetime_gmt=slice(start_dt, end_dt))

# only take generation data
gsp_power = gsp_power.generation_mw

# make dataframe with index datetime_gmt and columns of gsp_id
gsp_power_df = gsp_power.to_dataframe()
gsp_power_df.reset_index(inplace=True)
gsp_power_df = gsp_power_df.pivot(
index="datetime_gmt", columns="gsp_id", values="generation_mw"
)

# Save memory
del gsp_power
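
A toy sketch (synthetic data, not part of the PR) of how the zarr's generation_mw variable becomes a DataFrame indexed by datetime_gmt with one column per gsp_id, mirroring the steps above:

import pandas as pd
import xarray as xr

# tiny stand-in for the pv_gsp.zarr generation_mw variable
gsp_power = xr.DataArray(
    [[1.0, 2.0], [3.0, 4.0]],
    dims=["datetime_gmt", "gsp_id"],
    coords={
        "datetime_gmt": pd.to_datetime(["2021-01-01 00:00", "2021-01-01 00:30"]),
        "gsp_id": [1, 2],
    },
    name="generation_mw",
)

# long dataframe, then pivot to wide: rows are timestamps, columns are GSP ids
gsp_power_df = gsp_power.to_dataframe().reset_index()
gsp_power_df = gsp_power_df.pivot(
    index="datetime_gmt", columns="gsp_id", values="generation_mw"
)
print(gsp_power_df.shape)  # (2 timestamps, 2 GSPs)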
120 changes: 92 additions & 28 deletions nowcasting_dataset/data_sources/gsp/pvlive.py
@@ -3,6 +3,10 @@
import logging
import pandas as pd
from pvlive_api import PVLive
from typing import Optional
import pytz
from tqdm import tqdm
from concurrent import futures

from nowcasting_dataset.data_sources.gsp.eso import get_list_of_gsp_ids

@@ -12,7 +16,7 @@


def load_pv_gsp_raw_data_from_pvlive(
start: datetime, end: datetime, number_of_gsp: int = None
start: datetime, end: datetime, number_of_gsp: int = None, normalize_data: bool = True
) -> pd.DataFrame:
"""
Load raw pv gsp data from pvlive. Note that each gsp is loaded separately. Also the data is loaded in 30 day chunks.
@@ -21,6 +25,7 @@ def load_pv_gsp_raw_data_from_pvlive(
start: the start date for gsp data to load
end: the end date for gsp data to load
number_of_gsp: The number of gsp to load. Note that on 2021-09-01 there were 338 to load.
normalize_data: Option to normalize the generation according to installed capacity

Returns: Data frame of time series of gsp data. Shows PV data for each GSP from {start} to {end}

@@ -38,46 +43,59 @@
gsp_data_df = []
logger.debug(f"Will be getting data for {len(gsp_ids)} gsp ids")
# loop over gsp ids
for gsp_id in gsp_ids:

one_gsp_data_df = []

# set the first chunk start and end times
start_chunk = first_start_chunk
end_chunk = first_end_chunk

# loop over 30 days chunks (nice to see progress instead of waiting a long time for one command - this might
# not be the fastest)
while start_chunk <= end:
logger.debug(f"Getting data for gsp id {gsp_id} from {start_chunk} to {end_chunk}")

one_gsp_data_df.append(
pvl.between(
# limit the total number of concurrent tasks to 4, so that we don't hit the pvlive api too much
future_tasks = []
with futures.ThreadPoolExecutor(max_workers=4) as executor:
for gsp_id in gsp_ids:

# set the first chunk start and end times
start_chunk = first_start_chunk
end_chunk = first_end_chunk

# loop over 30 day chunks (nice to see progress instead of waiting a long time for one command - this might
# not be the fastest)
while start_chunk <= end:
logger.debug(f"Getting data for gsp id {gsp_id} from {start_chunk} to {end_chunk}")

task = executor.submit(
pvl.between,
start=start_chunk,
end=end_chunk,
entity_type="gsp",
entity_id=gsp_id,
extra_fields="",
extra_fields="installedcapacity_mwp",
dataframe=True,
)
)

# add 30 days to the chunk, to get the next chunk
start_chunk = start_chunk + CHUNK_DURATION
end_chunk = end_chunk + CHUNK_DURATION
future_tasks.append(task)

# add 30 days to the chunk, to get the next chunk
start_chunk = start_chunk + CHUNK_DURATION
end_chunk = end_chunk + CHUNK_DURATION

if end_chunk > end:
end_chunk = end
if end_chunk > end:
end_chunk = end

# join together one gsp data, and sort
one_gsp_data_df = pd.concat(one_gsp_data_df)
one_gsp_data_df = one_gsp_data_df.sort_values(by=["gsp_id", "datetime_gmt"])
logger.debug(f"Getting results")
# Collect results from each thread.
for task in tqdm(future_tasks):
one_chunk_one_gsp_gsp_data_df = task.result()

# append to longer list
gsp_data_df.append(one_gsp_data_df)
if normalize_data:
one_chunk_one_gsp_gsp_data_df["generation_mw"] = (
one_chunk_one_gsp_gsp_data_df["generation_mw"]
/ one_chunk_one_gsp_gsp_data_df["installedcapacity_mwp"]
)

# append to longer list
gsp_data_df.append(one_chunk_one_gsp_gsp_data_df)

# join together gsp data
gsp_data_df = pd.concat(gsp_data_df)

# sort
gsp_data_df = gsp_data_df.sort_values(by=["gsp_id", "datetime_gmt"])

# remove any extra data loaded
gsp_data_df = gsp_data_df[gsp_data_df["datetime_gmt"] <= end]

@@ -88,3 +106,49 @@ def load_pv_gsp_raw_data_from_pvlive(
gsp_data_df["datetime_gmt"] = gsp_data_df["datetime_gmt"].dt.tz_localize(None)

return gsp_data_df


def get_installed_capacity(
start: Optional[datetime] = datetime(2021, 1, 1, tzinfo=pytz.utc),
maximum_number_of_gsp: Optional[int] = None,
) -> pd.Series:
"""
Get the installed capacity of each gsp

This can take ~30 seconds to get the full list

Args:
start: optional datetime when the installed capacity is collected
maximum_number_of_gsp: Truncate list of GSPs to be no larger than this number of GSPs.
Set to None to disable truncation.

Returns: pd.Series of installed capacity indexed by gsp_id

"""
logger.debug(f"Getting all installed capacity at {start}")

# get a list of gsp ids
gsp_ids = get_list_of_gsp_ids(maximum_number_of_gsp=maximum_number_of_gsp)

# set up the PVLive class, although here we are getting historic data
pvl = PVLive()

# loop over gsp_id to get installed capacity
data = []
for gsp_id in gsp_ids:
d = pvl.at_time(
start,
entity_type="gsp",
extra_fields="installedcapacity_mwp",
dataframe=True,
entity_id=gsp_id,
)
data.append(d)

# join data together
data_df = pd.concat(data)

# set gsp_id as index
data_df.set_index("gsp_id", inplace=True)

return data_df["installedcapacity_mwp"]
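
A minimal usage sketch of the two functions touched here (the dates are illustrative and the calls hit the live PVLive API):

from datetime import datetime

import pytz

from nowcasting_dataset.data_sources.gsp.pvlive import (
    get_installed_capacity,
    load_pv_gsp_raw_data_from_pvlive,
)

start = datetime(2021, 1, 1, tzinfo=pytz.utc)
end = datetime(2021, 2, 1, tzinfo=pytz.utc)

# normalize_data=True divides generation_mw by installedcapacity_mwp,
# so the stored values become capacity factors rather than raw MW
gsp_df = load_pv_gsp_raw_data_from_pvlive(
    start=start, end=end, number_of_gsp=10, normalize_data=True
)

# installed capacity per GSP at one point in time, e.g. for de-normalising later
capacity = get_installed_capacity(start=start, maximum_number_of_gsp=10)

print(gsp_df.head())
print(capacity.head())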
1 change: 1 addition & 0 deletions nowcasting_dataset/dataset/split/__init__.py
@@ -0,0 +1 @@
""" split functions """
19 changes: 15 additions & 4 deletions scripts/get_raw_pv_gsp_data.py
@@ -11,6 +11,7 @@
import yaml
import os
import numcodecs
import xarray as xr

from nowcasting_dataset.data_sources.gsp.pvlive import load_pv_gsp_raw_data_from_pvlive
from pathlib import Path
@@ -36,11 +37,21 @@
data_df = load_pv_gsp_raw_data_from_pvlive(start=start, end=end)

# pivot to index as datetime_gmt, and columns as gsp_id
data_df = data_df.pivot(index="datetime_gmt", columns="gsp_id", values="generation_mw")
data_df.columns = [str(col) for col in data_df.columns]
data_generation = data_df.pivot(index="datetime_gmt", columns="gsp_id", values="generation_mw")
data_generation.columns = [str(col) for col in data_generation.columns]
data_generation_xarray = xr.DataArray(
data_generation, name="generation_mw", dims=["datetime_gmt", "gsp_id"]
)

# change to xarray
data_xarray = data_df.to_xarray()
data_capacity = data_df.pivot(
index="datetime_gmt", columns="gsp_id", values="installedcapacity_mwp"
)
data_capacity.columns = [str(col) for col in data_capacity.columns]
data_capacity_xarray = xr.DataArray(
data_capacity, name="installedcapacity_mwp", dims=["datetime_gmt", "gsp_id"]
)

data_xarray = xr.merge([data_generation_xarray, data_capacity_xarray])

# save config to file
with open(os.path.join(LOCAL_TEMP_PATH, "configuration.yaml"), "w+") as f:
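
A toy sketch (synthetic rows, not the script itself) of how the two pivots above merge into a single xarray Dataset holding both generation_mw and installedcapacity_mwp:

import pandas as pd
import xarray as xr

# synthetic rows in the shape returned by load_pv_gsp_raw_data_from_pvlive
data_df = pd.DataFrame(
    {
        "datetime_gmt": pd.to_datetime(["2021-01-01 00:00"] * 2 + ["2021-01-01 00:30"] * 2),
        "gsp_id": [1, 2, 1, 2],
        "generation_mw": [0.5, 1.0, 0.6, 1.2],
        "installedcapacity_mwp": [10.0, 20.0, 10.0, 20.0],
    }
)


def pivot_to_dataarray(df: pd.DataFrame, value: str) -> xr.DataArray:
    """Pivot long rows to a (datetime_gmt, gsp_id) DataArray, with string gsp ids as in the script."""
    wide = df.pivot(index="datetime_gmt", columns="gsp_id", values=value)
    wide.columns = [str(col) for col in wide.columns]
    return xr.DataArray(wide, name=value, dims=["datetime_gmt", "gsp_id"])


data_xarray = xr.merge(
    [
        pivot_to_dataarray(data_df, "generation_mw"),
        pivot_to_dataarray(data_df, "installedcapacity_mwp"),
    ]
)
print(data_xarray)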