In [4]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import savgol_filter
from magprime import UBSS, calculate_coupling_coefficients
import plotly.graph_objects as go
import sys
sys.path.append(r'C:\Users\ASUS\Desktop\ubss\MAGPRIME')

# File loading and preprocessing
def load_data(file_path):
    try:
        data = np.loadtxt(file_path, delimiter=',', dtype=float)
        if data.shape[1] != 12:
            raise ValueError("Data file must have 12 columns (X, Y, Z for 4 magnetometers).")
        return data
    except Exception as e:
        print(f"Error loading data: {e}")
        return None

# Visualization of raw data
def plot_raw_data(data):
    axes = ['X', 'Y', 'Z']
    for i in range(4):
        plt.figure(figsize=(12, 6))
        for j in range(3):
            plt.plot(data[:, i * 3 + j], label=f'Magnetometer {i + 1} - {axes[j]}')
        plt.title(f'Raw Data - Magnetometer {i + 1}')
        plt.xlabel('Time Index')
        plt.ylabel('Magnetic Field (nT)')
        plt.legend()
        plt.grid()
        plt.show()

# UBSS cleaning
def apply_ubss(data):
    try:
        ubss = UBSS(num_magnetometers=4)
        ubss_cleaned_data = ubss.fit_transform(data)
        return ubss_cleaned_data
    except Exception as e:
        print(f"Error during UBSS cleaning: {e}")
        return None

# Visualization of UBSS-cleaned data
def plot_ubss_cleaned_data(data):
    axes = ['X', 'Y', 'Z']
    for i in range(4):
        fig = go.Figure()
        for j in range(3):
            fig.add_trace(go.Scatter(
                y=data[:, i * 3 + j],
                mode='lines',
                name=f'{axes[j]} axis'
            ))
        fig.update_layout(
            title=f'UBSS-Cleaned Data - Magnetometer {i + 1}',
            xaxis_title='Time Index',
            yaxis_title='Magnetic Field (nT)',
            legend_title='Axes'
        )
        fig.show()

# Apply Savitzky-Golay filter
def apply_savgol_filter(data, window_length=51, polyorder=3):
    try:
        if window_length >= len(data) or window_length % 2 == 0:
            raise ValueError("Window length must be odd and less than the length of the data.")
        smoothed_data = savgol_filter(data, window_length, polyorder, axis=0)
        return smoothed_data
    except Exception as e:
        print(f"Error during Savitzky-Golay filtering: {e}")
        return None

# Comparison plot of smoothed vs raw data
def plot_smoothed_vs_raw(raw_data, smoothed_data):
    axes = ['X', 'Y', 'Z']
    for i in range(4):
        plt.figure(figsize=(12, 6))
        for j in range(3):
            plt.plot(raw_data[:, i * 3 + j], label=f'Raw {axes[j]}', alpha=0.5)
            plt.plot(smoothed_data[:, i * 3 + j], label=f'Smoothed {axes[j]}')
        plt.title(f'Smoothed vs Raw Data - Magnetometer {i + 1}')
        plt.xlabel('Time Index')
        plt.ylabel('Magnetic Field (nT)')
        plt.legend()
        plt.grid()
        plt.show()

# Main workflow
def main(file_path):
    sys.path.append(r'C:\Users\ASUS\Desktop\ubss\MAGPRIME')
    data = load_data(file_path)
    if data is None:
        return

    print("Plotting raw data...")
    plot_raw_data(data)

    print("Applying UBSS cleaning...")
    ubss_cleaned_data = apply_ubss(data)
    if ubss_cleaned_data is None:
        return

    print("Plotting UBSS-cleaned data...")
    plot_ubss_cleaned_data(ubss_cleaned_data)

    print("Applying Savitzky-Golay filter...")
    smoothed_data = apply_savgol_filter(ubss_cleaned_data)
    if smoothed_data is None:
        return

    print("Plotting smoothed vs raw data...")
    plot_smoothed_vs_raw(data, smoothed_data)

# Run the script
if __name__ == "__main__":
    sys.path.append(r'C:\Users\ASUS\Desktop\ubss\MAGPRIME')
    file_path = "nomagnetictorque_5min.txt"  # Replace with your actual file path
    main(file_path)


ModuleNotFoundError: No module named 'magprime'