In [1]:
import sys
sys.path.append('../')

In [2]:
from utils.ucr_helpers import UCR_Data
# Load the "Mallat" dataset through the project's UCR helper. Later cells read
# data.X, data.X_train and data.y from this object.
data = UCR_Data("Mallat")

In [3]:
# Draw the figure provided by the dataset helper, then apply a light theme,
# a fixed canvas size and axis titles before rendering it.
fig = data.plot_fig()
fig.update_layout(
    template="plotly_white",
    height=500,
    width=800,
    xaxis={"title": "Time"},
    yaxis={"title": "Value"},
)
fig.show()

In [4]:
# Sanity check: report the dimensions of the full data matrix.
print(data.X.shape)

(2400, 1024)


In [5]:
import numpy as np
import torch
# Pearson correlation between every pair of rows of data.X (np.corrcoef treats
# each row as one variable by default), giving a square matrix with one
# row/column per series. This is the regression target for the embedding model.
# NOTE(review): np.corrcoef normally yields float64, so this tensor is likely
# double precision -- confirm the model's loss tolerates that.
correlations = torch.tensor(np.corrcoef(data.X))

In [45]:
import torch
from tqdm import tqdm
from models.embedding_models import MatrixFactorization
from typing import List, Tuple

# Constants and configuration
SEED = 42                          # fixes PyTorch's global RNG for repeatable runs
EMBEDDING_DIM = 20                 # size of each learned embedding vector
LEARNING_RATE = 0.01               # initial Adam learning rate
EPOCHS = 300                       # number of full-batch optimisation steps
STEP_SIZE = 20                     # StepLR: epochs between learning-rate decays
GAMMA = 0.9                        # StepLR: multiplicative decay factor
REGULARIZATION_LOSS_WEIGHT = 0.1   # weight of the unit-norm penalty
PAIRWISE_LOSS_WEIGHT = 0.001       # weight of the correlation-reconstruction loss

# Seed before the model is built so that any random initialisation performed by
# MatrixFactorization (presumably its embedding table) is identical after a
# kernel restart; without this every re-run trains a different model.
torch.manual_seed(SEED)

# Initialize model: one embedding row per series in data.X
model = MatrixFactorization(
    n_time_series=data.X.shape[0], embedding_dim=EMBEDDING_DIM, normalize=True
)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Multiply the learning rate by GAMMA every STEP_SIZE scheduler steps.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=STEP_SIZE, gamma=GAMMA)

# Function to calculate losses
def calculate_losses(model, correlations) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    """Compute the weighted training objective for the embedding model.

    Args:
        model: Object that returns its predicted pairwise similarities when
            called with no arguments, and exposes ``calculate_loss`` and
            ``embeddings.weight``.
        correlations: Target similarity tensor handed to ``model.calculate_loss``.

    Returns:
        ``(total_loss, pairwise_loss, regularization_loss)``. Both components
        are already scaled by the module-level ``PAIRWISE_LOSS_WEIGHT`` and
        ``REGULARIZATION_LOSS_WEIGHT``; the total is their sum.
    """
    predicted_similarity = model()

    # Reconstruction term: L1 distance as computed by the model's own helper.
    reconstruction = model.calculate_loss(
        correlations,
        predicted_similarity,
        loss_function=torch.nn.functional.l1_loss,
    )
    pairwise_loss = PAIRWISE_LOSS_WEIGHT * reconstruction

    # Norm penalty: summed absolute deviation of every embedding row's L2 norm
    # from 1.
    row_norms = torch.linalg.norm(model.embeddings.weight, dim=1)
    norm_deviation = (row_norms - 1).abs().sum()
    regularization_loss = REGULARIZATION_LOSS_WEIGHT * norm_deviation

    return pairwise_loss + regularization_loss, pairwise_loss, regularization_loss

# Training loop: one full-batch gradient step per epoch.
# losses[i] holds (total, pairwise, regularization) for epoch i and
# learning_rates[i] holds the learning rate that epoch i's update used.
losses: List[Tuple[float, float, float]] = []
learning_rates: List[float] = []

for epoch in tqdm(range(EPOCHS)):
    optimizer.zero_grad()
    total_loss, pairwise_loss, regularization_loss = calculate_losses(model, correlations)
    total_loss.backward()

    # Read the learning rate *before* stepping the scheduler. Reading it
    # afterwards logs the next epoch's rate, which makes every StepLR decay
    # appear one epoch too early in the learning-rate curve.
    learning_rates.append(optimizer.param_groups[0]['lr'])

    optimizer.step()
    scheduler.step()

    # Store plain floats so the history does not keep autograd graphs alive.
    losses.append((total_loss.item(), pairwise_loss.item(), regularization_loss.item()))


100%|██████████| 500/500 [00:45<00:00, 10.99it/s]


In [46]:
import plotly.graph_objects as go

# Split the per-epoch (total, pairwise, regularization) tuples into three series
total_losses, pairwise_losses, regularization_losses = zip(*losses)

# Create a figure
fig = go.Figure()

# Loss curves, all on the primary (left) y-axis. The 'Total Loss' trace must
# plot total_losses; it previously plotted pairwise_losses and so duplicated
# the 'Pairwise Loss' curve.
loss_series = (
    ('Total Loss', total_losses),
    ('Pairwise Loss', pairwise_losses),
    ('Regularization Loss', regularization_losses),
)
for trace_name, series in loss_series:
    fig.add_trace(go.Scatter(x=list(range(len(series))), y=series, mode='lines', name=trace_name))

# Learning rate on a secondary (right) y-axis, because its scale is unrelated
# to the scale of the losses
fig.add_trace(go.Scatter(x=list(range(len(learning_rates))), y=learning_rates, mode='lines', name='Learning rate', yaxis='y2'))

# Titles, axes and theme in a single layout update
fig.update_layout(
    title='Losses During Training',
    xaxis_title='Epoch',
    yaxis=dict(title='Loss'),
    yaxis2=dict(title='Learning Rate', overlaying='y', side='right'),
    template='plotly_dark',
)

# Show the figure
fig.show()


In [49]:
from sklearn.metrics import classification_report
from sklearn.neural_network import MLPClassifier
from imblearn.over_sampling import SMOTE

# Evaluation function
def evaluate_model(embeddings_train, embeddings_test, y_train, y_test, use_smote=False, verbose=False):
    """Fit an MLP classifier on train embeddings and score it on test embeddings.

    Args:
        embeddings_train: Feature matrix used to fit the classifier.
        embeddings_test: Feature matrix used for prediction.
        y_train: Labels aligned with ``embeddings_train``.
        y_test: Labels aligned with ``embeddings_test``.
        use_smote: If true, oversample the training set with
            ``SMOTE(random_state=42)`` before fitting.
        verbose: If true, print the text classification report.

    Returns:
        The ``'accuracy'`` entry of the classification report on the test set.
    """
    classifier = MLPClassifier(hidden_layer_sizes=[100], max_iter=5000)

    # Choose the training set first, then fit once.
    fit_features, fit_labels = embeddings_train, y_train
    if use_smote:
        fit_features, fit_labels = SMOTE(random_state=42).fit_resample(embeddings_train, y_train)
    classifier.fit(fit_features, fit_labels)

    # Score the held-out embeddings.
    y_preds = classifier.predict(embeddings_test)
    report = classification_report(y_true=y_test, y_pred=y_preds, output_dict=True)
    if verbose:
        print(classification_report(y_true=y_test, y_pred=y_preds))
    return report['accuracy']

# Split the learned embeddings and labels at the train/test boundary.
# NOTE(review): this assumes the rows of data.X (and data.y) are the training
# series followed by the test series -- confirm against UCR_Data.
smote = False
embeddings = model.embeddings.weight.detach().numpy()
train_size = data.X_train.shape[0]
embeddings_train, embeddings_test = embeddings[:train_size, :], embeddings[train_size:, :]
y_train, y_test = data.y[:train_size], data.y[train_size:]

# Kept as the last bare expression so the returned accuracy is shown as the
# cell output.
evaluate_model(embeddings_train, embeddings_test, y_train, y_test, use_smote=smote, verbose=True)

              precision    recall  f1-score   support

           1       0.96      0.80      0.87       294
           2       1.00      1.00      1.00       292
           3       0.92      1.00      0.96       294
           4       0.82      0.96      0.88       289
           5       1.00      0.93      0.96       298
           6       1.00      0.99      0.99       294
           7       1.00      1.00      1.00       291
           8       1.00      0.99      0.99       293

    accuracy                           0.96      2345
   macro avg       0.96      0.96      0.96      2345
weighted avg       0.96      0.96      0.96      2345



0.9582089552238806

In [50]:
import numpy as np
import pandas as pd
import plotly.express as px
from sklearn.decomposition import PCA

def pca_plot(model, data):
    """Show a 2-D PCA scatter plot of the model's embeddings, coloured by label.

    Args:
        model: Object exposing the learned embedding table as
            ``model.embeddings.weight``.
        data: Object whose ``y`` attribute holds one label per embedding row.

    Returns:
        None. The figure is rendered with ``fig.show()``.
    """
    embeddings = model.embeddings.weight.detach().numpy()

    # Three principal components are fitted, but only the first two are drawn.
    projected = PCA(n_components=3).fit(embeddings).transform(embeddings)

    # One row per embedding: its first two PCA coordinates and its class label.
    frame = pd.DataFrame({'x': projected[:, 0], 'y': projected[:, 1], 'label': data.y})

    # Colour by class label; the label is also used as the hover title.
    fig = px.scatter(frame, x='x', y='y', color='label', hover_name='label')
    fig.update_layout(template='plotly_dark')
    fig.show()
# Render the PCA projection for the notebook-level `model` and `data`.
pca_plot(model, data)

### Grid search to find best params

In [None]:
import itertools
import torch
from tqdm import tqdm
from models.embedding_models import MatrixFactorization
from typing import List, Tuple


# Define the hyperparameter grid.
# Each key maps to the list of candidate values to try. The grid search takes
# the Cartesian product of the value lists in insertion order, and
# train_and_evaluate unpacks each combination positionally, so the key order
# here must match that unpacking. Enabling the two commented-out weights below
# also requires extending that unpacking.
hyperparameter_grid = {
    'embedding_dim': [10, 20, 30],
    'learning_rate': [0.05],
    'epochs': [300],
    'step_size': [20],
    'gamma': [0.9],
    # 'regularization_loss_weight': [0.05, 0.1, 0.15],
    # 'pairwise_loss_weight': [0.001, 0.01, 0.1]
}



# Function to train and evaluate the model
def train_and_evaluate(hyperparams, data):
    """Train a fresh MatrixFactorization model and return its test accuracy.

    Args:
        hyperparams: Sequence unpacked positionally as
            ``(embedding_dim, learning_rate, epochs, step_size, gamma)``.
        data: Dataset object providing ``X``, ``X_train`` and ``y``.

    Returns:
        The accuracy reported by ``evaluate_model`` (SMOTE disabled) for a
        classifier fitted on the first ``data.X_train.shape[0]`` embedding rows
        and scored on the remaining rows.

    Note:
        The optimisation target is the notebook-level ``correlations`` tensor,
        not something derived from the ``data`` argument, and the loss weights
        are the module-level constants read by ``calculate_losses``.
    """
    embedding_dim, learning_rate, epochs, step_size, gamma = hyperparams

    # Fresh model, optimizer and learning-rate schedule for this configuration.
    model = MatrixFactorization(
        n_time_series=data.X.shape[0],
        embedding_dim=embedding_dim,
        normalize=True,
    )
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)

    # Full-batch training; only the total loss is needed for the update.
    for _ in range(epochs):
        optimizer.zero_grad()
        total_loss, _pairwise_loss, _regularization_loss = calculate_losses(model, correlations)
        total_loss.backward()
        optimizer.step()
        scheduler.step()

    # Split embeddings and labels at the train/test boundary and score them.
    embeddings = model.embeddings.weight.detach().numpy()
    n_train = data.X_train.shape[0]
    return evaluate_model(
        embeddings[:n_train, :],
        embeddings[n_train:, :],
        data.y[:n_train],
        data.y[n_train:],
        use_smote=False,
    )

# Enumerate every grid point up front; the count only sizes the progress bar.
grid_points = list(itertools.product(*hyperparameter_grid.values()))
total_combinations = len(grid_points)

# Grid search: train and score one model per combination, logging each result.
results = []
for params in tqdm(grid_points, total=total_combinations):
    performance = train_and_evaluate(params, data)
    print("=" * 20, params, performance, sep="\n")
    results.append((params, performance))

# Find the best performing hyperparameters (the first one wins on ties)
best_params = max(results, key=lambda entry: entry[1])[0]

