In [1]:
# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
)
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import os

In [10]:
# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Define a custom Dataset for the Iris data
class IrisDataset(Dataset):
    def __init__(self, features, labels):
        """
        Initializes the dataset with features and labels.

        Args:
            features (numpy.ndarray): Feature matrix.
            labels (numpy.ndarray): Label vector.
        """
        self.X = torch.tensor(features, dtype=torch.float32)
        self.y = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        """
        Returns the total number of samples.
        """
        return len(self.y)

    def __getitem__(self, idx):
        """
        Retrieves the feature and label at the specified index.

        Args:
            idx (int): Index of the sample to retrieve.

        Returns:
            tuple: (feature, label) pair.
        """
        return self.X[idx], self.y[idx]

# Define the neural network model
class IrisNet(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        """
        Initializes the neural network layers.

        Args:
            input_size (int): Number of input features.
            hidden_size (int): Number of neurons in the hidden layer.
            num_classes (int): Number of output classes.
        """
        super(IrisNet, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)  # First fully connected layer
        self.relu = nn.ReLU()                          # Activation function
        self.fc2 = nn.Linear(hidden_size, num_classes) # Output layer

    def forward(self, x):
        """
        Defines the forward pass of the network.

        Args:
            x (torch.Tensor): Input tensor.

        Returns:
            torch.Tensor: Output logits.
        """
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        return out

def evaluate_pt_model(model_path, X_test, y_test, scaler=None, device=None, visualize=True):
    """
    Evaluates a PyTorch .pt model on the provided test data.

    Args:
        model_path (str): Path to the saved .pt model file.
        X_test (numpy.ndarray): Test feature matrix.
        y_test (numpy.ndarray): True labels for the test data.
        scaler (sklearn.preprocessing.StandardScaler, optional): Scaler used to preprocess the data.
            If provided, it will be used to transform the test data.
        device (torch.device, optional): Device to perform computation on.
            If not provided, it will be set to CUDA if available, else CPU.
        visualize (bool, optional): Whether to visualize the confusion matrix.

    Returns:
        dict: A dictionary containing evaluation metrics.
    """
    # Set device
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f'Using device: {device}')

    # Load the Iris dataset for defining the model architecture
    # This assumes that the model was trained on the Iris dataset
    iris = load_iris()
    input_size = X_test.shape[1]    # Number of features
    hidden_size = 16                # Must match the hidden size used during training
    num_classes = len(np.unique(y_test))  # Number of classes

    # Initialize the model and load state_dict
    model = IrisNet(input_size, hidden_size, num_classes).to(device)
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"The specified model path does not exist: {model_path}")
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()  # Set model to evaluation mode
    print(f'Model loaded from {model_path}')

    # Preprocess the test data
    if scaler is not None:
        X_test = scaler.transform(X_test)
    test_dataset = IrisDataset(X_test, y_test)
    test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

    # Make predictions
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_predictions.extend(preds.cpu().numpy())
            all_labels.extend(labels.numpy())

    # Calculate evaluation metrics
    accuracy = accuracy_score(all_labels, all_predictions)
    precision = precision_score(all_labels, all_predictions, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_predictions, average='weighted', zero_division=0)
    f1 = f1_score(all_labels, all_predictions, average='weighted', zero_division=0)
    conf_matrix = confusion_matrix(all_labels, all_predictions)

    # Print evaluation metrics
    print('\nEvaluation Metrics:')
    print(f'Accuracy : {accuracy:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall   : {recall:.4f}')
    print(f'F1 Score : {f1:.4f}')
    print('Confusion Matrix:')
    print(conf_matrix)

    # Visualize the confusion matrix
    if visualize:
        plt.figure(figsize=(6,5))
        sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
                    xticklabels=iris.target_names,
                    yticklabels=iris.target_names)
        plt.xlabel('Predicted Label')
        plt.ylabel('True Label')
        plt.title('Confusion Matrix')
        plt.show()

    # Return metrics as a dictionary
    metrics = {
        'Accuracy': accuracy,
        'Precision': precision,
        'Recall': recall,
        'F1 Score': f1,
        'Confusion Matrix': conf_matrix
    }

    return metrics

def main():
    """
    Main function to demonstrate the evaluation of a saved .pt model.
    """
    # Load the Iris dataset
    iris = load_iris()
    X = iris.data
    y = iris.target

    # Standardize the features
    scaler = StandardScaler()
    X = scaler.fit_transform(X)

    # Split the data into training and test sets (80% train, 20% test)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Define model parameters
    input_size = X.shape[1]    # Number of features
    hidden_size = 16           # Number of neurons in hidden layer
    num_classes = len(np.unique(y))  # Number of classes

    # Initialize the model, loss function, and optimizer
    model = IrisNet(input_size, hidden_size, num_classes)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # Create Dataset and DataLoader for training
    train_dataset = IrisDataset(X_train, y_train)
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

    # Training parameters
    num_epochs = 50
    best_val_accuracy = 0.0
    save_path = 'iris_model.pt'

    # For simplicity, we'll use a portion of training data as validation
    # Split training data into actual training and validation sets
    X_train_actual, X_val, y_train_actual, y_val = train_test_split(
        X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
    )
    train_actual_dataset = IrisDataset(X_train_actual, y_train_actual)
    train_actual_loader = DataLoader(train_actual_dataset, batch_size=16, shuffle=True)
    val_dataset = IrisDataset(X_val, y_val)
    val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

    # Training loop
    for epoch in range(num_epochs):
        model.train()  # Set model to training mode
        running_loss = 0.0
        for inputs, labels in train_actual_loader:
            optimizer.zero_grad()           # Zero the parameter gradients
            outputs = model(inputs)         # Forward pass
            loss = criterion(outputs, labels)  # Compute loss
            loss.backward()                 # Backward pass
            optimizer.step()                # Update weights
            running_loss += loss.item() * inputs.size(0)

        epoch_loss = running_loss / len(train_actual_loader.dataset)

        # Validate the model
        model.eval()  # Set model to evaluation mode
        all_predictions = []
        all_labels_val = []
        with torch.no_grad():
            for inputs, labels in val_loader:
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                all_predictions.extend(preds.numpy())
                all_labels_val.extend(labels.numpy())

        val_accuracy = accuracy_score(all_labels_val, all_predictions)

        print(
            f'Epoch [{epoch+1}/{num_epochs}], '
            f'Train Loss: {epoch_loss:.4f}, '
            f'Validation Accuracy: {val_accuracy:.4f}'
        )

        # Save the model if validation accuracy improves
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(model.state_dict(), save_path)
            print(f'Model saved to {save_path}')

    # Evaluate the saved model using the evaluate_pt_model function
    metrics = evaluate_pt_model(
        model_path="/Users/yangshuntian/cs4180/src/cs4180/final_project/new_dqn_multi/itr1/dqn_trading_agent_AAPL.pt",
        X_test=X_test,
        y_test=y_test,
        scaler=scaler,
        visualize=True
    )

    # Optionally, you can access the metrics dictionary
    # print(metrics)

if __name__ == '__main__':
    main()


Epoch [1/50], Train Loss: 1.0349, Validation Accuracy: 0.3750
Model saved to iris_model.pt
Epoch [2/50], Train Loss: 1.0088, Validation Accuracy: 0.4167
Model saved to iris_model.pt
Epoch [3/50], Train Loss: 0.9825, Validation Accuracy: 0.5000
Model saved to iris_model.pt
Epoch [4/50], Train Loss: 0.9557, Validation Accuracy: 0.6250
Model saved to iris_model.pt
Epoch [5/50], Train Loss: 0.9297, Validation Accuracy: 0.7083
Model saved to iris_model.pt
Epoch [6/50], Train Loss: 0.9037, Validation Accuracy: 0.6667
Epoch [7/50], Train Loss: 0.8774, Validation Accuracy: 0.7083
Epoch [8/50], Train Loss: 0.8514, Validation Accuracy: 0.7917
Model saved to iris_model.pt
Epoch [9/50], Train Loss: 0.8250, Validation Accuracy: 0.8333
Model saved to iris_model.pt
Epoch [10/50], Train Loss: 0.7989, Validation Accuracy: 0.8333
Epoch [11/50], Train Loss: 0.7738, Validation Accuracy: 0.8333
Epoch [12/50], Train Loss: 0.7471, Validation Accuracy: 0.8333
Epoch [13/50], Train Loss: 0.7228, Validation Accu

RuntimeError: Error(s) in loading state_dict for IrisNet:
	Missing key(s) in state_dict: "fc1.weight", "fc1.bias", "fc2.weight", "fc2.bias". 
	Unexpected key(s) in state_dict: "network.0.weight", "network.0.bias", "network.3.weight", "network.3.bias", "network.6.weight", "network.6.bias". 

In [11]:
path = "src/cs4180/final_project/new_dqn_multi/itr1/dqn_trading_agent_AAPL.pt"

def example_basic_evaluation():
    # 加载 Iris 数据集
    iris = load_iris()
    X = iris.data
    y = iris.target

    # 标准化特征
    scaler = StandardScaler()
    X = scaler.fit_transform(X)

    # 分割数据集为训练集和测试集（80% 训练，20% 测试）
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # 模型文件路径
    # model_path = path

    # 评估模型
    metrics = evaluate_pt_model(
        model_path= path,
        X_test=X_test,
        y_test=y_test,
        scaler=scaler,
        visualize=True
    )

    # 打印评估指标
    print(metrics)
