In [21]:
import os
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from sklearn.utils import shuffle

import yaml
from config import CONFIG
import ast

from tensorflow.keras.utils import load_img
from tensorflow.keras.preprocessing.image import img_to_array

## Create an experiment directory and save a copy of the config in it as YAML

In [2]:
def save_dict_as_yaml(data_dict, directory_path, filename):
    """
    Write a dictionary to `<directory_path>/<filename>` as block-style YAML.

    The directory is created first when it is missing. Keys are written in
    the dictionary's own order (no sorting).

    Args:
        data_dict (dict): Mapping to serialise.
        directory_path (str): Folder that will contain the file.
        filename (str): Name of the YAML file inside that folder.

    Returns:
        str: Path of the file that was written.
    """
    directory_missing = not os.path.exists(directory_path)
    if directory_missing:
        os.makedirs(directory_path)
        print(f"Created directory: {directory_path}")

    target = os.path.join(directory_path, filename)
    with open(target, 'w') as handle:
        yaml.dump(data_dict, handle, default_flow_style=False, sort_keys=False)

    print(f"Saved YAML file to: {target}")
    return target
# Snapshot the run configuration inside this experiment's folder.
save_dict_as_yaml(
    CONFIG,
    directory_path="../experiments/"+CONFIG['experiment_name'],
    filename="config.yaml",
)

Saved YAML file to: ../experiments/fed_ml_experiment_1/config.yaml


'../experiments/fed_ml_experiment_1/config.yaml'

In [3]:
# Root folder of the current experiment; the fold CSVs are saved beneath it.
EXPERIMENT_DIR = '../experiments/'+CONFIG['experiment_name']

## Load data

In [4]:
def create_image_dataframe(data_dir, class_names=None, extensions=('.jpg', '.jpeg', '.png'),
                           csv_path="../data/train.csv", random_state=None):
    """
    Scans a directory with one subfolder per class and returns a shuffled
    DataFrame of image file paths and their class labels.

    The unshuffled listing is also written to `csv_path` as a side effect.

    Args:
        data_dir (str or Path): Root directory containing class subfolders.
        class_names (list[str], optional): If provided, only these subfolders are scanned.
        extensions (iterable[str]): Allowed image file suffixes (compared case-insensitively).
        csv_path (str or Path, optional): Where to write the unshuffled listing.
            Missing parent folders are created. Pass None to skip writing.
        random_state (int, optional): Seed for the row shuffle. None (the default)
            gives a different order on every call.

    Returns:
        pd.DataFrame: Shuffled DataFrame with columns ['filepath', 'label'] and a
        fresh 0..n-1 index.

    Raises:
        FileNotFoundError: If `data_dir` does not exist.
    """
    data_dir = Path(data_dir).resolve()  # Ensures it's absolute
    print(data_dir)
    if not data_dir.exists():
        raise FileNotFoundError(f"Directory does not exist: {data_dir}")

    # Normalise once; an immutable default avoids the shared-mutable-default pitfall.
    allowed_suffixes = {ext.lower() for ext in extensions}

    data = []
    # Directory listing order is filesystem dependent; sorting makes the
    # pre-shuffle order (and therefore a seeded shuffle) reproducible.
    for class_folder in sorted(data_dir.iterdir()):
        if not class_folder.is_dir():
            continue
        label = class_folder.name
        if class_names and label not in class_names:
            continue

        for file in sorted(class_folder.glob("*")):
            # is_file() keeps out sub-directories that merely end in an image suffix.
            if file.is_file() and file.suffix.lower() in allowed_suffixes:
                data.append((str(file), label))

    df = pd.DataFrame(data, columns=["filepath", "label"])

    if csv_path is not None:
        csv_path = Path(csv_path)
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(csv_path, index=False)

    # Random row permutation; equivalent in effect to the previous unseeded shuffle.
    return df.sample(frac=1, random_state=random_state).reset_index(drop=True)

In [5]:
train_path = "../data/train"  # Adjust based on actual path
class_list = ["mask", "no_mask"]
try:
    df = create_image_dataframe(train_path, class_names=class_list)
except FileNotFoundError as e:
    print(f"[ERROR] {e}")
    # Every later cell needs `df`; continuing would only fail further down with a
    # NameError (or silently reuse a stale `df` left in the kernel), so stop here.
    raise
print(df.head())

/mnt/c/Users/yashb/downloads/Yash Gupta Pattern Recognition Project/ml_fed_project/ml_fed_project/data/train
                                            filepath    label
0  /mnt/c/Users/yashb/downloads/Yash Gupta Patter...  no_mask
1  /mnt/c/Users/yashb/downloads/Yash Gupta Patter...     mask
2  /mnt/c/Users/yashb/downloads/Yash Gupta Patter...  no_mask
3  /mnt/c/Users/yashb/downloads/Yash Gupta Patter...  no_mask
4  /mnt/c/Users/yashb/downloads/Yash Gupta Patter...  no_mask


## Create folds for clients based on csv

In [9]:
def generate_k_folds(df: pd.DataFrame, num_splits: int, random_state: int = CONFIG['random_seed']):
    """
    Shuffles a DataFrame and cuts it into `num_splits` consecutive folds of
    near-equal size (sizes differ by at most one row).

    The split is purely random; no stratification on any column is performed.

    Args:
        df (pd.DataFrame): The input dataframe to split.
        num_splits (int): Number of folds.
        random_state (int): Seed for reproducible shuffling. The default is read
            from CONFIG['random_seed'] once, when this function is defined.

    Returns:
        List[pd.DataFrame]: One DataFrame per fold, each with a fresh 0..n-1 index.
        Folds are empty when `num_splits` exceeds the number of rows.

    Raises:
        ValueError: If `num_splits` is smaller than 2.
    """
    if num_splits < 2:
        raise ValueError("num_splits must be at least 2")

    shuffled_df = shuffle(df, random_state=random_state)
    # Slice by position rather than by index label: label lookup would return
    # every row sharing a label, duplicating data when df's index is not unique.
    positions = np.array_split(np.arange(len(shuffled_df)), num_splits)
    return [shuffled_df.iloc[chunk].reset_index(drop=True) for chunk in positions]

In [10]:
# Fold budget: one init fold, plus one fold per round for each client and one extra partition.
num_folds = 1 + CONFIG['num_rounds'] * (CONFIG['num_clients'] + 1)
folds = generate_k_folds(df, num_splits=num_folds)

## Save folds for each client

In [None]:
def save_folds_with_init_balanced(folds, num_clients, save_dir):
    """
    Saves a list of folds into a structured directory with:
    - `data/init.csv` holding the first fold.
    - `data/client_X/roundN.csv` files, the same number of folds for every client.

    The folds after the first are handed out in order: client 1 receives the
    first `len(folds[1:]) // num_clients` of them, client 2 the next batch, and
    so on. Folds that do not fit into an even split are not written; a notice
    is printed when that happens.

    Args:
        folds (list[pd.DataFrame]): List of k folds.
        num_clients (int): Number of clients (at least 1).
        save_dir (str): Base directory where `data/` will be created.

    Returns:
        str: Path to the saved experiment data folder.

    Raises:
        ValueError: If `folds` is empty or `num_clients` is smaller than 1.
    """
    # Validate before touching the filesystem so bad input leaves no partial output.
    if len(folds) == 0:
        raise ValueError("folds must contain at least one fold (used for init.csv)")
    if num_clients < 1:
        raise ValueError("num_clients must be at least 1")

    # Create main data directory
    data_dir = os.path.join(save_dir, "data")
    os.makedirs(data_dir, exist_ok=True)

    # Save the first fold as init.csv
    init_csv_path = os.path.join(data_dir, "init.csv")
    folds[0].to_csv(init_csv_path, index=False)
    print(f"✅ Saved Init Fold -> {init_csv_path}")

    # Remaining folds are split evenly among the clients
    remaining_folds = folds[1:]
    folds_per_client = len(remaining_folds) // num_clients

    leftover = len(remaining_folds) - folds_per_client * num_clients
    if leftover:
        print(f"⚠️ {leftover} fold(s) do not fit an even split across {num_clients} client(s) and were not saved")

    for client_id in range(1, num_clients + 1):
        client_dir = os.path.join(data_dir, f"client_{client_id}")
        os.makedirs(client_dir, exist_ok=True)

        first = (client_id - 1) * folds_per_client
        client_folds = remaining_folds[first:first + folds_per_client]
        for round_num, fold in enumerate(client_folds, start=1):
            fold_path = os.path.join(client_dir, f"round{round_num}.csv")
            fold.to_csv(fold_path, index=False)
            print(f"✅ Saved Client {client_id} Fold {round_num} -> {fold_path}")

    return data_dir  # Return the path for reference
    
# Alternative layout without a global partition. It writes into the same
# data/client_X/roundN.csv files as save_folds_with_global, so run only one of the two.
save_folds_with_init_balanced(folds, num_clients=CONFIG['num_clients'], save_dir=EXPERIMENT_DIR)

In [12]:
def save_folds_with_global(folds, num_clients, save_dir):
    """
    Saves a list of folds into a structured directory with:
    - `data/init.csv` holding the first fold.
    - `data/global/roundN.csv` files for the global partition.
    - `data/client_X/roundN.csv` files for each client.

    The folds after the first are split evenly across `num_clients + 1`
    partitions, in order: global first, then client 1, client 2, and so on.
    Folds that do not fit into an even split are not written; a notice is
    printed when that happens.

    Args:
        folds (list[pd.DataFrame]): List of k folds.
        num_clients (int): Number of clients (0 writes only the global partition).
        save_dir (str): Base directory where `data/` will be created.

    Returns:
        str: Path to the saved experiment data folder.

    Raises:
        ValueError: If `folds` is empty or `num_clients` is negative.
    """
    # Validate before touching the filesystem so bad input leaves no partial output.
    if len(folds) == 0:
        raise ValueError("folds must contain at least one fold (used for init.csv)")
    if num_clients < 0:
        raise ValueError("num_clients must not be negative")

    # Create main data directory
    data_dir = os.path.join(save_dir, "data")
    os.makedirs(data_dir, exist_ok=True)

    # Save the first fold as init.csv
    init_csv_path = os.path.join(data_dir, "init.csv")
    folds[0].to_csv(init_csv_path, index=False)
    print(f"✅ Saved Init Fold -> {init_csv_path}")

    # Remaining folds are split evenly across global + clients
    remaining_folds = folds[1:]
    total_partitions = num_clients + 1
    folds_per_partition = len(remaining_folds) // total_partitions

    leftover = len(remaining_folds) - folds_per_partition * total_partitions
    if leftover:
        print(f"⚠️ {leftover} fold(s) do not fit an even split across {total_partitions} partition(s) and were not saved")

    # (folder name, label used in the log line); global takes the first batch.
    partitions = [("global", "Global")]
    partitions += [(f"client_{cid}", f"Client {cid}") for cid in range(1, num_clients + 1)]

    for position, (folder, tag) in enumerate(partitions):
        partition_dir = os.path.join(data_dir, folder)
        os.makedirs(partition_dir, exist_ok=True)

        first = position * folds_per_partition
        batch = remaining_folds[first:first + folds_per_partition]
        for round_num, fold in enumerate(batch, start=1):
            fold_path = os.path.join(partition_dir, f"round{round_num}.csv")
            fold.to_csv(fold_path, index=False)
            print(f"✅ Saved {tag} Fold {round_num} -> {fold_path}")

    return data_dir  # Return the path for reference
# Write init.csv plus the global and per-client round CSVs for this experiment.
save_folds_with_global(
    folds,
    num_clients=CONFIG['num_clients'],
    save_dir=EXPERIMENT_DIR,
)

✅ Saved Init Fold -> ../experiments/fed_ml_experiment_1/data/init.csv
✅ Saved Global Fold 1 -> ../experiments/fed_ml_experiment_1/data/global/round1.csv
✅ Saved Global Fold 2 -> ../experiments/fed_ml_experiment_1/data/global/round2.csv
✅ Saved Global Fold 3 -> ../experiments/fed_ml_experiment_1/data/global/round3.csv
✅ Saved Global Fold 4 -> ../experiments/fed_ml_experiment_1/data/global/round4.csv
✅ Saved Global Fold 5 -> ../experiments/fed_ml_experiment_1/data/global/round5.csv
✅ Saved Global Fold 6 -> ../experiments/fed_ml_experiment_1/data/global/round6.csv
✅ Saved Global Fold 7 -> ../experiments/fed_ml_experiment_1/data/global/round7.csv
✅ Saved Global Fold 8 -> ../experiments/fed_ml_experiment_1/data/global/round8.csv
✅ Saved Global Fold 9 -> ../experiments/fed_ml_experiment_1/data/global/round9.csv
✅ Saved Global Fold 10 -> ../experiments/fed_ml_experiment_1/data/global/round10.csv
✅ Saved Client 1 Fold 1 -> ../experiments/fed_ml_experiment_1/data/client_1/round1.csv
✅ Saved Cli

'../experiments/fed_ml_experiment_1/data'

## Load and process images in folds

In [7]:
def process_image(image_path,label):
    """
    Load one image, resize it to the configured size and scale pixels to [0, 1].

    The target size comes from CONFIG['image_size'] (default '(100, 100, 3)');
    only its first two entries (height, width) are used here.

    Args:
        image_path (str): Path of the image file to load.
        label (str): Class label; 'mask' maps to 1, anything else to 0.

    Returns:
        tuple | None: `(image_array, image_class)` on success, or None when
        loading or processing fails (the error is printed).
    """
    try:
        raw_shape = CONFIG.get('image_size', '(100, 100, 3)')
        # Accept the shape both as a string literal and as an already-parsed
        # tuple/list; literal_eval would reject the latter and fail every image.
        input_shape = ast.literal_eval(raw_shape) if isinstance(raw_shape, str) else tuple(raw_shape)
        # Keras expects target_size as (height, width); using entry 0 twice
        # would silently square non-square configurations.
        image = load_img(image_path, target_size=(input_shape[0], input_shape[1]))
        image = img_to_array(image)
        image = image / 255.0  # Normalize to [0, 1]
        image_class = 1 if label =='mask' else 0
        return (image,image_class)
    except Exception as e:
        print(f"Error processing image at {image_path}: {e}")
        return None

In [19]:
import os
import pandas as pd


def process_image_safe(image_path,label):
    """
    Call `process_image` and never let an exception escape.

    Returns whatever `process_image` returns; if it raises, the error is
    printed and None is returned instead.
    """
    outcome = None
    try:
        outcome = process_image(image_path, label)
    except Exception as err:
        print(f"❌ Error processing {image_path}: {err}")
    return outcome

def process_data_fast(data_path, max_workers=8):
    """
    Process every image listed in a fold CSV using a thread pool.

    Args:
        data_path (str): CSV file with 'filepath' and 'label' columns.
        max_workers (int): Number of worker threads.

    Returns:
        np.ndarray: Object array built from the `(image, image_class)` results,
        in the same order as the CSV rows. Rows whose processing returned None
        are left out.
    """
    df = pd.read_csv(data_path)
    filepaths = df['filepath'].tolist()
    labels = df['label'].tolist()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() yields results in input order, so the saved array is identical
        # from run to run instead of following thread completion order.
        results = list(executor.map(process_image_safe, filepaths, labels))

    processed = [item for item in results if item is not None]
    return np.array(processed,dtype=object)

def save_processed_array(array, output_path):
    """
    Save an array with `np.save`, creating the destination folder if needed.

    Args:
        array: Array to store.
        output_path (str): Destination file path. np.save appends '.npy' when
            the name does not already end with it.
    """
    parent = os.path.dirname(output_path)
    # A bare filename has an empty dirname, and os.makedirs('') raises.
    if parent:
        os.makedirs(parent, exist_ok=True)
    np.save(output_path, array)
    print(f"💾 Saved processed data to {output_path}")

def _process_csv_folder(csv_dir, npy_dir, max_workers):
    """Convert each `*.csv` fold directly inside `csv_dir` into a same-named `.npy` in `npy_dir`."""
    for fname in sorted(os.listdir(csv_dir)):
        stem, ext = os.path.splitext(fname)
        if ext == '.csv':
            processed = process_data_fast(os.path.join(csv_dir, fname), max_workers)
            save_processed_array(processed, os.path.join(npy_dir, f"{stem}.npy"))


def process_and_save_folds(data_dir, max_workers=8):
    """
    Processes all folds and saves them under a new directory
    automatically inferred from `data_dir`.

    The output folder is `processed_data`, a sibling of `data_dir`. It mirrors
    the input layout: `init.npy`, `global/<round>.npy` and
    `client_X/<round>.npy`. Parts that are missing from `data_dir` are skipped.

    Args:
        data_dir (str or Path): Folder holding `init.csv`, `global/` and `client_*/`.
        max_workers (int): Thread count passed to `process_data_fast`.
    """
    data_dir = os.fspath(data_dir)  # also accept pathlib.Path

    # Infer output_dir from parent of data_dir
    parent_dir = os.path.dirname(data_dir.rstrip('/'))
    output_dir = os.path.join(parent_dir, 'processed_data')

    print(f"📁 Saving processed data to: {output_dir}")

    # Process init.csv
    init_path = os.path.join(data_dir, 'init.csv')
    if os.path.exists(init_path):
        processed = process_data_fast(init_path, max_workers)
        save_processed_array(processed, os.path.join(output_dir, 'init.npy'))

    # Process global folds
    global_dir = os.path.join(data_dir, 'global')
    if os.path.isdir(global_dir):
        _process_csv_folder(global_dir, os.path.join(output_dir, 'global'), max_workers)

    # Process client folds; the isdir check skips stray files named client_*.
    for name in sorted(os.listdir(data_dir)):
        client_dir = os.path.join(data_dir, name)
        if name.startswith("client_") and os.path.isdir(client_dir):
            _process_csv_folder(client_dir, os.path.join(output_dir, name), max_workers)

    print(f"✅ All processed data saved under: {output_dir}")


In [20]:
# Derive the folder from EXPERIMENT_DIR so this cell follows CONFIG['experiment_name']
# instead of always processing a hard-coded experiment.
process_and_save_folds(os.path.join(EXPERIMENT_DIR, 'data'))

📁 Saving processed data to: ../experiments/fed_ml_experiment_1/processed_data
💾 Saved processed data to ../experiments/fed_ml_experiment_1/processed_data/init.npy
💾 Saved processed data to ../experiments/fed_ml_experiment_1/processed_data/global/round1.npy
💾 Saved processed data to ../experiments/fed_ml_experiment_1/processed_data/global/round10.npy
💾 Saved processed data to ../experiments/fed_ml_experiment_1/processed_data/global/round2.npy
💾 Saved processed data to ../experiments/fed_ml_experiment_1/processed_data/global/round3.npy
💾 Saved processed data to ../experiments/fed_ml_experiment_1/processed_data/global/round4.npy
💾 Saved processed data to ../experiments/fed_ml_experiment_1/processed_data/global/round5.npy
💾 Saved processed data to ../experiments/fed_ml_experiment_1/processed_data/global/round6.npy
💾 Saved processed data to ../experiments/fed_ml_experiment_1/processed_data/global/round7.npy
💾 Saved processed data to ../experiments/fed_ml_experiment_1/processed_data/global/r