In [1]:

import glob
import dask
import dask.dataframe as dd
import matplotlib.pyplot as plt
import time
import pandas as pd
from dask.distributed import Client
import numpy as np
from dask import delayed
import peakutils
import numpy as np
import pandas as pd
from scipy import interpolate, optimize
from scipy import stats
from peakutils.plot import plot as pplot
from scipy.optimize import curve_fit
from scipy.stats import wasserstein_distance
from numpy import trapz
import sys
from scipy import linalg
from dask.distributed import Client, progress


In [2]:
fpath = '../data/sensor2_week5_LN2/*.csv'

f = glob.glob(fpath)


In [3]:
f[0]

'../data/sensor2_week5_LN2/sensor2_LN2_temp-192_001.00ms.CWL0700.00nm 2024 January 23 16_42_50-Frame-07918.csv'

In [6]:

# =============================================================================
# sequential
# =============================================================================

fnm, time_step = [],[]
laser_pow, amplitude, peak_center =[],[],[]
width, debye_waller, frame_num = [], [],[]
kld, wasserstein_dist =[], []
amplitude2, peak_center2, width2 =[],[],[]
temps = []

filtered_files = f
huang_rhys = [649, 780]
nv_zpl = [634.25,640.25]
nv0_zpl = [572.0, 578]

   


def gaussian(x_zpl, amp, u, std):
    ''' gaussian fit'''
    return amp*np.exp(-((x_zpl-u)**2/(2*std**2)))


def main_processor( nv_type='nv', func = 'gaussian', fit_params = [4000, 637.5,1.5], max_fev=50000, dx = 0.01 ):
    ''' nv_type = enter nv for nv(-) or nv0 for nv_zero; default is nv
    
    func is the fitting function used. default is gaussian, other options 
    include lorenztian, two_gaussian or two_lorenztian and spline_fit
    
    fit_parms: default for gaussian
    for lorenzian 
    
    '''
    if nv_type == 'nv':
       zp= nv_zpl
    else:
        zp=nv0_zpl
    
    for f1 in filtered_files[:]:
        #print(f1)
        #fnm.append(f1) #.split('\\')[1])
        #frame_num.append((f1))
        ###### open and clean data ####
        df=pd.read_csv(f1, sep=',', header = 0, engine='python')
        df.sort_values(by='Wavelength', ascending=True)
        df.drop_duplicates(subset='Wavelength', keep='first', inplace=True)
        x,y=df['Wavelength'],df['Intensity']
        ### mark out zpl range of interest #####
        x_zpl, y_zpl = x[(np.abs(x-zp[0])).argmin():(np.abs(x-zp[1])).argmin() ],\
        y[(np.abs(x-zp[0])).argmin():(np.abs(x-zp[1])).argmin() ]

        base = peakutils.baseline(y_zpl, 1)
        y_zpl_base = y_zpl-base
        #time_step.append(time_st(f1))
        #kl_divergence(y, ref)
        #wasserstein_dist.append(wasserstein_distance(y,spectrum1))
        dx_val = (x[0]-x[50])/50
        area_zpl = trapz(y[(np.abs(x-zp[0])).argmin():(np.abs(x-zp[1])).argmin() ], dx= dx_val)
        area_psb = trapz(y[(np.abs(x-huang_rhys[0])).argmin():(np.abs(x-huang_rhys[1])).argmin() ], dx= dx_val)
        dw = area_zpl/area_psb
        debye_waller.append(dw); 
        if func == 'gaussian': 
             popt, pcov = curve_fit(gaussian,x_zpl, y_zpl_base, [4000, 637.5,1.5], maxfev=max_fev )
             amp, center_wavelength, FWHM = popt
             peak_center.append(center_wavelength);
             width.append(FWHM);
             amplitude.append(amp);
             #lp = laser_power(f1)
             #laser_pow.append(lp)
             #temps.append(float(temp(f1)))


In [7]:
a =time.time()
main_processor()
b = time.time() - a

In [8]:
# =============================================================================
# dask
# =============================================================================

# Initialize Dask client with dashboard
client = Client(n_workers=40, threads_per_worker=2, memory_limit='4GB') #, dashboard_address=':8787'

def process_file(f1, nv_type='nv', func='gaussian', fit_params=[4000, 637.5, 1.5], max_fev=50000, dx=0.01):
    ''' Process a single file and extract features '''
    
    if nv_type == 'nv':
        zp = nv_zpl
    else:
        zp = nv0_zpl
    
    df = pd.read_csv(f1, sep=',', header=0, engine='python')#.compute()
    df = df.sort_values(by='Wavelength', ascending=True).drop_duplicates(subset='Wavelength', keep='first')
    x, y = df['Wavelength'], df['Intensity']
    
    # Mark out ZPL range of interest
    x_zpl_range = (np.abs(x - zp[0])).argmin(), (np.abs(x - zp[1])).argmin()
    x_zpl, y_zpl = x[x_zpl_range[0]:x_zpl_range[1]], y[x_zpl_range[0]:x_zpl_range[1]]
    
    base = peakutils.baseline(y_zpl, 1)
    y_zpl_base = y_zpl - base
    
    dx_val = (x[0] - x[50]) / 50
    area_zpl = trapz(y[x_zpl_range[0]:x_zpl_range[1]], dx=dx_val)
    area_psb = trapz(y[(np.abs(x - huang_rhys[0])).argmin():(np.abs(x - huang_rhys[1])).argmin()], dx=dx_val)
    dw = area_zpl / area_psb
    
    result = {'debye_waller': dw}
    
    if func == 'gaussian':
        def gaussian(x_zpl, amp, u, std):
            return amp * np.exp(-((x_zpl - u) ** 2 / (2 * std ** 2)))
        
        popt, _ = curve_fit(gaussian, x_zpl, y_zpl_base, p0=fit_params, maxfev=max_fev)
        amp, center_wavelength, FWHM = popt
        
        result.update({'amplitude': amp, 'peak_center': center_wavelength, 'width': FWHM})
    
    return result

# List of files to process
files = filtered_files


c =time.time()
# Create and compute delayed tasks
delayed_results = [delayed(process_file)(f) for f in files]
results = dask.compute(*delayed_results)

# Monitor task progress
progress(delayed_results)

# Compute results

#results = dask.compute(*delayed_results)


d = time.time() - c



print(b, d)





Perhaps you already have a cluster running?
Hosting the HTTP server on port 44665 instead


256.95157504081726 25.27560806274414


In [9]:
df = pd.DataFrame(results)

df.head()

Unnamed: 0,debye_waller,amplitude,peak_center,width
0,0.010523,10543.714949,636.74211,0.90809
1,0.010524,10553.012911,636.745231,0.911367
2,0.010405,10651.520247,636.723696,0.925218
3,0.010426,10520.971797,636.742129,0.921046
4,0.010496,10470.270077,636.74828,0.910228


In [10]:
client.close()