In [2]:
import matplotlib.pyplot as plt
from pathlib import Path
import json
import torch
from sklearn.manifold import TSNE
import numpy as np
import seaborn as sns
import pandas as pd
from collections import defaultdict

from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture
from sklearn.metrics import classification_report, balanced_accuracy_score
from metrics import acc

from data import get_training_and_validation_data
from autoencoder import Autoencoder, VariationalAutoencoder, CategoricalAutoencoder, ConvolutionalCategoricalAutoencoder

model_id = 'PRZG24RVB6'
model_config = json.load(open(Path('models') / model_id / 'config.json', 'r'))
dataset_id = model_config['dataset_id']
model_type = model_config['model']

X_train, y_train, X_val, y_val = get_training_and_validation_data(
    Path('processed_data'), dataset_id, balanced=True)
# X_mean, X_std = X_train.mean(), X_train.std()
# X_train -= X_mean
# X_train /= X_std
# X_val -= X_mean
# X_val /= X_std

X_min, X_max = X_train.min(), X_train.max()
X_train -= X_min
X_train /= (X_max - X_min)
X_val -= X_min
X_val /= (X_max - X_min)
X_train_tensor = torch.from_numpy(X_train).to(torch.float32)
X_val_tensor = torch.from_numpy(X_val).to(torch.float32)

if model_config['model'] == 'vae':
    model = VariationalAutoencoder
elif model_config['model'] == 'cae':
    model = CategoricalAutoencoder
elif model_config['model'] == 'convcae':
    model = ConvolutionalCategoricalAutoencoder
else:
    model = Autoencoder

autoencoder = model(input_dim=X_train.shape[1], **model_config)
autoencoder.load_state_dict(torch.load(Path('models') / model_id / 'ae.pth'))
autoencoder.eval()

if model_type == 'cae' or model_type == 'convcae':
    p = autoencoder.encode(X_train_tensor)
    X_train_enc = autoencoder.reparameterize(p, temperature=model_config['temperature'])
elif model_type == 'vae':
    X_train_mu, X_train_log = autoencoder.encode(X_train_tensor)
    X_train_enc = autoencoder.reparameterize(X_train_mu, X_train_log)
else:
    X_train_enc = autoencoder.encode(X_train_tensor)
X_train_rec = autoencoder.decode(X_train_enc)
print('MSE LOSS:', torch.nn.functional.mse_loss(X_train_tensor, X_train_rec).item())
X_train_rec = X_train_rec.detach().numpy()
X_train_enc = X_train_enc.detach().numpy()

if model_type == 'cae' or model_type == 'convcae':
    p = autoencoder.encode(X_val_tensor)
    X_val_enc = autoencoder.reparameterize(p, temperature=model_config['temperature'])
elif model_type == 'vae':
    X_val_mu, X_val_log = autoencoder.encode(X_val_tensor)
    X_val_enc = autoencoder.reparameterize(X_val_mu, X_val_log)
else:
    X_val_enc = autoencoder.encode(X_val_tensor)
X_val_rec = autoencoder.decode(X_val_enc).detach().numpy()
X_val_enc = X_val_enc.detach().numpy()

MSE LOSS: 0.007594614289700985


# Visualization

In [None]:
data = pd.DataFrame(X_train_enc, columns=[f'Dim_{i+1}' for i in range(X_train_enc.shape[1])])
data['Class'] = y_train

# Plot violin plots for each dimension
for column in data.columns[:-1]:  # Exclude the 'Class' column for the plots
    plt.figure(figsize=(10, 6))
    sns.violinplot(x='Class', y=column, data=data, hue='Class', palette='muted', legend=False)
    plt.title(f'Violin Plot of {column}')
    plt.xlabel('Class')
    plt.ylabel(column)
    plt.show()

In [8]:
samples = {}
for class_label in range(4):
    indices = np.where(y_train == class_label)[0]
    random_index = np.random.choice(indices)
    samples[class_label] = X_train[random_index], X_train_rec[random_index]

for class_label, (sample, rec_sample) in samples.items():
    plt.figure(figsize=(6, 4))
    plt.plot(sample, color='b')
    plt.plot(rec_sample, color='r')
    print(((sample - rec_sample)**2).mean())
    plt.title(f"Class {class_label}")
    plt.grid(True)
    # plt.ylim(0, 1)
    plt.show()

SyntaxError: invalid decimal literal (2525083304.py, line 11)

# Evaluation

In [65]:
mean, std = X_train_enc.mean(), X_train_enc.std()
X_train_enc -= mean
X_train_enc /= std
X_val_enc -= mean
X_val_enc /= std

kmeans = KMeans(n_clusters=4)
kmeans.fit(X_train_enc)
y_pred = kmeans.predict(X_val_enc)
_, label_mapping = acc(y_val, y_pred, return_mapping=True)
y_pred = np.vectorize(label_mapping.get)(y_pred)
print(classification_report(y_val, y_pred))

              precision    recall  f1-score   support

           0       0.00      0.00      0.00      2118
           1       0.83      0.83      0.83     15035
           2       1.00      0.58      0.73      9923
           3       0.94      0.97      0.96      1555

    accuracy                           0.69     28631
   macro avg       0.69      0.59      0.63     28631
weighted avg       0.83      0.69      0.74     28631



  super()._check_params_vs_input(X, default_n_init=10)


In [93]:
gmm = GaussianMixture(n_components=4)
gmm.fit(X_train_enc)
y_pred = gmm.predict(X_val_enc)
_, label_mapping = acc(y_val, y_pred, return_mapping=True)
y_pred = np.vectorize(label_mapping.get)(y_pred)
print(classification_report(y_val, y_pred))

              precision    recall  f1-score   support

           0       0.00      0.00      0.00      2118
           1       0.86      0.90      0.88     15035
           2       1.00      0.77      0.87      9923
           3       0.95      0.94      0.95      1555

    accuracy                           0.79     28631
   macro avg       0.70      0.65      0.67     28631
weighted avg       0.85      0.79      0.81     28631



In [3]:
from numba import njit
from tqdm import tqdm

@njit
def njit_dtw(s1, s2):
    len_s1 = len(s1)
    len_s2 = len(s2)
    mat_d = np.full((len_s1 + 1, len_s2 + 1), np.inf, dtype=np.float64)
    mat_d[0, 0] = 0.0

    for i in range(1, len_s1 + 1):
        for j in range(1, len_s2 + 1):
            d = (s1[i - 1] - s2[j - 1])**2
            mat_d[i, j] = d + np.min(np.array([mat_d[i-1, j], mat_d[i, j-1], mat_d[i-1, j-1]]))
    
    return np.sqrt(mat_d[len_s1, len_s2])

_ = njit_dtw(np.random.randn(100), np.random.randn(100))

In [4]:
from data import load_dataset

X, y = load_dataset(Path('processed_data'), dataset_id='RTAGXFQJ4T')
X, y = X[0][:, :, 0], y[0]
indices = np.random.choice(len(X), 500)
X, y = X[indices], y[indices]
print(X.shape, np.unique(y, return_counts=True))

(500, 100) (array([0, 1, 2, 3]), array([ 25, 193, 239,  43]))


In [5]:
n = len(X)
distance_matrix = np.zeros((n, n))
for i in tqdm(range(n)):
    for j in range(i, n):
        distance = njit_dtw(X[i], X[j])
        distance_matrix[i, j] = distance
        distance_matrix[j, i] = distance

100%|██████████| 500/500 [00:41<00:00, 12.06it/s]


In [7]:
from sklearn.cluster import AgglomerativeClustering
from collections import defaultdict

def cluster_time_series(distance_matrix, n_clusters=None):
    clustering_model = AgglomerativeClustering(n_clusters=n_clusters, affinity='precomputed', linkage='average')
    labels = clustering_model.fit_predict(distance_matrix)
    return labels

clusters = cluster_time_series(distance_matrix, n_clusters=10)
cluster_label_mapping = defaultdict(set)
for label, cluster in zip(y, clusters):
    cluster_label_mapping[cluster].add(label)

for i in range(len(cluster_label_mapping)):
    print(f"Cluster {i} -> {cluster_label_mapping[i]}")



(array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
 array([  2,   5,   2,  19,   3,   1,   1, 454,  11,   2]))

In [6]:
from utils import find_k_nearest_neighbors, generate_synthetic_samples
from collections import Counter

def custom_smote(time_series_data, cluster_labels, true_labels, k=5, n_samples_ratio=1.0):
    unique_clusters = np.unique(cluster_labels)
    synthetic_data = []
    synthetic_labels = []

    # Determine class distribution
    class_counter = Counter(cluster_labels)
    max_samples = max(class_counter.values())

    for cluster in unique_clusters:
        # Get data points and their true labels in the current cluster
        cluster_indices = np.where(cluster_labels == cluster)[0]
        cluster_data = time_series_data[cluster_indices]
        cluster_true_labels = np.array(true_labels)[cluster_indices]

        # Find k-nearest neighbors for data in the cluster
        _, k_neighbors = find_k_nearest_neighbors(cluster_data, k)

        for class_label in class_counter:
            # Data points of the current class in the cluster
            class_indices = [i for i, lbl in enumerate(cluster_true_labels) if lbl == class_label]
            class_data = cluster_data[class_indices]
            
            if len(class_data) == 0:
                continue
            
            # Determine the number of samples to generate
            n_samples = max_samples - class_counter[class_label]
            
            if n_samples <= 0:
                continue
            
            # Generate synthetic samples
            synthetic_samples = generate_synthetic_samples(class_data.reshape(len(class_data), -1), k_neighbors[class_indices], n_samples)
            
            # Append synthetic samples and their corresponding true labels
            synthetic_data.append(synthetic_samples)
            synthetic_labels.extend([class_label] * n_samples)
    
    # Reshape synthetic data to the original time-series shape
    if synthetic_data:
        synthetic_data = np.vstack(synthetic_data).reshape(-1, time_series_data.shape[1], time_series_data.shape[2])
        balanced_data = np.concatenate((time_series_data, synthetic_data))
        balanced_labels = np.concatenate((true_labels, synthetic_labels))
    else:
        balanced_data = time_series_data
        balanced_labels = true_labels

    return balanced_data, balanced_labels

In [7]:
clusters = cluster_time_series(distance_matrix, n_clusters=10)
custom_smote(X, clusters, y)



ValueError: Expected n_neighbors <= n_samples,  but n_samples = 2, n_neighbors = 5

: 

In [24]:
from imblearn.over_sampling import SMOTE

balanced_data = []
balanced_labels = []

clusters = cluster_time_series(distance_matrix, n_clusters=10)

unique_clusters = np.unique(clusters)
for cluster_label in unique_clusters:
    cluster_indices = [i for i, lbl in enumerate(clusters) if lbl == cluster_label]
    cluster_data = [X[i] for i in cluster_indices]
    cluster_true_labels = [y[i] for i in cluster_indices]

    smote = SMOTE()
    cluster_data_reshaped = np.array(cluster_data).reshape(len(cluster_data), -1)
    X_resampled, y_resampled = smote.fit_resample(cluster_data_reshaped, cluster_true_labels)

    balanced_data.extend(X_resampled.reshape(len(X_resampled), -1, X[0].shape[1]))
    balanced_labels.extend(y_resampled)



ValueError: The target 'y' needs to have more than 1 class. Got 1 class instead