In [None]:
import numpy as np
import scipy as sp
import pandas as pd
import matplotlib.pyplot as plt
from numpy import float64 as f64
import matplotlib as mpl
import csv
import os

In [None]:
# For interactive plots
%matplotlib widget
plt.rc('figure', autolayout=True)
plt.rc('savefig', dpi=600)
plt.rc('pgf', texsystem='lualatex')
plt.rc('font', family='serif')
plt.rc('text', usetex=True)
plt.rc('pgf', rcfonts=False)
plt.rc('pgf', preamble="\n".join([
    r"\usepackage{polyglossia}",
    r"\usepackage{fontspec}",
    r"\setmainfont{Liberation Serif}",
    r"\setsansfont{Liberation Sans}",
    r"\setmonofont{Liberation Mono}",
]))
mpl.use('pgf')

In [None]:
label_font = 12
markersize = 8

In [None]:
def get_ampl_calibration_(path: str) -> float:
    input_col = "CH1"
    sensor_col = "CH2"
    df = pd.read_csv(path)
    sensor_data = df[sensor_col].iloc[1:].astype(float)
    return (sensor_data.max() - sensor_data.min()) / 2


get_ampl_calibration = np.vectorize(get_ampl_calibration_)

In [None]:
pos_0 = (9.81 + 9.80) / 2.0
positions = (pd.read_csv("data/calibration/positions.csv").to_numpy() - pos_0).flatten()
start_index = 2
f = np.vectorize(lambda n: f"data/calibration/measurement{n}.csv")
file_names = f(np.arange(start_index, len(positions) + start_index))
amplitudes = get_ampl_calibration(file_names) * 1000.0

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(4, 3))
ax.set_ylabel(f"$\Delta{{z}}$, мм")
ax.set_xlabel(f"$v_{{max}}$, мВ")

ax.get_yaxis().set_visible(True)
ax.get_xaxis().set_visible(True)

ax.grid(True, which="both")

result = sp.stats.linregress(amplitudes, positions)
k = result.slope
b = result.intercept

xs = np.linspace(0, amplitudes.max(), 2)
ys = xs * result.slope + result.intercept

ax.plot(xs, ys)
ax.scatter(amplitudes, positions, color="r", marker="x")

fig.savefig(f"drawings/calibration.pgf", backend='pgf')

In [None]:
print(f"A = {result.slope} * V + {result.intercept}")

In [None]:
def average(lst):
    return sum(lst) / len(lst)


def average_list(lst):
    av = average(lst)
    return [x - av for x in lst]

In [None]:
first_file_index = 11
end_file_index = 21
f = np.vectorize(lambda n: f"data/vertical/measurement{n}.csv")
file_names = f(np.arange(first_file_index, end_file_index))

In [None]:
av_zs = [6.63, 6.03, 5.43, 4.83, 4.23, 3.63, 3.03, 2.43, 1.83, 1.23]

In [None]:
def butter_lowpass(cutoff, fs, order=5):
    return sp.signal.butter(order, cutoff, fs=fs, btype='lowpass', output='sos')


def butter_lowpass_filter(data, cutoff, fs, order=5):
    sos = butter_lowpass(cutoff, fs, order=order)
    y = sp.signal.sosfilt(sos, data)
    return y

In [None]:
cutoff = 2000.0
sr = 1250000.0
n = 30000
t = n / sr

def filter_signal(sig):
    return butter_lowpass_filter(sig, cutoff, sr)

In [None]:
def find_peaks(sig):
    peak_distance = 500.0
    peaks, _ = sp.signal.find_peaks(sig, distance=peak_distance)
    return peaks

In [None]:
def plot_fft(file, us, us_filtered, offset):
    fig, ax = plt.subplots(1, 1, figsize=(5, 3.5))
    ax.set_ylabel(f"$\Delta{{z}}$, мм")
    ax.set_xlabel(f"$v_{{max}}$, мВ")
    
    ax.get_yaxis().set_visible(True)
    ax.get_xaxis().set_visible(True)
    
    ax.grid(True, which="both")

    def plot_one(sig, label):
        f, pxx_den = sp.signal.periodogram(sig, sr)
        ax.semilogy(f, pxx_den, label=label)

    plot_one(us, "Изначальный сигнал")
    plot_one(us_filtered, "После фильтра $f_{cutoff} = 2500$ Гц")
    
    ax.set_title(f"Спектр мощности при $z_0 = {round(offset, 2)}$ мм")
    ax.set_ylim([1e-7, 1e2])
    ax.set_xlim([0, 1e4])
    ax.set_xlabel('f, Гц')
    ax.set_ylabel('$PSD$, $V^2$/Гц')
    ax.legend()
    
    fig.savefig(f"{file}.pgf", backend='pgf')

In [None]:
all_fs = []
all_zs = []
for (offset, name) in zip(av_zs, file_names):
    df = pd.read_csv(name).iloc[1:]
    n_col = "X"
    u_col = "CH4"
    pos_col = "CH2"
    
    ns = df[n_col].astype(int).to_numpy()
    us = df[u_col].astype(float).to_numpy() * 1000.0 # mv
    ch2_millivolts = df[pos_col].astype(float).to_numpy() * 1000.0 # mv
    
    pos_rel = filter_signal(ch2_millivolts * k + b + offset)
    voltage_offset = np.average(us)
    us = -1.0 * (us - voltage_offset) # Offset and invert phase
    
    us_filtered = filter_signal(us)
    peaks_us = find_peaks(us_filtered)
    print(f"Found peaks of filtered signal: {peaks_us}")
    values = peaks_us[np.argsort(-us_filtered[peaks_us])][0:2]
    finish_idx, start_idx = values.max(), values.min()
    
    ns = np.arange(0, finish_idx - start_idx)
    us = us[start_idx:finish_idx]
    us_filtered = us_filtered[start_idx:finish_idx]
    us_filtered -= np.average(us_filtered) # To make integral periodic
    pos_rel = pos_rel[start_idx:finish_idx]
    
    get_integral_up_to = np.vectorize(lambda x: sp.integrate.simpson(us_filtered[:x + 1], dx=t))
    time_integral = get_integral_up_to(ns)
    peaks_integral = find_peaks(time_integral)
    peaks_pos = find_peaks(-pos_rel)
    diff_peaks = peaks_integral - peaks_pos
    roll_amount = int(np.average(diff_peaks)) 
    pos_rel = np.roll(pos_rel, roll_amount)
    print(f"Difference between peaks and thoughs of position and integral: {diff_peaks}")
    
    all_fs.append(time_integral)
    all_zs.append(pos_rel)

    plot_fft(f"drawings/fft/{os.path.basename(name)}", us, us_filtered, offset)

In [None]:
all_fs = list(reversed(all_fs))
all_zs = list(reversed(all_zs))
av_zs = list(reversed(av_zs))

for i in range(1, len(all_fs)):
    all_fs[-i - 1] += max(all_fs[-i]) - min(all_fs[-i - 1])
av = (max(all_zs[0]) + min(all_zs[0])) / 2
for i in range(len(all_zs)):
    all_zs[i] += av + av_zs[0]

In [None]:
all_fs_np = np.concatenate(all_fs)
all_zs_np = np.concatenate(all_zs)

exponent = lambda x, a, b: np.exp(a * x + b)
popt, pcov = sp.optimize.curve_fit(exponent, all_zs_np, all_fs_np)
zs = np.linspace(np.min(all_zs_np), np.max(all_zs_np), 10000)
(a, b) = popt
fit = np.exp(a * zs + b)

fig, ax = plt.subplots(1, 1, figsize=(4, 3))

ax.set_ylabel("$NL_{static}$, мВ с")
ax.set_xlabel("z, мм")

ax.get_yaxis().set_visible(True)
ax.get_xaxis().set_visible(True)

ax.grid(True, which="both")

ax.plot(zs, fit)
for (i, (fs, zs)) in enumerate(zip(all_fs, all_zs)):
    ax.plot(zs, fs, label=f"$z_0 = {round(av_zs[i], 2)}$ мм")

fig.savefig(f"drawings/flux_function_z_axis_fit.pgf", backend='pgf')