# AXPO DATA OPS ENGINEER INTERNSHIP PROBLEM SET

## PART ONE - ETL PIPELINE

We aim to extract, transform, and load data from the Spanish power market.
This data is accessible via the public API of ESIOS.
To send valid data requests to the ESIOS API, use the following token. If
you encounter issues requesting data from the API, please let us know.

Token: ca757527cb8381ad315cd72b02a0176f8842fa5b548d99e14f4de46f61bcb17a

### LIBRARIES AND REQUIREMENTS

In [1]:
pip install --upgrade pip

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install requests

Note: you may need to restart the kernel to use updated packages.


In [3]:
# LIBRARIES

import requests
import pandas as pd
import numpy as np

### EXTRACT

Load the following indicators from 2020-01-01 to 2021-12-31:

    - Power demand forecast (indicator 460)
    - Power demand real (indicator 1293)
    - Nuclear power availability (indicator 474)

Refer to the IDs in parentheses to request the specific data from the ESIOS
API.

#### Power demand forecast

In [4]:
# API URL
url = "https://api.esios.ree.es/indicators/460"

# HEADERS
headers = {
    "Accept": "application/json; application/vnd.esios-api-v1+json",
    "Content-Type": "application/json",
    "x-api-key": "ca757527cb8381ad315cd72b02a0176f8842fa5b548d99e14f4de46f61bcb17a"
}

# PARAMETERS
params = {
    "start_date": "2020-01-01T00:00:00Z",
    "end_date": "2021-12-31T23:59:59Z"
}

# GET REQUEST
response = requests.get(url, headers=headers, params=params)

# CORRECTNESS CHECKING AND JSON
if response.status_code == 200:
    # Load the JSON response
    data = response.json()
    
    # Inspect the structure of the JSON to find where the relevant data is stored
    # Let's assume the data we want is in data['indicator']['values']
    indicator_data = data.get('indicator', {}).get('values', [])
    
    # Check if there is data
    if indicator_data:
        # Convert the list of dictionaries to a DataFrame
        df_forecast = pd.DataFrame(indicator_data)
        
        # Display the DataFrame
        display(df_forecast)
    else:
        print("No data available for the specified time period.")
else:
    print(f"Error: Unable to fetch data. Status code {response.status_code}")

Unnamed: 0,value,datetime,datetime_utc,tz_time,geo_id,geo_name
0,22519.0,2020-01-01T01:00:00.000+01:00,2020-01-01T00:00:00Z,2020-01-01T00:00:00.000Z,8741,Península
1,21222.0,2020-01-01T02:00:00.000+01:00,2020-01-01T01:00:00Z,2020-01-01T01:00:00.000Z,8741,Península
2,19900.0,2020-01-01T03:00:00.000+01:00,2020-01-01T02:00:00Z,2020-01-01T02:00:00.000Z,8741,Península
3,18998.0,2020-01-01T04:00:00.000+01:00,2020-01-01T03:00:00Z,2020-01-01T03:00:00.000Z,8741,Península
4,18849.0,2020-01-01T05:00:00.000+01:00,2020-01-01T04:00:00Z,2020-01-01T04:00:00.000Z,8741,Península
...,...,...,...,...,...,...
17539,27283.0,2021-12-31T20:00:00.000+01:00,2021-12-31T19:00:00Z,2021-12-31T19:00:00.000Z,8741,Península
17540,25723.0,2021-12-31T21:00:00.000+01:00,2021-12-31T20:00:00Z,2021-12-31T20:00:00.000Z,8741,Península
17541,23338.0,2021-12-31T22:00:00.000+01:00,2021-12-31T21:00:00Z,2021-12-31T21:00:00.000Z,8741,Península
17542,22102.0,2021-12-31T23:00:00.000+01:00,2021-12-31T22:00:00Z,2021-12-31T22:00:00.000Z,8741,Península


To be able to transform datetime later we need to know if there are different time zones in the df. 
If we had for example non-peninsular time stamps we might need to keep it in mind

In [5]:
# Check time zones

forecast_time_zones = df_forecast['geo_name'].unique()

print(forecast_time_zones)

['Península']


In [6]:
# Check which type of data we have in datetime

date_type = df_forecast['datetime'].dtypes

print(date_type)

object


#### Power demand real

In [7]:
# API URL
url = "https://api.esios.ree.es/indicators/1293"

# HEADERS
headers = {
    "Accept": "application/json; application/vnd.esios-api-v1+json",
    "Content-Type": "application/json",
    "x-api-key": "ca757527cb8381ad315cd72b02a0176f8842fa5b548d99e14f4de46f61bcb17a"
}

# PARAMETERS
params = {
    "start_date": "2020-01-01T00:00:00Z",
    "end_date": "2021-12-31T23:59:59Z"
}

# GET REQUEST
response = requests.get(url, headers=headers, params=params)

# CORRECTNESS CHECKING AND JSON
if response.status_code == 200:
    # Load the JSON response
    data = response.json()
    
    # Inspect the structure of the JSON to find where the relevant data is stored
    # Let's assume the data we want is in data['indicator']['values']
    indicator_data = data.get('indicator', {}).get('values', [])
    
    # Check if there is data
    if indicator_data:
        # Convert the list of dictionaries to a DataFrame
        df_forecast = pd.DataFrame(indicator_data)
        
        # Display the DataFrame
        display(df_forecast)
    else:
        print("No data available for the specified time period.")
else:
    print(f"Error: Unable to fetch data. Status code {response.status_code}")

Error: Unable to fetch data. Status code 504


Given that we can't access the data, we will use the same data from indicator 460, and later we will multiply the values by a random number between 0 and 2 to obtain different values for the next step of tranforming the data.

In [8]:
# Copy our df_frorecast
df_real = df_forecast.copy()

#### Nuclear power availability

In [9]:
# API URL
url = "https://api.esios.ree.es/indicators/474"

# HEADERS
headers = {
    "Accept": "application/json; application/vnd.esios-api-v1+json",
    "Content-Type": "application/json",
    "x-api-key": "ca757527cb8381ad315cd72b02a0176f8842fa5b548d99e14f4de46f61bcb17a"
}

# PARAMETERS
params = {
    "start_date": "2020-01-01T00:00:00Z",
    "end_date": "2021-12-31T23:59:59Z"
}

# GET REQUEST
response = requests.get(url, headers=headers, params=params)

# CORRECTNESS CHECKING AND JSON
if response.status_code == 200:
    # Load the JSON response
    data = response.json()
    
    # Inspect the structure of the JSON to find where the relevant data is stored
    # Let's assume the data we want is in data['indicator']['values']
    indicator_data = data.get('indicator', {}).get('values', [])
    
    # Check if there is data
    if indicator_data:
        # Convert the list of dictionaries to a DataFrame
        df_forecast = pd.DataFrame(indicator_data)
        
        # Display the DataFrame
        display(df_forecast)
    else:
        print("No data available for the specified time period.")
else:
    print(f"Error: Unable to fetch data. Status code {response.status_code}")

Error: Unable to fetch data. Status code 504


Given that we can't access the data, we will use the same data from indicator 460, and later we will multiply the values by a random number between 0 and 2 to obtain different values for the next step of tranforming the data.

In [10]:
# Copy our df_frorecast
df_nuclear = df_forecast.copy()

### TRANSFORM

Transform these three time series into a pandas DataFrame with hourly granularity and the following column structure:

    • ”datetime”, format: ”yyyy-mm-dd HH:MM:SS”
    • ”demand forecast”: values of the time series for id 460
    • ”demand real”: values of the time series for id 1293
    • ”nuclear power availability”: values of the time series for id 474
    • ”error demand forecast”: the error between the forecast of the demand and actual demand
    • ”error demand forecast avg 24H”: average of the error between forecast demand and actual demand for the last 24 hours
    • ”error demand forecast avg 12H”: average of the error between forecast demand and actual demand for the last 12 hours
    • ”demand real lag 1D”: the value of the real demand for the same hour on the previous day
    • ”demand forecast lag 1D”: the value of the forecast demand for the same hour in the previous day
    • ”nuclear power availability avg 24H”: average of the nuclear power availability for the last 24 hours
    • ”nuclear power availability avg 12H”: average of the nuclear power availability for the last 12 hours

Save the DataFrame as a csv file (yourname etl.csv).

There is randomness, every time we run the code the random numbers will be different. If we need reproducibility, we can set a seed for the random number generator with *np.random.seed(seed_number)*.

In [11]:
# FIx the seed for reproducibility
np.random.seed(24)

First, we will transform *df_real* and *df_nuclear* to make them different from *df_forecast*, by multiplying each of their values by a random number between 0 and 2.

In [12]:
# Generate random numbers between 0 and 2 for each row
random_factors1 = np.random.uniform(0, 2, size=len(df_forecast))
random_factors2 = np.random.uniform(0, 2, size=len(df_forecast))

# Multiply the 'values' column in the copied DataFrames by the random numbers
df_real['value'] *= random_factors1
df_nuclear['value'] *= random_factors2

#### Datetime changes

In [13]:
# Check data type after conversion
print(df_forecast['datetime'].dtypes)


object


In [14]:
# Format 'datetime' column
df_forecast['datetime'] = pd.to_datetime(df_forecast['datetime'], format='%Y-%m-%dT%H:%M:%S.%f%z', utc=True, errors='coerce').dt.tz_localize(None)
df_real['datetime'] = pd.to_datetime(df_real['datetime'], format='%Y-%m-%dT%H:%M:%S.%f%z', utc=True, errors='coerce').dt.tz_localize(None)
df_nuclear['datetime'] = pd.to_datetime(df_nuclear['datetime'], format='%Y-%m-%dT%H:%M:%S.%f%z', utc=True, errors='coerce').dt.tz_localize(None)

We see that in the original df the record began at UTC+1 and at 01:00:00, and in the new one it begins without taking into account UTC and at 00:00:00. If it were necessary to change it to add an hour, it could be done.

#### Rename columns

In [15]:
# Rename the 'value' column in each DataFrame with the appropriate name
df_forecast = df_forecast.rename(columns={'value': 'demand_forecast'})
df_real = df_real.rename(columns={'value': 'demand_real'})
df_nuclear = df_nuclear.rename(columns={'value': 'nuclear_power_availability'})

#### Merge dataframes

To be able to calculate means and errors we need to merge all the data into one dataframe to make it easier to acces all the data at once.

In [16]:
# Merge the DataFrames into one based on 'datetime'
df_merged = pd.merge(df_forecast[['datetime', 'demand_forecast']], df_real[['datetime', 'demand_real']], on='datetime')
df_merged = pd.merge(df_merged, df_nuclear[['datetime', 'nuclear_power_availability']], on='datetime')

#### Error demand

In [17]:
# Calculate the error in the demand
df_merged['error_demand_forecast'] = df_merged['demand_forecast'] - df_merged['demand_real']

In [18]:
# Calculate 24h and 12h moving averages for the error
df_merged['error_demand_forecast_avg_24H'] = df_merged['error_demand_forecast'].rolling(window=24, min_periods=1).mean()
df_merged['error_demand_forecast_avg_12H'] = df_merged['error_demand_forecast'].rolling(window=12, min_periods=1).mean()

Calculating moving averages in the first rows can result in *NaN* because the calculation window (for example, 24 hours 0 12 hours) cannot be completed if there is not enough previous data.

In Pandas, the rolling function has a parameter called *min_periods* that allows us to define the minimum number of observations necessary for the value of the moving average to be calculated. If the number of observations is less than *min_periods*, pandas will calculate the moving average using only the available values ​​instead of returning *NaN*.

Another alternative would be filler methods. We would consider *fillna(method='bfill')*, which fills the NaN with the first valid value available after the *NaN*, since we assume that we are interested in knowing the average error in that time slot.
If we prefer this the code would be the following:

In [19]:
#df_merged['error_demand_forecast_avg_24H'] = df_merged['error_demand_forecast_avg_24H'].fillna(method='bfill')
#df_merged['error_demand_forecast_avg_12H'] = df_merged['error_demand_forecast_avg_12H'].fillna(method='bfill')

#### 1D lag

In [20]:
# Calculate real and forecast demand values ​​with a delay of 1 day
df_merged['demand_real_lag_1D'] = df_merged['demand_real'].shift(24).shift(24).fillna(0, limit=24)
df_merged['demand_forecast_lag_1D'] = df_merged['demand_forecast'].shift(24).shift(24).fillna(0, limit=24)

Again, the first values ​​will appear with *NaN* because there is no previous data for the calculation. As we assume that a numerical value is better than a *NaN*, we choose to fill these values ​​with 0, but if *NaN*s are preferred, we would only have to eliminate *.shift(24).fillna(0, limit=24)* from the code.

#### Nuclear averages

In [21]:
# Calculate 24H and 12H moving averages for nuclear power availability
df_merged['nuclear_power_availability_avg_24H'] = df_merged['nuclear_power_availability'].rolling(window=24, min_periods=1).mean()
df_merged['nuclear_power_availability_avg_12H'] = df_merged['nuclear_power_availability'].rolling(window=12, min_periods=1).mean()

### SHOW AND EXPORT

In [22]:
# Show the final df
df_merged

Unnamed: 0,datetime,demand_forecast,demand_real,nuclear_power_availability,error_demand_forecast,error_demand_forecast_avg_24H,error_demand_forecast_avg_12H,demand_real_lag_1D,demand_forecast_lag_1D,nuclear_power_availability_avg_24H,nuclear_power_availability_avg_12H
0,2020-01-01 00:00:00,22519.0,43237.259308,4762.944652,-20718.259308,-20718.259308,-20718.259308,0.000000,0.0,4762.944652,4762.944652
1,2020-01-01 01:00:00,21222.0,29690.089448,25142.133396,-8468.089448,-14593.174378,-14593.174378,0.000000,0.0,14952.539024,14952.539024
2,2020-01-01 02:00:00,19900.0,39794.718246,27050.834644,-19894.718246,-16360.355667,-16360.355667,0.000000,0.0,18985.304231,18985.304231
3,2020-01-01 03:00:00,18998.0,8361.677123,3842.098642,10636.322877,-9611.186031,-9611.186031,0.000000,0.0,15199.502834,15199.502834
4,2020-01-01 04:00:00,18849.0,13611.102432,32214.272026,5237.897568,-6641.369311,-6641.369311,0.000000,0.0,18602.456672,18602.456672
...,...,...,...,...,...,...,...,...,...,...,...
17539,2021-12-31 19:00:00,27283.0,16669.564063,52622.449850,10613.435937,-8.824251,1190.259986,59398.574837,29776.0,27584.080565,29587.690009
17540,2021-12-31 20:00:00,25723.0,45434.464669,7574.170695,-19711.464669,-1158.903062,-1521.899870,29647.249428,29313.0,26526.607183,27781.672210
17541,2021-12-31 21:00:00,23338.0,41999.135274,22298.652254,-18661.135274,-2972.532941,-4842.935952,46441.408938,27005.0,26029.280193,26875.824121
17542,2021-12-31 22:00:00,22102.0,21202.360261,1406.362327,899.639739,-2379.043887,-5690.920188,34256.758462,24606.0,24668.402764,24354.032949


In [23]:
# Save as a *.csv* archive
df_merged.to_csv('regina_hergueta_etl.csv', index=False)