In [3]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from PIL import Image
import os
import matplotlib.pyplot as plt
from sklearn.preprocessing import MultiLabelBinarizer
from collections import Counter



In [4]:
import os
import pandas as pd
from PIL import Image

image_dir = '/kaggle/input/draft-safety/train/train_safety'
csv_path = "/kaggle/input/draft-safety/train/train_annotation.csv"

df = pd.read_csv(csv_path)

for image_name in os.listdir(image_dir):
    if image_name.endswith('.jpg'):
        image_id = image_name.replace('.jpg', '')
        labels = df[df['image_id'] == image_id]['labels'].values[0]
        img_path = os.path.join(image_dir, image_name)
        image = Image.open(img_path)

        #print(f"\nImage: {image_name}")
        #print(f"Size: {image.size}")
        #print(f"Labels: {labels}")

        # Optional: Show first 3 images
        '''if os.listdir(image_dir).index(image_name) < 3:
            image.show()'''


In [5]:
from sklearn.preprocessing import MultiLabelBinarizer

all_labels = ['person', 'red_hat', 'yellow_hat', 'blue_hat', 'vest', 'white_hat']
df['label_list'] = df['labels'].str.split()

mlb = MultiLabelBinarizer(classes=all_labels)
binary_labels = mlb.fit_transform(df['label_list'])
print(binary_labels)


[[1 1 1 0 0 0]
 [1 0 0 1 0 0]
 [1 0 1 0 1 0]
 ...
 [1 0 0 0 1 1]
 [1 1 0 0 0 0]
 [1 1 1 0 0 1]]


In [6]:
image_sizes=[]
for img_name in os.listdir(image_dir):
    if img_name.endswith(".jpg"):
        img_path = os.path.join(image_dir, img_name)
        with Image.open(img_path) as img:
            image_sizes.append(img.size)

output_dir = '/kaggle/working/resized_images'
os.makedirs(output_dir, exist_ok=True)

target_size = (224, 224)

for img_name in os.listdir(image_dir):
    if img_name.endswith('.jpg'):
        img_path = os.path.join(image_dir, img_name)
        img = Image.open(img_path)
        img_resized = img.resize(target_size)
        img_resized.save(os.path.join(output_dir, img_name))

In [7]:
import torch
from torch.utils.data import Dataset
import torchvision.transforms as T
from PIL import Image
import os

class SafetyDataset(Dataset):
    def __init__(self, df, image_dir, labels=None, transform=None):
        self.df = df
        self.image_dir = image_dir
        self.labels = labels  # Labels can be None for test dataset
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        image_id = self.df.iloc[index]['image_id']  # Get image ID
        img_path = os.path.join(self.image_dir, f"{image_id}.jpg")  # Build image path
        image = Image.open(img_path).convert('RGB')  # Open image

        if self.transform:
            image = self.transform(image)  # Apply transformations

        # Handle case where labels are None (test dataset)
        if self.labels is not None:
            label = torch.tensor(self.labels[index]).float()  # Convert labels to tensor
            return image, label
        else:
            return image, image_id  # Return image and image_id for test dataset

In [8]:
from torchvision import transforms

# This is the transform object we are modifying
transform = transforms.Compose([
    # --- ADD DATA AUGMENTATION HERE ---
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    # --- END OF AUGMENTATION ---
    
    transforms.ToTensor(),  # convert image to tensor with shape [C, H, W] and scale to [0, 1]
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # normalize RGB values
])

dataset = SafetyDataset(df, '/kaggle/working/resized_images', binary_labels, transform=transform)
img,lbl=dataset[0]
print(img.shape,lbl)
print("Original:", df.iloc[0]['labels'])
print("Binary:  ", labels[0])


torch.Size([3, 224, 224]) tensor([1., 1., 1., 0., 0., 0.])
Original: person red_hat yellow_hat
Binary:   p


In [9]:
from torch.utils.data import DataLoader
dataloader=DataLoader(dataset,batch_size=32,shuffle=True)
for images, labels in dataloader:
    print("Image batch shape:", images.shape)   # Should be [32, 3, 224, 224]
    print("Label batch shape:", labels.shape)   # Should be [32, 6]
    break


Image batch shape: torch.Size([32, 3, 224, 224])
Label batch shape: torch.Size([32, 6])


In [10]:
from sklearn.model_selection import train_test_split
train_df, temp_df, train_labels, temp_labels = train_test_split(
    df, binary_labels, test_size=0.3, random_state=42)
#split temp into validation and test 
val_df, test_df, val_labels, test_labels = train_test_split(
    temp_df, temp_labels, test_size=0.5, random_state=42)

In [11]:
# Import PyTorch's neural network module
import torch.nn as nn

# Define a class called SafetyCNN, which is your model
class SafetyCNN(nn.Module):
    def __init__(self):
        # Call the parent class constructor
        super(SafetyCNN, self).__init__()

        # 🔹 1st Convolution Layer: input image with 3 channels (RGB), output 16 feature maps
        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(2, 2)  # Downsample (reduce) the image by half

        # 🔹 2nd Convolution Layer: from 16 input channels to 32
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(2, 2)
        nn.BatchNorm2d(16),  # After Conv2d(3, 16, ...)

        # 🔹 3rd Convolution Layer: from 32 input channels to 64
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(2, 2)

        nn.Conv2d(64, 128, 3, padding=1),
        nn.ReLU(),
        nn.MaxPool2d(2, 2),

        # 🔹 Flatten the output into a single long vector so we can use a fully connected layer
        self.flatten = nn.Flatten()
        nn.Dropout(0.5),  # Apply before the Linear layer

        # 🔹 Fully connected layer: input is 64 * 28 * 28 (after all the pooling), output is 6 values (for 6 labels)
        self.fc = nn.Linear(64 * 28 * 28, 6)

        # 🔹 Sigmoid activation: gives 6 values between 0 and 1 (multi-label probabilities)
        self.sigmoid = nn.Sigmoid()

    # 🔁 Forward function: tells PyTorch how to pass input through the model
    def forward(self, x):
        #print(f"[Input]         {x.shape}")

        x = self.pool1(self.relu1(self.conv1(x)))
        #print(f"[After Conv1]   {x.shape}")

        x = self.pool2(self.relu2(self.conv2(x)))
       # print(f"[After Conv2]   {x.shape}")

        x = self.pool3(self.relu3(self.conv3(x)))
        #print(f"[After Conv3]   {x.shape}")

        x = self.flatten(x)
        #print(f"[After Flatten] {x.shape}")

        x = self.fc(x)
       # print(f"[After FC]      {x.shape}")

        x = self.sigmoid(x)
       # print(f"[Output]        {x.shape}")

        return x
# 1. Create an instance of your model
model = SafetyCNN()

# 2. Create a dummy input to test the model
# Shape: [1, 3, 224, 224] → batch of 1 image, 3 channels (RGB), 224x224
dummy_input = torch.randn(1, 3, 224, 224)

# 3. Pass the dummy input through the model
output = model(dummy_input)

# CNN model for multi-label classification of safety gear in images.
# Input: RGB image of size [3, 224, 224]
# Output: 6 probabilities (one for each label: person, red_hat, etc.)

# Model structure:
# - Conv2d + ReLU + MaxPool: learns low-level features (edges, shapes)
# - Repeated 3 times with more filters to learn complex patterns
# - Flatten: turns feature maps into a long vector
# - Linear: fully connected layer outputs 6 scores
# - Sigmoid: converts scores into probabilities for each label


In [12]:
train_dataset = SafetyDataset(train_df, '/kaggle/working/resized_images', train_labels, transform=transform)
val_dataset   = SafetyDataset(val_df,   '/kaggle/working/resized_images', val_labels,   transform=transform)
from torch.utils.data import DataLoader

import torch.nn as nn
import torch.optim as optim

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=32, shuffle=False)
criterion = nn.BCELoss()  # Binary Cross Entropy for multi-label classification
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10
best_val_loss=float('inf')
patience=2
counter=0
for epoch in range(num_epochs):  # You can define num_epochs above
    model.train()
    running_loss = 0.0

    for images, labels in train_loader:
        outputs = model(images)
        loss = criterion(outputs, labels.float())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    avg_loss = running_loss / len(train_loader)

    
    # --- Validation ---

    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, labels in val_loader:
            outputs = model(images)
            loss = criterion(outputs, labels.float())
            val_loss += loss.item()
    avg_val_loss = val_loss / len(val_loader)
    
    print(f"Epoch [{epoch+1}/{num_epochs}] - Train Loss: {avg_loss:.4f} - Val Loss: {avg_val_loss:.4f}")

    #early stopping
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        counter = 0
        torch.save(model.state_dict(), 'best_model.pth')  # Save the best model
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered.")
            break



Epoch [1/10] - Train Loss: 0.5305 - Val Loss: 0.4914
Epoch [2/10] - Train Loss: 0.4750 - Val Loss: 0.4309
Epoch [3/10] - Train Loss: 0.4352 - Val Loss: 0.4279
Epoch [4/10] - Train Loss: 0.4087 - Val Loss: 0.4145
Epoch [5/10] - Train Loss: 0.3950 - Val Loss: 0.4104
Epoch [6/10] - Train Loss: 0.3749 - Val Loss: 0.4112
Epoch [7/10] - Train Loss: 0.3529 - Val Loss: 0.3954
Epoch [8/10] - Train Loss: 0.3322 - Val Loss: 0.3975
Epoch [9/10] - Train Loss: 0.3066 - Val Loss: 0.3981
Early stopping triggered.


In [13]:
# Define the test dataset
test_dataset = SafetyDataset(test_df, '/kaggle/working/resized_images', test_labels, transform=transform)

# Create the test DataLoader
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Load the best model
model.load_state_dict(torch.load('best_model.pth'))
model.eval()

# Test the model on the test dataset
correct_predictions = 0
total_labels = 0

with torch.no_grad():
    for images, labels in test_loader:
        outputs = model(images)
        preds = (outputs > 0.5).float()  # Convert probabilities to binary predictions
        correct_predictions += (preds == labels).sum().item()
        total_labels += labels.numel()
        
# Calculate accuracy
test_accuracy = (correct_predictions / total_labels) * 100
print(f"Test Accuracy: {test_accuracy:.2f}%")

Test Accuracy: 80.53%


In [25]:
from torchvision import transforms

# 🔧 Define test transform with resizing
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # 👈 Resize all test images to 224x224
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5]*3, std=[0.5]*3)
])

# 📂 Test image directory
test_dir = '/kaggle/input/draft-safety/test_safety/test_safety'

# 🧾 List of test image file names
test_filenames = sorted([f for f in os.listdir(test_dir) if f.endswith('.jpg')])
test_df = pd.DataFrame({'image_id': [f.replace('.jpg', '') for f in test_filenames]})

# 🧠 Create test dataset and loader
test_dataset = SafetyDataset(test_df, test_dir, labels=[[0]*6]*len(test_df), transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 🔍 Run inference
model.eval()
predictions = []

with torch.no_grad():
    for images, _ in test_loader:
        outputs = model(images)
        preds = (outputs > 0.5).int().tolist()
        predictions.extend(preds)

# 📝 Prepare submission file
submission = pd.DataFrame()
submission['image_id'] = test_df['image_id']
submission['labels'] = [' '.join([label for i, label in enumerate(all_labels) if p[i]]) for p in predictions]
submission.to_csv('submission.csv', index=False)

print("✅ submission.csv ready!")


✅ submission.csv ready!


In [15]:
import torch
import torch.nn as nn
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import os
import pandas as pd


In [16]:
resnet50 = models.resnet50(weights=None)
state_dict = torch.load("/kaggle/input/resnet/pytorch/default/1/resnet50-11ad3fa6.pth")
resnet50.load_state_dict(state_dict)


<All keys matched successfully>

In [17]:
# ✅ Step 1: Get input size before replacing
num_features = resnet50.fc.in_features

# ✅ Step 2: Freeze pretrained layers
for param in resnet50.parameters():
    param.requires_grad = False

# ✅ Step 3: Replace classifier
resnet50.fc = nn.Sequential(
    nn.Linear(num_features, 6),
    nn.Sigmoid()
)
#load and prepare the data for resnet , edit is so it would be as resnet expect it 
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406], 
        std=[0.229, 0.224, 0.225]
    )
])


In [18]:
#training the final layers of the model
criterion = nn.BCELoss()  # Multi-label = Binary Cross Entropy
optimizer = torch.optim.Adam(resnet50.fc.parameters(), lr=0.001)
train_dataset = SafetyDataset(train_df, '/kaggle/working/resized_images', train_labels, transform=transform)
val_dataset   = SafetyDataset(val_df,   '/kaggle/working/resized_images', val_labels,   transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=32, shuffle=False)

num_epochs = 10
best_val_loss = float('inf')
patience = 2
counter = 0

for epoch in range(num_epochs):
    resnet50.train()
    running_loss = 0.0

    for images, labels in train_loader:
        outputs = resnet50(images)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    avg_train_loss = running_loss / len(train_loader)

    # Validation
    resnet50.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, labels in val_loader:
            outputs = resnet50(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
    avg_val_loss = val_loss / len(val_loader)

    print(f"Epoch [{epoch+1}/{num_epochs}] - Train Loss: {avg_train_loss:.4f} - Val Loss: {avg_val_loss:.4f}")

    # Early stopping
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        counter = 0
        torch.save(resnet50.state_dict(), 'best_resnet50.pth')
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered.")
            break


Epoch [1/10] - Train Loss: 0.5208 - Val Loss: 0.4640
Epoch [2/10] - Train Loss: 0.4393 - Val Loss: 0.4269
Epoch [3/10] - Train Loss: 0.3994 - Val Loss: 0.4012
Epoch [4/10] - Train Loss: 0.3756 - Val Loss: 0.3854
Epoch [5/10] - Train Loss: 0.3515 - Val Loss: 0.3734
Epoch [6/10] - Train Loss: 0.3337 - Val Loss: 0.3640
Epoch [7/10] - Train Loss: 0.3178 - Val Loss: 0.3574
Epoch [8/10] - Train Loss: 0.3003 - Val Loss: 0.3467
Epoch [9/10] - Train Loss: 0.2875 - Val Loss: 0.3450
Epoch [10/10] - Train Loss: 0.2772 - Val Loss: 0.3328


In [19]:
# Load the best resnet50 model
resnet50.load_state_dict(torch.load('best_resnet50.pth'))
resnet50.eval()

correct_predictions = 0
total_labels = 0

with torch.no_grad():
    for images, labels in test_loader:
        outputs = resnet50(images)
        preds = (outputs > 0.5).float()
        correct_predictions += (preds == labels).sum().item()
        total_labels += labels.numel()

test_accuracy = (correct_predictions / total_labels) * 100
print(f"Test Accuracy: {test_accuracy:.2f}%")


Test Accuracy: 63.04%


In [26]:
from torchvision import transforms

# 🔧 Define test transform with ResNet normalization
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],  # ResNet expected mean
        std=[0.229, 0.224, 0.225]    # ResNet expected std
    )
])

# 📂 Test image directory
test_dir = '/kaggle/input/draft-safety/test_safety/test_safety'

# 🧾 List of test image file names
test_filenames = sorted([f for f in os.listdir(test_dir) if f.endswith('.jpg')])
test_df = pd.DataFrame({'image_id': [f.replace('.jpg', '') for f in test_filenames]})

# 🧠 Create test dataset and loader
test_dataset = SafetyDataset(test_df, test_dir, labels=[[0]*6]*len(test_df), transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 🔍 Run inference with resnet50
resnet50.eval()
predictions = []

with torch.no_grad():
    for images, _ in test_loader:
        outputs = resnet50(images)
        preds = (outputs > 0.5).int().tolist()
        predictions.extend(preds)

# 📝 Prepare submission file
submission = pd.DataFrame()
submission['image_id'] = test_df['image_id']
submission['labels'] = [' '.join([label for i, label in enumerate(all_labels) if p[i]]) for p in predictions]
submission.to_csv('submission1.csv', index=False)

print("✅ submission1.csv ready with ResNet50!")


✅ submission1.csv ready with ResNet50!


In [29]:
for name, param in resnet50.named_parameters():
    if "layer4" in name or "fc" in name:
        param.requires_grad = True
    else:
        param.requires_grad = False


In [33]:
optimizer = torch.optim.Adam(
    filter(lambda p: p.requires_grad, resnet50.parameters()),
    lr=0.001
)
#training the final layers of the model
criterion = nn.BCELoss()  # Multi-label = Binary Cross Entropy
optimizer = torch.optim.Adam(resnet50.fc.parameters(), lr=0.001)
train_dataset = SafetyDataset(train_df, '/kaggle/working/resized_images', train_labels, transform=transform)
val_dataset   = SafetyDataset(val_df,   '/kaggle/working/resized_images', val_labels,   transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=32, shuffle=False)

num_epochs = 20
best_val_loss = float('inf')
patience = 2
counter = 0

for epoch in range(num_epochs):
    resnet50.train()
    running_loss = 0.0

    for images, labels in train_loader:
        outputs = resnet50(images)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    avg_train_loss = running_loss / len(train_loader)

    # Validation
    resnet50.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, labels in val_loader:
            outputs = resnet50(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
    avg_val_loss = val_loss / len(val_loader)

    print(f"Epoch [{epoch+1}/{num_epochs}] - Train Loss: {avg_train_loss:.4f} - Val Loss: {avg_val_loss:.4f}")

    # Early stopping
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        counter = 0
        torch.save(resnet50.state_dict(), 'best_resnet50_two.pth')
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered.")
            break


Epoch [1/20] - Train Loss: 0.2006 - Val Loss: 0.2945
Epoch [2/20] - Train Loss: 0.1975 - Val Loss: 0.2957
Epoch [3/20] - Train Loss: 0.1895 - Val Loss: 0.2953
Early stopping triggered.


In [34]:
# Load the best resnet50 model
resnet50.load_state_dict(torch.load('best_resnet50_two.pth'))
resnet50.eval()

correct_predictions = 0
total_labels = 0

with torch.no_grad():
    for images, labels in test_loader:
        outputs = resnet50(images)
        preds = (outputs > 0.5).float()
        correct_predictions += (preds == labels).sum().item()
        total_labels += labels.numel()

test_accuracy = (correct_predictions / total_labels) * 100
print(f"Test Accuracy: {test_accuracy:.2f}%")


Test Accuracy: 61.38%


In [35]:
from torchvision import transforms

# 🔧 Define test transform with ResNet normalization
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],  # ResNet expected mean
        std=[0.229, 0.224, 0.225]    # ResNet expected std
    )
])

# 📂 Test image directory
test_dir = '/kaggle/input/draft-safety/test_safety/test_safety'

# 🧾 List of test image file names
test_filenames = sorted([f for f in os.listdir(test_dir) if f.endswith('.jpg')])
test_df = pd.DataFrame({'image_id': [f.replace('.jpg', '') for f in test_filenames]})

# 🧠 Create test dataset and loader
test_dataset = SafetyDataset(test_df, test_dir, labels=[[0]*6]*len(test_df), transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 🔍 Run inference with resnet50
resnet50.eval()
predictions = []

with torch.no_grad():
    for images, _ in test_loader:
        outputs = resnet50(images)
        preds = (outputs > 0.5).int().tolist()
        predictions.extend(preds)

# 📝 Prepare submission file
submission = pd.DataFrame()
submission['image_id'] = test_df['image_id']
submission['labels'] = [' '.join([label for i, label in enumerate(all_labels) if p[i]]) for p in predictions]
submission.to_csv('submission2.csv', index=False)

print("✅ submission1.csv ready with ResNet50!")


✅ submission1.csv ready with ResNet50!
