In [1]:
import collections
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
from scipy.stats import ks_2samp, chi2_contingency
import concurrent.futures

# Fetch the MNIST train/test splits via the Keras dataset helper
mnist = tf.keras.datasets.mnist
(X_train, Y_train), (X_test, Y_test) = mnist.load_data()

# Normalize each split by dividing every pixel value by 255.0
X_train = X_train / 255.0
X_test = X_test / 255.0

# Define a model creation function
def create_model():
    """Build a fresh, uncompiled Keras classifier for 28x28 inputs.

    The network is assembled layer by layer: a Flatten layer, a 128-unit
    ReLU Dense layer, Dropout with rate 0.2, and a 10-unit softmax Dense
    output layer.

    Returns:
        A new ``tf.keras.Sequential`` instance.
    """
    layers = tf.keras.layers

    network = tf.keras.Sequential()
    network.add(layers.Flatten(input_shape=(28, 28)))
    network.add(layers.Dense(128, activation='relu'))
    network.add(layers.Dropout(0.2))
    network.add(layers.Dense(10, activation='softmax'))
    return network

# Convert Keras model to TFF model
def model_fn():
    """Wrap a freshly built Keras model as a TFF learning model.

    A new Keras model is created on every call. The input spec is taken
    from the first entry of the module-level ``client_datasets`` list.

    Returns:
        The object returned by ``tff.learning.from_keras_model``.
    """
    keras_model = create_model()
    return tff.learning.from_keras_model(
        keras_model,
        input_spec=client_datasets[0].element_spec,
        # create_model() ends in a softmax layer, so the model emits
        # probabilities, not logits; the loss must not apply softmax again.
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]
    )

# Build one synthetic dataset per client: 100 random 28x28 arrays with
# random integer labels in [0, 10), served in batches of 10
num_clients = 10
client_datasets = [
    tf.data.Dataset.from_tensor_slices(
        (
            np.random.rand(100, 28, 28),
            np.random.randint(0, 10, 100),
        )
    ).batch(10)
    for _ in range(num_clients)
]

# Build an independent federated averaging process for every client
federated_algorithms = [
    tff.learning.build_federated_averaging_process(
        model_fn,
        client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
        server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
    )
    for _ in range(num_clients)
]

# Function to run the training in a separate event loop
def run_training():
    """Train every client's process for 10 rounds and compare clients pairwise.

    In each round, every client's federated process is advanced one step on
    that client's own dataset. Afterwards each client's current trainable
    weights are loaded into a fresh model and evaluated on the MNIST train
    and test splits, and every unordered pair of clients is passed to
    ``perform_differential_testing`` for both splits.

    Relies on the module-level ``federated_algorithms``, ``client_datasets``,
    ``num_clients``, ``X_train``/``Y_train`` and ``X_test``/``Y_test``.
    Results are printed; nothing is returned.
    """
    tff.backends.native.set_local_execution_context()

    states = [algorithm.initialize() for algorithm in federated_algorithms]

    # (report label, features, labels) for each evaluation split
    eval_splits = (
        ("Train", X_train, Y_train),
        ("Test", X_test, Y_test),
    )

    # Train the models on the clients' data
    for round_num in range(1, 11):
        for client_id in range(num_clients):
            states[client_id], _ = federated_algorithms[client_id].next(
                states[client_id], [client_datasets[client_id]]
            )

        # A client's predictions depend only on its own weights, so compute
        # them once per client and split here instead of once per pair.
        predictions = [
            {
                split_name: evaluate_model_on_data(
                    state.model.trainable, create_model, features, labels
                )[0]
                for split_name, features, labels in eval_splits
            }
            for state in states
        ]

        # Perform differential testing for each pair of clients
        for i in range(num_clients):
            for j in range(i + 1, num_clients):
                print(f"Comparison between Client {i} and Client {j}:")
                for split_name, _features, labels in eval_splits:
                    perform_differential_testing(
                        predictions[i][split_name],
                        predictions[j][split_name],
                        labels,
                        split_name,
                    )
                print()

# Function to evaluate the model on data
def evaluate_model_on_data(weights, create_model_fn, X, Y):
    """Predict on ``X`` with a newly built model carrying ``weights``.

    Args:
        weights: Weights passed to the new model's ``set_weights``.
        create_model_fn: Zero-argument callable returning a model object
            that provides ``set_weights`` and ``predict``.
        X: Inputs handed to ``predict``.
        Y: Labels; returned unchanged alongside the predictions.

    Returns:
        A ``(predictions, Y)`` tuple.
    """
    fresh_model = create_model_fn()
    fresh_model.set_weights(weights)
    return fresh_model.predict(X), Y

# Function to perform differential testing
def perform_differential_testing(predictions_i, predictions_j, labels, data_type):
    """Print four differential-testing criteria for two models' predictions.

    Args:
        predictions_i: 2-D NumPy array of per-class scores from the first
            model (one row per sample, one column per class).
        predictions_j: Scores from the second model; must have the same
            shape as ``predictions_i``.
        labels: Accepted for interface compatibility; not used by any of
            the criteria.
        data_type: Name of the data split, used only in the printed header.

    Raises:
        ValueError: If the two prediction arrays differ in shape.

    The number of classes is taken from the width of the prediction arrays,
    so the contingency table always covers exactly the classes present.
    """
    if predictions_i.shape != predictions_j.shape:
        raise ValueError(
            f"Prediction shapes differ: {predictions_i.shape} vs {predictions_j.shape}"
        )
    n_classes = predictions_i.shape[1]

    # Criterion 1: number of samples whose predicted class differs
    pred_class_i = np.argmax(predictions_i, axis=1)
    pred_class_j = np.argmax(predictions_j, axis=1)
    delta_class = np.sum(pred_class_i != pred_class_j)

    # Criterion 2: number of individual scores that are not exactly equal
    delta_score = np.sum(predictions_i != predictions_j)

    # Criterion 3: Significance of difference between scores
    p_ks = ks_2samp(predictions_i.flatten(), predictions_j.flatten()).pvalue

    # Criterion 4: Significance of difference between classifications.
    # Encode each (class_i, class_j) pair as a single integer so one
    # bincount pass yields the whole table (rows: model i, cols: model j).
    pair_codes = pred_class_i * n_classes + pred_class_j
    contingency = np.bincount(
        pair_codes, minlength=n_classes * n_classes
    ).reshape(n_classes, n_classes)
    contingency = contingency + 1  # Add-one smoothing
    p_x2 = chi2_contingency(contingency)[1]

    print(f"{data_type} Data:")
    print(f"Δ_class: {delta_class}")
    print(f"Δ_score: {delta_score}")
    print(f"P_KS: {p_ks}")
    print(f"P_X2: {p_x2}")

    if p_ks < 0.05 or p_x2 < 0.05:
        print("Warning: Significant difference detected (p-value < 0.05)")

# Run the training on a worker thread rather than the calling thread
# NOTE(review): presumably done so TFF's execution does not collide with an
# event loop already running in the notebook — confirm.
with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(run_training)
    # Block until training finishes; result() re-raises any exception
    # that occurred inside run_training.
    future.result()

Comparison between Client 0 and Client 1:
Train Data:
Δ_class: 49895
Δ_score: 600000
P_KS: 0.0
P_X2: 0.0
Test Data:
Δ_class: 8364
Δ_score: 100000
P_KS: 0.0
P_X2: 3.813648318831588e-108

Comparison between Client 0 and Client 2:
Train Data:
Δ_class: 54870
Δ_score: 600000
P_KS: 0.0
P_X2: 0.0
Test Data:
Δ_class: 9115
Δ_score: 100000
P_KS: 1.0063517584683998e-148
P_X2: 1.0481630182941481e-192

Comparison between Client 0 and Client 3:
Train Data:
Δ_class: 49584
Δ_score: 600000
P_KS: 0.0
P_X2: 0.0
Test Data:
Δ_class: 8267
Δ_score: 100000
P_KS: 1.079202831691285e-158
P_X2: 1.4638054479436146e-188

Comparison between Client 0 and Client 4:
Train Data:
Δ_class: 58859
Δ_score: 600000
P_KS: 0.0
P_X2: 0.0
Test Data:
Δ_class: 9805
Δ_score: 100000
P_KS: 0.0
P_X2: 1.1494759890888018e-136

Comparison between Client 0 and Client 5:
Train Data:
Δ_class: 59138
Δ_score: 600000
P_KS: 6.417091181944459e-76
P_X2: 0.0
Test Data:
Δ_class: 9859
Δ_score: 100000
P_KS: 4.68045860427684e-25
P_X2: 9.12937404563366e

This code implements a federated learning setup in which each client trains its own independent copy of the same model architecture through a separate Federated Averaging process. It then performs differential testing to compare the behavior of the resulting client models pairwise, on both the training and the test data. This allows for detecting significant differences, or potential errors, between the clients' versions of what is nominally the same algorithm.

Imports:
The code imports necessary libraries including TensorFlow, TensorFlow Federated, NumPy, SciPy, and concurrent.futures.
Data Loading and Preprocessing:

The MNIST dataset is loaded using TensorFlow's keras API.
The pixel values are normalized by dividing by 255.0.


Model Definition:

A function create_model() defines a simple neural network using Keras Sequential API.
The model has a Flatten layer, a Dense layer with 128 units and ReLU activation, a Dropout layer, and a final Dense layer with 10 units (for 10 digit classes) and softmax activation.


TensorFlow Federated (TFF) Model Conversion:

The model_fn() function converts the Keras model to a TFF model.
It specifies the loss function (SparseCategoricalCrossentropy) and metrics (SparseCategoricalAccuracy).


Client Dataset Creation:

10 dummy client datasets are created using random data.
Each client has 100 samples of 28x28 images and corresponding labels.


Federated Learning Algorithms:

A separate federated learning algorithm is created for each client using tff.learning.build_federated_averaging_process().
This implements the Federated Averaging algorithm for each client.


Training Function (run_training()):

Sets up the TFF local execution context.
Initializes the state for each client's federated learning algorithm.
Runs 10 rounds of training, where in each round:

Each client's model is updated using their own data.
After updating, differential testing is performed between each pair of clients.




Model Evaluation Function:

evaluate_model_on_data() takes model weights, creates a new model, sets the weights, and makes predictions on given data.


Differential Testing Function:

perform_differential_testing() compares predictions from two client models.
It calculates four criteria:
a. Δ_class: The number of samples where the two models predict different classes.
b. Δ_score: The number of individual prediction scores that differ between the two models (an element-wise count of unequal scores, not a sum of their magnitudes).
c. P_KS: p-value from a Kolmogorov-Smirnov test comparing the distribution of prediction scores.
d. P_X2: p-value from a Chi-squared test comparing the distribution of predicted classes.
It prints these values and warns if either p-value is less than 0.05, indicating a significant difference between the models.


Main Execution:

The training function is run in a separate thread using concurrent.futures.ThreadPoolExecutor.