In [5]:
import os
import sys
import torch
import random
import argparse
import numpy as np
import torch.nn as nn
import torch.optim as optim
from multiprocessing import Process, Manager, Pool
from multiprocessing.pool import ThreadPool
from torch.utils.tensorboard import SummaryWriter


sys.path.append("../")
from models import build_model
from data_distributor import *
from dataloaders import *
import aggregation_rules
from aggregation_rules import Aggregator

# Set seed for reproducibility
seed_value = 1
random.seed(seed_value)
np.random.seed(seed_value)
torch.manual_seed(seed_value)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [6]:
# Worker process
def worker(idx, model, data, target, gradients_dict, loss_list, optimizer, criterion, faulty):
    model.train()
    optimizer.zero_grad()
    output = model(data)
    # loss = nn.MSELoss()(output, target)
    loss = criterion(output, target)
    loss.backward()
    if faulty:
        # print("faulty worker")
        # add gaussian noise to gradients
        gradients = [torch.randn_like(p.grad)*100 for p in model.parameters()]
        # gradients = [p.grad+(torch.randn_like(p.grad)*10) for p in model.parameters()]
        # gradients = [torch.ones_like(p.grad) * 100000 for p in model.parameters()]
        # convert gradients to numpy array
    else:
        gradients = [p.grad.clone() for p in model.parameters()]
    
    # gradients_list.append(gradients)
    gradients_dict[idx] = gradients
    loss_list.append(loss.item())

def parallel_worker_train(args):
    # this is a helper function for parallelizing worker
    idx, model, data, target, gradients_list, loss_list, optimizer, criterion, faulty = args
    worker(idx, model, data, target, gradients_list, loss_list, optimizer, criterion, faulty)

def setup_train_job(dataset, global_batch_size):
    # Initialize global model and optimizer, and set up datalaoder
    if dataset=="dummy":
        test_loader = None
        data_loader = dummy_dataloader(f=lambda x: x ** 3, num_samples=500)
        model = build_model(arch="simplemodel", class_number=1)
        optimizer = optim.SGD(model.parameters(), lr=0.01)
        criterion = nn.MSELoss()
    elif dataset.lower()=="mnist":
        data_loader, test_loader = mnist_dataloader(global_batch_size)
        model = build_model(arch="mlp", class_number=10)
        optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
        criterion = nn.CrossEntropyLoss()
    elif dataset.lower()=="cifar100":
        data_loader, test_loader = CIFAR100_dataloader(global_batch_size)
        model = build_model(arch="SimpleCNN", class_number=100)
        optimizer = optim.Adam(model.parameters(), lr=0.003)
        criterion = nn.CrossEntropyLoss()
    else:
        raise ValueError("Dataset not supported")
    
    return data_loader, test_loader, model, optimizer, criterion
    
def inference(model, data_loader):
    model.eval()
    correct = 0
    total = 0
    for data_batch, target_batch in data_loader:
        output = model(data_batch)
        _, predicted = torch.max(output.data, 1)
        # print("pred", predicted)
        # print("gt", target_batch)
        total += target_batch.size(0)
        correct += (predicted == target_batch).sum().item()
    return correct / total

In [13]:
dataset = "mnist"
n_epochs = 2
num_sub_batches = 4
global_batch_size = 64
faulty_worker_idxs = [0]
defense_method = None
num_processes = 1

# set device
device = torch.device("cpu")

# print numbers worker, and proc
print(f'Number of workers: {num_sub_batches}')
print(f'Number of processes: {num_processes}')

Number of workers: 4
Number of processes: 1


In [14]:
# multiprocessing manager, and tensorboard writer
manager = Manager()

# setup train job
data_loader, test_loader, global_model, optimizer, criterion = setup_train_job(dataset, global_batch_size)
global_model.to(device)

correct_args = {
        'correct': True,
        'cmodel': "linear",
        'model': global_model
    }
aggregator = Aggregator(correct_args=correct_args)
data_distributor = DDPDataDistributor(num_workers=num_sub_batches, faulty_worker_ids=faulty_worker_idxs)
aggregator.update_faulty_worker_idxs(faulty_worker_idxs)

training_iter = 0
for epoch in range(n_epochs):
    epoch_loss_list = []
    for iteration, (data_batch, target_batch) in enumerate(data_loader):
        training_iter+=1
        global_model.train()
        optimizer.zero_grad()
        gradients_dict = manager.dict()
        loss_list = manager.list()

        # Divide the data_batch and target_batch into smaller batches // again simulating DDP
        data_sub_batches, target_sub_batches, worker_batch_map = data_distributor.distribute(data_batch, target_batch)

        faulty = False
        args_list = []
        for i, (data, target) in enumerate(zip(data_sub_batches, target_sub_batches)):
            data, target = data.to(device), target.to(device)

            if i in faulty_worker_idxs: faulty = True
            else: faulty = False
            args_list.append((i, global_model, data, target, gradients_dict, loss_list, optimizer, criterion, faulty))
        with ThreadPool(processes=num_processes) as pool:
            pool.map(parallel_worker_train, args_list)

        # Assume faulty grad is detected here!!!
        # faulty_worker_idxs: [0]

        # gradients_list = list(gradients_dict.values())
        # aggregated_gradients = aggregation_rules.average_grads(gradients_list)
        aggregated_gradients = aggregator.aggregate(gradients_dict, worker_batch_map)

        # Update global model
        for p, agg_grad in zip(global_model.parameters(), aggregated_gradients):
            p.grad = agg_grad
        optimizer.step()

        avg_iter_loss = sum(loss_list) / len(loss_list)
        epoch_loss_list.append(avg_iter_loss)
        # print(f'Epoch: {epoch+1}, sub-batch: {iteration}, avg sub-batch loss: {round(avg_iter_loss, 3)}')
        # print curr iter / total iter
        print(f'Epoch: {epoch+1}, sub-batch: {iteration+1}/{len(data_loader)}, avg sub-batch loss: {round(avg_iter_loss, 3)}')

        # validation per 10 iterations
        if iteration % 10 == 0:
            acc = inference(global_model, test_loader)
            print("validation accuracy:", acc)
        
        break
        
    break
    # Compute and print the average epoch loss
    avg_epoch_loss = sum(epoch_loss_list) / len(epoch_loss_list)
    print(f'Epoch: {epoch+1} done, avg loss: {round(avg_epoch_loss, 3)}')

Init correction model for worker 0
[CORRECTION] Training ...
[CORRECTION] Training loss (MSE): 0.0002963462
Epoch: 1, sub-batch: 1/938, avg sub-batch loss: 2.313
validation accuracy: 0.1051


In [15]:
worker_batch_map

{0: 0, 1: 1, 2: 2, 3: 3}

In [None]:
import multiprocessing
import time

class Monitor:
    def __init__(self):
        self.process = None
        self.exit_event = multiprocessing.Event()
        self.data_queue = multiprocessing.Queue()

    def start(self):
        self.process = multiprocessing.Process(target=self.run)
        self.process.start()

    def run(self):
        while not self.exit_event.is_set():
            if not self.data_queue.empty():
                data = self.data_queue.get()
                print(f"Received data: {data}")
                # Process the data here
            else:
                time.sleep(1)  # Sleep if there's no data

    def stop(self):
        self.exit_event.set()
        self.process.join()

    def send_data(self, data):
        self.data_queue.put(data)

    def is_running(self):
        return self.process is not None and self.process.is_alive()

In [None]:
def main():
    # Usage
    monitor = Monitor()
    monitor.start()

    try:
        # Main process does its work and sends data to the monitor
        for i in range(5):
            monitor.send_data(f"Data {i}")
            time.sleep(1)
    finally:
        monitor.stop()
        print("Monitor stopped.")

In [None]:
main()

In [2]:
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Example data
ndarray1 = np.random.rand(100, 3)  # Replace with your data
ndarray2 = np.random.rand(100, 3)  # Replace with your data

# Splitting the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(ndarray1, ndarray2, test_size=0.2, random_state=42)

# Create and train the model
model = LinearRegression()
model.fit(X_train, y_train)

# Predictions and evaluation
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f"Mean Squared Error: {mse}")

# Use the model to predict new values
new_data = np.array([[0.5, 0.6, 0.7]])  # Example new data
predicted = model.predict(new_data)
print(f"Predicted output: {predicted}")


Mean Squared Error: 0.08929456509440377
Predicted output: [[0.53116393 0.52878067 0.43936241]]


In [3]:
ndarray1.shape

(100, 3)