In [None]:
import os
import pandas as pd

# Root directory
dataset_dir = "/kaggle/input/cattle-weight-detection-model-dataset-12k/www.acmeai.tech Dataset - BMGF-LivestockWeight-CV/Pixel"

# File extensions (lower-case) that are treated as images.
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')


def parse_image_filename(filename):
    """Extract ``(weight, view, gender)`` from an underscore-separated image name.

    Three layouts are recognised, keyed on the number of ``_``-separated tokens:

    * 5 tokens with "b4" in the second token (B4 format):
      view = 3rd token, weight = 4th token, gender = 5th token.
    * 5 tokens otherwise (B2 format):
      view = 2nd token, weight = 3rd token, gender = 5th token.
    * 4 tokens (B3 format):
      view = 2nd token, weight = 3rd token, gender = 4th token.

    The view is lower-cased, the weight is converted with ``float`` and the
    gender is the text of its token up to the first ``.``.

    Returns:
        A ``(weight, view, gender)`` tuple, or ``None`` when the token count
        matches none of the layouts.

    Raises:
        ValueError: if the weight token is not a valid float.
    """
    parts = filename.split('_')

    if len(parts) == 5:
        if "b4" in parts[1].lower():
            # B4 format
            view, weight = parts[2].lower(), float(parts[3])
        else:
            # B2 format
            view, weight = parts[1].lower(), float(parts[2])
        gender = parts[4].split('.')[0]
    elif len(parts) == 4:
        # B3 format
        view, weight = parts[1].lower(), float(parts[2])
        gender = parts[3].split('.')[0]
    else:
        return None

    return weight, view, gender


data = []

# Recursively walk through all directories. os.walk yields entries in
# filesystem order, so directories and files are sorted to keep the row order
# of `df` (and therefore every seeded split built from it) reproducible.
for root, dirs, files in os.walk(dataset_dir):
    dirs.sort()  # in-place sort steers the order os.walk descends in

    # Only folders literally named "images" (any case) hold the pictures.
    if os.path.basename(root).lower() != "images":
        continue

    for filename in sorted(files):
        if not filename.lower().endswith(IMAGE_EXTENSIONS):
            continue

        try:
            parsed = parse_image_filename(filename)
        except (IndexError, ValueError) as e:
            print(f"Skipping invalid filename: {filename} -> {e}")
            continue

        if parsed is None:
            print(f"Skipping unknown format: {filename}")
            continue

        weight, view, gender = parsed
        data.append((os.path.join(root, filename), weight, view, gender))


df = pd.DataFrame(data, columns=['image_path', 'weight', 'view', 'gender'])

print("Total valid images found:", len(df))
df.head()


In [None]:
# FOR SEPARATE MODEL
# Build one frame per camera view ('s' and 'r'), each re-indexed from zero.
is_side = df['view'].eq('s')
is_rear = df['view'].eq('r')

side_df = df.loc[is_side].reset_index(drop=True)
rear_df = df.loc[is_rear].reset_index(drop=True)

# NOTE: a notebook cell renders only its final expression, so just the rear
# preview appears as cell output.
side_df.head()
rear_df.head()

In [None]:
import os
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split

**FOR COMBINED MODEL**

In [None]:

from sklearn.model_selection import StratifiedShuffleSplit

# Stage 1: hold out 40% of the rows as a temporary pool, stratified on `view`.
sss1 = StratifiedShuffleSplit(n_splits=1, test_size=0.4, random_state=42)
train_idx, temp_idx = next(sss1.split(df, df['view']))
train_df, temp_df = df.iloc[train_idx], df.iloc[temp_idx]

# Stage 2: halve the temporary pool into validation and test, again stratified.
sss2 = StratifiedShuffleSplit(n_splits=1, test_size=0.5, random_state=42)
val_idx, test_idx = next(sss2.split(temp_df, temp_df['view']))
val_df, test_df = temp_df.iloc[val_idx], temp_df.iloc[test_idx]

named_splits = (("Train", train_df), ("Validation", val_df), ("Test", test_df))

# Check sizes
for split_name, split_df in named_splits:
    print(f"{split_name} size: {len(split_df)}")

# Optional: confirm the per-view proportions survived both splits
for split_name, split_df in named_splits:
    print(f"\n{split_name} view distribution:\n", split_df['view'].value_counts(normalize=True))


**FOR SEPARATE MODEL**

In [None]:
# Side view: 60% train, then the remaining 40% split evenly into val / test.
train_side, temp_side = train_test_split(side_df, test_size=0.4, random_state=42)
val_side, test_side = train_test_split(temp_side, test_size=0.5, random_state=42)

# Rear view: same 60 / 20 / 20 scheme with the same seed.
train_rear, temp_rear = train_test_split(rear_df, test_size=0.4, random_state=42)
val_rear, test_rear = train_test_split(temp_rear, test_size=0.5, random_state=42)

# Check sizes
for heading, frames in (
    ("FOR SIDE", (train_side, val_side, test_side)),
    ("FOR REAR", (train_rear, val_rear, test_rear)),
):
    print(heading)
    for split_name, frame in zip(("Train", "Validation", "Test"), frames):
        print(f"{split_name} size: {len(frame)}")

In [None]:

# --- 2. Data loading and preprocessing ---

IMG_SIZE = (128, 128)          # (height, width) passed to tf.image.resize
BATCH_SIZE = 32                # default batch size for the dataset builders
AUTOTUNE = tf.data.AUTOTUNE    # let tf.data choose parallelism / prefetch depth

def process_path(file_path, label):
    """Load one image from disk and pair it with its regression target.

    Args:
        file_path: path of the image file to read.
        label: target value; cast to float32 for regression.

    Returns:
        ``(img, label)`` where ``img`` is a float32 tensor resized to
        ``IMG_SIZE`` with 3 channels and values scaled to [0, 1], and
        ``label`` is a float32 tensor.
    """
    # Read and decode
    # NOTE(review): the dataset scan also admits .png files — confirm that
    # decode_jpeg handles them in the TensorFlow version in use.
    raw = tf.io.read_file(file_path)
    img = tf.image.decode_jpeg(raw, channels=3)

    # Normalize, then resize. The dtype conversion has to run on the uint8
    # image: that is what rescales it to [0, 1]. tf.image.resize returns
    # float32, and converting float32 -> float32 performs no scaling, so
    # doing the conversion after the resize would leave pixels in [0, 255].
    img = tf.image.convert_image_dtype(img, tf.float32)
    img = tf.image.resize(img, IMG_SIZE)

    # Cast label for regression
    label = tf.cast(label, tf.float32)

    return img, label


**FOR COMBINED MODEL**

In [None]:
def df_to_dataset(dataframe, shuffle=True, batch_size=BATCH_SIZE):
    """Turn a frame of image paths and weights into a batched tf.data pipeline.

    Args:
        dataframe: frame with an ``image_path`` column and a ``weight`` column.
        shuffle: when true, shuffle the samples with a buffer covering the
            whole frame.
        batch_size: number of samples per batch.

    Returns:
        A ``tf.data.Dataset`` of ``(image, weight)`` batches produced by
        ``process_path``, with prefetching enabled.
    """
    file_paths = dataframe['image_path'].values
    labels = dataframe['weight'].values.astype(np.float32)
    ds = tf.data.Dataset.from_tensor_slices((file_paths, labels))

    if shuffle:
        # Shuffle the lightweight (path, label) pairs *before* decoding so the
        # buffer holds strings rather than every decoded image in the frame.
        # max(..., 1) keeps the call valid for an empty frame.
        ds = ds.shuffle(buffer_size=max(len(dataframe), 1))

    ds = ds.map(process_path, num_parallel_calls=AUTOTUNE)
    ds = ds.batch(batch_size)
    ds = ds.prefetch(buffer_size=AUTOTUNE)
    return ds

# Only the training pipeline is shuffled; validation and test are built
# without the shuffle step.
train_ds = df_to_dataset(train_df, shuffle=True)
val_ds, test_ds = (
    df_to_dataset(frame, shuffle=False) for frame in (val_df, test_df)
)


**FOR SEPARATE MODEL**

In [None]:
def df_to_dataset(dataframe, shuffle=True, batch_size=BATCH_SIZE):
    """Turn a frame of image paths and weights into a batched tf.data pipeline.

    Args:
        dataframe: frame with an ``image_path`` column and a ``weight`` column.
        shuffle: when true, shuffle the samples with a buffer covering the
            whole frame.
        batch_size: number of samples per batch.

    Returns:
        A ``tf.data.Dataset`` of ``(image, weight)`` batches produced by
        ``process_path``, with prefetching enabled.
    """
    file_paths = dataframe['image_path'].values
    labels = dataframe['weight'].values.astype(np.float32)
    ds = tf.data.Dataset.from_tensor_slices((file_paths, labels))

    if shuffle:
        # Shuffle the lightweight (path, label) pairs *before* decoding so the
        # buffer holds strings rather than every decoded image in the frame.
        # max(..., 1) keeps the call valid for an empty frame.
        ds = ds.shuffle(buffer_size=max(len(dataframe), 1))

    ds = ds.map(process_path, num_parallel_calls=AUTOTUNE)
    ds = ds.batch(batch_size)
    ds = ds.prefetch(buffer_size=AUTOTUNE)
    return ds

# Side-view pipelines: shuffled for training, unshuffled for val / test.
train_side_ds = df_to_dataset(train_side, shuffle=True)
val_side_ds, test_side_ds = (
    df_to_dataset(frame, shuffle=False) for frame in (val_side, test_side)
)

# Rear-view pipelines, built the same way.
train_rear_ds = df_to_dataset(train_rear, shuffle=True)
val_rear_ds, test_rear_ds = (
    df_to_dataset(frame, shuffle=False) for frame in (val_rear, test_rear)
)

In [None]:
# --- 3. Define a simple CNN regression model ---

from tensorflow.keras import layers, models, Input
import tensorflow as tf

def residual_block(x, filters):
    """Apply a two-convolution residual unit to ``x``.

    The main path is Conv3x3 -> BatchNorm -> ReLU -> Conv3x3 -> BatchNorm.
    The skip path is the input itself, or a 1x1 Conv + BatchNorm projection
    when the input's last dimension differs from ``filters``. Both paths are
    added and passed through a final ReLU.

    Args:
        x: input feature tensor.
        filters: number of output channels of the block.

    Returns:
        The output tensor of the block.
    """
    skip = x

    # Main path, first conv stage
    main = layers.Conv2D(filters, (3, 3), padding='same')(x)
    main = layers.BatchNormalization()(main)
    main = layers.Activation('relu')(main)

    # Main path, second conv stage (activation deferred until after the add)
    main = layers.Conv2D(filters, (3, 3), padding='same')(main)
    main = layers.BatchNormalization()(main)

    # Project the skip path when its channel count cannot be added directly
    channels_match = skip.shape[-1] == filters
    if not channels_match:
        skip = layers.Conv2D(filters, (1, 1), padding='same')(skip)
        skip = layers.BatchNormalization()(skip)

    merged = layers.Add()([main, skip])
    return layers.Activation('relu')(merged)

def build_model():
    """Build and compile the residual CNN used for weight regression.

    Architecture: a 32-filter conv stem with max pooling, three residual
    stages (64, 128, 256 filters) each followed by max pooling — with 0.2
    dropout after the first and third — then global average pooling, a
    256-unit dense layer, 0.5 dropout and a single linear output.

    Returns:
        A Keras model compiled with Adam, MSE loss and an ``rmse`` metric.
    """
    inputs = Input(shape=(*IMG_SIZE, 3))

    # Stem: initial conv layer
    net = layers.Conv2D(32, (3, 3), padding='same')(inputs)
    net = layers.BatchNormalization()(net)
    net = layers.Activation('relu')(net)
    net = layers.MaxPooling2D()(net)

    # Residual stages as (filters, dropout rate or None for no dropout)
    stage_specs = ((64, 0.2), (128, None), (256, 0.2))
    for stage_filters, drop_rate in stage_specs:
        net = residual_block(net, stage_filters)
        net = layers.MaxPooling2D()(net)
        if drop_rate is not None:
            net = layers.Dropout(drop_rate)(net)

    # Regression head
    net = layers.GlobalAveragePooling2D()(net)
    net = layers.Dense(256, activation='relu')(net)
    net = layers.Dropout(0.5)(net)
    outputs = layers.Dense(1)(net)  # Regression output

    compiled = models.Model(inputs=inputs, outputs=outputs)
    compiled.compile(
        optimizer='adam',
        loss='mse',
        metrics=[tf.keras.metrics.RootMeanSquaredError(name='rmse')],
    )
    return compiled


# Instantiate the network once and print its layer table.
model = build_model()
model.summary()


**FOR COMBINED MODEL**

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint
import matplotlib.pyplot as plt

# Keep only the weights with the lowest validation loss seen so far.
checkpoint = ModelCheckpoint(
    'best_model.h5',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1,
)

# Stop after 5 epochs without a val_loss improvement and roll back to the best weights.
earlystop_cb = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=5,
    restore_best_weights=True,
    verbose=1,
)

# Train the model
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=20,
    callbacks=[checkpoint, earlystop_cb],
)

# Plot training & validation loss
fig, ax = plt.subplots(figsize=(10, 5))
for history_key, curve_label in (('loss', 'Training Loss'), ('val_loss', 'Validation Loss')):
    ax.plot(history.history[history_key], label=curve_label)
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss (MSE)')
ax.set_title('Training and Validation Loss over Epochs')
ax.legend()
ax.grid(True)
fig.tight_layout()
fig.savefig('Training and Validation Loss over Epochs.png')
plt.show()


**FOR SEPARATE MODEL**

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint
import matplotlib.pyplot as plt

def train_and_plot(model, train_ds, val_ds, view_name):
    """Fit ``model`` for one camera view and plot its loss curves.

    Trains for 30 epochs while checkpointing the lowest-``val_loss`` model to
    ``best_model_{view_name}.h5``, then draws training and validation loss
    and saves the figure as ``{view_name}_loss_plot.png``.

    Args:
        model: compiled Keras model to train.
        train_ds: training dataset passed to ``fit``.
        val_ds: validation dataset passed to ``fit``.
        view_name: label used in the checkpoint name, plot title and plot file.

    Returns:
        The history object returned by ``model.fit``.
    """
    # One checkpoint file per view so the views never overwrite each other
    best_weights_cb = ModelCheckpoint(
        f'best_model_{view_name}.h5',
        monitor='val_loss',
        mode='min',
        save_best_only=True,
        verbose=0,
    )

    # Train the model
    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=30,
        callbacks=[best_weights_cb],
    )

    # Plot training & validation loss
    fig, ax = plt.subplots(figsize=(10, 5))
    for history_key, curve_label in (('loss', 'Training Loss'), ('val_loss', 'Validation Loss')):
        ax.plot(history.history[history_key], label=curve_label)
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss (MSE)')
    ax.set_title(f'{view_name.capitalize()} View - Training and Validation Loss')
    ax.legend()
    ax.grid(True)
    fig.tight_layout()
    fig.savefig(f'{view_name}_loss_plot.png')
    plt.show()

    return history

# --- Side view: fresh network, trained on the side-view datasets ---
model_side = build_model()
history_side = train_and_plot(
    model=model_side,
    train_ds=train_side_ds,
    val_ds=val_side_ds,
    view_name='side',
)

# --- Rear view: fresh network, trained on the rear-view datasets ---
model_rear = build_model()
history_rear = train_and_plot(
    model=model_rear,
    train_ds=train_rear_ds,
    val_ds=val_rear_ds,
    view_name='rear',
)


**FOR COMBINED MODEL**

In [None]:
from tensorflow.keras.models import load_model
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.metrics import RootMeanSquaredError

# Reload the checkpoint written during training.
best_model = load_model(
    'best_model.h5',
    compile=False  # <- avoid automatic compile to skip error
)

# Compile again manually
best_model.compile(
    optimizer='adam',
    loss=MeanSquaredError(),
    metrics=[RootMeanSquaredError(name='rmse')]
)

# Predict over the whole test pipeline in a single call instead of invoking
# predict() once per batch from a Python loop.
preds = best_model.predict(test_ds).flatten()

# Rows are paired with predictions by position, which assumes `test_ds` was
# built from `test_df` without shuffling. Fail loudly if the lengths disagree.
if len(preds) != len(test_df):
    raise ValueError(
        f"Got {len(preds)} predictions for {len(test_df)} test rows; "
        "test_ds and test_df are out of sync."
    )

results_df = pd.DataFrame({
    'filename': test_df['image_path'].to_numpy(),
    # float32 to match the label dtype fed through the dataset
    'actual_weight': test_df['weight'].to_numpy(dtype=np.float32),
    'predicted_weight': preds,
})

# Save results to CSV
results_df.to_csv('test_predictions.csv', index=False)

print("✅ Test predictions saved to 'test_predictions.csv'")


In [None]:
from sklearn.metrics import mean_squared_error
import numpy as np

# Take the square root explicitly: the `squared=False` argument of
# mean_squared_error is deprecated and removed in recent scikit-learn
# releases, while this form works on every version.
rmse = np.sqrt(
    mean_squared_error(results_df['actual_weight'], results_df['predicted_weight'])
)
print(f"Test RMSE: {rmse:.4f}")


**FOR SEPARATE MODEL**

In [None]:
from tensorflow.keras.models import load_model
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.metrics import RootMeanSquaredError
from sklearn.metrics import mean_squared_error
import pandas as pd

def evaluate_model(model_path, test_ds, test_df, view_name):
    """Score a saved model on one view's test set and export its predictions.

    Loads the model from ``model_path`` (uncompiled, then recompiled with
    Adam / MSE / RMSE), predicts over ``test_ds``, writes a per-image table to
    ``test_predictions_{view_name}.csv`` and prints the test RMSE.

    Args:
        model_path: path of the saved Keras model file.
        test_ds: test dataset; assumed to be built from ``test_df`` without
            shuffling, since rows are paired with predictions by position.
        test_df: frame with ``image_path`` and ``weight`` columns.
        view_name: label used in the CSV file name and printed messages.

    Returns:
        ``(results_df, rmse)`` — the prediction table with columns
        ``filename``, ``actual_weight``, ``predicted_weight``, and the RMSE.

    Raises:
        ValueError: if the number of predictions differs from ``len(test_df)``.
    """
    # Load model
    model = load_model(model_path, compile=False)
    model.compile(
        optimizer='adam',
        loss=MeanSquaredError(),
        metrics=[RootMeanSquaredError(name='rmse')]
    )

    # One predict() call over the whole pipeline instead of one per batch.
    preds = model.predict(test_ds).flatten()
    if len(preds) != len(test_df):
        raise ValueError(
            f"Got {len(preds)} predictions for {len(test_df)} test rows; "
            "test_ds and test_df are out of sync."
        )

    results_df = pd.DataFrame({
        'filename': test_df['image_path'].to_numpy(),
        # float32 to match the label dtype fed through the dataset
        'actual_weight': test_df['weight'].to_numpy(dtype=np.float32),
        'predicted_weight': preds,
    })
    results_df.to_csv(f'test_predictions_{view_name}.csv', index=False)

    # Explicit square root: the `squared=False` argument of mean_squared_error
    # is deprecated and removed in recent scikit-learn releases.
    rmse = np.sqrt(
        mean_squared_error(results_df['actual_weight'], results_df['predicted_weight'])
    )
    print(f"✅ Test predictions saved to 'test_predictions_{view_name}.csv'")
    print(f"📊 {view_name.capitalize()} View - Test RMSE: {rmse:.4f}\n")

    return results_df, rmse


# --- Side view: score the side checkpoint on the side test split ---
results_side_df, rmse_side = evaluate_model(
    model_path='best_model_side.h5',
    test_ds=test_side_ds,
    test_df=test_side,
    view_name='side',
)

# --- Rear view: score the rear checkpoint on the rear test split ---
results_rear_df, rmse_rear = evaluate_model(
    model_path='best_model_rear.h5',
    test_ds=test_rear_ds,
    test_df=test_rear,
    view_name='rear',
)


**FOR COMBINED MODEL**

In [None]:
import matplotlib.pyplot as plt

# Scatter of predictions against ground truth for the combined model.
fig, ax = plt.subplots(figsize=(8, 6))
ax.scatter(results_df['actual_weight'], results_df['predicted_weight'], alpha=0.5)
ax.set_xlabel('Actual Weight')
ax.set_ylabel('Predicted Weight')
ax.set_title('Actual vs Predicted Weight')
ax.grid(True)

# Ideal line (prediction == actual), spanning the range of the actual weights
actual_lo = results_df['actual_weight'].min()
actual_hi = results_df['actual_weight'].max()
ax.plot([actual_lo, actual_hi], [actual_lo, actual_hi], 'r--')

fig.savefig('Actual vs Predicted Weight.png')
plt.show()

**FOR SEPARATE MODEL**

In [None]:
import matplotlib.pyplot as plt

def plot_actual_vs_predicted(results_df, view_name):
    """Scatter predicted against actual weights for one view and save the figure.

    Draws the scatter plus a dashed identity line spanning the combined range
    of actual and predicted values, saves it as
    ``Actual_vs_Predicted_Weight_{view_name}.png`` and shows it.

    Args:
        results_df: frame with ``actual_weight`` and ``predicted_weight`` columns.
        view_name: label used in the plot title and output file name.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    ax.scatter(results_df['actual_weight'], results_df['predicted_weight'], alpha=0.5)
    ax.set_xlabel('Actual Weight')
    ax.set_ylabel('Predicted Weight')
    ax.set_title(f'{view_name.capitalize()} View: Actual vs Predicted Weight')
    ax.grid(True)

    # The identity line covers both columns so no point falls outside its span.
    min_val = min(results_df['actual_weight'].min(), results_df['predicted_weight'].min())
    max_val = max(results_df['actual_weight'].max(), results_df['predicted_weight'].max())

    ax.plot([min_val, max_val], [min_val, max_val], 'r--', label='Ideal Prediction')
    ax.legend()

    fig.tight_layout()
    filename = f'Actual_vs_Predicted_Weight_{view_name}.png'
    fig.savefig(filename)
    plt.show()
    # Status message with the mis-encoded emoji repaired.
    print(f"📈 Plot saved to {filename}\n")

# --- Plot each view in turn: side first, then rear ---
for view_results, view_label in (
    (results_side_df, 'side'),
    (results_rear_df, 'rear'),
):
    plot_actual_vs_predicted(view_results, view_label)
