## Test data preparation

In [9]:
import os

In [2]:
%pwd

'c:\\Tom\\HKA\\7_Semester\\Domänenprojekt_2\\DoPro'

In [3]:
# Change the working directory to the parent of the current one.
# NOTE(review): presumably done so the relative "artifacts/..." paths and the
# "src." imports used further down resolve from the project root — confirm.
# The move is relative, so re-running this cell climbs one more level each time.
os.chdir("../")

In [3]:
%pwd

'c:\\Tom\\HKA\\7_Semester\\Domänenprojekt_2\\DoPro'

### Data Preparation config class (entity)

In [1]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataPreparationConfig:
    """
    Immutable entity bundling the data preparation config params.

    :param root_dir: Directory into which data will be loaded.
    :param weather_data_path: Directory where raw weather data is stored.
    :param energy_data_path: Directory where raw energy data is stored.
    :param training_data_path: Directory into which training data will be loaded.
    :param test_data_path: Directory into which test data will be loaded.
    """

    # Input / output locations; see the class docstring for their meaning.
    root_dir: Path
    weather_data_path: Path
    energy_data_path: Path
    training_data_path: Path
    test_data_path: Path

### Update config manager class

In [3]:
from src.dopro2_HEFTcom_challenge.constants import PARAMS_FILE_PATH, CONFIG_FILE_PATH
import yaml
from loguru import logger

In [4]:
class ConfigurationManager:
    """Class to manage all configurations read from config.yaml."""

    def __init__(
        self,
        config_filepath: Path = CONFIG_FILE_PATH,
        params_filepath: Path = PARAMS_FILE_PATH
    ) -> None:
        """
        Constructor for ConfigurationManager Class.
        Loads the config file and creates the artifacts folder.

        :param config_filepath: Path to config.yaml file
        :param params_filepath: Path to params.yaml file; accepted so the
            signature stays stable, but the file is not read at the moment
        :raises ValueError: if the config file does not contain a YAML mapping
        :raises KeyError: if the ``artifacts_root`` key is missing
        """
        # Explicit encoding: the platform default differs between systems
        # (e.g. cp1252 on Windows), which would mis-decode non-ASCII values.
        with Path(config_filepath).open("r", encoding="utf-8") as f:
            loaded = yaml.safe_load(f)

        # safe_load returns None for an empty file and may return a list or a
        # scalar; fail here with a clear message instead of a TypeError on the
        # first key access.
        if not isinstance(loaded, dict):
            raise ValueError(
                f"Config file {config_filepath} must contain a YAML mapping, "
                f"got {type(loaded).__name__}"
            )
        self.config: dict = loaded

        os.makedirs(self.config["artifacts_root"], exist_ok=True)
        logger.info("created directory at: {}", self.config["artifacts_root"])

    def get_data_preparation_config(self) -> DataPreparationConfig:
        """
        Get all data preparation params and create their folder in artifacts dir.

        :return: values from the ``data_preparation`` section of config.yaml,
            converted to ``Path`` objects as declared on the entity
        :rtype: DataPreparationConfig
        :raises KeyError: if the section or one of its keys is missing
        """
        config = self.config["data_preparation"]

        os.makedirs(config["root_dir"], exist_ok=True)
        logger.info("created directory at: {}", config["root_dir"])

        # YAML yields plain strings; wrap them so the values match the
        # ``Path`` annotations of DataPreparationConfig.
        data_preparation_config = DataPreparationConfig(
            root_dir=Path(config["root_dir"]),
            weather_data_path=Path(config["weather_data_path"]),
            energy_data_path=Path(config["energy_data_path"]),
            training_data_path=Path(config["training_data_path"]),
            test_data_path=Path(config["test_data_path"])
        )

        return data_preparation_config

### Create data preparation component

In [25]:
import pandas as pd
import xarray as xr
import numpy as np

In [29]:
class DataPreparation:
    """
    Class to perform data preparation.

    The public steps are meant to run in this order, each one reading the
    parquet files written by the previous ones into ``config.root_dir``:
    ``cleaning_energy_data`` -> ``cleaning_weather_data`` -> ``merge_data``
    -> ``transform_data``.
    """

    # NOTE(review): the raw forecast files are addressed by fixed relative
    # paths rather than through ``config.weather_data_path`` — confirm the
    # configured directory before switching over.
    _HORNSEA_FILE = "artifacts/raw_data/weather/dwd_icon_eu_hornsea_1_20200920_20231027.nc"
    _SOLAR_FILE = "artifacts/raw_data/weather/dwd_icon_eu_pes10_20200920_20231027.nc"

    def __init__(self, config: DataPreparationConfig) -> None:
        """
        Constructor for DataPreparation class.

        :param config: config values from config.yaml
        """

        self.config = config

    def _artifact(self, filename: str) -> Path:
        """Return the location of ``filename`` inside ``config.root_dir``."""
        return Path(self.config.root_dir) / filename

    @staticmethod
    def _forecast_to_frame(source, variable, mean_dims) -> pd.DataFrame:
        """
        Load one forecast variable from a netCDF file as a flat DataFrame.

        :param source: path of the netCDF file to open
        :param variable: name of the data variable to extract
        :param mean_dims: dimension name (or list of names) to average over
        :return: frame with timezone-aware (UTC) ``ref_datetime`` and
            ``valid_datetime`` columns plus the averaged variable
        """
        # The context manager closes the file handle once the values have
        # been materialised into the DataFrame.
        with xr.open_dataset(source, engine="h5netcdf") as dataset:
            frame = dataset[variable].mean(dim=mean_dims).to_dataframe().reset_index()

        reference = frame["ref_datetime"]
        # valid_datetime is read as a number of hours and added to
        # ref_datetime, turning it into an absolute timestamp.
        lead_time = pd.to_timedelta(frame["valid_datetime"], unit="hours")
        return frame.assign(
            ref_datetime=reference.dt.tz_localize("UTC"),
            valid_datetime=(reference + lead_time).dt.tz_localize("UTC"),
        )

    def cleaning_energy_data(self) -> None:
        """
        Concatenate all raw energy CSV files and derive the credited energy.

        Writes ``energy_processed.parquet`` into ``config.root_dir``.

        :raises ValueError: if no CSV file is found in ``config.energy_data_path``
        """
        # TODO: handling missing values, outliers, inconsistencies
        logger.info("Start cleaning energy data")
        # Sorted so the row order of the result does not depend on the
        # arbitrary order in which the file system lists the files.
        energy_files = sorted(Path(self.config.energy_data_path).glob("*.csv"))
        if not energy_files:
            raise ValueError(
                f"No energy CSV files found in {self.config.energy_data_path}"
            )
        df_raw = pd.concat(
            (pd.read_csv(f) for f in energy_files),
            ignore_index=True
        )
        # NOTE(review): the 0.5 factor presumably converts MW into MWh per
        # half-hour period — confirm against the data description.
        df = df_raw.assign(
            dtm=pd.to_datetime(df_raw["dtm"]),
            Wind_MWh_credit=0.5 * df_raw["Wind_MW"] - df_raw["boa_MWh"],
            Solar_MWh_credit=0.5 * df_raw["Solar_MW"],
        )
        # TODO: Split into wind and solar
        df.to_parquet(self._artifact("energy_processed.parquet"))
        logger.info("Cleaned energy data: file safed under {}", self.config.root_dir)

    def cleaning_weather_data(self) -> None:
        """
        Reduce the raw weather forecasts to one value per forecast time.

        Writes ``dwd_hornsea_processed.parquet`` (``WindSpeed:100`` averaged
        over latitude/longitude) and ``dwd_solar_processed.parquet``
        (``SolarDownwardRadiation`` averaged over ``point``) into
        ``config.root_dir``.
        """
        logger.info("Start cleaning weather data")

        dwd_hornsea_df = self._forecast_to_frame(
            self._HORNSEA_FILE, "WindSpeed:100", ["latitude", "longitude"]
        )
        dwd_hornsea_df.to_parquet(self._artifact("dwd_hornsea_processed.parquet"))
        logger.info("Cleaned dwd hornsea data: file safed under {}", self.config.root_dir)

        dwd_solar_df = self._forecast_to_frame(
            self._SOLAR_FILE, "SolarDownwardRadiation", "point"
        )
        dwd_solar_df.to_parquet(self._artifact("dwd_solar_processed.parquet"))
        logger.info("Cleaned dwd solar data: file safed under {}", self.config.root_dir)

    def merge_data(self) -> None:
        """
        Join both weather forecasts with the energy data.

        The forecasts are merged on their timestamps, interpolated linearly
        onto a 30-minute grid per forecast run, joined with the energy
        records and limited to lead times below 50 hours. Writes
        ``merged_data.parquet`` into ``config.root_dir``.
        """
        logger.info("Start merging energy and weather data")
        # Read the inputs by name. Unpacking a directory listing positionally
        # depends on listing order and breaks as soon as merged_data.parquet
        # or model_data.parquet from an earlier run are present.
        hornsea = pd.read_parquet(self._artifact("dwd_hornsea_processed.parquet"))
        solar = pd.read_parquet(self._artifact("dwd_solar_processed.parquet"))
        energy = pd.read_parquet(self._artifact("energy_processed.parquet"))

        merged_table = (
            hornsea
            .merge(solar, how="outer", on=["ref_datetime", "valid_datetime"])
            .set_index("valid_datetime")
            .groupby("ref_datetime")
            # "30min" is the non-deprecated spelling of the former "30T" alias.
            .resample("30min")
            .interpolate("linear")
            # Whether the grouping column survives resampling depends on the
            # pandas version; it comes back from the index in reset_index().
            .drop(columns="ref_datetime", errors="ignore")
            .reset_index()
            .merge(energy, how="inner", left_on="valid_datetime", right_on="dtm")
        )
        lead_time = merged_table["valid_datetime"] - merged_table["ref_datetime"]
        # rename() returns a new frame, avoiding an in-place edit of a slice.
        merged_table = (
            merged_table[lead_time < np.timedelta64(50, "h")]
            .rename(columns={"WindSpeed:100": "WindSpeed"})
        )
        merged_table.to_parquet(self._artifact("merged_data.parquet"))
        logger.info("Merged energy and weather data: file safed under {}", self.config.root_dir)

    def transform_data(self) -> None:
        """
        Build the model table from the merged data.

        Drops rows where either weather feature is missing and adds the
        ``total_generation_MWh`` column (wind plus solar credit). Reads
        ``merged_data.parquet`` from and writes ``model_data.parquet`` into
        ``config.root_dir``.
        """
        # TODO: feature scaling, encoding, ...
        logger.info("Start transforming data for modell training")
        merged_data = pd.read_parquet(self._artifact("merged_data.parquet"))
        # Both features must be present; filtering them in one call keeps the
        # first condition from being overwritten by the second.
        model_data = merged_data.dropna(subset=["SolarDownwardRadiation", "WindSpeed"])
        model_data = model_data.assign(
            total_generation_MWh=(
                model_data["Wind_MWh_credit"] + model_data["Solar_MWh_credit"]
            )
        )
        model_data.to_parquet(self._artifact("model_data.parquet"))
        logger.info("Data ready to train the model: file safed under {}", self.config.root_dir)

    # TODO: add a reduce_data step

    def splitting_data(self):
        """Placeholder for the train/validation/test split (not implemented)."""
        ...
        # TODO: training, validation, test sets

In [31]:
# Run the preparation pipeline end to end; every step reads the files the
# previous step wrote, so the order of the calls matters.
try:
    config = ConfigurationManager()
    data_preparation_config = config.get_data_preparation_config()
    data_preparation = DataPreparation(config=data_preparation_config)
    data_preparation.cleaning_energy_data()
    data_preparation.cleaning_weather_data()
    data_preparation.merge_data()
    data_preparation.transform_data()
except Exception:
    # Record the failure with its traceback, then re-raise the active
    # exception unchanged (bare ``raise`` instead of ``raise e``).
    logger.exception("Data preparation pipeline failed")
    raise

2024-10-10 14:50:16.875 | INFO     | __main__:__init__:24 - created directory at: artifacts
2024-10-10 14:50:16.877 | INFO     | __main__:get_data_preparation_config:36 - created directory at: artifacts/prepared_data
2024-10-10 14:50:16.878 | INFO     | __main__:cleaning_energy_data:15 - Start cleaning energy data
2024-10-10 14:50:17.205 | INFO     | __main__:cleaning_energy_data:29 - Cleaned energy data: file safed under artifacts/prepared_data
2024-10-10 14:50:17.214 | INFO     | __main__:cleaning_weather_data:32 - Start cleaning weather data
[32m2024-10-10 14:50:18.240[0m | [1mINFO    [0m | [36m__main__[0m:[36mcleaning_weather_data[0m:[36m43[0m - [1mCleaned dwd hornsea data: file safed under artifacts/prepared_data[