# Libraries

In [None]:
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch

from torchsom.core import SOM
from torchsom.visualization import SOMVisualizer, VisualizationConfig

from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import (
    f1_score, 
    accuracy_score, 
    recall_score, 
    precision_score,
    confusion_matrix, 
    classification_report,
    ConfusionMatrixDisplay
)
from sklearn.exceptions import ConvergenceWarning, DataConversionWarning

# Silence the scikit-learn warnings that would otherwise be repeated for every
# small model fitted later in the notebook.
for _ignored_category in (ConvergenceWarning, DataConversionWarning):
    warnings.filterwarnings("ignore", category=_ignored_category)

In [None]:
# Fix the seed so torch-based shuffling and SOM training are reproducible.
random_seed = 42
torch.manual_seed(random_seed)

# Prefer the GPU when one is visible to torch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Preprocessing 

In [None]:
# Load the wine dataset and replace the "/" in the OD280/OD315 column name
# (presumably so the name is safe to use in file names and plot labels).
wine_df = pd.read_csv(filepath_or_buffer="../data/wine.csv").rename(
    columns={'OD280/OD315': 'OD280_OD315'}
)

In [None]:
# Every column except the first one is treated as an input feature
# (the first column is used as the class label further down).
feature_columns = wine_df.columns[1:]

# Standardise the features in place: fit the scaler, then overwrite the columns.
scaler = StandardScaler().fit(wine_df[feature_columns])
wine_df[feature_columns] = scaler.transform(wine_df[feature_columns])

In [None]:
# Preview the first rows of the table after standardisation.
wine_df.head()

In [None]:
# Summary statistics; the standardised feature columns should show mean ~ 0 and std ~ 1.
wine_df.describe()

In [None]:
# Plain Python list of the feature column names (reused later to label the component planes).
feature_names = feature_columns.tolist()
feature_names

In [None]:
# (number of samples, number of columns including the label column)
wine_df.shape

In [None]:
# Build the train/test tensors:
#   1. Convert the wine dataframe to a float32 tensor and separate the features
#      (columns 1..) from the target (column 0).
#   2. Randomly shuffle the samples.
#   3. Split them into training and testing sets.
#   4. Re-standardise both sets using statistics of the training set only.
wine_torch = torch.tensor(wine_df.to_numpy(dtype=np.float32))
all_features, all_targets = wine_torch[:, 1:], wine_torch[:, 0].long()

# The permutation is drawn from torch's global RNG, so it is reproducible once
# torch.manual_seed has been called.
shuffled_indices = torch.randperm(len(all_features))
all_features, all_targets = all_features[shuffled_indices], all_targets[shuffled_indices]

train_ratio = 0.8
train_count = int(train_ratio * len(all_features))
train_features, train_targets = all_features[:train_count], all_targets[:train_count]
test_features, test_targets = all_features[train_count:], all_targets[train_count:]

# The dataframe was standardised with the mean/std of the *whole* dataset, which
# lets test-set statistics leak into training. Standardisation is an affine map,
# so re-standardising with train-only statistics here is equivalent to having
# fitted the scaler on the raw training split alone.
train_mean = train_features.mean(dim=0, keepdim=True)
train_std = train_features.std(dim=0, unbiased=False, keepdim=True)  # population std, as StandardScaler uses
train_std = torch.where(train_std > 0, train_std, torch.ones_like(train_std))  # leave constant features unscaled
train_features = (train_features - train_mean) / train_std
test_features = (test_features - train_mean) / train_std

print(train_features.shape, test_features.shape)
print(train_targets.shape, test_targets.shape)

# TorchSOM

In [None]:
# All SOM hyper-parameters are gathered in one mapping so they are easy to scan
# and tweak; the grouping below is only for readability.
som_params = dict(
    # Map layout
    x=25,
    y=15,
    topology="rectangular",
    num_features=all_features.shape[1],  # one weight per input feature
    # Neighbourhood and distance
    sigma=1.75,
    neighborhood_order=3,
    neighborhood_function="gaussian",
    distance_function="euclidean",
    # Training schedule
    learning_rate=0.95,
    epochs=100,
    batch_size=16,
    lr_decay_function="asymptotic_decay",
    sigma_decay_function="asymptotic_decay",
    # Initialisation and runtime
    initialization_mode="pca",
    device=device,
    random_seed=random_seed,
)
som = SOM(**som_params)

In [None]:
# Initialise the SOM weights from the training data, using the mode chosen at construction.
som.initialize_weights(data=train_features, mode=som.initialization_mode)

In [None]:
# Train on the training split only. QE and TE are passed to the training-error
# plot below as the quantization and topographic errors.
QE, TE = som.fit(data=train_features)

In [None]:
# Figures are exported as PDF files.
viz_config = VisualizationConfig(save_format="pdf")
visualizer = SOMVisualizer(som=som, config=viz_config)

# Output directory for the figures. Set to None if you want a direct plot.
save_path = f"results/wine/{som.topology}"

In [None]:
# Learning curves: the quantization and topographic errors returned by som.fit.
training_errors = dict(quantization_errors=QE, topographic_errors=TE)
visualizer.plot_training_errors(save_path=save_path, **training_errors)

In [None]:
# Distance map of the trained SOM; written under save_path (or shown directly when save_path is None).
visualizer.plot_distance_map(save_path=save_path)

In [None]:
# Hit map computed from the training samples.
visualizer.plot_hit_map(data=train_features, save_path=save_path)

In [None]:
# Classification map built from the training samples and their class labels.
labelled_training_data = dict(data=train_features, target=train_targets)
visualizer.plot_classification_map(save_path=save_path, **labelled_training_data)

In [None]:
# Component planes, labelled with the feature column names.
visualizer.plot_component_planes(component_names=feature_names, save_path=save_path)

# Prediction
Here, we do not add the test samples to the SOM's BMU map.  
In forecasting or process control, it is useful to add new samples to the SOM over time and, potentially, to update or refit it at a regular frequency.

In [None]:
# Local-model prediction: for every test sample, collect training samples
# through the SOM and fit a small MLP classifier on just those samples.
predictions = []

# Built once from the training data and reused for every query sample.
bmus_idx_map = som.build_bmus_data_map(
    data=train_features,
    return_indices=True,  # store sample indices; False would store the sample features instead
)

# The hyper-parameters are the same for every local model, so define them once
# outside the loop.
mlp_params = dict(
    hidden_layer_sizes=(8, 8, 8),
    max_iter=200,
    learning_rate_init=0.001,
    activation="relu",
    solver="adam",
    batch_size='auto',
    random_state=random_seed,
    shuffle=True,
    verbose=False,
)

# Only the features of the test samples are used here; their labels are kept
# aside for the evaluation cells.
for test_feature in test_features:
    collected_features, collected_targets = som.collect_samples(
        query_sample=test_feature,
        historical_samples=train_features,
        historical_outputs=train_targets,
        min_buffer_threshold=30,  # collect 30 historical samples to train a model
        bmus_idx_map=bmus_idx_map,
    )

    # scikit-learn works on CPU NumPy arrays: 2-D features, 1-D labels,
    # and a single-row 2-D array for the query sample.
    X = collected_features.cpu().numpy()
    y = collected_targets.cpu().numpy().ravel()
    test_feature_np = test_feature.cpu().numpy().reshape(1, -1)

    clf = MLPClassifier(**mlp_params).fit(X, y)
    # Tip: plot clf.loss_curve_ here to inspect the convergence of a local model.

    # predict returns an array with one entry per row; keep the single label.
    predictions.append(clf.predict(test_feature_np)[0])

In [None]:
# Gather the predictions and the ground truth as NumPy arrays for scikit-learn.
y_pred = np.array(predictions)
# .cpu() matches the other tensor -> NumPy conversions in this notebook and
# keeps this line working if the targets are ever moved to the GPU.
y_true = test_targets.cpu().numpy()

# Stand-alone scores (accuracy_score, and precision_score / recall_score /
# f1_score with average='macro', 'weighted' or 'micro') can be computed from
# y_true and y_pred if needed; the classification report below already
# includes them.

In [None]:
# Per-class precision / recall / F1 for the test predictions.
report_header = '\t\t\tClassification report:\n\n'
class_report = classification_report(y_true, y_pred)
print(report_header, class_report, '\n')

In [None]:
# Confusion matrix of the test predictions.
# The same label list is given to confusion_matrix and to the display so the
# matrix always has one row/column per class, even when a class is absent from
# the test split or from the predictions. Without it, the matrix shape would
# no longer match the display labels.
class_labels = [1, 2, 3]
cm = confusion_matrix(y_true, y_pred, labels=class_labels)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_labels)

fig, ax = plt.subplots(figsize=(6, 6))
disp.plot(ax=ax, cmap='Blues', values_format='d')  # 'd': show the counts as integers
plt.title("Confusion Matrix")
plt.grid(False)
plt.tight_layout()
plt.show()