In [3]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:

import numpy as np
import pandas as pd
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.jit import ScriptModule, script_method
from tqdm import tqdm
from pathlib import Path
from torch.nn import Parameter as TorchParam
from torch import Tensor
from typing import List, Tuple
from joblib import parallel_backend


In [5]:
device = torch.device("cpu")

In [10]:
input_cols = ['nEmFild100ms.Rec_10ms_Fild.pp_rbe_CddAgEm_10ms_Fild.rbe_CddRslvr',
                'tqEmFild100ms.Rec_10ms_Fild.pp_rbe_Mct_10ms_Fild.rbe_MctAsm',
                'iDaFild10ms.Rec_2ms_Fild.rp_rbe_CddIPha_2ms_Fild.rbe_MctAsm',
                'iQaFild10ms.Rec_2ms_Fild.rp_rbe_CddIPha_2ms_Fild.rbe_MctAsm',
                'uDaFundaFild100ms.Rec_10ms_Fild.pp_rbe_Mct_10ms_Fild.rbe_MctAsm',
                'uQaFundaFild100ms.Rec_10ms_Fild.pp_rbe_Mct_10ms_Fild.rbe_MctAsm',
                'R_CW_tCooltIvtrOut',
                'tEmSnsrFild10ms.Rec_2ms_Fild.rp_rbe_CddTEm_2ms_Fild.rbe_TMdlEm']

target_cols = ['pm','stator_winding','R_EM_tTMotRshaftOilIn']
temperature_cols = target_cols + ['tEmSnsrFild10ms.Rec_2ms_Fild.rp_rbe_CddTEm_2ms_Fild.rbe_TMdlEm','R_CW_tCooltIvtrOut']

In [6]:
train_tensor = torch.from_numpy(np.load("train_tensor.npy")).to(device)
train_sample_weights = torch.from_numpy(np.load("train_sample_weights.npy")).to(device)
test_tensor = torch.from_numpy(np.load("test_tensor.npy")).to(device)
test_sample_weights = torch.from_numpy(np.load("test_sample_weights.npy")).to(device)

In [7]:
# Hyper parameters optimization 
# 自定义正弦激活层
class SinusLayer(nn.Module):
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return torch.sin(x)
    
def smooth_abs(x: torch.Tensor, epsilon: float = 1e-6) -> torch.Tensor:
    return torch.sqrt(x**2 + epsilon)

In [8]:
# 激活函数映射
def get_activation(activation_name: str) -> nn.Module:
    activation_dict = {
        "Sigmoid": nn.Sigmoid(),
        "tanh": nn.Tanh(),
        "linear": nn.Identity(),
        "ReLU": nn.ReLU(),
        "biased Elu": nn.ELU(alpha=1.0),
        "sinus": SinusLayer()
    }
    return activation_dict[activation_name]

In [13]:
# TNNCell定义（支持动态结构和激活函数）
class TNNCell(nn.Module):
    def __init__(self, cond_net_layers, cond_net_units, cond_activations, 
                 ploss_net_layers, ploss_net_units, ploss_activations):
        super().__init__()
        self.sample_time = 0.5
        self.output_size = len(target_cols)
        self.caps = nn.Parameter(torch.Tensor(self.output_size))
        nn.init.normal_(self.caps, mean=-9.2, std=0.5)
        
        n_temps = len(temperature_cols)
        n_conds = int(0.5 * n_temps * (n_temps - 1))
        
        # 动态构建conductance_net
        cond_layers = []
        input_dim = len(input_cols) + self.output_size
        for i, units in enumerate(cond_net_units):
            cond_layers.append(nn.Linear(input_dim, units))
            # 使用当前层的激活函数
            cond_layers.append(get_activation(cond_activations[i]))
            input_dim = units

        cond_layers.append(nn.Linear(input_dim, n_conds))
        cond_layers.append(nn.Sigmoid())
        self.conductance_net = nn.Sequential(*cond_layers)
    
        # 动态构建ploss_net
        ploss_layers = []
        input_dim = len(input_cols) + self.output_size
        for i, units in enumerate(ploss_net_units):
            ploss_layers.append(nn.Linear(input_dim, units))
            if i < len(ploss_net_units) - 1:
                ploss_layers.append(get_activation(ploss_activations[i]))
            input_dim = units
        ploss_layers.append(nn.Linear(input_dim, self.output_size))
        self.ploss = nn.Sequential(*ploss_layers)
        
        # 其余初始化代码
        self.adj_mat = np.zeros((n_temps, n_temps), dtype=int)
        triu_idx = np.triu_indices(n_temps, 1)
        adj_idx_arr = np.ones_like(self.adj_mat)
        adj_idx_arr = adj_idx_arr[triu_idx].ravel()
        self.adj_mat[triu_idx] = np.cumsum(adj_idx_arr) - 1
        self.adj_mat += self.adj_mat.T
        self.adj_mat = torch.from_numpy(self.adj_mat[:self.output_size, :]).type(torch.int64)
        self.temp_idcs = [i for i, x in enumerate(input_cols) if x in temperature_cols]
        self.nontemp_idcs = [i for i, x in enumerate(input_cols) if x not in temperature_cols + ["profile_id"]]

    def forward(self, inp: Tensor, hidden: Tensor) -> Tuple[Tensor, Tensor]:
        prev_out = hidden
        temps = torch.cat([prev_out, inp[:, self.temp_idcs]], dim=1)
        sub_nn_inp = torch.cat([inp, prev_out], dim=1)
        # conducts = torch.abs(self.conductance_net(sub_nn_inp))
        conducts = self.conductance_net(sub_nn_inp)
        # power_loss = torch.abs(self.ploss(sub_nn_inp))
        power_loss = smooth_abs(self.ploss(sub_nn_inp))
        temp_diffs = torch.sum(
            (temps.unsqueeze(1) - prev_out.unsqueeze(-1)) * conducts[:, self.adj_mat],
            dim=-1,
        )
        out = prev_out + self.sample_time * torch.exp(self.caps) * (temp_diffs + power_loss)
        return prev_out, torch.clip(out, -1, 5)