In [1]:
%matplotlib qt
import pandas as pd
import numpy as np
import seaborn as sns
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt
import os
import warnings
warnings.filterwarnings("ignore")

In [2]:
def txt_to_df(data_files, freqs):
    dfs = {}
    for freq, filename in zip(freqs, data_files):
        # print(f"Reading {freq}, {filename}...")
        with open(filename, 'r') as file:
            content = file.read()
            content = content.replace('"', '')
        
        with open(filename, 'w') as file:
            file.write(content)

        df = pd.read_csv(filename, delimiter=',')
    
        # print("Number of NaNs in each column:")
        # print(df.isnull().sum())
        df = df.dropna()
        
        df['Error'] = df[' Error']
        df['Target angle'] = df[' Target angle']
        df.drop(columns=[' Error', ' Target angle'], inplace=True)

        for col in df.columns:
            try:
                df[col] = pd.to_numeric(df[col], errors='raise')
            except ValueError as e:
                # print(f"\nOh Oh! Conversion failed in column: {col}, Error: {e}\n")
                df[col] = pd.to_numeric(df[col], errors='coerce')

        # print("Number of NaNs in each column:")
        # print(df.isnull().sum())
        df = df.dropna()
        
        dfs[freq] = df

    return dfs

In [3]:
# def get_rms_amps(data):
#     rms_target = np.mean(data["Target angle"]**2)**0.5
#     rms_error = np.mean(data["Error"]**2)**0.5
#     return rms_target, rms_error

In [4]:
# def sine_wave(t, A, phase, offset, freq):
#     return A * np.sin(2 * np.pi * freq * t + phase) + offset
# def cos_wave(t, A, phase, offset, freq):
#     return A * np.cos(2 * np.pi * freq * t + phase) + offset

In [5]:
# Two Perfect Tracking Methods
def perfect_tracking_30(data, freq, visualise=False):
    '''Using the set 30 degrees as the wave amplitude'''
    angle_range = 60
    cycles = 10 if freq > 0.2 or freq in [0.01, 0.1] else 5
    points_per_wave = 500
    total_points = cycles * points_per_wave
    t_perfect = np.linspace(0, cycles / freq, total_points)
    perfect_wave = (angle_range/2) * np.cos(2 * np.pi * freq * t_perfect)

    if visualise:
        time_s = pd.to_numeric(data['Run time'], errors='raise') / 1000
        plt.figure()
        plt.plot(t_perfect, perfect_wave, label='Perfect Wave')
        plt.plot(time_s, data['Target angle'], label='Target Angle')
        plt.title(f'{freq}Hz')
        plt.legend()

    return perfect_wave, 30/(2**0.5)

def pefect_tracking_error(data, freq, visualise=False):
    '''Summing the error and target angle'''
    perfect_wave = data['Target angle'] + data['Error']

    if visualise:
        time_s = pd.to_numeric(data['Run time'], errors='raise') / 1000
        plt.figure()
        plt.plot(time_s, perfect_wave, label='Perfect Wave')
        plt.plot(time_s, data['Target angle'], label='Target Angle')
        plt.title(f'{freq}Hz')
        plt.legend()

    return perfect_wave, np.mean(perfect_wave**2)**0.5


In [6]:
def phase_curve_fit(data, freq, perfect_wave, visualise=False):
    '''Uses the curve fit method to obtain  phase - Can visualise'''
    time_s = pd.to_numeric(data['Run time'], errors='raise') / 1000

    params_target, _ = curve_fit(
        lambda t, A, phase, offset: sine_wave(t, A, phase, offset, freq), time_s, data['Target angle'], p0=[30, 0, 0]
        )
    A_target, phase_target, offset_target = params_target
    target_fit = sine_wave(time_s, A_target, phase_target, offset_target, freq)

    cycles = 10 if freq > 0.2 or freq in [0.01, 0.1] else 5
    points_per_wave = 500
    total_points = cycles * points_per_wave
    t_perfect = np.linspace(0, cycles / freq, total_points)

    params_perfect, _ = curve_fit(
        lambda t, A, phase, offset: sine_wave(t, A, phase, offset, freq), t_perfect, perfect_wave, p0=[30, 0, 0]
        )
    A_perfect, phase_perfect, offset_perfect = params_perfect
    perfect_fit = sine_wave(t_perfect, A_perfect, phase_perfect, offset_perfect, freq)

    phase_diff_rad = phase_target - phase_perfect
    phase_diff_deg = phase_diff_rad * (180 / np.pi)

    if visualise:
        fig, ax = plt.subplots(1, 2, figsize=(10,6))
        ax[0].plot(time_s, data['Target angle'])
        ax[0].plot(time_s, target_fit, linestyle='--')
        ax[0].set_title("Target Wave")

        ax[1].plot(time_s, perfect_wave)
        ax[1].plot(time_s, perfect_fit, linestyle='--')
        ax[1].set_title("Perfect Wave")

        fig.suptitle(f"{freq}Hz")
    
    return phase_diff_deg

def phase_cc(data, freq, perfect_wave):
    '''Using Cross Correlation. 
    Uses an estimate sampling frequency of 8Hz based on raw data'''
    fs = 8

    correlation = np.correlate(data['Target angle'], perfect_wave, mode='full')
    lag = np.argmax(correlation) - len(data['Target angle'] - 1)
    time_delay = lag / fs
    phase_diff = (time_delay * freq * 2 * np.pi) % (2 * np.pi)
    phase_diff_deg = np.degrees(phase_diff)

    return phase_diff_deg

def phase_fft(data, freq, perfect_wave):
    '''Phase Angle Method: Using Discrete Fourier Transform for phase.
    Uses an estimate sampling frequency of 8Hz based on raw data'''
    fs = 8
    time_s = pd.to_numeric(data['Run time'], errors='raise') / 1000

    fft1 = np.fft.fft(data['Target angle'])
    fft2 = np.fft.fft(perfect_wave)

    ff = np.fft.fftfreq(len(time_s), 1/fs)
    target_freq_index = np.argmin(np.abs(ff-freq))
    phase1 = np.angle(fft1[target_freq_index])
    phase2 = np.angle(fft2[target_freq_index])

    phase_diff = phase1 - phase2
    phase_diff_rad = np.arctan2(np.sin(phase_diff), np.cos(phase_diff))
    phase_diff_deg = np.degrees(phase_diff_rad)

    return phase_diff_deg



In [7]:
def cos_wave(t, A, phase, offset, freq):
    return A * np.cos(2 * np.pi * freq * t + phase) + offset

def get_rms_amps(data, freq, ax_target=None, ax_error=None, plot=False):
    assert (plot and (ax_target or ax_error)) or not plot, "Please provide axes for plotting"

    time_s = pd.to_numeric(data['Run time'], errors='raise') / 1000
    error = data['Error']
    skip_time = (1 / freq) / 2
    start_index = np.argmax(time_s > skip_time)
    time_filtered = time_s[start_index:]
    target_angle_filtered = data['Target angle'][start_index:]

    params_target, _ = curve_fit(
        lambda t, A, phase, offset: cos_wave(t, A, phase, offset, freq),
        time_filtered, target_angle_filtered, p0=[30, 0, 0]
    )
    A_target, phase_target, offset_target = params_target
    target_fit = cos_wave(time_filtered, A_target, phase_target, offset_target, freq)

    angle_range = 60
    cycles = 10 if freq > 0.2 or freq in [0.01, 0.1] else 5
    points_per_wave = 500
    total_points = cycles * points_per_wave
    t_perfect = np.linspace(0, time_s[start_index] + cycles / freq, total_points)
    t_axis = np.linspace(time_s[start_index], time_s[start_index] + cycles / freq, total_points)

    # Perfect wave starts at 30 degrees (peak of cosine wave with phase 0)
    perfect_wave = (angle_range / 2) * np.cos(2 * np.pi * freq * t_perfect)

    # params_perfect, _ = curve_fit(
    #     lambda t, A, phase, offset: cos_wave(t, A, phase, offset, freq),
    #     t_perfect, perfect_wave, p0=[30, 0, 0]
    # )
    # A_perfect, phase_perfect, offset_perfect = params_perfect
    # perfect_fit = cos_wave(t_perfect, A_perfect, phase_perfect, offset_perfect, freq)

    # phase_diff_rad = phase_target - phase_perfect
    # phase_diff_deg = phase_diff_rad * (180 / np.pi)

    if plot and ax_target:
        ax_target.plot(time_s, data['Target angle'], label='Target Angle')
        ax_target.plot(time_filtered, target_fit, linestyle='--', label='Target Fit')
        ax_target.plot(t_axis, perfect_wave, label='Perfect Wave', alpha=0.8)
        ax_target.set_title(f"Target Angle at {freq}Hz", fontsize=6)
        ax_target.tick_params(axis="both", labelsize=6)
        # ax_target.legend(fontsize=6)

    if plot and ax_error:
        ax_error.plot(time_s, error, label='Error')
        ax_error.set_title(f"Error at {freq}Hz", fontsize=6)
        ax_error.tick_params(axis="both", labelsize=6)
        # ax_error.legend(fontsize=6, loc='lower right')

    rms_target = np.sqrt(np.mean(data['Target angle'] ** 2))
    rms_error = np.sqrt(np.mean(error ** 2))

    return rms_target, rms_error, phase_diff_deg



In [8]:
def plot_rms(rms_targets, rms_errors, freqs, save_suff=""):
    fig, ax = plt.subplots()
    ax.plot(freqs, rms_targets, label='Target RMS', linestyle='-', marker='x')
    ax.plot(freqs, rms_errors, label='Error RMS', linestyle='-', marker='x')
    ax.axvline(x=0.15, color='r', linestyle='--')
    ax.set_xlabel('Frequency (Hz)', fontsize=20)
    ax.set_ylabel('Amplitude (degrees)', fontsize=20)
    ax.set_title('(a) RMS Amplitude vs Frequency', fontsize=20)
    ax.tick_params(axis="both", labelsize=20)
    ax.legend(fontsize=20, loc='upper right')
    # plt.grid()
    plt.tight_layout()
    plt.savefig(f"figures/RMS_vs_f{save_suff}.pdf")
    plt.show()

In [9]:
def bode_plot(gains, phases, freqs, save_suff=""):
    gains = gains[0]
    # gain_db = 20 * np.log10(gains)
    plt.figure()

    # Magnitude plot
    plt.plot(freqs, gains, 'x-')
    plt.axvline(x=0.15, color='r', linestyle='--')
    plt.title("(b) Gain vs Frequency", fontsize=20)
    plt.ylabel("Gain", fontsize=20)
    plt.xlabel("Frequency (Hz)", fontsize=20)
    plt.tick_params(axis="both", labelsize=20)
    # plt.grid()
    plt.tight_layout()
    plt.savefig(f"figures/gain_vs_f{save_suff}.pdf")

    plt.figure()
    plt.axvline(x=0.15, color='r', linestyle='--')
    plt.plot(freqs, phases, 'x-')
    plt.title("(c) Phase vs Frequency", fontsize=20)
    plt.ylabel("Phase (degrees)", fontsize=20)
    plt.xlabel("Frequency (Hz)", fontsize=20)
    plt.tick_params(axis="both", labelsize=20)
    # plt.grid()
    plt.tight_layout()
    plt.savefig(f"figures/phase_vs_f{save_suff}.pdf")

In [10]:
def compare_pid(data1, data2):
    gains, phases, freqs = data1
    new_gains, new_phases, new_freqs = data2
    gains = gains[0]
    new_gains = new_gains[0]
    freqs = freqs[2:]
    gains = gains[2:]
    phases = phases[2:]

    plt.figure()
    # Magnitude plot
    plt.plot(new_freqs, new_gains, 'x-', label='Adjusted PID')
    plt.plot(freqs, gains, linestyle='--', color="grey", label='Default PID')
    plt.axvline(x=0.4, color='r', linestyle='--')
    # plt.semilogx(freqs, gains, linestyle='--', color="grey", label='Default PID')
    plt.title("(a) Gain vs Frequency", fontsize=16)
    plt.ylabel("Gain", fontsize=16)
    plt.xlabel("Frequency (Hz)", fontsize=16)
    plt.legend(fontsize=16)
    plt.tick_params(axis="both", labelsize=16)
    # plt.grid()
    plt.tight_layout()
    plt.savefig("figures/compare_gain.pdf")

    plt.figure()
    plt.plot(new_freqs, new_phases, 'x-', label='Adjusted PID')
    plt.plot(freqs, phases, linestyle='--', color="grey", label='Default PID')
    plt.axvline(x=0.4, color='r', linestyle='--')
    # plt.semilogx(freqs, phases, linestyle='--', color="grey", label='Default PID')
    plt.title("(b) Phase vs Frequency", fontsize=16)
    plt.ylabel("Phase (degrees)", fontsize=16)
    plt.xlabel("Frequency (Hz)", fontsize=16)
    plt.legend(fontsize=16)
    plt.tick_params(axis="both", labelsize=16)
    # plt.grid()
    plt.tight_layout()
    plt.savefig("figures/compare_phase.pdf")


In [11]:
def plot_data_distribution(data):
    frequencies_of_interest = [0.03, 0.05, 0.07, 0.09, 0.11, 0.13, 0.15, 0.17, 0.19, 0.2, 0.3, 0.4, 0.5, 0.6]
    data = {key: value for key, value in data.items() if key in frequencies_of_interest}

    df = pd.concat([
        pd.DataFrame({'frequency': freq, 'value': df['Target angle']})
        for freq, df in data.items()
    ], ignore_index=True)

    means = {freq: (df['Target angle']).mean() for freq, df in data.items()}

    plt.figure(figsize=(8, 6))
    ax = sns.boxplot(data=df, x='frequency', y='value', width=0.5, palette='pastel')

    xtick_positions = [i for i, freq in enumerate(frequencies_of_interest) if freq in means]
    mean_values = [means[freq] for freq in frequencies_of_interest if freq in means]

    plt.plot(xtick_positions, mean_values, marker='o', linestyle='-', color='k', label='Mean', alpha=0.6)

    plt.axhline(0, color='gray', linestyle='--', linewidth=0.8)
    plt.title('Target Angle Distributions by Frequency', fontsize=14)
    # plt.tick_params(axis="both", labelsize=20)
    plt.ylabel('Target Angle (degrees)', fontsize=14)
    plt.xlabel('Frequency (Hz)', fontsize=14)
    
    plt.legend(fontsize=14)

In [12]:
def plot_select_target_angles(data):
    d1 = data[0.03]['Target angle']
    d2 = data[0.4]['Target angle']

    plt.figure()
    plt.title('Target Angle at 0.03Hz', fontsize=20)
    plt.xlabel('Time (s)', fontsize=20)
    plt.ylabel('Target Angle (degrees)', fontsize=20)
    plt.tick_params(axis="both", labelsize=20)
    plt.plot(d1, label='0.03Hz')
    plt.tight_layout()

    plt.figure()
    plt.title('Target Angle at 0.4Hz', fontsize=20)
    plt.xlabel('Time (s)', fontsize=20)
    plt.ylabel('Target Angle (degrees)', fontsize=20)
    plt.tick_params(axis="both", labelsize=20)
    plt.plot(d2, label='0.4Hz')
    plt.tight_layout()

In [13]:
def run(data_files, freqs, save_suff="idk"):
    plt.close('all')
    assert len(data_files) == len(freqs), "Bro, please"

    data = txt_to_df(data_files, freqs)
    rms_targets = []
    rms_errors = []
    phase_diffs = []
    gains = []
    fig, axs = plt.subplots(int(np.ceil(len(freqs)/2)), 4, figsize=(12, 3 * len(freqs)))

    for i, (freq, df) in enumerate(data.items()):
        row = i // 2
        col = i % 2
        if i % 2 == 1:
            col += 1

        # rms_target = get_rms_amps
        # perfect_wave, rms_perf = perfect_tracking_30(df, freq, visualise=True)
        # # perfect_wave, rms_perf = pefect_tracking_error(df, freq, visualise=False)
        # phase_diff = phase_curve_fit(df, freq, perfect_wave, visualise=False)
        # # phase_diff = phase_cc(df, freq, perfect_wave)
        # # phase_diff = phase_fft(df, freq, perfect_wave)
        # rms_target, rms_error = get_rms_amps(df)
        
        rms_target, rms_error, phase_diff = get_rms_amps(df, float(freq), axs[row, col], axs[row, col+1], plot=True)
        # rms_target, rms_error, phase_diff = get_rms_amps(df, float(freq))
        # rms_targets.append(rms_target)
        # rms_errors.append(rms_error)
        phase_diffs.append(phase_diff)
        # gains.append(rms_target/rms_perf)

    plt.tight_layout()
    # plot_rms(rms_targets, rms_errors, freqs, save_suff)
    # gains = [np.array(rms_targets)/(30/2**0.5)]
    # bode_plot(gains, phase_diffs, freqs, save_suff)
    # plot_data_distribution(data)
    # plot_select_target_angles(data)

    return gains, phase_diffs, freqs

# Obtaining and sorting data files by frequency
unsorted_data_files = os.listdir("data/stock_pid")
unsorted_freqs = [float(f.split('_')[1].split('Hz')[0]) for f in unsorted_data_files]
combined = [(freq, file) for freq, file in sorted(zip(unsorted_freqs, unsorted_data_files))]
freqs, data_files = zip(*combined)
data_files = [f"data/stock_pid/{f}" for f in data_files]

# Better PID: Obtaining and sorting data files by frequency 
new_unsorted_data_files = os.listdir("data/better_pid")
new_unsorted_freqs = [float(f.split('_')[-1].split('Hz')[0]) for f in new_unsorted_data_files]
new_combined = [(freq, file) for freq, file in sorted(zip(new_unsorted_freqs, new_unsorted_data_files))]
new_freqs, new_data_files = zip(*new_combined)
new_data_files = [f"data/better_pid/{f}" for f in new_data_files]

output = run(data_files, freqs, save_suff="del")
# print("\nNEW PID")
# new_output = run(new_data_files, new_freqs, save_suff="_adj")
# compare_pid(output, new_output)

NameError: name 'phase_diff_deg' is not defined