In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class GatedLinearUnit(nn.Module):
    """Gated linear unit: a linear projection scaled element-wise by a sigmoid gate.

    Computes ``linear(x) * sigmoid(gate(x))`` where ``linear`` and ``gate`` are
    two independent ``nn.Linear(input_size, output_size)`` layers.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        # Creation order (linear, then gate) is kept so seeded initialisation is unchanged.
        self.linear = nn.Linear(in_features=input_size, out_features=output_size)
        self.gate = nn.Linear(in_features=input_size, out_features=output_size)

    def forward(self, x):
        """Return the gated projection of ``x`` (last dim becomes ``output_size``)."""
        values = self.linear(x)
        gate_activation = torch.sigmoid(self.gate(x))
        return values * gate_activation

class LayerNormLSTM(nn.Module):
    """Batch-first LSTM whose per-step outputs are passed through LayerNorm.

    The LayerNorm width is ``hidden_size`` for a unidirectional LSTM and
    ``2 * hidden_size`` for a bidirectional one.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout, bidirectional):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=bidirectional,
        )
        direction_count = 2 if bidirectional else 1
        self.layer_norm = nn.LayerNorm(hidden_size * direction_count)

    def forward(self, x):
        """Run the LSTM on ``x`` and normalise its output sequence.

        The final hidden/cell states returned by the LSTM are discarded.
        """
        sequence_out = self.lstm(x)[0]
        return self.layer_norm(sequence_out)

class LayerDropLSTM(nn.Module):
    """Batch-first LSTM that is randomly bypassed during training.

    In training mode, with probability ``layer_drop_prob`` per forward call,
    the LSTM is skipped and the input is instead mapped by a linear projection
    to the width the LSTM would have produced. In eval mode the LSTM always runs.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout, bidirectional, layer_drop_prob=0.2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=bidirectional,
        )
        self.layer_drop_prob = layer_drop_prob
        lstm_out_width = hidden_size * (2 if bidirectional else 1)
        self.projection = nn.Linear(input_size, lstm_out_width)

    def forward(self, x):
        """Return the LSTM output sequence, or the projected input when the layer is dropped."""
        # The random draw only happens in training mode (short-circuit), so
        # evaluation never consumes random numbers here.
        skip_lstm = self.training and torch.rand(1).item() < self.layer_drop_prob
        if not skip_lstm:
            return self.lstm(x)[0]
        # Bypass path: project the input to the LSTM's output width.
        return self.projection(x)

class MultiHeadSelfAttention(nn.Module):
    """Thin wrapper around ``nn.MultiheadAttention`` (batch-first) used as self-attention."""

    def __init__(self, hidden_size, num_heads):
        super().__init__()
        self.attention = nn.MultiheadAttention(
            embed_dim=hidden_size,
            num_heads=num_heads,
            batch_first=True,
        )

    def forward(self, x):
        """Attend ``x`` to itself and return only the attended values.

        The attention-weight tensor returned by ``nn.MultiheadAttention`` is discarded.
        """
        outputs = self.attention(query=x, key=x, value=x)
        return outputs[0]

class PositionalEncoding(nn.Module):
    """Learned positional encoding added to a batch-first sequence.

    A trainable ``nn.Embedding(max_len, d_model)`` holds one vector per
    position; the vectors for positions ``0 .. seq_len - 1`` are added to the
    input, broadcasting over the batch dimension.

    Args:
        d_model: size of the last dimension of the inputs.
        max_len: number of positions available; longer sequences are rejected.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.pos_embedding = nn.Embedding(max_len, d_model)

    def forward(self, x):
        """Return ``x`` plus the positional vectors for its sequence length.

        ``x.size(1)`` is used as the sequence length.

        Raises:
            IndexError: if the sequence is longer than ``max_len``.
        """
        seq_len = x.size(1)
        max_len = self.pos_embedding.num_embeddings
        if seq_len > max_len:
            # Fail with an explicit message instead of an opaque out-of-range
            # lookup error from nn.Embedding.
            raise IndexError(
                f"sequence length {seq_len} exceeds PositionalEncoding max_len={max_len}"
            )
        positions = torch.arange(0, seq_len, dtype=torch.long, device=x.device).unsqueeze(0)
        pos_enc = self.pos_embedding(positions)
        return x + pos_enc

class Mish(nn.Module):
    """Mish activation: ``x * tanh(softplus(x))``, applied element-wise."""

    def forward(self, x):
        smooth = F.softplus(x)
        return x * smooth.tanh()


class ResidualLSTM(nn.Module):
    """Batch-first LSTM followed by a same-width linear projection.

    Note: despite the class name, ``forward`` does not add a skip connection;
    it returns ``projection(lstm(x))`` only.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout, bidirectional):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=bidirectional,
        )
        # Width of the LSTM output: doubled when both directions are concatenated.
        lstm_out_width = hidden_size * (2 if bidirectional else 1)
        self.projection = nn.Linear(lstm_out_width, lstm_out_width)

    def forward(self, x):
        """Return the linearly projected LSTM output sequence (final states are discarded)."""
        sequence_out = self.lstm(x)[0]
        return self.projection(sequence_out)



class LSTMModel(nn.Module):
    """Sequence classifier built from three parallel bidirectional LSTM branches.

    Pipeline (see ``forward``): learned positional encoding -> LayerNormLSTM,
    ResidualLSTM and LayerDropLSTM branches run on the same input and are
    concatenated along the feature axis -> 3-layer TransformerEncoder with a
    skip connection -> multi-head self-attention with a skip connection ->
    mean over the sequence axis -> two GLU + LayerNorm + Dropout stages ->
    linear classifier producing ``num_classes`` logits.

    Args:
        input_size: number of features per time step.
        hidden_size: requested total width; each branch receives
            ``hidden_size // (num_directions * 3)`` hidden units per direction.
        num_layers: stacked layers inside every LSTM branch.
        num_classes: number of output logits.
        lstm_dropout: dropout value passed to every LSTM branch.
        fcn_dropout: dropout probability applied after each GLU stage.
        debug: when True, print intermediate sizes and tensor shapes.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes, lstm_dropout=0.3, fcn_dropout=0.5, debug=False):
        super().__init__()
        self.bidirectional = True
        self.num_directions = 2 if self.bidirectional else 1
        self.debug = debug

        self.num_heads = 8
        # Split the requested width evenly over 3 branches x num_directions.
        self.hidden_size = hidden_size // (self.num_directions * 3)
        self._debug_print(f"Adjusted hidden_size: {self.hidden_size}")

        self.embedding_size = input_size
        self.positional_encoding = PositionalEncoding(self.embedding_size)

        # All three branches share the same LSTM hyper-parameters.
        branch_kwargs = dict(
            num_layers=num_layers,
            dropout=lstm_dropout,
            bidirectional=self.bidirectional,
        )
        self.lstm1 = LayerNormLSTM(self.embedding_size, self.hidden_size, **branch_kwargs)
        self.lstm2 = ResidualLSTM(self.embedding_size, self.hidden_size, **branch_kwargs)
        self.lstm3 = LayerDropLSTM(self.embedding_size, self.hidden_size, **branch_kwargs)

        # Feature width after concatenating the three branch outputs.
        self.combined_lstm_size = self.hidden_size * self.num_directions * 3
        self._debug_print(f"Combined LSTM size: {self.combined_lstm_size}")
        self._debug_print(f"Number of heads: {self.num_heads}")
        self._debug_print(f"Is combined_lstm_size divisible by num_heads? {self.combined_lstm_size % self.num_heads == 0}")

        width = self.combined_lstm_size

        # forward() only uses `transformer_encoder`; `transformer_layer` stays
        # registered as a submodule, so its parameters remain part of the
        # state_dict that saved checkpoints are loaded against.
        self.transformer_layer = nn.TransformerEncoderLayer(
            d_model=width,
            nhead=self.num_heads,
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(self.transformer_layer, num_layers=3)

        self.attention = MultiHeadSelfAttention(width, self.num_heads)

        # Classification head: width -> width/2 -> width/4 -> num_classes.
        self.glu1 = GatedLinearUnit(width, width // 2)
        self.glu2 = GatedLinearUnit(width // 2, width // 4)

        self.fc = nn.Linear(width // 4, num_classes)

        self.layer_norm2 = nn.LayerNorm(width // 2)
        self.layer_norm3 = nn.LayerNorm(width // 4)

        self.dropout = nn.Dropout(p=fcn_dropout)

    def _debug_print(self, message):
        """Print ``message`` only when the model was built with ``debug=True``."""
        if self.debug:
            print(message)

    def forward(self, x):
        """Map a batch-first sequence batch to class logits.

        The sequence axis (dim 1) is averaged away before the classification
        head, so the result has one row of ``num_classes`` logits per sample.
        """
        x = self.positional_encoding(x)
        self._debug_print(f"Shape after positional encoding: {x.shape}")

        # The three branches see the same encoded input; evaluation order
        # (lstm1, lstm2, lstm3) is kept so random-number consumption is unchanged.
        lstm_out1 = self.lstm1(x)
        lstm_out2 = self.lstm2(x)
        lstm_out3 = self.lstm3(x)
        self._debug_print(f"Shape of lstm_out1: {lstm_out1.shape}")
        self._debug_print(f"Shape of lstm_out2: {lstm_out2.shape}")
        self._debug_print(f"Shape of lstm_out3: {lstm_out3.shape}")

        lstm_out_concat = torch.cat([lstm_out1, lstm_out2, lstm_out3], dim=-1)
        self._debug_print(f"Shape after concatenation: {lstm_out_concat.shape}")

        # Transformer encoder with a skip connection around it.
        transformer_out = lstm_out_concat + self.transformer_encoder(lstm_out_concat)
        self._debug_print(f"Shape after transformer: {transformer_out.shape}")

        # Self-attention with a skip connection around it.
        attn_out = transformer_out + self.attention(transformer_out)
        self._debug_print(f"Shape after attention: {attn_out.shape}")

        # Average over the sequence axis.
        global_avg_pool = attn_out.mean(dim=1)
        self._debug_print(f"Shape after global average pooling: {global_avg_pool.shape}")

        out = self.dropout(self.layer_norm2(self.glu1(global_avg_pool)))
        self._debug_print(f"Shape after first GLU: {out.shape}")

        out = self.dropout(self.layer_norm3(self.glu2(out)))
        self._debug_print(f"Shape after second GLU: {out.shape}")

        out = self.fc(out)
        self._debug_print(f"Final output shape: {out.shape}")

        return out

class FocalLoss(nn.Module):
    """Class-weighted focal loss built on (optionally label-smoothed) cross-entropy.

    Per sample the loss is ``alpha[target] * (1 - pt) ** gamma * ce`` where
    ``ce = F.cross_entropy(inputs, targets, label_smoothing=label_smoothing)``
    and ``pt = exp(-ce)``.

    Args:
        alpha: per-class weights, indexed by class id (sequence or tensor).
        gamma: focusing exponent applied to ``(1 - pt)``.
        reduction: ``'mean'`` or ``'sum'``; any other value returns the
            unreduced per-sample losses.
        label_smoothing: forwarded to ``F.cross_entropy``.
    """

    def __init__(self, alpha, gamma=2.5, reduction='mean', label_smoothing=0.1):
        super(FocalLoss, self).__init__()
        if isinstance(alpha, torch.Tensor):
            # Copy without re-wrapping, which torch.tensor() warns about.
            alpha_tensor = alpha.detach().clone()
        else:
            alpha_tensor = torch.tensor(alpha)
        # Non-persistent buffer: it follows .to()/.cuda() on the module, while
        # the module's state_dict stays empty exactly as before.
        self.register_buffer('alpha', alpha_tensor, persistent=False)
        self.gamma = gamma
        self.reduction = reduction
        self.label_smoothing = label_smoothing

    def forward(self, inputs, targets):
        """Compute the focal loss for logits ``inputs`` and integer class ``targets``."""
        ce_loss = F.cross_entropy(inputs, targets, reduction='none', label_smoothing=self.label_smoothing)
        # NOTE(review): pt is derived from the label-smoothed cross-entropy, so
        # with label_smoothing > 0 it is not exactly the target-class probability.
        pt = torch.exp(-ce_loss)
        # Index the class weights on the inputs' device so the loss also works
        # when the criterion itself was never moved to that device.
        alpha = self.alpha.to(inputs.device)
        focal_loss = alpha[targets] * (1 - pt) ** self.gamma * ce_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss

In [4]:
# --- Model dimensions -------------------------------------------------------
input_size = 35   # features per time step fed to the model
num_classes = 5   # number of output logits
num_layers = 2    # stacked layers inside each LSTM branch
# LSTMModel splits hidden_size across 3 branches x 2 directions and then needs
# the concatenated width to be divisible by its 8 attention heads. Any multiple
# of 48 (= 2 * 3 * 8) satisfies both; 384 = 8 * 48.
hidden_size = 384

# --- Other settings (not referenced by the cells shown in this notebook) -----
num_epochs = 55
bagging = 5
batch_size = 16
early_stop_patience = 10
subset_size = 555
# Per-class weights, one entry per class id.
# NOTE(review): presumably the `alpha` argument of FocalLoss - confirm.
alpha_np = [0.70, 0.27, 0.01, 0.01, 0.01]

In [10]:
# Build the network with default dropout settings; its weights are randomly
# initialised here and replaced when a checkpoint is loaded in a later cell.
model = LSTMModel(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    num_classes=num_classes,
)

In [12]:
import pandas as pd 
import numpy as np
# Load the test features from the CSV in the working directory.
merged_data_2024_test_cleaned = pd.read_csv(filepath_or_buffer='X_test.csv')

In [14]:
# Keep only the float/int columns of the test frame and convert them to a
# float32 tensor with one row per CSV row.
X_test_numeric = merged_data_2024_test_cleaned.select_dtypes(include=[float, int])


X_test_tensor = torch.tensor(X_test_numeric.values, dtype=torch.float32)

# Fail fast on NaN/inf features: they would propagate through the network to
# non-finite logits, for which the arg-max prediction is not meaningful.
assert torch.isfinite(X_test_tensor).all(), (
    "X_test contains NaN or infinite feature values; clean or impute them before inference"
)

def create_sequences_test(X, seq_length):
    """Cut a 2-D (or higher) tensor into overlapping windows along dim 0.

    Window ``k`` is ``X[k : k + seq_length]`` for ``k = 0 .. len(X) - seq_length - 1``,
    so ``len(X) - seq_length`` windows are produced.

    NOTE(review): the window ending at the last row is never emitted, and the
    number of windows is smaller than the number of rows - confirm this matches
    how the training sequences were built and how predictions are matched to rows.

    Args:
        X: tensor whose first dimension indexes rows.
        seq_length: number of consecutive rows per window (must be >= 0).

    Returns:
        Tensor of shape ``(len(X) - seq_length, seq_length, *X.shape[1:])``;
        an empty tensor with zero windows when ``X`` has too few rows.

    Raises:
        ValueError: if ``seq_length`` is negative.
    """
    if seq_length < 0:
        raise ValueError(f"seq_length must be non-negative, got {seq_length}")

    num_windows = len(X) - seq_length
    if num_windows <= 0:
        # torch.stack([]) would raise an opaque RuntimeError; return a
        # correctly shaped empty batch instead.
        return X.new_empty((0, seq_length) + tuple(X.shape[1:]))

    return torch.stack([X[start:start + seq_length] for start in range(num_windows)])

# Window length (rows per input sequence) for this cell.
# NOTE(review): a later cell rebuilds these same variables with
# sequence_length = 3, so when the cells run in order that later value is the
# one used for inference - confirm which length the checkpoint expects.
sequence_length = 15
X_test_sequences = create_sequences_test(X=X_test_tensor, seq_length=sequence_length)

# shuffle=False keeps the batches in window order.
test_dataset = torch.utils.data.TensorDataset(X_test_sequences)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=32,
    shuffle=False,
)

In [16]:
# Same feature extraction as the previous data-preparation cell, repeated so
# this cell can be run on its own: keep the float/int columns and convert them
# to a float32 tensor with one row per CSV row.
X_test_numeric = merged_data_2024_test_cleaned.select_dtypes(include=[float, int])


X_test_tensor = torch.tensor(X_test_numeric.values, dtype=torch.float32)

# Fail fast on NaN/inf features: they would propagate through the network to
# non-finite logits, for which the arg-max prediction is not meaningful.
assert torch.isfinite(X_test_tensor).all(), (
    "X_test contains NaN or infinite feature values; clean or impute them before inference"
)

def create_sequences_test(X, seq_length):
    """Cut a 2-D (or higher) tensor into overlapping windows along dim 0.

    This redefinition replaces the identically named function from the
    previous data-preparation cell.

    Window ``k`` is ``X[k : k + seq_length]`` for ``k = 0 .. len(X) - seq_length - 1``,
    so ``len(X) - seq_length`` windows are produced.

    NOTE(review): the window ending at the last row is never emitted, and the
    number of windows is smaller than the number of rows - confirm this matches
    how the training sequences were built and how predictions are matched to rows.

    Args:
        X: tensor whose first dimension indexes rows.
        seq_length: number of consecutive rows per window (must be >= 0).

    Returns:
        Tensor of shape ``(len(X) - seq_length, seq_length, *X.shape[1:])``;
        an empty tensor with zero windows when ``X`` has too few rows.

    Raises:
        ValueError: if ``seq_length`` is negative.
    """
    if seq_length < 0:
        raise ValueError(f"seq_length must be non-negative, got {seq_length}")

    num_windows = len(X) - seq_length
    if num_windows <= 0:
        # torch.stack([]) would raise an opaque RuntimeError; return a
        # correctly shaped empty batch instead.
        return X.new_empty((0, seq_length) + tuple(X.shape[1:]))

    return torch.stack([X[start:start + seq_length] for start in range(num_windows)])

# Window length actually used for inference: this cell overwrites the
# sequence_length / X_test_sequences / test_dataset / test_loader values
# built by the previous data-preparation cell.
# NOTE(review): confirm 3 is the window length the checkpoint was trained with.
sequence_length = 3
X_test_sequences = create_sequences_test(X=X_test_tensor, seq_length=sequence_length)

# shuffle=False keeps the batches in window order.
test_dataset = torch.utils.data.TensorDataset(X_test_sequences)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=32,
    shuffle=False,
)

In [18]:
# Pick the device first so the checkpoint, the model and the batches all agree on it.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
model.load_state_dict(torch.load('bagged_model_1.pth', map_location=device, weights_only=True))

# The batches are moved to `device` below, so the model must live there too;
# otherwise the forward pass fails with a device mismatch on CUDA machines.
model.to(device)

# Evaluation mode: disables dropout and the training-only random LSTM bypass.
model.eval()

test_predictions = []

with torch.no_grad():
    for X_batch in test_loader:
        # TensorDataset yields 1-tuples; take the feature tensor.
        X_batch = X_batch[0].to(device)
        outputs = model(X_batch)
        # Index of the largest logit per sample is the predicted class id.
        _, predicted = torch.max(outputs, 1)
        test_predictions.extend(predicted.cpu().numpy())

In [19]:
# Re-run inference from scratch. The model is explicitly placed on `device`
# and switched to eval mode here so this cell gives the same result no matter
# what state earlier cells left the model in.
model.to(device)
model.eval()

test_predictions = []

with torch.no_grad():
    for X_batch in test_loader:
        # TensorDataset yields 1-tuples; take the feature tensor.
        X_batch = X_batch[0].to(device)
        outputs = model(X_batch)
        # Index of the largest logit per sample is the predicted class id.
        _, predicted = torch.max(outputs, 1)
        test_predictions.extend(predicted.cpu().numpy())

In [22]:
# Class name -> class id.
# NOTE(review): presumably the same encoding that was used for the training labels - verify.
label_mapping_legend = {'Good': 0, 'Moderate': 1, 'Poor': 2, 'Severe': 3, 'Unhealthy': 4}

# Inverse lookup: class id -> class name.
reverse_label_mapping = dict((idx, name) for name, idx in label_mapping_legend.items())

# Translate every predicted class id to its name (an unknown id raises KeyError).
test_predictions_labels = pd.Series(
    list(map(reverse_label_mapping.__getitem__, test_predictions)),
    name='Predicted_AQI',
)

# IDs simply number the predictions 1..N in order.
# NOTE(review): windowing yields fewer predictions than test rows, so confirm
# these IDs line up with the rows the consumer of this file expects.
ID_column = pd.Series(range(1, 1 + len(test_predictions)), name='ID')

predictions_df = pd.concat(objs=[ID_column, test_predictions_labels], axis='columns')

predictions_df.to_csv(path_or_buf='test_predictions_with_labels_sampling.csv', index=False)

print(predictions_df.head(n=5))

   ID Predicted_AQI
0   1          Good
1   2          Good
2   3          Good
3   4          Good
4   5          Good


In [24]:
# Distinct labels that occur among the predictions.
# NOTE(review): the recorded output lists only 'Good', i.e. every window was
# assigned class id 0 - worth checking the inputs and the checkpoint before
# trusting the exported predictions.
pd.unique(test_predictions_labels)

array(['Good'], dtype=object)