In [1]:
import numpy as np

In [4]:
def fedavg(models:list):
    return np.mean(models)

In [10]:
def krum(models:list):
    return np.max(models)

In [41]:
def aggregator(func, data):
    return func(data)

In [42]:
a = fedavg
aggregator(a, [1,2])

1.5

In [24]:
import random

In [38]:
random.randint(0,3)

2

In [8]:
aggregator(fedavg, a)

13.666666666666666

In [11]:
aggregator(krum, a)

34

In [15]:
from typing import Union
import random

In [13]:
type(1)==int

True

In [21]:
from util import generate_attack_matrix
    

In [23]:
node_list = list(range(10))
attack_type = 'label flipping'
targeted = False
poisoned_node_ratio = 20
noise_injected_ratio = 20
poisoned_sample_ratio = 100
generate_attack_matrix(node_list, attack_type, targeted, poisoned_node_ratio, noise_injected_ratio, poisoned_sample_ratio)

attacked_node_list_[8, 0]


{0: {'attack_type': 'label flipping',
  'targeted': False,
  'noise_injected_ratio': 20,
  'poisoned_sample_ratio': 100},
 1: {'attack_type': 'no attack',
  'targeted': None,
  'noise_injected_ratio': None,
  'poisoned_sample_ratio': None},
 2: {'attack_type': 'no attack',
  'targeted': None,
  'noise_injected_ratio': None,
  'poisoned_sample_ratio': None},
 3: {'attack_type': 'no attack',
  'targeted': None,
  'noise_injected_ratio': None,
  'poisoned_sample_ratio': None},
 4: {'attack_type': 'no attack',
  'targeted': None,
  'noise_injected_ratio': None,
  'poisoned_sample_ratio': None},
 5: {'attack_type': 'no attack',
  'targeted': None,
  'noise_injected_ratio': None,
  'poisoned_sample_ratio': None},
 6: {'attack_type': 'no attack',
  'targeted': None,
  'noise_injected_ratio': None,
  'poisoned_sample_ratio': None},
 7: {'attack_type': 'no attack',
  'targeted': None,
  'noise_injected_ratio': None,
  'poisoned_sample_ratio': None},
 8: {'attack_type': 'label flipping',
  'targ

In [44]:
from typing import OrderedDict, List
import torch
import logging

In [None]:
def cosine_metric2(model1: OrderedDict[str, torch.Tensor], model2: OrderedDict[str, torch.Tensor], similarity: bool = False) -> Optional[float]:
    if model1 is None or model2 is None:
        logging.info("Cosine similarity cannot be computed due to missing model")
        return None

    cos_similarities = []

    for layer in model1:
        if layer in model2:
            l1 = model1[layer].flatten()
            l2 = model2[layer].flatten()
            if l1.shape != l2.shape:
                # Adjust the shape of the smaller layer to match the larger layer
                min_len = min(l1.shape[0], l2.shape[0])
                l1, l2 = l1[:min_len], l2[:min_len]

            cos_sim = torch.nn.functional.cosine_similarity(l1.unsqueeze(0), l2.unsqueeze(0), dim=1)
            cos_similarities.append(cos_sim.item())

    if cos_similarities:
        avg_cos_sim = torch.mean(torch.tensor(cos_similarities))
        # result = torch.clamp(avg_cos_sim, min=0).item()
        # return result
        return avg_cos_sim.item() if similarity else (1 - avg_cos_sim.item())
    else:
        return None
    
def cosine_metric(model1: OrderedDict, model2: OrderedDict, similarity: bool = False) -> Optional[float]:
    if model1 is None or model2 is None:
        logging.info("Cosine similarity cannot be computed due to missing model")
        return None

    cos_similarities: List = []

    for layer in model1:
        if layer in model2:
            l1 = model1[layer].to('cpu')
            l2 = model2[layer].to('cpu')
            if l1.shape != l2.shape:
                # Adjust the shape of the smaller layer to match the larger layer
                min_len = min(l1.shape[0], l2.shape[0])
                l1, l2 = l1[:min_len], l2[:min_len]
            cos = torch.nn.CosineSimilarity(dim=l1.dim() - 1)
            cos_mean = torch.mean(cos(l1.float(), l2.float())).mean()
            cos_similarities.append(cos_mean)
        else:
            logging.info("Layer {} not found in model 2".format(layer))

    if cos_similarities:    
        cos = torch.Tensor(cos_similarities)
        avg_cos = torch.mean(cos)
        relu_cos = torch.nn.functional.relu(avg_cos)  # relu to avoid negative values
        return relu_cos.item() if similarity else (1 - relu_cos.item())
    else:
        return None
        

def euclidean_metric(model1: OrderedDict[str, torch.Tensor], model2: OrderedDict[str, torch.Tensor], standardized: bool = False, similarity: bool = False) -> Optional[float]:
    if model1 is None or model2 is None:
        return None

    distances = []

    for layer in model1:
        if layer in model2:
            l1 = model1[layer].flatten()
            l2 = model2[layer].flatten()
            if standardized:
                l1 = (l1 - l1.mean()) / l1.std()
                l2 = (l2 - l2.mean()) / l2.std()
            
            distance = torch.norm(l1 - l2, p=2)
            if similarity:
                norm_sum = torch.norm(l1, p=2) + torch.norm(l2, p=2)
                similarity_score = 1 - (distance / norm_sum if norm_sum != 0 else 0)
                distances.append(similarity_score.item())
            else:
                distances.append(distance.item())

    if distances:
        avg_distance = torch.mean(torch.tensor(distances))
        return avg_distance.item()
    else:
        return None
    

def minkowski_metric(model1: OrderedDict[str, torch.Tensor], model2: OrderedDict[str, torch.Tensor], p: int, similarity: bool = False) -> Optional[float]:
    if model1 is None or model2 is None:
        return None

    distances = []

    for layer in model1:
        if layer in model2:
            l1 = model1[layer].flatten()
            l2 = model2[layer].flatten()

            distance = torch.norm(l1 - l2, p=p)
            if similarity:
                norm_sum = torch.norm(l1, p=p) + torch.norm(l2, p=p)
                similarity_score = 1 - (distance / norm_sum if norm_sum != 0 else 0)
                distances.append(similarity_score.item())
            else:
                distances.append(distance.item())

    if distances:
        avg_distance = torch.mean(torch.tensor(distances))
        return avg_distance.item()
    else:
        return None

def chebyshev_metric(model1: OrderedDict[str, torch.Tensor], model2: OrderedDict[str, torch.Tensor], similarity: bool = False) -> Optional[float]:
    if model1 is None or model2 is None:
        return None

    distances = []

    for layer in model1:
        if layer in model2:
            l1 = model1[layer].flatten()
            l2 = model2[layer].flatten()

            distance = torch.norm(l1 - l2, p=float('inf'))
            if similarity:
                norm_sum = torch.norm(l1, p=float('inf')) + torch.norm(l2, p=float('inf'))
                similarity_score = 1 - (distance / norm_sum if norm_sum != 0 else 0)
                distances.append(similarity_score.item())
            else:
                distances.append(distance.item())

    if distances:
        avg_distance = torch.mean(torch.tensor(distances))
        return avg_distance.item()
    else:
        return None


def manhattan_metric(model1: OrderedDict[str, torch.Tensor], model2: OrderedDict[str, torch.Tensor], similarity: bool = False) -> Optional[float]:
    if model1 is None or model2 is None:
        return None

    distances = []

    for layer in model1:
        if layer in model2:
            l1 = model1[layer].flatten()
            l2 = model2[layer].flatten()

            distance = torch.norm(l1 - l2, p=1)
            if similarity:
                norm_sum = torch.norm(l1, p=1) + torch.norm(l2, p=1)
                similarity_score = 1 - (distance / norm_sum if norm_sum != 0 else 0)
                distances.append(similarity_score.item())
            else:
                distances.append(distance.item())

    if distances:
        avg_distance = torch.mean(torch.tensor(distances))
        return avg_distance.item()
    else:
        return None


def pearson_correlation_metric(model1: OrderedDict[str, torch.Tensor], model2: OrderedDict[str, torch.Tensor], similarity: bool = False) -> Optional[float]:
    if model1 is None or model2 is None:
        return None

    correlations = []

    for layer in model1:
        if layer in model2:
            l1 = model1[layer].flatten()
            l2 = model2[layer].flatten()

            if l1.shape != l2.shape:
                min_len = min(l1.shape[0], l2.shape[0])
                l1, l2 = l1[:min_len], l2[:min_len]

            correlation = torch.corrcoef(torch.stack((l1, l2)))[0, 1]
            if similarity:
                adjusted_similarity = (correlation + 1) / 2
                correlations.append(adjusted_similarity.item())
            else:
                correlations.append(1 - (correlation + 1) / 2)

    if correlations:
        avg_correlation = torch.mean(torch.tensor(correlations))
        return avg_correlation.item()
    else:
        return None

In [43]:
def get_repution(repution_func, model, current_round_nei_models):
    nei_repution_score = {}
    for nei in current_round_nei_models:
        nei_repution_score[nei] = repution_func(model, current_round_nei_models[nei])
    return nei_repution_score