### Bibliotecas

In [None]:
"""
%pip install aeon
%pip install tsfresh
%pip install tslearn
%pip install tensorflow
%pip install keras
%pip install pywavelets
"""

In [3]:
import pandas as pd
import numpy as np

from aeon.datasets import load_classification
from aeon.datasets.tsc_data_lists import univariate_equal_length
from aeon.classification.interval_based import SupervisedTimeSeriesForest, TimeSeriesForestClassifier, DrCIFClassifier
from aeon.classification.distance_based import KNeighborsTimeSeriesClassifier
from sklearn.svm import SVC

from aeon.classification.base import BaseClassifier

from tsfresh import extract_features
from tsfresh.feature_extraction import MinimalFCParameters

from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from tslearn.piecewise import PiecewiseAggregateApproximation, SymbolicAggregateApproximation

import pywt
from sklearn.linear_model import RidgeClassifierCV, LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier, GradientBoostingClassifier
from scipy.fftpack import fft
from tqdm import tqdm

import warnings
warnings.filterwarnings("ignore")

### Loading data

In [None]:
def load_data(dataset):
    # LabelEncoder para labels alvo
    le = LabelEncoder()

    # Carregar conjunto de dados do repositório UCR
    X_train, y_train = load_classification(dataset, split="TRAIN")
    X_test, y_test = load_classification(dataset, split="test")

    # Formatar o conjunto de dados para 2D
    features_train = X_train.reshape(X_train.shape[0], -1)
    features_test = X_test.reshape(X_test.shape[0], -1)

    # Ajustar e transformar as labels alvo
    target_train = le.fit_transform(y_train)
    target_test = le.transform(y_test)

    return features_train, features_test, target_train, target_test


### Function transform data

In [4]:
def choose_wavelet(X):
    min_variance = float('inf')
    best_wavelet = None
    candidate_wavelets = ['db1', 'db2', 'db3', 'db4', 'db5', 'db6', 'db7', 'db8', 'db9']

    for wavelet_type in candidate_wavelets:
        _, coeffs_cD = pywt.dwt(X, wavelet_type, axis=1)
        total_variance = np.var(coeffs_cD)

        if total_variance < min_variance:
            min_variance = total_variance
            best_wavelet = wavelet_type
    return str(best_wavelet)


In [5]:
def transform_data_math(X, wavelet='db1'):
    n_sax_symbols = int(X.shape[1] / 4)
    n_paa_segments = int(X.shape[1] / 4)

    # FFT Transformation
    X_fft = np.abs(fft(X, axis=1))
    
    # DWT Transformation
    coeffs_cA, coeffs_cD = pywt.dwt(X, wavelet=wavelet, axis=1, mode='constant')
    X_dwt = np.hstack((coeffs_cA, coeffs_cD))

    # PAA Transformation
    paa = PiecewiseAggregateApproximation(n_segments=n_paa_segments)
    X_paa_ = paa.inverse_transform(paa.fit_transform(X))
    X_paa = X_paa_.reshape(X_paa_.shape[0], -1)
    df_PAA = pd.DataFrame(X_paa)
    
    # SAX Transformation
    sax = SymbolicAggregateApproximation(n_segments=n_paa_segments, alphabet_size_avg=n_sax_symbols)
    X_sax_ = sax.inverse_transform(sax.fit_transform(X))
    X_sax = X_sax_.reshape(X_sax_.shape[0], -1)
    df_SAX = pd.DataFrame(X_sax)

    # Original Data
    data_X = TimeSeriesScalerMeanVariance().fit_transform(X)
    data_X.resize(data_X.shape[0], data_X.shape[1])
    df_X = pd.DataFrame(data_X)

    # FFT Data
    data_FFT = TimeSeriesScalerMeanVariance().fit_transform(X_fft)
    data_FFT.resize(data_FFT.shape[0], data_FFT.shape[1])
    df_FFT = pd.DataFrame(data_FFT)

    # DWT Data
    data_DWT = TimeSeriesScalerMeanVariance().fit_transform(X_dwt)
    data_DWT.resize(data_DWT.shape[0], data_DWT.shape[1])
    df_DWT = pd.DataFrame(data_DWT)

    # Adding IDs to DataFrames
    df_X['id'] = df_FFT['id'] = df_DWT['id'] = df_PAA['id'] = df_SAX['id'] = range(len(df_X))
    
    # Merging all DataFrames on 'id'
    final_df = df_X.merge(df_FFT, on='id', suffixes=('_X', '_FFT'))
    final_df = final_df.merge(df_DWT, on='id', suffixes=('', '_DWT'))
    final_df = final_df.merge(df_PAA, on='id', suffixes=('', '_PAA'))
    final_df = final_df.merge(df_SAX, on='id', suffixes=('', '_SAX'))
    
    
    return final_df

### AmazonForestClassifier

In [None]:
class CombinedDecisionForest:
    def __init__(self):
        self.clf1 = RandomForestClassifier()
        self.clf2 = ExtraTreesClassifier()
        self.clf3 = SupervisedTimeSeriesForest()
        self.clf4 = TimeSeriesForestClassifier()
        self.meta_clf = RidgeClassifierCV(alphas=np.logspace(-3,3,10))
        self.classifiers = [self.clf1, self.clf2, self.clf3, self.clf4]
        self.clf_weights = None

    def fit(self, X, y):
        for clf in self.classifiers:
            clf.fit(X, y)

        train_preds = [clf.predict(X) for clf in self.classifiers]
        accuracies = [accuracy_score(y, preds) for preds in train_preds]

        self.clf_weights = np.array(accuracies)
        self.clf_weights /= np.sum(self.clf_weights)

    def predict_proba(self, X):
        probs = [clf.predict_proba(X) for clf in self.classifiers]
        combined_probs = np.sum([prob * weight for prob, weight in zip(probs, self.clf_weights)], axis=0)
        return combined_probs / np.sum(combined_probs, axis=1, keepdims=True)

    def predict(self, X):
        proba = self.predict_proba(X)
        return np.argmax(proba, axis=1)

### Train/Predict

In [None]:
def train_classifier(X_train, y_train, wavelet=None):
    trained_models = {}  # Salvar modelos treinados para cada transformação
    X_train_transformed = transform_data_math(X_train, wavelet=wavelet)  # Transformar todo o conjunto de treino
    # Treinar um modelo para cada transformação e salvar no dicionário
    for rep, X_trans in X_train_transformed.items():
        model = CombinedDecisionForest()
        model.fit(X_trans, y_train)
        trained_models[rep] = model
        
    # Preparar dados para o meta-classificador
    meta_features = []
    for i in range(X_train.shape[0]):
        instance_features = []
        for rep, model in trained_models.items():
            proba = model.predict_proba(np.array(X_train_transformed[rep].iloc[i,:]).reshape(1, -1))
            instance_features.extend(proba.flatten())
        meta_features.append(instance_features)
    
    meta_features = np.array(meta_features)
    
    meta_classifier = RidgeClassifierCV(np.logspace(-3,3,10))
    meta_classifier.fit(meta_features, y_train)
    
    return trained_models, meta_classifier

def predict_classifier(X_test, trained_base_model, trained_meta_classifier, wavelet=None):
    predictions = []
    meta_features_test = []

    for i in tqdm(range(len(X_test)), ascii=True, colour='green', desc="Testing"):
        x_instance = X_test[i].reshape(1, -1)
        x_transformed = transform_data_math(x_instance, wavelet=wavelet)

        instance_features = []
        for rep, X_trans in x_transformed.items():
            proba = trained_base_model[rep].predict_proba(X_trans.values.reshape(1, -1))
            instance_features.extend(proba.flatten())

        meta_feature = np.array(instance_features).reshape(1, -1)
        predictions.append(trained_meta_classifier.predict(meta_feature)[0])

        meta_features_test.append(meta_feature.flatten())

    meta_features_test = np.array(meta_features_test)

    return predictions


# AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA

In [None]:
#Normal Version
def apply_pooling_operations(data, pooling_funcs):
    pooled_features = []
    for func in pooling_funcs:
        pooled_features.append(func(data, axis=1))
    return np.hstack(pooled_features)

def transform_series(X, wavelet='db1'):
    n_sax_symbols = int(X.shape[1] / 4)
    n_paa_segments = int(X.shape[1] / 4)

    all_transformed_features = []

    for series in X:
        series = series.reshape(1, -1)  # Ensure series is 2D for transformations

        # FFT Transformation
        X_fft = np.abs(fft(series, axis=1))

        # DWT Transformation
        coeffs_cA, coeffs_cD = pywt.dwt(series, wavelet=wavelet, axis=1, mode='constant')
        X_dwt = np.hstack((coeffs_cA, coeffs_cD))

        # PAA Transformation
        paa = PiecewiseAggregateApproximation(n_segments=n_paa_segments)
        X_paa_ = paa.inverse_transform(paa.fit_transform(series))
        X_paa = X_paa_.reshape(X_paa_.shape[0], -1)

        # SAX Transformation
        sax = SymbolicAggregateApproximation(n_segments=n_paa_segments, alphabet_size_avg=n_sax_symbols)
        X_sax_ = sax.inverse_transform(sax.fit_transform(series))
        X_sax = X_sax_.reshape(X_sax_.shape[0], -1)

        # Pooling Functions (MAX and PPV)
        pooling_funcs = [
            np.max,
            np.min,
            np.mean,
            np.median,
            np.std,
            lambda x, axis: np.percentile(x, 90, axis=axis)
    ]

        # Apply Pooling Operations
        pooled_X = apply_pooling_operations(series, pooling_funcs)
        pooled_fft = apply_pooling_operations(X_fft, pooling_funcs)
        pooled_dwt = apply_pooling_operations(X_dwt, pooling_funcs)
        pooled_paa = apply_pooling_operations(X_paa, pooling_funcs)
        pooled_sax = apply_pooling_operations(X_sax, pooling_funcs)

        # Concatenate Features
        concatenated_features = np.hstack((pooled_X, pooled_fft, pooled_dwt, pooled_paa, pooled_sax))
        all_transformed_features.append(concatenated_features)

    return np.array(all_transformed_features)


In [None]:
from aeon.classification.shapelet_based import ShapeletTransformClassifier
from aeon.classification.dictionary_based import ContractableBOSS
from sklearn.model_selection import cross_val_score

In [None]:
class CombinedDecisionForest:
    def __init__(self):
        self.clf1 = ContractableBOSS()
        self.clf2 = ExtraTreesClassifier()
        self.clf3 = ShapeletTransformClassifier()
        self.clf4 = TimeSeriesForestClassifier()
        self.classifiers = [self.clf1, self.clf2, self.clf3, self.clf4]
        self.clf_weights = None

    def fit(self, X, y):
        # Use cross-validation to determine the weights of each classifier
        accuracies = []
        for clf in self.classifiers:
            scores = cross_val_score(clf, X, y, cv=5, scoring='accuracy')
            accuracies.append(np.mean(scores))

        self.clf_weights = np.array(accuracies)
        self.clf_weights /= np.sum(self.clf_weights)

        for clf in self.classifiers:
            clf.fit(X, y)

    def predict_proba(self, X):
        probs = [clf.predict_proba(X) for clf in self.classifiers]
        combined_probs = np.sum([prob * weight for prob, weight in zip(probs, self.clf_weights)], axis=0)
        return combined_probs / np.sum(combined_probs, axis=1, keepdims=True)

    def predict(self, X):
        proba = self.predict_proba(X)
        return np.argmax(proba, axis=1)

In [None]:
X_train, X_test, target_train, target_test = load_data('StarLightCurves')

#Training
best_wl = choose_wavelet(X_train)
features_train = transform_series(X_train, best_wl)
#Testing
features_test = transform_series(X_test, best_wl)

feature_extractor = CombinedDecisionForest()
feature_extractor.fit(features_train, target_train)
    
train_features = feature_extractor.predict_proba(features_train)
test_features = feature_extractor.predict_proba(features_test)
    
meta_model = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))
    
meta_model.fit(train_features, target_train)
    
predictions = meta_model.predict(test_features)
    
accuracy = accuracy_score(target_test, predictions)

accuracy

In [None]:
select = ['Beef','Car','CBF','Coffee','DiatomSizeReduction','ECG200','ECGFiveDays','FaceFour','GunPoint','Lightning2','Lightning7','MedicalImages','MoteStrain','OliveOil','SonyAIBORobotSurface1','SonyAIBORobotSurface2', 'Trace', 'SyntheticControl','TwoPatterns']

In [None]:
from sklearn import svm
accuracy_data = []
for dataset_name in select:
    X_train, X_test, target_train, target_test = load_data(dataset_name)
    
    #Training
    best_wl = choose_wavelet(X_train)
    features_train = transform_series(X_train, best_wl)
    #Testing
    features_test = transform_series(X_test, best_wl)
    
    feature_extractor = CombinedDecisionForest()
    feature_extractor.fit(features_train, target_train)
        
    train_features = feature_extractor.predict_proba(features_train)
    test_features = feature_extractor.predict_proba(features_test)
        
    meta_model = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))

    meta_model.fit(train_features, target_train)
        
    predictions = meta_model.predict(test_features)
        
    accuracy = accuracy_score(target_test, predictions)
        
    accuracy_data.append({'Dataset Name': dataset_name, 'Accuracy': accuracy})
    
    print(f"Acurácia {dataset_name}: {accuracy}")
    
accuracy_df = pd.DataFrame(accuracy_data)

### Meta-Classificador TUNADO

In [6]:
from sklearn.model_selection import GridSearchCV
class CombinedMetaClassifier:
    def __init__(self):

        self.param_grids = [
            {
                "C":[0.01, 0.1, 1, 10, 100, 1000],
                "kernel":['linear','rbf','poly','sigmoid'],
            }
        ]

        self.clf1 = SVC(probability=True)
        self.clf2 = SVC(probability=True)
        self.clf3 = SVC(probability=True)
        self.clf4 = SVC(probability=True)
        self.meta_clf = RidgeClassifierCV(np.logspace(-3,3,10))
        self.classifiers = [self.clf1, self.clf2, self.clf3, self.clf4]

    def fit(self, X, y):
        # Treinar classificadores base com GridSearchCV
        best_classifiers = []
        for clf, param_grid in zip(self.classifiers, self.param_grids):
            grid_search = GridSearchCV(clf, param_grid, cv=2, n_jobs=2)
            grid_search.fit(X, y)
            best_classifiers.append(grid_search.best_estimator_)
            print(f'Best parameters for {clf}: {grid_search.best_params_}')

        self.classifiers = best_classifiers

        # Obter probabilidades dos classificadores base
        base_probabilities = []
        for clf in self.classifiers:
            clf.fit(X, y)
            if hasattr(clf, "predict_proba"):
                probs = clf.predict_proba(X)
            else:
                preds = clf.predict(X)
                probs = np.zeros((preds.size, len(np.unique(y))))
                probs[np.arange(preds.size), preds] = 1
            base_probabilities.append(probs)
        
        # Stack probabilities para criar meta features
        meta_features = np.hstack(base_probabilities)
        
        # Treinar meta-classificador
        self.meta_clf.fit(meta_features, y)

    def predict(self, X):
        # Obter previsões probabilísticas dos classificadores base
        base_probabilities = []
        for clf in self.classifiers:
            if hasattr(clf, "predict_proba"):
                probs = clf.predict_proba(X)
            else:
                # Converter previsões para probabilidades (caso o classificador não suporte predict_proba)
                preds = clf.predict(X)
                probs = np.zeros((preds.size, clf.n_classes_))
                probs[np.arange(preds.size), preds] = 1
            base_probabilities.append(probs)
        
        # Stack probabilities para criar meta features
        meta_features = np.hstack(base_probabilities)
        
        # Previsão final usando o meta-classificador
        return self.meta_clf.predict(meta_features)
    
    def predict_proba(self, X):
        # Obter previsões probabilísticas dos classificadores base
        base_probabilities = []
        for clf in self.classifiers:
            if hasattr(clf, "predict_proba"):
                probs = clf.predict_proba(X)
            else:
                # Converter previsões para probabilidades (caso o classificador não suporte predict_proba)
                preds = clf.predict(X)
                probs = np.zeros((preds.size, clf.n_classes_))
                probs[np.arange(preds.size), preds] = 1
            base_probabilities.append(probs)
        
        # Stack probabilities para criar meta features
        meta_features = np.hstack(base_probabilities)
        
        # Previsão probabilística final usando o meta-classificador
        return self.meta_clf.predict_proba(meta_features)


In [None]:
def extract_tsfresh_features(X):
    X = extract_features(X, default_fc_parameters=MinimalFCParameters(), column_id='id')
    return X

In [7]:
import os

def load_data(dataset):
    # LabelEncoder para labels alvo
    le = LabelEncoder()

    # Carregar conjunto de dados do repositório UCR
    X_train, y_train = load_classification(dataset, split="TRAIN")
    X_test, y_test = load_classification(dataset, split="TEST")

    # Formatar o conjunto de dados para 2D
    features_train = X_train.reshape(X_train.shape[0], -1)
    features_test = X_test.reshape(X_test.shape[0], -1)

    # Ajustar e transformar as labels alvo
    target_train = le.fit_transform(y_train)
    target_test = le.transform(y_test)

    return features_train, features_test, target_train, target_test

def extract_tsfresh_features(X):
    # Cria um dataframe a partir dos dados
    df = pd.DataFrame(X)
    df['id'] = df.index
    
    # Extrai as características usando tsfresh
    extracted_features = extract_features(df, default_fc_parameters=MinimalFCParameters(), column_id='id')
    return extracted_features

def save_transformed_data(dataset):
    # Carrega os dados
    features_train, features_test, target_train, target_test = load_data(dataset)
    
    # Extrai características dos dados de treino e teste
    transformed_train = extract_tsfresh_features(features_train)
    transformed_test = extract_tsfresh_features(features_test)
    
    # Cria a pasta de destino, se não existir
    output_dir = './transform_data'
    os.makedirs(output_dir, exist_ok=True)
    
    # Salva os dados transformados em arquivos CSV
    transformed_train.to_csv(os.path.join(output_dir, f'{dataset}_train_features.csv'), index=False)
    transformed_test.to_csv(os.path.join(output_dir, f'{dataset}_test_features.csv'), index=False)
    
    # Salva as labels em arquivos CSV
    pd.DataFrame(target_train, columns=['target']).to_csv(os.path.join(output_dir, f'{dataset}_train_labels.csv'), index=False)
    pd.DataFrame(target_test, columns=['target']).to_csv(os.path.join(output_dir, f'{dataset}_test_labels.csv'), index=False)

for dataset in univariate_equal_length:
    save_transformed_data(dataset)

Feature Extraction: 100%|██████████| 30/30 [00:54<00:00,  1.82s/it]
Feature Extraction: 100%|██████████| 30/30 [00:57<00:00,  1.90s/it]
Feature Extraction: 100%|██████████| 30/30 [00:38<00:00,  1.28s/it]
Feature Extraction: 100%|██████████| 30/30 [00:33<00:00,  1.10s/it]
Feature Extraction: 100%|██████████| 30/30 [00:03<00:00,  8.33it/s]
Feature Extraction: 100%|██████████| 30/30 [00:05<00:00,  5.22it/s]
Feature Extraction: 100%|██████████| 30/30 [00:04<00:00,  6.64it/s]
Feature Extraction: 100%|██████████| 30/30 [00:13<00:00,  2.26it/s]
Feature Extraction: 100%|██████████| 30/30 [00:04<00:00,  6.95it/s]
Feature Extraction: 100%|██████████| 30/30 [00:17<00:00,  1.70it/s]
Feature Extraction: 100%|██████████| 30/30 [00:05<00:00,  5.64it/s]
Feature Extraction: 100%|██████████| 30/30 [00:04<00:00,  6.10it/s]
Feature Extraction: 100%|██████████| 30/30 [02:38<00:00,  5.28s/it]
Feature Extraction: 100%|██████████| 30/30 [02:52<00:00,  5.74s/it]
Feature Extraction: 100%|██████████| 30/30 [00:5

In [None]:
select = ['Adiac', 'Beef','Car','CBF','Coffee','DiatomSizeReduction','ECG200','ECGFiveDays','FaceFour','GunPoint','Lightning2','Lightning7','MedicalImages','MoteStrain','OliveOil','SonyAIBORobotSurface1','SonyAIBORobotSurface2', 'Trace', 'SyntheticControl','TwoPatterns']

In [15]:
def load_transformed_data(dataset_name, data_dir='../Pos_Qualys/dransform_data'):
    # Carregar os dados de treino e teste a partir dos arquivos CSV
    train_features_path = os.path.join(data_dir, f'{dataset_name}_train_features.csv')
    test_features_path = os.path.join(data_dir, f'{dataset_name}_test_features.csv')
    train_labels_path = os.path.join(data_dir, f'{dataset_name}_train_labels.csv')
    test_labels_path = os.path.join(data_dir, f'{dataset_name}_test_labels.csv')
    
    features_train = pd.read_csv(train_features_path)
    features_test = pd.read_csv(test_features_path)
    target_train = pd.read_csv(train_labels_path).values.ravel()
    target_test = pd.read_csv(test_labels_path).values.ravel()
    
    return features_train, features_test, target_train, target_test

accuracy_data = []
for dataset_name in univariate_equal_length:
    features_train, features_test, target_train, target_test = load_transformed_data(dataset_name)
    
    model_classifier = CombinedMetaClassifier()
    model_classifier.fit(features_train, target_train)
    y_hat = model_classifier.predict(features_test)
    accuracy = accuracy_score(target_test, y_hat)
        
    accuracy_data.append({'Dataset Name': dataset_name, 'Accuracy': accuracy})
    
    print(f"Acurácia {dataset_name}: {accuracy}")
    
accuracy_df_extf = pd.DataFrame(accuracy_data)

Best parameters for SVC(probability=True): {'C': 0.01, 'kernel': 'linear'}
Acurácia EOGVerticalSignal: 0.4005524861878453
Best parameters for SVC(probability=True): {'C': 10, 'kernel': 'poly'}
Acurácia LargeKitchenAppliances: 0.496
Best parameters for SVC(probability=True): {'C': 0.1, 'kernel': 'linear'}
Acurácia ItalyPowerDemand: 0.9708454810495627
Best parameters for SVC(probability=True): {'C': 0.01, 'kernel': 'linear'}
Acurácia ShapeletSim: 0.49444444444444446
Best parameters for SVC(probability=True): {'C': 10, 'kernel': 'poly'}
Acurácia ECGFiveDays: 0.9860627177700348
Best parameters for SVC(probability=True): {'C': 0.01, 'kernel': 'linear'}
Acurácia Coffee: 1.0
Best parameters for SVC(probability=True): {'C': 1, 'kernel': 'linear'}
Acurácia NonInvasiveFetalECGThorax1: 0.9475826972010178
Best parameters for SVC(probability=True): {'C': 10, 'kernel': 'poly'}
Acurácia CricketX: 0.6410256410256411
Best parameters for SVC(probability=True): {'C': 10, 'kernel': 'poly'}
Acurácia TwoPat

In [16]:
accuracy_df_extf.to_csv("df_metaLearningSVC.csv", index=False)

In [None]:
accuracy_df_extf = pd.DataFrame(accuracy_data)
accuracy_df_extf

### Meta-Learning

In [None]:
def visualize_ensemble_prediction(X, model):
    # Generate meta-features for visualization
    meta_features = np.column_stack([clf.predict_proba(X) for clf in model.base_classifiers])
    meta_predictions = model.meta_clf.predict_proba(meta_features)
    
    # Criar um DataFrame para visualização
    df_meta = pd.DataFrame(meta_features, columns=[f'Prob Class {i+1} from Clf{j+1}' 
                                                   for j in range(len(model.base_classifiers)) 
                                                   for i in range(meta_features.shape[1] // len(model.base_classifiers))])
    df_meta['Meta Prediction Prob Class 1'] = meta_predictions[:, 0]
    df_meta['Meta Prediction Prob Class 2'] = meta_predictions[:, 1]
    df_meta['Meta Prediction'] = np.argmax(meta_predictions, axis=1) + 1  # Adicionar 1 para coincidir com a classe 1-indexada
    
    display(df_meta)

In [None]:
# Exemplo de uso:
visualize_ensemble_prediction(meta_ts, model_learning)

### USING CNN

In [None]:
import tensorflow
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, BatchNormalization, ReLU, concatenate, Flatten, Dense, Add
from tensorflow.keras.models import Model, Sequential

In [None]:
def load_data_cnn(dataset):
    # LabelEncoder para labels alvo
    le = LabelEncoder()
    # Carregar conjunto de dados do repositório UCR
    X_train, y_train = load_classification(dataset, split="TRAIN")
    X_test, y_test = load_classification(dataset, split="test")

    # Ajustar e transformar as labels alvo
    y_train = le.fit_transform(y_train)
    y_test = le.transform(y_test)

    return X_train, X_test, y_train, y_test


In [None]:
def apply_pooling_operations_vectorized(data, pooling_funcs):
    pooled_features = []
    for func in pooling_funcs:
        if isinstance(func, tuple):
            func, q = func
            pooled_features.append(func(data, q, axis=-1))
        else:
            pooled_features.append(func(data, axis=-1))
    return np.concatenate(pooled_features, axis=-1)

def transform_series_cnn(X, wavelet='db1'):
    n_sax_symbols = int(X.shape[-1] / 4)
    n_paa_segments = int(X.shape[-1] / 4)

    pooling_funcs = [
        np.max,
        np.min,
        np.mean,
        np.median,
        np.std,
        (np.percentile, 90)
    ]

    # FFT Transformation
    X_fft = np.abs(fft(X, axis=-1))

    # DWT Transformation
    coeffs_cA, coeffs_cD = pywt.dwt(X, wavelet=wavelet, axis=-1, mode='constant')
    X_dwt = np.concatenate((coeffs_cA, coeffs_cD), axis=-1)

    # PAA Transformation
    paa = PiecewiseAggregateApproximation(n_segments=n_paa_segments)
    X_paa_ = paa.inverse_transform(paa.fit_transform(X))

    # SAX Transformation
    sax = SymbolicAggregateApproximation(n_segments=n_paa_segments, alphabet_size_avg=n_sax_symbols)
    X_sax_ = sax.inverse_transform(sax.fit_transform(X))

    # Apply Pooling Operations
    pooled_X = apply_pooling_operations_vectorized(X, pooling_funcs)
    pooled_fft = apply_pooling_operations_vectorized(X_fft, pooling_funcs)
    pooled_dwt = apply_pooling_operations_vectorized(X_dwt, pooling_funcs)
    pooled_paa = apply_pooling_operations_vectorized(X_paa_, pooling_funcs)
    pooled_sax = apply_pooling_operations_vectorized(X_sax_, pooling_funcs)

    # Add an extra dimension to arrays that have only 1 dimension
    pooled_X = np.expand_dims(pooled_X, axis=-1)
    pooled_fft = np.expand_dims(pooled_fft, axis=-1)
    pooled_dwt = np.expand_dims(pooled_dwt, axis=-1)
    pooled_paa = np.expand_dims(pooled_paa, axis=-1)
    pooled_sax = np.expand_dims(pooled_sax, axis=-1)

    # Concatenate Features
    concatenated_features = np.concatenate((pooled_X, pooled_fft, pooled_dwt, pooled_paa, pooled_sax), axis=-1)

    return concatenated_features


In [None]:
def cnn_model(input_shape):
    input_layer = Input(shape=input_shape)

    # First convolutional block
    conv1 = Conv1D(filters=32, kernel_size=2, activation='relu')(input_layer)
    pool1 = MaxPooling1D(pool_size=2)(conv1)
    norm1 = BatchNormalization()(pool1)

    # Second convolutional block
    conv2 = Conv1D(filters=32, kernel_size=2, activation='relu')(input_layer)
    pool2 = MaxPooling1D(pool_size=2)(conv2)
    norm2 = BatchNormalization()(pool2)

    # Third convolutional block
    conv3 = Conv1D(filters=32, kernel_size=3, activation='relu')(input_layer)
    pool3 = MaxPooling1D(pool_size=2)(conv3)
    norm3 = BatchNormalization()(pool3)

    # Concatenate all features
    concatenated = concatenate([norm1, norm2, norm3], axis=-1)

    # Fully connected layers
    flat = Flatten()(concatenated)
    dense1 = Dense(units=64, activation='relu')(flat)
    output_layer = Dense(units=1, activation='softmax')(dense1)  # Saída binária

    model = Model(inputs=input_layer, outputs=output_layer)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model


In [None]:
#Data
X_train, X_test, target_train, target_test = load_data_cnn('CBF')

#Training
best_wl = choose_wavelet(X_train)
features_train = transform_series_cnn(X_train, best_wl)

#Testing
features_test = transform_series_cnn(X_test, best_wl)

In [None]:
input_shape = features_train.shape[1:]
model_classifier = cnn_model(input_shape)

model_classifier.fit(features_train, target_train, epochs=20, batch_size=2)
y_hat = model_classifier.predict(features_test)
accuracy = accuracy_score(target_test, y_hat)
print(f"Accuracy: {accuracy}")

### Using Inception

In [None]:
def apply_transformations(X):
    n_sax_symbols = int(X.shape[-1] / 4)
    n_paa_segments = int(X.shape[-1] / 4)

    # FFT Transformation
    X_fft = np.abs(fft(X, axis=-1))

    # DWT Transformation
    coeffs_cA, coeffs_cD = pywt.dwt(X, wavelet='db1', axis=-1, mode='constant')
    X_dwt = np.concatenate((coeffs_cA, coeffs_cD), axis=-1)

    # PAA Transformation
    paa = PiecewiseAggregateApproximation(n_segments=n_paa_segments)
    X_paa = paa.inverse_transform(paa.fit_transform(X))

    # SAX Transformation
    sax = SymbolicAggregateApproximation(n_segments=n_paa_segments, alphabet_size_avg=n_sax_symbols)
    X_sax = sax.inverse_transform(sax.fit_transform(X))

    return [X_fft, X_dwt, X_paa, X_sax]


In [None]:
from aeon.classification.deep_learning import IndividualInceptionClassifier
from sklearn.linear_model import RidgeClassifierCV
from sklearn.metrics import accuracy_score

def train_individual_classifiers(X_train, y_train, X_val, y_val):
    classifiers = []
    val_predictions = []
    
    transformed_X_train = apply_transformations(X_train)
    transformed_X_val = apply_transformations(X_val)

    for X_trans, x_val_trans in zip(transformed_X_train, transformed_X_val):
        clf = IndividualInceptionClassifier(n_epochs=1500, batch_size=64, verbose=False, save_best_model=True)
        clf.fit(X_trans, y_train)
        classifiers.append(clf)
        
        # Prever no conjunto de validação
        val_pred = clf.predict(x_val_trans)
        val_predictions.append(val_pred)

    # Converter lista de previsões para um array numpy
    val_predictions = np.array(val_predictions).T  # Transpor para que cada linha corresponda a um exemplo

    return classifiers, val_predictions


In [None]:
from sklearn.linear_model import RidgeClassifierCV

def train_meta_classifier(X_test, y_test, classifiers):
    # Coletar previsões no conjunto de teste para treinar o meta-classificador
    transformed_X_test = apply_transformations(X_test)

    test_predictions = []
    for clf, X_test_trans in zip(classifiers, transformed_X_test):
        test_pred = clf.predict(X_test_trans)
        test_predictions.append(test_pred)

    # Converter lista de previsões para um array numpy
    test_predictions = np.array(test_predictions).T  # Transpor para que cada linha corresponda a um exemplo

    # Treinar o meta-classificador
    meta_clf = RidgeClassifierCV()
    meta_clf.fit(test_predictions, y_test)

    # Prever com o meta-classificador
    final_predictions = meta_clf.predict(test_predictions)

    return final_predictions, meta_clf


In [None]:
#Data
X_train, X_test, target_train, target_test = load_data_cnn('CBF')

#Training
#best_wl = choose_wavelet(X_train)
features_train = apply_transformations(X_train)

#Testing
features_test = apply_transformations(X_test)

In [None]:
from aeon.datasets import load_arrow_head
from sklearn.model_selection import train_test_split

# Carregar dados de exemplo
X, y = load_arrow_head(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_tr, X_val, y_tr, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

# Treinar os classificadores individuais
classifiers, val_predictions = train_individual_classifiers(X_tr, y_tr, X_val, y_val)

# Treinar e avaliar o meta-classificador
final_predictions, meta_clf = train_meta_classifier(X_test, y_test, classifiers)

# Avaliar o desempenho
accuracy = accuracy_score(y_test, final_predictions)
print(f"Accuracy: {accuracy}")
