In [None]:
import matplotlib.pylab as plt
import numpy as np
from pathlib import Path
from hdf5storage import loadmat
from scipy import signal

dataset_path = '/home/zhang03/zxc/STFT_3DDL/DATASETS/raw_data/DAS1K/'


def get_class_names(directory_path):
    """
    Return the names of the sub-directories found directly under a directory.

    Each immediate sub-directory is treated as one class. The full directory
    name is used (not ``Path.stem``), so a folder such as ``CAR.v2`` is
    reported as ``CAR.v2`` and can be joined back onto ``directory_path``.

    The test module for this helper is located at
    /home/zhang03/zxc/STFT_3DDL/STFT_3Ddl/prepreprocessing/stft/create_1window_stft.ipynb

    Args:
        directory_path: Path (str or Path) of the dataset root directory.

    Returns:
        list[str]: Sub-directory names in ascending order, so the result is
        reproducible across runs. Plain files are ignored.

    Raises:
        ValueError: If ``directory_path`` does not exist or is not a directory.
    """
    directory = Path(directory_path)
    # Fail early with a clear message instead of letting iterdir() raise.
    if not directory.is_dir():
        raise ValueError(f"{directory_path} 不是一个有效的目录路径")

    # Names inside a single directory are unique, so no de-duplication is
    # needed; sorting makes the class order deterministic.
    return sorted(entry.name for entry in directory.iterdir() if entry.is_dir())

def get_iterfile_names(directory_path):
    """
    Return the extension-less names of the files found directly in a directory.

    The extension is stripped with ``Path.stem``; files that differ only by
    their extension therefore collapse into a single entry.

    The test module for this helper is located at
    /home/zhang03/zxc/STFT_3DDL/STFT_3Ddl/prepreprocessing/stft/create_1window_stft.ipynb

    Args:
        directory_path: Path (str or Path) of the directory to scan.

    Returns:
        list[str]: Unique file stems in ascending order, so the result is
        reproducible across runs. Sub-directories are ignored.

    Raises:
        ValueError: If ``directory_path`` does not exist or is not a directory.
    """
    directory = Path(directory_path)
    # Fail early with a clear message instead of letting iterdir() raise.
    if not directory.is_dir():
        raise ValueError(f"{directory_path} 不是一个有效的目录路径")

    # A set removes stems shared by several extensions; sorting makes the
    # returned order deterministic.
    return sorted({entry.stem for entry in directory.iterdir() if entry.is_file()})

def read_raw_signal_file(dataset_path, class_name, iterfile_name, sampling_rate=10000, plot=True):
    """
    Load one raw recording and return its phase and intensity traces.

    The file ``<dataset_path>/<class_name>/<iterfile_name>.mat`` is read with
    ``loadmat``. The variable stored under the key ``iterfile_name`` is
    indexed so that element 0 is returned as the phase and element 1 as the
    intensity (presumably a 2-row array -- confirm against the dataset).

    The test module for this helper is located at
    /home/zhang03/zxc/STFT_3DDL/STFT_3Ddl/prepreprocessing/stft/create_1window_stft.ipynb

    Args:
        dataset_path: Root directory of the dataset.
        class_name: Name of the class sub-directory.
        iterfile_name: File name without the ``.mat`` extension; also the key
            of the variable inside the MAT file.
        sampling_rate: Samples per second used to build the time axis of the
            plots. Defaults to 10000, the value previously hard-coded.
        plot: When True (default) draw one figure for the phase and one for
            the intensity; pass False to load the data without side effects.

    Returns:
        tuple: ``(phase, intensity)`` as read from the MAT variable.

    Raises:
        KeyError: If the MAT file has no variable named ``iterfile_name``.
    """
    iterfile_path = Path(dataset_path) / class_name / f"{iterfile_name}.mat"
    mat = loadmat(str(iterfile_path))
    recording = mat[iterfile_name]
    phase = recording[0]
    intensity = recording[1]

    if plot:
        # Sample index divided by the sampling rate gives the time in seconds.
        taxis = np.arange(len(phase)) / sampling_rate
        for trace, label in ((phase, "phase"), (intensity, "intensity")):
            _, ax = plt.subplots(figsize=(15, 15))
            ax.plot(taxis, trace)
            ax.set(title=f"{iterfile_name} - {label}", xlabel="time (s)", ylabel=label)

    return phase, intensity

# Discover the available classes, then load one example recording.
class_names = get_class_names(dataset_path)
# Alternative example: class 'HANDHAMMER', file 'handhammer14'.
class_names_0, iterfile_names_0 = 'CARHORN', 'carhorn2'
phase, intensity = read_raw_signal_file(dataset_path, class_names_0, iterfile_names_0)
# One shape per line: phase first, then intensity.
print(phase.shape, intensity.shape, sep='\n')


### 编写方法，对信号进行下采样

In [None]:
def signal_resample_1(raw_signal, num_resampled):
    """
    Resample a signal to a fixed number of samples without any normalization.

    Args:
        raw_signal: Input signal, passed unchanged to ``scipy.signal.resample``.
        num_resampled: Number of samples in the returned signal.

    Returns:
        The resampled signal containing ``num_resampled`` samples.
    """
    # Resampling only: the amplitude range of the input is left untouched.
    return signal.resample(raw_signal, num_resampled)

def signal_resample_2(raw_signal, num_resampled):
    """
    Min-max normalize a signal to [0, 1] and then resample it.

    Because the resampling happens after the scaling, the returned values are
    not guaranteed to stay inside [0, 1].

    Args:
        raw_signal: Input signal (array-like).
        num_resampled: Number of samples in the returned signal.

    Returns:
        The normalized-then-resampled signal with ``num_resampled`` samples.
        A constant input yields all zeros instead of NaN.
    """
    data = np.asarray(raw_signal)
    lowest = np.min(data)
    span = np.max(data) - lowest
    if span == 0:
        # Flat signal: min-max scaling would divide by zero and fill the
        # result with NaN, so map every sample to 0 instead.
        normalized = np.zeros_like(data, dtype=float)
    else:
        normalized = (data - lowest) / span  # scale to [0, 1]
    return signal.resample(normalized, num=num_resampled)

def signal_resample_3(raw_signal, num_resampled):
    """
    Resample a signal and then min-max normalize the result to [0, 1].

    Because the scaling is applied last, the returned values always lie in
    [0, 1] for a non-constant signal.

    Args:
        raw_signal: Input signal (array-like).
        num_resampled: Number of samples in the returned signal.

    Returns:
        The resampled-then-normalized signal with ``num_resampled`` samples.
        A constant input yields all zeros instead of NaN.
    """
    resampled = signal.resample(raw_signal, num=num_resampled)
    lowest = np.min(resampled)
    span = np.max(resampled) - lowest
    # A flat signal has no range to scale: dividing would give NaN (or blow
    # up rounding noise left by the resampling), so return zeros instead.
    if span == 0 or np.max(raw_signal) == np.min(raw_signal):
        return np.zeros_like(resampled, dtype=float)
    return (resampled - lowest) / span  # scale to [0, 1]

def signal_resample_4(raw_signal, num_resampled):
    """
    Min-max normalize a signal to approximately [-1, 1] and then resample it.

    A small constant (1e-10) is added to the range before dividing, so a
    constant signal maps to -1 everywhere instead of producing NaN.

    Args:
        raw_signal: Input signal.
        num_resampled: Number of samples in the returned signal.

    Returns:
        The normalized-then-resampled signal with ``num_resampled`` samples.
    """
    lowest = np.min(raw_signal)
    highest = np.max(raw_signal)
    # Shift to start at 0, stretch to a width of 2, then centre on 0.
    shifted = 2 * (raw_signal - lowest)
    scaled = shifted / (highest - lowest + 1e-10) - 1
    return signal.resample(scaled, num=num_resampled)



In [None]:
from pyts.image import GramianAngularField
def get_gasf(signal):
    """
    Compute the Gramian Angular Summation Field (GASF) image of one signal.

    Note: the parameter name ``signal`` shadows the ``scipy.signal`` module
    inside this function only.

    Args:
        signal: Array exposing ``reshape``; it is flattened into a single
            sample before being handed to the transformer.

    Returns:
        The image produced for that single sample.
    """
    transformer = GramianAngularField(method="summation")
    # The transformer works on a batch; wrap the signal as a batch of one.
    batch = signal.reshape(1, -1)
    return transformer.fit_transform(batch)[0]

def get_gadf(signal):
    """
    Compute the Gramian Angular Difference Field (GADF) image of one signal.

    Note: the parameter name ``signal`` shadows the ``scipy.signal`` module
    inside this function only.

    Args:
        signal: Array exposing ``reshape``; it is flattened into a single
            sample before being handed to the transformer.

    Returns:
        The image produced for that single sample.
    """
    transformer = GramianAngularField(method="difference")
    # The transformer works on a batch; wrap the signal as a batch of one.
    batch = signal.reshape(1, -1)
    return transformer.fit_transform(batch)[0]


In [None]:
# Resample the phase trace to several lengths using method 1 (resample only).
SIGNAL_SIZES = [64, 128, 256, 512]
resampled_signals = []

# Duration of the original recording in seconds; the raw trace is assumed to
# be sampled at 10 kHz, the rate used when the raw signal was plotted.
signal_duration = len(phase) / 10000

for signal_size in SIGNAL_SIZES:
    resampled_signal = signal_resample_1(phase, signal_size)
    resampled_signals.append(resampled_signal)
    # Resampling keeps the duration but changes the sample spacing, so the
    # time axis must span the original duration instead of assuming 10 kHz.
    taxis = np.arange(len(resampled_signal)) * signal_duration / len(resampled_signal)
    plt.figure(figsize=(15, 15))
    plt.plot(taxis, resampled_signal)
    plt.title(f"method 1: resampled to {signal_size} samples")
    plt.xlabel("time (s)")
    

In [None]:
# 测试模块
for i, resampled_signal in enumerate(resampled_signals):
    gasf = get_gasf(resampled_signal)
    plt.figure()
    plt.imshow(gasf, cmap='hot', interpolation='nearest')
    plt.title(f"{str(i)} gasf")
    plt.colorbar()
    plt.show()
    gadf = get_gadf(resampled_signal)
    plt.figure()
    plt.imshow(gadf, cmap='hot', interpolation='nearest')
    plt.title(f"{str(i)} gadf")
    plt.colorbar()
    plt.show()

In [None]:
# Resample the phase trace to several lengths using method 2
# (normalize to [0, 1], then resample).
SIGNAL_SIZES = [64, 128, 256, 512]
resampled_signals = []

# Duration of the original recording in seconds; the raw trace is assumed to
# be sampled at 10 kHz, the rate used when the raw signal was plotted.
signal_duration = len(phase) / 10000

for signal_size in SIGNAL_SIZES:
    resampled_signal = signal_resample_2(phase, signal_size)
    resampled_signals.append(resampled_signal)
    # Resampling keeps the duration but changes the sample spacing, so the
    # time axis must span the original duration instead of assuming 10 kHz.
    taxis = np.arange(len(resampled_signal)) * signal_duration / len(resampled_signal)
    plt.figure(figsize=(15, 15))
    plt.plot(taxis, resampled_signal)
    plt.title(f"method 2: resampled to {signal_size} samples")
    plt.xlabel("time (s)")

In [None]:
# 测试模块
for i, resampled_signal in enumerate(resampled_signals):
    gasf = get_gasf(resampled_signal)
    plt.figure()
    plt.imshow(gasf, cmap='hot', interpolation='nearest')
    plt.title(f"{str(i)} gasf")
    plt.colorbar()
    plt.show()
    gadf = get_gadf(resampled_signal)
    plt.figure()
    plt.imshow(gadf, cmap='hot', interpolation='nearest')
    plt.title(f"{str(i)} gadf")
    plt.colorbar()
    plt.show()

In [None]:
# Resample the phase trace to several lengths using method 3
# (resample, then normalize to [0, 1]).
SIGNAL_SIZES = [64, 128, 256, 512]
resampled_signals = []

# Duration of the original recording in seconds; the raw trace is assumed to
# be sampled at 10 kHz, the rate used when the raw signal was plotted.
signal_duration = len(phase) / 10000

for signal_size in SIGNAL_SIZES:
    resampled_signal = signal_resample_3(phase, signal_size)
    resampled_signals.append(resampled_signal)
    # Resampling keeps the duration but changes the sample spacing, so the
    # time axis must span the original duration instead of assuming 10 kHz.
    taxis = np.arange(len(resampled_signal)) * signal_duration / len(resampled_signal)
    plt.figure(figsize=(15, 15))
    plt.plot(taxis, resampled_signal)
    plt.title(f"method 3: resampled to {signal_size} samples")
    plt.xlabel("time (s)")

In [None]:
# 测试模块
for i, resampled_signal in enumerate(resampled_signals):
    gasf = get_gasf(resampled_signal)
    plt.figure()
    plt.imshow(gasf, cmap='hot', interpolation='nearest')
    plt.title(f"{str(i)} gasf")
    plt.colorbar()
    plt.show()
    gadf = get_gadf(resampled_signal)
    plt.figure()
    plt.imshow(gadf, cmap='hot', interpolation='nearest')
    plt.title(f"{str(i)} gadf")
    plt.colorbar()
    plt.show()

In [None]:
# Resample the phase trace to several lengths using method 4
# (normalize to [-1, 1], then resample).
SIGNAL_SIZES = [64, 128, 256, 512]
resampled_signals = []

# Duration of the original recording in seconds; the raw trace is assumed to
# be sampled at 10 kHz, the rate used when the raw signal was plotted.
signal_duration = len(phase) / 10000

for signal_size in SIGNAL_SIZES:
    resampled_signal = signal_resample_4(phase, signal_size)
    resampled_signals.append(resampled_signal)
    # Resampling keeps the duration but changes the sample spacing, so the
    # time axis must span the original duration instead of assuming 10 kHz.
    taxis = np.arange(len(resampled_signal)) * signal_duration / len(resampled_signal)
    plt.figure(figsize=(15, 15))
    plt.plot(taxis, resampled_signal)
    plt.title(f"method 4: resampled to {signal_size} samples")
    plt.xlabel("time (s)")

In [None]:
# 测试模块
for i, resampled_signal in enumerate(resampled_signals):
    gasf = get_gasf(resampled_signal)
    plt.figure()
    plt.imshow(gasf, cmap='hot', interpolation='nearest')
    plt.title(f"{str(i)} gasf")
    plt.colorbar()
    plt.show()
    gadf = get_gadf(resampled_signal)
    plt.figure()
    plt.imshow(gadf, cmap='hot', interpolation='nearest')
    plt.title(f"{str(i)} gadf")
    plt.colorbar()
    plt.show()