In [9]:
# imports
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.cuda.amp import autocast
from torchvision.ops import DeformConv2d
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchsummary import summary
from contextlib import redirect_stdout
from tqdm import tqdm


import os
import h5py
import json
import gc
import io
import joblib

from sklearn.calibration import LabelEncoder
from sklearn.model_selection import train_test_split

In [2]:
# Load data

# Paths & indices
h5_path = '/home/all/processed_data/image_torchtensor_1024.h5'  # Update with your path
styles = np.load('/home/all/processed_data/styles_1024.npy', allow_pickle=True)  # Your styles data


# Load one image to get the input shape
with h5py.File(h5_path, 'r') as h5file:
    one_file = h5file['images'][0:1]  # Load the first image

# Not going to load X yet. because it is too big.
# We are going to load X batch by batch when model.fit.

le = joblib.load('label_encoder.joblib')

y = le.transform(styles)

# Convert the NumPy array of labels into a torch tensor
# y_tensor = torch.from_numpy(y).long()  # Ensure it's a LongTensor for classification tasks


# Assuming total number of images
num_images = len(y)  # or len(combined_df)
indices = np.arange(num_images)

print('Data load success')

# Split indices
indices_train, indices_temp, y_train, y_temp = train_test_split(indices,y, test_size=0.2, random_state=1, stratify=y)
indices_val, indices_test, y_val, y_test = train_test_split(indices_temp, y_temp, test_size=0.5, random_state=1, stratify=y_temp)

np.save('indices_test.npy', np.array(indices_test))

class H5Dataset(Dataset):
    def __init__(self, h5_path, indices, styles):
        self.h5_path = h5_path
        self.indices = indices
        self.styles = styles

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        with h5py.File(self.h5_path, 'r') as h5file:
            # Use the index to access the image and label
            image = h5file['images'][self.indices[idx]]
            styles = self.styles[self.indices[idx]]
            return torch.from_numpy(image).float(), torch.tensor(styles).long()

# Load your data and labels
train_data = H5Dataset(h5_path, indices_train, y)
val_data = H5Dataset(h5_path, indices_val, y)
test_data = H5Dataset(h5_path, indices_test, y)

batch_size = 3  # Define your batch size

# Create data loaders
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, drop_last=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, drop_last=True)

print('Data loader set')

Data load success
Data loader set


In [6]:
# class for Offset. making offset as a trainable variable, and keep the already trained offset consistent within next batch.
class OffsetPredictor(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(OffsetPredictor, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)

    def forward(self, x):
        return self.conv(x)

# Handmade Conv2D Model
class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.offset_predictor1 = OffsetPredictor(one_file.shape[1], 2*4*4, kernel_size=4, stride=1, padding=1)  # For a 4x4 kernel
        self.offset_predictor2 = OffsetPredictor(128, 2*4*4, kernel_size=4, stride=1, padding=1)
        self.offset_predictor3 = OffsetPredictor(96, 2*4*4, kernel_size=4, stride=1, padding=1)
        self.offset_predictor4 = OffsetPredictor(64, 2*4*4, kernel_size=4, stride=1, padding=1)
        self.offset_predictor5 = OffsetPredictor(32, 2*4*4, kernel_size=4, stride=1, padding=1)
        
        # Deformable Convolution layers
        
        # 3 -> 128
        self.deform_conv2d1 = DeformConv2d(in_channels=one_file.shape[1], out_channels=128, kernel_size=4, stride=1, padding=1)  # padding=1 for 'same'
        self.batchnorm2d1   = nn.BatchNorm2d(128)
        self.leakyrelu1     = nn.LeakyReLU(0.01)
        self.dropout2d1     = nn.Dropout2d(p=0.2)
        self.avgpool2d1     = nn.AvgPool2d(kernel_size=2)
        
        # 128 -> 96
        self.deform_conv2d2 = DeformConv2d(in_channels=128, out_channels=96, kernel_size=4, stride=1, padding=1)
        self.batchnorm2d2   = nn.BatchNorm2d(96)
        self.leakyrelu2     = nn.LeakyReLU(0.01)
        self.dropout2d2     = nn.Dropout2d(p=0.2)
        self.avgpool2d2     = nn.AvgPool2d(kernel_size=2)

        # 96 -> 64
        self.deform_conv2d3 = DeformConv2d(in_channels=96, out_channels=64, kernel_size=4, stride=1, padding=1)
        self.batchnorm2d3   = nn.BatchNorm2d(64)
        self.leakyrelu3     = nn.LeakyReLU(0.01)
        self.dropout2d3     = nn.Dropout2d(p=0.2)
        self.avgpool2d3     = nn.AvgPool2d(kernel_size=2)
        
        # 64 -> 32
        self.deform_conv2d4 = DeformConv2d(in_channels=64, out_channels=32, kernel_size=4, stride=1, padding=1)
        self.batchnorm2d4   = nn.BatchNorm2d(32)
        self.leakyrelu4     = nn.LeakyReLU(0.01)
        self.dropout2d4     = nn.Dropout2d(p=0.2)
        self.avgpool2d4     = nn.AvgPool2d(kernel_size=2)

        # 32 -> 16
        self.deform_conv2d5 = DeformConv2d(in_channels=32, out_channels=16, kernel_size=4, stride=1, padding=1)
        self.batchnorm2d5   = nn.BatchNorm2d(16)
        self.leakyrelu5     = nn.LeakyReLU(0.01)
        self.dropout2d5     = nn.Dropout2d(p=0.2)
        self.avgpool2d5     = nn.AvgPool2d(kernel_size=2)
        
        # Fully connected layers
        self.flatten      = nn.Flatten()
        self.dropout1d1   = nn.Dropout(0.3)
        self.linear1      = nn.Linear(15376,4096)  # 16384 개 나와야함 / padding 뭔가 이상해서 15376개 나옴
        self.batchnorm1d1 = nn.BatchNorm1d(4096)
        self.leakyrelu6   = nn.LeakyReLU(0.01)
        self.linear2      = nn.Linear(4096,512)
        self.batchnorm1d2 = nn.BatchNorm1d(512)
        self.leakyrelu7   = nn.LeakyReLU(0.01)
        self.linear3      = nn.Linear(512,64)
        self.batchnorm1d3 = nn.BatchNorm1d(64)
        self.leakyrelu8   = nn.LeakyReLU(0.01)
        self.linear4      = nn.Linear(64,7)
                          # nn.Softmax(dim=1)


    def forward(self, x):
        # Predict offsets for the first deformable convolution layer
        offset1 = self.offset_predictor1(x)        
        x = self.deform_conv2d1(x, offset1)
        x = self.batchnorm2d1(x)
        x = self.leakyrelu1(x)
        x = self.dropout2d1(x)
        x = self.avgpool2d1(x)
        
        offset2 = self.offset_predictor2(x)        
        x = self.deform_conv2d2(x, offset2)
        x = self.batchnorm2d2(x)
        x = self.leakyrelu2(x)
        x = self.dropout2d2(x)
        x = self.avgpool2d2(x)
        
        offset3 = self.offset_predictor3(x)        
        x = self.deform_conv2d3(x, offset3)
        x = self.batchnorm2d3(x)
        x = self.leakyrelu3(x)
        x = self.dropout2d3(x)
        x = self.avgpool2d3(x)
        
        offset4 = self.offset_predictor4(x)        
        x = self.deform_conv2d4(x, offset4)
        x = self.batchnorm2d4(x)
        x = self.leakyrelu4(x)
        x = self.dropout2d4(x)
        x = self.avgpool2d4(x)

        offset5 = self.offset_predictor5(x)        
        x = self.deform_conv2d5(x, offset5)
        x = self.batchnorm2d5(x)
        x = self.leakyrelu5(x)
        x = self.dropout2d5(x)
        x = self.avgpool2d5(x)        
        
        x = self.flatten(x)
        x = self.dropout1d1(x)
        x = self.linear1(x)
        x = self.batchnorm1d1(x)
        x = self.leakyrelu6(x)
        x = self.linear2(x)
        x = self.batchnorm1d2(x)
        x = self.leakyrelu7(x)
        x = self.linear3(x)
        x = self.batchnorm1d3(x)
        x = self.leakyrelu8(x)
        x = self.linear4(x)
        return x

# Initialize the model
model = Model()

In [7]:
type(model)

__main__.Model

In [11]:
state_dict = torch.load('best_model_state_dict.pth')
model.load_state_dict(state_dict)

# Setting device to GPU if available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device('cpu')
model = model.to(device)
print(device)

cpu


In [13]:
model.eval()  # Set the model to evaluation mode

# Initialize necessary metrics
correct = 0
total = 0

y_pred = []
y_true = []

# No need to track gradients for validation, which saves memory and computations
with torch.no_grad():
    # Wrap your loader with tqdm for a progress bar
    pbar_test = tqdm(enumerate(test_loader), total=len(test_loader), desc=f"Epoch 1/1")
    for i, (images, labels) in pbar_test:
        with autocast():
            # Move tensors to the same device as model
            images, labels = images.to(device), labels.to(device)
            
            outputs = model(images)
            
            # Get predictions from the maximum value
            _, predicted = torch.max(outputs.data, 1)
            y_pred.extend(predicted.cpu().numpy())  # Move back to cpu and convert to numpy
            y_true.extend(labels.cpu().numpy())
            
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            # Update progress bar
            # pbar_test.set_postfix({'loss': running_loss / (i + 1)})

# Calculate accuracy
accuracy = 100 * correct / total
print(f'Accuracy of the model on the test images: {accuracy}%')



Epoch 1/1:   0%|          | 1/391 [00:47<5:09:17, 47.58s/it]


KeyboardInterrupt: 

In [42]:

from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix

import matplotlib.pyplot as plt


# Calculating Precision, Recall, F1 Score, and Confusion Matrix
precision = precision_score(y_true, y_pred, average='weighted')  # or other averaging method
recall = recall_score(y_true, y_pred, average='weighted')
f1 = f1_score(y_true, y_pred, average='weighted')
conf_matrix = confusion_matrix(y_true, y_pred)

# Printing the metrics
print(f'Precision: {precision}')
print(f'Recall: {recall}')
print(f'F1 Score: {f1}')
print("Confusion Matrix:\n", conf_matrix)

  _warn_prf(average, modifier, msg_start, len(result))


Precision: 0.4863070696235053
Recall: 0.6973572037510657
F1 Score: 0.5730167681249339
Confusion Matrix:
 [[  0   0   0   0 127   0   0]
 [  0   0   0   0  87   0   0]
 [  0   0   0   0  74   0   0]
 [  0   0   0   0  16   0   0]
 [  0   0   0   0 818   0   0]
 [  0   0   0   0   6   0   0]
 [  0   0   0   0  45   0   0]]


In [43]:
# Assuming le is your LabelEncoder object and y_pred_labels are your predicted labels
label_names = le.inverse_transform([0,1,2,3,4,5,6])

# Now label_names will have the original string names of the labels
print(label_names)

['Classic' 'Contemporary' 'Country' 'Minimalism' 'Modern' 'Unique' 'Urban']


In [14]:
with open('history.json', 'r') as f:
    history = json.load(f)

# Extracting values for plotting
train_loss = history['train_loss']
val_loss = history['val_loss']
train_accuracy = history['train_accuracy']
val_accuracy = history['val_accuracy']
epochs = range(1, len(train_loss) + 1)  # 1, 2, ... , num_epochs

# Creating subplots for loss and accuracy
fig, ax = plt.subplots(1, 2, figsize=(12, 6))

# Plotting training and validation loss
ax[0].plot(epochs, train_loss, 'r', label='Training loss')
ax[0].plot(epochs, val_loss, 'b', label='Validation loss')
ax[0].set_title('Training and Validation Loss')
ax[0].set_xlabel('Epochs')
ax[0].set_ylabel('Loss')
ax[0].legend()

# Plotting training and validation accuracy
ax[1].plot(epochs, train_accuracy, 'r', label='Training accuracy')
ax[1].plot(epochs, val_accuracy, 'b', label='Validation accuracy')
ax[1].set_title('Training and Validation Accuracy')
ax[1].set_xlabel('Epochs')
ax[1].set_ylabel('Accuracy')
ax[1].legend()

# Show the plots
plt.tight_layout()
plt.show()

NameError: name 'plt' is not defined

In [15]:
# See weights and biases directly

# Function to calculate percentages
def calculate_percentage(tensor, threshold):
    # Flatten the tensor to make the calculation easier
    flattened_tensor = tensor.flatten()
    # Calculate the number of elements greater than the threshold
    count = torch.sum(flattened_tensor.abs() > threshold).item()
    # Calculate the percentage
    total_elements = flattened_tensor.numel()
    percentage = 100.0 * count / total_elements
    return percentage

# Analysis
for name, param in state_dict.items():
    # Calculate percentages for each threshold
    percent_over_1 = calculate_percentage(param, 1)
    percent_over_10 = calculate_percentage(param, 10)
    
    print(f"{name} \n- over1: {percent_over_1:.2f}%, over10: {percent_over_10:.2f}%")


offset_predictor1.conv.weight 
- over1: 0.00%, over10: 0.00%
offset_predictor1.conv.bias 
- over1: 0.00%, over10: 0.00%
offset_predictor2.conv.weight 
- over1: 0.00%, over10: 0.00%
offset_predictor2.conv.bias 
- over1: 0.00%, over10: 0.00%
offset_predictor3.conv.weight 
- over1: 0.00%, over10: 0.00%
offset_predictor3.conv.bias 
- over1: 0.00%, over10: 0.00%
offset_predictor4.conv.weight 
- over1: 0.00%, over10: 0.00%
offset_predictor4.conv.bias 
- over1: 0.00%, over10: 0.00%
offset_predictor5.conv.weight 
- over1: 0.00%, over10: 0.00%
offset_predictor5.conv.bias 
- over1: 0.00%, over10: 0.00%
deform_conv2d1.weight 
- over1: 0.00%, over10: 0.00%
deform_conv2d1.bias 
- over1: 0.00%, over10: 0.00%
batchnorm2d1.weight 
- over1: 48.44%, over10: 0.00%
batchnorm2d1.bias 
- over1: 0.00%, over10: 0.00%
batchnorm2d1.running_mean 
- over1: 34.38%, over10: 0.00%
batchnorm2d1.running_var 
- over1: 3.12%, over10: 0.00%
batchnorm2d1.num_batches_tracked 
- over1: 100.00%, over10: 100.00%
deform_conv2d