In [None]:
import pandas as pd
import numpy as np
import os
import cv2
import shutil
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator # type: ignore
from tensorflow.keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, BatchNormalization # type: ignore
from tensorflow.keras.optimizers import Adam # type: ignore
from tensorflow.keras.models import Model # type: ignore
from tensorflow.keras import backend as K # type: ignore
import albumentations as A # type: ignore
import json

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Set up parameters
limiter = 8
data_path = '/content/drive/My Drive/Colab Notebooks/crane parts 5'
train_path = os.path.join(data_path, 'train')
test_path = os.path.join(data_path, 'test')
paths = [train_path, test_path]
names = ['train', 'test']

In [None]:
for i in range(2):
    path = paths[i]
    name = names[i]
    classes = os.listdir(path)
    filepaths = []
    labels = []
    for klass in classes:
        classpath = os.path.join(path, klass)
        flist = os.listdir(classpath)
        if len(flist) > limiter:
            flist = np.random.choice(flist, limiter, replace=False)
        for f in flist:
            fpath = os.path.join(classpath, f)
            filepaths.append(fpath)
            labels.append(klass)
    Fseries = pd.Series(filepaths, name='filepaths')
    Lseries = pd.Series(labels, name='labels')
    if name == 'train':
        train_df = pd.concat([Fseries, Lseries], axis=1)
    else:
        test_df = pd.concat([Fseries, Lseries], axis=1)

num_of_classes = len(classes)
print(f'test_df length= {len(test_df)}  train_df length= {len(train_df)}')

test_df length= 25  train_df length= 40


In [None]:
import shutil
import os

# Define the path to the 'aug' directory
data_path = '/content/drive/My Drive/Colab Notebooks/working_dir/aug'

# Delete everything inside the 'aug' folder
def delete_contents(folder_path):
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            print(f'Failed to delete {file_path}. Reason: {e}')



In [None]:
def balance(df, n,column, working_dir, img_size):
    def get_augmented_image(image): # given an image this function returns an augmented image
        width=int(image.shape[1]*.8)
        height=int(image.shape[0]*.8)
        transform= A.Compose([
            A.HorizontalFlip(p=.5),
            A.Rotate(limit=30, p=.25),
            A.RandomBrightnessContrast(p=.5),
            A.RandomGamma(p=.5),
            A.RandomCrop(width=width, height=height, p=.25) ])
        return transform(image=image)['image']
    def dummy(image):
        return image

    df=df.copy()
    print('Initial length of dataframe is ', len(df))
    aug_dir=os.path.join(working_dir, 'aug')# directory to store augmented images
    if os.path.isdir(aug_dir):# start with an empty directory
        shutil.rmtree(aug_dir)
    os.mkdir(aug_dir)
    for label in df[column].unique():
        dir_path=os.path.join(aug_dir,label)
        os.mkdir(dir_path) # make class directories within aug directory
    # create and store the augmented images
    total=0
    groups=df.groupby(column) # group by class
    for label in df[column].unique():  # for every class
        msg=f'augmenting images in train set  for class {label}                                              '
        print(msg, '\r', end='')
        group=groups.get_group(label)  # a dataframe holding only rows with the specified label
        sample_count=len(group)   # determine how many samples there are in this class
        if sample_count< n: # if the class has less than target number of images
            aug_img_count=0
            delta=n - sample_count  # number of augmented images to create
            target_dir=os.path.join(aug_dir, label)  # define where to write the images
            desc=f'augmenting class {label:25s}'
            for i in range(delta):
                j= i % sample_count # need this because we may have to go through the image list several times to get the needed number
                img_path=group['filepaths'].iloc[j]
                img=cv2.imread(img_path)
                img=get_augmented_image(img)
                fname=os.path.basename(img_path)
                fname='aug' +str(i) +'-' +fname
                dest_path=os.path.join(target_dir, fname)
                cv2.imwrite(dest_path, img)
                aug_img_count +=1
            total +=aug_img_count

    print('')
    print('Total Augmented images created= ', total)
    # create aug_df and merge with train_df to create composite training set ndf
    aug_fpaths=[]
    aug_labels=[]
    classlist=sorted(os.listdir(aug_dir))
    for klass in classlist:
        classpath=os.path.join(aug_dir, klass)
        flist=sorted(os.listdir(classpath))
        for f in flist:
            fpath=os.path.join(classpath,f)
            aug_fpaths.append(fpath)
            aug_labels.append(klass)
    Fseries=pd.Series(aug_fpaths, name='filepaths')
    Lseries=pd.Series(aug_labels, name='labels')
    aug_df=pd.concat([Fseries, Lseries], axis=1)
    df=pd.concat([df,aug_df], axis=0).reset_index(drop=True)
    print('Length of augmented dataframe is now ', len(df))
    return df

In [None]:
n=30  # number of image files desired for each class
column='labels'
working_dir=r'/content/drive/My Drive/Colab Notebooks/working_dir' # where to store the augment images
img_size=(224,224)
# Execute the deletion
delete_contents(data_path)
train_df=balance(train_df, n,column, working_dir, img_size)

Initial length of dataframe is  40

Total Augmented images created=  110
Length of augmented dataframe is now  150


In [None]:
display(train_df)

Unnamed: 0,filepaths,labels
0,/content/drive/My Drive/Colab Notebooks/crane ...,bridge idler wheel
1,/content/drive/My Drive/Colab Notebooks/crane ...,bridge idler wheel
2,/content/drive/My Drive/Colab Notebooks/crane ...,bridge idler wheel
3,/content/drive/My Drive/Colab Notebooks/crane ...,bridge idler wheel
4,/content/drive/My Drive/Colab Notebooks/crane ...,bridge idler wheel
...,...,...
145,/content/drive/My Drive/Colab Notebooks/workin...,radio control
146,/content/drive/My Drive/Colab Notebooks/workin...,radio control
147,/content/drive/My Drive/Colab Notebooks/workin...,radio control
148,/content/drive/My Drive/Colab Notebooks/workin...,radio control


In [None]:
gen = ImageDataGenerator(rescale=1./255)

In [None]:
train_gen = gen.flow_from_dataframe(
    train_df,
    x_col='filepaths',
    y_col='labels',
    target_size=(128, 128),  # Make sure this matches the input_shape expected by the model
    class_mode='categorical',
    batch_size=32,
    shuffle=True
)

Found 150 validated image filenames belonging to 5 classes.


In [None]:
test_gen = gen.flow_from_dataframe(
    test_df,
    x_col='filepaths',
    y_col='labels',
    target_size=(128, 128),
    class_mode='categorical',
    batch_size=32,
    shuffle=False
)

Found 25 validated image filenames belonging to 5 classes.


In [None]:
class_labels = train_gen.class_indices
class_labels = {v: k for k, v in class_labels.items()}  # Invert the dictionary to get index to label mapping

# Specify the path where you want to save the JSON file
json_path = '/content/drive/My Drive/Colab Notebooks/model/class_labels.json'

with open(json_path, 'w') as json_file:
    json.dump(class_labels, json_file)

In [None]:
def make_simple_model(input_shape, num_classes):
    base_model = keras.applications.MobileNetV2(input_shape=input_shape, include_top=False, weights='imagenet', pooling='max')
    base_model.trainable = False
    x = BatchNormalization()(base_model.output)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.5)(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=base_model.input, outputs=outputs)
    model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

model = make_simple_model((128, 128, 3), len(train_df['labels'].unique()))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_128_no_top.h5


In [None]:

epochs = 20  # Adjust based on the complexity of your dataset and desired accuracy
history = model.fit(train_gen, epochs=epochs, validation_data=test_gen)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [None]:
results = model.evaluate(test_gen)
print(f"Test Loss: {results[0]}, Test Accuracy: {results[1]}")

Test Loss: 0.11169354617595673, Test Accuracy: 0.9599999785423279


In [None]:
model.save('/content/drive/My Drive/Colab Notebooks/model/crane_parts_model.keras')
