In [None]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from pytorch_tcn import TCN
import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import precision_recall_curve, f1_score, accuracy_score
from tqdm import tqdm

In [6]:
class ElevatorTCNModel(nn.Module):
    def __init__(self, input_channels, output_size, num_channels=[64, 64, 64], kernel_size=3, dropout=0.1):
        super().__init__()
        self.tcn = TCN(num_inputs=input_channels,
                       num_channels=num_channels,
                       kernel_size=kernel_size,
                       dropout=dropout,
                       causal=True)
        self.linear = nn.Linear(num_channels[-1], output_size)

    def forward(self, x):
        # x shape: (batch_size, seq_len, input_channels)  
        # 但 PyTorch-TCN 默认期望 (batch, channels, length),因此需要转置
        x = x.transpose(1, 2)  # -> (batch, input_channels, seq_len)
        y = self.tcn(x)        # -> (batch, num_channels[-1], seq_len)
        # 取最后一个 time step’s feature map
        out = self.linear(y[:, :, -1])  # -> (batch, output_size)
        return out

class ElevatorCallsDataset(Dataset):
    def __init__(self, df, input_len=60*60, gap = 30 ,output_window=60,downsample_seconds = 60):
        """
        df: pandas DataFrame with time series data (按时间排序,频次例如每秒／每分钟)
        input_len: 用多少时间步 (window length) 作为输入
        gap: 输入和输出之间的时间间隔（例如30表示预测输入和输出之间有30个秒的间隔）
        output_window: 预测多少步之后 (例如 60 表示预测下一分钟)
        feature_cols: list of feature列名 (包含楼层 call & direction one-hot + optional 时间特征)
        target_cols: list of target 列名 (未来是否有 call）
        """
        self.df = df.reset_index(drop=True)
        self.data = self.df.values
        self.input_len = input_len
        self.gap = gap
        self.output_window = output_window

        self.downsample_seconds = downsample_seconds

        self.total_length = len(self.data) - input_len - gap - output_window + 1
        self.total_length = max(self.total_length, 0)
            
    
    def __len__(self):
        return self.total_length
    
    def __getitem__(self, idx):
        input_window = self.data[idx:idx + self.input_len]
    
        x = []
        for i in range(0, self.input_len, self.downsample_seconds):
            block = input_window[i : i + self.downsample_seconds]
            x.append(block.sum(axis=0))
    
        x = np.stack(x).astype(np.float32)
    
        output_window = self.data[
            idx + self.input_len + self.gap - 1:
            idx + self.input_len + self.gap + self.output_window - 1, 3:]
        
        y = (output_window.sum(axis=0) > 0).astype(np.float32)
    
        return torch.from_numpy(x), torch.from_numpy(y)
    
def search_best_thresholds(all_probs, all_labels):
    num_labels = all_probs.shape[1]
    best_thresholds = np.zeros(num_labels)

    for i in range(num_labels):
        y_true = all_labels[:, i]
        y_prob = all_probs[:, i]

        if y_true.sum() == 0:
            best_thresholds[i] = 0.5
            continue

        precision, recall, thresholds = precision_recall_curve(y_true, y_prob)
        f1 = 2 * precision * recall / (precision + recall + 1e-9)

        idx = f1.argmax()
        if idx >= len(thresholds):
            best_thresholds[i] = 0.5
        else:
            best_thresholds[i] = thresholds[idx]

    return best_thresholds

In [7]:
dir = os.getcwd()

model_dir = os.path.join(dir, 'best_model')

testset = pd.read_csv(os.path.join(dir, 'testset.csv'))

test_dataset = ElevatorCallsDataset(testset, input_len=60*60, gap=30, output_window=60)
test_loader = DataLoader(test_dataset, batch_size= 128*2 , shuffle=False, num_workers =4)

In [None]:
for model_path in os.listdir(model_dir):
    if model_path.endswith('.pth'):
        full_model_path = os.path.join(model_dir, model_path)
        # load model
        model = ElevatorTCNModel(input_channels=len(testset.columns), output_size=len(testset.columns)-3)
        model.load_state_dict(torch.load(full_model_path, map_location=torch.device('cpu')))
        model.eval()
        
        # threshold search
        all_probs = []
        all_labels = []
        with torch.no_grad():
            for x_batch, y_batch in test_loader:
                outputs = model(x_batch)
                probs = torch.sigmoid(outputs).numpy()
                all_probs.append(probs)
                all_labels.append(y_batch.numpy())
                
        all_probs = np.vstack(all_probs)
        all_labels = np.vstack(all_labels)
        best_thresholds = search_best_thresholds(all_probs, all_labels)
        
        # save thresholds
        threshold_path = os.path.join(model_dir, model_path.replace('.pth', '_thresholds.npy'))
        np.save(threshold_path, best_thresholds)

  model.load_state_dict(torch.load(full_model_path, map_location=torch.device('cpu')))
