# Aquisição de dados no portal da transparência da ENTSOE

Como conseguir a API Key para acessar os dados do portal da transparência da ENTSOE:

https://uat-transparency.entsoe.eu/content/static_content/Static%20content/web%20api/how_to_get_security_token.html

## Importando bibliotecas

In [None]:
import numpy as np
import os
import pandas as pd
import time
from dotenv import load_dotenv
from datetime import datetime, timedelta
from entsoe import EntsoePandasClient

In [None]:
# Carregar variáveis de ambiente do arquivo .env
load_dotenv()
API_KEY = os.getenv("ENTSOE_API_KEY")
# Configuração do cliente Entsoe
client = EntsoePandasClient(API_KEY)
# Definição de parâmetros
country_code = "FR"

## Funções

In [None]:
def preencher_horas_faltantes(df, metodo="spline", order=2):
    """
    Preenche as horas faltantes no índice datetime do DataFrame, interpolando os valores de acordo com o método escolhido.
    Parâmetros:
    df (pd.DataFrame): DataFrame com índice datetime e colunas numéricas.
    metodo (str): Método de interpolação ('linear', 'time', 'index', 'values', 'pad',
                  'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'barycentric',
                  'polynomial', 'krogh', 'piecewise_polynomial', 'spline', 'pchip',
                  'akima', 'cubicspline', 'from_derivatives').
                  Padrão: 'spline'.
    order (int, opcional): Ordem do polinômio para os métodos 'polynomial' e 'spline'. Padrão: 2.
    Retorna:
    pd.DataFrame: DataFrame com as horas corrigidas e valores interpolados.
    """
    if df.empty:
        print("O DataFrame está vazio. Nenhuma interpolação foi realizada.")
        return df
    # Garante que o índice é datetime e remove valores inválidos
    df = df.copy()
    df.index = pd.to_datetime(df.index, errors="coerce")
    if df.index.isna().any():
        raise ValueError(
            "O índice contém valores não reconhecidos como datas.")
    # Criar um range de tempo contínuo com frequência de 1 hora
    full_range = pd.date_range(
        start=df.index.min(), end=df.index.max(), freq="h")
    # Só reindexa se houver horas ausentes
    if not df.index.equals(full_range):
        df = df.reindex(full_range)
    # Filtra apenas colunas numéricas
    numeric_cols = df.select_dtypes(include=["number"]).columns
    non_numeric_cols = df.select_dtypes(exclude=["number"]).columns
    if numeric_cols.empty:
        print(
            "Nenhuma coluna numérica encontrada para interpolação. Retornando DataFrame original."
        )
        return df
    # Lista de métodos válidos
    metodos_validos = [
        "linear",
        "time",
        "index",
        "values",
        "pad",
        "nearest",
        "zero",
        "slinear",
        "quadratic",
        "cubic",
        "barycentric",
        "polynomial",
        "krogh",
        "piecewise_polynomial",
        "spline",
        "pchip",
        "akima",
        "cubicspline",
        "from_derivatives",
    ]
    # Aplicando o método de interpolação escolhido
    try:
        if metodo in metodos_validos:
            if metodo in ["spline", "polynomial"] and order is None:
                order = 2  # Define um padrão se o usuário não passar
            df[numeric_cols] = df[numeric_cols].interpolate(
                method=metodo,
                order=order if metodo in ["spline", "polynomial"] else None,
            )
        else:
            raise ValueError(
                f"Método de interpolação inválido: '{metodo}'. Métodos disponíveis: {', '.join(metodos_validos)}"
            )
    except Exception as e:
        print(f"Erro ao interpolar os dados: {e}")
        return df
    # Aviso se existirem colunas não numéricas
    if not non_numeric_cols.empty:
        print(
            f"As colunas não numéricas foram ignoradas: {list(non_numeric_cols)}")
    return df

In [None]:
def get_day_ahead_prices(
    client,
    country_code: str,
    start: pd.Timestamp,
    end: pd.Timestamp,
    max_retries: int = 3,
    metodo="spline",
    order=2,
) -> pd.DataFrame:
    """
Obtém os preços day-ahead para um país específico em um período determinado, com tentativas automáticas em caso de falha.
Args:
    client: Cliente de API que fornece os preços.
    country_code (str): Código do país.
    start (pd.Timestamp): Data de início no formato Timestamp.
    end (pd.Timestamp): Data de fim no formato Timestamp.
    max_retries (int): Número máximo de tentativas antes de desistir.
    metodo (str): Método de interpolação ('linear', 'time', 'index', 'values', 'pad',
        'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'barycentric',
        'polynomial', 'krogh', 'piecewise_polynomial', 'spline', 'pchip',
        'akima', 'cubicspline', 'from_derivatives').
        Padrão: 'spline'.
    order (int, opcional): Ordem do polinômio para os métodos 'polynomial' e 'spline'. Padrão: 2.
Returns:
    pd.DataFrame: DataFrame contendo os preços day-ahead com timestamps em UTC.
    """

    # Garante que os argumentos start e end são timestamps corretamente formatados
    start = (
        pd.Timestamp(start).tz_localize("UTC")
        if pd.Timestamp(start).tz is None
        else pd.Timestamp(start)
    )
    end = (
        pd.Timestamp(end).tz_localize("UTC")
        if pd.Timestamp(end).tz is None
        else pd.Timestamp(end)
    )
    attempts = 0
    while attempts < max_retries:
        try:
            # Obtenção dos dados
            data = client.query_day_ahead_prices(
                country_code, start=start, end=end)

            # Se for uma Series, converte para DataFrame com nome de coluna adequado
            if isinstance(data, pd.Series):
                data = data.to_frame(name="Day Ahead Price")
            # Garantir que o índice está no formato datetime e converter para UTC
            data.index = pd.to_datetime(data.index).tz_convert("UTC")
            # Preencher possíveis valores ausentes antes da interpolação
            data.ffill(inplace=True)
            # Preenchimento de horas faltantes usando spline de ordem 2
            data = preencher_horas_faltantes(data, metodo=metodo, order=order)
            # Resetar o índice e nomear corretamente a coluna de timestamp
            data = data.reset_index(names=["Timestamp"])
            return data
        except Exception as e:
            attempts += 1
            print(
                f"Erro ao obter os dados de {start.year} (Tentativa {attempts}/{max_retries}): {e}"
            )
            if attempts < max_retries:
                print("Tentando novamente em 5 segundos...")
                time.sleep(5)
            else:
                print(
                    f"Falha após {max_retries} tentativas. Pulando ano {start.year}.")
                return (
                    pd.DataFrame()
                    # Retorna um DataFrame vazio para evitar falhas no código principal.
                )

In [None]:
def get_load(
    client,
    country_code: str,
    start: pd.Timestamp,
    end: pd.Timestamp,
    max_retries: int = 3,
    metodo="spline",
    order=2,
) -> pd.DataFrame:
    """
    Obtém os a carga atual para um país específico em um período determinado, com tentativas automáticas em caso de falha.
    Args:
        client: Cliente de API que fornece os preços.
        country_code (str): Código do país.
        start (pd.Timestamp): Data de início no formato Timestamp.
        end (pd.Timestamp): Data de fim no formato Timestamp.
        max_retries (int): Número máximo de tentativas antes de desistir.
        metodo (str): Método de interpolação ('linear', 'time', 'index', 'values', 'pad',
            'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'barycentric',
            'polynomial', 'krogh', 'piecewise_polynomial', 'spline', 'pchip',
            'akima', 'cubicspline', 'from_derivatives').
            Padrão: 'spline'.
        order (int, opcional): Ordem do polinômio para os métodos 'polynomial' e 'spline'. Padrão: 2.
    Returns:
        pd.DataFrame: DataFrame contendo os preços day-ahead com timestamps em UTC.
    """

    # Garante que os argumentos start e end são timestamps corretamente formatados
    start = (
        pd.Timestamp(start).tz_localize("UTC")
        if pd.Timestamp(start).tz is None
        else pd.Timestamp(start)
    )
    end = (
        pd.Timestamp(end).tz_localize("UTC")
        if pd.Timestamp(end).tz is None
        else pd.Timestamp(end)
    )

    attempts = 0

    while attempts < max_retries:
        try:
            # Obtenção dos dados
            data = client.query_load(country_code, start=start, end=end)

            # Se for uma Series, converte para DataFrame com nome de coluna adequado
            if isinstance(data, pd.Series):
                data = data.to_frame()

            # Garantir que o índice está no formato datetime e converter para UTC
            data.index = pd.to_datetime(data.index).tz_convert("UTC")

            # Preencher possíveis valores ausentes antes da interpolação
            data.ffill(inplace=True)

            # Preenchimento de horas faltantes usando spline de ordem 2
            data = preencher_horas_faltantes(data, metodo=metodo, order=order)

            # Resetar o índice e nomear corretamente a coluna de timestamp
            data = data.reset_index(names=["Timestamp"])

            return data

        except Exception as e:
            attempts += 1
            print(
                f"Erro ao obter os dados de {start.year} (Tentativa {attempts}/{max_retries}): {e}"
            )

            if attempts < max_retries:
                print("Tentando novamente em 5 segundos...")
                time.sleep(5)
            else:
                print(
                    f"Falha após {max_retries} tentativas. Pulando ano {start.year}.")
                return (
                    pd.DataFrame()
                    # Retorna um DataFrame vazio para evitar falhas no código principal.
                )

In [None]:
def get_crossborder_flux(
    client,
    country_code: str,
    start: pd.Timestamp,
    end: pd.Timestamp,
    max_retries: int = 3,
    metodo="spline",
    order=2,
) -> pd.DataFrame:
    """
    Obtém o fluxo de energia de um país específico em um período determinado, com tentativas automáticas em caso de falha.
    Args:
        client: Cliente de API que fornece os preços.
        country_code (str): Código do país.
        start (pd.Timestamp): Data de início no formato Timestamp.
        end (pd.Timestamp): Data de fim no formato Timestamp.
        max_retries (int): Número máximo de tentativas antes de desistir.
        metodo (str): Método de interpolação ('linear', 'time', 'index', 'values', 'pad',
            'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'barycentric',
            'polynomial', 'krogh', 'piecewise_polynomial', 'spline', 'pchip',
            'akima', 'cubicspline', 'from_derivatives').
            Padrão: 'spline'.
        order (int, opcional): Ordem do polinômio para os métodos 'polynomial' e 'spline'. Padrão: 2.
    Returns:
        pd.DataFrame: DataFrame contendo os preços day-ahead com timestamps em UTC.
    """

    # Garante que os argumentos start e end são timestamps corretamente formatados

    start = (
        pd.Timestamp(start).tz_localize("UTC")
        if pd.Timestamp(start).tz is None
        else pd.Timestamp(start)
    )
    end = (
        pd.Timestamp(end).tz_localize("UTC")
        if pd.Timestamp(end).tz is None
        else pd.Timestamp(end)
    )

    attempts = 0

    while attempts < max_retries:
        try:
            # Obtenção dos dados
            export_energy = client.query_physical_crossborder_allborders(
                country_code, start=start, end=end, export=True, per_hour=True
            ).drop(columns="sum")
            import_energy = client.query_physical_crossborder_allborders(
                country_code, start=start, end=end, export=False, per_hour=True
            ).drop(columns="sum")

            # Calcula o fluxo de energia

            data = import_energy.subtract(export_energy, fill_value=0)

            # Se for uma Series, converte para DataFrame com nome de coluna adequado

            if isinstance(data, pd.Series):
                data = data.to_frame()

            # Garantir que o índice está no formato datetime e converter para UTC

            data.index = pd.to_datetime(data.index).tz_convert("UTC")

            # Preencher possíveis valores ausentes antes da interpolação
            data.ffill(inplace=True)

            # Preenchimento de horas faltantes usando spline de ordem 2
            data = preencher_horas_faltantes(data, metodo=metodo, order=order)

            # Resetar o índice e nomear corretamente a coluna de timestamp
            data = data.reset_index(names=["Timestamp"])
            return data


        except Exception as e:
            attempts += 1
            print(
                f"Erro ao obter os dados de {start.year} (Tentativa {attempts}/{max_retries}): {e}"
            )

            if attempts < max_retries:
                print("Tentando novamente em 5 segundos...")
                time.sleep(5)
            else:
                print(
                    f"Falha após {max_retries} tentativas. Pulando ano {start.year}.")
                return (
                    pd.DataFrame()
                    # Retorna um DataFrame vazio para evitar falhas no código principal.
                )

In [None]:
def get_generation(
    client,
    country_code: str,
    start: pd.Timestamp,
    end: pd.Timestamp,
    psr_type: str = None,
    max_retries: int = 3,
    metodo="spline",
    order=2,
) -> pd.DataFrame:
    """
    Obtém os preços day-ahead para um país específico em um período determinado, com tentativas automáticas em caso de falha.
    Args:
        client: Cliente de API que fornece os preços.
        country_code (str): Código do país.
        start (pd.Timestamp): Data de início no formato Timestamp.
        end (pd.Timestamp): Data de fim no formato Timestamp.
        psr_type (str, opcional): Tipo de geração a ser consultado. Se None, retorna todos os tipos disponíveis.
            Os tipos de PSR que podem estar disponíveis são:
                - 'B01' - Biomass
                - 'B02' - Fossil Brown coal/Lignite
                - 'B03' - Fossil Coal-derived gas
                - 'B04' - Fossil Gas
                - 'B05' - Fossil Hard coal
                - 'B06' - Fossil Oil
                - 'B07' - Fossil Oil shale
                - 'B08' - Fossil Peat
                - 'B09' - Geothermal
                - 'B10' - Hydro Pumped Storage
                - 'B11' - Hydro Run-of-river and poundage
                - 'B12' - Hydro Water Reservoir
                - 'B13' - Marine
                - 'B14' - Nuclear
                - 'B15' - Other renewable
                - 'B16' - Solar
                - 'B17' - Waste
                - 'B18' - Wind Offshore
                - 'B19' - Wind Onshore
                - 'B20' - Other
                - 'B21' - AC Link
                - 'B22' - DC Link
                - 'B23' - Substation
                - 'B24' - Transformer
                - 'B25' - Energy storage
        max_retries (int): Número máximo de tentativas antes de desistir.
        metodo (str): Método de interpolação ('linear', 'time', 'index', 'values', 'pad',
            'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'barycentric',
            'polynomial', 'krogh', 'piecewise_polynomial', 'spline', 'pchip',
            'akima', 'cubicspline', 'from_derivatives').
            Padrão: 'spline'.
        order (int, opcional): Ordem do polinômio para os métodos 'polynomial' e 'spline'. Padrão: 2.
    Returns:
        pd.DataFrame: DataFrame contendo os preços day-ahead com timestamps em UTC.
"""

    # Garante que os argumentos start e end são timestamps corretamente formatados
    start = (
        pd.Timestamp(start).tz_localize("UTC")
        if pd.Timestamp(start).tz is None
        else pd.Timestamp(start)
    )
    end = (
        pd.Timestamp(end).tz_localize("UTC")
        if pd.Timestamp(end).tz is None
        else pd.Timestamp(end)
    )

    attempts = 0

    while attempts < max_retries:
        try:
            # Obtenção dos dados
            data = client.query_generation(
                country_code, start=start, end=end, psr_type=psr_type
            )

            # Renomeando colunas de forma mais intuitiva apenas se houver MultiIndex
            if isinstance(data.columns, pd.MultiIndex):
                data.columns = [
                    f"{source} {metric.replace('Actual ', '')}"
                    for source, metric in data.columns
                ]

            # Tornar negativas as colunas 'Consumption' e somá-las com 'Aggregated'
            data.loc[:, data.columns.str.contains(
                "Consumption", na=False)] *= -1
            data = (
                data.T.groupby(
                    lambda x: x.replace(" Aggregated", "").replace(
                        " Consumption", "")
                )
                .sum()
                .T
            )

            # Se for uma Series, converte para DataFrame com nome de coluna adequado
            if isinstance(data, pd.Series):
                data = data.to_frame()

            # Garantir que o índice está no formato datetime e converter para UTC
            data.index = pd.to_datetime(data.index).tz_convert("UTC")

            # Preencher possíveis valores ausentes antes da interpolação
            data.ffill(inplace=True)

            # Preenchimento de horas faltantes usando spline de ordem 2
            data = preencher_horas_faltantes(data, metodo=metodo, order=order)

            # Resetar o índice e nomear corretamente a coluna de timestamp
            data = data.reset_index(names=["Timestamp"])
            return data


        except Exception as e:
            attempts += 1
            print(
                f"Erro ao obter os dados de {start.year} (Tentativa {attempts}/{max_retries}): {e}"
            )

            if attempts < max_retries:
                print("Tentando novamente em 5 segundos...")
                time.sleep(5)
            else:
                print(
                    f"Falha após {max_retries} tentativas. Pulando ano {start.year}.")
                return (
                    pd.DataFrame()
                    # Retorna um DataFrame vazio para evitar falhas no código principal.
                )

## Coleta Day Ahead Prices

In [None]:
# Verifica se o csv já existe
try:
    df = pd.read_csv("data/day_ahead_prices.csv")
    print("Arquivo CSV encontrado.")
except FileNotFoundError:
    # Inicializa um dicionário para armazenar os preços por ano
    day_ahead_prices = {}
    # Aquisição dos preços day-ahead para a França de 2015 até o primeiro dado de 2025
    for year in range(2015, 2025):
        start = pd.Timestamp(f"{year}0101", tz="UTC")
        end = pd.Timestamp(f"{year + 1}0101", tz="UTC")
        try:
            prices = get_day_ahead_prices(
                client, country_code=country_code, start=start, end=end
            )
            day_ahead_prices[year] = prices
            print(f"Dados de {year} adquiridos com sucesso.")
        except Exception as e:
            print(f"Erro ao obter os dados de {year}: {e}")
    # Concatenar todos os DataFrames, tratando duplicatas pela coluna Timestamp
    if day_ahead_prices:
        dahp = pd.concat(day_ahead_prices.values(), ignore_index=True).drop_duplicates(
            subset="Timestamp", keep="last"
        )
        # Armazenar os dados em um arquivo CSV
        dahp.to_csv("data/day_ahead_prices.csv", index=False)
        print("Arquivo CSV salvo com sucesso.")
    else:
        print("Nenhum dado foi adquirido, arquivo CSV não gerado.")

## Coleta LOAD

In [None]:
# Verifica se o csv já existe
try:
    df = pd.read_csv("data/load.csv")
    print("Arquivo CSV encontrado.")
except FileNotFoundError:
    # Inicializa um dicionário para armazenar os preços por ano
    load = {}
    # Aquisição dos preços day-ahead para a França de 2015 até o primeiro dado de 2025
    for year in range(2015, 2025):
        start = pd.Timestamp(f"{year}0101", tz="UTC")
        end = pd.Timestamp(f"{year + 1}0101", tz="UTC")
        try:
            actual_load = get_load(
                client, country_code=country_code, start=start, end=end, order=3
            )
            load[year] = actual_load
            print(f"Dados de {year} adquiridos com sucesso.")
        except Exception as e:
            print(f"Erro ao obter os dados de {year}: {e}")
    # Concatenar todos os DataFrames, tratando duplicatas pela coluna Timestamp
    if load:
        all_load = pd.concat(load.values(), ignore_index=True).drop_duplicates(
            subset="Timestamp", keep="last"
        )
        # Armazenar os dados em um arquivo CSV
        all_load.to_csv("data/load.csv", index=False)
        print("Arquivo CSV salvo com sucesso.")
    else:
        print("Nenhum dado foi adquirido, arquivo CSV não gerado.")

## Coleta Actual Generation per Production Type

In [None]:
# Verifica se o csv já existe
try:
    df = pd.read_csv("data/gen.csv")
    print("Arquivo CSV encontrado.")
except FileNotFoundError:
    # Inicializa um dicionário para armazenar os preços por ano
    gen = {}
    # Aquisição dos preços day-ahead para a França de 2015 até o primeiro dado de 2025
    for year in range(2015, 2025):
        start = pd.Timestamp(f"{year}0101", tz="UTC")
        end = pd.Timestamp(f"{year + 1}0101", tz="UTC")
        try:
            actual_gen = get_generation(
                client, country_code=country_code, start=start, end=end, order=3
            )
            gen[year] = actual_gen
            print(f"Dados de {year} adquiridos com sucesso.")
        except Exception as e:
            print(f"Erro ao obter os dados de {year}: {e}")
    # Concatenar todos os DataFrames, tratando duplicatas pela coluna Timestamp
    if gen:
        all_gen = pd.concat(gen.values(), ignore_index=True).drop_duplicates(
            subset="Timestamp", keep="last"
        )
        # Armazenar os dados em um arquivo CSV
        all_gen.to_csv("data/gen.csv", index=False)
        print("Arquivo CSV salvo com sucesso.")
    else:
        print("Nenhum dado foi adquirido, arquivo CSV não gerado.")

## Coleta Crossborder Flows

In [None]:
# Verifica se o csv já existe
try:
    df = pd.read_csv("data/flux.csv")
    print("Arquivo CSV encontrado.")
except FileNotFoundError:
    # Inicializa um dicionário para armazenar os preços por ano
    flux = {}
    # Aquisição dos preços day-ahead para a França de 2015 até o primeiro dado de 2025
    for year in range(2015, 2025):
        start = pd.Timestamp(f"{year}0101", tz="UTC")
        end = pd.Timestamp(f"{year + 1}0101", tz="UTC")
        try:
            actual_flux = get_crossborder_flux(
                client, country_code=country_code, start=start, end=end, order=3
            )
            flux[year] = actual_flux
            print(f"Dados de {year} adquiridos com sucesso.")
        except Exception as e:
            print(f"Erro ao obter os dados de {year}: {e}")
    # Concatenar todos os DataFrames, tratando duplicatas pela coluna Timestamp
    if flux:
        all_flux = pd.concat(flux.values(), ignore_index=True).drop_duplicates(
            subset="Timestamp", keep="last"
        )
        # Armazenar os dados em um arquivo CSV
        all_flux.to_csv("data/flux.csv", index=False)
        print("Arquivo CSV salvo com sucesso.")
    else:
        print("Nenhum dado foi adquirido, arquivo CSV não gerado.")