In [3]:
import os
from multiprocessing.dummy import Pool
from typing import Callable

import pandas as pd
from entsoe import EntsoePandasClient
from entsoe.mappings import PSRTYPE_MAPPINGS
from entsoe.exceptions import NoMatchingDataError
from pycountry import countries
from dotenv import load_dotenv

from const import TZ, START, END, COUNTRIES, GEN_TYPES

## General definitions

In [4]:
# Load environment setting (API key)
load_dotenv()

# Create client to ENTSO-E TP
client = EntsoePandasClient(api_key=os.getenv("ENTSOE_APIKEY"))

# Invert the psr mapping so that we can get psr types by textual gen. types
inverted_psr_mapping = {value: key for key, value in PSRTYPE_MAPPINGS.items()}

# Get the two-letter country code for this country
try:
    country_codes = [countries.get(name=country).alpha_2 for country in COUNTRIES]
except AttributeError:
    raise KeyError(country)

Define a function for downloading the needed data in parallel.

In [5]:
def download_parallel(function: Callable, arguments, 
                      n_threads=1,
                      index_name=None, columns_name=None) -> pd.DataFrame:
    """Download data using a defined function
    
    Args:
        function: The query function to use, must return a pandas Series
        arguments: List of tuples to pass as arguments to `function`
        n_threads: Number of threads to use
        index_name (optional): Name for index
        columns_name (optional): Name(s) for the columns
    """
    try:
        with Pool(n_threads) as p:
            series = p.starmap(function, arguments)
    except ConnectionError:
        raise
    df = pd.concat(series, axis=1)
    if index_name is not None:
        df.index.name = index_name
    if columns_name is not None:
        df.columns.names = columns_name
    return df.sort_index(0).sort_index(1)

In [14]:
def harmonize_datetime_index(ts: pd.Series) -> pd.Series:
    """Aux. function to make sure we have real timestamps in UTC"""
    return pd.Series(ts.values, 
                     index=pd.DatetimeIndex(ts.index, name="timestamp").tz_convert("UTC"))

## Actual generation

Define functions for querying the ENTSO-E Tranparency Platform API.

In [10]:
def get_gen_data(country_code: str, gentype: str) -> pd.Series:
    """Query generation data for country and generation type
    """ 
    # Execute the query
    ts = pd.Series(name=(country_code, gentype)).tz_localize('UTC')
    try:
        ts = client.query_generation(
            country_code, 
            start=pd.Timestamp(START, tz=TZ), 
            end=pd.Timestamp(END, tz=TZ), 
            psr_type=inverted_psr_mapping[gentype]
        )[gentype]  # Select the only column
    except NoMatchingDataError:
        print(f"No data for {gentype} in {country_code}!")
    except ValueError:
        print(f"Error getting data for {gentype} in {country_code}!")
    else:
        # Make sure we have real timestamps in UTC
        ts = harmonize_datetime_index(ts)
        ts.name = (country_code, gentype)
    return ts.sort_index()

Download data for all generation types using four threads and store in the raw data folder.

In [6]:
for gt in GEN_TYPES:
    idx = pd.MultiIndex.from_product([country_codes, [gt]])
    df = download_parallel(get_gen_data, idx, n_threads=4)
    filename = f"../data/raw/ENTSO-E_TP_generation_{gt}.csv" 
    df.xs(gt, axis=1, level=1).to_csv(filename, header=True)
    print(f"Wrote {filename}")

Wrote ../data/raw/ENTSO-E_TP_generation_Solar.csv
Wrote ../data/raw/ENTSO-E_TP_generation_Wind Onshore.csv
Wrote ../data/raw/ENTSO-E_TP_generation_Wind Offshore.csv


## Installed generation capacity

Let’s download installed capacities from ENTSO-E

In [12]:
def get_installed_cap_data(gentype: str, country_code: str) -> pd.Series:
    """Query generation data for country and generation type
    """
    # Time series name has the arguments
    ts_name = (gentype, country_code)
    ts = pd.Series(name=ts_name)    
    # Execute the query
    try:
        ts = client.query_installed_generation_capacity(
            country_code, 
            start=pd.Timestamp(f'{pd.Timestamp(START).year - 1}-01-01'), 
            end=pd.Timestamp(END), 
            psr_type=inverted_psr_mapping[gentype]
        )[gentype]  # Select the only column
    except NoMatchingDataError:
        print(f"No data for {gentype} in {country_code}!")
    else:
        ts.index = ts.index.year  # ENTSO-E has numbers for the beginning of the year
        ts.index.name = 'Year'
        ts.name = ts_name
    return ts

In [13]:
idx = pd.MultiIndex.from_product([GEN_TYPES, country_codes])
df = download_parallel(get_installed_cap_data, idx, n_threads=4)
df.T.to_csv("../data/raw/ENTSO-E_TP_installed_cap.csv", header=True)
print(f"Wrote {filename}")

Wrote ../data/raw/ENTSO-E_TP_generation_Wind Offshore.csv


## Actual load

Let’s define a fucntion to query the load.

In [15]:
def get_load_data(country_code: str) -> pd.Series:
    """Query generation data for country and generation type
    """       
    # Execute the query
    ts = pd.Series(name=(country_code, gentype)).tz_localize('UTC')
    try:
        ts = client.query_load(
            country_code, 
            start=pd.Timestamp(START, tz=TZ), 
            end=pd.Timestamp(END, tz=TZ), 
            ) # Select the only column
    except NoMatchingDataError:
        print(f"No data for {country_code}!")
    except ValueError:
        print(f"Error getting data for {country_code}!")
    else:
        ts = harmonize_datetime_index(ts)
        ts.name = country_code
    return ts.sort_index()

In [16]:
df = download_parallel(get_load_data, [(cc,) for cc in country_codes], n_threads=4)
filename = f"../data/raw/ENTSO-E_TP_load.csv" 
df.to_csv(filename, header=True)
print(f"Wrote {filename}")

Wrote ../data/raw/ENTSO-E_TP_load.csv


## Generation forecast

In [15]:
def get_fcast_data(country_code: str, gentype: str) -> pd.Series:
    """Query generation data for country and generation type
    """       
    # Execute the query
    ts = pd.Series(name=(country_code, gentype)).tz_localize('UTC')
    try:
        ts = client.query_wind_and_solar_forecast(
            country_code, 
            start=pd.Timestamp(START, tz=TZ), 
            end=pd.Timestamp(END, tz=TZ), 
            psr_type=inverted_psr_mapping[gentype]
        )[gentype]  # Select the only column
    except NoMatchingDataError:
        print(f"No data for {gentype} in {country_code}!")
    except ValueError:
        print(f"Error getting forecast data for {gentype} in {country_code}!")
    else:
        ts = harmonize_datetime_index(ts)
        ts.name = (country_code, gentype)
    return ts.sort_index()

Download data for all generation types using four threads and store in the raw data folder.

In [16]:
for gt in GEN_TYPES:
    idx = pd.MultiIndex.from_product([country_codes, [gt]])
    df = download_parallel(get_fcast_data, idx, n_threads=4)
    filename = f"../data/raw/ENTSO-E_TP_forecast_{gt}.csv" 
    df.xs(gt, axis=1, level=1).to_csv(filename, header=True)
    print(f"Wrote {filename}")

No data for Solar in FI!
No data for Solar in IE!
No data for Solar in IT!
No data for Solar in SE!
Wrote ../data/raw/ENTSO-E_TP_forecast_Solar.csv
No data for Wind Onshore in IT!
Wrote ../data/raw/ENTSO-E_TP_forecast_Wind Onshore.csv
No data for Wind Offshore in FI!
No data for Wind Offshore in FR!
No data for Wind Offshore in IE!
No data for Wind Offshore in IT!
No data for Wind Offshore in PT!
No data for Wind Offshore in ES!
No data for Wind Offshore in SE!
No data for Wind Offshore in AT!
Wrote ../data/raw/ENTSO-E_TP_forecast_Wind Offshore.csv
