In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:

from abc import ABC, abstractmethod
from typing import List
from scipy import signal

class Filtration(ABC):
    """
    Абстрактный класс для определения общего интерфейса классов фильтрации.

    """

    @abstractmethod
    def create_filter(self, freq_sample_rate: float,
        frequency: List[float],
        order: int
        ) -> tuple:

        """
        Абстрактная функция для определения общего интерфейса функции фильтрации.

        Args:
            freq_sample_rate: Частота дискретизации сигнала.
            frequency: Список частот для фильтрации.
            order: Порядок фильтра.

        Returns:
            filter_parameters: Кортеж из двух массивов, представляющих коэффициенты фильтра.

        """

        pass


    def filter_data(self, trace: List[float],
        freq_sample_rate: float,
        frequency: List[float],
        order: int
        ) -> List[float]:
        """
        Функция для применения фильтра к данным.

        Args:
            trace: Список данных для фильтрации.
            freq_sample_rate: Частота дискретизации сигнала.
            frequency: Список частот для фильтрации.
            order: Порядок фильтра.

        Returns:
            filtered_trace: Список отфильтрованных данных.

        """
        b, a = self.create_filter(freq_sample_rate, frequency, order)
        return signal.filtfilt(b, a, trace)

class LowPassFiltration(Filtration):
    """
    Класс для низкочастотной фильтрации данных.

    """

    def create_filter(self, freq_sample_rate: float,
        frequency: List[float],
        order: int
        ) -> tuple:
        """Создает параметры фильтра нижних частот.

        Args:
            freq_sample_rate: Частота дискретизации сигнала.
            frequency: Список частот для фильтрации.
            order: Порядок фильтра.

        Returns:
            filter_parameters: Кортеж из двух массивов, представляющих коэффициенты фильтра.

        """
        low_freq = frequency[0]
        filter_parameters = signal.butter(order, low_freq, fs=freq_sample_rate, btype='low')
        return filter_parameters

class HighPassFiltration(Filtration):
    """
    Класс для высокочастотной фильтрации данных.

    """

    def create_filter(self, freq_sample_rate: float,
        frequency: List[float],
        order: int
        ) -> tuple:
        """Создает параметры фильтра высоких частот.

        Args:
            freq_sample_rate: Частота дискретизации сигнала.
            frequency: Список частот для фильтрации.
            order: Порядок фильтра.

        Returns:
            filter_parameters: Кортеж из двух массивов, представляющих коэффициенты фильтра.

        """
        high_freq = frequency[1]
        filter_parameters = signal.butter(order, high_freq, fs=freq_sample_rate, btype='high')
        return filter_parameters

class BandPassFiltration(Filtration):
    """
    Класс для полосовой фильтрации данных.

    """

    def create_filter(self, freq_sample_rate: float,
        frequency: List[float],
        order: int
        ) -> tuple:
        """Создает параметры полосового фильтра.

        Args:
            freq_sample_rate: Частота дискретизации сигнала.
            frequency: Список частот для фильтрации.
            order: Порядок фильтра.

        Returns:
            filter_parameters: Кортеж из двух массивов, представляющих коэффициенты фильтра.

        """
        filter_parameters = signal.butter(order, frequency, fs=freq_sample_rate, btype='band')
        return filter_parameters


In [5]:
import os
import numpy as np
from scipy import signal

data_path = '/content/drive/MyDrive/ML/COOKIE/info/data/act_dataset/данные_вибрации/2024_06_01/'


def process(file, input_dir=data_path + 'norm_splited/', output_dir=data_path + 'compilled_data/') -> None:
    samplerate = 128000

    arr = np.fromfile(input_dir + file, dtype=np.int16) #reading splitted files

    arr = np.array(np.array_split(arr, 4), dtype=np.float64).T #spliting into 4 channels, and transposing for filtration

   #detrend
    for i in range(0, arr.shape[1]):
        arr[:, i] = signal.detrend(arr[:, i], type="linear")

    #filter
    cls = BandPassFiltration()
    filter_data = lambda trace: cls.filter_data(trace, freq_sample_rate=samplerate, frequency=[5, 15000], order=1)
    filtered_data = np.array([filter_data(arr[:, i]) for i in range(arr.shape[1])]).T

    #normalization
    for i in range(0, filtered_data.shape[1]):
        filtered_data[:, i] /= np.max(abs(filtered_data[:, i]))

    if not os.path.exists(output_dir):
        os.mkdir(output_dir)
    try:
        os.remove(output_dir + file)
    except FileNotFoundError:
        pass
    with open(output_dir + file, "wb") as f:
        for i in range(filtered_data.shape[1]):
            f.write(arr[:, i])


def main():
    dir = data_path + 'norm_splited/'
    files = []
    for i in os.listdir(dir):
        if i.endswith(".npy"):
            files.append(i)

    for file in files:
        process(file)


if __name__ == "__main__":
    main()