In [None]:
import pandas as pd
import duckdb as ddb

## Data Extraction and Aggregation

Due to the volume of data in `Data.csv`, I found it easier to operate on the data using DuckDB. At a high level, I wanted a location from which I could easily extract data as and when I needed it, and in which I could store results without flooding my RAM.

The steps taken are:
- Create a DuckDB instance
- Import data from the `Data.csv`, `Kalam Climate Data.xlsx` and `SampleSubmission.csv` files
- Aggregate the raw data (hydropower data and climate data) into daily-granularity tables
- Use the database as and when needed to store data

In [None]:
# Open (or create) the on-disk DuckDB database used throughout the notebook.
con = ddb.connect("./kalam_hydropower.db")

# Create the two schemas: `raw` for data loaded as-is and `prepared` for
# aggregated tables. `if not exists` makes each statement idempotent on its
# own, so re-running the cell is safe and an already-present `raw` schema can
# no longer prevent `prepared` from being created.
for schema_name in ("raw", "prepared"):
    con.sql(f"create schema if not exists {schema_name};")

In [None]:
# Preview up to 10 rows of Data.csv to check the auto-detected datatypes before loading.
# Note that v_blue and v_yellow are detected as varchars and must be explicitly cast to doubles.
con.sql("select * from read_csv('./data/Data.csv') limit 10;")

In [None]:
# Load the hydropower readings into raw.hydropower_production.
# Only the columns named in the select are kept; the remaining columns
# (consumer device 9 etc) are ignored because they are assumed to be errors.
# v_blue and v_yellow are cast to double explicitly.
hydropower_ddl = """
        create table raw.hydropower_production as
            select date_time, Source as source, v_red, cast(v_blue as double) as v_blue, cast(v_yellow as double) as v_yellow, current, kwh
            from read_csv('./data/Data.csv');
    """

try:
    con.sql(hydropower_ddl)
except ddb.CatalogException as catalog_error:
    # Report the catalog error instead of stopping the notebook
    print(f"Table already exists: {catalog_error}")

In [None]:
# Install and load the DuckDB `excel` extension, which is needed to read Excel files natively
con.sql("INSTALL excel; LOAD excel")

# Preview up to 10 rows of the climate workbook by querying the .xlsx path directly
con.sql("select * from './data/Climate Data/Kalam Climate Data.xlsx' limit 10")

In [None]:
# Load the climate workbook into raw.climate, giving every column a plain
# snake_case name: the original headers contain spaces, brackets and symbols
# that are awkward to reference in later queries.
climate_ddl = """
        create table raw.climate as
            select "Date Time" as date_time, "Temperature (°C)" as temperature, "Dewpoint Temperature (°C)" as dewpoint_temperature, "U Wind Component (m/s)" as u_wind_component, "V Wind Component (m/s)" as v_wind_component, "Total Precipitation (mm)" as total_precipitation, "Snowfall (mm)" as snowfall, "Snow Cover (%)" as snow_cover_perc
            from './data/Climate Data/Kalam Climate Data.xlsx'
    """

try:
    con.sql(climate_ddl)
except ddb.CatalogException as catalog_error:
    # Report the catalog error instead of stopping the notebook
    print(f"Table already exists: {catalog_error}")

In [None]:
# Preview up to 10 rows of raw.climate to check the renamed columns
con.sql("select * from raw.climate limit 10")

In [None]:
# Load SampleSubmission.csv unchanged into raw.sample_submission.
sample_submission_ddl = """
        create table raw.sample_submission as
            select * from read_csv('./data/SampleSubmission.csv')
    """

try:
    con.sql(sample_submission_ddl)
except ddb.CatalogException as catalog_error:
    # Report the catalog error instead of stopping the notebook
    print(f"Table already exists: {catalog_error}")

In [None]:
# Preview up to 10 rows of raw.sample_submission
con.sql("select * from raw.sample_submission limit 10")

### Aggregation
The main goal here is to make the tables `raw.hydropower_production` and `raw.climate` exist at a daily granularity. This is easier to work with in pandas, and the forecasting exercise/validation occurs at a daily level rather than at 5-minute or 1-hour intervals.

In [None]:
# Aggregate the power production data to one row per (date, source).
#
# The CTE truncates each timestamp to its calendar date and pulls the numeric
# ids out of `source` with regexp_extract (capture group 1 of each pattern).
# The outer query then sums kwh for each date/source/consumer_device/data_user.
#
# The SQL is a raw string (r"""...""") so the regex escape `\d` reaches DuckDB
# verbatim; in a normal Python string `\d` is an invalid escape sequence that
# triggers a DeprecationWarning/SyntaxWarning.
try:
    con.sql(r"""
        create table prepared.daily_hydropower_production as
            with temp as (
                select
                    *,
                    cast(date_time as date) as date,
                    regexp_extract(source, 'consumer_device_(\d+)', 1) as consumer_device,
                    regexp_extract(source, '_data_user_(\d+)', 1) as data_user
                from raw.hydropower_production
            )
            select
                date,
                source,
                consumer_device,
                data_user,
                sum(kwh) as kwh
            from temp
            group by date, source, consumer_device, data_user
            order by source, date
    """)
except ddb.CatalogException as e:
    # Report the catalog error instead of stopping the notebook
    print(f"Table already exists: {e}")

In [None]:
# Preview up to 10 rows of the daily hydropower aggregation
con.sql("select * from prepared.daily_hydropower_production limit 10")

In [None]:
# Roll the climate readings up to one row per calendar date:
# temperature, dewpoint, wind components and snow cover are averaged,
# while precipitation and snowfall are summed.
daily_climate_ddl = """
        create table prepared.daily_climate as (
        select
            cast(date_time as date) as date,
            avg(temperature) as avg_temperature,
            avg(dewpoint_temperature) as avg_dewpoint_temperature,
            avg(u_wind_component) as avg_u_wind_component,
            avg(v_wind_component) as avg_v_wind_component,
            sum(total_precipitation) as total_precipitation,
            sum(snowfall) as total_snowfall,
            avg(snow_cover_perc) as avg_snow_cover_perc
        from raw.climate
        group by cast(date_time as date)
    )
    """

try:
    con.sql(daily_climate_ddl)
except ddb.CatalogException as catalog_error:
    # Report the catalog error instead of stopping the notebook
    print(f"Table already exists: {catalog_error}")

In [None]:
# Preview up to 10 rows of the daily climate aggregation
con.sql("select * from prepared.daily_climate limit 10")

In [None]:
# Close the DuckDB connection opened at the top of the notebook
con.close()