# Analysis of European Electricity Generation
### Data Engineering Capstone Project

#### Project Summary
This project prepares the tables needed to analyse electricity generation in European countries. It has three purposes:
* analyse the generation of different production units in different European countries,
* create the foundation for prediction models that forecast the electricity generation of different units,
* analyse the forecasts made by different TSOs (Transmission System Operators).

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

#### 0. Import Libraries

In [1]:
!pip install xmltodict



In [2]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [3]:
# Do all imports and installs here
import pandas as pd
import numpy as np
import io
import boto3
from datetime import datetime, date, timedelta
import requests
import xmltodict
import psycopg2
import configparser


#### 1. Connect to AWS

In [4]:
# CONFIG
config = configparser.ConfigParser()
config.read('dwh.cfg')

['dwh.cfg']

In [5]:
# Read the data from s3
s3 = boto3.resource('s3',
         aws_access_key_id=config['IAM_ROLE']['ACCESS_USER'],
         aws_secret_access_key=config['IAM_ROLE']['ACCESS_KEY'])

s3_client = boto3.client('s3',
                         aws_access_key_id=config['IAM_ROLE']['ACCESS_USER'],
                         aws_secret_access_key=config['IAM_ROLE']['ACCESS_KEY'])

bucket = s3.Bucket(config['IAM_ROLE']['ARN_MY_S3'])

In [6]:
redshift = boto3.client('redshift',
                       region_name="eu-north-1",
                       aws_access_key_id=config['IAM_ROLE']['ACCESS_USER'],
                       aws_secret_access_key=config['IAM_ROLE']['ACCESS_KEY'])

In [7]:
ec2 = boto3.resource('ec2',
                       region_name="eu-north-1",
                       aws_access_key_id=config['IAM_ROLE']['ACCESS_USER'],
                       aws_secret_access_key=config['IAM_ROLE']['ACCESS_KEY'])

In [8]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None)  # None disables truncation (-1 is deprecated)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier='udacitydataeng')['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,udacitydataeng
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,udacity_user_210501
4,DBName,whysolate_udacity_de
5,Endpoint,"{'Address': 'udacitydataeng.cnn1dboj2hbx.eu-north-1.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-dd0fb2b4
7,NumberOfNodes,2


In [9]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(config['CLUSTER']['DB_PORT']),
        ToPort=int(config['CLUSTER']['DB_PORT'])
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-03efb61ca30c40c87')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


### Step 1: Scope the Project and Gather Data

#### Scope 
The project gathers data from several sources (described in the following subsection), preprocesses it, and loads it into an AWS Redshift data warehouse. The end solution and the technology used are presented in Step 3.


#### Describe and Gather Data 
The data should contain:
- information about production units (name, area that the unit belongs to, type of generation),
- temperature information from different countries, which would allow forecasting electricity production from solar power plants,
- electricity production of different production units over time,
- price of electricity in different areas,
- electricity load in those areas,
- electricity forecasts from different sources of electricity in certain areas over time.

The temperature information comes from a Kaggle dataset (https://www.kaggle.com/sudalairajkumar/daily-temperature-of-major-cities). In the future, it should be replaced with up-to-date data from an API; the current dataset was chosen only to demonstrate preprocessing of a *.csv* file.

All the other information comes from ENTSO-E (https://transparency.entsoe.eu/ and https://www.entsoe.eu/data/energy-identification-codes-eic/).

##### Eiccodes

This dataset lists the areas that the other ENTSO-E datasets refer to. It is used to identify the characteristics of different countries, such as the generation of their production units or the national load. Other tables link to it through column 0, which ENTSO-E calls 'EicCode'. Areas can be recognized either by column 1 (display name) or by column 2 (full name).

In [10]:
obj = s3.Object(config['IAM_ROLE']['ARN_MY_S3'],'Y_eiccodes.csv')
data=(obj.get()['Body'].read())
eiccodes = pd.read_csv(io.BytesIO(data), delimiter=';', header = None)

In [11]:
eiccodes.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8
0,10Y1001A1001A016,GB-NI,Northern Ireland,,,Active,BT6 9RT,GB,Control Area
1,10Y1001A1001A43R,COMP-SMM-AREA,Comp. Control Block Area Macedonia-Montenegro-Serbia,,,Active,,,Valid EIC Function needed
2,10Y1001A1001A47J,SE4,Swedish Elspot Area 4,,,Active,,,Bidding Zone
3,10Y1001A1001A49F,RUSSIAN_AREA,Russian area,,,Active,,,Market Balance Area
4,10Y1001A1001A50U,KALININGRAD_AREA,Kaliningrad area,,,Active,,,Market Balance Area
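
Because the file has no header row, its columns are addressed by position. As a minimal sketch, using only the standard library and three rows copied from the output above, a lookup from display name to EIC code could be built as follows. The helper name `build_eic_lookup` is hypothetical, and the column positions (0 = EIC code, 1 = display name) follow the description above.

```python
import csv
import io

# Three sample rows copied from the eiccodes.head() output above
# (semicolon-delimited, no header row).
SAMPLE = (
    "10Y1001A1001A016;GB-NI;Northern Ireland;;;Active;BT6 9RT;GB;Control Area\n"
    "10Y1001A1001A47J;SE4;Swedish Elspot Area 4;;;Active;;;Bidding Zone\n"
    "10Y1001A1001A49F;RUSSIAN_AREA;Russian area;;;Active;;;Market Balance Area\n"
)


def build_eic_lookup(text):
    """Map display name (column 1) to EIC code (column 0). Hypothetical helper."""
    reader = csv.reader(io.StringIO(text), delimiter=";")
    return {row[1]: row[0] for row in reader if row}


eic_by_name = build_eic_lookup(SAMPLE)
print(eic_by_name["SE4"])
```

With the full DataFrame, the same lookup could be obtained from columns 0 and 1 of `eiccodes`.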


##### Production Units

This dataset lists the production units in Poland and their characteristics. It is limited to Poland, which was chosen as a reference case to show that the described project is feasible. More data is available, but the scope is kept small because of the personal budget available for this project.

In [12]:
obj = s3.Object(config['IAM_ROLE']['ARN_MY_S3'],'ProductionUnits.csv')
data=(obj.get()['Body'].read())
produnits = pd.read_csv(io.BytesIO(data), delimiter=';', header = None)

In [13]:
produnits.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17
0,19W0000000001195,Łaziska 3,01/01/2011,21/08/2014,COMMISSIONED,B05,Polska,895,220,10YPL-AREA-----S,10YPL-AREA-----S,20/03/2015,19W0000000001268,Łaziska 3 B12,COMMISSIONED,B05,Polska,225.0
1,19W0000000001195,Łaziska 3,01/01/2011,21/08/2014,COMMISSIONED,B05,Polska,895,220,10YPL-AREA-----S,10YPL-AREA-----S,20/03/2015,19W000000000122G,Łaziska 3 B10,COMMISSIONED,B05,Polska,215.0
2,19W0000000001195,Łaziska 3,01/01/2011,21/08/2014,COMMISSIONED,B05,Polska,895,220,10YPL-AREA-----S,10YPL-AREA-----S,20/03/2015,19W000000000124C,Łaziska 3 B11,COMMISSIONED,B05,Polska,225.0
3,19W0000000001195,Łaziska 3,01/01/2011,21/08/2014,COMMISSIONED,B05,Polska,895,220,10YPL-AREA-----S,10YPL-AREA-----S,20/03/2015,19W000000000121I,Łaziska 3 B09,COMMISSIONED,B05,Polska,230.0
4,19W0000000001195,Łaziska 3,21/08/2014,Infinity,COMMISSIONED,B05,Polska,905,220,10YPL-AREA-----S,10YPL-AREA-----S,,19W0000000001268,Łaziska 3 B12,COMMISSIONED,B05,Polska,225.0


##### Temperature

This dataset presents the average temperature and its uncertainty for different world cities over time.

In [14]:
obj = s3.Object(config['IAM_ROLE']['ARN_MY_S3'],'GlobalLandTemperaturesByCity2.csv')
data=(obj.get()['Body'].read())
temp = pd.read_csv(io.BytesIO(data), delimiter=',', header = None)

In [15]:
temp.head()

Unnamed: 0,0,1,2,3,4,5,6
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [16]:
temp[temp.iloc[:, 4] == 'Poland'].iloc[:,0].max()

'2013-09-01'
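
In the rows shown above, latitude and longitude are stored as strings with a hemisphere suffix (`57.05N`, `10.33E`). If numeric coordinates are needed later, a conversion could look like the sketch below. `parse_coordinate` is a hypothetical helper, not part of the notebook, and it assumes every value ends in one of N, S, E or W.

```python
def parse_coordinate(value):
    """Convert a string such as '57.05N' or '10.33E' to a signed float.

    South and West are returned as negative numbers.
    """
    value = value.strip()
    number, hemisphere = value[:-1], value[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError(f"Unexpected hemisphere in {value!r}")
    sign = -1.0 if hemisphere in ("S", "W") else 1.0
    return sign * float(number)


print(parse_coordinate("57.05N"), parse_coordinate("10.33E"))
```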

#### Others

In [17]:
# For presentation purposes, the data shown is limited to a single day.
my_date = pd.to_datetime('2017-01-01')
my_area = '10YPL-AREA-----S'
APP_ID = config['ENTSOE']['APP_ID']

In [18]:
from aws_functions import *

##### Generation realized

In [19]:
def get_generation(APP_ID, my_date, my_area):
    
    start_date_url = str(my_date.date()).replace("-", "")
    end_date_url = str((my_date + pd.to_timedelta(1, "D")).date()).replace("-", "")
    
    # actual generation (a73) realized (a16)
    URL = f'https://transparency.entsoe.eu/api?securityToken={APP_ID}&documentType=A73&processType=A16&in_Domain={my_area}&periodStart={start_date_url}2300&periodEnd={end_date_url}2300'

    # get response
    response = requests.get(URL)
    o = xmltodict.parse(response.content)
    
    generation = []
    for i in o['GL_MarketDocument']['TimeSeries']:
        for j in i['Period']['Point']:
            generation.append([i['registeredResource.mRID']['@codingScheme'], i['registeredResource.mRID']['#text'], i['MktPSRType']['psrType'], 
                            i['MktPSRType']['PowerSystemResources']['name'], i['MktPSRType']['PowerSystemResources']['mRID']['#text'],
                            str(my_date + pd.to_timedelta(int(j['position']), 'H')), j['quantity'], my_area])

    generation = pd.DataFrame(generation, columns = ['codingScheme', 'registeredResource', 'psrType', 'MktPSRTypeName', 
                                                 'MktPSRTypeText', 'time', 'quantity', 'area'])
    
    return generation
    

In [20]:
generation = get_generation(APP_ID, my_date, my_area)
generation.head()

Unnamed: 0,codingScheme,registeredResource,psrType,MktPSRTypeName,MktPSRTypeText,time,quantity,area
0,A01,19W0000000000806,B05,Kraków Łęg B2,19W000000000086V,2017-01-01 01:00:00,93,10YPL-AREA-----S
1,A01,19W0000000000806,B05,Kraków Łęg B2,19W000000000086V,2017-01-01 02:00:00,92,10YPL-AREA-----S
2,A01,19W0000000000806,B05,Kraków Łęg B2,19W000000000086V,2017-01-01 03:00:00,91,10YPL-AREA-----S
3,A01,19W0000000000806,B05,Kraków Łęg B2,19W000000000086V,2017-01-01 04:00:00,92,10YPL-AREA-----S
4,A01,19W0000000000806,B05,Kraków Łęg B2,19W000000000086V,2017-01-01 05:00:00,91,10YPL-AREA-----S
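
One caveat when iterating over the parsed response: by default `xmltodict` returns a single dict, not a list, when an element occurs only once. A response with only one `TimeSeries` or one `Point` would therefore make the loops iterate over dictionary keys instead of records. A small guard such as the hypothetical `as_list` helper below avoids that. The dictionaries in the sketch are hand-written stand-ins for parsed output, not real API data.

```python
def as_list(node):
    """Return node wrapped in a list unless it already is one."""
    if node is None:
        return []
    return node if isinstance(node, list) else [node]


# Hand-written stand-ins for what xmltodict.parse() can return.
single = {"Period": {"Point": {"position": "1", "quantity": "93"}}}
several = {"Period": {"Point": [{"position": "1", "quantity": "93"},
                                {"position": "2", "quantity": "92"}]}}

for doc in (single, several):
    points = as_list(doc["Period"]["Point"])
    print([(int(p["position"]), int(p["quantity"])) for p in points])
```

In `get_generation`, the loops would then read `for i in as_list(o['GL_MarketDocument']['TimeSeries'])` and `for j in as_list(i['Period']['Point'])`. Alternatively, `xmltodict.parse` accepts a `force_list` argument that serves the same purpose.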


In [21]:
def save_generation(APP_ID, my_date, my_area, folder_name):
    # define generation
    generation = get_generation(APP_ID, my_date, my_area)
    # create a folder if needed
    create_folder_s3(folder_name)
    # Save in s3
    save_csv_in_s3(generation, s3_client, folder_name, datetime.date(my_date), my_bucket = config['IAM_ROLE']['ARN_MY_S3'])

##### Forecast

In [22]:
def get_forecast(APP_ID, my_date, my_area):
    
    start_date_url = str(my_date.date()).replace("-", "")
    end_date_url = str((my_date + pd.to_timedelta(1, "D")).date()).replace("-", "")

    # generation forecast (A71), day ahead (A01)
    URL = f'https://transparency.entsoe.eu/api?securityToken={APP_ID}&documentType=A71&processType=A01&in_Domain={my_area}&periodStart={start_date_url}2300&periodEnd={end_date_url}2300'

    # get response
    response = requests.get(URL)
    o = xmltodict.parse(response.content)
    
    forecast = []
    for i in o['GL_MarketDocument']['TimeSeries']['Period']['Point']:
        forecast.append([str(my_date + pd.to_timedelta(int(i['position']), 'H')), i['quantity'], my_area])

    forecast = pd.DataFrame(forecast, columns = ['time', 'quantity', 'area'])
    
    return forecast
    
    
def save_forecast(APP_ID, my_date, my_area, folder_name):
    # define forecast
    forecast = get_forecast(APP_ID, my_date, my_area)
    # create a folder if needed
    create_folder_s3(folder_name)
    # Save in s3
    save_csv_in_s3(forecast, s3_client, folder_name, datetime.date(my_date), my_bucket = config['IAM_ROLE']['ARN_MY_S3'])

In [23]:
forecast = get_forecast(APP_ID, my_date, my_area)
forecast.head()

Unnamed: 0,time,quantity,area
0,2017-01-01 01:00:00,15259,10YPL-AREA-----S
1,2017-01-01 02:00:00,15036,10YPL-AREA-----S
2,2017-01-01 03:00:00,14903,10YPL-AREA-----S
3,2017-01-01 04:00:00,14960,10YPL-AREA-----S
4,2017-01-01 05:00:00,15353,10YPL-AREA-----S


##### Prices

In [24]:
def get_prices(APP_ID, my_date, my_area):

    start_date_url = str(my_date.date()).replace("-", "")
    end_date_url = str((my_date + pd.to_timedelta(1, "D")).date()).replace("-", "")
    
    URL = f'https://transparency.entsoe.eu/api?securityToken={APP_ID}&documentType=A44&in_Domain={my_area}&out_Domain={my_area}&periodStart={start_date_url}2300&periodEnd={end_date_url}2300'

    # get response
    response = requests.get(URL)
    o = xmltodict.parse(response.content)
    
    prices = []
    for i in o['Publication_MarketDocument']['TimeSeries']['Period']['Point']:
        prices.append([str(my_date + pd.to_timedelta(int(i['position']), 'H')), i['price.amount'], my_area])

    prices = pd.DataFrame(prices, columns = ['time', 'price', 'area'])
    
    return prices
    
    
def save_prices(APP_ID, my_date, my_area, folder_name):
    # get prices
    prices = get_prices(APP_ID, my_date, my_area)
    # create a folder if needed
    create_folder_s3(folder_name)
    # Save in s3
    save_csv_in_s3(prices, s3_client, folder_name, datetime.date(my_date), my_bucket = config['IAM_ROLE']['ARN_MY_S3'])

In [25]:
prices = get_prices(APP_ID, my_date, my_area)
prices.head()

Unnamed: 0,time,price,area
0,2017-01-01 01:00:00,25.72,10YPL-AREA-----S
1,2017-01-01 02:00:00,19.79,10YPL-AREA-----S
2,2017-01-01 03:00:00,19.1,10YPL-AREA-----S
3,2017-01-01 04:00:00,18.89,10YPL-AREA-----S
4,2017-01-01 05:00:00,20.51,10YPL-AREA-----S


##### Load

In [26]:
def get_load(APP_ID, my_date, my_area):

    start_date_url = str(my_date.date()).replace("-", "")
    end_date_url = str((my_date + pd.to_timedelta(1, "D")).date()).replace("-", "")
    
    URL = f'https://transparency.entsoe.eu/api?securityToken={APP_ID}&documentType=A65&processType=A16&outBiddingZone_Domain={my_area}&periodStart={start_date_url}2300&periodEnd={end_date_url}2300'

    # get response
    response = requests.get(URL)
    o = xmltodict.parse(response.content)
    
    load = []
    for i in o['GL_MarketDocument']['TimeSeries']['Period']['Point']:
        load.append([str(my_date + pd.to_timedelta(int(i['position']), 'H')), i['quantity'], my_area])

    load = pd.DataFrame(load, columns = ['time', 'quantity', 'area'])
    
    return load
    
    
def save_load(APP_ID, my_date, my_area, folder_name):
    # get load
    load = get_load(APP_ID, my_date, my_area)
    # create a folder if needed
    create_folder_s3(folder_name)
    # Save in s3
    save_csv_in_s3(load, s3_client, folder_name, datetime.date(my_date), my_bucket = config['IAM_ROLE']['ARN_MY_S3'])

In [27]:
load = get_load(APP_ID, my_date, my_area)
load.head()

Unnamed: 0,time,quantity,area
0,2017-01-01 01:00:00,14133,10YPL-AREA-----S
1,2017-01-01 02:00:00,13757,10YPL-AREA-----S
2,2017-01-01 03:00:00,13639,10YPL-AREA-----S
3,2017-01-01 04:00:00,13703,10YPL-AREA-----S
4,2017-01-01 05:00:00,14016,10YPL-AREA-----S
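
The four `get_*` functions above build their request URLs in the same way and differ only in the query parameters. As a sketch of how that shared part could be factored out, the hypothetical helper `build_entsoe_url` below assembles the query string with `urllib.parse.urlencode`. It assumes the same endpoint and the same 23:00-to-23:00 window used above; the token shown is a placeholder.

```python
from datetime import date, timedelta
from urllib.parse import urlencode

BASE_URL = "https://transparency.entsoe.eu/api"  # same endpoint as in the functions above


def build_entsoe_url(token, day, **params):
    """Build a request URL for the 24-hour window starting at 23:00 on `day`."""
    query = {"securityToken": token, **params}
    query["periodStart"] = day.strftime("%Y%m%d") + "2300"
    query["periodEnd"] = (day + timedelta(days=1)).strftime("%Y%m%d") + "2300"
    return f"{BASE_URL}?{urlencode(query)}"


url = build_entsoe_url(
    "MY_TOKEN",  # placeholder, not a real token
    date(2017, 1, 1),
    documentType="A65",
    processType="A16",
    outBiddingZone_Domain="10YPL-AREA-----S",
)
print(url)
```

A `pd.Timestamp` can be passed as `day` as well, since it supports `strftime` and `timedelta` arithmetic.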


### Step 2: Explore and Assess the Data


In [28]:
# EIC Codes information 
eiccodes.shape
eiccodes.dtypes
eiccodes.describe()

(1102, 9)

0    object
1    object
2    object
3    object
4    object
5    object
6    object
7    object
8    object
dtype: object

Unnamed: 0,0,1,2,3,4,5,6,7,8
count,1102,1102,1101,11,523,1043,213,601,1040
unique,1102,1102,1021,9,181,1,131,36,18
top,21Y000000000062A,FI_RKE000,UPM Sähkönsiirto Oy,10YCB-GERMANY--8,23XFORTUMPOWER-W,Active,90,FI,Metering Grid Area
freq,1,1,13,2,32,1043,9,445,438


In [29]:
# Polish production units
produnits.shape
produnits.dtypes
produnits.describe()

(575, 18)

0     object 
1     object 
2     object 
3     object 
4     object 
5     object 
6     object 
7     int64  
8     int64  
9     object 
10    object 
11    object 
12    object 
13    object 
14    object 
15    object 
16    object 
17    float64
dtype: object

Unnamed: 0,7,8,17
count,575.0,575.0,497.0
mean,1775.702609,273.634783,260.267606
std,1835.735339,139.298287,169.041137
min,0.0,0.0,0.0
25%,330.0,110.0,195.0
50%,1345.0,400.0,225.0
75%,1983.0,400.0,370.0
max,5472.0,400.0,1075.0


In [30]:
# Global temperature data
temp.shape
temp.dtypes
temp.describe()

(1048575, 7)

0    object 
1    float64
2    float64
3    object 
4    object 
5    object 
6    object 
dtype: object

Unnamed: 0,1,2
count,1001028.0,1001028.0
mean,17.92899,1.033831
std,10.35884,1.103003
min,-31.874,0.036
25%,12.017,0.348
50%,20.486,0.607
75%,25.926,1.38
max,39.156,15.03


In [31]:
# Electricity generation of production units
generation.shape
generation.dtypes
generation.describe()

(3168, 8)

codingScheme          object
registeredResource    object
psrType               object
MktPSRTypeName        object
MktPSRTypeText        object
time                  object
quantity              object
area                  object
dtype: object

Unnamed: 0,codingScheme,registeredResource,psrType,MktPSRTypeName,MktPSRTypeText,time,quantity,area
count,3168,3168,3168,3168,3168,3168,3168,3168
unique,1,37,5,132,132,24,337,1
top,A01,19W000000000016F,B05,Adamów B5,19W0000000001187,2017-01-01 17:00:00,0,10YPL-AREA-----S
freq,3168,312,1872,24,24,132,1195,3168


In [32]:
# Forecast of electricity production
forecast.shape
forecast.dtypes
forecast.describe()

(24, 3)

time        object
quantity    object
area        object
dtype: object

Unnamed: 0,time,quantity,area
count,24,24,24
unique,24,24,1
top,2017-01-01 03:00:00,21277,10YPL-AREA-----S
freq,1,1,24


In [33]:
# Electricity prices
prices.shape
prices.dtypes
prices.describe()

(24, 3)

time     object
price    object
area     object
dtype: object

Unnamed: 0,time,price,area
count,24,24.0,24
unique,24,23.0,1
top,2017-01-01 03:00:00,39.17,10YPL-AREA-----S
freq,1,2.0,24


In [34]:
# National load
load.shape
load.dtypes
load.describe()

(24, 3)

time        object
quantity    object
area        object
dtype: object

Unnamed: 0,time,quantity,area
count,24,24,24
unique,24,24,1
top,2017-01-01 03:00:00,22506,10YPL-AREA-----S
freq,1,1,24


#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

The data quality is good overall: it contains all the information needed for the analysis described in the project purpose. However, the temperature dataset has many missing values (47,547 of its 1,048,575 rows have no temperature reading). Its records also end in 2013, so it cannot be joined with the ENTSO-E data, which starts in 2017; the analysis would be more accurate if the temperature data covered more recent years.
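
The share of missing temperature readings can be measured directly. The sketch below does so on five rows copied from the `temp.head()` output in Step 1, using only the standard library. `missing_share` is a hypothetical helper, and column 1 is assumed to hold the average temperature, as in that output.

```python
import csv
import io

# Rows copied from the temp.head() output in Step 1 (no header row).
SAMPLE = (
    "1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E\n"
    "1743-12-01,,,Århus,Denmark,57.05N,10.33E\n"
    "1744-01-01,,,Århus,Denmark,57.05N,10.33E\n"
    "1744-02-01,,,Århus,Denmark,57.05N,10.33E\n"
    "1744-03-01,,,Århus,Denmark,57.05N,10.33E\n"
)


def missing_share(text, column=1):
    """Return the fraction of rows whose value in `column` is empty."""
    rows = [row for row in csv.reader(io.StringIO(text)) if row]
    missing = sum(1 for row in rows if row[column].strip() == "")
    return missing / len(rows)


print(missing_share(SAMPLE))  # 4 of the 5 sample rows lack a reading
```

On the full DataFrame, the equivalent is `temp.iloc[:, 1].isna().mean()`.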

#### Cleaning Steps
The data needs no cleaning, but some of the columns are not necessary. To see which of the columns shown above are kept, check the functions and queries in *sql_queries.py*.

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The database follows a star schema, which makes the data easy to access and join as needed for both analytics and machine learning. There is one fact table (eiccodes) and six dimension tables (forecast, generation, load, prices, produnits, temps).
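
To make the join pattern concrete, the sketch below reproduces it on two price rows from Step 1. An in-memory SQLite database stands in for Redshift purely for illustration, and the tables carry only a subset of their columns. Table and column names follow the data dictionary in section 4.3.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # SQLite stands in for Redshift here
conn.execute("CREATE TABLE eiccodes (eiccode TEXT PRIMARY KEY, eicdisplayname TEXT)")
conn.execute('CREATE TABLE prices ("time" TEXT, price REAL, area TEXT)')

conn.execute("INSERT INTO eiccodes VALUES (?, ?)", ("10YPL-AREA-----S", "PL"))
conn.executemany(
    "INSERT INTO prices VALUES (?, ?, ?)",
    [
        ("2017-01-01 01:00:00", 25.72, "10YPL-AREA-----S"),
        ("2017-01-01 02:00:00", 19.79, "10YPL-AREA-----S"),
    ],
)

# Every other table links to eiccodes through its area column.
rows = conn.execute("""
    SELECT eic.eicdisplayname, pric."time", pric.price
    FROM eiccodes eic
    INNER JOIN prices pric ON eic.eiccode = pric.area
    ORDER BY pric."time"
""").fetchall()
conn.close()

for row in rows:
    print(row)
```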

#### 3.2 Mapping Out Data Pipelines
The ETL pipeline consists of the following steps:
1. Extract the data (ENTSO-E codes for the fact table and temperature data for one of the dimension tables) from the S3 bucket,
2. Transform that data,
3. Create the fact table and insert the data into it,
4. Create the temperature dimension table and insert the data into it,
5. Create the other dimension tables and insert into them the data from the ENTSO-E API.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [35]:
from sql_queries import *

##### Insert to S3

In [36]:
sdate = date(2017, 1, 1)   # start date
edate = date(2017, 1, 7)   # end date

delta = edate - sdate  # as timedelta

for i in range(delta.days + 1):
    day = sdate + timedelta(days=i)
    save_generation(APP_ID, pd.to_datetime(day), my_area, 'PL2017generation')
    save_forecast(APP_ID, pd.to_datetime(day), my_area, 'PL2017forecast')
    save_prices(APP_ID, pd.to_datetime(day), my_area, 'PL2017prices')
    save_load(APP_ID, pd.to_datetime(day), my_area, 'PL2017load')

##### Insert to Redshift

In [37]:
def drop_tables(cur, conn):
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit() 


def create_tables(cur, conn):
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()

def insert_tables(cur, conn):
    for query in insert_table_queries:
        cur.execute(query)
        conn.commit()

In [38]:
# CONFIG
config = configparser.ConfigParser()
config.read('dwh.cfg')

['dwh.cfg']

In [39]:
# Connect to DWH
conn = psycopg2.connect(f"host={config['CLUSTER']['HOST']} dbname={config['CLUSTER']['DB_NAME']} user={config['CLUSTER']['DB_USER']} password={config['CLUSTER']['DB_PASSWORD']} port={config['CLUSTER']['DB_PORT']}")
cur = conn.cursor()

In [40]:
drop_tables(cur, conn)
create_tables(cur, conn)
insert_tables(cur, conn)
conn.close()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [5]:
def execute_my_query(my_query, my_cluster):
    # Connect to DWH
    conn = psycopg2.connect(f"host={my_cluster['HOST']} dbname={my_cluster['DB_NAME']} user={my_cluster['DB_USER']} password={my_cluster['DB_PASSWORD']} port={my_cluster['DB_PORT']}")
    try:
        cur = conn.cursor()
        # Execute
        cur.execute(my_query)
        return pd.DataFrame(cur.fetchall(), columns=[x[0] for x in cur.description])
    finally:
        # Always release the connection, even if the query fails
        conn.close()
    

In [6]:
def unit_eiccodes(my_query, my_cluster):
    """
    Check that the join was performed correctly: the result must have
    the expected number of columns (6) and at least one row.
    """
    returned_df = execute_my_query(my_query, my_cluster)
    if (returned_df.shape[0] == 0):
        raise ValueError('DF has no rows.')
    elif (returned_df.shape[1] != 6):
        raise ValueError('DF does not have the expected 6 columns.')
        
    return returned_df
    

In [7]:
# This query joins 'eiccodes' with 'prices' to show the hourly price for Poland
query_eiccodes = """
select eic.*, pric."time", pric.price
from "public".eiccodes eic 
inner join "public".prices pric on eic.eiccode = pric.area
where pric."time" <= '2021/01/01'
order by pric."time"
"""

unit_eiccodes(query_eiccodes, config['CLUSTER'])

Unnamed: 0,eiccode,eicdisplayname,eiclongname,marketparticipantisocountrycode,time,price
0,10YPL-AREA-----S,PL,Poland,PL,2017-01-01 01:00:00,25
1,10YPL-AREA-----S,PL,Poland,PL,2017-01-01 02:00:00,19
2,10YPL-AREA-----S,PL,Poland,PL,2017-01-01 03:00:00,19
3,10YPL-AREA-----S,PL,Poland,PL,2017-01-01 04:00:00,18
4,10YPL-AREA-----S,PL,Poland,PL,2017-01-01 05:00:00,20
5,10YPL-AREA-----S,PL,Poland,PL,2017-01-01 06:00:00,25
6,10YPL-AREA-----S,PL,Poland,PL,2017-01-01 07:00:00,27
7,10YPL-AREA-----S,PL,Poland,PL,2017-01-01 08:00:00,31
8,10YPL-AREA-----S,PL,Poland,PL,2017-01-01 09:00:00,34
9,10YPL-AREA-----S,PL,Poland,PL,2017-01-01 10:00:00,37


In [8]:
def unit_size(my_query, my_cluster):
    """
    Check that the expected tables exist, that none of them is empty,
    and that together they hold a sufficient number of rows.
    """
    returned_df = execute_my_query(my_query, my_cluster)
    if (returned_df.shape[0] == 0):
        raise ValueError('There are no tables')
    elif (returned_df.shape[0] != 9):
        raise ValueError('There should be 9 tables in the svv_table_info.')
    elif (returned_df.loc[:, 'estimated_visible_rows'].min() == 0):
        raise ValueError('There is a table with 0 rows.')
    elif (returned_df.loc[:, 'estimated_visible_rows'].sum() < 1e6):
        raise ValueError('There are not enough rows in total.')
        
    return returned_df
    

In [None]:
# This query shows the schema and size of each table, to confirm that the data was loaded
query_size = """
select info."database", info."schema", info.table, info.size, info.pct_used, info."empty", info.tbl_rows, info.estimated_visible_rows
from pg_catalog.svv_table_info info
"""
unit_size(query_size, config['CLUSTER'])
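
The list of possible checks above also mentions source/count checks for completeness. One such check is sketched below, under the assumption that every area should have exactly 24 hourly rows per day, stamped the way the `get_*` functions in Step 1 stamp them (01:00 on the day through 00:00 on the next day). `missing_hours` is a hypothetical helper that works on a plain list of timestamp strings, so it can be run on a column fetched from any of the hourly tables.

```python
from datetime import date, datetime, timedelta


def missing_hours(timestamps, day):
    """Return the hourly timestamps expected for `day` that are absent.

    Mirrors the notebook's convention: 24 points stamped 01:00 on `day`
    through 00:00 on the following day.
    """
    start = datetime.combine(day, datetime.min.time())
    expected = {start + timedelta(hours=h) for h in range(1, 25)}
    seen = {datetime.strptime(t, "%Y-%m-%d %H:%M:%S") for t in timestamps}
    return sorted(expected - seen)


day = date(2017, 1, 1)
complete = [
    (datetime(2017, 1, 1) + timedelta(hours=h)).strftime("%Y-%m-%d %H:%M:%S")
    for h in range(1, 25)
]
print(missing_hours(complete, day))       # complete day
print(missing_hours(complete[:-1], day))  # last hour removed
```

An empty result means the day is complete; any returned timestamps identify exactly which hours need to be reloaded.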

#### 4.3 Data dictionary 

*Eiccodes*

Table presenting different areas that can be described with other data using ENTSO-E datasets.

- eiccode - unique identifier representing certain area,
- eicdisplayname - display name of that area,
- eiclongname - long, human-readable name of that area,
- marketparticipantisocountrycode - standardized country code.

*Produnits*

Table listing the production units from Poland and their characteristics.

- pueiccode - EIC code of a production unit (unique identifier),
- puname - Name of the production unit,
- validfrom - date since which the unit has its status assigned,
- validto - date till which the unit has its status assigned,
- pustatus - status of a unit in certain time period,
- putype - standardized type of a unit,
- pulocation - origin country of a unit,
- puinstalledcapacity - installed capacity of a unit,
- controlarea - unique identifier of the control area to which the unit belongs.

*Temperature*

Table presenting the historical temperature data for different world cities.

- dt - datetime of the measurement,
- averagetemperature - average temperature of a time period,
- averagetemperatureuncertainty - standard deviation of the measurements for certain time period,
- city - city name,
- country - country to which the city belongs,
- latitude - latitude of the city,
- longitude - longitude of the city.

*Generation*

Table presenting electricity generated by production units.

- codingscheme - standardized scheme code,
- registeredresource - unique identifier of the resource of the unit,
- psrtype - standardized PSR type of the unit,
- mktpsrtypename - name of the unit,
- mktpsrtypetext - unique identifier of the unit,
- time - datetime of the measurement,
- quantity - quantity of the generated electricity,
- area - unique identifier of the country.

*Forecast*

Table presenting the generation forecast (ENTSO-E document type A71) made by the TSO of the country.

- time - datetime of the forecasted measurement,
- quantity - forecasted generation,
- area - unique identifier of the country.

*Load*

Table presenting national load.

- time - datetime of the measurement,
- quantity - national load,
- area - unique identifier of the country.

*Prices*

Table presenting the prices of the electricity in the area.

- time - datetime of the measurement,
- price - price of the unit of electricity,
- area - unique identifier of the country.

The schema diagram is in the *Udacity_ENTSOE.png* file, located in the same folder as the notebook.

### Step 5: Complete Project Write Up

The project allowed exploring different aspects of data engineering and still leaves room for further exploration. Spark could be introduced to speed up the computation, and Airflow could be introduced to create a DAG that updates the database on a daily or even hourly basis, if needed. The temperature data could then be replaced with calls to an API, which would also allow creating dashboards and analysing live data.

* Rationale for the choice of tools and technologies for the project.

Airflow seems to be the best choice of technology for the project, as it allows a continuous flow of data into the data warehouse. This enables analysing live data, forecasting, and creating dashboards, as stated in the purposes of the project. The other tools, AWS (Redshift, which is built on PostgreSQL) and Python, are a clear choice for storage and preprocessing, as those technologies were the main part of the course.

* How often should the data be updated?

If the data went live, a daily update would be enough. The data has a 1-hour resolution, so the shortest meaningful update interval is 1 hour, but updating that often is not necessary: each hourly run would add only 1 row per category, so the benefit of getting the data sooner does not justify the cost of the extra processing. Besides, the data and the forecasting results would be analysed only once a day. If the goal is only to re-analyse the data and run forecasts, the only update needed is for the temperature data, which ends in 2013.
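
A daily update only needs to fetch the days that have not been loaded yet. A minimal sketch of that bookkeeping follows. `dates_to_load` is a hypothetical helper, and it assumes the date of the last successful load is known, for example from the newest file in the S3 folder.

```python
from datetime import date, timedelta


def dates_to_load(last_loaded, today):
    """Return every date after `last_loaded` up to and including yesterday."""
    first = last_loaded + timedelta(days=1)
    yesterday = today - timedelta(days=1)
    n_days = (yesterday - first).days + 1
    return [first + timedelta(days=i) for i in range(max(n_days, 0))]


print(dates_to_load(date(2017, 1, 7), date(2017, 1, 11)))
```

Each returned day could then be passed to `save_generation`, `save_forecast`, `save_prices` and `save_load`, as in the loop in section 4.1.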

* How the problem would be approached under the following scenarios:
 * The data was increased by 100x.

The processing would need to move to PySpark. Additionally, it might be beneficial to divide the data into subgroups and create more tables. Lastly, more quality checks would be needed to avoid database locks. It could also be good practice to add queuing to the DAG.

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
This is exactly the purpose of the project, so Airflow would need to be introduced and the pipeline included in a DAG. The functions are already written so that they can easily be moved into Airflow, running on smaller time intervals but in the same sequence.
 
 * The database needed to be accessed by 100+ people.

All users would need IAM users or roles (most likely read-only, with write access for developers and data engineers). At the same time, Redshift would need much more capacity to avoid queuing.