In [None]:
!pip install -q git+https://github.com/qubvel/classification_models

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from imblearn.over_sampling import RandomOverSampler
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
from classification_models.tfkeras import Classifiers
VGG16, preprocess_input = Classifiers.get('vgg16')

In [34]:
# Report the physical GPU devices that TensorFlow can see (empty list = none).
import tensorflow as tf
print("Available GPUs:", tf.config.list_physical_devices('GPU'))



In [35]:
# Dataset locations, relative to the notebook's working directory.
# Training images are stored as <base_dir>/<label>/<md5hash>.jpg.
base_dir = "../../bttai-ajl-2025/train/train"
# Fixed: this used to repeat the training folder, so any test-time lookup
# would have searched the wrong directory.
base_test_dir = "../../bttai-ajl-2025/test/test"
train_csv_path = "../../bttai-ajl-2025/train.csv"
test_csv_path = "../../bttai-ajl-2025/test.csv"

In [36]:
# Read the train / test metadata tables
train_df, test_df = (
    pd.read_csv(csv_path) for csv_path in (train_csv_path, test_csv_path)
)

# md5hash holds the image stem; append the extension so it equals the file name
for frame in (train_df, test_df):
    frame['md5hash'] = frame['md5hash'].astype(str) + '.jpg'

# Relative image path is "<label>/<file name>"
train_df['file_path'] = train_df['label'] + '/' + train_df['md5hash']

In [37]:
# Function to check if image files exist
def check_image_paths(df, base_dir):
    """Report which entries of ``df['file_path']`` exist under ``base_dir``.

    Each relative path is joined onto ``base_dir`` and tested with
    ``os.path.exists``. Totals and up to five examples of each group are
    printed.

    Args:
        df: DataFrame with a ``file_path`` column of relative paths.
        base_dir: Directory the relative paths are resolved against.

    Returns:
        List of the full paths that were not found on disk.
    """
    found, absent = [], []
    for relative in df['file_path']:
        candidate = os.path.join(base_dir, relative)
        bucket = found if os.path.exists(candidate) else absent
        bucket.append(candidate)

    # Summary of paths that could not be located
    print(f"\nTotal Missing Files: {len(absent)}")
    if absent:
        print("⚠️ First 5 Missing Files for Reference:")
        print(absent[:5])

    # Summary of paths that were located
    print(f"\nTotal Existing Files: {len(found)}")
    if found:
        print("✅ First 5 Existing Files for Reference:")
        print(found[:5])

    return absent

# Run the file existence check on the training metadata; the returned list
# holds the full paths that were not found under base_dir.
missing_train_files = check_image_paths(train_df, base_dir)

# Print the first rows of each metadata table as a sanity check
print("\nTrain Data Sample:")
print(train_df.head())

print("\nTest Data Sample:")
print(test_df.head())



In [38]:
# Scan every JPEG under base_dir and drop from train_df the rows whose file
# does not carry a JFIF marker in its header.
#
# Fixed: the previous version called train_df.drop(...) without keeping the
# result, so no row was ever removed even though the summary said otherwise.
#
# NOTE(review): the check only looks for the literal bytes "JFIF"; JPEGs that
# use a different header layout (e.g. EXIF) would also be flagged — confirm
# this is acceptable for this dataset.
num_skipped = 0
corrupted_names = set()

for root, _, files in os.walk(base_dir):
    for fname in files:
        # Only JPEG files are inspected (extend the tuple for other formats)
        if not fname.lower().endswith((".jpg", ".jpeg")):
            continue

        fpath = os.path.join(root, fname)
        try:
            with open(fpath, "rb") as fobj:
                # Read the first 10 bytes to check for the JFIF marker
                is_jfif = b"JFIF" in fobj.read(10)
        except Exception as e:
            # Unreadable files are reported and left in train_df untouched
            print(f"Error reading file {fpath}: {e}")
            continue

        if not is_jfif:
            num_skipped += 1
            corrupted_names.add(fname)

# train_df['md5hash'] holds "<hash>.jpg", i.e. the bare file name, so rows are
# matched on that column. Rebinding (instead of mutating in place) keeps the
# cell safe to re-run.
rows_before = len(train_df)
train_df = train_df[~train_df['md5hash'].isin(corrupted_names)].reset_index(drop=True)

print(f"Total corrupted images removed: {num_skipped}")
print(f"Rows dropped from train_df: {rows_before - len(train_df)}")




In [39]:
# Horizontal bar chart of images per class, most frequent class first
label_order = train_df['label'].value_counts().index

fig, ax = plt.subplots(figsize=(12, 6))
sns.countplot(y=train_df['label'], order=label_order, ax=ax)

ax.set_xlabel("Number of Images")
ax.set_ylabel("Class Labels")
ax.set_title("Label Distribution in Training Data")
ax.grid(axis='x', linestyle='--', alpha=0.7)
plt.show()



In [40]:
# Number of training rows for each label
class_counts = train_df['label'].value_counts()
print(class_counts)

# Labels with fewer rows than this are treated as underrepresented
threshold = 100
is_rare = class_counts < threshold
underrepresented_classes = class_counts.index[is_rare].tolist()

print("Underrepresented classes:", underrepresented_classes)



In [41]:
# Count rows whose md5hash repeats an earlier row
repeat_mask = train_df['md5hash'].duplicated()
duplicate_counts = repeat_mask.sum()

print(f"🔍 Duplicate Images Found: {duplicate_counts}")
if duplicate_counts > 0:
    # keep=False marks every member of a duplicate group, not just the repeats
    every_copy = train_df['md5hash'].duplicated(keep=False)
    print(train_df[every_copy].head(10))



In [42]:
# Count nulls per column and show only the columns that have any
missing_values = train_df.isna().sum()
columns_with_gaps = missing_values[missing_values > 0]

print("🔍 Missing Values per Column:\n", columns_with_gaps)



In [43]:
# Collect the full path of every training image that is absent from disk
resolved_paths = (os.path.join(base_dir, rel) for rel in train_df['file_path'])
missing_files = [path for path in resolved_paths if not os.path.exists(path)]

# Report the total and a few examples
print(f"\n🔍 Total Missing Images: {len(missing_files)}")
if len(missing_files) > 0:
    print("⚠️ First 5 Missing Images for Reference:")
    print(missing_files[:5])



# Data Pre-processing

In [44]:
# Split the metadata into inputs and target. drop() returns a new frame, so
# later edits to X_train do not modify train_df.
X_train = train_df.drop(columns=['label'])  # Features (excluding the label)
y_train = train_df['label']  # Target label

In [None]:
# Preview the first rows of the training metadata.
train_df.head()

In [None]:
# Report the dtype of the `qc` column
print("🔍 QC Column Data Type:", X_train['qc'].dtype)

# Histogram of the `qc` values that are present (NaNs dropped first)
qc_present = X_train['qc'].dropna()

fig, ax = plt.subplots(figsize=(8, 5))
sns.histplot(qc_present, bins=20, kde=True, ax=ax)
ax.set_title("Distribution of QC Column (Without NaNs)")
ax.set_xlabel("QC Values")
ax.set_ylabel("Frequency")
ax.grid()
plt.show()





In [47]:
def extract_qc_number(qc_value):
    """Return the leading integer code of a QC annotation.

    Args:
        qc_value: A string whose first whitespace-separated token is the
            numeric code (e.g. ``"1 Diagnostic"``), an already-numeric code,
            or a missing value.

    Returns:
        The code as ``int``, or ``None`` when the value is missing or blank.

    Raises:
        ValueError: If the first token of a string is not an integer.
    """
    if pd.isnull(qc_value):
        return None  # Keep NaN values

    # Already numeric: happens when the conversion cell is run a second time
    # on a column that was converted before. Previously this raised
    # AttributeError because numbers have no .split().
    if isinstance(qc_value, (int, float, np.integer, np.floating)):
        return int(qc_value)

    tokens = str(qc_value).split()
    if not tokens:
        # Blank / whitespace-only text carries no code; treat it as missing
        return None
    return int(tokens[0])  # Extract numeric part (before space)

# Apply the function: replace each raw QC entry with its leading integer code.
# The column is overwritten, so running this cell again passes the
# already-converted values back into extract_qc_number.
X_train['qc'] = X_train['qc'].apply(extract_qc_number) # Now the qc only holds numbers [1-5] and nan

# Print unique values to verify conversion
print("✅ Unique QC Numeric Values:", X_train['qc'].unique())



# Creating new column `sample_weight`

In [None]:
def qc_to_weight(qc_value):
    """Translate a numeric QC code into the training weight for that sample.

    Missing codes get 0.1; codes 1-5 use the table below; anything else
    gets 0.0.
    """
    # Missing QC
    if pd.isnull(qc_value):
        return 0.1

    weight_by_code = {
        1: 1.0,  # Diagnostic
        5: 0.8,  # Potentially diagnostic
        2: 0.5,  # Characteristic
        3: 0.2,  # Wrongly labeled
        4: 0.3,  # Undecided
    }
    # Fallback of 0.0 covers any unexpected QC value
    return weight_by_code.get(qc_value, 0.0)

# Applying the function to create a sample_weight column:
# every row gets the weight that qc_to_weight assigns to its numeric QC code.
X_train['sample_weight'] = X_train['qc'].apply(qc_to_weight)

In [49]:
# Number of rows per distinct sample weight.
print(X_train['sample_weight'].value_counts())



In [50]:
# Bar charts of row counts: first per numeric QC value, then per sample weight
count_plots = (
    ('qc', "Distribution of Numeric QC Values"),
    ('sample_weight', "Distribution of Sample Weights"),
)
for column, plot_title in count_plots:
    fig, ax = plt.subplots(figsize=(6, 4))
    sns.countplot(x=column, data=X_train, ax=ax)
    ax.set_title(plot_title)
    plt.show()





In [51]:
# The QC code has been folded into sample_weight, so the column is removed.
# Rebinding with errors='ignore' (instead of an in-place drop) means running
# this cell a second time no longer raises KeyError.
X_train = X_train.drop(columns='qc', errors='ignore')
print(X_train.head())



In [52]:
# Check `sample_weight` column type
# Fixed: the label used to say "QC" although the dtype printed is sample_weight's.
print("🔍 sample_weight Column Data Type:", X_train['sample_weight'].dtype)

# Plot distribution of the `sample_weight` values
plt.figure(figsize=(8, 5))
sns.histplot(X_train['sample_weight'].dropna(), bins=20, kde=True)
# Fixed: qc_to_weight maps a missing QC to 0.1 (0.2 is the "wrongly labeled"
# weight), so the old "(0.2 is missing value)" title was misleading.
plt.title("Distribution of sample_weight Column (0.1 = missing QC)")
plt.xlabel("Sample Weight Values")
plt.ylabel("Frequency")
plt.grid()
plt.show()





In [53]:
# Preview the feature frame.
X_train.head()



In [54]:
# Preview the (still string-valued) target labels.
y_train.head()



In [55]:
# 1) Turn the string labels in y_train into integer class ids
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)

# 2) Assemble the training frame in one step:
#    - every X_train column (file_path, sample_weight, ...)
#    - label as int32 class id
#    - sample_weight cast to float32
df_train = X_train.assign(
    label=y_train_encoded.astype(np.int32),
    sample_weight=X_train['sample_weight'].astype(np.float32),
)

# 'ddi_scale' is not needed further on; errors='ignore' makes this a no-op
# when the column is absent
df_train = df_train.drop(columns=['ddi_scale'], errors='ignore')

# 3) Express the underrepresented class names as encoded ids; a set keeps
#    the per-row membership test cheap
encoded_minority_ids = label_encoder.transform(underrepresented_classes)
underrepresented_classes_encoded = set(encoded_minority_ids)

print("Underrepresented classes (string):", underrepresented_classes)
print("Underrepresented classes (encoded):", underrepresented_classes_encoded)

print(df_train.head())



In [56]:
# Verify the column dtypes of the assembled training frame, then show the
# encoded label and sample_weight columns.
print(df_train.dtypes)
print(df_train['label'])
print(df_train['sample_weight'])



In [57]:
# Settings for the mild augmentation used on every well-represented class
baseline_aug_settings = dict(
    rescale=1./255,
    rotation_range=10,
    width_shift_range=0.05,
    height_shift_range=0.05,
)

# Settings for the stronger augmentation used on underrepresented classes:
# wider rotation/shift ranges plus shear, zoom and horizontal flips
minority_aug_settings = dict(
    rescale=1./255,
    rotation_range=30,
    width_shift_range=0.15,
    height_shift_range=0.15,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest',
)

baseline_aug_datagen = ImageDataGenerator(**baseline_aug_settings)
minority_aug_datagen = ImageDataGenerator(**minority_aug_settings)

In [None]:
import cv2
import math

def custom_augment_generator(
    df,
    underrepresented_classes_encoded,
    baseline_aug_datagen,
    minority_aug_datagen,
    base_dir,
    batch_size=32,
    target_size=(224, 224),
    shuffle=True
):
    """
    Endlessly yield (X_batch, y_batch, w_batch) training batches.

    Each image is read with OpenCV, converted BGR -> RGB, resized to
    ``target_size`` and cast to float32, then passed through
    ``random_transform`` + ``standardize`` of:
      - minority_aug_datagen when its label is in
        ``underrepresented_classes_encoded``
      - baseline_aug_datagen otherwise

    Args:
        df: DataFrame with ``file_path``, ``label`` and ``sample_weight`` columns.
        underrepresented_classes_encoded: Container of encoded labels that
            receive the heavier augmentation.
        baseline_aug_datagen: Object exposing ``random_transform`` / ``standardize``.
        minority_aug_datagen: Same interface, used for underrepresented labels.
        base_dir: Directory that ``file_path`` values are relative to.
        batch_size: Number of rows taken from ``df`` per batch.
        target_size: Size tuple handed to ``cv2.resize``.
        shuffle: Reshuffle the rows at the start of every pass over ``df``.

    Yields:
        Tuple of float32 images, int32 labels and float32 sample weights.
        Rows whose image cannot be read are skipped, so a batch may hold
        fewer than ``batch_size`` items; batches with no readable image are
        not yielded at all.

    Raises:
        RuntimeError: If a full pass over ``df`` produces no batch (empty
            frame or no readable image), instead of looping forever.
    """
    while True:
        if shuffle:
            df = df.sample(frac=1).reset_index(drop=True)

        produced_batch = False

        for start in range(0, len(df), batch_size):
            batch_df = df.iloc[start:start+batch_size]

            images = []
            labels = []
            weights = []

            for _, row in batch_df.iterrows():
                # 1) Build full path
                full_path = os.path.join(base_dir, row['file_path'])

                # 2) Read image (BGR)
                img_bgr = cv2.imread(full_path)
                if img_bgr is None:
                    # File missing or corrupted, skip
                    continue

                # 3) Convert to RGB
                # (a stray `CapsNets` expression used to follow this step and
                #  raised NameError on the first readable image)
                img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

                # 4) Resize
                img_rgb = cv2.resize(img_rgb, target_size)

                # 5) Force float32 BEFORE augmentation
                img_rgb = img_rgb.astype(np.float32)

                # 6) Decide which datagen to use
                if row['label'] in underrepresented_classes_encoded:
                    datagen = minority_aug_datagen   # heavier augmentation
                else:
                    datagen = baseline_aug_datagen   # baseline augmentation
                img_aug = datagen.standardize(datagen.random_transform(img_rgb))

                images.append(img_aug)
                labels.append(row['label'])
                weights.append(row['sample_weight'])

            if not images:
                # Every image of this slice was unreadable; an empty batch
                # would only break the training step, so move on.
                continue

            # Convert lists to numpy arrays
            X_batch = np.array(images, dtype=np.float32)
            y_batch = np.array(labels, dtype=np.int32)
            w_batch = np.array(weights, dtype=np.float32)

            produced_batch = True
            # Yield triple so Keras interprets w_batch as sample weights
            yield (X_batch, y_batch, w_batch)

        if not produced_batch:
            raise RuntimeError(
                "custom_augment_generator: no readable images found under "
                f"{base_dir!r}; cannot build a batch."
            )

In [59]:
from tensorflow.keras.applications import ConvNeXtTiny
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense
from tensorflow.keras.models import Model

# Single source of truth for values that were previously repeated literals
BATCH_SIZE = 32
IMAGE_SIZE = (224, 224)

# 1) Create the generator
train_gen = custom_augment_generator(
    df=df_train,
    underrepresented_classes_encoded=underrepresented_classes_encoded,
    baseline_aug_datagen=baseline_aug_datagen,
    minority_aug_datagen=minority_aug_datagen,
    base_dir=base_dir,          # e.g. ".../.../bttai-ajl-2025/train/train"
    batch_size=BATCH_SIZE,
    target_size=IMAGE_SIZE,
    shuffle=True
)

# 2) One epoch = one pass over df_train (the generator itself never ends)
steps_per_epoch = math.ceil(len(df_train) / BATCH_SIZE)

# 3) Build the ConvNeXt-Tiny model
# NOTE(review): the Keras documentation states that ConvNeXt bundles its own
# input normalization and expects pixel values in [0, 255], while both
# augmentation generators rescale images to [0, 1] — confirm which scaling
# is intended before relying on the pretrained features.
base_model = ConvNeXtTiny(
    weights="imagenet",
    include_top=False,
    input_shape=IMAGE_SIZE + (3,),
)

# Phase 1 trains only the new head, so the pretrained backbone is frozen
base_model.trainable = False

x = base_model.output
x = GlobalAveragePooling2D()(x)  # Pooling layer to reduce dimensions
num_classes = len(label_encoder.classes_)  # Number of output classes

# Add the classification head
predictions = Dense(num_classes, activation="softmax")(x)

# Create the full model
model = Model(inputs=base_model.input, outputs=predictions)

# 4) Compile the model (AdamW optimizer for stability)
model.compile(
    optimizer=tf.keras.optimizers.AdamW(learning_rate=1e-3),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

# Fixed: both callbacks used to monitor "val_loss", but fit() below receives
# no validation data, so that metric never exists — early stopping could not
# trigger and no checkpoint was ever written. They now track the training
# loss, which is the only loss available here.
# TODO: hold out a validation split and switch the monitors back to
# "val_loss" for a meaningful stopping criterion.
callbacks = [
    EarlyStopping(monitor="loss", patience=5, restore_best_weights=True),
    ModelCheckpoint("best_model.h5", monitor="loss", save_best_only=True)
]

# 5) Train the classification head
# Kept under its own name so the fine-tuning run does not overwrite it
history_head = model.fit(
    train_gen,
    steps_per_epoch=steps_per_epoch,
    epochs=10,
    callbacks=callbacks
)

# Fine-tuning: unfreeze the last 10 backbone layers
for layer in base_model.layers[-10:]:
    layer.trainable = True

# Re-compile so the trainable change takes effect; the lower learning rate
# keeps the adjustments to the pretrained weights small
model.compile(
    optimizer=tf.keras.optimizers.AdamW(learning_rate=1e-5),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

history = model.fit(
    train_gen,
    steps_per_epoch=steps_per_epoch,
    epochs=20,
    callbacks=callbacks
)







In [None]:
# 1) After training finishes, write the model to an HDF5 file in the
#    current working directory.
model.save("my_model.h5")

print("✅ Model saved as my_model.h5")





In [None]:
# from tensorflow.keras.models import load_model

# model = load_model("my_model.h5")
# print("✅ Model loaded successfully.")





In [None]:
# # Suppose we have a fitted label_encoder
# # label_encoder.classes_ is an array like ['basal-cell-carcinoma', 'melanoma', ...]
# idx_to_label = {i: label for i, label in enumerate(label_encoder.classes_)}

In [None]:
# import os
# import numpy as np
# from tensorflow.keras.preprocessing.image import load_img, img_to_array

# predictions = []

# # The base_test_dir you mentioned:
# test_images_dir = base_test_dir  # e.g. "/kaggle/input/bttai-ajl-2025/test/test"

# for idx, row in test_df.iterrows():
#     md5_name = row["md5hash"]  # e.g. something.jpg
#     img_path = os.path.join(test_images_dir, md5_name)

#     if os.path.exists(img_path):
#         # Load and preprocess the image
#         image = load_img(img_path, target_size=(224, 224))
#         img_array = img_to_array(image)
#         img_array = img_array / 255.0  # Normalize to [0,1]
#         img_array = np.expand_dims(img_array, axis=0)  # shape (1,224,224,3)

#         # Model prediction
#         pred = model.predict(img_array)  # shape (1, num_classes)
#         pred_idx = np.argmax(pred[0])    # integer index
#         predicted_label = idx_to_label.get(pred_idx, "unknown")
#     else:
#         predicted_label = "unknown"

#     predictions.append(predicted_label)


In [None]:
# test_df["md5hash"] = test_df["md5hash"].str.replace(".jpg", "", regex=False)

In [None]:
# submission_df = pd.DataFrame({
#     "md5hash": test_df["md5hash"],
#     "label": predictions
# })

In [None]:
# submission_file_path = "submission.csv"
# submission_df.to_csv(submission_file_path, index=False)
# print(f"✅ Submission file saved: {submission_file_path}")

