In [2]:
import tensorflow as tf
from tensorflow.keras import layers, models

# Use MirroredStrategy for distributed training simulation
strategy = tf.distribute.MirroredStrategy()

# Define the LeNet model
with strategy.scope():
    model = models.Sequential([
        layers.Input(shape=(32, 32, 1)),
        layers.Conv2D(6, kernel_size=(5, 5), activation='relu'),
        layers.AveragePooling2D(pool_size=(2, 2)),
        layers.Conv2D(16, kernel_size=(5, 5), activation='relu'),
        layers.AveragePooling2D(pool_size=(2, 2)),
        layers.Flatten(),
        layers.Dense(120, activation='relu'),
        layers.Dense(84, activation='relu'),
        layers.Dense(10, activation='softmax')
    ])
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

# Dataset preprocessing
def create_dataset():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
    x_train = tf.image.resize(x_train, (32, 32))
    x_test = tf.image.resize(x_test, (32, 32))
    x_train = tf.image.rgb_to_grayscale(x_train) / 255.0
    x_test = tf.image.rgb_to_grayscale(x_test) / 255.0
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(5000).batch(64)
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)
    return train_dataset, test_dataset

# Prepare datasets
train_dataset, test_dataset = create_dataset()

# Distribute the dataset
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

# Train the model
model.fit(train_dist_dataset, epochs=5)

# Evaluate the model
loss, accuracy = model.evaluate(test_dist_dataset)
print(f"Test Accuracy: {accuracy}")


Epoch 1/5
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m31s[0m 38ms/step - accuracy: 0.2758 - loss: 1.9607
Epoch 2/5
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 39ms/step - accuracy: 0.4278 - loss: 1.6136
Epoch 3/5
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 41ms/step - accuracy: 0.4677 - loss: 1.4942
Epoch 4/5
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 38ms/step - accuracy: 0.4967 - loss: 1.4205
Epoch 5/5
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m43s[0m 40ms/step - accuracy: 0.5197 - loss: 1.3550
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 21ms/step - accuracy: 0.5279 - loss: 1.3480
Test Accuracy: 0.517799973487854


In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

# Use MirroredStrategy for distributed training simulation
strategy = tf.distribute.MirroredStrategy()

# Define the LeNet model
with strategy.scope():
    model = models.Sequential([
        layers.Input(shape=(32, 32, 1)),  # Input layer
        layers.Conv2D(6, kernel_size=(5, 5), activation='relu'),  # Conv layer
        layers.AveragePooling2D(pool_size=(2, 2)),               # Pooling layer
        layers.Conv2D(16, kernel_size=(5, 5), activation='relu'), # Conv layer
        layers.AveragePooling2D(pool_size=(2, 2)),               # Pooling layer
        layers.Flatten(),                                        # Flatten
        layers.Dense(120, activation='relu'),                   # Dense layer
        layers.Dense(84, activation='relu'),                    # Dense layer
        layers.Dense(10, activation='softmax')                  # Output layer
    ])
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

# Dataset preparation
def create_dataset():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
    x_train = tf.image.resize(x_train, (32, 32))
    x_test = tf.image.resize(x_test, (32, 32))
    x_train = tf.image.rgb_to_grayscale(x_train) / 255.0
    x_test = tf.image.rgb_to_grayscale(x_test) / 255.0
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(5000).batch(64)
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)
    return train_dataset, test_dataset

# Prepare datasets
train_dataset, test_dataset = create_dataset()

# Distribute the dataset
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

# Callback to observe gradients and weights
class GradientCallback(tf.keras.callbacks.Callback):
    def on_train_batch_end(self, batch, logs=None):
        print(f"\n--- Batch {batch} ---")
        for i, weight in enumerate(self.model.trainable_weights):
            print(f"Layer {i} weight shape: {weight.shape}")
            print(f"Sample weight values: {weight.numpy().flatten()[:5]}")  # Display first 5 weights
        print("------------------")

# Function to print data distribution across replicas
@tf.function
def print_batch_distribution(features, labels):
    replica_context = tf.distribute.get_replica_context()
    tf.print(f"Replica {replica_context.replica_id_in_sync_group}: "
             f"Feature shape: {tf.shape(features)}, Label shape: {tf.shape(labels)}")

# Observe how dataset is distributed
iterator = iter(train_dist_dataset)
features, labels = next(iterator)
strategy.run(print_batch_distribution, args=(features, labels))

# Train the model and observe updates
model.fit(train_dist_dataset, epochs=1, callbacks=[GradientCallback()])

# Evaluate the model
loss, accuracy = model.evaluate(test_dist_dataset)
print(f"Test Loss: {loss}")
print(f"Test Accuracy: {accuracy}")


In [4]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np

# Define the LeNet model
def create_model():
    model = models.Sequential([
        layers.Input(shape=(32, 32, 1)),  # Input layer
        layers.Conv2D(6, kernel_size=(5, 5), activation='relu'),  # Conv layer
        layers.AveragePooling2D(pool_size=(2, 2)),               # Pooling layer
        layers.Conv2D(16, kernel_size=(5, 5), activation='relu'), # Conv layer
        layers.AveragePooling2D(pool_size=(2, 2)),               # Pooling layer
        layers.Flatten(),                                        # Flatten
        layers.Dense(120, activation='relu'),                   # Dense layer
        layers.Dense(84, activation='relu'),                    # Dense layer
        layers.Dense(10, activation='softmax')                  # Output layer
    ])
    return model

# Simulate multiple workers
class Worker:
    def __init__(self, worker_id, model):
        self.worker_id = worker_id
        self.model = model
        self.optimizer = tf.keras.optimizers.Adam()
        print(f"Worker {worker_id} initialized.")

    def compute_gradients(self, features, labels):
        with tf.GradientTape() as tape:
            predictions = self.model(features, training=True)
            loss = tf.keras.losses.sparse_categorical_crossentropy(labels, predictions)
            loss = tf.reduce_mean(loss)
        gradients = tape.gradient(loss, self.model.trainable_variables)
        return gradients, loss

    def set_parameters(self, parameters):
        self.model.set_weights(parameters)

# Central Parameter Server
class ParameterServer:
    def __init__(self, model):
        self.model = model
        print("Parameter Server initialized.")

    def get_parameters(self):
        return self.model.get_weights()

    def update_parameters(self, aggregated_gradients):
        optimizer = tf.keras.optimizers.Adam()  # Use a single optimizer for the parameter server
        optimizer.apply_gradients(zip(aggregated_gradients, self.model.trainable_variables))
        print("Updated parameters on the Parameter Server.")

# Dataset preparation
def create_dataset():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
    x_train = tf.image.resize(x_train, (32, 32))
    x_test = tf.image.resize(x_test, (32, 32))
    x_train = tf.image.rgb_to_grayscale(x_train) / 255.0
    x_test = tf.image.rgb_to_grayscale(x_test) / 255.0
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(5000).batch(64)
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)
    return train_dataset, test_dataset

# Initialize components
train_dataset, test_dataset = create_dataset()
global_model = create_model()
parameter_server = ParameterServer(global_model)

# Simulate 2 workers
workers = [Worker(worker_id=i, model=create_model()) for i in range(2)]

# Training loop
epochs = 1
for epoch in range(epochs):
    print(f"\nEpoch {epoch + 1}/{epochs}")
    for step, (features, labels) in enumerate(train_dataset):
        # Split data across workers (simple round-robin split for simulation)
        worker_data = np.array_split(features.numpy(), len(workers))
        worker_labels = np.array_split(labels.numpy(), len(workers))

        # Each worker computes gradients
        worker_gradients = []
        losses = []
        for worker, data, lbl in zip(workers, worker_data, worker_labels):
            print(f"Worker {worker.worker_id} processing data batch {step}")
            gradients, loss = worker.compute_gradients(tf.convert_to_tensor(data), tf.convert_to_tensor(lbl))
            worker_gradients.append(gradients)
            losses.append(loss.numpy())
            print(f"Worker {worker.worker_id} gradients (layer 0 sample): {gradients[0].numpy().flatten()[:5]}")

        # Aggregate gradients on the parameter server
        aggregated_gradients = [
            tf.reduce_mean([grad[i] for grad in worker_gradients], axis=0)
            for i in range(len(worker_gradients[0]))
        ]
        print(f"Aggregated gradients (layer 0 sample): {aggregated_gradients[0].numpy().flatten()[:5]}")

        # Update parameters on the parameter server
        print(f"Parameters before update (layer 0 sample): {parameter_server.get_parameters()[0].flatten()[:5]}")
        parameter_server.update_parameters(aggregated_gradients)
        print(f"Parameters after update (layer 0 sample): {parameter_server.get_parameters()[0].flatten()[:5]}")

        # Broadcast updated parameters to workers
        for worker in workers:
            worker.set_parameters(parameter_server.get_parameters())
            print(f"Worker {worker.worker_id} synchronized with updated parameters.")

        if step % 10 == 0:
            print(f"Step {step}, Losses: {losses}")

# Compile the global model for evaluation
global_model.compile(optimizer='adam',
                     loss='sparse_categorical_crossentropy',
                     metrics=['accuracy'])

# Evaluate the global model
test_loss, test_acc = global_model.evaluate(test_dataset)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_acc}")


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Updated parameters on the Parameter Server.
Parameters after update (layer 0 sample): [0.14592758 0.00917801 0.15775949 0.04945175 0.03161769]
Worker 0 synchronized with updated parameters.
Worker 1 synchronized with updated parameters.
Worker 0 processing data batch 289
Worker 0 gradients (layer 0 sample): [ 0.30667242  0.5697729  -0.44856307 -1.6287508  -0.15269202]
Worker 1 processing data batch 289
Worker 1 gradients (layer 0 sample): [ 0.2858757   0.5037129  -0.39404172 -1.5788977  -0.2402962 ]
Aggregated gradients (layer 0 sample): [ 0.29627407  0.5367429  -0.42130238 -1.6038243  -0.1964941 ]
Parameters before update (layer 0 sample): [0.14592758 0.00917801 0.15775949 0.04945175 0.03161769]
Updated parameters on the Parameter Server.
Parameters after update (layer 0 sample): [0.14492759 0.00817803 0.15875947 0.05045174 0.03261766]
Worker 0 synchronized with updated parameters.
Worker 1 synchronized with updated para

In [8]:
import tensorflow as tf
from tensorflow.keras import layers, models

# Use MirroredStrategy for parallel training simulation
strategy = tf.distribute.MirroredStrategy()

# Define the LeNet model
def create_model():
    return models.Sequential([
        layers.Input(shape=(32, 32, 1)),  # Input layer
        layers.Conv2D(6, kernel_size=(5, 5), activation='relu'),  # Conv layer
        layers.AveragePooling2D(pool_size=(2, 2)),               # Pooling layer
        layers.Conv2D(16, kernel_size=(5, 5), activation='relu'), # Conv layer
        layers.AveragePooling2D(pool_size=(2, 2)),               # Pooling layer
        layers.Flatten(),                                        # Flatten
        layers.Dense(120, activation='relu'),                   # Dense layer
        layers.Dense(84, activation='relu'),                    # Dense layer
        layers.Dense(10, activation='softmax')                  # Output layer
    ])

# Dataset preparation
def create_dataset():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
    x_train = tf.image.resize(x_train, (32, 32))
    x_test = tf.image.resize(x_test, (32, 32))
    x_train = tf.image.rgb_to_grayscale(x_train) / 255.0
    x_test = tf.image.rgb_to_grayscale(x_test) / 255.0
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(5000).batch(64)
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)
    return train_dataset, test_dataset

# Create datasets
train_dataset, test_dataset = create_dataset()

# Parallel training setup
with strategy.scope():
    model = create_model()
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

# Train the model
model.fit(train_dataset, epochs=1)

# Evaluate the model
test_loss, test_acc = model.evaluate(test_dataset)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_acc}")


[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m58s[0m 36ms/step - accuracy: 0.2773 - loss: 1.9937
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 12ms/step - accuracy: 0.4021 - loss: 1.6785
Test Loss: 1.688963532447815
Test Accuracy: 0.39489999413490295


In [11]:
import tensorflow as tf
from tensorflow.keras import layers, models
import os

# Set up the TF_CONFIG for simulating three workers
os.environ["TF_CONFIG"] = """
{
    "cluster": {
        "worker": ["localhost:12345", "localhost:12346", "localhost:12347"]
    },
    "task": {"type": "worker", "index": 0}
}
"""

# Initialize MultiWorkerMirroredStrategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()

# Log the number of replicas (workers)
num_workers = strategy.num_replicas_in_sync
print(f"Number of workers (replicas): {num_workers}")

# Define the LeNet model
def create_model():
    return models.Sequential([
        layers.Input(shape=(32, 32, 1)),  # Input layer
        layers.Conv2D(6, kernel_size=(5, 5), activation='relu'),  # Conv layer
        layers.AveragePooling2D(pool_size=(2, 2)),               # Pooling layer
        layers.Conv2D(16, kernel_size=(5, 5), activation='relu'), # Conv layer
        layers.AveragePooling2D(pool_size=(2, 2)),               # Pooling layer
        layers.Flatten(),                                        # Flatten
        layers.Dense(120, activation='relu'),                   # Dense layer
        layers.Dense(84, activation='relu'),                    # Dense layer
        layers.Dense(10, activation='softmax')                  # Output layer
    ])

# Dataset preparation
def create_dataset():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
    x_train = tf.image.resize(x_train, (32, 32))
    x_test = tf.image.resize(x_test, (32, 32))
    x_train = tf.image.rgb_to_grayscale(x_train) / 255.0
    x_test = tf.image.rgb_to_grayscale(x_test) / 255.0
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(5000).batch(64)
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)
    return train_dataset, test_dataset

# Create datasets
train_dataset, test_dataset = create_dataset()

# Parallel training setup
with strategy.scope():
    model = create_model()
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

# Train the model
print("\nStarting training...")
model.fit(train_dataset, epochs=1)

# Evaluate the model
print("\nEvaluating the model...")
test_loss, test_acc = model.evaluate(test_dataset)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_acc}")


RuntimeError: Collective ops must be configured at program startup

In [5]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import threading

# Define the LeNet model
def create_model():
    return models.Sequential([
        layers.Input(shape=(32, 32, 1)),  # Input layer
        layers.Conv2D(6, kernel_size=(5, 5), activation='relu'),  # Conv layer
        layers.AveragePooling2D(pool_size=(2, 2)),               # Pooling layer
        layers.Conv2D(16, kernel_size=(5, 5), activation='relu'), # Conv layer
        layers.AveragePooling2D(pool_size=(2, 2)),               # Pooling layer
        layers.Flatten(),                                        # Flatten
        layers.Dense(120, activation='relu'),                   # Dense layer
        layers.Dense(84, activation='relu'),                    # Dense layer
        layers.Dense(10, activation='softmax')                  # Output layer
    ])

# Dataset preparation
def create_dataset():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
    x_train = tf.image.resize(x_train, (32, 32))
    x_test = tf.image.resize(x_test, (32, 32))
    x_train = tf.image.rgb_to_grayscale(x_train) / 255.0
    x_test = tf.image.rgb_to_grayscale(x_test) / 255.0
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(5000).batch(64)
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)
    return train_dataset, test_dataset

# Worker class
class Worker:
    def __init__(self, worker_id, model, dataset, parameter_server):
        self.worker_id = worker_id
        self.model = model
        self.dataset = dataset
        self.parameter_server = parameter_server
        self.optimizer = tf.keras.optimizers.Adam()

    def compute_and_send_gradients(self, features, labels):
        with tf.GradientTape() as tape:
            predictions = self.model(features, training=True)
            loss = tf.keras.losses.sparse_categorical_crossentropy(labels, predictions)
            loss = tf.reduce_mean(loss)
        gradients = tape.gradient(loss, self.model.trainable_variables)
        self.parameter_server.receive_gradients(self.worker_id, gradients)
        return loss

    def run(self):
        for step, (features, labels) in enumerate(self.dataset):
            features = tf.convert_to_tensor(features)
            labels = tf.convert_to_tensor(labels)
            loss = self.compute_and_send_gradients(features, labels)
            print(f"Worker {self.worker_id} - Step {step}, Loss: {loss.numpy()}")

# Parameter server
class ParameterServer:
    def __init__(self, model):
        self.model = model
        self.gradient_accumulator = [tf.zeros_like(var) for var in model.trainable_variables]
        self.lock = threading.Lock()

    def receive_gradients(self, worker_id, gradients):
        with self.lock:
            for i, grad in enumerate(gradients):
                self.gradient_accumulator[i] += grad
            print(f"Parameter Server - Received gradients from Worker {worker_id}")

    def apply_gradients(self):
        optimizer = tf.keras.optimizers.Adam()
        with self.lock:
            optimizer.apply_gradients(zip(self.gradient_accumulator, self.model.trainable_variables))
            self.gradient_accumulator = [tf.zeros_like(var) for var in self.model.trainable_variables]
            print("Parameter Server - Gradients applied and parameters updated")

# Parallel training setup
train_dataset, test_dataset = create_dataset()
global_model = create_model()
parameter_server = ParameterServer(global_model)

# Split dataset for workers
worker_datasets = [train_dataset.shard(num_shards=3, index=i) for i in range(3)]
workers = [Worker(worker_id=i, model=create_model(), dataset=worker_datasets[i], parameter_server=parameter_server) for i in range(3)]

# Run workers in parallel
threads = []
for worker in workers:
    thread = threading.Thread(target=worker.run)
    threads.append(thread)
    thread.start()

# Wait for all workers to finish
for thread in threads:
    thread.join()

# Apply accumulated gradients on the parameter server
parameter_server.apply_gradients()

# Evaluate the global model
global_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
test_loss, test_acc = global_model.evaluate(test_dataset)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_acc}")


Parameter Server - Received gradients from Worker 2
Worker 2 - Step 0, Loss: 2.304616928100586
Parameter Server - Received gradients from Worker 1
Worker 1 - Step 0, Loss: 2.307723045349121
Parameter Server - Received gradients from Worker 2
Worker 2 - Step 1, Loss: 2.3021416664123535
Parameter Server - Received gradients from Worker 0
Worker 0 - Step 0, Loss: 2.3018152713775635
Parameter Server - Received gradients from Worker 1
Worker 1 - Step 1, Loss: 2.3013782501220703
Parameter Server - Received gradients from Worker 2
Worker 2 - Step 2, Loss: 2.2952466011047363
Parameter Server - Received gradients from Worker 0
Worker 0 - Step 1, Loss: 2.3033993244171143
Parameter Server - Received gradients from Worker 1
Worker 1 - Step 2, Loss: 2.303593158721924
Parameter Server - Received gradients from Worker 2
Worker 2 - Step 3, Loss: 2.2999205589294434
Parameter Server - Received gradients from Worker 0
Worker 0 - Step 2, Loss: 2.304145336151123
Parameter Server - Received gradients from W

Worker 0 gets the first shard.
Worker 1 gets the second shard.
Worker 2 gets the third shard.

In [6]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np

# Define the model
def create_model():
    return models.Sequential([
        layers.Input(shape=(32, 32, 1)),  # Input layer
        layers.Conv2D(6, kernel_size=(5, 5), activation='relu'),  # Worker 0 updates this
        layers.AveragePooling2D(pool_size=(2, 2)),                # Worker 0 skips this (no weights)
        layers.Conv2D(16, kernel_size=(5, 5), activation='relu'), # Worker 1 updates this
        layers.AveragePooling2D(pool_size=(2, 2)),                # Worker 1 skips this (no weights)
        layers.Flatten(),                                         # Worker 1 skips this (no weights)
        layers.Dense(120, activation='relu'),                    # Worker 1 updates this
        layers.Dense(84, activation='relu'),                     # Worker 1 updates this
        layers.Dense(10, activation='softmax')                   # Worker 1 updates this
    ])

# Dataset preparation
def create_dataset():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
    x_train = tf.image.resize(x_train, (32, 32))
    x_test = tf.image.resize(x_test, (32, 32))
    x_train = tf.image.rgb_to_grayscale(x_train) / 255.0
    x_test = tf.image.rgb_to_grayscale(x_test) / 255.0
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(5000).batch(64)
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)
    return train_dataset, test_dataset

# Custom Worker Class
class Worker:
    def __init__(self, worker_id, model, layers_to_update, dataset):
        self.worker_id = worker_id
        self.model = model
        self.layers_to_update = layers_to_update  # Indices of layers this worker updates
        self.dataset = dataset
        self.optimizer = tf.keras.optimizers.Adam()

    def compute_and_update(self, features, labels):
        with tf.GradientTape(persistent=True) as tape:
            predictions = self.model(features, training=True)
            loss = tf.keras.losses.sparse_categorical_crossentropy(labels, predictions)
            loss = tf.reduce_mean(loss)

        # Compute gradients only for the assigned layers with trainable weights
        gradients = []
        variables = []
        for i in self.layers_to_update:
            if i < len(self.model.layers) and self.model.layers[i].trainable_weights:
                gradients.extend(tape.gradient(loss, self.model.layers[i].trainable_weights))
                variables.extend(self.model.layers[i].trainable_weights)

        # Apply gradients if there are any
        if gradients and variables:
            self.optimizer.apply_gradients(zip(gradients, variables))
            print(f"Worker {self.worker_id} updated layers: {self.layers_to_update}")
        else:
            print(f"Worker {self.worker_id} found no trainable variables to update.")

        del tape
        return loss

    def run(self):
        for step, (features, labels) in enumerate(self.dataset):
            features = tf.convert_to_tensor(features)
            labels = tf.convert_to_tensor(labels)
            loss = self.compute_and_update(features, labels)
            print(f"Worker {self.worker_id} - Step {step}, Loss: {loss.numpy()}")

# Create datasets
train_dataset, test_dataset = create_dataset()

# Verify model structure
global_model = create_model()
global_model.summary()

# Assign layers to workers based on the model summary
worker_0_layers = [0]  # Conv2D(6) for Worker 0
worker_1_layers = [2, 5, 6, 7]  # Conv2D(16) and Dense layers for Worker 1

# Split dataset for workers
worker_0_dataset = train_dataset.shard(num_shards=2, index=0)
worker_1_dataset = train_dataset.shard(num_shards=2, index=1)

# Create workers
worker_0 = Worker(worker_id=0, model=global_model, layers_to_update=worker_0_layers, dataset=worker_0_dataset)
worker_1 = Worker(worker_id=1, model=global_model, layers_to_update=worker_1_layers, dataset=worker_1_dataset)

# Run workers
worker_0.run()
worker_1.run()

# Evaluate the global model
global_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
test_loss, test_acc = global_model.evaluate(test_dataset)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_acc}")


Worker 0 updated layers: [0]
Worker 0 - Step 0, Loss: 2.3132429122924805
Worker 0 updated layers: [0]
Worker 0 - Step 1, Loss: 2.3208091259002686
Worker 0 updated layers: [0]
Worker 0 - Step 2, Loss: 2.3052821159362793
Worker 0 updated layers: [0]
Worker 0 - Step 3, Loss: 2.3051953315734863
Worker 0 updated layers: [0]
Worker 0 - Step 4, Loss: 2.311065435409546
Worker 0 updated layers: [0]
Worker 0 - Step 5, Loss: 2.3174688816070557
Worker 0 updated layers: [0]
Worker 0 - Step 6, Loss: 2.297121047973633
Worker 0 updated layers: [0]
Worker 0 - Step 7, Loss: 2.2987008094787598
Worker 0 updated layers: [0]
Worker 0 - Step 8, Loss: 2.303971529006958
Worker 0 updated layers: [0]
Worker 0 - Step 9, Loss: 2.311938524246216
Worker 0 updated layers: [0]
Worker 0 - Step 10, Loss: 2.3091397285461426
Worker 0 updated layers: [0]
Worker 0 - Step 11, Loss: 2.2956953048706055
Worker 0 updated layers: [0]
Worker 0 - Step 12, Loss: 2.29311466217041
Worker 0 updated layers: [0]
Worker 0 - Step 13, Loss:

In [19]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import csv
import threading

# Define the model
def create_model():
    return models.Sequential([
        layers.Input(shape=(32, 32, 1)),  # Input layer
        layers.Conv2D(6, kernel_size=(5, 5), activation='relu'),  # Worker 0 updates this
        layers.AveragePooling2D(pool_size=(2, 2)),                # Worker 0 skips this (no weights)
        layers.Conv2D(16, kernel_size=(5, 5), activation='relu'), # Worker 1 updates this
        layers.AveragePooling2D(pool_size=(2, 2)),                # Worker 1 skips this (no weights)
        layers.Flatten(),                                         # Worker 1 skips this (no weights)
        layers.Dense(120, activation='relu'),                    # Worker 1 updates this
        layers.Dense(84, activation='relu'),                     # Worker 1 updates this
        layers.Dense(10, activation='softmax')                   # Worker 1 updates this
    ])

# Dataset preparation
def create_dataset():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
    x_train = tf.image.resize(x_train, (32, 32))
    x_test = tf.image.resize(x_test, (32, 32))
    x_train = tf.image.rgb_to_grayscale(x_train) / 255.0
    x_test = tf.image.rgb_to_grayscale(x_test) / 255.0
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(5000).batch(64)
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)
    return train_dataset, test_dataset

# Save weights to CSV
def save_weights_to_csv(worker_id, layer_weights, filename):
    with open(filename, mode='a', newline='') as file:
        writer = csv.writer(file)
        for layer, weights in layer_weights.items():
            writer.writerow([f"Worker {worker_id}", layer, *weights.flatten()[:10]])  # Save first 10 weights

# Custom Worker Class
class Worker:
    def __init__(self, worker_id, model, layers_to_update, dataset, csv_file):
        self.worker_id = worker_id
        self.model = model
        self.layers_to_update = layers_to_update  # Indices of layers this worker updates
        self.dataset = dataset
        self.csv_file = csv_file
        self.optimizer = tf.keras.optimizers.Adam()

    def compute_and_update(self, features, labels):
        with tf.GradientTape(persistent=True) as tape:
            predictions = self.model(features, training=True)
            loss = tf.keras.losses.sparse_categorical_crossentropy(labels, predictions)
            loss = tf.reduce_mean(loss)

        # Compute gradients only for the assigned layers with trainable weights
        gradients = []
        variables = []
        layer_weights = {}
        for i in self.layers_to_update:
            if i < len(self.model.layers) and self.model.layers[i].trainable_weights:
                gradients.extend(tape.gradient(loss, self.model.layers[i].trainable_weights))
                variables.extend(self.model.layers[i].trainable_weights)
                # Log weights
                layer_weights[f"Layer {i}"] = self.model.layers[i].trainable_weights[0].numpy()

        # Apply gradients if there are any
        if gradients and variables:
            self.optimizer.apply_gradients(zip(gradients, variables))
            save_weights_to_csv(self.worker_id, layer_weights, self.csv_file)
            print(f"Worker {self.worker_id} updated layers: {self.layers_to_update}")
        else:
            print(f"Worker {self.worker_id} found no trainable variables to update.")

        del tape
        return loss

    def run(self):
        for step, (features, labels) in enumerate(self.dataset):
            features = tf.convert_to_tensor(features)
            labels = tf.convert_to_tensor(labels)
            loss = self.compute_and_update(features, labels)
            print(f"Worker {self.worker_id} - Step {step}, Loss: {loss.numpy()}")

# Create datasets
train_dataset, test_dataset = create_dataset()

# Verify model structure
global_model = create_model()
global_model.summary()

# Assign layers to workers based on the model summary
worker_0_layers = [0]  # Conv2D(6) for Worker 0
worker_1_layers = [2, 5, 6, 7]  # Conv2D(16) and Dense layers for Worker 1

# Split dataset for workers
worker_0_dataset = train_dataset.shard(num_shards=2, index=0)
worker_1_dataset = train_dataset.shard(num_shards=2, index=1)

# Create workers with CSV file names
worker_0 = Worker(worker_id=0, model=global_model, layers_to_update=worker_0_layers, dataset=worker_0_dataset, csv_file='worker_0_weights.csv')
worker_1 = Worker(worker_id=1, model=global_model, layers_to_update=worker_1_layers, dataset=worker_1_dataset, csv_file='worker_1_weights.csv')

# Parallel worker execution using threading
def run_worker(worker):
    worker.run()

# Create threads for workers
worker_0_thread = threading.Thread(target=run_worker, args=(worker_0,))
worker_1_thread = threading.Thread(target=run_worker, args=(worker_1,))

# Start threads
worker_0_thread.start()
worker_1_thread.start()

# Wait for both workers to finish
worker_0_thread.join()
worker_1_thread.join()

# Evaluate the global model
global_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
test_loss, test_acc = global_model.evaluate(test_dataset)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_acc}")


Worker 0 updated layers: [0]
Worker 0 - Step 0, Loss: 2.3010637760162354
Worker 0 updated layers: [0]
Worker 0 - Step 1, Loss: 2.3063416481018066
Worker 0 updated layers: [0]
Worker 0 - Step 2, Loss: 2.305616855621338
Worker 1 updated layers: [2, 5, 6, 7]
Worker 1 - Step 0, Loss: 2.306950092315674
Worker 0 updated layers: [0]
Worker 0 - Step 3, Loss: 2.301142692565918
Worker 0 updated layers: [0]
Worker 0 - Step 4, Loss: 2.300665855407715
Worker 0 updated layers: [0]
Worker 0 - Step 5, Loss: 2.2978196144104004
Worker 1 updated layers: [2, 5, 6, 7]
Worker 1 - Step 1, Loss: 2.2992303371429443
Worker 0 updated layers: [0]
Worker 0 - Step 6, Loss: 2.304971694946289
Worker 0 updated layers: [0]
Worker 0 - Step 7, Loss: 2.302351951599121
Worker 0 updated layers: [0]
Worker 0 - Step 8, Loss: 2.3071470260620117
Worker 1 updated layers: [2, 5, 6, 7]
Worker 1 - Step 2, Loss: 2.3068220615386963
Worker 0 updated layers: [0]
Worker 0 - Step 9, Loss: 2.3035683631896973
Worker 0 updated layers: [0]
W

In [1]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import csv
import threading

# Define the model
def create_model():
    return models.Sequential([
        layers.Input(shape=(32, 32, 1)),  # Input layer for grayscale images
        layers.Conv2D(6, kernel_size=(5, 5), activation='relu'),  # Worker 0 updates this
        layers.AveragePooling2D(pool_size=(2, 2)),                # No trainable parameters
        layers.Conv2D(16, kernel_size=(5, 5), activation='relu'), # Worker 1 updates this
        layers.AveragePooling2D(pool_size=(2, 2)),                # No trainable parameters
        layers.Flatten(),                                         # No trainable parameters
        layers.Dense(120, activation='relu'),                    # Worker 1 updates this
        layers.Dense(84, activation='relu'),                     # Worker 1 updates this
        layers.Dense(10, activation='softmax')                   # Worker 1 updates this
    ])

# Dataset preparation
def create_dataset():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()

    # Resize images and convert to grayscale
    x_train = tf.image.resize(x_train, (32, 32))
    x_test = tf.image.resize(x_test, (32, 32))
    x_train = tf.image.rgb_to_grayscale(x_train) / 255.0  # Normalize
    x_test = tf.image.rgb_to_grayscale(x_test) / 255.0    # Normalize

    # Create TensorFlow datasets
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(5000).batch(64)
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(64)
    return train_dataset, test_dataset

# Assign random subsets to workers
def create_random_subsets(dataset, num_workers):
    """
    Split the dataset into random subsets for each worker.
    - Each worker gets approximately 1/num_workers of the dataset.
    """
    shuffled_dataset = dataset.shuffle(buffer_size=5000)
    subsets = []
    for i in range(num_workers):
        subsets.append(shuffled_dataset.shard(num_shards=num_workers, index=i))
    return subsets

# Save weights to CSV
def save_weights_to_csv(worker_id, layer_weights, filename):
    with open(filename, mode='a', newline='') as file:
        writer = csv.writer(file)
        for layer, weights in layer_weights.items():
            writer.writerow([f"Worker {worker_id}", layer, *weights.flatten()[:10]])  # Save first 10 weights

# Custom Worker Class
class Worker:
    """
    Represents a worker that:
    - Processes a random subset of the dataset.
    - Updates assigned layers of the model.
    - Logs updated weights to a CSV file.
    """
    def __init__(self, worker_id, model, layers_to_update, dataset, csv_file):
        self.worker_id = worker_id
        self.model = model
        self.layers_to_update = layers_to_update  # Indices of layers this worker updates
        self.dataset = dataset
        self.csv_file = csv_file
        self.optimizer = tf.keras.optimizers.Adam()  # Optimizer for this worker

    def compute_and_update(self, features, labels):
        """
        Compute gradients for assigned layers and update their weights.
        """
        with tf.GradientTape(persistent=True) as tape:
            predictions = self.model(features, training=True)
            loss = tf.keras.losses.sparse_categorical_crossentropy(labels, predictions)
            loss = tf.reduce_mean(loss)

        # Compute gradients for the assigned layers only
        gradients = []
        variables = []
        layer_weights = {}
        for i in self.layers_to_update:
            if i < len(self.model.layers) and self.model.layers[i].trainable_weights:
                gradients.extend(tape.gradient(loss, self.model.layers[i].trainable_weights))
                variables.extend(self.model.layers[i].trainable_weights)
                # Save current weights for logging
                layer_weights[f"Layer {i}"] = self.model.layers[i].trainable_weights[0].numpy()

        # Apply gradients and log updated weights
        if gradients and variables:
            self.optimizer.apply_gradients(zip(gradients, variables))
            save_weights_to_csv(self.worker_id, layer_weights, self.csv_file)
            print(f"Worker {self.worker_id} updated layers: {self.layers_to_update}")
        else:
            print(f"Worker {self.worker_id} found no trainable variables to update.")

        del tape
        return loss

    def run(self):
        """
        Process the dataset shard assigned to this worker.
        """
        for step, (features, labels) in enumerate(self.dataset):
            features = tf.convert_to_tensor(features)
            labels = tf.convert_to_tensor(labels)
            loss = self.compute_and_update(features, labels)
            print(f"Worker {self.worker_id} - Step {step}, Loss: {loss.numpy()}")

# Create datasets
train_dataset, test_dataset = create_dataset()

# Verify model structure
global_model = create_model()
global_model.summary()

# Assign layers to workers based on the model summary
worker_0_layers = [0]  # Conv2D(6) for Worker 0
worker_1_layers = [2, 5, 6, 7]  # Conv2D(16) and Dense layers for Worker 1

# Create random subsets for workers
num_workers = 2
worker_datasets = create_random_subsets(train_dataset, num_workers)

# Create workers with random datasets
worker_0 = Worker(worker_id=0, model=global_model, layers_to_update=worker_0_layers, dataset=worker_datasets[0], csv_file='worker_0_weights.csv')
worker_1 = Worker(worker_id=1, model=global_model, layers_to_update=worker_1_layers, dataset=worker_datasets[1], csv_file='worker_1_weights.csv')

# Parallel worker execution using threading
def run_worker(worker):
    """
    Helper function to run a worker in a thread.
    """
    worker.run()

# Create threads for workers
worker_0_thread = threading.Thread(target=run_worker, args=(worker_0,))
worker_1_thread = threading.Thread(target=run_worker, args=(worker_1,))

# Start threads
worker_0_thread.start()
worker_1_thread.start()

# Wait for both workers to finish
worker_0_thread.join()
worker_1_thread.join()

# Evaluate the global model
global_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
test_loss, test_acc = global_model.evaluate(test_dataset)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_acc}")


Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
[1m170498071/170498071[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 0us/step


Worker 0 updated layers: [0]
Worker 0 - Step 0, Loss: 2.303966999053955
Worker 0 updated layers: [0]
Worker 0 - Step 1, Loss: 2.2964797019958496
Worker 0 updated layers: [0]Worker 1 updated layers: [2, 5, 6, 7]

Worker 0 - Step 2, Loss: 2.307445764541626
Worker 1 - Step 0, Loss: 2.30586838722229
Worker 0 updated layers: [0]
Worker 0 - Step 3, Loss: 2.311882257461548
Worker 0 updated layers: [0]
Worker 0 - Step 4, Loss: 2.299121856689453
Worker 1 updated layers: [2, 5, 6, 7]
Worker 1 - Step 1, Loss: 2.3009235858917236
Worker 0 updated layers: [0]
Worker 0 - Step 5, Loss: 2.2981696128845215
Worker 0 updated layers: [0]
Worker 0 - Step 6, Loss: 2.310103416442871
Worker 0 updated layers: [0]
Worker 0 - Step 7, Loss: 2.307710647583008
Worker 1 updated layers: [2, 5, 6, 7]
Worker 1 - Step 2, Loss: 2.304830551147461
Worker 0 updated layers: [0]
Worker 0 - Step 8, Loss: 2.3085670471191406
Worker 0 updated layers: [0]
Worker 0 - Step 9, Loss: 2.2985544204711914
Worker 0 updated layers: [0]
Work