In [None]:
import os

# The oneDNN switch is consulted while TensorFlow initialises, so it has to be
# in the environment before the first `import tensorflow` of the kernel;
# setting it in a later cell would come too late on a fresh Run All.
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"

import tensorflow as tf

# Environment sanity check: installed TF version, whether the build has CUDA
# support, and which GPUs TensorFlow can see.
print("TF version:", tf.__version__)
print("Built with CUDA:", tf.test.is_built_with_cuda())
print("GPU Available:", tf.config.list_physical_devices('GPU'))

In [None]:

import gc
import tensorflow as tf
from numba import cuda

# Drop unreachable Python objects and reset Keras' global state so that memory
# held by previous runs in this kernel can be reclaimed.
gc.collect()
tf.keras.backend.clear_session()

# Reset the CUDA context of device 0 through numba. Only attempted when
# TensorFlow reports a GPU: selecting device 0 on a machine without a CUDA
# device would fail and stop the notebook at this cell.
# NOTE(review): closing the CUDA context may interfere with a context
# TensorFlow has already created in this process -- confirm that this cell is
# only ever run before any GPU work has started.
if tf.config.list_physical_devices('GPU'):
    cuda.select_device(0)
    cuda.close()

In [None]:
import os
# NOTE(review): this flag presumably only has an effect when it is set before
# TensorFlow is first imported in the kernel -- confirm that no earlier cell
# has already imported tensorflow.
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
import warnings
# Suppress all Python warnings for the rest of the session.
warnings.filterwarnings("ignore")
import tensorflow as tf
import matplotlib.pyplot as plt
# Fix TensorFlow's global random seed.
tf.compat.v1.set_random_seed(0)
from tensorflow import keras
import numpy as np
# Fix NumPy's global random seed.
np.random.seed(0)
import itertools
from keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.layers import Rescaling
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import precision_score, accuracy_score, recall_score, confusion_matrix, ConfusionMatrixDisplay, classification_report


In [None]:
# Training images: pixel values scaled to [0, 1] plus light random
# augmentation (rotation, shifts, zoom, horizontal flip).
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=15,          # up to 15 degrees
    width_shift_range=0.1,      # up to 10% of the width
    height_shift_range=0.1,     # up to 10% of the height
    zoom_range=0.2,             # zoom in/out by up to 20%
    horizontal_flip=True,
)

# Validation images: rescaling only, no augmentation.
valid_datagen = ImageDataGenerator(rescale=1./255)

# Both generators read one sub-directory per class, resize to 256x256 and
# emit one-hot labels in batches of 32.
train_gen = train_datagen.flow_from_directory(
    "trafficdata_new/train",
    class_mode='categorical',
    batch_size=32,
    target_size=(256, 256),
)

# shuffle=False keeps the batch order fixed from one pass to the next.
test_gen = valid_datagen.flow_from_directory(
    "trafficdata_new/valid",
    shuffle=False,
    class_mode='categorical',
    batch_size=32,
    target_size=(256, 256),
)


In [None]:
from tensorflow import keras
from tensorflow.keras.layers import Layer, GlobalAveragePooling2D, GlobalMaxPooling2D, Reshape, Dense, Multiply, Conv2D, Concatenate

# Channel Attention Module
class ChannelAttention(keras.layers.Layer):
    """Channel-wise gating of a 2D feature map.

    The input is reduced to one value per channel twice (global average
    pooling and global max pooling). Both vectors pass through the same
    two-layer bottleneck (``fc1`` then ``fc2``), the two results are added and
    the sum rescales the input channel by channel.

    Args:
        channels: Width of the second dense layer; must match the number of
            channels of the incoming feature map for the final product to
            broadcast.
        reduction: Bottleneck divisor. The hidden width is
            ``channels // reduction``, clamped to at least 1.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, channels, reduction=16, **kwargs):
        super(ChannelAttention, self).__init__(**kwargs)
        self.channels = channels
        self.reduction = reduction
        self.avg_pool = GlobalAveragePooling2D()
        self.max_pool = GlobalMaxPooling2D()
        # channels // reduction is 0 whenever channels < reduction, which is
        # not a usable layer width; keep at least one hidden unit.
        hidden_units = max(channels // reduction, 1)
        self.fc1 = Dense(hidden_units, activation="relu", use_bias=False)
        self.fc2 = Dense(channels, activation="sigmoid", use_bias=False)

    def call(self, inputs):
        """Return ``inputs`` scaled per channel by the attention weights."""
        avg_out = self.fc2(self.fc1(self.avg_pool(inputs)))
        max_out = self.fc2(self.fc1(self.max_pool(inputs)))
        # NOTE(review): fc2 applies the sigmoid to each branch before they are
        # added, so the scale lies in (0, 2) rather than (0, 1). If a single
        # sigmoid over the sum was intended, changing it would alter the
        # output of already trained checkpoints -- confirm before touching.
        out = avg_out + max_out
        # (batch, channels) -> (batch, 1, 1, channels) so the weights
        # broadcast over the two middle axes. Done with plain tensor ops so
        # that no new layer object is created on every call.
        scale = keras.backend.expand_dims(keras.backend.expand_dims(out, axis=1), axis=1)
        return inputs * scale

    def get_config(self):
        """Return the constructor arguments needed to re-create the layer."""
        config = super(ChannelAttention, self).get_config()
        config.update({
            "channels": self.channels,
            "reduction": self.reduction
        })
        return config

# Spatial Attention Module
class SpatialAttention(keras.layers.Layer):
    """Position-wise gating of a feature map.

    The mean and the max over the last axis are stacked into a two-channel
    map, a 7x7 convolution with sigmoid activation turns it into a
    single-channel attention map, and the input is multiplied by that map.

    Args:
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, **kwargs):
        super(SpatialAttention, self).__init__(**kwargs)
        self.conv = Conv2D(1, kernel_size=7, strides=1, padding="same", activation="sigmoid", use_bias=False)

    def call(self, inputs):
        """Return ``inputs`` scaled by the learned attention map."""
        # Plain backend ops instead of Lambda/Concatenate/Multiply layers, so
        # that no new layer objects are built each time the layer is called.
        avg_out = keras.backend.mean(inputs, axis=-1, keepdims=True)
        max_out = keras.backend.max(inputs, axis=-1, keepdims=True)
        concat = keras.backend.concatenate([avg_out, max_out], axis=-1)
        attention = self.conv(concat)
        # The single-channel map broadcasts over the last axis of the input.
        return inputs * attention

    def get_config(self):
        """Return the base layer config; this layer has no extra arguments."""
        config = super(SpatialAttention, self).get_config()
        return config
    




In [None]:

# CNN Model with Attention
def _add_attention_stage(model, filters, kernel_size, last=False, **first_conv_kwargs):
    """Append one Conv -> ChannelAttention -> SpatialAttention -> Conv -> pool stage.

    Args:
        model: The ``keras.Sequential`` model being assembled.
        filters: Number of filters of both convolutions, also passed to
            ``ChannelAttention``.
        kernel_size: Kernel size of both convolutions.
        last: When True the stage ends with ``GlobalMaxPooling2D`` instead of
            a 2x2 ``MaxPooling2D``.
        **first_conv_kwargs: Extra arguments for the first convolution only
            (used to declare the input shape on the very first layer).
    """
    model.add(keras.layers.Conv2D(filters, kernel_size, activation="relu", padding="same", **first_conv_kwargs))
    model.add(ChannelAttention(filters))
    model.add(SpatialAttention())
    model.add(keras.layers.Conv2D(filters, kernel_size, activation="relu", padding="same"))
    if last:
        model.add(keras.layers.GlobalMaxPooling2D())
    else:
        model.add(keras.layers.MaxPooling2D(2, 2))


model = keras.Sequential()

# Six stages with the same layout. The first uses 5x5 kernels and declares
# the 256x256 RGB input; the others use 3x3 kernels. The last stage ends in
# global max pooling instead of 2x2 max pooling.
_add_attention_stage(model, 16, (5, 5), input_shape=(256, 256, 3))
for stage_filters in (32, 64, 128, 256):
    _add_attention_stage(model, stage_filters, (3, 3))
_add_attention_stage(model, 512, (3, 3), last=True)

# Flatten directly follows the global pooling layer and is kept so that the
# layer stack stays exactly as before.
model.add(keras.layers.Flatten())

# Fully Connected Layers
model.add(keras.layers.Dense(160, activation="relu"))
model.add(keras.layers.Dropout(0.3))
model.add(keras.layers.Dense(6, activation="softmax"))

# Cosine decay with restarts learning-rate schedule (optional, see below)
initial_learning_rate = 0.0005  # Starting LR
first_decay_steps = 230  # Number of training steps in the first decay period
t_mul = 2
m_mul = 0.5
alpha = 0.3 # Minimum LR factor (final LR will be initial_lr * alpha)

lr_schedule = tf.keras.optimizers.schedules.CosineDecayRestarts(
    initial_learning_rate=initial_learning_rate,
    first_decay_steps=first_decay_steps,
    t_mul = t_mul,
    m_mul = m_mul,
    alpha=alpha,
)

# The schedule above used to be built and then silently ignored, because the
# optimizer was always given a constant rate. The choice is now explicit;
# the default keeps the constant 0.0003 that training has used so far.
use_lr_schedule = False
learning_rate = lr_schedule if use_lr_schedule else 0.0003

# Compile the Model
opt = keras.optimizers.AdamW(learning_rate=learning_rate)
model.compile(optimizer=opt, loss="categorical_crossentropy", metrics=['accuracy'])

model.summary()



In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau

# Write the model to disk only when validation accuracy beats the best value
# seen so far, and log each save.
checkpoint = ModelCheckpoint(
    filepath="tflite_saves/final_model_gmpool.keras",
    save_best_only=True,
    monitor="val_accuracy",
    mode="max",
    verbose=1,
)



In [None]:
# Train for `ep` epochs, validating after each one; `checkpoint` is passed
# as the only callback.
ep = 100
history = model.fit(
    train_gen,
    epochs=ep,
    validation_data=test_gen,
    callbacks=[checkpoint],
)


In [None]:
# Training curves side by side: loss on the left, accuracy on the right.
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(20, 5))

loss_ax.set_title("Train and Validation Loss")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("Loss")
loss_ax.plot(history.history['loss'], label="Train Loss")
loss_ax.plot(history.history['val_loss'], label="Validation Loss")
loss_ax.legend()

acc_ax.set_title("Train and Validation Accuracy")
acc_ax.set_xlabel("Epoch")
acc_ax.set_ylabel("Accuracy")
acc_ax.plot(history.history['accuracy'], label="Train Accuracy")
acc_ax.plot(history.history['val_accuracy'], label="Validation Accuracy")
acc_ax.legend()

fig.tight_layout()


In [None]:
from tensorflow.keras.models import load_model
# The custom attention layers are passed by class name so that load_model can
# rebuild them when reading the checkpoint back from disk.
custom_objects = {
    layer_cls.__name__: layer_cls
    for layer_cls in (ChannelAttention, SpatialAttention)
}

model = load_model(
    "tflite_saves/final_model_gmpool.keras",
    custom_objects=custom_objects,
)


In [None]:
# tf.config.run_functions_eagerly(True)
# Collect true and predicted class indices, one array per batch.
#
# The generator returned by flow_from_directory keeps producing batches
# indefinitely when it is used directly in a `for` loop, so the loop would
# never finish on its own. Indexing the batches 0 .. len(test_gen) - 1 covers
# the data exactly once.
labels = []
predictions = []
for batch_index in range(len(test_gen)):
    x, y = test_gen[batch_index]
    labels.append(tf.argmax(y, 1).numpy())
    # training=False makes the inference mode of the forward pass explicit.
    predictions.append(tf.argmax(model(x, training=False), 1).numpy())


In [None]:
def _flatten_batches(batches):
    """Return the elements of ``batches`` as one flat list.

    Each item may be a 1-D array of class indices (one batch) or an already
    flattened scalar, so running this cell a second time leaves the lists
    unchanged instead of raising.
    """
    flat = []
    for batch in batches:
        flat.extend(np.atleast_1d(batch))
    return flat


predictions = _flatten_batches(predictions)
labels = _flatten_batches(labels)


In [None]:
# Compare only as many labels as there are predictions. This replaces the
# hard-coded 9912 and keeps the two sequences aligned for any dataset size.
y_true = labels[:len(predictions)]

# The training figure is the accuracy logged for the last epoch in `history`.
print("Train Accuracy  : {:.2f} %".format(history.history['accuracy'][-1]*100))
print("Test Accuracy   : {:.2f} %".format(accuracy_score(y_true, predictions) * 100))
# NOTE(review): with average='micro' on single-label multi-class data,
# precision and recall both equal accuracy; consider 'macro' or 'weighted'
# if a distinct figure is wanted.
print("Precision Score : {:.2f} %".format(precision_score(y_true, predictions, average='micro') * 100))
print("Recall Score    : {:.2f} %".format(recall_score(y_true, predictions, average='micro') * 100))


In [None]:
# Compare only as many labels as there are predictions (replaces the
# hard-coded 9912).
y_true = labels[:len(predictions)]

cm = confusion_matrix(y_true, predictions)
disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                              display_labels=list(range(1,7)))
# One figure only: the former extra plt.figure(...) call produced an empty
# figure, because the matrix is drawn on the axes created here.
fig, ax = plt.subplots(figsize=(15,15))
disp.plot(ax=ax,colorbar= False,cmap = 'YlGnBu')
plt.title("Confusion Matrix")
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.show()
print(classification_report(y_true, predictions))


In [None]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Rescaling-only generator over the validation directory; it supplies the
# calibration images for quantization.
valid_datagen = ImageDataGenerator(rescale=1./255)

valid_gen = valid_datagen.flow_from_directory(
    "trafficdata_new/valid",
    shuffle=False,             # fixed order
    class_mode='categorical',  # one-hot labels
    batch_size=32,
    target_size=(256, 256),    # matches the model's input size
)
def representative_dataset():
    """Yield calibration samples for post-training quantization.

    Reads batches from the module-level ``valid_gen`` and yields the images
    one at a time, each with a leading batch axis of size 1 and wrapped in a
    single-element list. Stops after exactly ``num_samples`` images; the
    previous version always emitted whole batches and therefore overshot the
    requested count.
    """
    num_samples = 100  # Number of samples for the representative dataset
    remaining = num_samples
    for images, _ in valid_gen:
        # Take only as many images from this batch as are still needed.
        for image in images[:remaining]:
            yield [image[np.newaxis, ...]]  # Add batch dimension
        remaining -= len(images)
        if remaining <= 0:
            break

# Convert the model
import os

# from_keras_model() takes a Keras model object, not a file path, so the saved
# model is loaded first. The custom attention layers are supplied so that the
# file can be deserialized if it contains them.
keras_model = tf.keras.models.load_model(
    "96_model.keras",
    custom_objects={
        "ChannelAttention": ChannelAttention,
        "SpatialAttention": SpatialAttention,
    },
)

# Full-integer quantization: int8 builtin ops only, int8 input and output,
# calibrated with the representative dataset.
converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_quant_model = converter.convert()

# Save the quantized model (create the output directory if it is missing)
os.makedirs('tflite_saves', exist_ok=True)
with open('tflite_saves/model_full_quantised.tflite', 'wb') as f:
    f.write(tflite_quant_model)