In [2]:
import os
import cv2
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import time

2025-06-02 22:47:37.900424: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
import random
from tensorflow.keras import layers, models,optimizers, callbacks
from collections import defaultdict

In [4]:
from tensorflow.keras import mixed_precision
mixed_precision.set_global_policy('mixed_float16')


In [5]:
# ==== CONFIG ====
IMG_SIZE = 160
DATA_DIR = "/mnt/d/MyEverything/PythonProjects/Recent_projects/cnn_analysis/Hand_Drawing/quickdraw_images"  # <- Replace this with your dataset path
PARTIAL_STEPS = [0.5, 1.0]
BATCH_SIZE = 32
EPOCHS = 10
CLASSES_LIMIT = 50
MAX_SAMPLES_PER_CLASS = 4500

In [6]:
def get_filepaths_and_labels(base_dir):
    classes = sorted(os.listdir(base_dir))
    if CLASSES_LIMIT:
        classes = classes[:CLASSES_LIMIT]
    all_paths = []
    all_labels = []
    class_to_idx = {cls: i for i, cls in enumerate(classes)}
    for cls in classes:
        class_path = os.path.join(base_dir, cls)
        images = [os.path.join(class_path, img) for img in os.listdir(class_path) if img.endswith('.png')]
        all_paths.extend(images)
        all_labels.extend([cls] * len(images))
    return all_paths, all_labels, class_to_idx

In [7]:
class EpochSampler:
    def __init__(self, all_filepaths, all_labels, class_to_idx, max_per_class=10000):
        self.class_to_idx = class_to_idx
        self.max_per_class = max_per_class
        self.class_map = defaultdict(list)
        for path, label in zip(all_filepaths, all_labels):
            self.class_map[label].append(path)

    def sample_epoch_data(self):
        sampled_paths, sampled_labels = [], []
        for cls, paths in self.class_map.items():
            selected = random.sample(paths, min(self.max_per_class, len(paths)))
            sampled_paths.extend(selected)
            sampled_labels.extend([cls] * len(selected))
        return sampled_paths, sampled_labels


In [12]:
class SketchDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, filepaths, labels, batch_size, class_to_idx, steps=[1.0], shuffle=True):
        self.filepaths = np.array(filepaths)
        self.labels = np.array(labels)
        self.batch_size = batch_size
        self.class_to_idx = class_to_idx
        self.steps = steps
        self.shuffle = shuffle
        self.indices = np.arange(len(self.filepaths))
        self.on_epoch_end()

    def __len__(self):
        return len(self.filepaths) // self.batch_size

    def __getitem__(self, index):
        inds = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        batch_x = self.filepaths[inds]
        batch_y = [self.class_to_idx[self.labels[i]] for i in inds]
        X = []
        
        for path in batch_x:
            img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
            
            if img is None:
                img = np.zeros((IMG_SIZE, IMG_SIZE), dtype=np.uint8)
            else:
                img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))  # ✅ Ensure correct size
            
            # Create partial mask
            mask = np.ones_like(img, dtype=np.uint8) * 255
            step_ratio = random.choice(self.steps)
            height = int(IMG_SIZE * step_ratio)
            mask[height:, :] = 255
            
            img = cv2.bitwise_and(img, mask)
            img = img.astype(np.float32) / 255.0
            img = np.expand_dims(img, axis=-1)  # Shape: (IMG_SIZE, IMG_SIZE, 1)
            X.append(img)
    
        X = np.array(X)  # Shape: (batch_size, IMG_SIZE, IMG_SIZE, 1)
        y = tf.keras.utils.to_categorical(batch_y, num_classes=len(self.class_to_idx))
        return X, y

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)


In [13]:
# ------------------- CNN MODEL -------------------
def build_cnn_model(input_shape, num_classes):
    model = models.Sequential([
        layers.Input(shape=input_shape),
        layers.Conv2D(32, (3, 3), activation='relu'),
        layers.MaxPooling2D(2),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D(2),
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.MaxPooling2D(2),
        layers.Flatten(),
        layers.Dense(256, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(num_classes, activation='softmax')
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model


In [14]:
# ------------------- MAIN -------------------
print("[INFO] Loading dataset...")
all_paths, all_labels, class_to_idx = get_filepaths_and_labels(DATA_DIR)
sampler = EpochSampler(all_paths, all_labels, class_to_idx, MAX_SAMPLES_PER_CLASS)

model = build_cnn_model((IMG_SIZE, IMG_SIZE, 1), len(class_to_idx))
model.summary()

[INFO] Loading dataset...


In [None]:

for epoch in range(EPOCHS):
    print(f"\n[INFO] Epoch {epoch + 1}/{EPOCHS}")
    epoch_paths, epoch_labels = sampler.sample_epoch_data()
    tr_paths, val_paths, tr_labels, val_labels = train_test_split(
        epoch_paths, epoch_labels, test_size=0.1, stratify=epoch_labels, random_state=random.randint(1, 9999))
    train_gen = SketchDataGenerator(tr_paths, tr_labels, BATCH_SIZE, class_to_idx, PARTIAL_STEPS)
    val_gen = SketchDataGenerator(val_paths, val_labels, BATCH_SIZE, class_to_idx, [1.0], shuffle=False)
    model.fit(train_gen, validation_data=val_gen, epochs=1)



[INFO] Epoch 1/10


  self._warn_if_super_not_called()
I0000 00:00:1748872393.243726    4173 service.cc:148] XLA service 0x7fbda800ec40 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1748872393.248281    4173 service.cc:156]   StreamExecutor device (0): NVIDIA GeForce RTX 3050 6GB Laptop GPU, Compute Capability 8.6
2025-06-02 22:53:13.458427: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
I0000 00:00:1748872393.814740    4173 cuda_dnn.cc:529] Loaded cuDNN version 90300
2025-06-02 22:53:19.942682: E external/local_xla/xla/service/slow_operation_alarm.cc:65] Trying algorithm eng40{k2=5,k13=1,k14=3} for conv (f16[32,38,38,64]{3,2,1,0}, u8[0]{0}) custom-call(f16[32,36,36,128]{3,2,1,0}, f16[128,3,3,64]{3,2,1,0}), window={size=3x3}, dim_labels=b01f_o01i->b01f, custom_call_target="__cudnn$convBackwardInput", backend_config={"cudnn_conv_backend_config":

[1m1373/5766[0m [32m━━━━[0m[37m━━━━━━━━━━━━━━━━[0m [1m34:51[0m 476ms/step - accuracy: 0.2244 - loss: 3.1217

libpng error: Read Error


[1m2301/5766[0m [32m━━━━━━━[0m[37m━━━━━━━━━━━━━[0m [1m27:30[0m 476ms/step - accuracy: 0.3029 - loss: 2.7828

libpng error: Read Error


[1m4203/5766[0m [32m━━━━━━━━━━━━━━[0m[37m━━━━━━[0m [1m12:11[0m 468ms/step - accuracy: 0.3898 - loss: 2.4150

libpng error: Read Error


[1m5766/5766[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2977s[0m 514ms/step - accuracy: 0.4317 - loss: 2.2410 - val_accuracy: 0.7100 - val_loss: 1.1376

[INFO] Epoch 2/10


  self._warn_if_super_not_called()


[1m3721/5766[0m [32m━━━━━━━━━━━━[0m[37m━━━━━━━━[0m [1m20:33[0m 603ms/step - accuracy: 0.6775 - loss: 1.2359

libpng error: Read Error


[1m4936/5766[0m [32m━━━━━━━━━━━━━━━━━[0m[37m━━━[0m [1m8:04[0m 584ms/step - accuracy: 0.6789 - loss: 1.2304 

libpng error: Read Error


[1m5766/5766[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 576ms/step - accuracy: 0.6799 - loss: 1.2263  

libpng error: Read Error


[1m5766/5766[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3607s[0m 626ms/step - accuracy: 0.6799 - loss: 1.2263 - val_accuracy: 0.7437 - val_loss: 0.9868

[INFO] Epoch 3/10


  self._warn_if_super_not_called()


[1m 833/5766[0m [32m━━[0m[37m━━━━━━━━━━━━━━━━━━[0m [1m39:06[0m 476ms/step - accuracy: 0.7070 - loss: 1.1257

libpng error: Read Error


[1m2447/5766[0m [32m━━━━━━━━[0m[37m━━━━━━━━━━━━[0m [1m45:16[0m 818ms/step - accuracy: 0.7094 - loss: 1.1098

libpng error: Read Error


[1m5766/5766[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 867ms/step - accuracy: 0.7121 - loss: 1.0995   

libpng error: Read Error


[1m5766/5766[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5802s[0m 1s/step - accuracy: 0.7121 - loss: 1.0995 - val_accuracy: 0.7621 - val_loss: 0.8938

[INFO] Epoch 4/10


  self._warn_if_super_not_called()


[1m 417/5766[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m1:38:28[0m 1s/step - accuracy: 0.7292 - loss: 1.0157

libpng error: Read Error


[1m4599/5766[0m [32m━━━━━━━━━━━━━━━[0m[37m━━━━━[0m [1m17:08[0m 881ms/step - accuracy: 0.7344 - loss: 1.0120   

libpng error: Read Error


[1m4906/5766[0m [32m━━━━━━━━━━━━━━━━━[0m[37m━━━[0m [1m12:33[0m 876ms/step - accuracy: 0.7344 - loss: 1.0119

libpng error: Read Error


[1m5766/5766[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5442s[0m 944ms/step - accuracy: 0.7345 - loss: 1.0115 - val_accuracy: 0.7784 - val_loss: 0.8479

[INFO] Epoch 5/10


  self._warn_if_super_not_called()


[1m 102/5766[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m1:10:35[0m 748ms/step - accuracy: 0.7528 - loss: 0.9232

In [None]:
def train_more_epochs(model, all_paths, all_labels, class_to_idx, epochs, batch_size=64, max_samples_per_class=10000, partial_steps=[0.25, 0.5, 0.75, 1.0]):
    from sklearn.model_selection import train_test_split
    import random

    sampler = EpochSampler(all_paths, all_labels, class_to_idx, max_samples_per_class)

    for epoch in range(epochs):
        print(f"\n[INFO] Additional Epoch {epoch + 1}/{epochs}")
        epoch_paths, epoch_labels = sampler.sample_epoch_data()
        tr_paths, val_paths, tr_labels, val_labels = train_test_split(
            epoch_paths, epoch_labels, test_size=0.1, stratify=epoch_labels, random_state=random.randint(1, 9999)
        )
        train_gen = SketchDataGenerator(tr_paths, tr_labels, batch_size, class_to_idx, partial_steps)
        val_gen = SketchDataGenerator(val_paths, val_labels, batch_size, class_to_idx, [1.0], shuffle=False)
        model.fit(train_gen, validation_data=val_gen, epochs=1)


In [None]:
train_more_epochs(model, all_paths, all_labels, class_to_idx, epochs=2)

In [None]:
print("\n[INFO] Measuring inference time...")
test_img_path = random.choice(all_paths)
test_img = cv2.imread(test_img_path, cv2.IMREAD_GRAYSCALE)
test_img = cv2.resize(test_img, (IMG_SIZE, IMG_SIZE))
test_img = test_img.astype(np.float32) / 255.0
test_img = np.expand_dims(test_img, axis=-1)
test_img = np.expand_dims(test_img, axis=0)

start_time = time.time()
_ = model.predict(test_img)
end_time = time.time()

print(f"[INFO] Inference time for one image: {end_time - start_time:.4f} seconds")
