# Online Heavy Hitter Detection using Federated Lossy Counting


## Step 0: Preparation

Before we begin with the actual code, let's make sure that we have everything we need.

### Installing dependencies

First, we install the necessary packages:

In [1]:
# !pip3 install -U 'flwr[simulation]'

Now that we have all dependencies installed, we can import everything we need for this tutorial:

In [2]:
%load_ext autoreload
%autoreload 2

import numpy as np
# import torch
import flwr as fl

# DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
# print(
#     f"Training on {DEVICE} using PyTorch {torch.__version__} and Flower {fl.__version__}"
# )

### Data loading

Let's now locate the data. The dataset has already been partitioned into one subfolder per client (node); each subfolder holds that client's mini-batches as CSV files, and one mini-batch file is consumed per federated round.

In [3]:
import os
from pathlib import Path

# Number of simulated federated clients.
NUM_CLIENTS = 2
# Name of the dataset subfolder for this client count, e.g. "2nodes".
nnodes = str(NUM_CLIENTS) + "nodes"
# Root directory of the project; input data and results are both located below it.
main_src = "/home/paula/ecml2024/"
# Folder (relative to main_src) holding the partitioned dataset.
data_set_paritioned_src = "data_june_1000/"
# Field delimiter of the input CSV mini-batches.
delimiter = ','
# Number of most frequent items requested from the client sketch at each snapshot.
top_n = 30


def load_datasets(num_clients: int, cwd:str):
    """Return the paths of the CSV files located directly inside ``cwd``.

    Args:
        num_clients: Accepted for interface compatibility; not used.
        cwd: Directory to scan (not recursive).

    Returns:
        A list with ``cwd`` joined to every entry whose name ends in ``.csv``,
        in the order reported by ``os.listdir``.
    """
    csv_paths = []
    for entry in os.listdir(cwd):
        if entry.endswith('.csv'):
            csv_paths.append(os.path.join(cwd, entry))
    return csv_paths

# Clients point to directories: each client reads its data from one subfolder.
def load_subfolders(folder):
    """Return every subfolder below ``folder`` in a deterministic order.

    The tree is walked recursively and ``.ipynb_checkpoints`` directories are
    pruned. Sibling directories are sorted in natural order (``node_2`` before
    ``node_10``) because ``os.walk`` otherwise yields them in an arbitrary,
    filesystem-dependent order, while callers index the returned list by
    client id.

    Args:
        folder: Root directory to scan; it is printed before scanning.

    Returns:
        A list of subfolder paths (``folder`` itself is not included).
    """
    import re  # local import: only needed for the natural sort key

    def natural_key(name):
        # re.split with a capturing group puts the digit runs at odd indices.
        parts = re.split(r'(\d+)', name)
        return [int(part) if idx % 2 else part for idx, part in enumerate(parts)]

    subfolders = []
    print(folder)
    for root, dirs, _files in os.walk(folder):
        # Assigning in place both prunes the walk and fixes the visiting order.
        dirs[:] = sorted(
            (d for d in dirs if d != '.ipynb_checkpoints'), key=natural_key
        )
        subfolders.extend(os.path.join(root, d) for d in dirs)
    return subfolders


def count_files_by_folder(directory: str):
    """Count the regular files located directly inside ``directory``.

    Subdirectories are not counted and are not descended into.
    """
    return sum(1 for entry in Path(directory).iterdir() if entry.is_file())

# Root of the partitioned dataset for the selected node count, built from the
# constants above instead of repeating the absolute path.
cwd = main_src + data_set_paritioned_src + nnodes
onlyfolders = load_subfolders(cwd)

print(onlyfolders)

# The folder with the most files drives the number of rounds. The previous
# loop overwrote num_files on every iteration, so only the last folder counted.
num_files = max(
    (count_files_by_folder(folder) for folder in onlyfolders), default=0
)

/home/paula/ecml2024/data_june_1000/2nodes
['/home/paula/ecml2024/data_june_1000/2nodes/node_0', '/home/paula/ecml2024/data_june_1000/2nodes/node_1']


## Client

We implement our client by subclassing `flwr.client.NumPyClient`. Only `fit` is implemented: in each round it feeds the next CSV mini-batch into a heavy-hitters sketch and returns the most frequent items (`get_parameters` and `evaluate` are not needed here). Finally, we wrap the creation of instances of this class in a function called `numpyclient_fn`:

In [4]:
import pandas as pd
import csv
from datetime import datetime
from river import sketch
from river import stream
import pickle

class FlowerNumPyClient(fl.client.NumPyClient):
    """Flower client that tracks heavy hitters over its own CSV mini-batches.

    In every round the client reads one mini-batch file from its folder,
    feeds one column of each record into a ``river.sketch.HeavyHitters``
    sketch and reports the most frequent items. The sketch is pickled to disk
    after each round and reloaded in the next one, so the counts are
    cumulative across rounds.
    """

    def __init__(self, cid: str, only_folder: str, col_name: str):
        """Store the client id, its data folder and the column to count.

        Args:
            cid: Client identifier; used in the sketch file name and in the
                results folder name.
            only_folder: Directory holding this client's mini-batch files.
            col_name: Name of the CSV field whose values are counted.
        """
        self.cid = cid
        self.onlyfolder = only_folder
        self.colname = col_name
        self.previous_sketch_path = "heavy_hitters_sketch_" + str(self.cid)+".pkl"
        # Read by save_previous_data(); defined here so that method cannot
        # fail on a missing attribute.
        self.previous_round_data = None

    def load_previous_sketch(self):
        """Return the sketch pickled at ``previous_sketch_path``, or None if the file is absent."""
        if not os.path.exists(self.previous_sketch_path):
            return None
        # NOTE: pickle executes arbitrary code on load; only files written by
        # save_sketch() should ever be found at this path.
        with open(self.previous_sketch_path, 'rb') as f:
            return pickle.load(f)

    def save_sketch(self, heavy_hitters_sketch):
        """Pickle ``heavy_hitters_sketch`` to ``previous_sketch_path``."""
        # Make sure the target directory exists; dirname is empty for a bare
        # file name, in which case there is nothing to create.
        directory = os.path.dirname(self.previous_sketch_path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(self.previous_sketch_path, 'wb') as f:
            pickle.dump(heavy_hitters_sketch, f)

    def save_previous_data(self):
        """Pickle ``self.previous_round_data`` to ``previous_data.pkl``.

        NOTE(review): the file name does not contain the client id, so every
        client writes to the same file — confirm this is intended.
        """
        with open('previous_data.pkl', 'wb') as f:
            pickle.dump(self.previous_round_data, f)

    def get_next_batch(self, config):
        """Return the path of the mini-batch file for ``config["current_round"]``."""
        return self.onlyfolder + "/" + "output_" + str(config["current_round"]) + ".csv"

    def train(self, config):
        """Consume this round's mini-batch and update the heavy-hitters sketch.

        Args:
            config: Round configuration with the keys ``current_round``,
                ``fading_factor``, ``epsilon`` and ``support``.

        Returns:
            A tuple ``(sketch, df)`` where ``df`` has the columns ``ANumber``,
            ``Count`` and ``DateTime`` and holds one top-``top_n`` snapshot
            for every bucket boundary reached while reading the batch.
        """
        # From the second round on, continue from the sketch saved by the
        # previous round; start fresh if it is not there.
        heavy_hitters_sketch = None
        if int(config["current_round"]) > 1:
            heavy_hitters_sketch = self.load_previous_sketch()
        if heavy_hitters_sketch is None:
            heavy_hitters_sketch = sketch.HeavyHitters(
                fading_factor=config["fading_factor"],
                epsilon=config["epsilon"],
                support=config["support"],
            )

        cols = ['ANumber','BNumber','DateTime','Action','Result','CountryCode']
        snapshots = []
        new_batch = self.get_next_batch(config)
        if os.path.exists(new_batch):
            for x, _ in stream.iter_csv(new_batch, delimiter=delimiter, fieldnames=cols):
                if not x:
                    continue
                # The sketch is updated in place. The return value is ignored
                # on purpose: River releases differ on whether update()
                # returns the sketch or None.
                heavy_hitters_sketch.update(x[self.colname])

                # Take a snapshot only when a bucket has just been completed.
                if heavy_hitters_sketch._n % heavy_hitters_sketch._bucket_width != 0:
                    continue
                # Validates the timestamp format; raises ValueError otherwise.
                datetime.strptime(x['DateTime'], '%Y%m%d%H%M%S')
                snapshot = pd.DataFrame(
                    heavy_hitters_sketch.most_common(top_n),
                    columns=['ANumber', 'Count'],
                )
                snapshot['DateTime'] = x['DateTime']
                snapshots.append(snapshot.dropna())
        else:
            # This client has no mini-batch for this round: report nothing
            # instead of failing the round.
            print("[Client " + str(self.cid) + "] no batch file for this round: " + new_batch)

        # Concatenate once at the end rather than once per snapshot.
        if snapshots:
            df_bucket_a = pd.concat(snapshots, axis=0)
        else:
            df_bucket_a = pd.DataFrame(columns=['ANumber', 'Count', 'DateTime'])

        path_results = main_src + "results_federated/" + data_set_paritioned_src + nnodes + "/node_" + str(self.cid)
        os.makedirs(path_results, exist_ok=True)
        df_bucket_a.to_csv(path_results + '/' + "df_bucket_anumber_" + str(config["current_round"]) + ".csv", index=False)

        # Persist the updated sketch so the next round can accumulate on it.
        self.save_sketch(heavy_hitters_sketch)

        return heavy_hitters_sketch, df_bucket_a

    def read_csv_streaming(self, file_path):
        """Yield the rows of the CSV file at ``file_path`` one at a time."""
        # newline='' is what the csv module documents for files given to csv.reader.
        with open(file_path, 'r', newline='') as csv_file:
            yield from csv.reader(csv_file)

    def fit(self, parameters, config):
        """Run one round and return the heavy-hitter snapshots as the "parameters".

        Args:
            parameters: Ignored; there is no model to update.
            config: Round configuration, forwarded to ``train``.

        Returns:
            ``(rows, 0, {})`` where ``rows`` is a list of
            ``(ANumber, Count, DateTime)`` tuples.
        """
        _, df_bucket_hh = self.train(config)

        # Plain tuples are returned rather than the DataFrame's row arrays.
        # NOTE(review): presumably this is what lets Flower serialise each
        # row as a small array — confirm before changing.
        tuple_list = [(row[0], row[1], row[2]) for row in df_bucket_hh.values]

        return tuple_list, 0, {}

    # def evaluate(self, parameters, config):
    #     print(f"[Client {self.cid}] evaluate, config: {config}")
    #     set_parameters(self.net, parameters)
    #     loss, accuracy = test(self.net, self.valloader)
    #     return float(loss), len(self.valloader), {"accuracy": float(accuracy)}

def numpyclient_fn(cid: str) -> fl.client.Client:
    """Build the Flower client for client id ``cid``.

    The id is used as an index into ``onlyfolders`` to pick the client's data
    folder, and the client counts the values of the ``ANumber`` column.

    Returns:
        The result of ``FlowerNumPyClient(...).to_client()``, i.e. the wrapped
        client object rather than the ``FlowerNumPyClient`` itself.

    Raises:
        IndexError: If ``int(cid)`` is outside the range of ``onlyfolders``.
    """
    onlyfolder = onlyfolders[int(cid)]
    col_name = "ANumber"
    return FlowerNumPyClient(cid, onlyfolder, col_name).to_client()

### Server-side


In [5]:
from typing import Callable, Dict, List, Optional, Tuple, Union

import numpy as np

import flwr as fl
from flwr.common import (
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    Parameters,
    Scalar,
    ndarrays_to_parameters,
    parameters_to_ndarrays
)
from flwr.server.client_manager import ClientManager
from flwr.server.client_proxy import ClientProxy
from flwr.server.strategy import Strategy

class FedAnalytics(fl.server.strategy.FedAvg):
    """Federated-analytics strategy that merges the clients' heavy-hitter reports.

    No model is trained: each round the clients return rows of
    ``(code, count, datetime)``, and the server sums the counts per code,
    keeps the highest-ranked codes and writes them to a CSV file.
    """

    def __init__(
            self, min_fit_clients,min_evaluate_clients, min_available_clients, on_fit_config_fn,
            top_k: int = 5,
    ) -> None:
        """Configure the strategy.

        Args:
            min_fit_clients: Forwarded to ``FedAvg``.
            min_evaluate_clients: Forwarded to ``FedAvg``.
            min_available_clients: Forwarded to ``FedAvg``.
            on_fit_config_fn: Forwarded to ``FedAvg``; builds the per-round
                configuration sent to the clients.
            top_k: Number of aggregated heavy hitters kept per round.
        """
        # Hand the settings to FedAvg instead of overwriting its attributes
        # afterwards, so the base class sees the real values.
        super().__init__(
            min_fit_clients=min_fit_clients,
            min_evaluate_clients=min_evaluate_clients,
            min_available_clients=min_available_clients,
            on_fit_config_fn=on_fit_config_fn,
        )
        self.top_k = top_k

    def __repr__(self) -> str:
        return "FedAnalytics"

    def initialize_parameters(
            self, client_manager: Optional[ClientManager] = None
    ) -> Optional[Parameters]:
        """Return None: the strategy provides no initial parameters."""
        return None

    @staticmethod
    def most_actual_datetime(series):
        """Return the largest (most recent) value of ``series``."""
        # Declared static: the function takes no ``self`` and would otherwise
        # receive the instance as ``series`` when called on an instance.
        return series.loc[series.idxmax()]

    def aggregate_fit(
            self,
            server_round: int,
            results: List[Tuple[ClientProxy, FitRes]],
            failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        """Merge the heavy-hitter rows reported by the clients for this round.

        Counts are summed per code and the latest timestamp is kept; the
        ``top_k`` codes with the largest totals are written to
        ``df_bucket_anumber_<server_round>.csv`` in the server results
        folder. Nothing is written when no client reported any row.

        NOTE(review): every reported row is summed, including several
        snapshots from the same client within one round — confirm this is
        the intended aggregation.

        Returns:
            Empty parameters and an empty metrics dict.
        """
        # One entry per reported row; empty arrays and clients that sent
        # nothing contribute no rows.
        rows = [
            arr
            for _, fit_res in results
            for arr in parameters_to_ndarrays(fit_res.parameters)
            if len(arr) > 0
        ]

        if rows:
            df_hh = pd.DataFrame(np.vstack(rows), columns=['Code', 'Value', 'DateTime'])

            # The values arrive as text; convert before aggregating.
            df_hh['Value'] = pd.to_numeric(df_hh['Value'])
            # Same timestamp format the client validates before reporting.
            df_hh['DateTime'] = pd.to_datetime(df_hh['DateTime'], format='%Y%m%d%H%M%S')

            df_agrupado = df_hh.groupby('Code').agg({'Value': 'sum', 'DateTime': 'max'}).reset_index()
            df_sorted = df_agrupado.sort_values(by='Value', ascending=False)

            df_cortado = df_sorted.iloc[:self.top_k].copy()
            df_cortado['Window'] = server_round

            path_results = main_src + "results_federated/" + data_set_paritioned_src + nnodes + "/server/"
            os.makedirs(path_results, exist_ok=True)

            df_cortado.to_csv(path_results + "df_bucket_anumber_" + str(server_round) + ".csv", index=False)

        return ndarrays_to_parameters([]), {}

    def evaluate(self, server_round: int, parameters: Parameters
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        """Return a constant loss of 0 and no metrics; nothing is evaluated."""
        return 0.0, {}

    def configure_evaluate(
            self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, EvaluateIns]]:
        """Return no evaluation instructions: clients are never evaluated."""
        return []

    def aggregate_evaluate(
            self,
            server_round: int,
            results: List[Tuple[ClientProxy, EvaluateRes]],
            failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        """Return no loss and no metrics: there are no evaluation results to merge."""
        return None, {}


We've seen this before, there's nothing new so far. The only *tiny* difference compared to the previous notebook is naming, we've changed `FlowerClient` to `FlowerNumPyClient` and `client_fn` to `numpyclient_fn`. Let's run it to see the output we get:

In [None]:
# The number of subfolders inside the dataset folder is meant to define the
# number of clients (here it is taken from NUM_CLIENTS).

# The number of rounds is meant to follow the subfolder with the most
# mini-batches; it is taken from num_files computed in the data-loading cell.

import time
import resource

nrounds = num_files
#nrounds = 100
n_clients = NUM_CLIENTS

def fit_config(server_round):
    """Build the configuration dict sent to the clients for ``server_round``.

    The sketch settings are the same in every round; only ``current_round``
    changes.
    """
    # Alternative support values: 0.00025 for 4 nodes, 0.000125 for 8 nodes.
    sketch_settings = dict(
        fading_factor=1,
        epsilon=0.001,
        support=0.0005,  # 2 nodes
    )
    return {**sketch_settings, "current_round": server_round}

strategy = FedAnalytics(
    min_fit_clients=1,
    min_evaluate_clients=1,
    min_available_clients=1,
    on_fit_config_fn=fit_config
    #initial_parameters=fl.common.ndarrays_to_parameters(params)
)

# Specify client resources if you need GPU (defaults to 1 CPU and 0 GPU)
client_resources = {"num_cpus": 4}

# perf_counter is monotonic, so the measured duration cannot be distorted by
# system clock adjustments during a long run.
start_time = time.perf_counter()
# NOTE(review): RUSAGE_SELF reports this process only; work done in other
# processes started by the simulation is presumably not included — confirm.
before_resources = resource.getrusage(resource.RUSAGE_SELF)

fl.simulation.start_simulation(
    strategy=strategy,
    client_fn=numpyclient_fn,
    num_clients=n_clients,
    config=fl.server.ServerConfig(num_rounds=nrounds, round_timeout=None),
    client_resources=client_resources
)

end_time = time.perf_counter()  # end of the run
elapsed_time = end_time - start_time  # elapsed seconds
# Resource usage after the run
after_resources = resource.getrusage(resource.RUSAGE_SELF)
# Difference between the usage before and after the run
diff_resources = {
    "user_time": after_resources.ru_utime - before_resources.ru_utime,
    "system_time": after_resources.ru_stime - before_resources.ru_stime,
    "max_memory": after_resources.ru_maxrss - before_resources.ru_maxrss
}

# Single-row DataFrame with the measurements
data = {
    "tempo_execucao": [elapsed_time],
    "user_time": [diff_resources["user_time"]],
    "system_time": [diff_resources["system_time"]],
    "max_memory": [diff_resources["max_memory"]]
}

df = pd.DataFrame(data)

path_results_resources = main_src + "results_federated/" + data_set_paritioned_src + str(NUM_CLIENTS) + "nodes/"

# Make sure the results folder exists even if no round wrote anything into it.
os.makedirs(path_results_resources, exist_ok=True)

# Save the measurements as CSV
df.to_csv(path_results_resources + "dados_execucao_updated.csv", index=False)


INFO flwr 2024-08-21 11:13:09,652 | app.py:178 | Starting Flower simulation, config: ServerConfig(num_rounds=16453, round_timeout=None)
2024-08-21 11:13:12,497	INFO worker.py:1621 -- Started a local Ray instance.
INFO flwr 2024-08-21 11:13:16,273 | app.py:213 | Flower VCE: Ray initialized with resources: {'node:10.40.40.70': 1.0, 'node:__internal_head__': 1.0, 'CPU': 32.0, 'object_store_memory': 73446545817.0, 'memory': 161375273575.0}
INFO flwr 2024-08-21 11:13:16,275 | app.py:219 | Optimize your simulation with Flower VCE: https://flower.dev/docs/framework/how-to-run-simulations.html
INFO flwr 2024-08-21 11:13:16,276 | app.py:242 | Flower VCE: Resources for each Virtual Client: {'num_cpus': 4}
INFO flwr 2024-08-21 11:13:16,308 | app.py:288 | Flower VCE: Creating VirtualClientEngineActorPool with 8 actors
INFO flwr 2024-08-21 11:13:16,310 | server.py:89 | Initializing global parameters
INFO flwr 2024-08-21 11:13:16,312 | server.py:276 | Requesting initial parameters from one random cl

In [None]:
from submit_email import send_email

# Presumably sends a notification that the run has finished; the message
# names the experiment and the number of nodes.
send_email(f"FederatedLossyCounting {NUM_CLIENTS} nodes")