In [22]:
import logging
import pandas as pd
import os
import random
import time, datetime
from entsoe import EntsoePandasClient
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryApi

In [23]:
# Set up logging: INFO and above are appended to entsoe.log with a timestamped format
logging.basicConfig(
    level=logging.INFO,
    filename='entsoe.log',
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logging.info('Script started at %s', pd.Timestamp.now(tz='UTC'))

In [24]:
# Set API key
# The ENTSO-E API key is a credential, so it is read from the environment instead of
# being hard-coded in the notebook. A key that was previously committed in this cell
# should be treated as leaked and rotated.
API_KEY = os.environ.get('ENTSOE_API_KEY')
if not API_KEY:
    raise RuntimeError('ENTSOE_API_KEY environment variable is not set')

In [25]:
# Define InfluxDB connection details
# The token is a credential, so it comes from the environment rather than the notebook
# (a token previously committed here should be rotated). The other settings may be
# overridden via the environment and otherwise keep their previous values.
# NOTE(review): the default URL is plain http, so the token is sent unencrypted.
# Consider https.
influxdb_url = os.environ.get('INFLUXDB_URL', 'http://159.89.103.242:8086')
influxdb_token = os.environ.get('INFLUXDB_TOKEN')
if not influxdb_token:
    raise RuntimeError('INFLUXDB_TOKEN environment variable is not set')
influxdb_org = os.environ.get('INFLUXDB_ORG', 'entra')
influxdb_bucket = os.environ.get('INFLUXDB_BUCKET', 'entra')

In [26]:
# ENTSO-E client used by every download cell below
client = EntsoePandasClient(
    api_key=API_KEY,
)

In [27]:
# InfluxDB client shared by the write and query APIs created below
influx_client = InfluxDBClient(
    url=influxdb_url,
    org=influxdb_org,
    token=influxdb_token,
)

In [28]:
# Write API configured with SYNCHRONOUS write options
write_api = influx_client.write_api(write_options=SYNCHRONOUS)
# Query API obtained from the client; equivalent to constructing QueryApi(influx_client)
query_api = influx_client.query_api()

In [29]:
# Download window shared by all extraction cells; both bounds are UTC-aware timestamps
start, end = (pd.Timestamp(day, tz='UTC') for day in ('2023-07-18', '2023-08-03'))

In [30]:
# Map each data category identifier to the InfluxDB measurement name used for it
categories = dict([
    ('6.1.A', 'actual_total_load'),
    ('6.1.B', 'day_ahead_total_load_forecast'),
    ('14.1.A', 'installed_generation_capacity'),
    ('16.1.B&C', 'actual_generation'),
])

# Codes passed as `country_code` to the ENTSO-E client, kept in sorted order
country_codes = sorted("""
    AL AT BA BE BG BY CH CWE CY CZ CZ_DE_SK
    DE DE_50HZ DE_AMPRION DE_AT_LU DE_LU DE_TENNET DE_TRANSNET
    DK DK_1 DK_2 DK_CA
    EE ES FI FR
    GB GB_NIR GR HR HU
    IE IE_SEM IS
    IT IT_BRNN IT_CNOR IT_CSUD IT_FOGN IT_GR IT_MACRO_NORTH IT_MACRO_SOUTH IT_MALTA
    IT_NORD IT_NORD_AT IT_NORD_CH IT_NORD_FR IT_NORD_SI IT_PRGP IT_ROSN
    IT_SACO_AC IT_SACO_DC IT_SARD IT_SICI IT_SUD
    LT LU LV MD ME MK MT NL
    NO NO_1 NO_2 NO_3 NO_4 NO_5
    PL PL_CZ PT RO RS RU RU_KGD
    SE SE_1 SE_2 SE_3 SE_4 SI SK TR
    UA UA_BEI UA_DOBTPP UA_IPS
""".split())

In [31]:
# Display the sorted list of codes that the download loops below iterate over
country_codes

['AL',
 'AT',
 'BA',
 'BE',
 'BG',
 'BY',
 'CH',
 'CWE',
 'CY',
 'CZ',
 'CZ_DE_SK',
 'DE',
 'DE_50HZ',
 'DE_AMPRION',
 'DE_AT_LU',
 'DE_LU',
 'DE_TENNET',
 'DE_TRANSNET',
 'DK',
 'DK_1',
 'DK_2',
 'DK_CA',
 'EE',
 'ES',
 'FI',
 'FR',
 'GB',
 'GB_NIR',
 'GR',
 'HR',
 'HU',
 'IE',
 'IE_SEM',
 'IS',
 'IT',
 'IT_BRNN',
 'IT_CNOR',
 'IT_CSUD',
 'IT_FOGN',
 'IT_GR',
 'IT_MACRO_NORTH',
 'IT_MACRO_SOUTH',
 'IT_MALTA',
 'IT_NORD',
 'IT_NORD_AT',
 'IT_NORD_CH',
 'IT_NORD_FR',
 'IT_NORD_SI',
 'IT_PRGP',
 'IT_ROSN',
 'IT_SACO_AC',
 'IT_SACO_DC',
 'IT_SARD',
 'IT_SICI',
 'IT_SUD',
 'LT',
 'LU',
 'LV',
 'MD',
 'ME',
 'MK',
 'MT',
 'NL',
 'NO',
 'NO_1',
 'NO_2',
 'NO_3',
 'NO_4',
 'NO_5',
 'PL',
 'PL_CZ',
 'PT',
 'RO',
 'RS',
 'RU',
 'RU_KGD',
 'SE',
 'SE_1',
 'SE_2',
 'SE_3',
 'SE_4',
 'SI',
 'SK',
 'TR',
 'UA',
 'UA_BEI',
 'UA_DOBTPP',
 'UA_IPS']

In [18]:
# Measurement names, in the same order as the categories mapping
measurements = [*categories.values()]

In [20]:
# Actual Total Load - 1
# For every country: download the actual total load and write it to InfluxDB as the
# "actual_total_load" measurement, one point per timestamp tagged with the country code.
for country_code in country_codes:
    try:
        # Download the actual total load data
        actual_total_load = client.query_load(country_code=country_code, start=start, end=end)
        print(f"Actual Total Load for {country_code}:")
        print(actual_total_load)  # Print the data

        # Rename the single load column so the stored field is simply "value"
        actual_total_load.columns = ['value']

        # Convert the index to UTC and format it as an RFC3339 string
        actual_total_load.index = actual_total_load.index.tz_convert('UTC').strftime('%Y-%m-%dT%H:%M:%SZ')

        # One point per timestamp; missing values are skipped, as in the other download cells
        data_points = [
            Point("actual_total_load")
            .tag("country", country_code)
            .field("value", value)
            .time(timestamp, WritePrecision.NS)
            for timestamp, value in actual_total_load['value'].items()
            if pd.notnull(value)
        ]

        write_api.write(bucket=influxdb_bucket, record=data_points)
        logging.info('Data extraction (actual_total_load) completed for country: %s', country_code)
    except Exception as e:
        print(f'Data extraction (actual_total_load) failed for country: {country_code}: {e}')
        logging.error('Data extraction (actual_total_load) failed for country: %s: %s', country_code, e)

Actual Total Load for AT:
                           Actual Load
2023-07-18 02:00:00+02:00       5193.0
2023-07-18 02:15:00+02:00       5140.0
2023-07-18 02:30:00+02:00       5090.0
2023-07-18 02:45:00+02:00       5048.0
2023-07-18 03:00:00+02:00       5018.0
...                                ...
2023-08-03 00:45:00+02:00       4920.0
2023-08-03 01:00:00+02:00       4882.0
2023-08-03 01:15:00+02:00       4820.0
2023-08-03 01:30:00+02:00       4758.0
2023-08-03 01:45:00+02:00       4701.0

[1536 rows x 1 columns]
Actual Total Load for BA:
                           Actual Load
2023-07-18 02:00:00+02:00        884.0
2023-07-18 03:00:00+02:00        848.0
2023-07-18 04:00:00+02:00        846.0
2023-07-18 05:00:00+02:00        917.0
2023-07-18 06:00:00+02:00       1138.0
...                                ...
2023-08-02 21:00:00+02:00       1191.0
2023-08-02 22:00:00+02:00       1145.0
2023-08-02 23:00:00+02:00       1024.0
2023-08-03 00:00:00+02:00        846.0
2023-08-03 01:00:00+02:00 

In [21]:
# Download day-ahead total load forecast - 2
# For every country: fetch the forecast, reduce it to a single "value" column keyed by
# UTC timestamp strings, and write one "day_ahead_total_load_forecast" point per row.
for country_code in country_codes:
    try:
        forecast = client.query_load_forecast(country_code=country_code, start=start, end=end)
        print(f"Day-Ahead Total Load Forecast for {country_code}:")
        print(forecast)

        forecast.columns = ['value']
        utc_index = forecast.index.tz_convert('UTC')
        forecast.index = utc_index.strftime('%Y-%m-%dT%H:%M:%SZ')

        points = [
            Point("day_ahead_total_load_forecast")
            .tag("country", country_code)
            .field("value", row['value'])
            .time(ts, WritePrecision.NS)
            for ts, row in forecast.iterrows()
        ]

        write_api.write(bucket=influxdb_bucket, record=points)
        logging.info('Data extraction (day_ahead_total_load_forecast) completed for country: %s', country_code)

    except Exception as e:
        failure_message = f'Data extraction (day_ahead_total_load_forecast) failed for {country_code}: {str(e)}'
        print(failure_message)
        logging.error(failure_message)

Day-Ahead Total Load Forecast for AL:
                           Forecasted Load
2023-07-18 02:00:00+02:00            629.0
2023-07-18 02:15:00+02:00            629.0
2023-07-18 02:30:00+02:00            629.0
2023-07-18 02:45:00+02:00            629.0
2023-07-18 03:00:00+02:00            597.0
...                                    ...
2023-08-03 01:00:00+02:00            664.0
2023-08-03 01:15:00+02:00            664.0
2023-08-03 01:30:00+02:00            664.0
2023-08-03 01:45:00+02:00            664.0
2023-08-03 02:00:00+02:00            624.0

[1537 rows x 1 columns]
Data extraction (day_ahead_total_load_forecast) failed for AL: name 'write_api' is not defined
Day-Ahead Total Load Forecast for AT:
                           Forecasted Load
2023-07-18 02:00:00+02:00           5116.0
2023-07-18 02:15:00+02:00           5064.0
2023-07-18 02:30:00+02:00           5016.0
2023-07-18 02:45:00+02:00           4980.0
2023-07-18 03:00:00+02:00           4976.0
...                           

In [None]:
# Download installed generation capacity - 3
# For every country: fetch installed capacity per production type and write one
# "installed_generation_capacity" point per (timestamp, production type) with a value.
for country_code in country_codes:
    try:
        # Download the installed generation capacity data
        installed_generation_capacity = client.query_installed_generation_capacity(country_code, start=start, end=end, psr_type=None)
        print(f"Installed Generation Capacity for {country_code}:")
        print(installed_generation_capacity)

        # Convert the index to UTC and format it as a string
        installed_generation_capacity.index = installed_generation_capacity.index.tz_convert('UTC').strftime('%Y-%m-%dT%H:%M:%SZ')

        # Create one data point per non-null (timestamp, production type) cell.
        # The timestamps live in the index (set above), not in column 0, so every column
        # is iterated. The former `columns[1:]` silently dropped the first production type.
        data_points = []
        for timestamp, row in installed_generation_capacity.iterrows():
            for production_type, value in row.items():
                if pd.notnull(value):
                    data_point = Point("installed_generation_capacity") \
                        .tag("country", country_code) \
                        .tag("production_type", production_type) \
                        .field("value", value) \
                        .time(timestamp, WritePrecision.NS)
                    data_points.append(data_point)

        # Write data points to InfluxDB
        write_api.write(bucket=influxdb_bucket, record=data_points)
        logging.info('Data extraction (installed_generation_capacity) completed for country: %s', country_code)
        print(f'Data extraction (installed_generation_capacity) completed for country: {country_code}')
    except Exception as e:
        print(f'Data extraction (installed_generation_capacity) failed for {country_code}: {e}')
        logging.error('Data extraction (installed_generation_capacity) failed for %s: %s', country_code, e)


In [None]:
# Country codes (out of all 88 in `country_codes`) for which the upload of
# installed_generation_capacity failed
failed_country_codes = [
    'BY', 'CWE', 'CZ_DE_SK', 'IS', 'IT_BRNN', 'IT_CNOR', 'IT_CSUD', 'IT_FOGN',
    'IT_GR', 'IT_MACRO_NORTH', 'IT_MACRO_SOUTH', 'IT_MALTA', 'IT_NORD', 'IT_NORD_AT',
    'IT_NORD_CH', 'IT_NORD_FR', 'IT_NORD_SI', 'IT_PRGP', 'IT_ROSN', 'IT_SACO_AC',
    'IT_SACO_DC', 'IT_SARD', 'IT_SICI', 'IT_SUD', 'PL_CZ', 'RU', 'RU_KGD', 'SE_1',
    'SE_2', 'SE_3', 'SE_4', 'TR', 'UA_DOBTPP'
]
# Only a cell's last expression is displayed, so show the count and the list together
len(failed_country_codes), failed_country_codes

In [None]:
# Download and save the data for each country - Actual Generation per Production Type - 4
def _production_type_tag(column):
    """Build the "production_type" tag value for one generation column label.

    Tuple labels (MultiIndex columns) are joined with "_" instead of being stringified
    as a tuple repr. "/" and " " are then replaced with "_". For plain string labels the
    result equals ``str(column).replace("/", "_").replace(" ", "_")``.
    """
    parts = column if isinstance(column, tuple) else (column,)
    label = "_".join(str(part) for part in parts)
    return label.replace("/", "_").replace(" ", "_")


# `country_codes` is already sorted (the previously referenced `sorted_country_codes` did not exist)
for country_code in country_codes:
    try:
        actual_generation = client.query_generation(country_code=country_code, start=start, end=end)
        print(f"Actual Generation for {country_code}:")
        print(actual_generation)  # Print the data

        # Convert the index to UTC and format it as string
        actual_generation.index = actual_generation.index.tz_convert('UTC').strftime('%Y-%m-%dT%H:%M:%SZ')

        # Tag values depend only on the columns, so compute them once per country.
        # NOTE(review): query_generation may return MultiIndex columns for some areas.
        # _production_type_tag handles both flat and tuple labels.
        production_types = [_production_type_tag(column) for column in actual_generation.columns]

        # One point per non-null (timestamp, production type) cell
        data_points = []
        for timestamp, row in actual_generation.iterrows():
            for production_type, value in zip(production_types, row):
                if pd.notnull(value):
                    data_point = Point("actual_generation") \
                        .tag("country", country_code) \
                        .tag("production_type", production_type) \
                        .field("value", value) \
                        .time(timestamp, WritePrecision.NS)
                    data_points.append(data_point)

        # Write the data points to InfluxDB
        write_api.write(bucket=influxdb_bucket, record=data_points)
        logging.info('Data extraction (actual_generation) completed for country: %s', country_code)

    except Exception as e:
        print(f'Data extraction (actual_generation) failed for {country_code}: {e}')
        logging.error('Data extraction (actual_generation) failed for %s: %s', country_code, e)


In [None]:
# Removing duplicates (read-only check of the data stored in InfluxDB)

In [None]:
def remove_duplicates(country_code, measurement, start, end):
    """Read one country's series for `measurement` from InfluxDB with duplicate rows dropped.

    Nothing is deleted from InfluxDB. The query result is de-duplicated client-side
    and returned.

    Parameters
    ----------
    country_code : str
        Value of the "country" tag written by the download cells.
    measurement : str
        InfluxDB measurement name, e.g. "actual_total_load".
    start, end : str or pandas.Timestamp
        Query range bounds; anything ``pd.Timestamp`` accepts. Naive values are
        treated as UTC. They are formatted as RFC3339 literals (whole seconds) for Flux.

    Returns
    -------
    pandas.DataFrame or None
        Distinct rows indexed and sorted by ``_time``, with ``_value`` and, when the
        measurement carries it, ``production_type``. Returns None when nothing matches.
    """
    def _to_flux_time(value):
        # Flux needs an unquoted RFC3339 literal; str(pd.Timestamp) contains a space
        ts = pd.Timestamp(value)
        ts = ts.tz_localize('UTC') if ts.tzinfo is None else ts.tz_convert('UTC')
        return ts.strftime('%Y-%m-%dT%H:%M:%SZ')

    # The download cells tag points with "country" (not "country_code") and write a
    # single "value" field, so filter on exactly that. No pivot is needed for one field.
    query = f'''
        from(bucket: "{influxdb_bucket}")
          |> range(start: {_to_flux_time(start)}, stop: {_to_flux_time(end)})
          |> filter(fn: (r) => r._measurement == "{measurement}" and r.country == "{country_code}" and r._field == "value")
    '''

    result = query_api.query_data_frame(query)
    # query_data_frame returns a list of frames when the tables have differing schemas
    if isinstance(result, list):
        result = pd.concat(result, ignore_index=True) if result else pd.DataFrame()
    if result.empty:
        return None

    # Keep the series key (time, plus production type when present) and the value,
    # then drop exact duplicate rows
    key_columns = [column for column in ('_time', 'production_type') if column in result.columns]
    deduplicated = result[key_columns + ['_value']].drop_duplicates()
    return deduplicated.set_index('_time').sort_index()

# Example: Austria's actual total load from 2023-07-01 until now.
# Separate names keep the download window `start`/`end` defined earlier intact.
example_country_code = 'AT'
example_measurement = 'actual_total_load'
example_start = pd.Timestamp('2023-07-01', tz='UTC')
example_end = pd.Timestamp.now(tz='UTC')

df = remove_duplicates(example_country_code, example_measurement, example_start, example_end)
print(df)

In [None]:
#Remove Duplicates for all countries, every measurement
# Read back de-duplicated data for every country and every measurement.
# `country_codes` is already sorted; `sorted_country_codes` was never defined.
for country_code in country_codes:
    for measurement in categories.values():
        df = remove_duplicates(country_code, measurement, start, end)
        print(f"DataFrame: {df}, Country: {country_code}, Measurement: {measurement}")