<a href="https://colab.research.google.com/github/shufan6011/ML-Projects/blob/main/Step_4_CNNs_for_GW.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Update:
# Use GPU instead of CPU for model training

# Data Preprocessing

In [None]:
import numpy as np
import pandas as pd
import requests, os
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt, spectrogram
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from sklearn.model_selection import train_test_split


In [None]:
# Go to https://gwosc.org
# Find the information required below (GPS time & detector)


In [None]:
# Set a GPS time:
t_start = 1126259462.4
t_end = 1126259462.4 # For specific events, make t_end the same as t_start

# Choose detector as H1, L1, or V1
detector = 'H1'


In [None]:
%config InlineBackend.figure_format = 'retina'

try:
    from gwpy.timeseries import TimeSeries
except:
    ! pip install -q "gwpy==3.0.8"
    ! pip install -q "matplotlib==3.9.0"
    ! pip install -q "astropy==6.1.0"
    from gwpy.timeseries import TimeSeries


In [None]:
from gwosc.locate import get_urls
url = get_urls(detector, t_start, t_end)[-1]

print('Downloading: ' , url)
fn = os.path.basename(url)
with open(fn,'wb') as strainfile:
    straindata = requests.get(url)
    strainfile.write(straindata.content)


In [None]:
# Read strain data
strain = TimeSeries.read(fn,format='hdf5.gwosc')

# Examine an interval of the event closely
# center = int(t_start)
# strain = strain.crop(center-0.2, center+0.1)

# Extract timestamps and strain values
timestamps = strain.times.value
strain_values = strain.value

# Store the data in a Pandas DataFrame
data = pd.DataFrame({
    'time': timestamps,
    'strain': strain_values
})


## Handling Missing Values

In [None]:
# Drop rows with missing values
data = data.dropna()

print("\nMissing values after cleaning:")
print(data.isnull().sum())


## Data Noise Filtering

In [None]:
# Band-pass filter function
def butter_bandpass(lowcut, highcut, fs, order=5):
    try:
        nyq = 0.5 * fs
        low = lowcut / nyq
        high = highcut / nyq
        b, a = butter(order, [low, high], btype='band')
        return b, a
    except Exception as e:
        print(f"An error occurred: {e}")
        return []

def bandpass_filter(data, lowcut, highcut, fs, order=5):
    try:
        b, a = butter_bandpass(lowcut, highcut, fs, order=order)
        y = filtfilt(b, a, data)
        return y
    except Exception as e:
        print(f"An error occurred: {e}")
        return []

# Filter parameters
lowcut = 20  # Low cutoff frequency (Hz)
highcut = 500  # High cutoff frequency (Hz)

# Apply band-pass filter to the strain data
data['strain'] = bandpass_filter(data['strain'], lowcut, highcut, 4096)


## Data Normalization

In [None]:
# Normalize the filtered strain data
scaler = StandardScaler()
data['strain'] = scaler.fit_transform(data[['strain']])


## Data Inspection

In [None]:
# Inspect the first few rows
print("First few rows of the data:")
print(data.head())

# Inspect col headers
print("\nCol headers:")
print(data.columns)

# Summary stats
print("\nSummary stats:")
print(data.describe())

# Check for missing vals
print("\nMissing vals in each col:")
print(data.isnull().sum())

# Check the sampling frequency
print(f"\nSampling frequency: {strain.sample_rate} Hz")
fs = 4096 # Change this if sampling frequency is different


# 1D CNN

## Segment Labeling

In [None]:
# Convert the preprocessed data back to original strain
scaler.inverse_transform(data)

# Window Size
window_size = 2  # in seconds, adjust as needed

# Define event time and window half-length
event_time = t_start  # Adjust as needed\

# Create segments and labels
segments = []
labels = []

for i in range(0, len(strain) - int(window_size * fs), int(window_size * fs)):
    segment = strain[i:i + int(window_size * fs)]
    segments.append(segment.value)

    # Label based on event presence
    if (segment.times.value[0] <= event_time <= segment.times.value[-1]):
        labels.append(1)  # Event present
    else:
        labels.append(0)  # No event

# Convert to numpy arrays
segments = np.array(segments)
labels = np.array(labels)

print(f"Segments shape: {segments.shape}")
print(f"Labels shape: {labels.shape}")

# Verify segments and labels
print("First few segments:")
print(segments[:2])


## Data Preparation (time-series data)

In [None]:
# Reshape segments for 1D CNN
segments = segments.reshape((segments.shape[0], segments.shape[1], 1))

print(f"Reshaped segments shape: {segments.shape}")


In [None]:
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(segments, labels, test_size=0.2, random_state=42)

## Data Augmentation

In [None]:
def augment_data(data, labels):
    try:
        augmented_data = []
        augmented_labels = []
        for d, l in zip(data, labels):
            augmented_data.append(d)
            augmented_labels.append(l)
            augmented_data.append(np.flip(d, axis=0))
            augmented_labels.append(l)
            noise = np.random.normal(0, 0.1, d.shape)
            augmented_data.append(d + noise)
            augmented_labels.append(l)
        return np.array(augmented_data), np.array(augmented_labels)
    except Exception as e:
        print(f"An error occurred: {e}")
        return []

X_train_aug, y_train_aug = augment_data(X_train, y_train)

print(f"Original training data shape: {X_train.shape}")
print(f"Augmented training data shape: {X_train_aug.shape}")


## Model Training & Evaluation

In [None]:
# Build the 1D CNN model
model = Sequential([
    Conv1D(16, 3, activation='relu', input_shape=(X_train_aug.shape[1], 1)),
    MaxPooling1D(2),
    Conv1D(32, 3, activation='relu'),
    MaxPooling1D(2),
    Flatten(),
    Dense(64, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])

# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

# Train the model
history = model.fit(X_train_aug, y_train_aug, epochs=20, batch_size=256, validation_data=(X_test, y_test))

# Evaluate the model
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Test Loss: {loss}, Test Accuracy: {accuracy}")

# Plot training history
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Loss')
plt.plot(history.history['val_loss'], label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation Loss')

plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Accuracy')
plt.plot(history.history['val_accuracy'], label='Val Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Training and Validation Accuracy')

plt.show()


## Final Product

In [None]:
# Save the model (look in the left panel)
model.save('gw_1d_cnn.h5')

# Load the model (for verification)
loaded_model = tf.keras.models.load_model('gw_1d_cnn.h5')

# Verify the loaded model by evaluating it on the test set
loaded_loss, loaded_accuracy = loaded_model.evaluate(X_test, y_test)
print(f"Loaded Model Test Loss: {loaded_loss}, Loaded Model Test Accuracy: {loaded_accuracy}")


# 2D CNN

## Segment Labeling

In [None]:
# Create segments and labels
segments = []
labels = []

for i in range(0, len(strain) - int(window_size * fs), int(window_size * fs)):
    segment = strain[i:i + int(window_size * fs)]
    segments.append(segment.value)

    # Label based on event presence
    if (segment.times.value[0] <= event_time <= segment.times.value[-1]):
        labels.append(1)  # Event present
    else:
        labels.append(0)  # No event

# Convert to numpy arrays
segments = np.array(segments)
labels = np.array(labels)

print(f"Segments shape: {segments.shape}")
print(f"Labels shape: {labels.shape}")

# Verify segments and labels
print("First few segments:")
print(segments[:2])


## Data Preparation (spectrograms)

In [None]:
# Generate spectrograms for each segment
def generate_spectrogram(segment, sample_rate):
    try:
        f, t, Sxx = spectrogram(segment, sample_rate)
        return Sxx
    except Exception as e:
        print(f"An error occurred: {e}")
        return []

spectrograms = np.array([generate_spectrogram(segment, fs) for segment in segments])
print(f"Spectrograms shape: {spectrograms.shape}")

# Reshape spectrograms for 2D CNN
spectrograms = spectrograms[..., np.newaxis]  # Add a channel dimension
print(f"Reshaped spectrograms shape: {spectrograms.shape}")

In [None]:
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(spectrograms, labels, test_size=0.2, random_state=42)


## Data Augmentation

In [None]:
def augment_data(data, labels):
    try:
        augmented_data = []
        augmented_labels = []
        for d, l in zip(data, labels):
            augmented_data.append(d)
            augmented_labels.append(l)
            augmented_data.append(np.flip(d, axis=0))
            augmented_labels.append(l)
            noise = np.random.normal(0, 0.1, d.shape)
            augmented_data.append(d + noise)
            augmented_labels.append(l)
        return np.array(augmented_data), np.array(augmented_labels)
    except Exception as e:
        print(f"An error occurred: {e}")
        return []

X_train_aug, y_train_aug = augment_data(X_train, y_train)

print(f"Original training data shape: {X_train.shape}")
print(f"Augmented training data shape: {X_train_aug.shape}")


## Model Training & Evaluation

In [None]:
# Build the 2D CNN model
model = Sequential([
    Conv2D(16, (3, 3), activation='relu', input_shape=(X_train_aug.shape[1], spectrograms.shape[2], 1)),
    MaxPooling2D((2, 2)),
    Conv2D(32, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(64, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])

# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

# Train the model
history = model.fit(X_train_aug, y_train_aug, epochs=20, batch_size=256, validation_data=(X_test, y_test))

# Evaluate the model
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Test Loss: {loss}, Test Accuracy: {accuracy}")

# Plot training history
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Loss')
plt.plot(history.history['val_loss'], label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation Loss')

plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Accuracy')
plt.plot(history.history['val_accuracy'], label='Val Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Training and Validation Accuracy')

plt.show()


## Final Product

In [None]:
# Save the model (look in the left panel)
model.save('gw_2d_cnn.h5')

# Load the model (for verification)
loaded_model = tf.keras.models.load_model('gw_2d_cnn.h5')

# Verify the loaded model by evaluating it on the test set
loaded_loss, loaded_accuracy = loaded_model.evaluate(X_test, y_test)
print(f"Loaded Model Test Loss: {loaded_loss}, Loaded Model Test Accuracy: {loaded_accuracy}")
