In [None]:
"""
# coding: utf-8
@author: Yuhao Zhang
last updated: 05/30/2025
data from: Xinchao Chen
"""
%reset -f

data_path = '/data1/zhangyuhao/xinchao_data/NP2/test/control/Mice_1411_3/20250109_control_Mice_1411_3_VN_head_fixation'
save_path = "/home/zhangyuhao/Desktop/Result/ET/Rastermap/NP2/demo/20250308_control_Mice_1423-15-VN-head_fixation_demo"

import numpy as np
import matplotlib.pyplot as plt
from tick.hawkes import SimuHawkes, HawkesExpKern, HawkesKernelExp
from statsmodels.tsa.stattools import acf
from statsmodels.tsa.seasonal import STL

# Simulation
P.P.标准泊松过程（随机均匀）<br>
Periodic P.P.周期性泊松过程（强度随时间周期性变化）<br>
Hawkes P. Hawkes过程（自激励过程，事件聚集）<br>

In [None]:
# 生成泊松过程（恒定速率）
def generate_poisson(lamb, T):
    n = np.random.poisson(lamb * T)
    return np.sort(np.random.uniform(0, T, n))

# 生成周期性泊松过程（速率含周期性）
def generate_periodic_poisson(base_lamb, amplitude, period, T):
    t = np.linspace(0, T, 1000)
    lamb_t = base_lamb + amplitude * np.sin(2 * np.pi * t / period)
    events = []
    current_time = 0
    while current_time < T:
        current_lamb = base_lamb + amplitude * np.sin(2 * np.pi * current_time / period)
        next_time = current_time + np.random.exponential(1 / current_lamb)
        if next_time < T:
            events.append(next_time)
        current_time = next_time
    return np.array(events)

def Periodic_Poisson_Version2(base_lamb, amplitude, period, T):
    # 参数设置
    T = 0.01  # 周期 (100 Hz)
    mu = 0.5  # 泊松背景率
    A = 0.8  # 周期性强度
    f0 = 100  # 周期频率 (Hz)
    phi = 0  # 相位
    # 时间参数
    T_max = 1  # 模拟时间长度
    dt = 0.001
    time = np.arange(0, T_max, dt)
    # 周期性过程
    periodic_spikes = (np.sin(2 * np.pi * f0 * time + phi) > 0.99).astype(int)
    # 泊松过程
    poisson_spikes = np.random.rand(len(time)) < mu * dt
    # 组合发放
    spikes = np.logical_or(periodic_spikes, poisson_spikes)
    # 事件时间
    event_times = time[spikes]
    isi = np.diff(event_times)
    return event_times, isi

# 生成Hawkes过程（自激励）
def generate_hawkes(mu, alpha, beta, T):
    # 创建指数衰减核 (alpha * beta * exp(-beta * t))
    kernel = HawkesKernelExp(alpha * beta, beta)
    
    hawkes = SimuHawkes(
        baseline=[mu], 
        kernels=[[kernel]],  # 注意这里是 kernels (复数)
        end_time=T, 
        verbose=False
    )
    hawkes.simulate()
    return hawkes.timestamps[0]

def Hawkes_Version2():
    # 参数设置
    mu = 0.1  # 基础发放率
    alpha = 0.8  # 激发强度
    beta = 1.5  # 衰减速率
    T = 100  # 模拟时长

    # 模拟事件发生
    t_events = []
    t = 0
    while t < T:
        lambda_t = mu
        for t_i in t_events:
            lambda_t += alpha * np.exp(-beta * (t - t_i))
        
        # 生成下一个事件时间
        dt = -np.log(np.random.rand()) / lambda_t
        t += dt
        if t < T:
            t_events.append(t)
    return np.array(t_events)

# 参数设置
T = 100  # 时间范围
np.random.seed(42)

# 生成三种过程的数据
events_poisson = generate_poisson(lamb=0.5, T=T)
events_periodic = generate_periodic_poisson(base_lamb=0.5, amplitude=0.2, period=10, T=1000)
events_hawkes = generate_hawkes(mu=0.1, alpha=0.5, beta=1.0, T=T)

# Raster Plot

In [None]:
def plot_events(events, title):
    plt.figure(figsize=(10, 2))
    plt.eventplot(events, lineoffsets=0.5, linelengths=0.5)
    plt.title(title)
    plt.xlabel("Time")
    plt.yticks([])
    plt.show()

plot_events(events_poisson, "Poisson Process")
plot_events(events_periodic, "Periodic Poisson Process")
plot_events(events_hawkes, "Hawkes Process")

# Plot ISI

In [None]:
# 绘制 ISI 分布
plt.hist(isi * 1000, bins=100, alpha=0.7)
plt.xlabel('Inter-spike Interval (ms)')
plt.ylabel('Counts')
plt.title('Periodic + Poisson Process ISI Distribution')
plt.show()

# Compute ACF
泊松过程：ACF应接近0（无自相关）<br>
周期性过程：可能显示微弱周期性<br>
Hawkes过程：短时滞后上显著非零（聚集性）

In [None]:
# 计算自相关函数（ACF）
def plot_acf(events, max_lag=10):  # 将max_lag从20减少到10
    if len(events) < 2:
        print("不足的事件数量来计算ACF")
        return
    
    intervals = np.diff(events)
    
    # 确保最大滞后不超过间隔序列长度
    max_lag = min(max_lag, len(intervals) - 1)
    
    if max_lag <= 0:
        print("不足的事件间隔来计算ACF")
        return
    
    acf_values = acf(intervals, nlags=max_lag, fft=True)
    plt.stem(range(max_lag+1), acf_values)
    plt.title("ACF of Inter-Event Times")
    plt.xlabel("Lag")
    plt.ylabel("ACF")
    plt.show()

# 使用更小的max_lag
plot_acf(events_periodic, max_lag=10)
plot_acf(events_poisson)     # ACF应接近零
plot_acf(events_periodic)    # ACF可能显示微弱周期
plot_acf(events_hawkes)      # ACF在短滞后上显著非零

# Use ACF to distingusih Hawkes Process

In [None]:
from statsmodels.tsa.stattools import acf

# 加载数据
series1_path = r"C:\Users\zyh20\Desktop\Research\01_ET_data_analysis\Research\ISI_distribution\xinchao_np2_benchmark\20250310_benchmark_tremor_VN_spike_time_neuron_5.npy"
series2_path = r"C:\Users\zyh20\Desktop\Research\01_ET_data_analysis\Research\ISI_distribution\jiejue_data_benchmark\controlVN_spike_time_20221118_NP1_session3_neuron_30.npy"

t1 = np.load(series1_path)  # 替换为实际文件名
t2 = np.load(series2_path)


# 设定分箱宽度（根据数据调整）
bin_width = 0.1
max_time = max(np.max(t1), np.max(t2))
bins = np.arange(0, max_time + bin_width, bin_width)

# 生成计数序列
counts1, _ = np.histogram(t1, bins=bins)
counts2, _ = np.histogram(t2, bins=bins)

# 计算ACF，设置合适的滞后阶数
nlags = 40
acf1 = acf(counts1, nlags=nlags, fft=True)
acf2 = acf(counts2, nlags=nlags, fft=True)

# 绘制结果
plt.figure(figsize=(10, 8))

plt.subplot(2, 1, 1)
plt.stem(range(nlags+1), acf1, use_line_collection=True)
plt.title('ACF of Series 1')
plt.xlabel('Lag')
plt.ylabel('Autocorrelation')

plt.subplot(2, 1, 2)
plt.stem(range(nlags+1), acf2, use_line_collection=True)
plt.title('ACF of Series 2')
plt.xlabel('Lag')
plt.ylabel('Autocorrelation')

plt.tight_layout()
plt.show()


# Detection
1) 通过KS检验判断是否符合泊松过程
2) 通过STL分解检测周期性
3) 通过拟合Hawkes模型检测自激励特性

In [None]:
# 1：泊松过程检验（Kolmogorov-Smirnov）
def test_poisson(events):
    intervals = np.diff(events)
    from scipy.stats import kstest, expon
    ks_stat, p_value = kstest(intervals, expon.cdf, args=(1/np.mean(intervals),))
    print(f"KS检验p值: {p_value:.3f} (p>0.05接受泊松假设)")

# 2：检测周期性（STL分解）
def detect_periodicity(events, period=10):
    counts, bins = np.histogram(events, bins=np.arange(0, T, 1))
    stl = STL(counts, period=period, seasonal=13)
    result = stl.fit()
    result.plot()
    plt.show()

# 3：拟合Hawkes模型，检查参数显著性
def fit_hawkes(events):
    # 使用 'decays' 而不是 'decay'
    model = HawkesExpKern(decays=1.0)  # 注意这里是 decays (复数)
    model.fit(events)
    print(f"Hawkes模型参数 - 基线: {model.baseline[0]:.3f}, 自激强度: {model.adjacency[0,0]:.3f}")
    return model.score()  # 返回对数似然

In [None]:
print("=== Poisson Process ===")
test_poisson(events_poisson)
detect_periodicity(events_poisson)

print("\n=== Periodic Poisson Process ===")
test_poisson(events_periodic)  # 预期拒绝泊松假设
detect_periodicity(events_periodic)

print("\n=== Hawkes Process ===")
log_likelihood = fit_hawkes([events_hawkes])
test_poisson(events_hawkes)     # 预期拒绝泊松假设

# Power Spectrum Analysis

In [None]:
def plot_spectrum(counts, bin_width, title):
    """
    计算并绘制功率谱密度
    :param counts: 分箱后的计数序列
    :param bin_width: 分箱宽度（秒）
    :param title: 图像标题
    """
    # 计算采样频率（Hz）
    fs = 1 / bin_width  # 例如 bin_width=0.1秒 → fs=10Hz
    
    # 使用Welch方法估计PSD
    f, Pxx = signal.welch(
        counts,
        fs=fs,
        window='hann',    # 汉宁窗减少频谱泄漏
        nperseg=256,      # 分段长度
        scaling='density' # 功率谱密度
    )
    
    # 可视化
    plt.figure(figsize=(10, 4))
    plt.semilogy(f, Pxx, color='blue')
    plt.title(f'Power Spectrum: {title}')
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Power Spectral Density')
    plt.grid(True)
    
    # 标注最高峰
    peak_idx = np.argmax(Pxx[1:]) + 1  # 排除0频率
    plt.annotate(f'Peak: {f[peak_idx]:.2f} Hz\n(Period: {1/f[peak_idx]:.2f} sec)',
                 xy=(f[peak_idx], Pxx[peak_idx]),
                 xytext=(f[peak_idx]+0.2, Pxx[peak_idx]*0.8),
                 arrowprops=dict(arrowstyle="->"))
    plt.show()

# 分析Series 1（假设bin_width=0.1秒）
plot_spectrum(counts1, bin_width=0.1, title="Series 1")

# 分析Series 2
plot_spectrum(counts2, bin_width=0.1, title="Series 2")