In [43]:
import os
import numpy as np
from sklearn.decomposition import PCA
import random
import matplotlib.pyplot as plt
import sys
sys.path.insert(1, '../src/')
from config import raw_data_path, univariate_data_path, processed_data_path
from scipy.signal import resample
from preprocessing_modules import EHGRecord, trim_target, filter_target, z_normalize_target, check_normalize_target, remove_records, z_normalize_signals



In [44]:
dataset = 'tpehgt'

In [45]:
import os
import numpy as np
from sklearn.decomposition import PCA

def univariate(dataset_name):
    data_dir = os.path.join(processed_data_path, dataset_name + "_preprocessed.npy")
    data = np.load(data_dir, allow_pickle=True)

    # Define the path to save the new dataset
    

    # Initialize a list for storing modified entries
    univariate_data = []
    retained_variances = []  # List to store retained variance for each entry

    for entry in data:
        # Copy the entry to preserve metadata
        new_entry = entry.copy()

        # Extract the signal matrix (shape: (599999, 6))
        signal_matrix = entry['signal']
        
        # Apply PCA to reduce from 6D to 1D
        pca = PCA(n_components=1)
        reduced_signal = pca.fit_transform(signal_matrix)  # Shape: (599999, 1)
        
        # Calculate retained variance
        retained_variance = np.sum(pca.explained_variance_ratio_)
        retained_variances.append(retained_variance)

        print(f"Retained Variance for {entry['record_name']}: {retained_variance:.6f}")
        
        # Flatten to (599999,) to retain time-series format
        new_entry['signal'] = reduced_signal.flatten()

        # Append modified entry to the new dataset
        univariate_data.append(new_entry)

    # Compute the average retained variance for the dataset
    avg_retained_variance = np.mean(retained_variances)
    print(f"\nAverage Retained Variance for {dataset_name}: {avg_retained_variance:.6f}")

    # Convert to a NumPy array and save
    
    return univariate_data

univariate_data = univariate(dataset)


Retained Variance for tpehgt_n001: 0.586030
Retained Variance for tpehgt_n002: 0.574200
Retained Variance for tpehgt_n003: 0.777624
Retained Variance for tpehgt_n004: 0.759304
Retained Variance for tpehgt_n005: 0.686037
Retained Variance for tpehgt_p001: 0.557237
Retained Variance for tpehgt_p002: 0.644650
Retained Variance for tpehgt_p003: 0.481346
Retained Variance for tpehgt_p004: 0.524668
Retained Variance for tpehgt_p005: 0.585139
Retained Variance for tpehgt_p006: 0.758221
Retained Variance for tpehgt_p007: 0.551946
Retained Variance for tpehgt_p008: 0.447605
Retained Variance for tpehgt_p009: 0.474526
Retained Variance for tpehgt_p010: 0.411020
Retained Variance for tpehgt_p011: 0.649345
Retained Variance for tpehgt_p012: 0.558731
Retained Variance for tpehgt_p013: 0.535757
Retained Variance for tpehgt_t001: 0.493252
Retained Variance for tpehgt_t002: 0.508303
Retained Variance for tpehgt_t003: 0.499541
Retained Variance for tpehgt_t004: 0.608306
Retained Variance for tpehgt_t00

In [46]:
def downsample_target(univariate_data, target_fs=20):
    """
    Downsamples all signals OF TARGET SET in univariate_data to the target frequency (default: 20Hz).
    
    Parameters:
    - univariate_data (list of dicts): Each entry should have 'metadata' with 'fs' (sampling rate) and 'signal'.
    - target_fs (int): The target sampling frequency (default is 20 Hz).
    
    Returns:
    - downsampled_data (list of dicts): Same structure as input but with downsampled signals.
    """
    downsampled_data = []
    
    for entry in univariate_data:
        original_fs = entry['fs']
        print('óriginaloriginal_fs', original_fs)
        signal = entry['signal']
        
        # Compute the new length after downsampling
        new_length = int(len(signal) * target_fs / original_fs)
        print(target_fs, original_fs)
        print(len(signal), new_length)
        
        # Resample the signal
        downsampled_signal = resample(signal, new_length)
        print('old signal lengt', len(signal))
        print('new length', len(downsampled_signal))
        # Store the downsampled signal with updated metadata
        downsampled_entry = {
            'fs': target_fs,
            'signal': downsampled_signal,
            'record_name': entry['record_name'],
            'preterm': entry['preterm']
        }
        downsampled_data.append(downsampled_entry)
    downsampled_data = z_normalize_target(downsampled_data)
    save_dir = os.path.join(univariate_data_path, dataset + "_univariate.npy")
    np.save(save_dir, np.array(downsampled_data, dtype=object))

    print(f"PCA transformation complete. Saved as '{save_dir}'.")
    
    return downsampled_data

def downsample_signal(univariate_data, target_fs=20):
    """
    Downsamples all signals in univariate_data to the target frequency (default: 20Hz).
    
    Parameters:
    - univariate_data (list of dicts): Each entry should have 'metadata' with 'fs' (sampling rate) and 'signal'.
    - target_fs (int): The target sampling frequency (default is 20 Hz).
    
    Returns:
    - downsampled_data (list of dicts): Same structure as input but with downsampled signals.
    """
    downsampled_data = []
    
    for entry in univariate_data:
        original_fs = entry['metadata']['fs']
        print('óriginaloriginal_fs', original_fs)
        signal = entry['signal']
        
        # Compute the new length after downsampling
        new_length = int(len(signal) * target_fs / original_fs)
        print(target_fs, original_fs)
        print(len(signal), new_length)
        
        # Resample the signal
        downsampled_signal = resample(signal, new_length)
        print('old signal lengt', len(signal))
        print('new length', len(downsampled_signal))
        # Store the downsampled signal with updated metadata
        downsampled_entry = {
            'metadata': {**entry['metadata'], 'fs': target_fs},
            'signal': downsampled_signal,
            'record_name': entry['record_name']
        }
        downsampled_data.append(downsampled_entry)
    
    downsampled_data = z_normalize_signals(downsampled_data)
    save_dir = os.path.join(univariate_data_path, dataset + "_univariate.npy")
    np.save(save_dir, np.array(downsampled_data, dtype=object))

    print(f"PCA transformation complete. Saved as '{save_dir}'.")
    
    return downsampled_data

if dataset == 'target':
    downsampled_data = downsample_target(univariate_data)
else: 
    downsampled_data = downsample_signal(univariate_data)
print(len(univariate_data[1]['signal']))
print(len(downsampled_data[1]['signal']))

# print(data[1])
print(downsampled_data[1])

óriginaloriginal_fs 20
20 20
32900 32900
old signal lengt 32900
new length 32900
óriginaloriginal_fs 20
20 20
32880 32880
old signal lengt 32880
new length 32880
óriginaloriginal_fs 20
20 20
32900 32900
old signal lengt 32900
new length 32900
óriginaloriginal_fs 20
20 20
33060 33060
old signal lengt 33060
new length 33060
óriginaloriginal_fs 20
20 20
32800 32800
old signal lengt 32800
new length 32800
óriginaloriginal_fs 20
20 20
33600 33600
old signal lengt 33600
new length 33600
óriginaloriginal_fs 20
20 20
33599 33599
old signal lengt 33599
new length 33599
óriginaloriginal_fs 20
20 20
33600 33600
old signal lengt 33600
new length 33600
óriginaloriginal_fs 20
20 20
33600 33600
old signal lengt 33600
new length 33600
óriginaloriginal_fs 20
20 20
33600 33600
old signal lengt 33600
new length 33600
óriginaloriginal_fs 20
20 20
33600 33600
old signal lengt 33600
new length 33600
óriginaloriginal_fs 20
20 20
33600 33600
old signal lengt 33600
new length 33600
óriginaloriginal_fs 20
20 20

In [47]:
save_dir = os.path.join(univariate_data_path, "ehgdb1_univariate.npy")
target_data = np.load(save_dir, allow_pickle=True)
print(len(target_data[0]['signal']))
print(target_data[0])
# import numpy as np
# import matplotlib.pyplot as plt

# data_dir = os.path.join(processed_data_path, dataset + "_preprocessed.npy")
# or_dat = np.load(data_dir, allow_pickle=True)
# # Select an instance
# instance = or_dat[1]  # Using the provided index 1
# print('original fs', instance['metadata']['fs'])
# record_name = instance['record_name']
# original_signals = instance['signal']  # Multivariate signals
# print(original_signals)
# fs = instance['metadata']['fs']

# # Find the corresponding univariate signal
# univariate_instance = univariate_data[1]
# print(instance['record_name'], univariate_instance['record_name'])
# univariate_signal = univariate_instance['signal']

# # Determine the number of channels
# num_channels = original_signals.shape[1]
# print(num_channels)
# sequence_length = original_signals.shape[0]

# # Create subplots (num_channels + 1 to include the univariate signal)
# fig, axs = plt.subplots(num_channels + 1, 1, figsize=(12, 2 * (num_channels + 1)), sharex=True)
# time_axis = np.arange(sequence_length) / fs  # Convert to seconds

# # Plot all original channels
# title = f"Original {num_channels}-Channel Signals & PCA-Reduced Univariate Signal ({record_name})"
# for i in range(num_channels):
#     axs[i].plot(time_axis, original_signals[:, i], label=f'Channel {i+1}', color='b', alpha=0.7)
#     axs[i].legend()
#     axs[i].set_ylabel("Amplitude")

# # Plot the univariate signal in a distinct color
# axs[num_channels].plot(time_axis, univariate_signal, label="Univariate Signal (PCA)", color='r')
# axs[num_channels].legend()
# axs[num_channels].set_ylabel("Amplitude")
# axs[num_channels].set_xlabel("Time (seconds)")

# fig.suptitle(title)
# plt.tight_layout()
# plt.show()


7600
{'record_name': 'ice001_l_1of1', 'signal': array([[-1.7358303 ],
       [-0.30347557],
       [-0.40749874],
       ...,
       [-3.09738299],
       [-2.90981482],
       [-3.22768386]]), 'metadata': {'fs': 20, 'sig_len': 100000, 'n_sig': 16, 'base_date': None, 'base_time': None, 'units': ['mV', 'mV', 'mV', 'mV', 'mV', 'mV', 'mV', 'mV', 'mV', 'mV', 'mV', 'mV', 'mV', 'mV', 'mV', 'mV'], 'comments': ['Info:', 'ID:ice001', 'Record type:labour', 'Record number:1/1', 'Age(years):31', 'BMI before pregnancy:23.3', 'BMI at recording:27.6', 'Gravidity:3', 'Parity:2', 'Previous caesarean:No', 'Placental position:Fundus', 'Gestational age at recording(w/d):39/3', 'Gestational age at delivery:39/3', 'Mode of delivery:Vaginal', 'Synthetic oxytocin use in labour:No', 'Epidural during labour:No', 'Comments for recording:', 'Electrodes placed 5-10 mins prior to beginning of recording.', 'Baby born 20 minutes after the end of the recording.']}}


In [48]:
# import numpy as np
# import matplotlib.pyplot as plt

# # Select the downsampled signal
# downsampled_signal = downsampled_data[1]['signal']
# record_name = downsampled_data[1]['record_name']

# # Create time axis assuming uniform sampling
# downsampled_fs = 20  # The new sampling frequency
# downsampled_time_axis = np.arange(len(downsampled_signal)) / downsampled_fs

# # Plot the downsampled signal
# plt.figure(figsize=(12, 4))
# plt.plot(downsampled_time_axis, downsampled_signal, label="Downsampled Signal", color='g')
# plt.xlabel("Time (seconds)")
# plt.ylabel("Amplitude")
# plt.title(f"Downsampled Signal ({record_name}) at 20Hz")
# plt.legend()
# plt.show()


In [49]:
# import numpy as np
# import matplotlib.pyplot as plt
# import os
# import random

# # Load dataset
# data_path = os.path.join(univariate_data_path, dataset + "_univariate.npy")
# data = np.load(data_path, allow_pickle=True)

# # Select a random instance for visualization
# random_instance = random.choice(data)
# univariate_signal = random_instance['signal']  # Univariate after PCA

# # Load the original dataset before PCA
# original_data_path = os.path.join(processed_data_path, dataset + "_preprocessed.npy")
# original_data = np.load(original_data_path, allow_pickle=True)

# # Find the corresponding original signal
# record_name = random_instance['record_name']
# original_instance = next(entry for entry in original_data if entry['record_name'] == record_name)
# original_multivariate_signal = original_instance['signal']  # Shape: (sequence_length, num_channels)

# num_channels = original_multivariate_signal.shape[1]  

# # Create subplots (num_channels + 1 to include the univariate signal)
# fig, axs = plt.subplots(num_channels + 1, 1, figsize=(12, 2 * (num_channels + 1)), sharex=True)

# time_axis = np.arange(original_multivariate_signal.shape[0])

# # Plot all channels dynamically
# for i in range(num_channels):
#     axs[i].plot(time_axis, original_multivariate_signal[:, i], label=f'Channel {i+1}', color='b', alpha=0.7)
#     axs[i].legend()
#     axs[i].set_ylabel("Amplitude")

# # Plot the univariate signal in a distinct color
# axs[num_channels].plot(time_axis, univariate_signal, label="Univariate Signal (PCA)", color='r')
# axs[num_channels].legend()
# axs[num_channels].set_ylabel("Amplitude")
# axs[num_channels].set_xlabel("Time")

# fig.suptitle(f"Original {num_channels}-Channel Signal & PCA-Reduced Univariate Signal")
# plt.tight_layout()
# plt.show()
