# Training a model with a GravyFlow Dataset

In this notebook, we will use our generated dataset to train a Keras model. We start with the required imports:

In [1]:
import os
os.environ['KERAS_BACKEND'] = 'jax'

# Built-in imports
from typing import List, Dict
from pathlib import Path

# Dependency imports: 
import numpy as np
import keras
from keras import ops
import jax
import jax.numpy as jnp
from keras.layers import Input, Permute, Conv1D, MaxPooling1D, Dense, Flatten, Dropout, ELU
from keras.models import Model

# Import the GravyFlow module.
import gravyflow as gf


## Creating a Keras dataset through composition:

Rather than generating a generic Python iterator, we can also use GravyFlow to create a dataset object that Keras can consume directly. This lets us use Keras's built-in data handling, including seamless integration with `model.fit`, whilst still generating data quickly enough for real-time training, with only the downloaded data segments being cached.

First, we will create a Keras model inspired by one from the literature, described in Gabbard _et al._ (https://link.aps.org/doi/10.1103/PhysRevLett.120.141103):

In [2]:
def create_gabbard(
        input_shape_onsource : tuple,
        input_shape_offsource : tuple
    ) -> keras.Model:

    # Define one input per feature; the names must match the dataset's
    # dictionary keys ("ONSOURCE" and "OFFSOURCE").
    onsource_input = Input(shape=input_shape_onsource, name="ONSOURCE")
    offsource_input = Input(shape=input_shape_offsource, name="OFFSOURCE")

    # Whiten the onsource data; gf.Whiten takes both inputs, with the
    # offsource segment serving as the noise reference.
    whiten_output = gf.Whiten()([onsource_input, offsource_input])

    # Swap the last two axes so that time runs along the Conv1D sequence axis
    # (assumes the whitened output has shape (batch, ifos, samples)).
    x = Permute((2, 1))(whiten_output)
    
    # Convolutional and Pooling layers
    x = Conv1D(8, 64, padding='valid', name="Convolutional_1")(x)
    x = ELU(name="ELU_1")(x)
    x = MaxPooling1D(pool_size=4, strides=4, name="Pooling_1", padding="valid")(x)
    
    x = Conv1D(8, 32, padding='valid', name="Convolutional_2")(x)
    x = ELU(name="ELU_2")(x)
    x = Conv1D(16, 32, padding='valid', name="Convolutional_3")(x)
    x = ELU(name="ELU_3")(x)
    x = MaxPooling1D(pool_size=4, strides=4, name="Pooling_3", padding="valid")(x)
    
    # Flatten layer
    x = Flatten(name="Flatten")(x)
    
    # Dense layers with dropout
    x = Dense(64, name="Dense_1")(x)
    x = ELU(name="ELU_7")(x)
    x = Dropout(0.5, name="Dropout_1")(x)
    
    x = Dense(64, name="Dense_2")(x)
    x = ELU(name="ELU_8")(x)
    x = Dropout(0.5, name="Dropout_2")(x)
    
    outputs = Dense(1, activation='sigmoid', name=gf.ReturnVariables.INJECTION_MASKS.name)(x)
    
    # Create model
    model = Model(
        inputs=[onsource_input, offsource_input], 
        outputs={gf.ReturnVariables.INJECTION_MASKS.name: outputs}, 
        name="custom"
    )
    
    return model
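The `gf.Whiten` layer is given both the onsource and offsource inputs. GravyFlow's implementation is not reproduced here; purely as an illustration, the NumPy sketch below shows one common way to whiten a segment: estimate the noise power spectral density (PSD) from the offsource data by averaging Hann-windowed periodograms (a simplified Welch estimate), then divide the onsource spectrum by the square root of that PSD. The function name `whiten_sketch`, its `segment_length` parameter, and the assumption of single-detector 1-D arrays are all hypothetical.

```python
import numpy as np

def whiten_sketch(onsource: np.ndarray, offsource: np.ndarray,
                  segment_length: int = 256) -> np.ndarray:
    """Hypothetical whitening helper, not GravyFlow's gf.Whiten.

    Estimates the noise PSD from `offsource` with a simplified Welch average
    (Hann window, 50% overlap), then divides the spectrum of `onsource` by the
    square root of that PSD.
    """
    window = np.hanning(segment_length)
    step = segment_length // 2
    starts = range(0, len(offsource) - segment_length + 1, step)
    segments = np.stack([offsource[s:s + segment_length] * window for s in starts])

    # Average periodogram, normalised so that white noise of variance sigma**2
    # gives a PSD of roughly sigma**2 in every frequency bin.
    psd = np.mean(np.abs(np.fft.rfft(segments, axis=-1)) ** 2, axis=0)
    psd /= np.sum(window ** 2)

    # Interpolate the coarse PSD onto the onsource frequency grid.
    coarse_freqs = np.fft.rfftfreq(segment_length)
    fine_freqs = np.fft.rfftfreq(len(onsource))
    psd_on_grid = np.interp(fine_freqs, coarse_freqs, psd)

    # Divide by the amplitude spectral density and return to the time domain.
    # (Real pipelines usually also taper the segment edges; omitted here.)
    spectrum = np.fft.rfft(onsource)
    return np.fft.irfft(spectrum / np.sqrt(psd_on_grid), n=len(onsource))

# Usage: white noise with standard deviation 5 should come out with roughly unit variance.
rng = np.random.default_rng(42)
offsource = rng.normal(0.0, 5.0, 16384)
onsource = rng.normal(0.0, 5.0, 2048)
print(np.std(whiten_sketch(onsource, offsource)))
```

Dividing by the amplitude spectral density equalises the noise power across frequencies, so that features of a signal are not swamped by the loudest noise bands.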

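Because every convolution and pooling layer in `create_gabbard` uses `padding='valid'`, the time axis shrinks at each stage, and the input must be long enough to survive the whole stack. The pure-Python helper below (hypothetical, for illustration) traces that length down to the `Flatten` layer using the standard valid-padding formula `floor((L - k) / s) + 1`. It assumes the whitened output keeps the onsource length and that, after `Permute((2, 1))`, the samples lie along the Conv1D time axis; the 2048-sample input (1 s at 2048 Hz) is an assumed example, not necessarily GravyFlow's default.

```python
def conv1d_valid_length(length: int, kernel_size: int, stride: int = 1) -> int:
    """Output length of a 'valid' 1-D convolution."""
    return (length - kernel_size) // stride + 1

def pool1d_valid_length(length: int, pool_size: int, stride: int) -> int:
    """Output length of 'valid' 1-D max pooling."""
    return (length - pool_size) // stride + 1

def gabbard_flattened_size(num_samples: int) -> int:
    """Hypothetical helper: number of features reaching the Flatten layer."""
    length = conv1d_valid_length(num_samples, 64)  # Convolutional_1 (8 filters)
    length = pool1d_valid_length(length, 4, 4)     # Pooling_1
    length = conv1d_valid_length(length, 32)       # Convolutional_2 (8 filters)
    length = conv1d_valid_length(length, 32)       # Convolutional_3 (16 filters)
    length = pool1d_valid_length(length, 4, 4)     # Pooling_3
    if length < 1:
        raise ValueError(f"An input of {num_samples} samples is too short for this stack")
    return length * 16                             # 16 channels from Convolutional_3

print(gabbard_flattened_size(2048))  # 1728
```

For 2048 samples the time axis goes 2048 → 1985 → 496 → 465 → 434 → 108, so `Dense_1` receives 108 × 16 = 1728 features.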
Because we are using only one waveform generator, we expect a single label per example. The injection masks returned by GravyFlow, however, carry a leading generator axis (and may also carry a time axis), so we wrap each dataset in a small `keras.utils.PyDataset` adapter that reshapes the masks to `(batch, 1)`:

In [3]:
class AdapterDataset(keras.utils.PyDataset):
    """Wraps a GravyFlow dataset and reshapes INJECTION_MASKS to (batch, 1)."""

    def __init__(self, dataset):
        # Reuse the wrapped dataset's worker settings.
        super().__init__(workers=dataset.workers, use_multiprocessing=dataset.use_multiprocessing)
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        features, labels = self.dataset[index]
        if 'INJECTION_MASKS' in labels:
            mask = labels['INJECTION_MASKS']
            if len(mask.shape) == 2 and mask.shape[1] == 1:
                # Already processed: (Batch, 1).
                pass
            elif len(mask.shape) == 2:
                # (NumGenerators, Batch): keep the first (and only) generator.
                mask = ops.expand_dims(mask[0], axis=-1)
            elif len(mask.shape) == 3:
                # (NumGenerators, Batch, Time): an example is labelled 1 if
                # any time sample contains an injection.
                mask = ops.max(mask[0], axis=-1)
                mask = ops.expand_dims(mask, axis=-1)

            labels['INJECTION_MASKS'] = mask
        return features, labels
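To make the three cases in `AdapterDataset.__getitem__` concrete, the sketch below mirrors the same reshaping with plain NumPy (the function name `normalise_injection_mask` is hypothetical) and applies it to dummy masks with the shapes assumed in the adapter's comments:

```python
import numpy as np

def normalise_injection_mask(mask: np.ndarray) -> np.ndarray:
    """Hypothetical NumPy mirror of the reshaping in AdapterDataset.__getitem__."""
    if mask.ndim == 2 and mask.shape[1] == 1:
        return mask                                  # already (batch, 1)
    if mask.ndim == 2:                               # (num_generators, batch)
        return mask[0][:, np.newaxis]
    if mask.ndim == 3:                               # (num_generators, batch, time)
        return np.max(mask[0], axis=-1)[:, np.newaxis]
    return mask

batch = 4
per_example = np.array([1.0, 0.0, 1.0, 0.0])
print(normalise_injection_mask(per_example[np.newaxis, :]).shape)  # (4, 1)

# Time-resolved mask: an example counts as containing a signal if any sample is masked.
time_mask = np.zeros((1, batch, 8))
time_mask[0, 0, 3:6] = 1.0
time_mask[0, 2, 7] = 1.0
print(normalise_injection_mask(time_mask).ravel())  # [1. 0. 1. 0.]
```

Note that a `(NumGenerators, Batch)` mask with a batch size of 1 would be mistaken for an already-processed `(Batch, 1)` mask, so this shape check relies on batches larger than one.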


With the TensorFlow backend, TensorFlow and Keras require the model and the training dataset to be created within the same scope, and they are quite strict about this. We therefore create our dataset and our model in the same cell. GravyFlow is expected to be used mainly in Python scripts rather than notebooks, where this is not a problem as long as everything is kept within the same TensorFlow strategy:

In [4]:
examples_per_epoch = int(1E5)
num_validation_examples = int(1E4)
num_testing_examples = int(1E4)
steps_per_epoch = examples_per_epoch // gf.Defaults.num_examples_per_batch
validation_steps = num_validation_examples // gf.Defaults.num_examples_per_batch
testing_steps = num_testing_examples // gf.Defaults.num_examples_per_batch

# This object will be used to obtain real interferometer data based on specified parameters.
ifo_data_obtainer : gf.IFODataObtainer = gf.IFODataObtainer(
    observing_runs=gf.ObservingRun.O3, # Specify the observing run (e.g., O3).
    data_quality=gf.DataQuality.BEST,  # Choose the quality of the data (e.g., BEST).
    data_labels=[                      # Define the types of data to include.
        gf.DataLabel.NOISE, 
        gf.DataLabel.GLITCHES
    ],
    segment_order=gf.SegmentOrder.RANDOM, # Order of segment retrieval (e.g., RANDOM).
    force_acquisition=True,               # Force the acquisition of new data.
    cache_segments=False                  # Choose not to cache the segments.
)

# Initialize the noise generator wrapper:
# This wrapper will use the ifo_data_obtainer to generate real noise based on the specified parameters.
noise: gf.NoiseObtainer = gf.NoiseObtainer(
    ifo_data_obtainer=ifo_data_obtainer, # Use the previously set up IFODataObtainer object.
    noise_type=gf.NoiseType.REAL,        # Specify the type of noise as REAL.
    ifos=gf.IFO.L1                       # Specify the interferometer (e.g., LIGO Livingston L1).
)

# Scale each injected waveform to an SNR drawn uniformly between 8 and 15.
scaling_method : gf.ScalingMethod = gf.ScalingMethod(
    value=gf.Distribution(
        min_=8.0,
        max_=15.0,
        type_=gf.DistributionType.UNIFORM
    ),
    type_=gf.ScalingTypes.SNR
)

# Define a uniform distribution for the mass of the first object in solar masses.
mass_1_distribution_msun : gf.Distribution = gf.Distribution(
    min_=10.0, 
    max_=60.0, 
    type_=gf.DistributionType.UNIFORM
)

# Define a uniform distribution for the mass of the second object in solar masses.
mass_2_distribution_msun : gf.Distribution = gf.Distribution(
    min_=10.0, 
    max_=60.0, 
    type_=gf.DistributionType.UNIFORM
)

# Define a uniform distribution for the inclination of the binary system in radians.
inclination_distribution_radians : gf.Distribution = gf.Distribution(
    min_=0.0, 
    max_=np.pi, 
    type_=gf.DistributionType.UNIFORM
)

# Initialize a PhenomD waveform generator with the defined distributions.
# This generator will produce waveforms with randomly varied masses and inclination angles.
phenom_d_generator : gf.WaveformGenerator = gf.CBCGenerator(
    mass_1_msun=mass_1_distribution_msun,
    mass_2_msun=mass_2_distribution_msun,
    inclination_radians=inclination_distribution_radians,
    scaling_method=scaling_method,
    injection_chance=0.5 # Roughly half of the produced examples will not contain this signal.
)

training_dataset = gf.Dataset(
    noise_obtainer=noise,
    waveform_generators=phenom_d_generator,
    input_variables=[
        gf.ReturnVariables.ONSOURCE, 
        gf.ReturnVariables.OFFSOURCE, 
    ],
    output_variables=[
        gf.ReturnVariables.INJECTION_MASKS
    ]
)

validation_dataset = gf.Dataset(
    noise_obtainer=noise,
    waveform_generators=phenom_d_generator,
    seed=1001, # Use a different seed to generate different waveforms.
    group="validate", # Ensure noise is pulled from segments marked for validation.
    input_variables=[
        gf.ReturnVariables.ONSOURCE, 
        gf.ReturnVariables.OFFSOURCE, 
    ],
    output_variables=[
        gf.ReturnVariables.INJECTION_MASKS
    ]
)

testing_dataset = gf.Dataset(
    num_examples_per_batch=32,
    noise_obtainer=noise,
    waveform_generators=phenom_d_generator,
    seed=1002, # Use a different seed to generate different waveforms.
    group="test", # Ensure noise is pulled from segments marked for testing.
    input_variables=[
        gf.ReturnVariables.ONSOURCE, 
        gf.ReturnVariables.OFFSOURCE, 
    ],
    output_variables=[
        gf.ReturnVariables.INJECTION_MASKS
    ]
)

# Inspect one batch to get the per-example input shapes (excluding the batch dimension).
input_example, _ = training_dataset[0]
input_shape_onsource = input_example["ONSOURCE"].shape[1:]
input_shape_offsource = input_example["OFFSOURCE"].shape[1:]

model = create_gabbard(input_shape_onsource, input_shape_offsource)

# Now you can print the model summary
model.summary()

# Model compilation
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',  # Or any other loss function appropriate for your task
    metrics=['accuracy']
)

training_dataset = AdapterDataset(training_dataset)
validation_dataset = AdapterDataset(validation_dataset)
testing_dataset = AdapterDataset(testing_dataset)


If the previous cell fails, the most likely cause is that it was run twice within the same kernel session. The kernel must then be restarted to create a fresh TensorFlow strategy and recompile the model.
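The distributions configured in the dataset cell above determine, for each example, the two component masses (uniform between 10 and 60 solar masses), the inclination (uniform between 0 and π radians), the target SNR used to scale the injection (uniform between 8 and 15), and whether a signal is injected at all (`injection_chance=0.5`). As an illustration only, the NumPy sketch below draws parameters over the same ranges; it is not GravyFlow's sampling code, the function name `sample_injection_parameters` and its seed are hypothetical, and any additional constraints GravyFlow may apply are not modelled.

```python
import numpy as np

def sample_injection_parameters(num_examples: int, injection_chance: float = 0.5,
                                seed: int = 1000) -> dict:
    """Hypothetical sketch: draw per-example parameters over the configured ranges."""
    rng = np.random.default_rng(seed)
    return {
        "mass_1_msun": rng.uniform(10.0, 60.0, num_examples),
        "mass_2_msun": rng.uniform(10.0, 60.0, num_examples),
        "inclination_radians": rng.uniform(0.0, np.pi, num_examples),
        "snr": rng.uniform(8.0, 15.0, num_examples),
        # 1.0 where a signal is injected, 0.0 for noise-only examples.
        "injection_mask": (rng.random(num_examples) < injection_chance).astype(np.float32),
    }

params = sample_injection_parameters(10_000)
# With injection_chance=0.5, about half of the examples contain a signal.
print(params["injection_mask"].mean())
```

The injection mask drawn this way is the per-example label the classifier is trained to predict, so a 0.5 injection chance gives roughly balanced classes.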

Finally, we can train the model with our generated dataset:

In [None]:

history = model.fit(
    training_dataset,
    epochs=10,  # Number of epochs to train for
    validation_data=validation_dataset,
)

Epoch 1/10
 169/1000 ━━━━━━━━━━━━━━━━━━━━ 36s 44ms/step - accuracy: 0.5372 - loss: 0.7712

KeyboardInterrupt: 
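The model is compiled with binary cross-entropy loss and accuracy as a metric. Because `injection_chance=0.5` makes the two classes roughly balanced, a useful reference point is a model that always outputs 0.5: its cross-entropy is ln 2 ≈ 0.693 and its accuracy is about 0.5. The running values printed above (loss 0.7712, accuracy 0.5372 after 169 steps) are therefore still at or worse than this chance-level baseline, which is not surprising so early in the first epoch. The NumPy sketch below computes that baseline; the function names are hypothetical and only approximate Keras's own implementations (which, for example, clip predictions by a small epsilon).

```python
import numpy as np

def binary_crossentropy(y_true: np.ndarray, y_pred: np.ndarray, eps: float = 1e-7) -> float:
    """Mean binary cross-entropy, with predictions clipped away from 0 and 1."""
    y_pred = np.clip(y_pred, eps, 1.0 - eps)
    return float(-np.mean(y_true * np.log(y_pred) + (1.0 - y_true) * np.log(1.0 - y_pred)))

def binary_accuracy(y_true: np.ndarray, y_pred: np.ndarray, threshold: float = 0.5) -> float:
    """Fraction of examples where (y_pred > threshold) matches the label."""
    return float(np.mean((y_pred > threshold) == (y_true > 0.5)))

rng = np.random.default_rng(0)
labels = (rng.random(10_000) < 0.5).astype(np.float32)  # balanced labels, as with injection_chance=0.5
constant_guess = np.full_like(labels, 0.5)               # an uninformative classifier

print(round(binary_crossentropy(labels, constant_guess), 4))  # 0.6931, i.e. ln 2
print(binary_accuracy(labels, constant_guess))                # close to 0.5
```

A loss that stays above ln 2 after several epochs would suggest the model is not yet extracting any information from the whitened data.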

In [None]:
model.evaluate(
    testing_dataset, 
)