In [None]:
from pathlib import Path
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import joblib
import numpy as np
from pydantic import BaseModel, ConfigDict
from dotenv import load_dotenv
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import seaborn as sns
import nilmtk.losses
import enilm.etypes
import enilm.reports.sample_rate
import enilm.appliances
import enilm.datasets
import enilm.seed
import enilm.yaml.config
import enilm.yaml.data
import enilm.yaml.daily
import enilm.yaml.daily.split
import enilm.norm
import enilm.windowing
import enilm.models.torch.utils as utils

In [None]:
load_dotenv()
sns.set_theme()

# Setup Dataset Path

Create `config.yaml` file:

```yaml
datasets:
    paths:
        {sys.platform}:
            {socket.gethostname()}:
                {getpass.getuser()}:
                    REDD: /path/to/REDD.h5
```

Note that the h5 file must be loadable with `nilmtk`.

Then set the `ENILM_CONFIG_FILE_PATH` environment variable to the path of the `config.yaml` file.

# Parameters

In [None]:
%set_env EXP_NAME redd_fridge
exp_name = os.environ["EXP_NAME"]
print(f"exp_name: {exp_name}")

In [None]:
seed = 42
enilm.seed.set(seed)

In [None]:
dataset = enilm.etypes.Datasets.REDD.name
house = 1

sequence_length = 599
batch_size = 512
num_epochs = 25
patience = 6

save_models = True
load_models = True

In [None]:
nilmtk_ds = nilmtk.DataSet(enilm.datasets.get_dataset_path(dataset))
elec = nilmtk_ds.buildings[house].elec
elec.appliances

In [None]:
app_name = 'fridge'
app_elec = enilm.appliances.get_elec(app_name, elec)
app_elec

In [None]:
print(enilm.reports.sample_rate.sample_rate_info(elec))

In [None]:
print(enilm.reports.sample_rate.sample_rate_info(app_elec))

In [None]:
resample_params = enilm.yaml.config.ResampleParams(
    rule="6s", 
    fill="ffill", 
    how="mean",
)

# Cache Setup

In [None]:
cache_folder = Path(os.environ['CACHE_FOLDER'])
print(cache_folder)
assert cache_folder.exists()

In [None]:
cache_dir_path = cache_folder / "nbs" / exp_name
memory_cache_path = cache_dir_path / "joblib"
models_cache_path = cache_dir_path / "models"
exp_json_path = cache_dir_path / "exp.json"

memory = joblib.Memory(memory_cache_path, verbose=1)

# Data

See `/home/mbouchur/shared-projects/ma-embeddedml/enilm/enilm/nbs/preprocessing`

In [None]:
config = enilm.yaml.config.Config(
    dataset=dataset,
    house=house,
    data_path=cache_dir_path,
    selected_physical_quantity="power",
    selected_ac_type=enilm.yaml.config.ACTypes(mains='apparent', apps='active'),
    selected_apps=[app_name],
    selected_gpu=5,
    resample_params=resample_params,
    selected_train_percent=0.8,
    manually_deleted_days=[],
    selected_n_samples_per_day=None,
    n_days=None,
)

## Load

Load data for training and testing using the yaml-config API

In [None]:
%%time

@memory.cache()
def get_data(config):
    raw_data = enilm.yaml.data.raw(config)
    resampled_data = enilm.yaml.data.resample(config)
    overlapping_data = enilm.yaml.data.overlapping(config)
    
    each_day_cleaned = enilm.yaml.daily.clean(config)
    each_day_cleaned_traintest = enilm.yaml.daily.split.train_test(each_day_cleaned, config)
    xy_cleaned = enilm.yaml.daily.split.traintest_xy(each_day_cleaned_traintest)
    
    return raw_data, resampled_data, overlapping_data, each_day_cleaned, each_day_cleaned_traintest, xy_cleaned

raw_data, resampled_data, overlapping_data, each_day_cleaned, each_day_cleaned_traintest, xy_cleaned = get_data(config)

Visualize a day

In [None]:
day_idx = 1

days_dates = list(each_day_cleaned['mains'].keys())
assert day_idx < len(days_dates)

for key in each_day_cleaned.keys():
    plt.plot(each_day_cleaned[key][days_dates[day_idx]], label=key)
plt.title(days_dates[day_idx])
plt.legend()
plt.xticks(rotation=20)
plt.show()

## Normalization

In [None]:
mains_norm_params = enilm.norm.compute(xy_cleaned.train_x)
apps_norm_params = {
    app: enilm.norm.compute(xy_cleaned.train_y[app])
    for app in config.selected_apps
}

# x == mains, y == apps, train:test == split
xy_cleaned_normalized = enilm.yaml.daily.split.XYNP(
    train_x=enilm.norm.normalize(xy_cleaned.train_x, mains_norm_params),
    test_x=enilm.norm.normalize(xy_cleaned.test_x, mains_norm_params),
    train_y={app: enilm.norm.normalize(xy_cleaned.train_y[app], apps_norm_params[app]) for app in config.selected_apps},
    test_y={app: enilm.norm.normalize(xy_cleaned.test_y[app], apps_norm_params[app]) for app in config.selected_apps},
)

## Dataset class

In [None]:
class S2PDataset(Dataset):
    def __init__(
        self,
        mains: np.ndarray,
        appliance: np.ndarray,
        sequence_length: int,
        pad=True,  # whether to pad the sequence or not (e.g. 1, 2, 3, 4, 5, ... -> 0, 0, 1, 2, 3 if pad is True for sequence_length=5 and 1, 2, 3, 4, 5 if pad is False)
        reshape=True,  # whether to reshape the inputs and targets to the expected shape of the model
    ):
        assert sequence_length % 2 == 1
        assert isinstance(mains, np.ndarray)
        assert isinstance(appliance, np.ndarray)
        assert mains.shape == appliance.shape
        self.sequence_length = sequence_length
        self.mains = torch.tensor(mains)
        self.appliance = torch.tensor(appliance)
        self.pad = pad
        self.reshape = reshape

    def __len__(self):
        if self.pad:
            return len(self.mains)
        return len(self.mains) - self.sequence_length + 1

    def _reshape(self, mains, ground_truth):
        return (
            # reshape inputs: add a channel dimension
            mains.reshape(-1, self.sequence_length),
            # reshape targets: since the model outputs a single value per sequence, add the extra singularity dimension
            ground_truth.reshape(
                -1,
            ),
        )

    def __getitem__(self, idx):
        # stop iteration if idx is out of bounds
        if idx == len(self.mains):
            raise StopIteration

        # padding or truncating
        if self.pad:
            mains, gt = (
                utils.get_padded_sequence(self.mains, self.sequence_length, idx),
                self.appliance[idx],
            )
        else:
            if idx > len(self.mains) - self.sequence_length:
                raise StopIteration
            half_seq_len = self.sequence_length // 2
            idx += half_seq_len
            mains, gt = (
                self.mains[idx - half_seq_len : idx + half_seq_len + 1],
                self.appliance[idx],
            )

        # reshape?
        if self.reshape:
            mains, gt = self._reshape(mains, gt)

        return mains, gt

In [None]:
class S2PDatasetMains(Dataset):
    def __init__(
        self,
        mains: np.ndarray,
        sequence_length: int,
        pad=True,  # whether to pad the sequence or not (e.g. 1, 2, 3, 4, 5, ... -> 0, 0, 1, 2, 3 if pad is True for sequence_length=5 and 1, 2, 3, 4, 5 if pad is False)
        reshape=True,  # whether to reshape the inputs and targets to the expected shape of the model
    ):
        assert sequence_length % 2 == 1
        assert isinstance(mains, np.ndarray)
        self.sequence_length = sequence_length
        self.mains = torch.tensor(mains)
        self.pad = pad
        self.reshape = reshape

    def __len__(self):
        if self.pad:
            return len(self.mains)
        return len(self.mains) - self.sequence_length + 1

    def _reshape(self, mains):
        # reshape inputs: add a channel dimension
        return mains.reshape(-1, self.sequence_length)

    def __getitem__(self, idx):
        # stop iteration if idx is out of bounds
        if idx == len(self.mains):
            raise StopIteration

        # padding or truncating
        if self.pad:
            mains = enilm.models.torch.utils.get_padded_sequence(
                self.mains, self.sequence_length, idx
            )
        else:
            if idx > len(self.mains) - self.sequence_length:
                raise StopIteration
            half_seq_len = self.sequence_length // 2
            idx += half_seq_len
            mains = self.mains[idx - half_seq_len : idx + half_seq_len + 1]

        # reshape?
        if self.reshape:
            mains = self._reshape(mains)

        return mains

## Loaders

In [None]:
assert app_name in each_day_cleaned.keys()

train_dataset = S2PDataset(
    mains=xy_cleaned_normalized.train_x,
    appliance=xy_cleaned_normalized.train_y[app_name],
    sequence_length=sequence_length,
    pad=True,
    reshape=True,
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

val_dataset = S2PDataset(
    mains=xy_cleaned_normalized.test_x,
    appliance=xy_cleaned_normalized.test_y[app_name],
    sequence_length=sequence_length,
    pad=True,
    reshape=True,
)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

# Model

## Network

In [None]:
class S2P(nn.Module):
    def __init__(self, seq_len):
        super(S2P, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1,  out_channels=30, kernel_size=10, stride=1)
        self.conv2 = nn.Conv1d(in_channels=30, out_channels=30, kernel_size=8,  stride=1)
        self.conv3 = nn.Conv1d(in_channels=30, out_channels=40, kernel_size=6,  stride=1)
        self.conv4 = nn.Conv1d(in_channels=40, out_channels=50, kernel_size=5,  stride=1)
        self.drop1 = nn.Dropout(0.2)
        self.conv5 = nn.Conv1d(in_channels=50, out_channels=50, kernel_size=5,  stride=1)
        self.drop2 = nn.Dropout(0.2)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(
            in_features=((seq_len - self.conv1.out_channels) + 1) * self.conv5.out_channels, 
            out_features=1024,
        )
        # self.fc1 = nn.Linear(50 * ((seq_len - 4*10 - 3*8 - 2*6 - 5*1 + 12) // 1), 1024)
        self.drop3 = nn.Dropout(0.2)
        self.fc2 = nn.Linear(in_features=self.fc1.out_features, out_features=1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = self.drop1(x)
        x = F.relu(self.conv5(x))
        x = self.drop2(x)
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = self.drop3(x)
        x = self.fc2(x)
        return x

## Training

### Model

In [None]:
model = S2P(sequence_length)

# Check if CUDA is available
device = torch.device('cpu')
if torch.cuda.is_available():
    device = torch.device(f'cuda:{config.selected_gpu}')
elif torch.backends.mps.is_available():
    device = torch.device('mps')
print(f'Using device: {device}')

# Moving model to device
model = S2P(sequence_length).to(device)

### Loss & Optimizer

In [None]:
# Loss and optimizer
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

### Losses before training (for comparison)

In [None]:
model.eval()
input_data = np.array(
    enilm.norm.normalize(
        overlapping_data["mains"],
        mains_norm_params,
    )
)
data_loader = DataLoader(
    S2PDatasetMains(
        mains=input_data,
        sequence_length=sequence_length,
        pad=True,
        reshape=True,
    ),
    batch_size=batch_size,
    shuffle=False,
)
preds = []
for inputs in data_loader:
    inputs = inputs.to(torch.float32).to(device)
    preds.append(model(inputs).cpu().detach().numpy().flatten())
preds = np.concatenate(preds)
pr_np: np.ndarray = enilm.norm.denormalize(preds, apps_norm_params[app_name])

gt_np = overlapping_data[app_name].to_numpy()
train_size = int(config.selected_train_percent * gt_np.size)

# MAE
loss_no_training = {}
loss_no_training["mae"] = {}
loss_no_training["mae"]["train"] = float(np.mean(np.abs(pr_np[:train_size] - gt_np[:train_size])))
loss_no_training["mae"]["test"] = float(np.mean(np.abs(pr_np[train_size:] - gt_np[train_size:])))
print(f"Train MAE: {loss_no_training['mae']['train']:.2f} Watts")
print(f"Test MAE: {loss_no_training['mae']['test']:.2f} Watts")
print()

### Train and Validate

In [None]:
%%time

best_loss = float("inf")
best_model_ep = None
patience_ctr = patience
bias_dtype = model.conv1.bias.dtype

# Training loop
loss_history = []
for epoch in range(num_epochs):
    model_ep_path = models_cache_path / f"model_ep{epoch + 1}.pt"
    ep_train_loss_path = model_ep_path.parent / f"train_loss_ep{epoch + 1}.txt"
    ep_val_loss_path = model_ep_path.parent / f"val_loss_ep{epoch + 1}.txt"

    # Create the directory if it does not exist
    if save_models and not models_cache_path.exists():
        models_cache_path.mkdir(parents=True)

    # Load from saved model if available
    if model_ep_path.exists() and load_models:
        print(f"Loading model from epoch {epoch + 1}")
        model.load_state_dict(torch.load(model_ep_path))
        train_loss = float(ep_train_loss_path.read_text())
    # Train for one epoch
    else:
        # Set the model to train mode i.e. gradient tracking is on
        model.train(True)

        train_loss = []
        for step, (inputs, targets) in enumerate(train_loader):
            curr_batch_size = inputs.size(0)

            # move data to GPU
            inputs, targets = inputs.to(bias_dtype).to(device), targets.to(bias_dtype).to(device)

            # Zero gradients for every batch
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)

            # Compute the loss
            loss = loss_fn(outputs, targets)

            # Backpropagation
            loss.backward()

            # Adjust learning weights (parameter updates)
            optimizer.step()

            train_loss.append(loss.item())  # * curr_batch_size
            # By multiplying loss.item() with inputs.size(0), you obtain the total loss for the current batch.
            # The reason for multiplying by inputs.size(0) is to account for variations in batch sizes. If you have
            # a batch size of 32, for example, multiplying the loss by 32 will give you the total loss for that batch.
            # By summing the loss for each batch and keeping track of the total number of samples processed
            # (len(train_loader.dataset)), you can compute the average training loss for the entire dataset by dividing
            # train_loss by the total number of samples

        # Compute the average training loss for the epoch
        train_loss = np.mean(train_loss)

        if save_models:
            # Check if the model already exists
            if model_ep_path.exists():
                print(f"Overwriting model for epoch {epoch + 1}")

            # Save the loss for this epoch
            ep_train_loss_path.write_text(str(train_loss))

            # Save the model
            torch.save(model.state_dict(), model_ep_path)

    # Validation

    # Set the model to evaluation mode i.e. disabling dropout and using population statistics for batch normalization
    model.eval()

    val_loss = 0.0
    if load_models and ep_val_loss_path.exists():
        val_loss = float(ep_val_loss_path.read_text())
    else:
        with torch.no_grad():  # Disable gradient computation and reduce memory consumption
            for inputs, targets in val_loader:
                curr_batch_size = inputs.size(0)

                # move data to GPU
                inputs, targets = inputs.to(bias_dtype).to(device), targets.to(bias_dtype).to(device)

                # Forward pass
                outputs = model(inputs)

                # Compute the loss
                loss = loss_fn(outputs, targets)

                val_loss += loss.item()  # * curr_batch_size

        # Compute the average validation loss for the epoch
        val_loss /= len(val_loader)

        if save_models:
            # Save the loss for this epoch
            ep_val_loss_path.write_text(str(val_loss))

    # Print the loss for each epoch
    if train_loss is not None:
        print(
            f"Epoch {epoch+1}/{num_epochs}: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}"
        )
    else:
        print(f"Epoch {epoch+1}/{num_epochs}: Val Loss: {val_loss:.4f}")

    # Append to the loss history
    loss_history.append(
        {
            "epoch": epoch + 1,
            "val": val_loss,
            "train": train_loss,
        }
    )

    # Early stopping
    if val_loss < best_loss:
        best_loss = val_loss
        best_model_ep = epoch
        patience_ctr = patience  # reset patience counter
    else:
        patience_ctr -= 1
        if patience_ctr == 0:
            if best_model_ep is None:
                print(
                    "Early stopping before any model was saved! "
                    "-> continuing to next epoch even if val loss is increasing "
                    "in the hope that it will decrease again"
                )
                continue
            else:
                best_mode_path = models_cache_path / f"model_ep{best_model_ep + 1}.pt"
                model.load_state_dict(torch.load(best_mode_path))
                print("Early stopping at epoch", epoch + 1)
                print("Loading best model from epoch", best_model_ep + 1)
                break

Loss History

In [None]:
loss_history = pd.DataFrame(loss_history)
sns.lineplot(data=loss_history, x='epoch', y='train', label='Train loss')
sns.lineplot(data=loss_history, x='epoch', y='val', label='Validation loss')

### Losses after training (for comparison)

In [None]:
model.eval()
input_data = np.array(
    enilm.norm.normalize(
        overlapping_data["mains"],
        mains_norm_params,
    )
)
data_loader = DataLoader(
    S2PDatasetMains(
        mains=input_data,
        sequence_length=sequence_length,
        pad=True,
        reshape=True,
    ),
    batch_size=batch_size,
    shuffle=False,
)
preds = []
for inputs in data_loader:
    inputs = inputs.to(torch.float32).to(device)
    preds.append(model(inputs).cpu().detach().numpy().flatten())
preds = np.concatenate(preds)
pr_np: np.ndarray = enilm.norm.denormalize(preds, apps_norm_params[app_name])

gt_np = overlapping_data[app_name].to_numpy()
train_size = int(config.selected_train_percent * gt_np.size)

# MAE
loss = {}
loss["mae"] = {}
loss["mae"]["train"] = float(np.mean(np.abs(pr_np[:train_size] - gt_np[:train_size])))
loss["mae"]["test"] = float(np.mean(np.abs(pr_np[train_size:] - gt_np[train_size:])))
print(
    f"Train MAE: {loss['mae']['train']:.2f} Watts (before: {loss_no_training['mae']['train']:.2f} Watts)"
)
print(
    f"Test MAE: {loss['mae']['test']:.2f} Watts (before: {loss_no_training['mae']['test']:.2f} Watts)"
)

### Number of parameters

In [None]:
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Number of trainable parameters: {num_params:,}")

### Predictions

In [None]:
# generate predictions for a single day
day_idx = 3
test_days_keys = list(each_day_cleaned_traintest.test['mains'])
assert day_idx < len(test_days_keys)
day_date = test_days_keys[day_idx]

day_loader = DataLoader(S2PDataset(
    mains=enilm.norm.normalize(each_day_cleaned_traintest.test['mains'][day_date].to_numpy(), mains_norm_params),
    appliance=enilm.norm.normalize(each_day_cleaned_traintest.test[app_name][day_date].to_numpy(), apps_norm_params[app_name]),
    sequence_length=sequence_length,
    reshape=True,
    pad=True,
), batch_size=batch_size, shuffle=False)

preds = []
best_preds = []
for inputs, _ in day_loader:
    preds.append(model(inputs.to(torch.float32)))

preds_np = [p.data.numpy() for p in preds]
preds_flat = np.concatenate(preds_np).flatten()
preds_flat_denorm = enilm.norm.denormalize(preds_flat, apps_norm_params[app_name])

In [None]:
fig = go.Figure()

# plot mains
fig.add_scatter(
    x=each_day_cleaned_traintest.test["mains"][day_date].index,
    y=each_day_cleaned_traintest.test["mains"][day_date].values,
    name='mains',
)

# plot ground truth
fig.add_scatter(
    x=each_day_cleaned_traintest.test[app_name][day_date].index,
    y=each_day_cleaned_traintest.test[app_name][day_date].values,
    name='ground truth',
)

# plot predictions
fig.add_scatter(
    x=each_day_cleaned_traintest.test[app_name][day_date].index,
    y=preds_flat_denorm,
    name='predictions last',
)

fig.show()

# Export Exp to JSON

In [None]:
# src/backend/types/exp.py

class Exp(BaseModel):
    model_config = ConfigDict(protected_namespaces=())
    dataset: str  # enilm.etypes.DatasetID
    house: enilm.etypes.HouseNr
    app: enilm.etypes.AppName
    exp_name: str
    app_norm_params: enilm.norm.NormParams
    mains_norm_params: enilm.norm.NormParams
    selected_ac_type: enilm.yaml.config.ACTypes
    resample_params: enilm.yaml.config.ResampleParams
    on_power_threshold: float = 6.0  # watts
    description: str = ""


class ModelExp(Exp):  # Inherit from DataExp
    selected_model_weights: str
    sequence_length: int
    selected_train_percent: float
    batch_size: int
    num_epochs: int
    model_name: str
    model_class: str = "enilm.models.torch.seq.S2P"
    ds_class: str = "enilm.models.torch.seq.S2PDatasetMains"

if 'best_mode_path' not in globals():
    print("Using last model as the best model since early stopping did not trigger")
    best_mode_path = Path(models_cache_path / f"model_ep{num_epochs}.pt")

exp = ModelExp(
    dataset=dataset,
    house=house,
    app=app_name,
    exp_name=exp_name,
    app_norm_params=apps_norm_params[app_name],
    mains_norm_params=mains_norm_params,
    selected_model_weights=best_mode_path.name,
    sequence_length=sequence_length,
    selected_ac_type=config.selected_ac_type,
    resample_params=config.resample_params,
    selected_train_percent=config.selected_train_percent,
    batch_size=batch_size,
    num_epochs=num_epochs,
    model_name='S2P',
    description='',
)
exp_json_path.write_text(exp.model_dump_json(indent=2))