In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import pandas as pd
import os
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import zipfile

In [2]:
# Extract images from the two zip files

!git clone https://github.com/Prashant-AV/Qualcomm-DL-Hackathon.git

# Function to remove spaces from file names in a given directory
def remove_spaces_in_filenames(directory):
    for filename in os.listdir(directory):
        if ' ' in filename:
            new_filename = filename.replace(' ', '_')
            os.rename(os.path.join(directory, filename), os.path.join(directory, new_filename))
            print(f'Renamed: {filename} -> {new_filename}')

# Function to extract images from a zip file and return a list of image file paths
def extract_images(zip_file, extract_to):
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
        return [os.path.join(extract_to, file) for file in zip_ref.namelist() if file.lower().endswith(('.png', '.jpg', '.jpeg'))]

# Function to combine images and save them to a new zip file
def combine_images_to_zip(image_files, output_zip):
    with zipfile.ZipFile(output_zip, 'w') as zipf:
        for image_file in image_files:
            zipf.write(image_file, os.path.basename(image_file))

# Specify the directory containing the files
directory = '/content/Qualcomm-DL-Hackathon/train'

# Remove spaces from file names
remove_spaces_in_filenames(directory)

zip_file1 = '/content/Qualcomm-DL-Hackathon/train/images_part-1.zip'
zip_file2 = '/content/Qualcomm-DL-Hackathon/train/images_part-2.zip'

extract_to1 = 'extracted_zip1'
extract_to2 = 'extracted_zip2'

image_files1 = extract_images(zip_file1, extract_to1)
image_files2 = extract_images(zip_file2, extract_to2)

# Combine the images and save them to a new zip file
combined_image_files = image_files1 + image_files2
output_zip = 'images.zip'
combine_images_to_zip(combined_image_files, output_zip)

Cloning into 'Qualcomm-DL-Hackathon'...
remote: Enumerating objects: 10, done.[K
remote: Counting objects: 100% (10/10), done.[K
remote: Compressing objects: 100% (10/10), done.[K
remote: Total 10 (delta 1), reused 0 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (10/10), 30.68 MiB | 15.20 MiB/s, done.
Resolving deltas: 100% (1/1), done.
Renamed: images part-1.zip -> images_part-1.zip
Renamed: images part-2.zip -> images_part-2.zip


In [3]:
extract_dir = "/content/images"
os.makedirs(extract_dir, exist_ok=True)

# Open and extract the zip file
with zipfile.ZipFile("images.zip", 'r') as zip_ref:
    zip_ref.extractall(extract_dir)

print(f"Contents extracted to {extract_dir}")

Contents extracted to /content/images


In [4]:
# Emergency Dataset Class
class EmergencyDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.data_frame = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.data_frame.iloc[idx, 0])
        image = Image.open(img_name).convert("RGB")
        label = int(self.data_frame.iloc[idx, 1])

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
# Split train.csv into train and test sets
train_csv = '/content/Qualcomm-DL-Hackathon/train/train.csv'
data = pd.read_csv(train_csv)
# Split the data into train and test sets
train_data, test_data = train_test_split(data, test_size=0.2, random_state=50)

# Further split the train_data into train and validation sets
train_data, val_data = train_test_split(train_data, test_size=0.2, random_state=50)

# Save the split data into new CSV files
train_data.to_csv('train_split.csv', index=False)
val_data.to_csv('val_split.csv', index=False)
test_data.to_csv('test_split.csv', index=False)

print("Data has been split into train_split.csv, val_split.csv, and test_split.csv")

# Define transformations
#transform = transforms.Compose([
#    transforms.Resize((128, 128)),
#    transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
#])

train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Define transformations
#transform = transforms.Compose([
#    transforms.Resize((128, 128)),
#    transforms.ToTensor(),
#    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
#])


Data has been split into train_split.csv, val_split.csv, and test_split.csv


In [None]:
# Load Data
root_dir = '/content/images'
train_dataset = EmergencyDataset(csv_file='train_split.csv', root_dir=root_dir, transform=transform)
val_dataset = EmergencyDataset(csv_file='val_split.csv', root_dir=root_dir, transform=transform)
test_dataset = EmergencyDataset(csv_file='test_split.csv', root_dir=root_dir, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [None]:
#Hyperparams
batch_size = 32
learning_rate = 0.001
num_epochs = 10
num_classes = 2

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class EmergencyVehicleClassifier(nn.Module):
    def __init__(self, num_classes):
      super(EmergencyVehicleClassifier, self).__init__()
      self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
      self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
      self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
      self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0, ceil_mode=True)
      self.fc1 = nn.Linear(128 * 16 * 16, 512)
      self.fc2 = nn.Linear(512, num_classes)  # 2 classes(emergency and non-emergency vehicle)
      self.dropout = nn.Dropout(0.5)
      #self.relu = nn.ReLU()
      self.sigmoid = nn.Sigmoid()


    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = self.pool(self.relu(self.conv3(x)))
        x = x.view(x.size(0),-1)
        x = self.dropout(x)
        x = self.fc2(x)
        return x


model =  EmergencyVehicleClassifier(num_classes=num_classes).to(device)
print (model)

# Define the loss function
criterion = nn.BCELoss()  # Binary Cross-Entropy Loss
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Modify the fully connected layer
#model.fc2 = nn.Sequential(
#    nn.Dropout(0.3),
#    nn.Linear(model.fc2.in_features, 1),  # Output layer with 1 unit for binary classification
#    nn.Sigmoid()  # Sigmoid activation for binary classification
#)
#model = model.to(device)


ClassifyEmergencyVehicleOrNot(
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=True)
  (fc1): Linear(in_features=32768, out_features=512, bias=True)
  (fc2): Linear(in_features=512, out_features=2, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
  (sigmoid): Sigmoid()
)


In [None]:
## Model training function
def train(model, loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    best_val_loss = float('inf')

    for inputs, labels in loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Reshape labels to match the output shape
        #labels = labels.view(-1, 1).float()

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        predicted = (outputs > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()


    epoch_acc = 100 * correct / total
    epoch_loss = running_loss / len(loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%")
    return epoch_loss, epoch_acc


In [None]:
 ## Model validation function
 def validate(model, loader, criterion, device):
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            #labels = labels.view(-1, 1).float()
            outputs = model(inputs)
            val_loss = criterion(outputs, labels)
            running_loss += loss.item()
            predicted = (outputs > 0.5).float()
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    val_loss = running_loss / len(loader)
    val_acc = 100 * val_correct / val_total
    print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.2f}%")


In [None]:
##Actual training the model and save the best model over epochs

best_val_loss = float('inf')

for epoch in range(num_epochs):
    train_loss, train_acc = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = validate(model, test_loader, criterion, device)

    print(f"Epoch [{epoch + 1}/{num_epochs}] "
          f"Train Loss: {train_loss: .4f}, Train Acc: {train_acc:.2f}% "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc: .2f}%")

    # Save the model if validation loss has decreased
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'emergency_vehicle_classifer.pth')
        print("Model saved!")

# Load the best model
model.load_state_dict(torch.load('emergency_vehicle_classifier.pth'))

In [None]:
# Load the test data
#actualtest_csv = '/content/Qualcomm-DL-Hackathon/test.csv'
#actualtest_dataset = EmergencyDataset(csv_file=actualtest_csv, root_dir=root_dir, transform=transform)
#actualtest_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [None]:
def test_model(model, loader, device):
    model.eval()
    correct = 0
    total = 0
    class_correct = [0] * num_classes
    class_total = [0] * num_classes
    results = []

    with torch.no_grad():
      for inputs, labels in loader:
        inputs , labels = inputs.to(device) , labels.to(device)
        outputs = model(inputs)
        #_, predicted = torch.max(outputs, 1)
        predicted = (outputs > 0.5).float()

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        for img_name, pred in zip(inputs, predicted):
            results.append([img_name, pred.item()])

     # Save the results to sample_submissions.csv
    submission_df = pd.DataFrame(results, columns=['image_names', 'emergency_or_not'])
    submission_df.to_csv('sample_submissions.csv', index=False)

    print(f"test accuracy : {100 * correct / total:.2f}%")



