# bk_DRT

In [1]:
from __future__ import division 
import os
import time
import warnings
import ray
import pickle
from pathlib import Path
import numpy as np
import pandas as pd
from scipy.optimize import minimize
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import py_eis.run_KKopt as run_KK
import py_eis.KKZhit as KKZhit
import py_eis.val as valit
import py_eis.drt as drt
from py_eis.bc import back_calc
from hyperopt import tpe, hp, fmin, Trials

In [None]:
css = ['#1f77b4','#ff7f0e','#2ca02c','#d62728','#9467bd','#8c564b','#e377c2','#7f7f7f','#bcbd22','#17becf']*100

In [None]:
def del_iframes():
    time.sleep(.1)
    try:
        for iframe in os.listdir(Path('iframe_figures')):
            try: 
                os.remove(Path('iframe_figures')/iframe)
            except: 
                pass
    except: pass
del_iframes()

# Data pre-processing

- Single impedances must be in `"folder"` (see below)

In [None]:
# Enter data pre-processing code here.

# Import

- `Choose or add` custom read_in_method to read_data().

In [None]:
def read_data(folder,file):
    
    read_in_method = 0
    
    if read_in_method == 0: # ASC, ASC2, ESC, ESC2
        raw_data = np.load(folder/file)
        F  = raw_data['f']
        Zr = np.real(raw_data['Z5'])
        Zi = np.imag(raw_data['Z5'])

    data = np.array([F,Zr,Zi])
    
    if data[0][0] > data[0][-1]:
        data = np.fliplr(data)
        
    return data 

- Add your data folder to `'EIS_data/'`

In [None]:
print('Folders:',os.listdir('EIS_data/'))

- Choose `subfolder`:

In [None]:
sub_folder = 'ASC2'
#====================================================================================================
folder = Path('EIS_data/'+sub_folder+'/')
fl = os.listdir(folder)
for fi, file in enumerate(fl):
    print(fi,'-',file)

# EIS

In [None]:
ii = list(range(len(fl)))

In [None]:
Z_eis=pd.DataFrame()
fig = go.Figure()
for mi, meas in enumerate(ii):  
    data = read_data(folder,fl[meas])  
    fig.add_trace(go.Scatter(x = data[1], y = -data[2], mode='lines', line=dict(width=3)))
    for col in range(3):
        Z_eis=pd.concat([Z_eis, pd.DataFrame(data[col])],axis=1)         
fig.update_xaxes(showline=True, mirror=True, zeroline=False, autorange=True,
                 ticks="inside",tickwidth=1, tickcolor='black', ticklen=2, title="Z' [\u03A9]")  
fig.update_yaxes(showline=True, mirror=True, zeroline=False, autorange=True,
                 ticks="inside",tickwidth=1, tickcolor='black',ticklen=2, title="-Z'' [\u03A9]")  
fig.update_layout(template=None,height=360,width=600,font=dict(family='Arial',size=16),
                  margin=dict(l=80, r=10, b=50,t=50, pad=0), 
                  legend=dict(x=1.01,y=1.005,bgcolor="white",bordercolor="Black",borderwidth=0))   
fig.show(renderer='iframe_connected')

# KK

- Set `threshold` and `frequency range` for KK test.

In [13]:
KK_threshold = .05 
f_min, f_max = 1, 1000

In [14]:
for mi, meas in enumerate(ii):
    fig_KK = make_subplots(rows=1, cols=2) 
    data = read_data(folder,fl[meas]) 
    F, Zr, Zi  = data[0], data[1], data[2]
    datap, datap_raw = run_KK.run_KK(F,Zr,Zi,KK_threshold,f_min,f_max)   
     
    fig_KK.add_trace(go.Scatter(x = fig['data'][mi]['x'], y = fig['data'][mi]['y'], name = "Curve #"+str(mi), 
                                mode='lines', line=dict(color = css[mi], width=4)), row=1, col=1)
    fig_KK.add_trace(go.Scatter(x = np.real(datap['Z']), y = -np.imag(datap['Z']), name = "Curve #"+str(mi), 
                                mode='markers', marker=dict(size=8, color='lightblue', symbol='circle-open')), row=1,col=1)  
    fig_KK.add_trace(go.Scatter(x = datap_raw['F'], y = np.real(datap_raw['Z'])/np.real(datap_raw['ZrKK'])-1, name = "Z'", 
                                mode='lines', line=dict(color = css[mi], width=4)), row=1, col=2)
    fig_KK.add_trace(go.Scatter(x = datap['F'], y = np.real(datap['Z'])/datap['ZrKK']-1, name = "Z' (KK)",
                                mode='markers',marker=dict(size=8, color = 'lightblue', symbol='circle-open')),row=1,col=2)
    fig_KK.add_trace(go.Scatter(x = [0,10**6], y = [KK_threshold]*2, name= 'Threshold',
                                mode='lines', line=dict(dash='dash',color='black')), row=1,col=2)
    fig_KK.add_trace(go.Scatter(x = [0,10**6], y = [-KK_threshold]*2, name= 'Threshold',
                                mode='lines', line=dict(dash='dash',color='black')), row=1,col=2)
    fm = np.array(datap_raw['F'][::-1])[np.where(np.imag(datap_raw['Z'])[::-1]<0)[0][0]]
    fig_KK.add_trace(go.Scatter(x = [fm, fm], y = [-1,1], name = "f_lim (Z''<0)",
                                mode='lines',line=dict(dash='dot',color='magenta')),row=1,col=2)
    fig_KK.update_xaxes(showline=True, mirror=True, zeroline=False,
                 ticks="inside",tickwidth=1, tickcolor='black', ticklen=2, title="Z' [\u03A9]",row=1,col=1)   
    fig_KK.update_yaxes(showline=True, mirror=True, zeroline=False,
                 ticks="inside",tickwidth=1, tickcolor='black',ticklen=2, title="-Z'' [\u03A9]",row=1,col=1) 
    fig_KK.update_xaxes(showline=True, mirror=True, zeroline=False, range=[-2,5],type='log',dtick=1,
                 ticks="inside",tickwidth=1, tickcolor='black', ticklen=2, title="f [Hz]",row=1,col=2)   
    fig_KK.update_yaxes(showline=True, mirror=True, zeroline=False, range=[-.2,.2],
                 ticks="inside",tickwidth=1, tickcolor='black',ticklen=2, title="rel. error [%]",row=1,col=2) 
    fig_KK.update_layout(template=None,height=360,width=900,font=dict(family='Arial',size=16),
                         margin=dict(l=80, r=10, b=50,t=50, pad=0), 
                         legend=dict(x=1.01,y=1.005,bgcolor="white",bordercolor="Black",borderwidth=1)) 
    fig_KK.show(renderer='iframe_connected')
    time.sleep(1)

# DRT

- If possible, adjust 'step' so that the `number of frequencies` around 60.

In [15]:
step = 6
print('Number of frequencies = ',len(datap[::step]))

Number of frequencies =  59


- `Frequency range` for DRT calculation.
- `DRT coefficients`.

In [16]:
f_lim = [.01,10000]
lam = 5
shape = 4
drt_coeffs = [1/(10**lam),shape,0]

In [None]:
def drt_master(f_lim,step,drt_coeffs,KK_threshold,stepwise,ii,folder,fl,f_min,f_max,decades,roy=True):
    x, counter = 0, 0
    while x == 0 and counter < 3:
        try:
            if roy == True:
                ray.shutdown()
                time.sleep(1)
                ray.init(num_cpus=3,ignore_reinit_error=True)
                time.sleep(1)
            @ray.remote
            def drt_calc(data,drt_coeffs):
                warnings.filterwarnings("ignore")
                gg,tt = drt.drt(data[1]+data[2]*1j,data[0],lam=drt_coeffs[0],coeff=drt_coeffs[1],L=drt_coeffs[2]) 
                obj = [gg,tt]
                return obj
            data_list, input_list = [], []
            for mi, meas in enumerate(ii):
                data = read_data(folder,fl[meas]) 
                F, Zr, Zi  = data[0], data[1], data[2]
                datap, datap_raw = run_KK.run_KK(F,Zr,Zi,KK_threshold,f_min,f_max)           
                dataf = datap[(datap['F']>=f_lim[0])&(datap['F']<=f_lim[1])][::step]
                dataKK = np.array([dataf['F'],np.real(dataf['Z']),np.imag(dataf['Z'])])
                data_save = np.array([datap['F'],np.real(datap['Z']),np.imag(datap['Z'])])
                data_list.append(data_save)
                input_list.append(drt_calc.remote(dataKK,drt_coeffs))
            output_list = []
            for ipt in input_list:
                output_list.append(ray.get([ipt]))
            if roy == True:
                ray.shutdown()
            x = 1
        except: counter +=1
    return data_list, output_list
data_list, output_list = drt_master(f_lim,step,drt_coeffs,KK_threshold,stepwise,ii,folder,fl,f_min,f_max,decades)

In [18]:
Z_drt = pd.DataFrame()
fig_drt = go.Figure()
for out in output_list:
    for obj in out:
        fig_drt.add_trace(go.Scatter(x = np.power(obj[1],-1),y = obj[0],mode='lines',line=dict(width=3)))
        Z_drt=pd.concat([Z_drt, pd.DataFrame(np.power(obj[1],-1))],axis=1)
        Z_drt=pd.concat([Z_drt, pd.DataFrame(obj[0])],axis=1)
        
fig_drt.update_xaxes(showline=True, mirror=True, zeroline=False,range=[-3, 5],
                     type='log',dtick=1,ticks="inside",tickwidth=1, tickcolor='black', ticklen=2, title="f [Hz]")   
fig_drt.update_yaxes(showline=True, mirror=True, zeroline=False, range=[0,.006],
                     ticks="inside",tickwidth=1, tickcolor='black',ticklen=2, 
                     title="r'$\displaystyle\frac{1}{f}G\left(\displaystyle\frac{1}{f}\right)[m\Omega]$'")         
fig_drt.update_layout(template=None,height=360,width=600,font=dict(family='Arial',size=16),
                      margin=dict(l=80, r=10, b=50,t=50, pad=0))
fig_drt.show(renderer='iframe_connected')

# Validation

In [20]:
for oi, out in enumerate(output_list):
    for obj in out:    
        bc = back_calc(obj[0],obj[1])   
        shift,Fq_red,Z_data,add_shift,bc_shifted,points,diff,res,error,error_lines = valit.validation(data_list,oi,bc)
        fig_bc =  make_subplots(rows=1, cols=2)
        fig_bc.add_trace(go.Scatter(x=data_list[oi][1], y=-data_list[oi][2], name= "Curve #"+str(oi),
                                    mode='lines', line=dict(color = css[oi], width=4)),row=1,col=1) 
        fig_bc.add_trace(go.Scatter(x=Fq_red,y=diff, name = "Deviation",
                                    mode='lines+markers',marker=dict(size=8, color = 'lightgrey', symbol='circle')),row=1,col=2)      
        fig_bc.add_trace(go.Scatter(x=(np.real(bc_shifted))[::20],y=-np.imag(bc_shifted)[::20], name = "Backcalc",
                                    mode='markers',marker=dict(size=8, color = 'lightblue', symbol='circle-open')),row=1,col=1)              
        fig_bc.add_trace(go.Scatter(x=[0.01,100000],y=[0.01,0.01],name='L_upper',
                                    mode='lines',line=dict(dash='dash',color='black')),row=1,col=2)
        fig_bc.add_trace(go.Scatter(x=[0.01,100000],y=[-0.01,-0.01],name='L_lower',
                                    mode='lines',line=dict(dash='dash',color='black')),row=1,col=2)      
        fig_bc.update_xaxes(showline=True, mirror=True, zeroline=False,
                 ticks="inside",tickwidth=1, tickcolor='black', ticklen=2, title="Z' [\u03A9]",row=1,col=1)   
        fig_bc.update_yaxes(showline=True, mirror=True, zeroline=False,
                 ticks="inside",tickwidth=1, tickcolor='black',ticklen=2, title="-Z'' [\u03A9]",row=1,col=1) 
        fig_bc.update_xaxes(showline=True, mirror=True, zeroline=False,type='log',range=[-2,5],dtick=1,
                 ticks="inside",tickwidth=1, tickcolor='black', ticklen=2, title="f [Hz]",row=1,col=2)   
        fig_bc.update_yaxes(showline=True, mirror=True, zeroline=True,range=[-0.1,0.1],
                 ticks="inside",tickwidth=1, tickcolor='black',ticklen=2, title="Deviation [m\u03A9]",row=1,col=2)    
        fig_bc.update_layout(template=None,height=270,width=920,font=dict(family='Arial',size=16),
                         margin=dict(l=80, r=10, b=50,t=50, pad=0),
                         legend=dict(x=1.01,y=1.005,bgcolor="white",bordercolor="Black",borderwidth=1)) 
        time.sleep(.2)
        fig_bc.show(renderer='iframe_connected')

# DRTopt

 - Create `custom "ii" list` if not all values should be used for optimization. 
 - `Number of evaluations` for optimization.
 - `Create a new file` or continue with an old one.

In [24]:
if False:
    ii = []
    
evals = 10
create_new_file = True

In [None]:
%%time
def objective(params):   
    x, y = params['x'], params['y']
    data_list, output_list = drt_master(f_lim,step,[1/(10**x),y,0],KK_threshold,stepwise,ii,folder,fl,f_min,f_max,decades,False)     
    qual_list, qual = [], None
    for oi, out in enumerate(output_list):
        for obj in out:   
            bc = back_calc(obj[0],obj[1])   
            shift,Fq_red,Z_data,add_shift,bc_shifted,points,diff,res,error,error_lines = valit.validation(data_list,oi,bc)
            qual = error/len(error_lines.columns)*1000
        if qual is None: 
            qual = np.nan
        qual_list.append(qual)
    qual = np.mean(qual_list)
    return qual
space = {'x': hp.quniform('x', 1, 6, q=1),'y': hp.quniform('y', 1, 6, q=1)}
if create_new_file == True: trials = Trials()
else: trials = pickle.load(open("my_results.pkl", "rb"))
max_evals, step2 = evals, 1

ray.shutdown()
time.sleep(1)
ray.init(num_cpus=3,ignore_reinit_error=True)
time.sleep(1)
for i in range(1, max_evals + 1, step2):
    best = fmin(fn=objective,space=space,algo=tpe.suggest,trials=trials,max_evals=i)
    print(best)
    file_string = "my_results.pkl"
    pickle.dump(trials, open(file_string, "wb"))
ray.shutdown()

- Adjust marker size if necessary.

In [None]:
best = pickle.load(open("my_results.pkl", "rb"))
loss_list, val_list1, val_list2 = [],[],[]
for b in list(best):
    loss_list.append(b['result']['loss'])
    val_list1.append(b['misc']['vals']['x'][0])
    val_list2.append(b['misc']['vals']['y'][0])
rst = pd.DataFrame([loss_list, val_list1,val_list2]).transpose().sort_values(by=[0]).reset_index(drop=True).iloc[:100,:]
col_list=[]
for i in range(int(len(rst)/10)):
    for j in range(10): col_list.append(css[i])
for k in range(len(rst)%10): col_list.append(css[i+1])

fig_l = go.Figure()
for vi in range(len(rst)):
    row=rst.loc[vi]
    try: fig_l.add_trace(go.Scatter(x=[row[1]],y=[row[2]],mode='markers',marker=dict(size=10**4*row[0],
                                                                                     symbol='circle',color=col_list[vi])))
    except: pass    
fig_l.update_xaxes(showline=True, mirror=True, zeroline=False,range=[0,7],dtick=1,
                   ticks="inside",tickwidth=1, tickcolor='black', ticklen=2, title="DRT param. I")   
fig_l.update_yaxes(showline=True, mirror=True, zeroline=False,range=[0,7],dtick=1,
                   ticks="inside",tickwidth=1, tickcolor='black',ticklen=2, title="DRT param. II")         
fig_l.update_layout(template=None,height=360,width=900,font=dict(family='Arial',size=16),
                    margin=dict(l=80, r=10, b=50,t=50, pad=0))
rst.columns = ['mean loss','DRT param. I','DRT param. II']
print(rst.iloc[:10,:])
fig_l.show(renderer='iframe_connected')

# Save data

In [None]:
Z_eis.columns = ['F','Zr','Zi']*len(output_list)
raw_input = input('Save to pickle? ')
if raw_input == 'y':    
    file_name=input('Filename: ')
    df_file=Z_eis
    pkl_folder = Path("pickles/")
    df_file.to_pickle(pkl_folder/file_name)
    print('File saved.')

In [None]:
Z_drt.columns = ['F','Z']*len(output_list)
raw_input = input('Save to pickle? ')
if raw_input == 'y':    
    file_name=input('Filename: ')
    df_file=Z_drt
    pkl_folder = Path("pickles/")
    df_file.to_pickle(pkl_folder/file_name)
    print('File saved.')