# Import
该笔记记录了各种特征的提取方案，每一种特征提取会在最后总结为一个提取函数，其输入是DRT结果，输出是评估特征组成的向量

In [3]:
import os
import re
import gc
import sys 
from datetime import datetime
from loguru import logger



import torch
import numpy as np


import matplotlib.pyplot as plt
%matplotlib qt




# Feature Extraction

## Definition

In [4]:
def SearchELE(rootPath, ele_pattern = re.compile(r"(.+?)_归档")):
    '''==================================================
        Recursively collect electrode directories below rootPath
        Parameter: 
            rootPath: directory whose entries are scanned
            ele_pattern: compiled regex applied with .match() to every
                         sub-directory name; group(1) is kept as the id
        Return:
            list of [directory path, captured id] pairs. A directory
            whose name matches is recorded and NOT descended into; a
            directory that does not match is searched recursively.
            Plain files are ignored.
        ==================================================
    '''
    found = []
    for entry in os.listdir(rootPath):
        full_path = os.path.join(rootPath, entry)
        if not os.path.isdir(full_path):
            continue

        hit = ele_pattern.match(entry)
        if hit is None:
            # Not an electrode folder itself: look inside it.
            found += SearchELE(full_path, ele_pattern)
            continue

        found.append([full_path, hit.group(1)])

    return found



def Load_Single(ele_id, rootPath, DATA_SUFFIX):
    '''==================================================
        Load the "data_group" entry of one electrode archive
        Parameter: 
            ele_id: electrode id used in the folder and file name
            rootPath: directory that contains "<ele_id>_归档"
            DATA_SUFFIX: sub-folder name, also used as file-name suffix
        Return:
            the object stored under the "data_group" key of
            <rootPath>/<ele_id>_归档/<DATA_SUFFIX>/<ele_id>_<DATA_SUFFIX>.pt,
            or None (after logging a warning) when that file is missing
        ==================================================
    '''
    fd_pt = os.path.join(f"{rootPath}/{ele_id}_归档", DATA_SUFFIX, f"{ele_id}_{DATA_SUFFIX}.pt")
    if not os.path.exists(fd_pt):
        logger.warning(f"{fd_pt} does not exist")
        return None

    # weights_only is passed explicitly: relying on the default emits a
    # FutureWarning on current PyTorch and changes behaviour once the default
    # becomes True.
    # SECURITY NOTE: weights_only=False unpickles arbitrary Python objects, so
    # only load archives from a trusted source.
    data_pt = torch.load(fd_pt, weights_only=False)

    return data_pt["data_group"]



def DRT_Plot_Batch(fig, DRTdata_list, EISdata_list, Loess_list, eis_seq):
    '''==================================================
        Draw the EIS / DRT overview panels into a 2x3 subplot grid
        Parameter: 
            fig: figure that receives the six subplots
            DRTdata_list: per-measurement collections of 2-D arrays;
                          rows 1 and 2 of every array are concatenated
                          and scattered against each other
            EISdata_list: per-measurement entries whose element [0] is
                          a 2-D array (row 0 is the x axis of both
                          Bode panels, rows 1 / 2 are combined as
                          real + 1j * imaginary)
            Loess_list: per-measurement 2-D arrays (row 0 vs row 1)
            eis_seq: indices of the measurements that are drawn
        Return:
            text_axis: the frameless subplot reserved for text
        ==================================================
    '''
    # Panel order: Nyquist, Bode magnitude, Bode phase, text, DRT (RC), DRT (Rτ)
    panels = [fig.add_subplot(2, 3, slot) for slot in range(1, 7)]
    ax_nyquist, ax_mag, ax_phase, text_axis, ax_rc, ax_rtau = panels
    text_axis.axis('off')

    dot_size = 2
    dot_alpha = 0.7

    palette = plt.colormaps.get_cmap('rainbow_r')
    n_meas = len(EISdata_list)

    for idx in range(n_meas):
        if idx not in eis_seq:
            continue

        spectrum = EISdata_list[idx][0]
        drt_parts = DRTdata_list[idx]
        loess = Loess_list[idx]

        r_vals = np.concatenate([part[1,:] for part in drt_parts])
        c_vals = np.concatenate([part[2,:] for part in drt_parts])

        # One colour per measurement index, spread over the whole colormap.
        shade = palette(idx/n_meas)
        tag = f'ch[{idx:03d}]'

        ax_nyquist.plot(spectrum[1,:], -spectrum[2,:], color = shade, linewidth=2)
        ax_mag.loglog(spectrum[0,:], np.abs(spectrum[1,:]+1j*spectrum[2,:]), color = shade, linewidth=2)
        ax_phase.semilogx(spectrum[0,:], np.rad2deg(np.angle(spectrum[1,:]+1j*spectrum[2,:])), color = shade, linewidth=2)

        ax_rc.scatter(r_vals, c_vals, s=dot_size, alpha=dot_alpha, color=shade, label=tag)
        ax_rtau.scatter(loess[0,:], loess[1,:], s=dot_size, alpha=dot_alpha, color=shade, label=tag)

    ax_nyquist.set_aspect('equal', adjustable='datalim')
    for log_panel in (ax_rc, ax_rtau):
        log_panel.set_xscale('log')
        log_panel.set_yscale('log')

    return text_axis




## RC

### Load Data

In [957]:

# Root directory that is searched recursively for electrode archive folders.
rootPath = "D:/Baihm/EISNN/Archive/"
ele_list = SearchELE(rootPath)
# Dataset tag; a stage suffix is appended to it when an archive is loaded.
DATASET_SUFFIX = "Outlier_Ver04"

# Alternative data source (default folder-name pattern):
# rootPath = "D:/Baihm/EISNN/Archive_New/"
# ele_list = SearchELE(rootPath)
# DATASET_SUFFIX = "Outlier_Ver04"

# Alternative data source whose folders end in "_Ver02" instead of "_归档":
# rootPath = "D:/Baihm/EISNN/Invivo/"
# ele_list = SearchELE(rootPath, re.compile(r"(.+?)_Ver02"))
# DATASET_SUFFIX = "Outlier_Ver04"


# Number of [path, id] pairs returned by the search.
n_ele = len(ele_list)
logger.info(f"Search in {rootPath} and find {n_ele:03d} electrodes")

2025-08-01 03:07:59.764 | INFO     | __main__:<module>:15 - Search in D:/Baihm/EISNN/Archive/ and find 218 electrodes


In [978]:
# ele_id = '06017758'
# ch_id = 96     # Perfect of Perfect

# ele_id = '09290511'
# ch_id = 4    # Up & Down, 2 outliers
# ch_id = 13    # Up & Down, 2 outliers
# ch_id = 21    # Normal + 2 outlier
# ch_id = 41    # Normal + 2 outlier - *(Hard To Tell)
# ch_id = 79    # 3-class, What a mess


# ele_id = '01037160'
# ch_id = 0    # Example for Delamination
# ch_id = 20  # Normal to Short, Same to GPR  
# ch_id = 89  # Same to GPR  
# ch_id = 7  # Normal Example


# ele_id = '05087163'
# ch_id = 7   # one outlier
# ch_id = 50  # No outlier but in two Phases
# ch_id = 55  # One outlier & weird end point
# ch_id = 114 # Open Circuit with one outlier



ele_id = '02067447'
# ch_id = 68  # Short all the time
ch_id = 124 # Short all the time

# ele_id = '11057712'
# ch_id = 106    # Very Good Electrode with 1 hidden outlier, and one phase shift



# ele_id = '10057084'
# ch_id = 16    # Total mess
# ch_id = 18    # Total mess




# Load the DRT/LOESS archive of the selected electrode and keep one channel.
Loe_Data = Load_Single(ele_id, rootPath, DATA_SUFFIX = f"{DATASET_SUFFIX}_DRTLoe_Ver02")
if Loe_Data is None:
    # Load_Single returns None when the archive file is missing; fail here
    # with a clear message instead of a TypeError on the indexing below.
    raise FileNotFoundError(
        f"No '{DATASET_SUFFIX}_DRTLoe_Ver02' archive found for electrode {ele_id} under {rootPath}"
    )
Loe_Data = Loe_Data[ch_id]



# Unpack the per-channel record into the globals used by the plotting cells.
DRTdata_list    = Loe_Data['DRTlist']
EISdata_list    = Loe_Data['EISlist']
Loess_list      = Loe_Data['Loesslist']
eis_seq         = Loe_Data['eis_seq']
eis_short       = Loe_Data['seq_short']



  data_pt = torch.load(fd_pt)


In [962]:

# Overview figure for the selected channel; DRT_Plot_Batch returns the axis of the text panel.
fig = plt.figure(figsize=(16, 9), constrained_layout=True)
text_axis = DRT_Plot_Batch(fig, DRTdata_list, EISdata_list, Loess_list, eis_seq)



In [989]:
## Draft

# Top-level copy of the DRT_Plot_Batch body: the same six panels are drawn,
# but fig / axis / cmap etc. are left behind as notebook globals.
fig = plt.figure(figsize=(16, 9), constrained_layout=True)
axis = [0] * 6
axis[0] = fig.add_subplot(2,3,1)    # Nyquist Plot
axis[1] = fig.add_subplot(2,3,2)    # Bode Plot (Magnitude)
axis[2] = fig.add_subplot(2,3,3)    # Bode Plot (Phase)
axis[3] = fig.add_subplot(2,3,4)    # Text
axis[4] = fig.add_subplot(2,3,5)    # DRT (RC)
axis[5] = fig.add_subplot(2,3,6)    # DRT (Rτ)


# The text panel carries no data, so its frame and ticks are hidden.
text_axis = axis[3]
text_axis.axis('off')

# Marker size and transparency of the scatter panels.
_s       = 2
_alpha   = 0.7

cmap = plt.colormaps.get_cmap('rainbow_r')
for i in range(len(EISdata_list)):
    # Optional manual filter to draw only a hand-picked subset:
    # if i not in [0,1,2,3,5,11]: continue
    if i in eis_seq:
        ch_eis      = EISdata_list[i][0]
        ch_drt      = DRTdata_list[i]
        ch_loess    = Loess_list[i]

        # ch_R    = np.array([i[1:,0] for i in ch_drt])
        # ch_C    = np.array([i[1:,-1] for i in ch_drt])
        # Rows 1 and 2 of every DRT array, joined into one long vector each.
        ch_R    = np.concatenate([i[1,:] for i in ch_drt])
        ch_C    = np.concatenate([i[2,:] for i in ch_drt])

        # Colour encodes the measurement index.
        _color  = cmap(i/len(EISdata_list))

        # Rows 1 / 2 of ch_eis are used as real / imaginary part, row 0 as the x axis of the Bode panels.
        axis[0].plot(ch_eis[1,:], -ch_eis[2,:], color = _color, linewidth=2)
        axis[1].loglog(ch_eis[0,:], np.abs(ch_eis[1,:]+1j*ch_eis[2,:]), color = _color, linewidth=2)
        axis[2].semilogx(ch_eis[0,:], np.rad2deg(np.angle(ch_eis[1,:]+1j*ch_eis[2,:])), color = _color, linewidth=2)

        axis[4].scatter(ch_R, ch_C, s=_s, alpha=_alpha, color=_color, label=f'ch[{i:03d}]')
        axis[5].scatter(ch_loess[0,:], ch_loess[1,:], s=_s, alpha=_alpha, color=_color, label=f'ch[{i:03d}]')



# Equal aspect for the Nyquist panel, log-log axes for both DRT panels.
axis[0].set_aspect('equal', adjustable='datalim')
axis[4].set_xscale('log')
axis[4].set_yscale('log')
axis[5].set_xscale('log')
axis[5].set_yscale('log')

In [963]:

# Per-measurement end-point samples taken from the DRT arrays.
# NOTE(review): presumably row 0 = tau, row 1 = R, row 2 = C — confirm against the DRT producer.
Rinf_list = []
C0_list = []
# tau_lim = [1/2/np.pi/1e6,1/2/np.pi/10]
# tau_lim = [1e-7,1/2/np.pi/10]
# [upper bound on row 0 for the Rinf samples, lower bound on row 0 for the C0 samples]
tau_lim = [1e-7,5e-2]
for i in range(len(DRTdata_list)):
    if i in eis_seq:
        ch_drt  = DRTdata_list[i]
        # First column (rows 0 and 1) and last column (rows 0 and 2) of every DRT array.
        Rinf    = np.array([j[[0,1],0] for j in ch_drt])
        C0      = np.array([j[[0,2],-1] for j in ch_drt])
        
        # Sort the samples by their row-0 value, then transpose to shape (2, n).
        Rinf = Rinf[Rinf[:,0].argsort(),:].T
        C0 = C0[C0[:,0].argsort(),:].T
        # Rinf = Rinf[:,:].T
        # C0 = C0[:,:].T

        # Keep only samples beyond the tau limits; the kept count differs per
        # measurement, so the results stay Python lists of arrays.
        Rinf_list.append(Rinf[:,Rinf[0,:]<tau_lim[0]])
        C0_list.append(C0[:,C0[0,:]>tau_lim[1]])
# Rinf_list = np.array(Rinf_list)
# C0_list = np.array(C0_list)

# Scatter row 1 against row 0 for both quantities, one colour per list entry.
fig = plt.figure()
axis1 = fig.add_subplot(121)
axis2 = fig.add_subplot(122)
cmap = plt.get_cmap('rainbow_r')
for i in range(len(Rinf_list)):
    _c = cmap(i/len(Rinf_list))
    axis1.scatter(Rinf_list[i][0,:],Rinf_list[i][1,:], s=2, color = _c)
    axis2.scatter(C0_list[i][0,:],C0_list[i][1,:], s=2, color = _c)

axis1.set_xscale('log')
axis1.set_yscale('log')
axis2.set_xscale('log')
axis2.set_yscale('log')

### Trim mean

In [70]:

from scipy.stats import trim_mean
def bootstrap_trimmed_mean(data, proportion_to_cut=0.1, n_boot=1000):
    """Bootstrap the symmetric trimmed mean of a 1-D sample.

    Each of the ``n_boot`` replicates resamples ``data`` with replacement,
    sorts the resample, drops ``int(len(data) * proportion_to_cut)`` values
    from both ends and averages what is left.

    Returns:
        (mean of the replicate estimates, their sample standard deviation
        computed with ``ddof=1``).
    """
    n_obs = len(data)
    n_cut = int(n_obs * proportion_to_cut)

    def one_replicate():
        ordered = np.sort(np.random.choice(data, size=n_obs, replace=True))
        return np.mean(ordered[n_cut:n_obs - n_cut])

    replicates = [one_replicate() for _ in range(n_boot)]
    return np.mean(replicates), np.std(replicates, ddof=1)

In [85]:


# For every entry of Rinf_list / C0_list: plot row 1 against the sample index
# and overlay the bootstrapped trimmed mean with a +/- 3 sigma band.
fig = plt.figure()
axis1 = fig.add_subplot(121)
axis2 = fig.add_subplot(122)
cmap = plt.colormaps.get_cmap('rainbow_r')
for i in range(len(Rinf_list)):
    # if i != 1: continue

    Rinf_poi = Rinf_list[i]
    C0_poi = C0_list[i]

    _color = cmap(i/len(Rinf_list))
    # axis1.scatter(Rinf_list[i,0,:], Rinf_list[i,1,:], s=10, alpha=1, color=_color)
    # axis2.scatter(C0_list[i,0,:], C0_list[i,1,:], s=10, alpha=1, color=_color)
    
    # x axis is the position inside the array, not the row-0 value.
    axis1.scatter(np.arange(Rinf_poi.shape[1]), Rinf_poi[1,:], s=10, alpha=1, color=_color)
    axis2.scatter(np.arange(C0_poi.shape[1]), C0_poi[1,:], s=10, alpha=1, color=_color)


    # axis1.scatter(np.arange(Rinf_list.shape[1]), Rinf_list[i,:,1], s=10, alpha=1, color=_color)
    # axis2.scatter(np.arange(C0_list.shape[1]), C0_list[i,:,1], s=10, alpha=1, color=_color)

    # axis.scatter(Rinf_list[i,:,0], Rinf_list[i,:,1], s=10, alpha=1, color='gray')
    # axis.scatter(Rinf_list[i,:,0], Rinf_list[i,:,1], s=10, alpha=1, color='gray')

    
    # Earlier variant using scipy's trim_mean directly (no spread estimate):
    # trim_mu_Rinf    = trim_mean(Rinf_list[i,:,1], proportiontocut=0.4)
    # trim_mu_C0      = trim_mean(C0_list[i,:,1], proportiontocut=0.4)
    # axis1.axhline(trim_mu_Rinf, color=_color, linestyle='--', alpha=0.8, label=f'Trim Mean')
    # axis2.axhline(trim_mu_C0, color=_color, linestyle='--', alpha=0.8, label=f'Trim Mean')

    # 30 % is cut from each end of every bootstrap resample.
    trim_R_mu, trim_R_std = bootstrap_trimmed_mean(Rinf_poi[1,:], proportion_to_cut=0.3)
    trim_C_mu, trim_C_std = bootstrap_trimmed_mean(C0_poi[1,:], proportion_to_cut=0.3)

    axis1.axhline(trim_R_mu, color=_color, linestyle='--', alpha=0.8, label=f'Trim Mean')
    axis1.axhline(trim_R_mu+3*trim_R_std, color=_color, linestyle='--', alpha=0.8, label=f'Trim std x 3')
    axis1.axhline(trim_R_mu-3*trim_R_std, color=_color, linestyle='--', alpha=0.8)

    axis2.axhline(trim_C_mu, color=_color, linestyle='--', alpha=0.8, label=f'Trim Mean')
    axis2.axhline(trim_C_mu+3*trim_C_std, color=_color, linestyle='--', alpha=0.8, label=f'Trim std x 3')
    axis2.axhline(trim_C_mu-3*trim_C_std, color=_color, linestyle='--', alpha=0.8)

    

    # axis.plot(Rinf_list[i,:,0], Rinf_list[i,:,1], color=_color)
# Only the y axes are logarithmic; the x axes show the linear sample index.
axis1.set_xlabel('Rinf')
# axis1.set_ylabel('Rinf Value')
axis1.set_title('Rinf Values from DRT Data')
# axis1.set_xscale('log')
axis1.set_yscale('log')

axis2.set_xlabel('C0')
# axis2.set_ylabel('C0 Value')
axis2.set_title('C0 Values from DRT Data')
# axis2.set_xscale('log')
axis2.set_yscale('log')



### Batch

In [124]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
from sklearn.linear_model import LinearRegression
from scipy.stats import bootstrap
import warnings

warnings.filterwarnings("ignore")


# 2. Piecewise regression (Piecewise Flat Tail)
def estimate_piecewise_tail(x, y, tail_fraction=1):
    """Estimate the plateau value as the mean of the last part of ``y``.

    Args:
        x: abscissa values; only ``len(x)`` is used to size the tail.
        y: ordinate values.
        tail_fraction: fraction of the samples, counted from the end, that
            form the tail.

    Returns:
        (estimate, error): mean of the tail and its standard error
        (``ddof=1``). The error is NaN when the tail holds fewer than two
        points; both values are NaN for empty input.
    """
    y = np.asarray(y)
    if y.size == 0:
        return np.nan, np.nan

    # Clamp to [1, len(y)]: y[-0:] would silently select the whole array, and
    # a tail longer than the data would understate the standard error.
    n_tail = min(max(int(len(x) * tail_fraction), 1), len(y))
    y_tail = y[-n_tail:]
    est = np.mean(y_tail)
    if n_tail < 2:
        return est, np.nan
    error = np.std(y_tail, ddof=1) / np.sqrt(n_tail)
    return est, error

# 3. Batch mean method (Batch Mean Estimation)
def estimate_batch_mean(x, y, batch_size=10):
    """Estimate the plateau value from the mean of the last full batch.

    ``y`` is cut into consecutive batches of ``batch_size`` samples; an
    incomplete trailing batch is dropped. ``x`` is accepted only for
    signature compatibility with the other estimators.

    Returns:
        (estimate, error): mean of the last full batch and the standard
        error of the batch means (``ddof=1``). When ``y`` is shorter than one
        batch, all of ``y`` is treated as a single batch. The error is NaN
        with fewer than two batches; both values are NaN for empty input.
    """
    y = np.asarray(y)
    if y.size == 0:
        return np.nan, np.nan

    n_full = len(y) // batch_size
    if n_full == 0:
        # Fewer samples than one batch: fall back to one short batch instead
        # of failing on an empty list of batch means.
        batch_means = np.array([np.mean(y)])
    else:
        batch_means = np.array([
            np.mean(y[k * batch_size:(k + 1) * batch_size]) for k in range(n_full)
        ])

    est = batch_means[-1]  # use the last batch
    if len(batch_means) < 2:
        return est, np.nan
    error = np.std(batch_means, ddof=1) / np.sqrt(len(batch_means))
    return est, error

# 4. Bootstrap confidence interval
def estimate_bootstrap(x, y, tail_fraction=1, n_resamples=1000):
    """Estimate the plateau value with a bootstrap confidence interval.

    Args:
        x: abscissa values; only ``len(x)`` is used to size the tail.
        y: ordinate values.
        tail_fraction: fraction of the samples, counted from the end, that
            form the tail.
        n_resamples: number of bootstrap resamples.

    Returns:
        (estimate, error): mean of the tail and half the width of the 95 %
        'basic' bootstrap confidence interval of that mean. The error is NaN
        when the tail holds fewer than two points; both values are NaN for
        empty input.
    """
    y = np.asarray(y)
    if y.size == 0:
        return np.nan, np.nan

    # Clamp to [1, len(y)]: y[-0:] would silently select the whole array.
    n_tail = min(max(int(len(x) * tail_fraction), 1), len(y))
    y_tail = y[-n_tail:]
    est = np.mean(y_tail)
    if n_tail < 2:
        # A single observation cannot be resampled into an interval.
        return est, np.nan

    res = bootstrap((y_tail,), np.mean, confidence_level=0.95, n_resamples=n_resamples, method='basic')
    error = (res.confidence_interval.high - res.confidence_interval.low) / 2
    return est, error



# 6. Expanding-window estimate (the window grows from the tail end)
def estimate_expanding_tail(x, y, min_window=5):
    """Pick the tail window whose mean has the smallest standard error.

    Windows ``y[-w:]`` are evaluated for every ``w`` from ``min_window`` up to
    and including ``len(y)``. ``x`` is accepted only for signature
    compatibility with the other estimators.

    Returns:
        (estimate, error): mean and standard error (``ddof=1``) of the
        selected window. With fewer than two samples the error is NaN.
    """
    y = np.asarray(y)
    n = len(y)
    if n < 2:
        return (np.mean(y) if n else np.nan), np.nan

    # At least two points are needed for a standard error; never ask for more
    # points than exist, so short inputs still yield one window.
    first = min(max(int(min_window), 2), n)
    # n + 1 so the window covering the whole series is also considered.
    sizes = range(first, n + 1)

    means = [np.mean(y[-w:]) for w in sizes]
    errors = [np.std(y[-w:], ddof=1) / np.sqrt(w) for w in sizes]

    # Choose the window with the smallest error.
    best = int(np.argmin(errors))
    return means[best], errors[best]


# --- 1. Moving-average tail estimate ---
def tail_moving_average(x, y, window_ratio=1):
    """Estimate the plateau value as a 20 % trimmed mean of the tail of ``y``.

    Args:
        x: unused; accepted for signature compatibility with the other
            estimators.
        y: ordinate values.
        window_ratio: fraction of the samples, counted from the end, that
            form the tail window.

    Returns:
        (estimate, error): trimmed mean of the tail (20 % cut from each end)
        and the population standard deviation (``ddof=0``) of the tail.
        Both values are NaN for empty input.
    """
    # Imported here so the function does not depend on another notebook cell
    # having imported trim_mean first.
    from scipy.stats import trim_mean

    y = np.asarray(y)
    if y.size == 0:
        return np.nan, np.nan

    # Clamp to [1, len(y)]: y[-0:] would silently select the whole array.
    window_size = min(max(int(len(y) * window_ratio), 1), len(y))
    y_tail = y[-window_size:]
    estimate = trim_mean(y_tail, 0.2)
    error = np.std(y_tail)
    return estimate, error



# --- 3. Sliding linear-regression slope check ---
def regression_plateau(x, y, window_ratio=1):
    """Estimate the plateau value and use the fitted drift as its error.

    A straight line is fitted to the last ``window_ratio`` fraction of the
    samples.

    Args:
        x: abscissa values (1-D).
        y: ordinate values, same length as ``x``.
        window_ratio: fraction of the samples, counted from the end, that
            form the tail window.

    Returns:
        (estimate, error): plain mean of the tail of ``y`` and
        ``|slope| * (max(x_tail) - min(x_tail))``, i.e. how far the fitted
        line moves across the window.
    """
    x = np.asarray(x)
    y = np.asarray(y)

    # Clamp to [1, len(y)]: a window size of 0 would make [-0:] select the
    # whole array instead of an empty tail.
    window_size = min(max(int(len(y) * window_ratio), 1), len(y))
    x_tail = x[-window_size:].reshape(-1, 1)
    y_tail = y[-window_size:]

    model = LinearRegression().fit(x_tail, y_tail)
    slope = model.coef_[0]

    estimate = np.mean(y_tail)
    error = np.abs(slope) * (x_tail.max() - x_tail.min())
    return estimate, error




# Quick test of every method
# Display name -> estimator; every estimator is called as func(x, y) and returns (estimate, error).
methods = {
    "Piecewise Tail": estimate_piecewise_tail,
    "Batch Mean": estimate_batch_mean,
    "Bootstrap": estimate_bootstrap,
    "Expanding Window": estimate_expanding_tail,
    "Tail Moving Average": tail_moving_average,
    "Regression Plateau": regression_plateau,
}


# Entry of Rinf_list / C0_list used for the demo.
_day_id = 3
_poi_R = Rinf_list[_day_id][:,:]
_poi_C = C0_list[_day_id][:,:]

# Demo series: row 0 of the C0 samples as x, row 1 as y.
x_demo = _poi_C[0,:]
# x_demo = np.arange(100)
y_demo = _poi_C[1,:]



# Alternative demo series taken from the Rinf samples:
# x_demo = _poi_R[0,:]
# # x_demo = np.arange(100)
# y_demo = _poi_R[1,::-1]


# x_demo = x_demo[50:]
# y_demo = y_demo[50:]
# y_demo = np.log10(y_demo)




# The list has 10 slots and 7 subplots are created; only the first 6 are used
# (one per method).
fig = plt.figure()
axis = [0]*10
axis[0] = fig.add_subplot(331)
axis[1] = fig.add_subplot(332)
axis[2] = fig.add_subplot(333)
axis[3] = fig.add_subplot(334)
axis[4] = fig.add_subplot(335)
axis[5] = fig.add_subplot(336)
axis[6] = fig.add_subplot(337)

# One panel per method: raw points plus the estimate and estimate +/- error as
# dashed red lines. The panels follow the insertion order of `methods`.
_poi_cnt = 0
for name, func in methods.items():
    est, err = func(x_demo, y_demo)
    axis[_poi_cnt].scatter(x_demo, y_demo)
    axis[_poi_cnt].axhline(est, color='red', linestyle='--')
    axis[_poi_cnt].axhline(est+err, color='red', linestyle='--')
    axis[_poi_cnt].axhline(est-err, color='red', linestyle='--')
    axis[_poi_cnt].set_xscale('log')
    axis[_poi_cnt].set_yscale('log')
    _poi_cnt = _poi_cnt+1
    
    


# Tabular variant of the comparison (disabled):
# results = {}
# for name, func in methods.items():
#     est, err = func(x_demo, y_demo)
#     results[name] = (est, err)

# results



### RANSAC

In [None]:
import numpy as np
from sklearn.linear_model import RANSACRegressor, LinearRegression
import matplotlib.pyplot as plt


# Entry of Rinf_list / C0_list used for this demo.
_day_id = 9
_poi_R = Rinf_list[_day_id][:,:]
_poi_C = C0_list[_day_id][:,:]

# x_demo = _poi_C[0,:]
# x_demo = np.arange(100)
# x_demo = np.arange(_poi_C.shape[1])+1
# y_demo = _poi_C[1,:]



# Demo series: row 0 of the Rinf samples as x, row 1 as y.
x_demo = _poi_R[0,:]
# x_demo = np.arange(_poi_R.shape[1])+1
y_demo = _poi_R[1,:]
# y_demo = y_demo[::-1]

# x_demo = x_demo/y_demo
# y_demo = _poi_R[1,:]
# y_demo = y_demo[::-1]



# Only x is moved to a log10 scale; y stays linear.
x_demo = np.log10(x_demo)
# y_demo = np.log10(y_demo)




# Column vectors, as expected by the scikit-learn estimators.
X = x_demo.reshape(-1,1)
# X = np.arange(x_demo.shape[0]).reshape(-1,1)
y = y_demo.reshape(-1,1)

# Ordinary least-squares linear regression
lr = LinearRegression()
lr.fit(X, y)

# RANSAC regression
# NOTE: std_y is only referenced by the disabled residual_threshold option below.
std_y = np.std(y)
ransac = RANSACRegressor(
    estimator=LinearRegression(),
    min_samples=0.6,
    # residual_threshold=std_y,
    # stop_probability=0.99,
)
ransac.fit(X, y)

# Boolean mask of the samples RANSAC classified as inliers.
inlier_mask = ransac.inlier_mask_

# Visual comparison
# NOTE(review): the inlier scatter reuses the label of the full data set, so
# the legend shows the same text twice.
plt.figure(figsize=(10, 6))
plt.scatter(X, y, color='blue', label='数据点')
plt.scatter(X[inlier_mask], y[inlier_mask], color='red', label='数据点')
plt.plot(X, lr.predict(X), color='red', label='普通线性回归')
plt.plot(X, ransac.predict(X), color='green', label='RANSAC回归')
plt.legend()
plt.title('普通线性回归 vs RANSAC回归')
plt.show()
# plt.xscale('log')
# plt.yscale('log')
# plt.axis('equal')

  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()


In [238]:
_poi_R.shape

(2, 85)

In [106]:

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Regression data: the current demo series x_demo / y_demo as column vectors
# (not the California housing data set the original comment mentioned).
X, y = x_demo.reshape(-1,1), y_demo.reshape(-1,1)

# Add some outliers
# y[:50] += 10

# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Ordinary least-squares linear regression
lr = LinearRegression()
lr.fit(X_train, y_train)
lr_score = mean_squared_error(y_test, lr.predict(X_test))

# RANSAC regression
ransac = RANSACRegressor(random_state=42)
ransac.fit(X_train, y_train)
ransac_score = mean_squared_error(y_test, ransac.predict(X_test))

# Test-set MSE of both models and the share of training samples kept as inliers.
print(f"普通线性回归MSE: {lr_score:.4f}")
print(f"RANSAC回归MSE: {ransac_score:.4f}")
print(f"内点比例: {ransac.inlier_mask_.mean():.2%}")

# Visual comparison of the predictions: predicted vs. true value, with the
# dashed diagonal marking a perfect prediction.
plt.figure(figsize=(10, 6))
plt.scatter(y_test, lr.predict(X_test), alpha=0.5, label='普通回归')
plt.scatter(y_test, ransac.predict(X_test), alpha=0.5, label='RANSAC')
plt.plot([min(y_test), max(y_test)], [min(y_test), max(y_test)], 'k--')
plt.xlabel('真实值')
plt.ylabel('预测值')
plt.legend()
plt.title('预测结果对比')
plt.show()

普通线性回归MSE: 0.0069
RANSAC回归MSE: 0.0282
内点比例: 45.00%


  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()
  el.exec() if hasattr(el, "exec") else el.exec_()


### RANSAC - Sweep

In [None]:
import numpy as np
from sklearn.linear_model import RANSACRegressor, LinearRegression

def detect_linear_segment(x, y, window_size=20, error_threshold=1e-2):
    '''==================================================
        Slide a fixed-size window over (x, y) and fit a RANSAC line
        in every window until one window is "linear enough"
        Parameter: 
            x: abscissa values (1-D or column vector)
            y: ordinate values, same length as x
            window_size: number of consecutive samples per window
            error_threshold: the sweep stops at the first window whose
                             error falls below this value
        Return:
            pred_list: one [x_window, y_predicted] pair (flattened
                       arrays) per evaluated window
            errors: per-window mean squared RELATIVE residual
                    (1 - y_pred / y) over the RANSAC inliers
        Note:
            RANSAC is randomised and no seed is set, so repeated calls
            may return slightly different results.
        ==================================================
    '''
    x = np.asarray(x)
    y = np.asarray(y)
    errors      = []
    pred_list   = []

    # "+ 1" so the last full window (ending on the final sample) is evaluated
    # too; without it a series of exactly window_size samples yields nothing.
    for start in range(len(x) - window_size + 1):
        stop = start + window_size
        x_win = x[start:stop].reshape(-1, 1)
        y_win = y[start:stop]

        ransac = RANSACRegressor(
            estimator=LinearRegression(),
            min_samples=0.6
        ).fit(x_win, y_win)
        y_pred = ransac.predict(x_win)
        inlier_mask = ransac.inlier_mask_

        # The small offset guards against division by zero.
        rel_err = np.mean((1 - y_pred[inlier_mask]/(y_win[inlier_mask]+1e-6))**2)
        errors.append(rel_err)
        pred_list.append([x_win.flatten(), y_pred.flatten()])

        # Stop once the window has entered the linear region.
        if rel_err < error_threshold:
            break

    return pred_list, errors


In [212]:



# Entry of Rinf_list / C0_list used for the sweep demo.
_day_id = 9
_poi_R = Rinf_list[_day_id][:,:]
_poi_C = C0_list[_day_id][:,:]

# x_demo = _poi_C[0,:]
# x_demo = np.arange(100)
# x_demo = np.arange(_poi_C.shape[1])+1
# y_demo = _poi_C[1,:]



# Row 0 of the Rinf samples as x; row 1 as y, but in REVERSED order, so the
# original x/y pairing is not kept.
x_demo = _poi_R[0,:]
# x_demo = np.arange(_poi_R.shape[1])+1
y_demo = _poi_R[1,:]
y_demo = y_demo[::-1]

# x_demo = x_demo/y_demo
# y_demo = _poi_R[1,:]
# y_demo = y_demo[::-1]



# x_demo = np.log10(x_demo)
# y_demo = np.log10(y_demo)




# Column vectors for the window sweep.
X = x_demo.reshape(-1,1)
y = y_demo.reshape(-1,1)

pred_list, errors = detect_linear_segment(X,y,window_size=20)


# Left: per-window error. Right: data with the fitted line of every evaluated window.
fig, axis = plt.subplots(1,2)
axis[0].plot(errors)
axis[1].scatter(X,y, s=5)
for i in pred_list:
    axis[1].plot(i[0], i[1])

# axis[1].set_xscale('log')
# axis[1].set_yscale('log')




In [130]:

# Same log-log scatter as the overview, restricted to the entry selected by _day_id.
fig = plt.figure()
axis1 = fig.add_subplot(121)
axis2 = fig.add_subplot(122)
cmap = plt.get_cmap('rainbow_r')
for i in range(len(Rinf_list)):
    # Skip every entry except the selected one; its colour stays tied to its index.
    if i != _day_id: continue
    _c = cmap(i/len(Rinf_list))
    axis1.scatter(Rinf_list[i][0,:],Rinf_list[i][1,:], s=2, color = _c)
    axis2.scatter(C0_list[i][0,:],C0_list[i][1,:], s=2, color = _c)

axis1.set_xscale('log')
axis1.set_yscale('log')
axis2.set_xscale('log')
axis2.set_yscale('log')

In [217]:
from sklearn.neighbors import KernelDensity
import numpy as np

# x is the global X from an earlier cell.
# NOTE(review): presumably the (n_samples, 1) column vector built with
# reshape(-1, 1) — KernelDensity.fit expects a 2-D array; confirm.
x = X

# Fit the KDE model
kde = KernelDensity(kernel='gaussian', bandwidth=0.2).fit(x)

# Evaluate the PDF on a grid of points
x_plot = np.linspace(x.min(), x.max(), 1000).reshape(-1, 1)
log_density = kde.score_samples(x_plot)  # returns log(P(x))
density = np.exp(log_density)  # convert to P(x)
density_at_x = np.exp(kde.score_samples(x))  # probability density at every sample of x
# Only the per-sample density is plotted (log y axis); `density` on the grid is not used here.
plt.figure()
plt.semilogy(density_at_x)


[<matplotlib.lines.Line2D at 0x20abea6f890>]

### RANSAC Monte Carlo

In [None]:
def featureLumpedR(tau, Rinf, n_monte=100, DebugFlag=False):
    '''==================================================
        Monte Carlo RANSAC estimate of the first-point R value
        Parameter: 
            tau: time constants (positive, same length as Rinf)
            Rinf: resistance samples
            n_monte: number of repeated RANSAC fits
            DebugFlag: when True, fit once and return the fit objects
        Return:
            DebugFlag=True : (X, ransac) - the regressor input (sample
                             index column) and the fitted RANSAC model
            DebugFlag=False: (est, std) - mean and standard deviation
                             of the line value predicted at the first
                             kept sample over the n_monte fits
        Note:
            Samples outside the open 10-90 percentile range of either
            log10(tau) or Rinf are dropped. The line is fitted against
            the sample index, not against tau.
        ==================================================
    '''
    tau_log = np.log10(tau)
    Rinf = np.asarray(Rinf)

    # The tau mask is built from tau_log (the original read tau_mask before
    # it was assigned).
    tau_mask = (tau_log>np.percentile(tau_log, 10)) & (tau_log<np.percentile(tau_log, 90))
    Rinf_mask = (Rinf>np.percentile(Rinf, 10)) & (Rinf<np.percentile(Rinf, 90))
    keep = Rinf_mask & tau_mask

    # X = tau_log[keep].reshape(-1,1)
    X = np.arange(np.count_nonzero(keep)).reshape(-1,1)
    y = Rinf[keep].reshape(-1,1)

    def _fit():
        # A fresh regressor per call: every fit draws new random subsets.
        return RANSACRegressor(
            estimator=LinearRegression(),
            min_samples=0.6,
            stop_probability=0.99,
        ).fit(X, y)

    if DebugFlag:
        return X, _fit()

    # X[:1] keeps the 2-D shape that predict() requires.
    _est_list = [_fit().predict(X[:1]).item() for _ in range(n_monte)]
    est = np.mean(_est_list)
    std = np.std(_est_list)
    return est, std


def bootstrap_RC(y, n_boot=1000):
    """Bootstrap the mean of a 1-D sample.

    ``n_boot`` resamples of ``y`` (with replacement, same size) are drawn and
    averaged.

    Returns:
        (mean_est, ci_lower, ci_upper): the mean of the resample means and
        their 2.5th / 97.5th percentiles.
    """
    values = np.array(y)
    n_obs = len(values)

    boot_means = np.array([
        np.mean(np.random.choice(values, size=n_obs, replace=True))
        for _ in range(n_boot)
    ])

    lower = np.percentile(boot_means, 2.5)
    upper = np.percentile(boot_means, 97.5)
    return boot_means.mean(), lower, upper

    

In [416]:


# Entry of Rinf_list / C0_list used for this demo.
_day_id = 9
_poi_R = Rinf_list[_day_id][:,:]
_poi_C = C0_list[_day_id][:,:]

# x_demo = _poi_C[0,:]
# x_demo = np.arange(100)
# x_demo = np.arange(_poi_C.shape[1])+1
# y_demo = _poi_C[1,:]



# x is the 1-based sample index; y is row 1 of the Rinf samples.
# x_demo = _poi_R[0,:]
# x_demo = np.log10(_poi_R[0,:])
x_demo = np.arange(_poi_R.shape[1])+1
y_demo = _poi_R[1,:]
# y_demo = y_demo[::-1]

# mask = select_most_dense_continuous_segment(np.log10(_poi_R[0,:]), window_width=0.2, resolution=1000)

# Keep only samples strictly inside the 10-90 percentile range of BOTH x and y.
x_mask = (x_demo>np.percentile(x_demo, 10)) & (x_demo<np.percentile(x_demo, 90))
y_mask = (y_demo>np.percentile(y_demo, 10)) & (y_demo<np.percentile(y_demo, 90))
# y_mask = y_demo>0

X = x_demo[x_mask&y_mask].reshape(-1,1)
# X = np.arange(x_demo.shape[0]).reshape(-1,1)
y = y_demo[x_mask&y_mask].reshape(-1,1)





# RANSAC regression
# NOTE: std_y is only referenced by the disabled residual_threshold option below.
std_y = np.std(y)
ransac = RANSACRegressor(
    estimator=LinearRegression(),
    min_samples=0.6,
    # residual_threshold=std_y,
    stop_probability=0.99,
)
ransac.fit(X, y)

# Boolean mask of the trimmed samples RANSAC classified as inliers.
inlier_mask = ransac.inlier_mask_

# Visual comparison: all points (green), trimmed points (blue), RANSAC inliers
# (red) and the fitted line.
plt.figure(figsize=(10, 6))
plt.scatter(x_demo, y_demo, color='green', label='org')
plt.scatter(X, y, color='blue', label='Trimmed')
plt.scatter(X[inlier_mask], y[inlier_mask], color='red', label='inlier')
# plt.scatter(X[mask], y[mask], color='red', label='数据点')
plt.plot(X, ransac.predict(X), color='green', label='RANSAC')
# plt.plot(x_demo, ransac.predict(x_demo), color='green', label='RANSAC回归')
plt.legend()
# plt.title('普通线性回归 vs RANSAC回归')
plt.show()
# plt.xscale('log')
# plt.yscale('log')
# plt.axis('equal')

### Result

In [981]:
def bootstrap_RANSAC_RC(tau, RC, n_monte=100, RC_flag=0, DebugFlag=False):
    '''==================================================
        Bootstrap + RANSAC estimate of an end-point R or C value
        Parameter: 
            tau: time constants in order (positive, same length as RC)
            RC: R samples or C samples in the same order
            n_monte: number of bootstrap resamples / RANSAC fits
            RC_flag: 0 -> evaluate the fitted line at the FIRST kept
                     sample (R), non-zero -> at the LAST one (C)
            DebugFlag: when True, fit once on the trimmed data and
                       return the fit objects instead of the estimate
        Return:
            DebugFlag=True : (_x, ransac) - the regressor input (sample
                             index column) and the fitted RANSAC model
            DebugFlag=False: (est, ci_95) - mean of the bootstrap
                             predictions and their [2.5, 97.5]
                             percentiles (array of two values)
        Raise:
            ValueError: no sample survives the percentile trimming
        Note:
            Samples outside the open 10-90 percentile range of either
            log10(tau) or RC are dropped. The line is fitted against
            the sample index, not against tau.
        ==================================================
    '''
    tau_log = np.log10(tau)
    # Accept plain sequences as well as arrays.
    RC = np.asarray(RC)

    tau_mask = (tau_log>np.percentile(tau_log, 10)) & (tau_log<np.percentile(tau_log, 90))
    RC_mask = (RC>np.percentile(RC, 10)) & (RC<np.percentile(RC, 90))
    keep = RC_mask & tau_mask

    # X = tau_log[keep].reshape(-1,1)
    _x = np.arange(np.count_nonzero(keep)).reshape(-1,1)
    _y = RC[keep].reshape(-1,1)
    if len(_x) == 0:
        raise ValueError("no samples left after the 10-90 percentile trimming")

    RC_idx = -1 if RC_flag else 0

    def _new_ransac():
        # A fresh regressor per fit, configured identically everywhere.
        return RANSACRegressor(
            estimator=LinearRegression(),
            min_samples=0.6,
            stop_probability=0.99,
        )

    if DebugFlag:
        return _x, _new_ransac().fit(_x, _y)

    # Point at which every bootstrap fit is evaluated (kept 2-D for predict()).
    x_query = _x[RC_idx].reshape(-1,1)

    _est_list = []
    for _ in range(n_monte):
        # Resample (index, value) pairs with replacement.
        idx = np.random.choice(len(_x), len(_x), replace=True)
        ransac = _new_ransac().fit(_x[idx], _y[idx])
        _est_list.append(ransac.predict(x_query))

    _est_list = np.array(_est_list)
    est = np.mean(_est_list)
    ci_95 = np.percentile(_est_list, [2.5,97.5])

    return est, ci_95

from scipy.stats import trim_mean
def bootstrap_RC(RC, n_boot=100):
    """Bootstrap the 10 % trimmed mean of the (flattened) samples in ``RC``.

    ``n_boot`` resamples (with replacement, same size) are drawn; for each
    one the trimmed mean with 10 % cut from both ends is computed.

    Returns:
        (est, ci_95): mean of the bootstrap estimates and their
        [2.5, 97.5] percentiles as an array of two values.
    """
    flat = np.array(RC).flatten()
    n_obs = len(flat)

    boot_stats = np.array([
        trim_mean(np.random.choice(flat, size=n_obs, replace=True), proportiontocut=0.1)
        for _ in range(n_boot)
    ])

    return boot_stats.mean(), np.percentile(boot_stats, [2.5,97.5])

    

def bayesian_RC(RC, prior_mu=0, prior_sigma=1e3):
    '''==================================================
        Bayesian estimate of the mean of RC
        (conjugate Normal-Normal update; the observation variance is
        taken from the sample itself)
        Parameter: 
            RC (array-like): observed values, flattened to 1-D
            prior_mu (float): prior mean
            prior_sigma (float): prior standard deviation
        Return:
            post_mean (float): posterior mean
            ci_95 (tuple): 95% credible interval (lower, upper), taken from
                           1000 random draws of the posterior
        Degenerate inputs:
            - empty input: nothing is observed, the prior is returned
            - a single sample or zero sample variance: the data carry no
              spread, so the sample mean is returned with a zero-width
              interval (instead of the NaN a division by zero would give)
        ==================================================
    '''

    RC = np.asarray(RC, dtype=float).ravel()
    n = RC.size

    if n == 0:
        # Nothing observed: the posterior is the prior.
        post_mean = float(prior_mu)
        post_std = float(prior_sigma)
    else:
        # Sample mean and (unbiased) sample variance of the observations
        sample_mean = np.mean(RC)
        # ddof=1 needs at least two samples; one sample is treated as zero spread
        obs_var = np.std(RC, ddof=1)**2 if n > 1 else 0.0

        if not obs_var > 0:
            # Zero (or undefined) variance would divide by zero below
            point = float(sample_mean)
            return point, (point, point)

        # Conjugate update: precisions add, means are precision-weighted
        post_var = 1 / (n / obs_var + 1 / prior_sigma**2)
        post_mean = post_var * (sample_mean * n / obs_var + prior_mu / prior_sigma**2)
        post_std = np.sqrt(post_var)

    # Credible interval from posterior draws
    posterior_samples = np.random.normal(post_mean, post_std, size=1000)
    ci_95 = np.percentile(posterior_samples, [2.5, 97.5])

    return post_mean, tuple(ci_95)



#### Single Test

In [982]:


# Pick one entry of Rinf_list / C0_list to try the estimators on.
# Each entry is a 2 x n array (it is indexed as [0,:] and [1,:] below).
_day_id = 5
_poi_R = Rinf_list[_day_id][:,:]
_poi_C = C0_list[_day_id][:,:]

# Resistance variant (disabled):
# x_demo = _poi_R[0,:]
# x_demo = np.arange(_poi_R.shape[1])+1
# y_demo = _poi_R[1,:]


# Capacitance variant (active): x is the 1-based sample index, y is row 1 of _poi_C
# x_demo = _poi_C[0,:]
x_demo = np.arange(_poi_C.shape[1])+1
# y_demo = np.log10(_poi_C[1,:])
y_demo = _poi_C[1,:]


In [983]:

# Resistance variant (disabled); note that it uses the resistance prior 1e3 / 1e3:
# est_ransac, ci_95_ransac = bootstrap_RANSAC_RC(_poi_R[0,:], _poi_R[1,:], n_monte=3, RC_flag=0, DebugFlag=False)
# est_ransac_100, ci_95_ransac_100 = bootstrap_RANSAC_RC(_poi_R[0,:], _poi_R[1,:], n_monte=100, RC_flag=0, DebugFlag=False)
# est_boot, ci_95_boot = bootstrap_RC(_poi_R[1,:], n_boot=100)
# est_baye, ci_95_baye = bayesian_RC(_poi_R[1,:], prior_mu=1e3, prior_sigma=1e3)



# Capacitance variant (active): run the four estimators on (x_demo, y_demo)
est_ransac, ci_95_ransac = bootstrap_RANSAC_RC(x_demo, y_demo, n_monte=3, RC_flag=1, DebugFlag=False)
est_ransac_100, ci_95_ransac_100 = bootstrap_RANSAC_RC(x_demo, y_demo, n_monte=100, RC_flag=1, DebugFlag=False)
est_boot, ci_95_boot = bootstrap_RC(y_demo, n_boot=100)
# y_demo holds C0 values, so use the capacitance prior (5e-8 / 1e-8) that the
# other C0 call sites of bayesian_RC use, not the resistance prior.
est_baye, ci_95_baye = bayesian_RC(y_demo, prior_mu=5e-8, prior_sigma=1e-8)


In [987]:
# x_demo = _poi_R[0,:]
# # x_demo = np.arange(_poi_R.shape[1])+1
# y_demo = _poi_R[1,:]


# Visual comparison: one panel per estimator, raw samples in blue,
# estimate and both CI bounds as dashed horizontal lines.
fig, axis = plt.subplots(2,2,figsize=(12, 6))
fig.suptitle(f"{ele_id}_ch{ch_id:03d}_day{_day_id:02d}")
axis = axis.flatten()

_x_lo, _x_hi = x_demo.min(), x_demo.max()
_panels = [
    # (estimate,      95% interval,     line colour, panel title)
    (est_ransac,     ci_95_ransac,     'red',    "RANSAC_#3"),
    (est_ransac_100, ci_95_ransac_100, 'red',    "RANSAC_#100"),
    (est_boot,       ci_95_boot,       'orange', "Boot_trimmean"),
    (est_baye,       ci_95_baye,       'orange', "Bayesian Estimation"),
]

for _ax, (_est, _ci, _line_color, _title) in zip(axis, _panels):
    _ax.scatter(x_demo, y_demo, color='blue', label='org')
    for _level in (_est, _ci[0], _ci[1]):
        _ax.hlines(_level, _x_lo, _x_hi, color=_line_color, linestyles='--')
    _ax.set_title(_title)


# Optional log axes (disabled):
# for _ax in axis[:3]: _ax.set_xscale('log')
# for _ax in axis:     _ax.set_yscale('log')

Text(0.5, 1.0, 'Bayesian Estimation')

#### Batch Run

In [988]:

# Overlay, for every day, the raw C0 samples (small dots) and the estimate of each
# of the four estimators (large dot at the last sample index), one panel per estimator.
fig, axis = plt.subplots(2,2,figsize=(12, 6))
# All days are drawn in this figure, so the title carries no single day index
fig.suptitle(f"{ele_id}_ch{ch_id:03d}_all_days")
axis = axis.flatten()

cmap = plt.get_cmap("rainbow_r")
_n_day = len(Rinf_list)

for _day_id in range(_n_day):
    _poi_R = Rinf_list[_day_id][:,:]
    _poi_C = C0_list[_day_id][:,:]

    if _poi_C.shape[1] == 0:
        # Nothing to estimate from (and x_demo[-1] below would fail)
        logger.warning(f"day {_day_id:02d}: no C0 samples, skipped")
        continue

    # Resistance variant (disabled):
    # x_demo = _poi_R[0,:]
    # # x_demo = np.arange(_poi_R.shape[1])+1
    # y_demo = _poi_R[1,:]
    # est_ransac, ci_95_ransac = bootstrap_RANSAC_RC(_poi_R[0,:], _poi_R[1,:], n_monte=3, RC_flag=0, DebugFlag=False)
    # est_ransac_100, ci_95_ransac_100 = bootstrap_RANSAC_RC(_poi_R[0,:], _poi_R[1,:], n_monte=100, RC_flag=0, DebugFlag=False)
    # est_boot, ci_95_boot = bootstrap_RC(_poi_R[1,:], n_boot=100)
    # est_baye, ci_95_baye = bayesian_RC(_poi_R[1,:], prior_mu=1e3, prior_sigma=1e3)

    # Capacitance variant (active): x is the 1-based sample index
    # x_demo = _poi_C[0,:]
    x_demo = np.arange(_poi_C.shape[1])+1
    y_demo = _poi_C[1,:]

    est_ransac, ci_95_ransac = bootstrap_RANSAC_RC(x_demo, y_demo, n_monte=3, RC_flag=1, DebugFlag=False)
    est_ransac_100, ci_95_ransac_100 = bootstrap_RANSAC_RC(x_demo, y_demo, n_monte=100, RC_flag=1, DebugFlag=False)
    est_boot, ci_95_boot = bootstrap_RC(y_demo, n_boot=100)
    est_baye, ci_95_baye = bayesian_RC(y_demo, prior_mu=5e-8, prior_sigma=1e-8)

    _c = cmap(_day_id/_n_day)
    for _ax, _est in zip(axis, (est_ransac, est_ransac_100, est_boot, est_baye)):
        _ax.scatter(x_demo, y_demo, s=2, color=_c, label='org')
        _ax.scatter(x_demo[-1], _est, s=20, color=_c, label='est')
        # CI bounds can be added with:
        # _ax.hlines(<ci>[0], x_demo.min(), x_demo.max(), color=_c, linestyles='--')
        # _ax.hlines(<ci>[1], x_demo.min(), x_demo.max(), color=_c, linestyles='--')


# Titles and scales do not depend on the day: set them once, after the loop
_titles = ("RANSAC_#3", "RANSAC_#100", "Boot_trimmean", "Bayesian Estimation")
for _ax, _title in zip(axis, _titles):
    _ax.set_title(_title)
    # _ax.set_xscale('log')
    _ax.set_yscale('log')


## Peak Deconvolution

### Load Data

In [952]:

# Root folder that SearchELE scans (recursively) for electrode archive directories
rootPath = "D:/Baihm/EISNN/Archive/"
ele_list = SearchELE(rootPath)
# Dataset version tag; the loading cell builds the data sub-folder / file name from it
DATASET_SUFFIX = "Outlier_Ver04"

# Alternative data root (disabled):
# rootPath = "D:/Baihm/EISNN/Archive_New/"
# ele_list = SearchELE(rootPath)
# DATASET_SUFFIX = "Outlier_Ver04"

# Alternative data root with a different directory-name pattern (disabled):
# rootPath = "D:/Baihm/EISNN/Invivo/"
# ele_list = SearchELE(rootPath, re.compile(r"(.+?)_Ver02"))
# DATASET_SUFFIX = "Outlier_Ver04"


# Report how many electrode directories were found
n_ele = len(ele_list)
logger.info(f"Search in {rootPath} and find {n_ele:03d} electrodes")

2025-07-31 06:45:59.986 | INFO     | __main__:<module>:15 - Search in D:/Baihm/EISNN/Archive/ and find 218 electrodes


In [953]:
# Electrode / channel under study: keep exactly one (ele_id, ch_id) pair active.

# ele_id = '06017758'
# ch_id = 96     # Perfect of Perfect

# ele_id = '09290511'
# ch_id = 4    # Up & Down, 2 outliers

ele_id = '02067447'
ch_id = 68  # Short all the time




# ele_id = '01037160'
# ch_id = 20  # Normal to Short, Same to GPR  
# ch_id = 89  # Same to GPR  
# ch_id = 7  # Normal Example


# ele_id = '05087163'
# ch_id = 7   # one outlier
# ch_id = 50  # No outlier but in two Phases
# ch_id = 55  # One outlier & weird end point
# ch_id = 114 # Open Circuit with one outlier





# ele_id = '11057712'
# ch_id = 106    # Very Good Electrode with 1 hidden outlier, and one phase shift



# ele_id = '10057084'
# ch_id = 16    # Total mess
# ch_id = 18    # Total mess




Loe_Data = Load_Single(ele_id, rootPath, DATA_SUFFIX = f"{DATASET_SUFFIX}_DRTLoe_Ver02")
if Loe_Data is None:
    # Load_Single only logs a warning and returns None when the .pt file is missing;
    # stop here with a clear message instead of a TypeError on the indexing below.
    raise FileNotFoundError(
        f"No {DATASET_SUFFIX}_DRTLoe_Ver02 data found for electrode {ele_id} under {rootPath}"
    )
Loe_Data = Loe_Data[ch_id]



# Unpack the per-channel record
DRTdata_list    = Loe_Data['DRTlist']
EISdata_list    = Loe_Data['EISlist']
Loess_list      = Loe_Data['Loesslist']
eis_seq         = Loe_Data['eis_seq']
eis_short       = Loe_Data['seq_short']



  data_pt = torch.load(fd_pt)


In [956]:

# Per selected day: collect the first-column (Rinf) and last-column (C0) points of
# every DRT sample plus the full tau-sorted DRT, then plot the three point clouds.
# NOTE(review): sample rows are presumably [tau, R, C] - confirm against the DRT producer.
Rinf_list = []
C0_list = []
TRC_list = []
# tau_lim = [1/2/np.pi/1e6,1/2/np.pi/10]
# tau_lim = [1e-7,1/2/np.pi/10]
tau_lim = [1e-7,5e-2]
_tau_lo, _tau_hi = tau_lim

for i in range(len(DRTdata_list)):
    if i not in eis_seq:
        continue

    ch_drt = DRTdata_list[i]

    # Rows (0, 1) of the first column and rows (0, 2) of the last column of each sample
    Rinf = np.array([j[[0,1],0] for j in ch_drt])
    C0   = np.array([j[[0,2],-1] for j in ch_drt])

    # Order the points by their row-0 value, then transpose to 2 x n_sample
    Rinf = Rinf[np.argsort(Rinf[:,0])].T
    C0   = C0[np.argsort(C0[:,0])].T
    # Unsorted alternative:
    # Rinf = Rinf[:,:].T
    # C0 = C0[:,:].T

    # All samples of the day side by side, columns ordered by row 0
    _drt_all = np.concatenate(ch_drt, axis=1)
    _drt_all = _drt_all[:, np.argsort(_drt_all[0,:])]

    # Keep Rinf points below the lower limit and C0 points above the upper limit
    Rinf_list.append(Rinf[:, Rinf[0,:] < _tau_lo])
    C0_list.append(C0[:, C0[0,:] > _tau_hi])
    TRC_list.append(_drt_all)


# Rinf_list = np.array(Rinf_list)
# C0_list = np.array(C0_list)

fig = plt.figure()
axis1, axis2, axis3 = (fig.add_subplot(1, 3, _k) for _k in (1, 2, 3))
cmap = plt.get_cmap('rainbow_r')
_n_day = len(Rinf_list)
for i, (_rinf_day, _trc_day, _c0_day) in enumerate(zip(Rinf_list, TRC_list, C0_list)):
    _c = cmap(i/_n_day)
    axis1.scatter(_rinf_day[0,:], _rinf_day[1,:], s=2, color = _c)
    # axis2.scatter(_trc_day[1,:], _trc_day[2,:], s=2, color = _c)
    axis2.scatter(_trc_day[0,:], _trc_day[1,:], s=2, color = _c)
    axis3.scatter(_c0_day[0,:], _c0_day[1,:], s=2, color = _c)

for _ax in (axis1, axis2, axis3):
    _ax.set_xscale('log')
    _ax.set_yscale('log')

### Preprocess

In [949]:
from statsmodels.nonparametric.smoothers_lowess import lowess

# Per day: estimate (R, C) with bayesian_RC, then divide the R/C ratio of the DRT by
# est_R^2/tau + tau/est_C^2 ("flatten"), with LOWESS smoothing applied either
# after or before that division.
RC_flatten = []

DRT_flatten = []
DRT_flatten_lowess = []
DRT_lowess_flatten = []

for i in range(len(TRC_list)):
    _day_id = i
    _poi_R = Rinf_list[_day_id][:,:]
    _poi_C = C0_list[_day_id][:,:]
    _poi_drt = TRC_list[_day_id][:,:]

    est_R, ci_95_R = bayesian_RC(_poi_R[1,:], prior_mu=1e3, prior_sigma=1e3)
    est_C, ci_95_C = bayesian_RC(_poi_C[1,:], prior_mu=5e-8, prior_sigma=1e-8)
    RC_flatten.append([est_R, est_C])

    _tau = _poi_drt[0,:]
    # Flattening denominator, shared by both variants
    _flat_den = (est_R*est_R/_tau) + (_tau/est_C/est_C)

    _drtRC = _poi_drt[1,:] / _poi_drt[2,:]
    _drtRC_flat = _drtRC / _flat_den

    # Smoothing is done in natural-log space over log(tau)
    _tau_log = np.log(_tau)
    _drtRC_log = np.log(_drtRC)
    _drRC_flat_log = np.log(_drtRC_flat)

    _lowess_opts = dict(frac=0.1, it=3, return_sorted=False)
    _drtRC_log_loess = lowess(_drtRC_log, _tau_log, **_lowess_opts)
    _drtRC_flat_log_loess = lowess(_drRC_flat_log, _tau_log, **_lowess_opts)

    # "Loess + Flat": smooth first, flatten afterwards
    _drtRC_loess_flat = np.exp(_drtRC_log_loess) / _flat_den
    # "Flat + Loess": flatten first, smooth afterwards
    _drtRC_flat_loess = np.exp(_drtRC_flat_log_loess)

    DRT_flatten.append(np.array([_tau, _drtRC_flat]))
    DRT_lowess_flatten.append(np.array([_tau, _drtRC_loess_flat]))
    DRT_flatten_lowess.append(np.array([_tau, _drtRC_flat_loess]))




In [951]:

# Three log-log panels sharing both axes with the first one:
# flattened DRT, flatten-then-smooth, smooth-then-flatten; one colour per day.
fig, axis = plt.subplots(1,3)
axis = axis.flatten()
cmap = plt.get_cmap('rainbow_r')

_panel_titles = ("DRT", "Flat + Loess", "Loess + Flat")
_panel_series = (DRT_flatten, DRT_flatten_lowess, DRT_lowess_flatten)

for _ax, _title in zip(axis, _panel_titles):
    _ax.set_title(_title)

_n_day = len(DRT_flatten)
for i in range(_n_day):
    # if i != 4: continue
    _c = cmap(i/_n_day)
    for _ax, _series in zip(axis, _panel_series):
        _day_xy = _series[i]
        _ax.scatter(_day_xy[0,:], _day_xy[1,:], color=_c, s=2)
        # Optional marker at tau = est_R * est_C:
        # _ax.vlines(RC_flatten[i][0]*RC_flatten[i][1], _day_xy[1,:].min(), _day_xy[1,:].max(), color=_c)

for _k, _ax in enumerate(axis):
    _ax.set_xscale('log')
    _ax.set_yscale('log')
    if _k > 0:
        # Panels 1 and 2 follow the limits of panel 0
        _ax.sharex(axis[0])
        _ax.sharey(axis[0])




### Draft

In [371]:

from statsmodels.nonparametric.smoothers_lowess import lowess
def feature_Peak(DRTdata_list):
    '''==================================================
        Collect the interior R and C values of every day
        Parameter: 
            DRTdata_list: list of DRT data - day x sample x [T,R,C]
        Return:
            PeakR_list: per day, row 1 of all samples without the first
                        and last column, concatenated into one 1-D array
            PeakC_list: same for row 2
        ==================================================
    '''
    def _interior(samples, row):
        # Drop the two end columns of each sample and join what is left
        return np.concatenate([sample[row, 1:-1] for sample in samples])

    PeakR_list = [_interior(ch_drt, 1) for ch_drt in DRTdata_list]
    PeakC_list = [_interior(ch_drt, 2) for ch_drt in DRTdata_list]

    return PeakR_list, PeakC_list



def DRT_Loess(DRTdata):
    '''==================================================
        Smooth the DRT of one day with LOWESS
        Parameter: 
            DRTdata:    list of arrays with rows (tau_i, R_i, C_i), one per sample
        Return:
            DRTdata_Loess:    array [tau, R_loess, C_loess]; tau is passed
                              through unchanged and in its original order
        ==================================================
    '''
    def _interior(row):
        # The first and last column of every sample are left out
        return np.concatenate([sample[row, 1:-1] for sample in DRTdata])

    tau = _interior(0)
    res = _interior(1)
    cap = _interior(2)
    # Variant dropping two columns per side: sample[row, 2:-2]

    # Smooth log(R/C) as a function of log(tau)
    log_tau = np.log(tau)
    log_ratio = np.log(res)-np.log(cap)
    log_ratio_smooth = lowess(log_ratio, log_tau, frac=0.1, it=3, return_sorted=False)

    # If tau = R*C holds, (log(tau) + log(R/C)) / 2 equals log(R)
    R_loess = np.exp((log_tau + log_ratio_smooth)/2)
    C_loess = tau / R_loess

    return np.array([tau, R_loess, C_loess])

In [360]:

PeakR_list, PeakC_list = feature_Peak([DRTdata_list[i] for i in eis_seq])

# First selected day: R*C on the x axis against R/C on the y axis, log-log
_peak_R0, _peak_C0 = PeakR_list[0], PeakC_list[0]

fig, axis = plt.subplots()
axis.scatter(_peak_R0*_peak_C0, _peak_R0/_peak_C0, s=2)

for _set_scale in (axis.set_xscale, axis.set_yscale):
    _set_scale('log')

# x = PeakR_list[0]*PeakC_list[0]
# y = PeakR_list[0]/PeakC_list[0]


In [410]:

DRTdata_Loess = [DRT_Loess(DRTdata_list[i]) for i in eis_seq]

_peak_R0, _peak_C0 = PeakR_list[0], PeakC_list[0]
_loess_R0, _loess_C0 = DRTdata_Loess[0][1,:], DRTdata_Loess[0][2,:]

fig, axis = plt.subplots()
# Alternative views tried on the first selected day (raw in gray, loess on top):
#   R*C vs R/C        : axis.scatter(_peak_R0*_peak_C0, _peak_R0/_peak_C0, s=2, color='gray')
#                       axis.scatter(_loess_R0*_loess_C0, _loess_R0/_loess_C0, s=2)
#   R/C vs R*C        : axis.scatter(_peak_R0/_peak_C0, _peak_R0*_peak_C0, s=2, color='gray')
#                       axis.scatter(_loess_R0/_loess_C0, _loess_R0*_loess_C0, s=2)
#   R*C vs R          : axis.scatter(_peak_R0*_peak_C0, _peak_R0, s=2, color='gray')
#                       axis.scatter(DRTdata_Loess[0][0,:], DRTdata_Loess[0][1,:], s=2)

# Active view: R*C against |R + 1j/C|
axis.scatter(_peak_R0*_peak_C0, np.abs(_peak_R0 + 1j/_peak_C0), s=2, color='gray')
# axis.scatter(DRTdata_Loess[0][0,:],DRTdata_Loess[0][1,:], s=2)

for _set_scale in (axis.set_xscale, axis.set_yscale):
    _set_scale('log')

# (x, y) consumed by the following cells: R*C and R/C of the smoothed first day
x = _loess_R0*_loess_C0
y = _loess_R0/_loess_C0


In [379]:
import numpy as np
from scipy.interpolate import interp1d

# Work in log10 space and keep only points that are finite on both axes
# (zeros in the raw data become -inf here).
x_log = np.log10(x)
y_log = np.log10(y)
mask = np.logical_and(np.isfinite(x_log), np.isfinite(y_log))
x_log = x_log[mask]
y_log = y_log[mask]

# Resample onto 1000 evenly spaced points in log10(x) by linear interpolation
x_uniform = np.linspace(x_log.min(), x_log.max(), 1000)
f_interp = interp1d(x_log, y_log, kind='linear', fill_value='extrapolate')
y_uniform = f_interp(x_uniform)


fig, axis = plt.subplots()
axis.scatter(x_uniform, y_uniform, s=2)

# Both axes already hold log10 values, so linear scales are kept:
# axis.set_xscale('log')
# axis.set_yscale('log')


<matplotlib.collections.PathCollection at 0x218e81dfc50>

In [388]:
import numpy as np
import matplotlib.pyplot as plt
from lmfit.models import GaussianModel

# Synthetic example data (disabled):
# x = np.linspace(0, 20, 500)
# np.random.seed(0)
# y = (1.5 * np.exp(-(x - 6)**2 / (2 * 1.2**2)) +
#      2.0 * np.exp(-(x - 13)**2 / (2 * 1.5**2)) +
#      0.05 * np.random.randn(len(x)))


# Raw (unsmoothed) alternative (disabled):
# x = np.log(PeakR_list[0]*PeakC_list[0])
# y = np.log(PeakR_list[0]/PeakC_list[0])
# y = y[x.argsort()]
# x = x[x.argsort()]

# Active input: the uniformly resampled log10 curve
x = x_uniform
y = y_uniform
# x = x_uniform
# y = np.power(10,y_uniform)

# Model: sum of two Gaussians
model = GaussianModel(prefix='g1_') + GaussianModel(prefix='g2_')

# Initial parameters. They have to be created in this cell: model.fit() below needs
# `params`, which was previously only defined in commented-out code.
# The seeds are a generic data-driven starting point (centres at 1/3 and 2/3 of the
# x range, width 1/6 of the range, peak height about max(y)); override them with
# hand-tuned values where needed, e.g. params['g1_center'].set(value=-13).
params = model.make_params()
_x_lo, _x_hi = float(np.min(x)), float(np.max(x))
_span = _x_hi - _x_lo
_sigma0 = _span / 6
# lmfit's Gaussian amplitude is the area under the peak: height * sigma * sqrt(2*pi)
_amp0 = float(np.max(y)) * _sigma0 * np.sqrt(2 * np.pi)
for _prefix, _frac in (('g1_', 1/3), ('g2_', 2/3)):
    params[f'{_prefix}center'].set(value=_x_lo + _frac * _span)
    params[f'{_prefix}sigma'].set(value=_sigma0)
    params[f'{_prefix}amplitude'].set(value=_amp0)

# Fit
result = model.fit(y, params, x=x)

# Plot the data, the total fit and each component
components = result.eval_components(x=x)

plt.plot(x, y, 'b', label='data')
plt.plot(x, result.best_fit, 'r-', label='fit')
plt.plot(x, components['g1_'], 'g--', label='g1')
plt.plot(x, components['g2_'], 'm--', label='g2')
plt.legend()
plt.show()


  warn("Using UFloat objects with std_dev==0 may give unexpected results.")


In [392]:
from sklearn.mixture import GaussianMixture

X = x_uniform.reshape(-1, 1)
Y = y_uniform.reshape(-1, 1)

# NOTE: GaussianMixture is an unsupervised density model. The second argument of
# fit() is ignored, so it is no longer passed: the mixture describes the
# distribution of the X values only, not the curve y(x).
gmm = GaussianMixture(n_components=3).fit(X)
# Density of X under the fitted mixture (score_samples returns log-likelihoods)
y_fit = np.exp(gmm.score_samples(X))

# Distinct colours and labels so the two curves can be told apart in the legend
plt.plot(x_uniform, y_fit, 'r', label='GMM density')
plt.plot(x_uniform, y_uniform, 'b', label='data')
plt.legend()
plt.show()


# Data Summary

## Definition

In [411]:
def SearchELE(rootPath, ele_pattern = re.compile(r"(.+?)_归档")):
    '''==================================================
        Recursively collect electrode directories below rootPath
        Parameter: 
            rootPath: directory to search
            ele_pattern: compiled pattern matched against the start of each
                         directory name; group(1) is the electrode id
        Return:
            ele_list: list of [directory path, electrode id]
        Note:
            A matching directory is recorded and not searched any further;
            only non-matching directories are descended into.
        ==================================================
    '''
    found = []
    for entry in os.listdir(rootPath):
        entry_path = os.path.join(rootPath, entry)
        if not os.path.isdir(entry_path):
            continue

        hit = ele_pattern.match(entry)
        if hit is None:
            found += SearchELE(entry_path, ele_pattern)
        else:
            found.append([entry_path, hit.group(1)])

    return found



def Load_Single(ele_id, rootPath, DATA_SUFFIX):
    '''==================================================
        Load the "data_group" entry of one electrode's .pt file
        Parameter: 
            ele_id: electrode id (prefix of the archive directory name)
            rootPath: directory that contains the archive directory
            DATA_SUFFIX: data sub-folder name, also the file-name suffix
        Return:
            the object stored under "data_group", or None (after a logged
            warning) when the file does not exist
        ==================================================
    '''
    ele_dir = f"{rootPath}/{ele_id}_归档"
    fd_pt = os.path.join(ele_dir, DATA_SUFFIX, f"{ele_id}_{DATA_SUFFIX}.pt")

    if os.path.exists(fd_pt):
        # weights_only=False performs full unpickling: only load trusted files
        return torch.load(fd_pt, weights_only=False)["data_group"]

    logger.warning(f"{fd_pt} does not exist")
    return None



def DRT_Plot_Batch(fig, DRTdata_list, EISdata_list, Loess_list, eis_seq):
    '''==================================================
        Draw the EIS / DRT overview of one channel into fig
        Parameter: 
            fig: figure that receives a 2 x 3 grid of subplots
            DRTdata_list: per-entry list of DRT sample arrays
            EISdata_list: per-entry EIS data; element [0] of each entry is plotted
            Loess_list: per-entry smoothed DRT array
            eis_seq: indices of the entries to draw
        Return:
            text_axis: the (switched-off) fourth subplot, left free for text
        Note:
            NOTE(review): EIS rows are presumably [frequency, Re(Z), Im(Z)] and
            DRT rows [tau, R, C] - confirm against the data producer.
        ==================================================
    '''
    # Grid order: Nyquist | Bode magnitude | Bode phase / Text | DRT (RC) | DRT (Rτ)
    axis = [fig.add_subplot(2, 3, k) for k in range(1, 7)]
    ax_nyquist, ax_mag, ax_phase, text_axis, ax_rc, ax_loess = axis
    text_axis.axis('off')

    marker_size = 2
    marker_alpha = 0.7

    cmap = plt.colormaps.get_cmap('rainbow_r')
    n_total = len(EISdata_list)
    for i in range(n_total):
        if i not in eis_seq:
            continue

        ch_eis      = EISdata_list[i][0]
        ch_drt      = DRTdata_list[i]
        ch_loess    = Loess_list[i]

        # Rows 1 and 2 of all samples of this entry, joined end to end
        ch_R    = np.concatenate([sample[1,:] for sample in ch_drt])
        ch_C    = np.concatenate([sample[2,:] for sample in ch_drt])

        # Colour encodes the position of the entry in the full list
        _color  = cmap(i/n_total)
        _label  = f'ch[{i:03d}]'

        _x_eis  = ch_eis[0,:]
        _z      = ch_eis[1,:]+1j*ch_eis[2,:]

        ax_nyquist.plot(ch_eis[1,:], -ch_eis[2,:], color = _color, linewidth=2)
        ax_mag.loglog(_x_eis, np.abs(_z), color = _color, linewidth=2)
        ax_phase.semilogx(_x_eis, np.rad2deg(np.angle(_z)), color = _color, linewidth=2)

        ax_rc.scatter(ch_R, ch_C, s=marker_size, alpha=marker_alpha, color=_color, label=_label)
        ax_loess.scatter(ch_loess[0,:], ch_loess[1,:], s=marker_size, alpha=marker_alpha, color=_color, label=_label)

    ax_nyquist.set_aspect('equal', adjustable='datalim')
    for _ax in (ax_rc, ax_loess):
        _ax.set_xscale('log')
        _ax.set_yscale('log')

    return text_axis




## Load Data

In [412]:

# Root folder that SearchELE scans (recursively) for electrode archive directories
rootPath = "D:/Baihm/EISNN/Archive/"
ele_list = SearchELE(rootPath)
# Dataset version tag; the loading cell builds the data sub-folder / file name from it
DATASET_SUFFIX = "Outlier_Ver04"

# Alternative data root (disabled):
# rootPath = "D:/Baihm/EISNN/Archive_New/"
# ele_list = SearchELE(rootPath)
# DATASET_SUFFIX = "Outlier_Ver04"

# Alternative data root with a different directory-name pattern (disabled):
# rootPath = "D:/Baihm/EISNN/Invivo/"
# ele_list = SearchELE(rootPath, re.compile(r"(.+?)_Ver02"))
# DATASET_SUFFIX = "Outlier_Ver04"


# Report how many electrode directories were found
n_ele = len(ele_list)
logger.info(f"Search in {rootPath} and find {n_ele:03d} electrodes")

2025-07-23 21:11:15.853 | INFO     | __main__:<module>:15 - Search in D:/Baihm/EISNN/Archive/ and find 218 electrodes


In [414]:
# Electrode / channel under study: keep exactly one (ele_id, ch_id) pair active.

ele_id = '06017758'
ch_id = 96     # Perfect of Perfect

# ele_id = '09290511'
# ch_id = 13    # Up & Down, 2 outliers
# ch_id = 21    # Normal + 2 outlier
# ch_id = 41    # Normal + 2 outlier - *(Hard To Tell)
# ch_id = 79    # 3-class, What a mess


# ele_id = '01037160'
# ch_id = 20  # Normal to Short, Same to GPR  
# ch_id = 89  # Same to GPR  
# ch_id = 7  # Normal Example


# ele_id = '05087163'
# ch_id = 7   # one outlier
# ch_id = 50  # No outlier but in two Phases
# ch_id = 55  # One outlier & weird end point
# ch_id = 114 # Open Circuit with one outlier



# ele_id = '02067447'
# ch_id = 68  # Short all the time


# ele_id = '11057712'
# ch_id = 106    # Very Good Electrode with 1 hidden outlier, and one phase shift



# ele_id = '10057084'
# ch_id = 16    # Total mess
# ch_id = 18    # Total mess




Loe_Data = Load_Single(ele_id, rootPath, DATA_SUFFIX = f"{DATASET_SUFFIX}_DRTLoe_Ver02")
if Loe_Data is None:
    # Load_Single only logs a warning and returns None when the .pt file is missing;
    # stop here with a clear message instead of a TypeError on the indexing below.
    raise FileNotFoundError(
        f"No {DATASET_SUFFIX}_DRTLoe_Ver02 data found for electrode {ele_id} under {rootPath}"
    )
Loe_Data = Loe_Data[ch_id]



# Unpack the per-channel record
DRTdata_list    = Loe_Data['DRTlist']
EISdata_list    = Loe_Data['EISlist']
Loess_list      = Loe_Data['Loesslist']
eis_seq         = Loe_Data['eis_seq']
eis_short       = Loe_Data['seq_short']


# fig = plt.figure(figsize=(16, 9), constrained_layout=True)
# text_axis = DRT_Plot_Batch(fig, DRTdata_list, EISdata_list, Loess_list, eis_seq)

