In [11]:
# Author : Sachu Sanjayan
# factored code p04
import numpy as np
from scipy.optimize import leastsq
from scipy.fftpack import ifft
from math import ceil


import ipywidgets as widgets
from IPython.display import display


class PyFEM:

    
    def __init__(self):
        self.run_button = widgets.Button(description="Extract Frequencies")
        self.visualize_checkbox = widgets.Checkbox(value=False, description="Visualize Data")
        self.download_checkbox = widgets.Checkbox(value=False, description="Download Data")
        _layout = widgets.Layout(width='80%')
        
        self.fq_i = widgets.Text(
            value='0',
            placeholder='Min frequency',
            description='Min Fq:',
            disabled=False,
            layout=_layout   
        )
        self.fq_n = widgets.Text(
            value='25',
            placeholder='Max frequency',
            description='Max Fq:',
            disabled=False,
            layout=_layout   
        )

        self.sn_ratio = widgets.Text(
            value='4.5',
            placeholder='Signal noise ratio',
            description='S/N:',
            disabled=False,
            layout=_layout   
        )
        
        self.nq = widgets.Text(
            value='10',
            placeholder='Number of frequencies to extract',
            description='Num Fq:',
            disabled=False,
            layout=_layout     
        )

        self.ticbox = widgets.Text(
            value='filename.dat',
            placeholder='file name',
            description='filename:',
            disabled=False,
            layout=_layout     
        ) 

        self.output = widgets.Output()
        self.run_button.on_click(self.on_run_button_click)

        display( self.ticbox, self.fq_i, self.fq_n, self.sn_ratio, self.nq, self.run_button, self.output)
    

    def on_run_button_click(self, b):

        with self.output:
            self.output.clear_output()
            print("Extracting frequencies !")

            # Load data and run analysis
            file_name = rf'{self.ticbox.value}'

            fi, fn = float(self.fq_i.value), float(self.fq_n.value)
            dn, n = float(self.sn_ratio.value), int(self.nq.value)
            dat = np.loadtxt(file_name)
            t, a = dat[:, 0], dat[:, 1]
            p = self.pysca_loop(t, a, fi, fn, dn, n, fn, ofac=6.0)
            np.savetxt('pw_data.pw', p, fmt="%12.12f %12.12f %12.12f %12.12f %12.12f")


    def pysca_loop(self, t, a, numin, numax, snr_width, n, hifreq, ofac=6.0):
        new_ts = a.copy()
        pw_fq, pw_amp, pw_phase, pw_snr, pw_noise = [], [], [], [], []

        for i in range(n):
            orig_nu, orig_per = self.compute_periodogram(t, new_ts, ofac, hifreq)
            nuidx = (orig_nu >= numin) & (orig_nu <= numax)
            
            if not np.any(nuidx):
                continue
            
            nu, per = orig_nu[nuidx], orig_per[nuidx]
            new_freq = self.find_highest_peak(nu, per)

            # Fit original time series using extracted mode parameters
            amp, phase = 1, 0
            new_amp, new_phase, ok, misc = self.fit_timeseries(t, a, new_freq, amp, phase)

            if not ok:
                raise Exception(f'Harmonic fit failed for peak frequency {new_freq:.12f} [ier={misc[3]}]')

            # Prewhiten the original time series
            new_ts = self.prewhiten(t, new_ts, new_freq, new_amp, new_phase)
            noise = self.median_noise_level(nu, per, new_freq, snr_width)
            snr = new_amp / noise

            print(f"{i}) Freq: {new_freq:.12f}, Amp: {new_amp[0]:.12f}, Phase: {new_phase[0]:.12f}, SNR: {snr[0]:.12f}, Noise: {noise:.12f}")

            pw_fq.append(new_freq)
            pw_amp.append(new_amp)
            pw_phase.append(new_phase)
            pw_snr.append(snr)
            pw_noise.append(noise)

        return np.column_stack((pw_fq, pw_amp, pw_phase, pw_snr, pw_noise))

    def compute_periodogram(self, t, a, ofac, hifreq):
        t, a = np.atleast_1d(t, a)
        if t.ndim > 1 or a.ndim > 1:
            raise ValueError('Input arrays must be 1D')
        nu, p, _, _, var = self.fasper(t, a, float(ofac), hifreq=float(hifreq))
        return nu, 2.0 * np.sqrt(var * p / len(a))

    def fasper(self, x, y, ofac, hifac=None, hifreq=None, MACC=4):
        n = len(x)
        if n != len(y):
            raise ValueError('Incompatible arrays.')
        
        hifac = hifac or 1.0
        ave, var = y.mean(), np.var(y, ddof=1)
        xmin, xmax, xdif = x.min(), x.max(), x.max() - x.min()
        df = 1.0 / (xdif * ofac)
        
        nout = ceil((hifreq / df) if hifreq else (0.5 * ofac * hifac * n))
        nfreqt, nfreq = int(2.0 * nout * MACC), 64
        while nfreq < nfreqt:
            nfreq *= 2
        ndim = 2 * nfreq
        
        wk1, wk2 = np.zeros(ndim, dtype='complex'), np.zeros(ndim, dtype='complex')
        fac, fndim = ndim / (xdif * ofac), ndim
        ck, ckk = ((x - xmin) * fac) % fndim, (2.0 * ((x - xmin) * fac) % fndim) % fndim
        
        for j in range(n):
            wk1[int(ck[j])] += y[j] - ave
            wk2[int(ckk[j])] += 1.0
        
        wk1, wk2 = ifft(wk1) * len(wk1), ifft(wk2) * len(wk1)
        wk1, wk2 = wk1[1:nout+1], wk2[1:nout+1]
        
        hypo2 = 2.0 * abs(wk2)
        hc2wt, hs2wt = wk2.real / hypo2, wk2.imag / hypo2
        cwt, swt = np.sqrt(0.5 + hc2wt), np.sign(hs2wt) * np.sqrt(0.5 - hc2wt)
        den = 0.5 * n + hc2wt * wk2.real + hs2wt * wk2.imag
        cterm = (cwt * wk1.real + swt * wk1.imag) ** 2 / den
        sterm = (cwt * wk1.imag - swt * wk1.real) ** 2 / (n - den)
        
        return df * (np.arange(nout) + 1.), (cterm + sterm) / (2.0 * var), nout, np.argmax(cterm + sterm), var

    def find_highest_peak(self, nu, p):
        imax = np.argmax(p)
        if imax in {0, p.size - 1}:
            return nu[imax]
        frq1, frq2, frq3 = nu[imax-1], nu[imax], nu[imax+1]
        y1, y2, y3 = p[imax-1], p[imax], p[imax+1]
        t1 = (y2 - y3) * (frq2 - frq1)**2 - (y2 - y1) * (frq2 - frq3)**2
        t2 = (y2 - y3) * (frq2 - frq1) - (y2 - y1) * (frq2 - frq3)
        return frq2 - 0.5 * t1 / t2

    def median_noise_level(self, nu, p, nu0, width):
        idx = (nu >= nu0 - 0.5 * width) & (nu <= nu0 + 0.5 * width)
        return np.median(p[idx]) if np.any(idx) else np.nan


    def fit_timeseries(self, t, a, freq, amp0=None, phi0=None, **kwargs):

        t, a, freq = np.atleast_1d(t, a, freq)
        amp0 = np.ones_like(freq) if amp0 == None else np.atleast_1d(amp0)
        phi0 = 0.5 * np.ones_like(freq) if phi0 == None else np.atleast_1d(phi0)
        for ary in [t, a, freq, amp0, phi0]:
            if ary.ndim > 1:
                raise ValueError('Input arrays must be 1d')

        # Fill missing initial values for amplitudes and phases.
        if amp0.size < freq.size:
            amp0 = np.concatenate((amp0, np.ones(freq.size - amp0.size)))
        if phi0.size < freq.size:
            phi0 = np.concatenate((phi0, 0.5 * np.ones(freq.size - phi0.size)))

        # Perform leastsq fit.
        x, cov_x, infodict, mesg, ier = leastsq(
            self._minfunc, np.concatenate((amp0, phi0)), args=(t, a, freq),
            Dfun=self._dfunc, full_output=1, col_deriv=1, **kwargs)

        # The fit was successful, if ier in [1, 2, 3, 4]
        ok = (ier >= 1) and (ier <= 4)

        # Extract amplitudes and phases from the fit result.
        n = len(x) // 2
        amp, phi = x[:n], x[n:]
        idx = (amp < 0)
        amp[idx] *= -1.0
        phi[idx] += 0.5
        phi = np.mod(phi, 1)

        return amp, phi, ok, (cov_x, infodict, mesg, ier)


    def _minfunc(self, amph, xdat, ydat, nu):

        n = len(amph) // 2
        return ydat - self.harmfunc(xdat, nu, amph[:n], amph[n:])

    def _dfunc(self, amph, xdat, ydat, nu):
    
        n = len(amph) // 2
        am, ph = amph[:n], amph[n:]
        res = np.zeros((len(amph), len(xdat)))
        for i in range(n):
            res[i] = -np.sin(2*np.pi * (nu[i]*xdat + ph[i]))
        for i in range(n):
            res[n+i] = -am[i] * np.cos(2*np.pi * (nu[i]*xdat + ph[i])) * 2*np.pi
        return res

    def harmfunc(self, t, nu, amp, phi):

        t, nu, amp, phi = np.atleast_1d(t, nu, amp, phi)
        if t.ndim > 1 or nu.ndim > 1 or amp.ndim > 1 or phi.ndim > 1:
            raise ValueError('Input arrays must be 1d')
        n = len(nu)
        res = np.zeros(len(t))
        for i in range(n):
            res += amp[i] * np.sin(2 * np.pi * (nu[i] * t + phi[i]))
        return res

    def prewhiten(self, t, a, freq, amp, phi):

        t, a, freq, amp, phi = np.atleast_1d(t, a, freq, amp, phi)
        for ary in [t, a, freq, amp, phi]:
            if ary.ndim > 1:
                raise ValueError('Input arrays must be 1d')
        if len(t) != len(a):
            raise ValueError("Arrays 't' and 'a' must have equal size")
        if len(freq) != len(amp) or len(amp) != len(phi):
            raise ValueError("Arrays 'freq', 'amp' and 'phi' must have equal size")
        return a - self.harmfunc(t, freq, amp, phi)


    def run():
        PyFEM()

In [None]:
P04 = PyFEM()
P04.run

Text(value='filename.dat', description='filename:', layout=Layout(width='80%'), placeholder='file name')

Text(value='0', description='Min Fq:', layout=Layout(width='80%'), placeholder='Min frequency')

Text(value='25', description='Max Fq:', layout=Layout(width='80%'), placeholder='Max frequency')

Text(value='4.5', description='S/N:', layout=Layout(width='80%'), placeholder='Signal noise ratio')

Text(value='10', description='Num Fq:', layout=Layout(width='80%'), placeholder='Number of frequencies to extr…

Button(description='Extract Frequencies', style=ButtonStyle())

Output()

<bound method PyFEM.run of <__main__.PyFEM object at 0x0000019E07A7C050>>