# Baseline Model

In [1]:
import pickle
from pathlib import Path

import numpy as np
import pandas as pd
import scipy.stats as stats
from catboost import CatBoostClassifier
from scipy.signal import medfilt
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler



In [2]:
class SignalDataset:

    # Time and frequency domains 
    @staticmethod
    def rms(y: np.ndarray):
        return np.sqrt(np.mean(y ** 2))
    
    @staticmethod
    def power(y: np.ndarray):
        return np.mean(y ** 2)
    
    @staticmethod
    def peak(y: np.ndarray):
        return np.max(np.abs(y))
    
    @staticmethod
    def p2p(y: np.ndarray):
        return np.ptp(y)
    
    @staticmethod
    def crest_factor(y: np.ndarray):
        return np.max(np.abs(y))/np.sqrt(np.mean(y ** 2))
    
    @staticmethod
    def skew(y: np.ndarray):
        return stats.skew(y)
    
    @staticmethod
    def kurtosis(y: np.ndarray):
        return stats.kurtosis(y)
    
    @staticmethod
    def form_factor(y: np.ndarray):
        return np.sqrt(np.mean(y ** 2)) / np.mean(y)
    
    @staticmethod
    def pulse_indicator(y: np.ndarray):
        return np.max(np.abs(y)) / np.mean(y)
    
    @staticmethod
    def denoize(dataset: np.ndarray, threshold: float = 0.05) -> np.ndarray:
        cleaned_dataset = dataset.copy()
        for i in range(len(dataset)):
            max_val = np.max(np.abs(dataset[i, 0]))
            silence_mask = np.abs(dataset[i, 0]) > threshold * max_val
            cleaned_dataset[i, 0] = dataset[i, 0][silence_mask]
        return cleaned_dataset
    
    @staticmethod
    def median_filter(dataset: np.ndarray) -> np.ndarray:
        cleaned_dataset = dataset.copy()
        for i in range(len(dataset)):
            cleaned_dataset[i, 0] = medfilt(dataset[i, 0], kernel_size=35)
        return cleaned_dataset

    def __init__(
            self,
            raw_data: np.ndarray,
            denoize: bool = False,
            median_filter: bool = False,
    ) -> None:
        self.raw_data = self.median_filter(raw_data) if median_filter else raw_data
        self.raw_data = self.denoize(self.raw_data) if denoize else self.raw_data
    
    def make_dataset(self):
        self.df = pd.DataFrame()
        self.df["min_t"] = [np.min(y) for y in self.raw_data[:]]
        self.df["max_t"] = [np.max(y) for y in self.raw_data[:]]
        self.df["mean_t"] = [np.mean(y) for y in self.raw_data[:]]
        self.df["rms_t"] = [SignalDataset.rms(y) for y in self.raw_data[:]]
        self.df["var_t"] = [np.var(y) for y in self.raw_data[:]]
        self.df["std_t"] = [np.std(y) for y in self.raw_data[:]]
        self.df["power_t"] = [SignalDataset.power(y) for y in self.raw_data[:]]
        self.df["peak_t"] = [SignalDataset.peak(y) for y in self.raw_data[:]]
        self.df["p2p_t"] = [SignalDataset.p2p(y) for y in self.raw_data[:]]
        self.df["crest_factor_t"] = [SignalDataset.crest_factor(y) for y in self.raw_data[:]]
        self.df["skew_t"] = [SignalDataset.skew(y) for y in self.raw_data[:]]
        self.df["kurtosis_t"] = [SignalDataset.kurtosis(y) for y in self.raw_data[:]]
        self.df["form_factor_t"] = [SignalDataset.form_factor(y) for y in self.raw_data[:]]
        self.df["pulse_indicator_t"] = [SignalDataset.pulse_indicator(y) for y in self.raw_data[:]]


        self.target = "target"
        self.features = [col for col in self.df.columns if col != "target"]


In [3]:
labels1 = pd.read_csv('synth3/labels.csv')
labels1['filename']=labels1['filename'].str.replace('/home/roman/gagarin/data_generation/data/final_dataset1', 'synth3')
labels2 = pd.read_csv('synth3/labels2.csv')
labels2['filename']=labels2['filename'].str.replace('/home/roman/gagarin/data_generation/data/final_dataset', 'synth3')

labels = pd.concat([labels1, labels2]).sample(frac=1.0)

In [4]:
import pandas as pd
import numpy as np

def load_dataset(labels):
    
    dataset = [[], [], [], [], []]
    for idx, row in labels.iterrows():
        bin_path = row['filename']
        label = row['anomaly']
        data = pd.Series(np.fromfile(bin_path, dtype='uint8'))
        distr = data.value_counts().sort_index().values
        if len(distr) != 0:
            if label == "blur":
                dataset[0].append(distr)
            elif label == "crop":
                dataset[1].append(distr)
            elif label == "highlight":
                dataset[2].append(distr)
            elif label == "normal":
                dataset[3].append(distr)
            elif label == "overlap":
                dataset[4].append(distr)
        
    dataset = [np.array(item) for item in dataset]
    
    return dataset

In [5]:
dataset = load_dataset(labels)

In [6]:
normal_dataset = SignalDataset(np.array(dataset[0]))
normal_dataset.make_dataset()
normal_dataset.df["target"] = "0"

anomal_dataset = SignalDataset(np.array(dataset[1]))
anomal_dataset.make_dataset()
anomal_dataset.df["target"] = "1"

anomal_dataset1 = SignalDataset(np.array(dataset[2]))
anomal_dataset1.make_dataset()
anomal_dataset1.df["target"] = "2"

anomal_dataset3 = SignalDataset(np.array(dataset[3]))
anomal_dataset3.make_dataset()
anomal_dataset3.df["target"] = "3"

anomal_dataset4 = SignalDataset(np.array(dataset[4]))
anomal_dataset4.make_dataset()
anomal_dataset4.df["target"] = "4"

In [7]:
dataset_df = pd.concat([normal_dataset.df, anomal_dataset.df, anomal_dataset1.df, anomal_dataset3.df, anomal_dataset4.df]).reset_index(drop=True)

In [8]:
target_col = dataset_df['target']
features = dataset_df.drop(columns=['target'])
scaler = MinMaxScaler()
normalized_features = scaler.fit_transform(features)
df = pd.DataFrame(normalized_features, columns=features.columns)
df['target'] = target_col

X_train, X_val, y_train, y_val = train_test_split(
    df.drop(columns=['target']),
    df['target'],
    random_state=90,
    test_size=0.15
)

model = CatBoostClassifier()

model.fit(X_train, y_train)

pred = model.predict(X_val)
accuracy_score(y_val, pred)

Learning rate set to 0.089908
0:	learn: 1.4012451	total: 96.5ms	remaining: 1m 36s
1:	learn: 1.2608981	total: 103ms	remaining: 51.3s
2:	learn: 1.1534536	total: 108ms	remaining: 35.9s
3:	learn: 1.0741336	total: 113ms	remaining: 28.1s
4:	learn: 1.0058879	total: 119ms	remaining: 23.6s
5:	learn: 0.9485929	total: 124ms	remaining: 20.5s
6:	learn: 0.9007713	total: 129ms	remaining: 18.3s
7:	learn: 0.8616728	total: 133ms	remaining: 16.5s
8:	learn: 0.8318622	total: 137ms	remaining: 15.1s
9:	learn: 0.8027278	total: 141ms	remaining: 13.9s
10:	learn: 0.7783500	total: 145ms	remaining: 13.1s
11:	learn: 0.7576686	total: 149ms	remaining: 12.3s
12:	learn: 0.7388718	total: 153ms	remaining: 11.6s
13:	learn: 0.7229922	total: 157ms	remaining: 11.1s
14:	learn: 0.7090005	total: 162ms	remaining: 10.6s
15:	learn: 0.6959551	total: 166ms	remaining: 10.2s
16:	learn: 0.6858598	total: 170ms	remaining: 9.82s
17:	learn: 0.6756188	total: 174ms	remaining: 9.52s
18:	learn: 0.6667046	total: 178ms	remaining: 9.21s
19:	learn

0.8416666666666667

In [39]:

dir = Path()
with open(dir / "cb_model.pkl", "wb") as f:
    pickle.dump(model, f)

with open(dir / "scaler.pkl", "wb") as f:
    pickle.dump(scaler, f)