# Preprocess Data

Data was downloaded as csvs in a country encoded file structure. The different energy quantities have different file structures. 

Example paths:
- installed_capacity data:
    - data/installed_capacity/SE/SE/Installed Capacity Per Production Unit_202001010000-202101010000 (2).csv
- total_demand data:
    - data/total_demand/SE/total_demand_SE_20150101_20200101.csv

Before staging to S3 we will add the country information into the dataframe, rename the column headers, and clean as needed.

In [74]:
import os
import pandas as pd
from pathlib import Path

#### Define a function to extract country information and get file path

In [105]:
def traverse_path(path, split_idx):
    """
    Generates a dictionary of the path to each country.
    
    Input:
        path: str. The path to the root directory
        split_idx: int. split point to capture country name. I.e. -2 splits one directory up.
    """
    
    pathlist = Path(path).glob('**/*.csv')
    
    country_paths = dict()
    
    for p in pathlist:
        path_in_str = str(p)
        country = path_in_str.split('/')[split_idx]
        country_paths[country] = path_in_str

    return country_paths

## Installed Capacity

**Processing steps:**
1. Add country column
2. Rename column headers

In [335]:
def process_capacity_demand(country_paths, output_path, name, new_cols=None):
    """
    Prepares capacity and demand csvs from the ENTOSE API for the data warehouse.
    Appends country information and renames columns
     
    country_paths: dict. country and path to csvs with installed capcity data 
    new_cols: list. list of new columns headers
    """
    
    for country, path_in_str in country_paths.items():    
        
        #load dataframe
        df = pd.read_csv(path_in_str)
        
        # add country name
        df['country_id'] = country
        
        #rename columns
        if new_cols is not None:
            if 'Unnamed: 0' in df.columns:
                df.drop('Unnamed: 0', axis=1, inplace=True)
            
            assert len(new_cols) == len(df.columns), f'new_cols must be length {len(df.columns)}'
            df.columns = new_cols
        
        #save dataframe
        
        df.to_csv(os.path.join(output_path, f'{name}-{country}-{datetime.datetime.now().year}.csv'), 
                      index=False)
        print(f'Saved: {country}')

In [332]:
root_path = '/Users/ns/github-repos/entsoe-etl-pipeline/data/installed_capacity'
output_path = '/Users/ns/github-repos/entsoe-etl-pipeline/data/installed_capacity/processed'

country_paths = traverse_path(root_path, -3)

new_install_capacity_cols = ['event_date', 'production_type', 'code', 
                             'name', 'installed_capacity_year_start',
                             'current_installed_capacity', 'location', 
                             'voltage_connection_level', 'commissioning_date', 
                             'decommissioning_date', 'country_id']

process_capacity_demand(country_paths, 
                        output_path, 
                        'capacity', 
                        new_install_capacity_cols)

Saved: SE
Saved: PL
Saved: BE
Saved: NO
Saved: CH
Saved: IT
Saved: CZ
Saved: PT
Saved: UK
Saved: GE
Saved: NL
Saved: AT
Saved: DE
Saved: DK
Saved: FI
Saved: FR
Saved: ES
Saved: IE
Saved: installed_capacity


## Total Demand processing

**Processing Steps:**
1. Add country column
2. Rename column headers

Can use the same function as for the installed capacity.

In [337]:
root_path = '/Users/ns/github-repos/entsoe-etl-pipeline/data/total_demand'

output_path = '/Users/ns/github-repos/entsoe-etl-pipeline/data/total_demand/processed'

country_paths = traverse_path(root_path, -2)

new_total_demand_cols = ['event_date', 'total_demand', 'ts', 'country_id']

process_capacity_demand(country_paths, 
                        output_path, 
                        'demand', 
                        new_total_demand_cols)

Saved: SE
Saved: PL
Saved: BE
Saved: NO
Saved: CH
Saved: PT
Saved: NL
Saved: FI
Saved: FR
Saved: ES
Saved: IE


## Total generation processing

Generation data comes from a separate source. So we need a different processing steps.

**Processing Steps:**
1. Add country column
2. Rename columns
3. Parse datetime, localize, and set as index
4. Add a timestamp column
5. Unpivot data into long format


In [116]:
root_path = '/Users/ns/github-repos/entsoe-etl-pipeline/data/total_generation'

country_paths = traverse_path(root_path, -2)

In [188]:
df = pd.read_csv(country_paths['ES'])

In [189]:
## add country column
df['country_id'] = 'ES'

In [190]:
## clean column headers
df.columns = [x[0].strip().lower() for x in df.columns.str.split("-")]

In [191]:
df['mtu'].head()

0    01.01.2017 00:00 - 01.01.2017 01:00 (CET)
1    01.01.2017 01:00 - 01.01.2017 02:00 (CET)
2    01.01.2017 02:00 - 01.01.2017 03:00 (CET)
3    01.01.2017 03:00 - 01.01.2017 04:00 (CET)
4    01.01.2017 04:00 - 01.01.2017 05:00 (CET)
Name: mtu, dtype: object

In [192]:
## parse datetime and set index as dt object
df['date'] = df['mtu'].apply(lambda x: pd.to_datetime(x.split("-")[0]))
df = df.drop('mtu', axis=1)
df = df.set_index('date')
df.index = df.index.tz_localize(tz='Europe/Brussels', 
                                ambiguous='infer', 
                                nonexistent='shift_backward')

df.head(2)

Unnamed: 0_level_0,area,biomass,fossil brown coal/lignite,fossil coal,fossil gas,fossil hard coal,fossil oil,fossil oil shale,fossil peat,geothermal,...,hydro water reservoir,marine,nuclear,other,other renewable,solar,waste,wind offshore,wind onshore,country_id
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2017-01-01 00:00:00+01:00,Spain (ES),341.0,901.0,0.0,5412.0,6157.0,175.0,0.0,0.0,0.0,...,1658.0,0.0,7104.0,56.0,95.0,30.0,265.0,0.0,1760.0,ES
2017-01-01 01:00:00+01:00,Spain (ES),338.0,900.0,0.0,5401.0,5959.0,176.0,0.0,0.0,0.0,...,1949.0,0.0,7104.0,57.0,96.0,30.0,265.0,0.0,1825.0,ES


In [194]:
## add timestamp column
df['ts'] = df.index.asi8
df.head(2)

Unnamed: 0_level_0,area,biomass,fossil brown coal/lignite,fossil coal,fossil gas,fossil hard coal,fossil oil,fossil oil shale,fossil peat,geothermal,...,marine,nuclear,other,other renewable,solar,waste,wind offshore,wind onshore,country_id,ts
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2017-01-01 00:00:00+01:00,Spain (ES),341.0,901.0,0.0,5412.0,6157.0,175.0,0.0,0.0,0.0,...,0.0,7104.0,56.0,95.0,30.0,265.0,0.0,1760.0,ES,1483225200000000000
2017-01-01 01:00:00+01:00,Spain (ES),338.0,900.0,0.0,5401.0,5959.0,176.0,0.0,0.0,0.0,...,0.0,7104.0,57.0,96.0,30.0,265.0,0.0,1825.0,ES,1483228800000000000


In [198]:
## unpivot data into long format
df.reset_index().melt(id_vars=['date', 'ts', 'country_id', 'area'],
                      var_name='generation_type', 
                      value_name='generation_load')

Unnamed: 0,date,ts,country_id,area,generation_type,generation_load
0,2017-01-01 00:00:00+01:00,1483225200000000000,ES,Spain (ES),biomass,341.0
1,2017-01-01 01:00:00+01:00,1483228800000000000,ES,Spain (ES),biomass,338.0
2,2017-01-01 02:00:00+01:00,1483232400000000000,ES,Spain (ES),biomass,337.0
3,2017-01-01 03:00:00+01:00,1483236000000000000,ES,Spain (ES),biomass,335.0
4,2017-01-01 04:00:00+01:00,1483239600000000000,ES,Spain (ES),biomass,336.0
...,...,...,...,...,...,...
183976,2017-12-31 19:00:00+01:00,1514743200000000000,ES,Spain (ES),wind onshore,14881.0
183977,2017-12-31 20:00:00+01:00,1514746800000000000,ES,Spain (ES),wind onshore,14368.0
183978,2017-12-31 21:00:00+01:00,1514750400000000000,ES,Spain (ES),wind onshore,14250.0
183979,2017-12-31 22:00:00+01:00,1514754000000000000,ES,Spain (ES),wind onshore,13787.0


In [272]:
def process_total_generation(country_paths, output_path):
    """
    Prepares the total generation csvs from the ENTOSE database for the data warehouse.
    
    Input:
        country_paths: dict. dict. country and path to csvs with total generation data 
        output_path: str. path to save
    """
    
    
    for country, path_in_str in country_paths.items():    
        
        #load dataframe
        df = pd.read_csv(path_in_str)
        

        # add country name
        df['country_id'] = country
        
        ## clean column headers
        df.columns = [x[0].strip().lower() for x in df.columns.str.split("-")]
        
        ## parse datetime and set index as dt object
        df['event_date'] = df['mtu'].apply(lambda x: pd.to_datetime(x.split("-")[0]))
        df = df.drop('mtu', axis=1)
        df = df.set_index('event_date')
        df.index = df.index.tz_localize(tz='Europe/Brussels', 
                                        ambiguous='infer', 
                                        nonexistent='shift_backward')
        
        ## Add a timestamp column
        df['ts'] = df.index.asi8
        
        #get date ranges
        start = df.index.min().strftime('%Y%m%d')
        end = df.index.max().strftime('%Y%m%d')
        
        ## unpivot data into long format
        df = df.reset_index().melt(id_vars=['event_date', 'ts', 'country_id', 'area'],
                                   var_name='generation_type', 
                                   value_name='generation_load')
        
        ## fill mising values
        df['generation_load'] = df['generation_load'].replace('n/e', 0).astype('float')
        df['generation_load'].fillna(0, inplace=True)
        
        #save dataframe
        df.to_csv(os.path.join(output_path, f'generation-{country}-{start}-{end}'), 
                  index=False)
        print(f'Saved: {country}')

In [273]:
root_path = '/Users/ns/github-repos/entsoe-etl-pipeline/data/total_generation'
output_path = '/Users/ns/github-repos/entsoe-etl-pipeline/data/total_generation/processed'

country_paths = traverse_path(root_path, -2)

process_total_generation(country_paths, output_path)

Saved: SE
Saved: PL
Saved: BE
Saved: NO
Saved: CH
Saved: PT
Saved: NL
Saved: FI
Saved: FR
Saved: ES
Saved: IE


## Day Ahead Prices Processing

**Processing Steps:**
1. Add country column
2. Change headers
3. Parse datetime into index

In [285]:
root_path = '/Users/ns/github-repos/entsoe-etl-pipeline/data/day_ahead_prices'

country_paths = traverse_path(root_path, -2)

In [286]:
df = pd.read_csv(country_paths['BE'])
df.head()

Unnamed: 0,MTU (CET),Day-ahead Price [EUR/MWh]
0,01.01.2019 00:00 - 01.01.2019 01:00,69.49
1,01.01.2019 01:00 - 01.01.2019 02:00,66.58
2,01.01.2019 02:00 - 01.01.2019 03:00,65.07
3,01.01.2019 03:00 - 01.01.2019 04:00,52.17
4,01.01.2019 04:00 - 01.01.2019 05:00,47.66


In [290]:
df['country_id'] = "country"

df.columns = [x[0].strip().lower() for x in df.columns.str.split(" ")]
df.rename(columns={'day-ahead': 'day_ahead_price'}, inplace=True)

In [291]:
df.head(2)

Unnamed: 0,mtu,day_ahead_price,country_id
0,01.01.2019 00:00 - 01.01.2019 01:00,69.49,country
1,01.01.2019 01:00 - 01.01.2019 02:00,66.58,country


In [292]:
## parse datetime and set index as dt object
df['event_date'] = df['mtu'].apply(lambda x: pd.to_datetime(x.split("-")[0]))
df = df.drop('mtu', axis=1)
df = df.set_index('event_date')
df.index = df.index.tz_localize(tz='Europe/Brussels', 
                                ambiguous='infer', 
                                nonexistent='shift_backward')

In [296]:
df.fillna(0, inplace=True)

In [307]:
def process_day_ahead_prices(country_paths, output_path):
    """
    Prepares the day ahead prices csvs from the ENTOSE database for the data warehouse.
    
    Input:
        country_paths: dict. dict. country and path to csvs with total generation data
        output_path: str. path to save
    
    """
    
    
    for country, path_in_str in country_paths.items():    
        
        #load dataframe
        df = pd.read_csv(path_in_str)
        
        # add country name
        df['country_id'] = country

        ## clean column headers
        df.columns = [x[0].strip().lower() for x in df.columns.str.split(" ")]
        df.rename(columns={'day-ahead': 'day_ahead_price'}, inplace=True)

        ## parse datetime and set index as dt object
        df['event_date'] = df['mtu'].apply(lambda x: pd.to_datetime(x.split("-")[0]))
        df = df.drop('mtu', axis=1)
        df = df.set_index('event_date')
        df.index = df.index.tz_localize(tz='Europe/Brussels', 
                                        ambiguous='infer', 
                                        nonexistent='shift_backward')
        
        ## Add a timestamp column
        df['ts'] = df.index.asi8
        
        ## fill missing values
        df.fillna(0, inplace=True)
        
        #get date ranges
        start = df.index.min().strftime('%Y%m%d')
        end = df.index.max().strftime('%Y%m%d')
        
        #save dataframe
        df = df.reset_index()
        df.to_csv(os.path.join(output_path, f'day-ahead-prices-{country}-{start}-{end}'), 
                  index=False)
        print(f'Saved: {country}')

In [308]:
root_path = '/Users/ns/github-repos/entsoe-etl-pipeline/data/day_ahead_prices'
output_path = '/Users/ns/github-repos/entsoe-etl-pipeline/data/day_ahead_prices/processed'

country_paths = traverse_path(root_path, -2)

process_day_ahead_prices(country_paths, output_path)

Saved: SE
Saved: PL
Saved: BE
Saved: NO
Saved: PT
Saved: NL
Saved: FI
Saved: FR
Saved: ES
Saved: IE
