# Training a model with a GravyFlow Dataset

In this notebook, we will use our generated dataset to train a Keras model. We start with the required imports:

In [1]:
# Built-in imports
from typing import List, Dict
from pathlib import Path

# Dependency imports: 
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, Dense, Flatten, Dropout, ELU
from tensorflow.keras.models import Model

# Import the GravyFlow module.
import gravyflow as gf



## Creating a TensorFlow dataset through composition

Rather than generating a generic Python iterator, we can also use GravyFlow to create a custom TensorFlow dataset. This gives us access to all the functionality of the TensorFlow dataset class, including seamless integration with Keras models, whilst still generating data quickly enough for real-time training; the only data that are cached are the downloaded data segments.

First, we create a Keras model inspired by a model from the literature, Gabbard _et al._ (https://link.aps.org/doi/10.1103/PhysRevLett.120.141103):

In [2]:
def create_gabbard(
        input_shape_onsource : tuple, 
        input_shape_offsource : tuple
    ) -> tf.keras.Model:
    
    # Define one named input per feature. The names must match the keys of the
    # dataset's feature dictionary so that Keras routes each tensor correctly.
    onsource_input = Input(shape=input_shape_onsource, name="ONSOURCE")
    offsource_input = Input(shape=input_shape_offsource, name="OFFSOURCE")

    # Whiten the onsource data using the offsource (background) data.
    # GravyFlow's Whiten layer takes both tensors as a list.
    whiten_output = gf.Whiten()([onsource_input, offsource_input])

    # Reshape the whitened data into a layout suitable for the Conv1D layers.
    x = gf.Reshape()(whiten_output)
    
    # Convolutional and Pooling layers
    x = Conv1D(8, 64, padding='same', name="Convolutional_1")(x)
    x = ELU(name="ELU_1")(x)
    x = MaxPooling1D(pool_size=4, strides=4, name="Pooling_1", padding="same")(x)
    
    x = Conv1D(8, 32, padding='same', name="Convolutional_2")(x)
    x = ELU(name="ELU_2")(x)
    x = Conv1D(16, 32, padding='same', name="Convolutional_3")(x)
    x = ELU(name="ELU_3")(x)
    x = MaxPooling1D(pool_size=4, strides=4, name="Pooling_3", padding="same")(x)
    
    # Flatten layer
    x = Flatten(name="Flatten")(x)
    
    # Dense layers with dropout
    x = Dense(64, name="Dense_1")(x)
    x = ELU(name="ELU_7")(x)
    x = Dropout(0.5, name="Dropout_1")(x)
    
    x = Dense(64, name="Dense_2")(x)
    x = ELU(name="ELU_8")(x)
    x = Dropout(0.5, name="Dropout_2")(x)
    
    outputs = Dense(1, activation='sigmoid', name="INJECTION_MASKS")(x)
    
    # Create model
    model = Model(inputs=[onsource_input, offsource_input], outputs=outputs, name="custom")
    
    return model
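To see how the layer settings above determine the model size, the following pure-Python sketch traces the sequence length and the trainable parameter count through `create_gabbard`. It assumes the tensor entering `Convolutional_1` has shape `(num_samples, 1)` and that `gf.Whiten` and `gf.Reshape` add no trainable parameters and do not change the length; both are assumptions, not documented GravyFlow behaviour. The helper `gabbard_shapes` and the 4096-sample input length are hypothetical.

```python
import math


def gabbard_shapes(num_samples: int, num_channels: int = 1) -> dict:
    """Trace sequence length and trainable parameters through create_gabbard.

    Assumes the tensor entering Convolutional_1 has shape
    (num_samples, num_channels); gf.Whiten and gf.Reshape are treated
    here as parameter-free (an assumption).
    """
    def conv_params(kernel_size: int, in_channels: int, filters: int) -> int:
        # Each filter has kernel_size * in_channels weights plus one bias.
        return kernel_size * in_channels * filters + filters

    def dense_params(in_features: int, units: int) -> int:
        # Weight matrix plus one bias per unit.
        return in_features * units + units

    params = {}
    length = num_samples
    # Conv1D with padding="same" (stride 1) keeps the sequence length.
    params["Convolutional_1"] = conv_params(64, num_channels, 8)
    # MaxPooling1D with padding="same" gives ceil(length / strides).
    length = math.ceil(length / 4)  # Pooling_1
    params["Convolutional_2"] = conv_params(32, 8, 8)
    params["Convolutional_3"] = conv_params(32, 8, 16)
    length = math.ceil(length / 4)  # Pooling_3
    flattened = length * 16  # 16 filters from Convolutional_3
    params["Dense_1"] = dense_params(flattened, 64)
    params["Dense_2"] = dense_params(64, 64)
    params["INJECTION_MASKS"] = dense_params(64, 1)
    # ELU, Dropout and Flatten layers have no trainable parameters.
    return {"flattened": flattened, "params": params,
            "total": sum(params.values())}


shapes = gabbard_shapes(4096)  # hypothetical onsource length
print(shapes["flattened"], shapes["total"])  # 4096 273121
```

Under these assumptions, `Dense_1` holds roughly 96% of the parameters, and its size grows linearly with the input length, so longer onsource windows mainly enlarge the first dense layer. Comparing these numbers with `model.summary()` is a quick way to check the assumptions.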

Because we are only using one injection, we expect the label for each example to be a single value. We must therefore adjust the dimensionality of the injection masks output, which we do with the `map` method of the TensorFlow dataset. The function to map over the dataset is defined here:

In [3]:
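def adjust_features(features, labels):
    # The injection masks have a leading dimension with one entry per injection.
    # With a single injection, keep only the first entry so that each example
    # has one label, matching the model's single sigmoid output.
    labels['INJECTION_MASKS'] = labels['INJECTION_MASKS'][0]
    return features, labels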
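To make the effect of `adjust_features` concrete, here is a NumPy sketch that applies the same indexing to a hypothetical label dictionary. It assumes the masks arrive with shape `(num_injections, batch_size)`; that layout is inferred from the `[0]` indexing rather than taken from the GravyFlow documentation, and `adjust_labels_numpy` is a hypothetical stand-in for the mapped function.

```python
import numpy as np


def adjust_labels_numpy(labels: dict) -> dict:
    # Mirrors adjust_features: keep only the first (here, the only) injection.
    adjusted = dict(labels)
    adjusted["INJECTION_MASKS"] = labels["INJECTION_MASKS"][0]
    return adjusted


# Hypothetical masks for one injection and a batch of 4 examples,
# shape (num_injections, batch_size) = (1, 4); 1 = signal injected, 0 = noise only.
labels = {"INJECTION_MASKS": np.array([[1.0, 0.0, 0.0, 1.0]], dtype=np.float32)}
adjusted = adjust_labels_numpy(labels)
print(labels["INJECTION_MASKS"].shape, "->", adjusted["INJECTION_MASKS"].shape)  # (1, 4) -> (4,)
```

Indexing with `[0]` silently keeps only the first injection if more generators are added later. `np.squeeze(masks, axis=0)` would instead raise an error when that axis has a size greater than one, which can serve as a guard.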

TensorFlow and Keras require the model and the training dataset to be created in the same scope, and they are strict about this. We therefore create both our dataset and our model in the same scope here. We anticipate that GravyFlow will mostly be used in Python scripts rather than notebooks; there, this is not a problem as long as everything is kept within the same TensorFlow strategy:

In [4]:
with gf.env(memory_to_allocate_tf=5000):
    # This object will be used to obtain real interferometer data based on specified parameters.
    ifo_data_obtainer : gf.IFODataObtainer = gf.IFODataObtainer(
        observing_runs=gf.ObservingRun.O3, # Specify the observing run (e.g., O3).
        data_quality=gf.DataQuality.BEST,  # Choose the quality of the data (e.g., BEST).
        data_labels=[                      # Define the types of data to include.
            gf.DataLabel.NOISE, 
            gf.DataLabel.GLITCHES
        ],
        segment_order=gf.SegmentOrder.RANDOM, # Order of segment retrieval (e.g., RANDOM).
        force_acquisition=True,               # Force the acquisition of new data.
        cache_segments=False                  # Choose not to cache the segments.
    )

    # Initialize the noise generator wrapper:
    # This wrapper will use the ifo_data_obtainer to generate real noise based on the specified parameters.
    noise: gf.NoiseObtainer = gf.NoiseObtainer(
        ifo_data_obtainer=ifo_data_obtainer, # Use the previously set up IFODataObtainer object.
        noise_type=gf.NoiseType.REAL,        # Specify the type of noise as REAL.
        ifos=gf.IFO.L1                       # Specify the interferometer (e.g., LIGO Livingston L1).
    )

    scaling_method : gf.ScalingMethod = gf.ScalingMethod(
        value=gf.Distribution(
            min_=8.0,
            max_=15.0,
            type_=gf.DistributionType.UNIFORM
        ),
        type_=gf.ScalingTypes.SNR
    )

    # Define a uniform distribution for the mass of the first object in solar masses.
    mass_1_distribution_msun : gf.Distribution = gf.Distribution(
        min_=10.0, 
        max_=60.0, 
        type_=gf.DistributionType.UNIFORM
    )

    # Define a uniform distribution for the mass of the second object in solar masses.
    mass_2_distribution_msun : gf.Distribution = gf.Distribution(
        min_=10.0, 
        max_=60.0, 
        type_=gf.DistributionType.UNIFORM
    )

    # Define a uniform distribution for the inclination of the binary system in radians.
    inclination_distribution_radians : gf.Distribution = gf.Distribution(
        min_=0.0, 
        max_=np.pi, 
        type_=gf.DistributionType.UNIFORM
    )

    # Initialize a PhenomD waveform generator with the defined distributions.
    # This generator will produce waveforms with randomly varied masses and inclination angles.
    phenom_d_generator : gf.WaveformGenerator = gf.cuPhenomDGenerator(
        mass_1_msun=mass_1_distribution_msun,
        mass_2_msun=mass_2_distribution_msun,
        inclination_radians=inclination_distribution_radians,
        scaling_method=scaling_method,
        injection_chance=0.5 # On average, half of the produced examples will not contain this signal.
    )
    
    training_dataset : tf.data.Dataset = gf.Dataset(       
        noise_obtainer=noise,
        waveform_generators=phenom_d_generator,
        input_variables=[
            gf.ReturnVariables.ONSOURCE, 
            gf.ReturnVariables.OFFSOURCE, 
        ],
        output_variables=[
            gf.ReturnVariables.INJECTION_MASKS
        ]
    ).map(adjust_features)

    validation_dataset : tf.data.Dataset = gf.Dataset(       
        noise_obtainer=noise,
        waveform_generators=phenom_d_generator,
        seed=1001, # Use a different seed so the waveforms differ from the training set.
        group="validate", # Draw noise only from segments reserved for validation.
        input_variables=[
            gf.ReturnVariables.ONSOURCE, 
            gf.ReturnVariables.OFFSOURCE, 
        ],
        output_variables=[
            gf.ReturnVariables.INJECTION_MASKS
        ]
    ).map(adjust_features)

    testing_dataset : tf.data.Dataset = gf.Dataset(       
        noise_obtainer=noise,
        waveform_generators=phenom_d_generator,
        seed=1002, # Use a different seed so the waveforms differ from the other sets.
        group="test", # Draw noise only from segments reserved for testing.
        input_variables=[
            gf.ReturnVariables.ONSOURCE, 
            gf.ReturnVariables.OFFSOURCE, 
        ],
        output_variables=[
            gf.ReturnVariables.INJECTION_MASKS
        ]
    ).map(adjust_features)
    
    # Take one batch to infer the input shapes, excluding the batch dimension.
    for input_example, _ in training_dataset.take(1):
        input_shape_onsource = tuple(input_example["ONSOURCE"].shape[1:])
        input_shape_offsource = tuple(input_example["OFFSOURCE"].shape[1:])
    
    model = create_gabbard(input_shape_onsource, input_shape_offsource)

    # Print a summary of the model architecture.
    model.summary()
    
    # Model compilation
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',  # Or any other loss function appropriate for your task
        metrics=['accuracy']
    )


If the previous cell failed, it is most likely because you attempted to run it twice within the same kernel session. The kernel must be restarted in order to create a fresh TensorFlow strategy and recompile the model.
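Before training, it can help to picture what the dataset cell asks the waveform generator for. The NumPy sketch below draws parameters from the same uniform ranges (component masses in [10, 60] solar masses, inclination in [0, π] radians, target SNR in [8, 15]) and flags each example for injection with probability 0.5, mirroring `injection_chance=0.5`. It only imitates the configured distributions: it generates no waveforms, it is not how `gf.cuPhenomDGenerator` samples internally, and `sample_injection_parameters` and its seed are hypothetical.

```python
import numpy as np


def sample_injection_parameters(num_examples: int, seed: int = 0) -> dict:
    """Draw parameters from the ranges configured in the dataset cell.

    Illustrative only: uses NumPy's Generator, not GravyFlow's own sampler.
    """
    rng = np.random.default_rng(seed)
    return {
        # Uniform distributions with the same bounds as the gf.Distribution objects.
        "mass_1_msun": rng.uniform(10.0, 60.0, num_examples),
        "mass_2_msun": rng.uniform(10.0, 60.0, num_examples),
        "inclination_radians": rng.uniform(0.0, np.pi, num_examples),
        "snr": rng.uniform(8.0, 15.0, num_examples),
        # injection_chance=0.5: each example independently contains a signal or not.
        "injected": rng.random(num_examples) < 0.5,
    }


params = sample_injection_parameters(10_000)
print(f"fraction with injection: {params['injected'].mean():.2f}")
```

Because the injection flag is drawn independently per example, the fraction of positive labels in any batch fluctuates around 0.5 rather than being exactly half.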

Finally, we can train the model with our generated dataset:

In [None]:
examples_per_epoch : int = int(1E5)
num_validation_examples : int = int(1E4)
num_testing_examples : int = int(1E4)

with gf.env(memory_to_allocate_tf=5000):
    history = model.fit(
        training_dataset,
        epochs=10,  # Number of epochs to train for
        steps_per_epoch=examples_per_epoch // gf.Defaults.num_examples_per_batch,
        validation_data=validation_dataset,
        validation_steps=num_validation_examples // gf.Defaults.num_examples_per_batch  # Required because the dataset is unbounded.
    )

    model.evaluate(
        testing_dataset, 
        steps=num_testing_examples // gf.Defaults.num_examples_per_batch
    )
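Because the GravyFlow datasets are unbounded, an epoch only ends after a fixed number of steps, so the cell above sets `steps_per_epoch`, `validation_steps` and `steps` explicitly using floor division. The following sketch works through that arithmetic for a hypothetical batch size of 32 (check `gf.Defaults.num_examples_per_batch` for the value in your installation); `steps_for` is a helper introduced only for illustration.

```python
def steps_for(num_examples: int, batch_size: int) -> tuple[int, int]:
    """Return (steps, examples actually drawn) for a target number of examples."""
    steps = num_examples // batch_size  # floor division, as in the training cell
    return steps, steps * batch_size


batch_size = 32  # hypothetical; see gf.Defaults.num_examples_per_batch
for name, n in [("train/epoch", int(1E5)), ("validation", int(1E4)), ("test", int(1E4))]:
    steps, drawn = steps_for(n, batch_size)
    print(f"{name}: {steps} steps, {drawn} examples")
# train/epoch: 3125 steps, 100000 examples
# validation: 312 steps, 9984 examples
# test: 312 steps, 9984 examples
```

Floor division drops any remainder, so slightly fewer examples than requested may be evaluated (9984 rather than 10000 here). If the batch size exceeds the requested number of examples, the step count becomes zero.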