In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

In [2]:
# Dataset configuration: number of sequences generated per class, and the
# number of points in each sequence (also the MLP input width).
n_samples, segment_length = 2000, 50

In [3]:
def generate_data(n_samples, seq_length, seed=None):
    """Build a balanced, row-wise min-max-scaled dataset for change detection.

    Class 0 rows are Gaussian noise with mean 0 and standard deviation 0.5.
    Class 1 rows start as the same noise and then switch to mean -1 or +1
    (picked at random per row) for their trailing ``cp`` points, where ``cp``
    is drawn uniformly from ``[int(0.25 * seq_length), int(0.75 * seq_length))``.

    Args:
        n_samples: Number of sequences to generate for each class.
        seq_length: Number of points in every sequence.
        seed: Optional integer seed. When given, a private
            ``np.random.RandomState(seed)`` is used so repeated calls return
            identical data. When ``None`` (default) the global ``np.random``
            state is used, so ``np.random.seed(...)`` keeps working.

    Returns:
        Tuple ``(X_scaled, y)``. ``X_scaled`` has shape
        ``(2 * n_samples, seq_length)`` with each row independently rescaled
        to [0, 1]. ``y`` has shape ``(2 * n_samples,)`` and holds 0.0 for the
        first ``n_samples`` rows (class 0) and 1.0 for the remaining rows.
    """
    # The np.random module and RandomState expose the same normal/randint/choice API.
    rng = np.random if seed is None else np.random.RandomState(seed)
    noise_std = 0.5

    # Class 0: stationary noise only.
    x_class_0 = rng.normal(loc=0, scale=noise_std, size=(n_samples, seq_length))
    y_class_0 = np.zeros(n_samples)

    # Class 1: noise whose mean jumps for the last `cp` points of each row.
    # Preallocating keeps the result 2-D even when n_samples == 0.
    x_class_1 = np.empty((n_samples, seq_length))
    y_class_1 = np.ones(n_samples)
    cp_low, cp_high = int(seq_length * 0.25), int(seq_length * 0.75)

    for i in range(n_samples):
        # Draw order (cp, mean, left, right) is kept stable so a seeded
        # global generator reproduces the same raw samples as before.
        cp = rng.randint(cp_low, cp_high)
        ran_mean = rng.choice([-1, 1])
        split = seq_length - cp  # index at which the mean shift starts
        x_class_1[i, :split] = rng.normal(0, noise_std, split)
        x_class_1[i, split:] = rng.normal(ran_mean, noise_std, cp)

    # Combine all data
    X = np.vstack((x_class_0, x_class_1))
    y = np.concatenate((y_class_0, y_class_1))

    # Per-row min-max scaling to [0, 1], done in one vectorised pass.
    row_min = X.min(axis=1, keepdims=True)
    row_range = X.max(axis=1, keepdims=True) - row_min
    row_range[row_range == 0] = 1.0  # constant rows map to 0 instead of NaN
    X_scaled = (X - row_min) / row_range

    return X_scaled, y

# Build the synthetic dataset: n_samples sequences per class, each of
# segment_length points, already scaled row-wise by generate_data.
X, y = generate_data(n_samples, segment_length)

# Wrap features and labels as float32 tensors so they can be fed to the model
# and to BCELoss without further casting.
X_tensor, y_tensor = (torch.tensor(array, dtype=torch.float32) for array in (X, y))

In [4]:
# Optional sanity check (disabled): uncomment to scatter-plot the first 10 rows of X.
# for i in range(10):
#     plt.scatter(np.arange(len(X[i])), X[i])
#     plt.show()

In [5]:
# Step 2: Define the MLP Model
class MLPClassifier(nn.Module):
    """Fully connected binary classifier: input -> 64 -> 32 -> 1.

    ReLU follows each of the two hidden layers and a sigmoid follows the
    output layer, so ``forward`` returns values in (0, 1) rather than logits.

    Args:
        intput_size: Number of features per input row (the spelling is kept
            because it is part of the constructor's signature).
    """

    def __init__(self, intput_size):
        super().__init__()
        # Linear layers, created in forward order.
        self.fc1 = nn.Linear(intput_size, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)
        # Parameter-free activations.
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a ``(batch, intput_size)`` tensor to ``(batch, 1)`` sigmoid outputs."""
        hidden_1 = self.relu(self.fc1(x))
        hidden_2 = self.relu(self.fc2(hidden_1))
        logits = self.fc3(hidden_2)
        return self.sigmoid(logits)

In [6]:
# Initialize the model, loss function, and optimizer
model = MLPClassifier(segment_length)
criterion = nn.BCELoss()  # Binary cross-entropy on the model's sigmoid outputs
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Step 3: Train the Model
# Split data into subtrain (80%) and validation (20%)
X_subtrain, X_val, y_subtrain, y_val = train_test_split(X_tensor, y_tensor, test_size=0.2, random_state=42)

# Early stopping criteria
min_val_loss = float('inf')      # best validation loss seen so far (plain float)
epochs_without_improvement = 0   # consecutive epochs without a new best
early_stop_patience = 100
best_model_state = None          # independent copy of the best weights

# Training loop (full-batch: one optimizer step per epoch)
num_epochs = 10000
for epoch in range(num_epochs):
    # Set model to training mode
    model.train()

    # Forward pass on subtrain data.
    # squeeze(-1) drops only the trailing size-1 output dim, so a batch of
    # one row keeps shape (1,) and still matches the target shape.
    optimizer.zero_grad()
    outputs = model(X_subtrain).squeeze(-1)
    loss = criterion(outputs, y_subtrain)

    # Backward pass and optimization
    loss.backward()
    optimizer.step()

    # Calculate accuracy on subtrain (from the pre-step forward pass)
    with torch.no_grad():
        train_preds = (outputs > 0.5).float()  # Binary prediction threshold at 0.5
        train_accuracy = (train_preds == y_subtrain).float().mean().item() * 100

    # Evaluate on validation set
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val).squeeze(-1)
        val_loss = criterion(val_outputs, y_val)

        # Calculate accuracy on validation set
        val_preds = (val_outputs > 0.5).float()
        val_accuracy = (val_preds == y_val).float().mean().item() * 100

    # Early stopping check
    val_loss_value = val_loss.item()
    if val_loss_value < min_val_loss:
        min_val_loss = val_loss_value
        epochs_without_improvement = 0
        # state_dict() hands back references to the live parameter tensors,
        # which later optimizer steps update in place. Clone them so the
        # snapshot really stays at the best epoch.
        best_model_state = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
    else:
        epochs_without_improvement += 1

    # Print loss values and accuracies for tracking
    if epoch % 100 == 0:
        print(f"Epoch [{epoch}/{num_epochs}], Loss: {loss.item():.8f}, Val Loss: {val_loss.item():.8f}, Training Accuracy: {train_accuracy:.2f}%, Validation Accuracy: {val_accuracy:.2f}%")

    # Stop training if no improvement for `early_stop_patience` epochs
    if epochs_without_improvement >= early_stop_patience:
        print("Early stopping triggered")
        break

# Load the best model state based on validation loss.
# best_model_state stays None if the validation loss never improved
# (e.g. it was NaN from the first epoch); keep the current weights then.
if best_model_state is not None:
    model.load_state_dict(best_model_state)
print("Training complete. Best model saved with validation loss:", min_val_loss)

Epoch [0/10000], Loss: 0.69381559, Val Loss: 0.69339567, Training Accuracy: 48.69%, Validation Accuracy: 50.50%
Epoch [100/10000], Loss: 0.17649092, Val Loss: 0.18114978, Training Accuracy: 97.97%, Validation Accuracy: 98.00%
Epoch [200/10000], Loss: 0.03152816, Val Loss: 0.04753244, Training Accuracy: 99.53%, Validation Accuracy: 98.87%
Epoch [300/10000], Loss: 0.01361094, Val Loss: 0.03740959, Training Accuracy: 99.97%, Validation Accuracy: 98.87%
Epoch [400/10000], Loss: 0.00658713, Val Loss: 0.03405719, Training Accuracy: 100.00%, Validation Accuracy: 99.25%
Epoch [500/10000], Loss: 0.00365460, Val Loss: 0.03377058, Training Accuracy: 100.00%, Validation Accuracy: 99.37%
Early stopping triggered
Training complete. Best model saved with validation loss: tensor(0.0337)


In [7]:
# Persist only the learned weights (state_dict) rather than pickling the whole
# module object. Restore with:
#     MLPClassifier(segment_length).load_state_dict(torch.load(path))
# A dedicated file name is used because 'trained_model.pth' is also the target
# of the TorchScript export in this notebook, which would overwrite this file.
torch.save(model.state_dict(), 'trained_model_state_dict.pth')

In [8]:
# Compile the trained model to TorchScript and write the archive to disk.
# The resulting file must be read back with torch.jit.load, not torch.load;
# any existing file at this path is overwritten.
scripted_model = torch.jit.script(model)
torch.jit.save(scripted_model, "trained_model.pth")