In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data
import numpy as np
import warnings
import os
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import confusion_matrix, classification_report

In [5]:
def read_data(MK=True):
    
    flux = np.load('/Users/portia/Documents/AAA-College/AI/ai4astro/spec_cnn/flux.npy')
    scls = np.load('/Users/portia/Documents/AAA-College/AI/ai4astro/spec_cnn/spectypes.npy')
    
    unique_classes = np.unique(scls)
    print("Unique classes:", unique_classes)
    
    # If class labels start from 1, convert to start from 0
    if unique_classes.min() == 1:
        scls = scls - 1
        print("Classes converted to start from 0:", np.unique(scls))
    
    print(f"Data shape: {flux.shape}, Label shape: {scls.shape}")
    
    # Split training and testing sets
    fluxTR, fluxTE, clsTR, clsTE = train_test_split(flux, scls, test_size=0.2, random_state=42)
    
    Xtrain = torch.from_numpy(fluxTR).float()
    Xtest = torch.from_numpy(fluxTE).float()
    ytrain = torch.from_numpy(clsTR).long()
    ytest = torch.from_numpy(clsTE).long()
    
    # Create datasets and data loaders
    torch_dataset_train = TensorDataset(Xtrain, ytrain)
    torch_dataset_test = TensorDataset(Xtest, ytest)
    
    data_loader_train = DataLoader(dataset=torch_dataset_train, batch_size=batch_size, shuffle=True)
    data_loader_test = DataLoader(dataset=torch_dataset_test, batch_size=batch_size, shuffle=False)
    
    return data_loader_train, data_loader_test, clsTR.shape[0], clsTE.shape[0]

In [6]:
class RNN_Model(nn.Module):
    
    def __init__(self, input_size, hidden_size, num_layers, num_class):
        super(RNN_Model, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )
        
        self.dense = nn.Sequential(
            nn.ReLU(),
            nn.Linear(hidden_size, num_class),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
       
        # Check input dimensions, if 2D, add sequence dimension
        if len(x.shape) == 2:
            x = x.unsqueeze(1)
            
        # Initialize hidden state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # Forward propagation
        out, _ = self.rnn(x, h0)
        
        # Take output from the last time step
        out = out[:, -1, :]
        out = self.dense(out)
        
        return out

In [7]:
def run_RNN_module(model, device, num_class, num_epochs, batch_size, learning_rate, train_loader, test_loader):
    
    criterion = nn.CrossEntropyLoss()
    
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
    
    for epoch in range(num_epochs):
        # Training mode
        model.train()
        train_loss = 0.0
        correct = 0
        total = 0
        
        for batch_x, batch_y in train_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)
            
            # Forward propagation
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            
            # Backward propagation and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            # Accumulate loss and accuracy
            train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += batch_y.size(0)
            correct += (predicted == batch_y).sum().item()
        
        # Evaluation mode
        model.eval()
        test_loss = 0.0
        test_correct = 0
        test_total = 0
        all_preds = []
        all_targets = []
        
        with torch.no_grad():
            for batch_x, batch_y in test_loader:
                batch_x = batch_x.to(device)
                batch_y = batch_y.to(device)
                
                # Forward propagation
                outputs = model(batch_x)
                loss = criterion(outputs, batch_y)
                
                # Accumulate loss and accuracy
                test_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                test_total += batch_y.size(0)
                test_correct += (predicted == batch_y).sum().item()
                
                # Collect predictions and targets for confusion matrix
                all_preds.extend(predicted.cpu().numpy())
                all_targets.extend(batch_y.cpu().numpy())
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss / len(train_loader):.4f}, Train Acc: {100 * correct / total:.2f}%, Test Loss: {test_loss / len(test_loader):.4f}, Test Acc: {100 * test_correct / test_total:.2f}%')
        
        # Save confusion matrix and model every 100 epochs
        if (epoch + 1) % 100 == 0 or epoch == num_epochs - 1:
            # Calculate confusion matrix and print classification report
            print("\nClassification Report:")
            print(classification_report(all_targets, all_preds))

In [8]:
if __name__ == "__main__":
    warnings.filterwarnings("ignore")
    
    torch.manual_seed(42)
    np.random.seed(42)

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    
    num_class = 3  
    num_epochs = 200  
    batch_size = 128  
    learning_rate = 0.001  
    
    # Load data
    train_loader, test_loader, train_size, test_size = read_data()
    print(f"Training set size: {train_size}, Testing set size: {test_size}")
    
    # Create model
    model = RNN_Model(
        input_size=7781,  
        hidden_size=128,  
        num_layers=2,     
        num_class=num_class
    ).to(device)
    
    # Train and evaluate model
    run_RNN_module(
        model, device, num_class, num_epochs, batch_size, learning_rate, train_loader, test_loader
    )

Using device: cpu
Unique classes: [0 1 2]
Data shape: (6000, 7781), Label shape: (6000,)
Training set size: 4800, Testing set size: 1200
Epoch [1/200], Train Loss: 1.0748, Train Acc: 52.25%, Test Loss: 1.0655, Test Acc: 51.17%
Epoch [2/200], Train Loss: 1.0537, Train Acc: 51.35%, Test Loss: 1.0460, Test Acc: 50.08%
Epoch [3/200], Train Loss: 1.0353, Train Acc: 51.83%, Test Loss: 1.0303, Test Acc: 53.08%
Epoch [4/200], Train Loss: 1.0206, Train Acc: 56.08%, Test Loss: 1.0175, Test Acc: 57.67%
Epoch [5/200], Train Loss: 1.0073, Train Acc: 60.19%, Test Loss: 1.0050, Test Acc: 60.17%
Epoch [6/200], Train Loss: 0.9942, Train Acc: 62.73%, Test Loss: 0.9930, Test Acc: 61.92%
Epoch [7/200], Train Loss: 0.9810, Train Acc: 65.08%, Test Loss: 0.9808, Test Acc: 63.00%
Epoch [8/200], Train Loss: 0.9685, Train Acc: 66.81%, Test Loss: 0.9698, Test Acc: 63.50%
Epoch [9/200], Train Loss: 0.9578, Train Acc: 67.58%, Test Loss: 0.9595, Test Acc: 64.67%
Epoch [10/200], Train Loss: 0.9460, Train Acc: 68.67%