In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

In [None]:
class VitalSignsPredictor:
    def __init__(self, sequence_length=60, feature_dim=5):
        self.sequence_length = sequence_length
        self.feature_dim = feature_dim
        self.scalers = {}

    def create_model(self):
        # Input layer
        inputs = keras.Input(
            shape=(self.sequence_length, self.feature_dim), name="vital_signs"
        )

        # Feature indices
        # Assuming feature order: [art, ecg, pleth, co2, phen]
        ART_IDX, ECG_IDX, PLETH_IDX, CO2_IDX, PHEN_IDX = 0, 1, 2, 3, 4

        # Main feature processing branch
        x = layers.LSTM(128, return_sequences=True, dropout=0.2)(inputs)
        x = layers.LSTM(64, return_sequences=False, dropout=0.2)(x)
        x = layers.BatchNormalization()(x)

        # Branch 1: Prediction with phenylephrine
        # Use all features including phen
        with_phen_features = layers.Dense(64, activation="relu")(x)
        with_phen_features = layers.Dropout(0.3)(with_phen_features)
        with_phen_features = layers.Dense(32, activation="relu")(with_phen_features)
        output_with_phen = layers.Dense(1, name="prediction_with_phen")(
            with_phen_features
        )

        # Branch 2: Prediction without phenylephrine
        # Create a modified version without phen feature
        # We'll use a separate processing branch that focuses on non-phen features
        without_phen_branch = layers.LSTM(128, return_sequences=True, dropout=0.2)(
            inputs
        )
        without_phen_branch = layers.LSTM(64, return_sequences=False, dropout=0.2)(
            without_phen_branch
        )
        without_phen_branch = layers.BatchNormalization()(without_phen_branch)

        without_phen_features = layers.Dense(64, activation="relu")(without_phen_branch)
        without_phen_features = layers.Dropout(0.3)(without_phen_features)
        without_phen_features = layers.Dense(32, activation="relu")(
            without_phen_features
        )
        output_without_phen = layers.Dense(1, name="prediction_without_phen")(
            without_phen_features
        )

        # Create model
        model = keras.Model(
            inputs=inputs, outputs=[output_with_phen, output_without_phen]
        )

        # Compile model
        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.001),
            loss={"prediction_with_phen": "mse", "prediction_without_phen": "mse"},
            loss_weights={"prediction_with_phen": 1.0, "prediction_without_phen": 1.0},
            metrics={
                "prediction_with_phen": ["mae"],
                "prediction_without_phen": ["mae"],
            },
        )

        return model

In [None]:
def prepare_data(self, data, target_col="art"):
    """
    Prepare time series data for training
    data: DataFrame with columns [art, ecg, pleth, co2, phen]
    """
    # Create sequences
    sequences = []
    targets_with_phen = []
    targets_without_phen = []

    for i in range(len(data) - self.sequence_length):
        seq = data.iloc[i : i + self.sequence_length].values
        target = data.iloc[i + self.sequence_length][target_col]

        sequences.append(seq)
        targets_with_phen.append(target)
        targets_without_phen.append(target)

    sequences = np.array(sequences)
    targets_with_phen = np.array(targets_with_phen)
    targets_without_phen = np.array(targets_without_phen)

    return sequences, targets_with_phen, targets_without_phen


def fit_scalers(self, data):
    """Fit scalers for each feature"""
    for i, col in enumerate(data.columns):
        self.scalers[i] = StandardScaler()
        self.scalers[i].fit(data[[col]])


def scale_data(self, data):
    """Scale data using fitted scalers"""
    scaled_data = data.copy()
    for i, col in enumerate(data.columns):
        scaled_data[col] = self.scalers[i].transform(data[[col]])
    return scaled_data

In [None]:
# Example usage and training
def main():
    # Assuming you have your data loaded as a DataFrame
    # data = pd.read_csv('your_vitaldb_data.csv')
    # For demonstration, creating sample data
    np.random.seed(42)
    n_samples = 10000
    data = pd.DataFrame(
        {
            "art": np.random.normal(120, 20, n_samples),
            "ecg": np.random.normal(70, 10, n_samples),
            "pleth": np.random.normal(95, 5, n_samples),
            "co2": np.random.normal(40, 5, n_samples),
            "phen": np.random.normal(0, 1, n_samples),
        }
    )

    # Initialize predictor
    predictor = VitalSignsPredictor(sequence_length=60, feature_dim=5)

    # Prepare data
    predictor.fit_scalers(data)
    scaled_data = predictor.scale_data(data)

    sequences, targets_with_phen, targets_without_phen = predictor.prepare_data(
        scaled_data
    )

    # Split data
    X_train, X_test, y1_train, y1_test, y2_train, y2_test = train_test_split(
        sequences,
        targets_with_phen,
        targets_without_phen,
        test_size=0.2,
        random_state=42,
    )

    # Create and train model
    model = predictor.create_model()

    # Print model architecture
    model.summary()

    # Callbacks
    callbacks = [
        keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=5),
    ]

    # Train model
    history = model.fit(
        X_train,
        {"prediction_with_phen": y1_train, "prediction_without_phen": y2_train},
        validation_data=(
            X_test,
            {"prediction_with_phen": y1_test, "prediction_without_phen": y2_test},
        ),
        epochs=100,
        batch_size=32,
        callbacks=callbacks,
        verbose=1,
    )

    return model, predictor, history


# Alternative simpler model architecture
def create_simpler_model(sequence_length=60, feature_dim=5):
    """Alternative simpler architecture"""
    inputs = keras.Input(shape=(sequence_length, feature_dim))

    # Shared LSTM layers
    x = layers.LSTM(64, return_sequences=True, dropout=0.2)(inputs)
    x = layers.LSTM(32, return_sequences=False, dropout=0.2)(x)

    # Branch with phen
    with_phen = layers.Dense(32, activation="relu")(x)
    with_phen = layers.Dropout(0.3)(with_phen)
    output_with_phen = layers.Dense(1, name="prediction_with_phen")(with_phen)

    # Branch without phen - using same shared features but different output head
    without_phen = layers.Dense(32, activation="relu")(x)
    without_phen = layers.Dropout(0.3)(without_phen)
    output_without_phen = layers.Dense(1, name="prediction_without_phen")(without_phen)

    model = keras.Model(inputs=inputs, outputs=[output_with_phen, output_without_phen])

    model.compile(
        optimizer="adam",
        loss={"prediction_with_phen": "mse", "prediction_without_phen": "mse"},
        metrics={"prediction_with_phen": ["mae"], "prediction_without_phen": ["mae"]},
    )

    return model


# For prediction on new data
def predict_new_data(model, predictor, new_data):
    """Make predictions on new data"""
    scaled_data = predictor.scale_data(new_data)
    sequences, _, _ = predictor.prepare_data(scaled_data)

    predictions = model.predict(sequences)
    pred_with_phen, pred_without_phen = predictions

    return pred_with_phen, pred_without_phen

In [None]:
if __name__ == "__main__":
    model, predictor, history = main()