In [1]:
#Import Library
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
import sklearn.metrics as skm
import copy
import sys
import cv2
import os
import random



In [2]:
channel=1

In [3]:
# Specify image size
img_size = 512

# Making function for create data from path
def create_data():

    cracked = '/kaggle/input/custom-tire512/custom_tire512/dataset/defective'
    normal = '/kaggle/input/custom-tire512/custom_tire512/dataset/normal'



    # Labels for train and test
    Labels = {cracked:0, normal:1}

    # Initializing list for storing train and test data
    data = []

    # Initializing list for storing train and test label
    labels = np.array([])

    # Looping through each label
    for label in Labels:

        # Looping through cracked train data
        for ls in os.listdir(label):

            # Join each ls element with file path
            path = os.path.join(label, ls)

            # Read images from path using cv2
            img = cv2.imread(path, 0)
            img = cv2.resize(img, (img_size, img_size))

            # Adding data into data and labels list
            data.append(np.array(img))
            labels = np.append(labels, Labels[label])

    return np.array(data), labels


In [4]:
combined_data, combined_label = create_data()

print(f"Train data with shape {combined_data.shape}")

Train data with shape (5764, 512, 512)


In [5]:
def drop_duplicates(images, labels):
    # Convert images to grayscale
    grayscale_images = images

    # Calculate hash values for each image
    hash_values = [np.sum(image.astype(np.float32)) for image in grayscale_images]

    # Create a dictionary to store seen hash values
    seen_hash_values = {}

    # Remove duplicate images and labels based on hash values
    filtered_images = []
    filtered_labels = []
    for hash_value, image, label in zip(hash_values, images, labels):
        if hash_value not in seen_hash_values:
            seen_hash_values[hash_value] = True
            filtered_images.append(image)
            filtered_labels.append(label)

    return np.array(filtered_images, dtype = np.uint8), np.array(filtered_labels)

In [6]:
combined_data, combined_label = drop_duplicates(combined_data, combined_label)
print(f"Train data with shape {combined_data.shape}")

Train data with shape (3934, 512, 512)


In [7]:
np.unique(combined_label)

count_0 = np.count_nonzero(combined_label == 0)
count_1 = np.count_nonzero(combined_label == 1)

print("Count of 0:", count_0)
print("Count of 1:", count_1)

Count of 0: 2013
Count of 1: 1921


In [8]:
# def auto_contrast_brightness(gray):
#     # Calculate histogram
#     hist = cv2.calcHist([gray], [0], None, [256], [0, 256])

#     # Find minimum and maximum non-zero intensity values
#     min_val = np.where(hist > 0)[0][0]
#     max_val = np.where(hist > 0)[0][-1]

#     # Calculate new contrast factor (alpha)
#     alpha = 255.0 / (max_val - min_val) if max_val != min_val else 0  # Avoid division by zero

#     # Calculate new brightness factor (beta) to center the histogram
#     mid_point = (min_val + max_val) / 2.0
#     beta = -min_val * alpha

#     # Apply contrast and brightness adjustments
#     adjusted = cv2.convertScaleAbs(gray, alpha=alpha, beta=beta)

#     return adjusted


# auto_contrast_brightness(gray)

In [9]:
combined_data = combined_data/np.max(combined_data)

In [10]:
train_data, test_data, train_label, test_label = train_test_split(combined_data,combined_label, test_size=.1,
                                                                    stratify=combined_label, random_state = 60)

In [11]:
# noise_traindata = copy.deepcopy(train_data)
# noise_traindata = (noise_traindata + 0.25*np.random.rand(*noise_traindata.shape) / 1.1)
# noise_traindata = np.clip(noise_traindata, 0, 1) 

# train_data = np.concatenate((train_data, noise_traindata))
# train_label = np.concatenate((train_label, train_label))
# print(train_data.shape)
# print(train_label.shape)

# random_idx = np.random.randint(low=0, high=train_data.shape[0]/2, size=10)

# # Initializing fig and ax variables
# fig, axs = plt.subplots(2, 5, figsize=(16,8))

# # Looping through each axs
# for i, ax in enumerate(axs.flatten()):

#     # Showing image
#     ax.imshow(noise_traindata[random_idx[i],:], cmap='gray')
#     ax.axis('off')
    
#     # Set title for each image
#     if train_label[random_idx[i]] ==0:
#         ax.set_title(f"Cracked tire")
#     else:
#         ax.set_title(f"Normal tire")
# fig.tight_layout()
# plt.show()

In [12]:
train_dataT = torch.tensor(train_data.reshape(train_data.shape[0], channel, img_size, img_size)).float()
test_dataT = torch.tensor(test_data.reshape(test_data.shape[0], channel, img_size, img_size)).float()
# Label has to be a vector
train_labelT = torch.tensor(train_label.reshape(-1, 1)).float()
test_labelT = torch.tensor(test_label.reshape(-1, 1)).float()

print(f"Train data with shape {train_dataT.shape}")
print(f"Test data with shape {test_dataT.shape}")

print(f"Train label with shape {train_labelT.shape}")
print(f"Test label with shape {test_labelT.shape}")

Train data with shape torch.Size([3540, 1, 512, 512])
Test data with shape torch.Size([394, 1, 512, 512])
Train label with shape torch.Size([3540, 1])
Test label with shape torch.Size([394, 1])


In [13]:
# Split 50% to test and another to validation
# test_data_split, val_data_split, test_label_split, val_label_split = train_test_split(
#                                                                     test_dataT, test_labelT, test_size=.5,
#                                                                     stratify=test_labelT, random_state=42)

# Create TensorDataset Object
train_tensor = torch.utils.data.TensorDataset(train_dataT, train_labelT)
test_tensor = torch.utils.data.TensorDataset(test_dataT, test_labelT)


# Create DataLoader Object
batch_size = 32
train_loader = torch.utils.data.DataLoader(train_tensor, batch_size=batch_size, shuffle=True, drop_last=True)
test_loader = torch.utils.data.DataLoader(test_tensor, batch_size=batch_size)


In [14]:
def create_model(printsize=False):

    # CNN model class

    class TireNet(nn.Module):
        def __init__(self):
            super(TireNet, self).__init__()
            self.conv1 = nn.Conv2d(channel, 32, kernel_size=2, stride=1, padding=1)  # Increased filter size to 3
            self.bn1 = nn.BatchNorm2d(32)
            self.conv12 = nn.Conv2d(32, 64, kernel_size=2, stride=1, padding=1)  # Increased filter size to 3
            self.bn12 = nn.BatchNorm2d(64)
            self.conv2 = nn.Conv2d(64, 128, kernel_size=2, stride=1, padding=1)
            self.bn2 = nn.BatchNorm2d(128)
            self.conv3 = nn.Conv2d(128, 256, kernel_size=2, stride=1, padding=1)
            self.bn3 = nn.BatchNorm2d(256)
            self.pool = nn.MaxPool2d(2, 2)
            self.dp1 = nn.Dropout2d(p=0.0325)
            
#             self.fc1 = nn.Linear(256 * 28 * 28, 1)  # Adjusted for 32x32 input and 3x3 filters
            self.gap = nn.AdaptiveAvgPool2d((1, 1))
    
            self.gapS = nn.AdaptiveAvgPool2d((1, 1))
    
            self.conv5 = nn.Conv2d(256,128, kernel_size=2, stride=1, padding=1)
            self.bn5 = nn.BatchNorm2d(128)
            self.conv56 = nn.Conv2d(128,64, kernel_size=2, stride=1, padding=1)
            self.bn56 = nn.BatchNorm2d(64)
            self.conv6 = nn.Conv2d(64,32, kernel_size=2, stride=1, padding=1)
            self.bn6 = nn.BatchNorm2d(32)
            self.conv7 = nn.Conv2d(32,1, kernel_size=2, stride=1, padding=1)
            
            
            self.convS = nn.Conv2d(128,1, kernel_size=2, stride=1, padding=1)

        def forward(self, x):
            x = F.leaky_relu(self.bn1(self.conv1(x)), 0.01) 
            x = self.dp1(x)
            x = self.pool(x)
            x = F.leaky_relu(self.bn12(self.conv12(x)), 0.01) 
            x = self.dp1(x)
            x = self.pool(x)
            x = F.leaky_relu(self.bn2(self.conv2(x)), 0.01) 
            x = self.dp1(x)
            x = self.pool(x)
            x2=x
            x = F.leaky_relu(self.bn3(self.conv3(x)), 0.01) 
            x = self.dp1(x)
            x = self.pool(x)
            
            
#             x = x.view(-1, 256 * 28* 28)
#             x = (self.fc1(x))
            x = F.leaky_relu(self.bn5(self.conv5(x)), 0.01)
            x = self.dp1(x)
            x = self.pool(x)
            x = F.leaky_relu(self.bn56(self.conv56(x)), 0.01)
            x = self.dp1(x)
            x = self.pool(x)
            x = F.leaky_relu(self.bn6(self.conv6(x)), 0.01)
            x = self.dp1(x)
            x = self.pool(x)
            x = self.conv7(x)
            x = self.pool(x)
            x = self.gap(x)
            
            
            x2 = self.convS(x2)
            x2 = self.pool(x2)
            x2 = self.gapS(x2)
            x2 = x2.squeeze(-1).squeeze(-1)
            
            
            
            
            x = x.squeeze(-1).squeeze(-1)
            
            x += x2
            return x

    # Create variables for model, loss function, optimizer
    model = TireNet()
    lossfunc = nn.BCEWithLogitsLoss()
    #optimizer = torch.optim.SGD(model.parameters(), lr=0.005, weight_decay = 0.005, momentum = 0.9)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0.0005)

    return model, lossfunc, optimizer

In [15]:
# Checking for GPU
torch.cuda.empty_cache()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [16]:
# Create train_model function
def train_model(model, lossfunc, optimizer, n_epochs, train_data, val_data):

    # For storing best perfrom model!
    bestmodel = {'acc': 0, 'net': None}

    # Initializing variables for storing result

    # Use GPU for our model
    model = model.to(device)
    val_losses=0;
    count=0;

    # Training loop
    for epochi in range(n_epochs):

        # Training mode
        model.train()

        # Variable for storing result each batch
        batch_train = []
        batch_loss = []

        # Training in train_data
        for X, y in train_data:

            # Push data into GPU
            X = X.to(device)
            y = y.to(device)

            # Predict model
            pred = model(X)
            # How many error in this prediction ? (compute for error)
            loss = lossfunc(pred, y)

            # Set gradient to zero
            optimizer.zero_grad()
            # Compute Loss
            loss.backward()
            # Do back prop
            optimizer.step()

            # Storing loss this batch into batch_loss
            batch_loss.append(loss.item())

            # move pred, y to CPU
            pred = pred.cpu()
            y = y.cpu()

            # Computing accuracy and storing result
            matches = ((pred>0)==y).float()
            batch_train.append(100*torch.mean(matches).item())

        # Compute average of batch_train and batch_loss
        train_acc = (np.mean(batch_train))
        train_losses = (np.mean(batch_loss))

        # Using eval mode
        model.eval()

        batch_val = []
        batch_val_loss = []
        
        # Testing in validation_data to avoid overfitting
        for X, y in val_data:

            # Push data into GPU
            X = X.to(device)
            y = y.to(device)

            # Using torch.no_grad() to switch off gradient computation
            with torch.no_grad():
                # Predict model
                pred = model(X)

            loss = lossfunc(pred, y)
            # Store test loss
            batch_val_loss.append(loss.item())

            # move pred to CPU
            pred = pred.cpu()
            y = y.cpu()

            matches = ((pred>0)==y).float()
            batch_val.append(100*torch.mean(matches).item())

        val_acc = (np.mean(batch_val))
        
        if(val_losses<(np.mean(batch_val_loss))):
            count =count+1;
#         if(count>10):
#             break;
        val_losses = (np.mean(batch_val_loss))
        
        
        # Craete msg for printing result
        msg = f"Epoch {epochi+1} out of {n_epochs} with Train acc {train_acc:.2f}% Val acc {val_acc:.2f}% and Val Loss {val_losses:.8f}"
        sys.stdout.write('\r'+msg)

        # Storing bestmodel
        if val_acc > bestmodel['acc']:

            bestmodel['acc'] = val_acc.item()
            bestmodel['net'] = copy.deepcopy(model.state_dict())

    return train_acc, val_acc, train_losses, val_losses, model , bestmodel

In [17]:
# Create model instance
model, lossfunc, optimizer = create_model()
# Train with 150 epochs
numepochs = 300
train_acc, val_acc, train_losses, val_losses, model, bestmodel = train_model(model, lossfunc, optimizer, numepochs, train_loader, test_loader)

#train_acc, val_acc, train_losses, val_losses, model = train_model(model, lossfunc, optimizer, numepochs, train_loader, val_loader)

Epoch 300 out of 300 with Train acc 99.29% Val acc 97.84% and Val Loss 0.09092644

In [18]:
# Recreate new model
torch.save(model.state_dict(), 'model.pth') 

    
# Load best model
torch.save(bestmodel['net'], 'bestmodel1.pth') 
bestmodel = {'acc': 0, 'net': None}
model = None

In [19]:
torch.cuda.empty_cache()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = create_model()[0].to(device)  # Move model to GPU
model.load_state_dict(torch.load('bestmodel1.pth'))
model.eval()
correct = 0
total = 0

# Iterate over all batches in the test loader
for test_set, test_set_label in test_loader:
    test_set, test_set_label = test_set.to(device), test_set_label.to(device)  # Move data to GPU

    # Forward pass through model
    test_result = model(test_set)  # No need for .cpu() as results are already on GPU

    # Calculate accuracy for each batch
    correct_batch = (test_result > 0) == test_set_label
    correct += torch.sum(correct_batch).item()  # Move correct_batch back to CPU for accumulation
    total += correct_batch.size(0)

# Calculate overall accuracy
accuracy = 100 * correct / total
print("Test accuracy is {:.2f}%".format(accuracy))

Test accuracy is 97.72%


In [20]:
torch.cuda.empty_cache()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = create_model()[0].to(device)  # Move model to GPU
model.load_state_dict(torch.load('model.pth'))

correct = 0
total = 0
model.eval()
# Iterate over all batches in the test loader
for test_set, test_set_label in test_loader:
    test_set, test_set_label = test_set.to(device), test_set_label.to(device)  # Move data to GPU

    # Forward pass through model
    test_result = model(test_set)  # No need for .cpu() as results are already on GPU

    # Calculate accuracy for each batch
    correct_batch = (test_result > 0) == test_set_label
    correct += torch.sum(correct_batch).item()  # Move correct_batch back to CPU for accumulation
    total += correct_batch.size(0)

# Calculate overall accuracy
accuracy = 100 * correct / total
print("Test accuracy is {:.2f}%".format(accuracy))

OutOfMemoryError: CUDA out of memory. Tried to allocate 134.00 MiB (GPU 0; 14.75 GiB total capacity; 14.08 GiB already allocated; 39.06 MiB free; 14.57 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF