In [None]:
# !pip install mpi4py

In [None]:
# !unzip /content/archive.zip

In [None]:
# Create the checkpoint directory; -p keeps the cell re-runnable when it already exists.
!mkdir -p weights

In [None]:
!touch mpi.py

In [None]:
!mpirun -np 4 --allow-run-as-root python mpi.py

##################### -- TRAIN -- #####################
 93%|█████████▎| 13/14 [00:02<00:00,  4.81it/s]Epoch of train 1: acc -- 0.65, loss -- 0.814472034573555, process -- 3
100%|██████████| 14/14 [00:02<00:00,  5.52it/s]Epoch of train 1: acc -- 0.6153846153846154, loss -- 0.7842612862586975, process -- 2
Epoch of train 1: acc -- 0.66, loss -- 0.7632881939411164, process -- 1

 71%|███████▏  | 5/7 [00:00<00:00,  5.64it/s]Epoch of val 1: acc -- 0.7333333333333333, loss -- 0.5612647831439972, process -- 2
 86%|████████▌ | 6/7 [00:01<00:00,  5.58it/s]Epoch of val 1: acc -- 0.55, loss -- 0.6474131643772125, process -- 3
100%|██████████| 7/7 [00:01<00:00,  5.81it/s]Epoch of val 1: acc -- 0.85, loss -- 0.43463655933737755, process -- 1

100%|██████████| 14/14 [00:02<00:00,  5.91it/s]Epoch of train 2: acc -- 0.7583333333333333, loss -- 0.6066157072782516, process -- 3
Epoch of train 2: acc -- 0.8384615384615385, loss -- 0.43640569425546205, process -- 2
Epoch of train 2: acc -- 0.746666666666

In [None]:
import torch
import torchvision
from torch.utils.data import Dataset
from torchvision import datasets, transforms, models
from torchvision.datasets.vision import data
from torchvision.transforms import ToTensor
import os
import pandas as pd
from PIL import Image
import cv2
from mpi4py import MPI
import torch.optim as optim
import torch.nn as nn
from tqdm import tqdm

def _send_batches(comm, loader, n_workers):
    """Rank-0 side: deal the batches of `loader` to the worker ranks round-robin.

    Batch ``i`` is sent to rank ``i % n_workers + 1``; the image tensor goes
    out with tag 0 and the label tensor with tag 1.
    """
    for batch_idx, (image, label) in enumerate(tqdm(loader)):
        dest = batch_idx % n_workers + 1
        comm.send(image, dest=dest, tag=0)
        comm.send(label, dest=dest, tag=1)


def _run_worker_batches(comm, model, criterion, n_batches, n_workers, my_rank, optimizer=None):
    """Worker side: receive this rank's batches from rank 0 and run the model on them.

    When `optimizer` is given, every batch is followed by a backward pass and
    an optimizer step; when it is None only the forward pass is evaluated.

    Returns:
        tuple: ``(accuracy, mean_loss)`` over the samples this rank received,
        both as Python floats.
    """
    n_samples = 0
    n_correct = 0
    loss_sum = 0.0
    # Rank r owns batches r-1, r-1+n_workers, ... (those with i % n_workers + 1 == r),
    # mirroring the destination formula used by _send_batches.
    for _ in range(my_rank - 1, n_batches, n_workers):
        image = comm.recv(source=0, tag=0)
        label = comm.recv(source=0, tag=1)
        if optimizer is not None:
            optimizer.zero_grad()
        out = model(image)
        _, preds = torch.max(out, 1)
        loss = criterion(out, label)
        if optimizer is not None:
            loss.backward()
            optimizer.step()
        batch_size = image.size(0)
        n_samples += batch_size
        # The criterion's loss is weighted by batch size so the final value is per-sample.
        loss_sum += loss.item() * batch_size
        n_correct += int(torch.sum(preds == label).item())
    return n_correct / n_samples, loss_sum / n_samples


def train(model, criterion, optimizer, dataloader, sizes, my_rank, epochs=10):
    """Train one independent model per worker rank, fed with batches by rank 0.

    Rank 0 iterates the 'train' and 'val' loaders and sends each batch to one
    worker (round-robin over ranks 1..size-1). Every worker optimises its own
    copy of `model` on the batches it receives, evaluates on its share of the
    validation batches, and saves its weights to ./weights/model_<rank>.pth
    whenever validation accuracy improves without the validation loss getting
    worse. The first epoch always writes a checkpoint so one exists afterwards.

    Args:
        model: network to train (each rank holds its own instance).
        criterion: loss function called as ``criterion(output, label)``.
        optimizer: optimizer bound to `model`'s parameters.
        dataloader: dict with 'train' and 'val' data loaders.
        sizes: unused; kept for interface compatibility.
        my_rank: MPI rank of the calling process.
        epochs: number of epochs to run.

    Raises:
        RuntimeError: if fewer than 2 MPI processes are running.
        ValueError: if a loader has fewer batches than there are workers, so
            that some worker would receive no data.
    """
    # Resolve the communicator locally instead of relying on module globals
    # that only exist when the file is executed as a script.
    comm = MPI.COMM_WORLD
    n_workers = comm.Get_size() - 1
    if n_workers < 1:
        raise RuntimeError(
            "train() needs at least 2 MPI processes: rank 0 distributes batches, ranks >= 1 train"
        )
    # Every rank evaluates the same lengths, so a failure here is raised on
    # all ranks before any message is sent (no rank is left blocked in recv).
    for phase in ('train', 'val'):
        if len(dataloader[phase]) < n_workers:
            raise ValueError(
                f"'{phase}' loader has {len(dataloader[phase])} batches but there are "
                f"{n_workers} workers; every worker must receive at least one batch"
            )
    os.makedirs("./weights", exist_ok=True)

    best_score = 0.0
    best_loss = float('inf')
    for epoch in range(epochs):
        model.train()
        if my_rank == 0:
            _send_batches(comm, dataloader['train'], n_workers)
        else:
            epoch_acc, epoch_loss = _run_worker_batches(
                comm, model, criterion, len(dataloader['train']), n_workers, my_rank, optimizer=optimizer
            )
            print(f"Epoch of train {epoch + 1}: acc -- {epoch_acc}, loss -- {epoch_loss}, process -- {my_rank}")
        comm.Barrier()

        model.eval()
        with torch.no_grad():
            if my_rank == 0:
                _send_batches(comm, dataloader['val'], n_workers)
            else:
                epoch_acc, epoch_loss = _run_worker_batches(
                    comm, model, criterion, len(dataloader['val']), n_workers, my_rank
                )
                print(f"Epoch of val {epoch + 1}: acc -- {epoch_acc}, loss -- {epoch_loss}, process -- {my_rank}")
            comm.Barrier()

        if my_rank != 0:
            # epoch_acc / epoch_loss hold the validation metrics at this point.
            if epoch == 0 or (epoch_acc > best_score and epoch_loss <= best_loss):
                best_score = epoch_acc
                best_loss = epoch_loss
                torch.save(model.state_dict(), f"./weights/model_{my_rank}.pth")

def test(model, criterion, dataloader_test, dataset_sizes_test):
    """Evaluate every worker's model on the test set and their averaged ensemble on rank 0.

    Each worker rank (>= 1) runs `model` over the whole test loader, sends the
    raw outputs of every batch to rank 0 (tag 0) and prints its own accuracy
    and mean loss. Rank 0 iterates the same loader only for the labels,
    averages the outputs received from all workers for each batch and prints
    the accuracy of that averaged prediction.

    Args:
        model: network to evaluate (unused for inference on rank 0).
        criterion: loss function called as ``criterion(output, label)``.
        dataloader_test: test data loader; must yield batches in the same
            order on every rank.
        dataset_sizes_test: number of samples in the test set.

    Raises:
        RuntimeError: if fewer than 2 MPI processes are running.
        ValueError: if `dataset_sizes_test` is not positive.
    """
    # Resolve communicator, rank and worker count locally instead of relying
    # on module globals that only exist when the file is run as a script.
    comm = MPI.COMM_WORLD
    my_rank = comm.Get_rank()
    n_workers = comm.Get_size() - 1
    if n_workers < 1:
        raise RuntimeError(
            "test() needs at least 2 MPI processes: rank 0 aggregates, ranks >= 1 evaluate"
        )
    if dataset_sizes_test <= 0:
        raise ValueError("test() requires a non-empty test set")

    model.eval()
    with torch.no_grad():
        if my_rank != 0:
            n_correct = 0
            loss_sum = 0.0
            for image, label in tqdm(dataloader_test):
                out = model(image)
                comm.send(out, dest=0, tag=0)
                _, preds = torch.max(out, 1)
                loss = criterion(out, label)
                loss_sum += loss.item() * image.size(0)
                n_correct += int(torch.sum(preds == label).item())
            epoch_acc = n_correct / dataset_sizes_test
            epoch_loss = loss_sum / dataset_sizes_test
            print(f"Test: acc -- {epoch_acc}, loss -- {epoch_loss}, process -- {my_rank}")
        else:
            n_correct = 0
            for _, label in dataloader_test:
                # Sum the workers' outputs for this batch; the buffer takes its
                # shape from the received tensors, so any class count works.
                ensemble = None
                for procid in range(1, n_workers + 1):
                    out = comm.recv(source=procid, tag=0)
                    ensemble = out if ensemble is None else ensemble + out
                ensemble = ensemble / n_workers
                _, preds = torch.max(ensemble, 1)
                n_correct += int(torch.sum(preds == label).item())
            result = n_correct / dataset_sizes_test
            print(f"Test process {my_rank}: acc -- {result}")

class Dataset_Creature(Dataset):
    """Image dataset backed by a DataFrame of (image path, label) rows.

    Column 0 of the frame is read as the image path and column 1 as the label.
    """

    def __init__(self, data_frame, transforms=None):
        """Store the frame and the optional transform applied to each image."""
        self.transforms = transforms
        self.data_frame = data_frame

    def __len__(self):
        """Number of rows in the backing frame."""
        return len(self.data_frame)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row `idx`; the image is opened as RGB."""
        label = self.data_frame.iloc[idx, 1]
        image_path = self.data_frame.iloc[idx, 0]
        rgb_image = Image.open(image_path).convert('RGB')
        if not self.transforms:
            return rgb_image, label
        return self.transforms(rgb_image), label

def Create_Dataloader(condition, datas, transform=None, shuffles=None, batch_size=30):
    """Build data loaders (and dataset sizes) from DataFrames of (path, label) rows.

    Args:
        condition: when true, `datas` is a sequence ``[train_frame, val_frame]``
            and dicts keyed by 'train' / 'val' are returned; otherwise `datas`
            is a single frame and a single, unshuffled loader is returned.
        datas: one DataFrame, or a sequence of two DataFrames (see `condition`).
        transform: optional transform passed to each `Dataset_Creature`.
        shuffles: per-split shuffle flags, in the same order as `datas`; only
            used when `condition` is true. None means no shuffling for any split.
        batch_size: batch size of every loader created.

    Returns:
        ``(loaders_dict, sizes_dict)`` when `condition` is true, otherwise
        ``(loader, size)``.
    """
    if condition:
        if shuffles is None:
            # Previously None was handed straight to zip() and raised TypeError;
            # fall back to unshuffled loaders for every split instead.
            shuffles = [False] * len(datas)
        d_dataloaders = {}
        d_sizes = {}
        for data_type, shuffle, data in zip(['train', 'val'], shuffles, datas):
            dataset = Dataset_Creature(data_frame=data, transforms=transform)
            d_dataloaders[data_type] = torch.utils.data.DataLoader(
                dataset, batch_size=batch_size, shuffle=shuffle
            )
            d_sizes[data_type] = len(dataset)
        return d_dataloaders, d_sizes

    dataset = Dataset_Creature(data_frame=datas, transforms=transform)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=False)
    dataset_sizes = len(dataset)
    return dataloader, dataset_sizes

def Model_Load_Creature(load=False, my_rank=None):
    """Create a 2-class MobileNetV3-small together with its loss (and optimizer).

    Args:
        load: when false, build a pretrained network with a fresh 2-output
            final layer and an AdamW optimizer (lr=0.001). When true, build an
            untrained 2-class network and load the checkpoint saved for
            `my_rank`.
        my_rank: MPI rank whose checkpoint ``./weights/model_<rank>.pth`` is
            loaded; required when `load` is true.

    Returns:
        ``(model, criterion, optimizer)`` when `load` is false, otherwise
        ``(model, criterion)``.

    Raises:
        ValueError: if `load` is true and `my_rank` is None.
    """
    criterion = nn.CrossEntropyLoss()
    if not load:
        model_ft = models.mobilenet_v3_small(pretrained=True)
        # Replace the final classifier layer so the network outputs 2 classes.
        model_ft.classifier[3] = torch.nn.Linear(in_features=model_ft.classifier[3].in_features, out_features=2)
        optimizer_ft = optim.AdamW(model_ft.parameters(), lr=0.001)
        return model_ft, criterion, optimizer_ft

    if my_rank is None:
        raise ValueError("my_rank is required when load=True")
    model_ft = models.mobilenet_v3_small(pretrained=False, num_classes=2)
    # Read from the same relative location train() writes to, so saving and
    # loading agree regardless of the working directory.
    model_ft.load_state_dict(torch.load(f'./weights/model_{my_rank}.pth'))
    return model_ft, criterion

def Parsing_Data(images_directory, train_count=200, val_count=100):
    """Split a directory of per-class image folders into train / val / test frames.

    Every sub-directory of `images_directory` is one class; its label is the
    index of the directory name in sorted order. Within each class the files
    are taken in sorted order: the first `train_count` go to train, the next
    `val_count` to val and the remainder to test.

    Directory listings are sorted because `os.listdir` returns entries in
    arbitrary order; sorting makes labels and splits reproducible and
    identical on every MPI rank. Non-directory entries at the top level are
    ignored.

    Args:
        images_directory: root folder containing one sub-folder per class.
        train_count: number of images per class assigned to the train split.
        val_count: number of images per class assigned to the val split.

    Returns:
        tuple: ``(train, val, test)`` DataFrames with columns ``[0, 1]``
        holding the image path and the integer label.
    """
    class_names = sorted(
        name for name in os.listdir(images_directory)
        if os.path.isdir(os.path.join(images_directory, name))
    )
    train_rows = []
    val_rows = []
    test_rows = []
    val_end = train_count + val_count
    for lbl, class_name in enumerate(class_names):
        image_names = sorted(os.listdir(f'{images_directory}/{class_name}'))
        for position, image in enumerate(image_names):
            row = [f'{images_directory}/{class_name}/{image}', lbl]
            if position < train_count:
                train_rows.append(row)
            elif position < val_end:
                val_rows.append(row)
            else:
                test_rows.append(row)
    return (
        pd.DataFrame(train_rows, columns=[0, 1]),
        pd.DataFrame(val_rows, columns=[0, 1]),
        pd.DataFrame(test_rows, columns=[0, 1]),
    )

if __name__ == "__main__":
    # Script entry point, meant to be launched under mpirun with >= 2 processes:
    # rank 0 distributes batches, every other rank trains and tests its own model.
    transform = transforms.Compose([
        transforms.Resize((32, 32)), 
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    images_directory = '/content/Data'
    shuffles = [True, False]  # shuffle the train split only
    train_data, val_data, test_data = Parsing_Data(images_directory=images_directory)
    dataloader, sizes = Create_Dataloader(condition=True, datas=[train_data, val_data], transform=transform, shuffles=shuffles)
    # comm, my_rank and p are module-level names on purpose.
    comm = MPI.COMM_WORLD
    my_rank = comm.Get_rank()
    p = comm.Get_size()
    if my_rank == 0:
        print("##################### -- TRAIN -- #####################")
    model, criterion, optimizer = Model_Load_Creature(False)
    train(model, criterion, optimizer, dataloader, sizes, my_rank, epochs=15)
    # Make sure every worker has finished writing its checkpoint before testing.
    comm.Barrier()
    if my_rank == 0:
        print("##################### -- TEST -- #####################")
    dataloader, sizes = Create_Dataloader(condition=False, datas=test_data, transform=transform)
    if my_rank != 0:
        # Workers evaluate the checkpoint they saved during training.
        model, criterion = Model_Load_Creature(True, my_rank=my_rank)
    else:
        # Rank 0 has no checkpoint of its own; it only needs a model object to pass to test().
        model, criterion, optimizer = Model_Load_Creature(False)
    test(model, criterion, dataloader, sizes)
    # The original `MPI.Finalize` lacked the call parentheses and did nothing.
    MPI.Finalize()

In [None]:
# Dataset: https://www.kaggle.com/datasets/akhiljethwa/forest-vs-desert