# Calculations of oscillation frequncies

## Import Python packages

In [1]:
import numpy as np
import pandas as pd
import os
from numpy.fft import rfft
from scipy import fft
from scipy.signal.windows import blackmanharris
from scipy.signal import correlate

## Define functions to calculate frequencies

https://gist.github.com/endolith/255291?permalink_comment_id=2956166

* zero-crossings – easy but less reliable for irregular oscillations
* fast fourier transform – python has good libraries to do this with minimal code
* autocorrelation – python has good libraries to do this with minimal code
* dominant frequency

In [2]:
def zero_crossings(signal, fs=1.0):
    """
    Estimate frequency by counting zero crossings
    """
    # Find all indices right before a rising-edge zero crossing
    indices = np.nonzero((signal[1:] >= 0) & (signal[:-1] < 0))[0]

    # Naive (Measures 1000.185 Hz for 1000 Hz, for instance)
    # crossings = indices

    # More accurate, using linear interpolation to find intersample
    # zero-crossings (Measures 1000.000129 Hz for 1000 Hz, for instance)
    crossings = [i - signal[i] / (signal[i + 1] - signal[i]) for i in indices]

    # Some other interpolation based on neighboring points might be better.
    # Spline, cubic, whatever

    return fs / np.mean(np.diff(crossings))


def dominant_freq(signal, sample_spacing=1):
    spectrum = fft.fft(signal)
    freq = fft.fftfreq(len(signal), sample_spacing)
    dom_freq = freq[np.argmax(np.abs(spectrum))]
    return dom_freq


def parabolic(f, x):
    """Quadratic interpolation for estimating the true position of an
    inter-sample maximum when nearby samples are known.

    f is a vector and x is an index for that vector.

    Returns (vx, vy), the coordinates of the vertex of a parabola that goes
    through point x and its two neighbors.

    Example:
    Defining a vector f with a local maximum at index 3 (= 6), find local
    maximum if points 2, 3, and 4 actually defined a parabola.

    In [3]: f = [2, 3, 1, 6, 4, 2, 3, 1]

    In [4]: parabolic(f, argmax(f))
    Out[4]: (3.2142857142857144, 6.1607142857142856)

    """
    # Requires real division.  Insert float() somewhere to force it?
    xv = 1 / 2 * (f[x - 1] - f[x + 1]) / (f[x - 1] - 2 * f[x] + f[x + 1]) + x
    yv = f[x] - 1 / 4 * (f[x - 1] - f[x + 1]) * (xv - x)
    return (xv, yv)


def fft_freq(signal, fs=1.0):
    """
    Estimate frequency from peak of FFT
    """
    # Compute Fourier transform of windowed signal
    windowed = signal * blackmanharris(len(signal))
    f = rfft(windowed)

    # Find the peak and interpolate to get a more accurate peak
    i = np.argmax(np.abs(f))  # Just use this for less-accurate, naive version
    true_i = parabolic(np.log(np.abs(f)), i)[0]

    # Convert to equivalent frequency
    return fs * true_i / len(windowed)


def autocorr_freq(signal, fs=1.0):
    """
    Estimate frequency using autocorrelation
    """
    # Calculate autocorrelation and throw away the negative lags
    corr = correlate(signal, signal, mode="full")
    corr = corr[len(corr) // 2 :]

    # Find the first low point
    d = np.diff(corr)
    start = np.nonzero(d > 0)[0][0]

    # Find the next peak after the low point (other than 0 lag).  This bit is
    # not reliable for long signals, due to the desired peak occurring between
    # samples, and other peaks appearing higher.
    # Should use a weighting function to de-emphasize the peaks at longer lags.
    peak = np.argmax(corr[start:]) + start
    px, py = parabolic(corr, peak)

    return fs / px

## Read a csv file with precalculated oscillation amplitudes

In [3]:
input_folder = "/standard/redemann_lab/Vitaly/"
input_file = "20240322_MAS91_hcp6RNAi_48hRT003-embryo-0001.csv"

In [4]:
data = pd.read_csv(os.path.join(input_folder, input_file))

In [5]:
data.describe()

Unnamed: 0,Frame,"Pole 1,x (pixel)","Pole 1,y (pixel)","Pole 2,x (pixel)","Pole 2,y (pixel)","Midzone,x (pixel)","Midzone,y (pixel)",angle,Pole-Pole Distance [um],Pole 1 Osc (um),Pole 2 Osc (um),left Pole (pixel),right Pole (pixel)
count,46.0,46.0,46.0,46.0,46.0,46.0,46.0,46.0,46.0,46.0,46.0,46.0,46.0
mean,23.5,286.521739,200.956522,338.673913,216.630435,312.554348,208.836957,193.985317,14.612835,0.38139,-0.935218,72.065217,127.043478
std,13.422618,16.081375,6.706403,26.680042,7.230671,16.209129,4.691664,33.997062,8.323491,1.404277,1.034701,15.718923,15.580553
min,1.0,274.0,193.0,286.0,195.0,284.5,194.0,0.0,0.0,-2.585234,-2.478668,58.0,100.0
25%,12.25,280.0,197.0,345.0,216.25,313.5,208.125,192.164133,5.499753,-0.328157,-1.652784,60.0,110.25
50%,23.5,283.0,199.0,351.5,219.0,317.0,209.0,195.696948,17.650433,0.56919,-0.99647,66.0,133.0
75%,34.75,287.5,202.75,354.75,221.0,319.5,210.5,200.606832,20.874634,1.384824,-0.402936,88.75,139.0
max,46.0,358.0,221.0,359.0,224.0,358.0,221.5,270.0,22.017367,2.432544,2.069814,100.0,141.0


## Calculate frequencies

In [6]:
freq_pole_1 = [
    zero_crossings(data["Pole 1 Osc (um)"].values),
    fft_freq(data["Pole 1 Osc (um)"].values),
    autocorr_freq(data["Pole 1 Osc (um)"].values),
    dominant_freq(data["Pole 1 Osc (um)"].values),
]
freq_pole_2 = [
    zero_crossings(data["Pole 2 Osc (um)"].values),
    fft_freq(data["Pole 2 Osc (um)"].values),
    autocorr_freq(data["Pole 2 Osc (um)"].values),
    dominant_freq(data["Pole 2 Osc (um)"].values),
]
freq_df = pd.DataFrame(
            {
                "Pole 1": freq_pole_1,
                "Pole 2": freq_pole_2,
            },
            index=[
                "Osc. zero-crossings",
                "Osc. frequency (FFT)",
                "Osc. frequency (autocorr.)",
                "Osc. dominant frequency",
            ],)
freq_df.head()

Unnamed: 0,Pole 1,Pole 2
Osc. zero-crossings,0.090142,0.149542
Osc. frequency (FFT),0.051348,0.001765
Osc. frequency (autocorr.),0.057605,0.114192
Osc. dominant frequency,0.043478,0.0


## Save calculated frequencies to csv file

In [None]:
output_folder = input_folder # "/standard/redemann_lab/Vitaly"
output_file = os.path.splitext(input_file)[0]+"-frequencies-"+os.path.splitext(input_file)[1]
print (f"output_folder: {output_folder}\noutput_file: {output_file}")

In [None]:
freq_df.to_csv(os.path.join(output_folder, output_file))