In [43]:
import numpy as np
import torch
from torch import nn
from sklearn.model_selection import train_test_split

# Nonlinear synthetic dataset generator (tabular features; the LSTM view treats groups of features as time steps)

In [44]:
import numpy as np
import torch
from torch import nn

def generate_random_dataset(
    n_samples=5000,
    n_cont_features=10,
    n_cat_features=5,
    n_classes=3,
    sequence_length=None,   # optional; auto-selected when None
    random_state=42,
    n_categories=5,
):
    """
    Generate a synthetic classification dataset labelled by a randomly
    initialised, frozen neural network, and return it in three layouts.

    Continuous features are standard-normal draws; categorical features are
    integer codes in ``[0, n_categories)``. Labels are the argmax of the
    network's softmax output, so class balance is NOT enforced.

    Note: this function reseeds the *global* torch and numpy RNGs with
    ``random_state``.

    Parameters
    ----------
    n_samples : int
        Number of rows to generate.
    n_cont_features : int
        Number of continuous columns (placed first).
    n_cat_features : int
        Number of categorical columns (placed after the continuous ones).
    n_classes : int
        Number of output classes of the labelling network.
    sequence_length : int or None
        Number of time steps for the LSTM layout. Must be a positive divisor
        of the total feature count. When None, the smallest divisor greater
        than 1 is used (or 1 if there is only a single feature).
    random_state : int
        Seed passed to ``torch.manual_seed`` and ``np.random.seed``.
    n_categories : int
        Number of distinct codes per categorical column (default 5).

    Returns
    -------
    X_mlp : ndarray, shape (n_samples, total_features)
        Copy of the flat feature matrix.
    X_cnn : ndarray, shape (n_samples, 1, total_features)
        Same values with a single-channel axis inserted.
    X_lstm : ndarray, shape (n_samples, sequence_length, total_features // sequence_length)
        Same values split row-wise into consecutive time steps.
    y : ndarray, shape (n_samples,)
        Integer class labels in ``[0, n_classes)``.

    Raises
    ------
    ValueError
        If there are no features at all, or if ``sequence_length`` is not a
        positive divisor of the total feature count.
    """
    total_features = n_cont_features + n_cat_features
    if total_features < 1:
        raise ValueError(
            "at least one feature is required "
            f"(n_cont_features={n_cont_features}, n_cat_features={n_cat_features})"
        )

    torch.manual_seed(random_state)
    np.random.seed(random_state)

    # ---- 1. Generate input features ----
    # Draw order (continuous, then categorical) is kept fixed so that a given
    # seed always yields the same dataset.
    X_cont = np.random.randn(n_samples, n_cont_features)
    X_cat = np.random.randint(0, n_categories, size=(n_samples, n_cat_features))
    X = np.hstack([X_cont, X_cat])
    input_dim = X.shape[1]

    # ---- 2. Random neural network to generate labels ----
    model = nn.Sequential(
        nn.Linear(input_dim, 32),
        nn.ReLU(),
        nn.Linear(32, 32),
        nn.ReLU(),
        nn.Linear(32, n_classes)
    )

    # The network is only a fixed labelling function; it is never trained.
    for p in model.parameters():
        p.requires_grad = False

    with torch.no_grad():
        logits = model(torch.tensor(X, dtype=torch.float32))
        probs = torch.softmax(logits, dim=1).numpy()
        y = np.argmax(probs, axis=1)

    # ---- 3. Prepare MLP + CNN ----
    X_mlp = X.copy()
    X_cnn = X.reshape(n_samples, 1, -1)

    # ---- 4. Auto-select a valid LSTM sequence_length ----
    if sequence_length is None:
        # Smallest divisor greater than 1; falls back to a single time step
        # when there is only one feature (no divisor > 1 exists).
        sequence_length = next(
            (d for d in range(2, total_features + 1) if total_features % d == 0),
            1,
        )
    elif sequence_length < 1:
        # Guard explicitly: 0 would raise ZeroDivisionError below, and a
        # negative value would surface as an unrelated reshape error.
        raise ValueError(
            f"sequence_length must be a positive integer, got {sequence_length}"
        )

    if total_features % sequence_length != 0:
        raise ValueError(
            f"sequence_length={sequence_length} does not divide total_features={total_features}"
        )

    features_per_step = total_features // sequence_length
    X_lstm = X.reshape(n_samples, sequence_length, features_per_step)

    return X_mlp, X_cnn, X_lstm, y


* MLP --> receives the raw feature matrix `X`, shape `(n_samples, 15)`.
* CNN --> receives the same 15 features arranged as a single channel, shape `(n_samples, 1, 15)`. This is only a reshape; no new data is created.
* LSTM --> receives the same 15 features split into time steps, shape `(n_samples, 3, 5)` with the default settings. It contains the same numbers, merely reorganized so the LSTM can process them as a sequence.

In [45]:
# Build the synthetic dataset with the default settings and unpack the
# MLP / CNN / LSTM layouts of the features together with the labels.
X_mlp, X_cnn, X_lstm, y = generate_random_dataset()

In [46]:
# Sanity check: shape of each feature layout, then of the label vector, one per line.
print(*(arr.shape for arr in (X_mlp, X_cnn, X_lstm, y)), sep="\n")

(5000, 15)
(5000, 1, 15)
(5000, 3, 5)
(5000,)


### MLP Experiment (S1, S2 and S3 comparison)

* S1 = MedianImpute → StandardScaler → OneHotEncoder
* S2 = KNNImputer(k=5) → MinMaxScaler → TargetEncoder
* S3 = MICE (IterativeImputer, 3 iterations) → RobustScaler → Embedding Layer (the embedding layer is skipped for the MLP; the categorical columns are passed through unchanged)


In [47]:
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, OneHotEncoder
from category_encoders import TargetEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer


In [48]:
# Column positions in the flat feature matrix: the first 10 columns are the
# continuous features, columns 10-14 are the categorical ones.
num_cols, cat_cols = [*range(10)], [*range(10, 15)]


In [49]:
# Strategy S1
#   numeric columns:     median imputation, then standardisation
#   categorical columns: most-frequent imputation, then one-hot encoding
#                        (handle_unknown="ignore" so unseen codes do not raise)
preprocess_S1 = ColumnTransformer([
    (
        "num",
        Pipeline(steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]),
        num_cols,
    ),
    (
        "cat",
        Pipeline(steps=[
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]),
        cat_cols,
    ),
])

# Strategy S2
#   numeric columns:     KNN imputation (k=5), then min-max scaling
#   categorical columns: most-frequent imputation, then target encoding
preprocess_S2 = ColumnTransformer(
    transformers=[
        ("num", Pipeline([
            ("imputer", KNNImputer(n_neighbors=5)),
            ("scaler", MinMaxScaler())
        ]), num_cols),

        ("cat", Pipeline([
            ("imputer", SimpleImputer(strategy="most_frequent")),
            # With cols=None, category_encoders only encodes string/category
            # columns. The imputer hands over a plain numeric array, so without
            # an explicit column list the categorical codes would pass through
            # unencoded. Inside this sub-pipeline the columns are addressed by
            # position 0..len(cat_cols)-1.
            # NOTE(review): confirm positional column names against the
            # installed category_encoders version.
            ("target", TargetEncoder(cols=list(range(len(cat_cols)))))
        ]), cat_cols)
    ]
)

# FunctionTransformer with no func is the identity transform. Unlike a
# lambda, it keeps the whole ColumnTransformer picklable (e.g. for joblib).
identity = FunctionTransformer()

# Strategy S3
#   numeric columns:     iterative (MICE-style) imputation with 3 rounds,
#                        then robust scaling
#   categorical columns: most-frequent imputation only; the codes are passed
#                        through unchanged (no encoding step here)
preprocess_S3 = ColumnTransformer(
    transformers=[
        ("num", Pipeline([
            ("imputer", IterativeImputer(max_iter=3)),
            ("scaler", RobustScaler())
        ]), num_cols),

        ("cat", Pipeline([
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("identity", identity)
        ]), cat_cols)
    ]
)