<a href="https://colab.research.google.com/github/natitedros/WAVN-Federated-Learning/blob/main/CNN_Homing_v1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

import tensorflow as tf
import pandas as pd
import numpy as np
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.model_selection import train_test_split

Mounted at /content/drive


In [2]:
CSV_PATH = '/content/drive/MyDrive/gazebo_dataset/labels.csv'
IMG_DIR = '/content/drive/MyDrive/gazebo_dataset/images/'

In [3]:
df = pd.read_csv(CSV_PATH)
print(f"Total samples: {len(df)}")
print(f"\nFirst few rows:\n{df.head()}")
print(f"\nDirection counts:\n{df['direction'].value_counts()}")

Total samples: 33

First few rows:
     current_image    destination_image direction
0  001_current.png  001_destination.png      left
1  002_current.png  002_destination.png     right
2  003_current.png  003_destination.png     right
3  004_current.png  004_destination.png      left
4  005_current.png  005_destination.png      left

Direction counts:
direction
left        12
right       12
backward     5
forward      4
Name: count, dtype: int64


In [4]:
direction_map = {'forward': 0, 'backward': 1, 'left': 2, 'right': 3}
df['direction_label'] = df['direction'].map(direction_map)

In [5]:
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)
print(f"\nTraining samples: {len(train_df)}")
print(f"Validation samples: {len(val_df)}")


Training samples: 26
Validation samples: 7


In [6]:
# Function to load image
def load_image(img_path):
    img = load_img(IMG_DIR + img_path, target_size=(224, 224))
    img = img_to_array(img) / 255.0  # Normalize to [0, 1]
    return img

In [7]:
# Creating the dataset
def create_dataset(dataframe, batch_size=32):
    current_images = []
    dest_images = []
    labels = []

    for idx, row in dataframe.iterrows():
        current_img = load_image(row['current_image'])
        dest_img = load_image(row['destination_image'])

        current_images.append(current_img)
        dest_images.append(dest_img)
        labels.append(row['direction_label'])

    current_images = np.array(current_images)
    dest_images = np.array(dest_images)
    labels = np.array(labels)

    return current_images, dest_images, labels

In [8]:
print("\nLoading training data...")
X_train_current, X_train_dest, y_train = create_dataset(train_df)

print("Loading validation data...")
X_val_current, X_val_dest, y_val = create_dataset(val_df)

print(f"\nTraining data shapes:")
print(f"Current images: {X_train_current.shape}")
print(f"Destination images: {X_train_dest.shape}")
print(f"Labels: {y_train.shape}")


Loading training data...
Loading validation data...

Training data shapes:
Current images: (26, 224, 224, 3)
Destination images: (26, 224, 224, 3)
Labels: (26,)


In [9]:
from tensorflow.keras import layers, models

def create_model(num_classes=4):
    # Input for current image
    current_input = layers.Input(shape=(224, 224, 3), name='current_image')

    # Input for destination image
    dest_input = layers.Input(shape=(224, 224, 3), name='destination_image')

    # Shared CNN layers
    def cnn_branch(x):
        x = layers.Conv2D(32, (3, 3), activation='relu')(x)
        x = layers.MaxPooling2D((2, 2))(x)
        x = layers.Conv2D(64, (3, 3), activation='relu')(x)
        x = layers.MaxPooling2D((2, 2))(x)
        x = layers.Conv2D(64, (3, 3), activation='relu')(x)
        x = layers.MaxPooling2D((2, 2))(x)
        x = layers.Flatten()(x)
        return x

    # Process both images
    current_features = cnn_branch(current_input)
    dest_features = cnn_branch(dest_input)

    # Combine features
    combined = layers.concatenate([current_features, dest_features])

    # Dense layers
    x = layers.Dense(128, activation='relu')(combined)
    x = layers.Dropout(0.5)(x)
    output = layers.Dense(num_classes, activation='softmax')(x)

    # Create model
    model = models.Model(inputs=[current_input, dest_input], outputs=output)

    return model

In [10]:
model = create_model(num_classes=4)
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

model.summary()

# Training the model
history = model.fit(
    [X_train_current, X_train_dest],
    y_train,
    validation_data=([X_val_current, X_val_dest], y_val),
    epochs=10,
    batch_size=32
)

Epoch 1/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 9s/step - accuracy: 0.1923 - loss: 1.4132 - val_accuracy: 0.2857 - val_loss: 7.5359
Epoch 2/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 6s/step - accuracy: 0.4231 - loss: 5.4828 - val_accuracy: 0.2857 - val_loss: 4.4367
Epoch 3/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 5s/step - accuracy: 0.4231 - loss: 3.4964 - val_accuracy: 0.2857 - val_loss: 1.5245
Epoch 4/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4s/step - accuracy: 0.5000 - loss: 1.2861 - val_accuracy: 0.4286 - val_loss: 1.3711
Epoch 5/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 7s/step - accuracy: 0.5385 - loss: 1.1511 - val_accuracy: 0.4286 - val_loss: 1.3439
Epoch 6/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4s/step - accuracy: 0.8462 - loss: 0.8094 - val_accuracy: 0.2857 - val_loss: 1.6600
Epoch 7/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m

In [12]:
# Save the model
model.save('/content/drive/MyDrive/Models/navigation_model_v1.h5')
print("\nModel saved to Google Drive!")




Model saved to Google Drive!


In [14]:
# Step 14: Function to test specific image pairs
def test_prediction(current_img_name, dest_img_name):
    """
    Test prediction for specific images by name

    Args:
        current_img_name: Name of current image (e.g., 'current_001.jpg')
        dest_img_name: Name of destination image (e.g., 'destination_015.jpg')
    """
    direction_names = {0: 'forward', 1: 'backward', 2: 'left', 3: 'right'}

    # Load the images
    current_img = load_image(current_img_name)
    dest_img = load_image(dest_img_name)

    # Add batch dimension
    current_img = np.expand_dims(current_img, axis=0)
    dest_img = np.expand_dims(dest_img, axis=0)

    # Make prediction
    prediction = model.predict([current_img, dest_img], verbose=0)
    predicted_label = np.argmax(prediction)
    confidence = prediction[0][predicted_label] * 100

    print(f"\n{'='*50}")
    print(f"Current image: {current_img_name}")
    print(f"Destination image: {dest_img_name}")
    print(f"Predicted direction: {direction_names[predicted_label]}")
    print(f"Confidence: {confidence:.2f}%")
    print(f"\nAll probabilities:")
    for label, prob in enumerate(prediction[0]):
        print(f"  {direction_names[label]}: {prob*100:.2f}%")
    print(f"{'='*50}")

    return direction_names[predicted_label]

In [16]:
test_prediction('020_destination.png', '018_destination.png')


Current image: 020_destination.png
Destination image: 018_destination.png
Predicted direction: right
Confidence: 42.89%

All probabilities:
  forward: 3.44%
  backward: 39.24%
  left: 14.43%
  right: 42.89%


'right'