In [1]:
import ray
import tensorflow as tf
import numpy as np
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Dropout
from tensorflow.keras.optimizers import Adam
import time
import psutil
import os

# TensorFlow threading optimization
tf.config.threading.set_intra_op_parallelism_threads(4)
tf.config.threading.set_inter_op_parallelism_threads(4)

# Initialize Ray
ray.init()

# Load and preprocess MNIST data
def preprocess_mnist():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.astype("float32") / 255.0
    x_test = x_test.astype("float32") / 255.0
    y_train = tf.keras.utils.to_categorical(y_train, 10)
    y_test = tf.keras.utils.to_categorical(y_test, 10)
    return x_train, y_train, x_test, y_test

# More complex model with Dropout layer
def build_complex_model():
    model = Sequential([
        Flatten(input_shape=(28, 28)),
        Dense(128, activation='relu'),
        Dropout(0.2),
        Dense(64, activation='relu'),
        Dense(10, activation='softmax')
    ])
    model.compile(optimizer=Adam(learning_rate=0.001), loss="categorical_crossentropy", metrics=["accuracy"])
    return model

# Ray Remote Training Worker
@ray.remote(num_cpus=1)  # Explicitly allocate 1 CPU per worker
def train_worker(x_data, y_data, epochs=1):
    print(f"Worker started with data shape: {x_data.shape}")
    model = build_complex_model()
    model.fit(x_data, y_data, epochs=epochs, verbose=0)
    return model.evaluate(x_data, y_data, verbose=0)

# Measure scalability and resource usage
def measure_scalability(num_workers, x_train, y_train):
    split_data = np.array_split(x_train, num_workers)
    split_labels = np.array_split(y_train, num_workers)

    workers = []
    start_time = time.time()
    for i in range(num_workers):
        workers.append(train_worker.remote(split_data[i], split_labels[i]))

    results = ray.get(workers)
    end_time = time.time()

    avg_accuracy = np.mean([result[1] for result in results])
    print(f"Scalability Test: {num_workers} Workers")
    print(f"Average Accuracy: {avg_accuracy:.4f}")
    print(f"Training Time: {end_time - start_time:.2f} seconds")

# Monitor resource usage
def monitor_resource_usage():
    total_cpus = psutil.cpu_count(logical=True)  # Total CPU cores (including logical CPUs)
    available_cpus = psutil.cpu_count(logical=False)  # Physical cores (if Hyperthreading is on, it will be less than total logical CPUs)
    total_memory = psutil.virtual_memory().total / (1024 ** 3)  # Total memory in GB
    used_memory = psutil.virtual_memory().used / (1024 ** 3)  # Used memory in GB

    print(f"Resource Usage:")
    print(f"Total CPU Cores: {total_cpus}")
    print(f"Available CPU Cores: {available_cpus}")
    print(f"Total Memory: {total_memory:.2f} GB")
    print(f"Used Memory: {used_memory:.2f} GB")

# Test fault tolerance
@ray.remote(num_cpus=1)
def faulty_worker(x_data, y_data):
    if np.random.rand() < 0.3:  # Simulate random failure
        raise Exception("Simulated worker failure")
    model = build_complex_model()
    model.fit(x_data, y_data, epochs=1, verbose=0)
    return model.evaluate(x_data, y_data, verbose=0)

# Modified Fault Tolerance Test with Average Accuracy Calculation
def test_fault_tolerance(x_train, y_train):
    num_workers = 5
    split_data = np.array_split(x_train, num_workers)
    split_labels = np.array_split(y_train, num_workers)

    workers = []
    for i in range(num_workers):
        workers.append(faulty_worker.remote(split_data[i], split_labels[i]))

    successful_results = []
    failed_workers = 0

    try:
        results = ray.get(workers)
        for result in results:
            if result is not None:
                successful_results.append(result[1])
            else:
                failed_workers += 1
        
        if successful_results:
            avg_accuracy = np.mean(successful_results)
            print(f"Fault Tolerance Test Passed. Avg Accuracy (successful workers): {avg_accuracy:.4f}")
        else:
            print("No successful workers in fault tolerance test.")
        
    except Exception as e:
        # Mask system path, only show the pid and exception message
        pid = os.getpid()  # Get the current process ID
        print(f"Fault Tolerance Test: Worker with PID {pid} failed. Error: {str(e)}")
        
    print(f"Total workers: {num_workers}, Failed workers: {failed_workers}, Successful workers: {len(successful_results)}")

# Main execution
if __name__ == "__main__":
    x_train, y_train, x_test, y_test = preprocess_mnist()

    # Test scalability
    for workers in [2, 4, 6, 8, 9, 12]:  # Test with different numbers of workers
        measure_scalability(workers, x_train, y_train)

    # Test fault tolerance with average accuracy calculation
    test_fault_tolerance(x_train, y_train)

    # Monitor resource usage
    monitor_resource_usage()

    # Shutdown Ray
    ray.shutdown()


2024-12-04 01:05:52,477	INFO util.py:154 -- Outdated packages:
  ipywidgets==7.8.1 found, needs ipywidgets>=8
Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2024-12-04 01:05:57,038	INFO worker.py:1634 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2024-12-04 01:05:57,059	INFO worker.py:1819 -- Connected to Ray cluster.
[36m(pid=78280)[0m 2024-12-04 01:05:58.952490: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
[36m(pid=78280)[0m 2024-12-04 01:06:00.031805: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE

[36m(train_worker pid=78280)[0m Worker started with data shape: (30000, 28, 28)
[36m(train_worker pid=78280)[0m Worker started with data shape: (30000, 28, 28)
Scalability Test: 2 Workers
Average Accuracy: 0.9514
Training Time: 14.12 seconds
[36m(train_worker pid=78280)[0m Worker started with data shape: (15000, 28, 28)


[36m(pid=79464)[0m 2024-12-04 01:06:12.139545: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
[36m(pid=77844)[0m 2024-12-04 01:06:13.106679: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


[36m(train_worker pid=78280)[0m Worker started with data shape: (15000, 28, 28)


[36m(train_worker pid=77844)[0m   super().__init__(**kwargs)
[36m(train_worker pid=77844)[0m 2024-12-04 01:06:17.750466: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
[36m(train_worker pid=77844)[0m To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[36m(pid=77844)[0m 2024-12-04 01:06:14.611438: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.[32m [repeated 2x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for m

Scalability Test: 4 Workers
Average Accuracy: 0.9375
Training Time: 11.61 seconds
[36m(train_worker pid=78280)[0m Worker started with data shape: (10000, 28, 28)[32m [repeated 3x across cluster][0m


[36m(train_worker pid=79464)[0m   super().__init__(**kwargs)
[36m(train_worker pid=79464)[0m 2024-12-04 01:06:18.218980: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
[36m(train_worker pid=79464)[0m To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[36m(pid=73708)[0m 2024-12-04 01:06:24.132123: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
[36m(pid=73180)[0m 2024-12-04 01:06:25.948111: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation 

[36m(train_worker pid=77844)[0m Worker started with data shape: (10000, 28, 28)[32m [repeated 3x across cluster][0m


[36m(train_worker pid=73708)[0m   super().__init__(**kwargs)
[36m(train_worker pid=73708)[0m 2024-12-04 01:06:33.411450: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
[36m(train_worker pid=73708)[0m To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[36m(pid=73180)[0m 2024-12-04 01:06:28.959555: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.[32m [repeated 2x across cluster][0m


Scalability Test: 6 Workers
Average Accuracy: 0.9280
Training Time: 13.37 seconds
[36m(train_worker pid=73708)[0m Worker started with data shape: (7500, 28, 28)[32m [repeated 3x across cluster][0m


[36m(train_worker pid=73180)[0m   super().__init__(**kwargs)
[36m(train_worker pid=73180)[0m 2024-12-04 01:06:33.369454: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
[36m(train_worker pid=73180)[0m To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[36m(pid=71572)[0m 2024-12-04 01:06:42.334933: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.[32m [repeated 2x across cluster][0m
[36m(train_worker pid=71572)[0m   super().__init__(**kwargs)
[36m(train_worker pid=71572)[0m 2024-12-04 01:06:48.119596: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow b

[36m(train_worker pid=71572)[0m Worker started with data shape: (7500, 28, 28)[32m [repeated 7x across cluster][0m
Scalability Test: 8 Workers
Average Accuracy: 0.9197
Training Time: 14.17 seconds


[36m(pid=77356)[0m 2024-12-04 01:06:52.200519: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


[36m(train_worker pid=71572)[0m Worker started with data shape: (6666, 28, 28)[32m [repeated 7x across cluster][0m


[36m(pid=77356)[0m 2024-12-04 01:06:56.557265: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
[36m(train_worker pid=77356)[0m   super().__init__(**kwargs)
[36m(train_worker pid=77356)[0m 2024-12-04 01:07:03.291065: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
[36m(train_worker pid=77356)[0m To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


[36m(train_worker pid=77356)[0m Worker started with data shape: (6666, 28, 28)[32m [repeated 2x across cluster][0m
Scalability Test: 9 Workers
Average Accuracy: 0.9181
Training Time: 15.46 seconds
[36m(train_worker pid=78280)[0m Worker started with data shape: (5000, 28, 28)
[36m(train_worker pid=79464)[0m Worker started with data shape: (5000, 28, 28)


[36m(pid=79284)[0m 2024-12-04 01:07:08.707559: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
[36m(pid=79284)[0m 2024-12-04 01:07:13.252959: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
[36m(train_worker pid=79284)[0m   super().__init__(**kwargs)
[36m(train_worker pid=79284)[0m 2024-12-04 01:07:20.283185: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
[36m(train_worker pid=79284)[0m To enable the following instructions: AVX2 AVX_VNNI 

[36m(train_worker pid=79284)[0m Worker started with data shape: (5000, 28, 28)[32m [repeated 8x across cluster][0m
Scalability Test: 12 Workers
Average Accuracy: 0.9061
Training Time: 17.54 seconds


2024-12-04 01:07:24,330	ERROR worker.py:422 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::faulty_worker()[39m (pid=77844, ip=127.0.0.1)
  File "python\ray\_raylet.pyx", line 1862, in ray._raylet.execute_task
  File "C:\Users\PMLS\AppData\Local\Temp\ipykernel_66768\227194966.py", line 83, in faulty_worker
Exception: Simulated worker failure


Fault Tolerance Test: Worker with PID 66768 failed. Error: [36mray::faulty_worker()[39m (pid=77844, ip=127.0.0.1)
  File "python\ray\_raylet.pyx", line 1862, in ray._raylet.execute_task
  File "C:\Users\PMLS\AppData\Local\Temp\ipykernel_66768\227194966.py", line 83, in faulty_worker
Exception: Simulated worker failure
Total workers: 5, Failed workers: 0, Successful workers: 0
Resource Usage:
Total CPU Cores: 12
Available CPU Cores: 10
Total Memory: 7.73 GB
Used Memory: 7.69 GB
[36m(train_worker pid=81480)[0m Worker started with data shape: (5000, 28, 28)[32m [repeated 2x across cluster][0m


[36m(train_worker pid=81480)[0m   super().__init__(**kwargs)[32m [repeated 2x across cluster][0m
[36m(train_worker pid=81480)[0m 2024-12-04 01:07:21.080023: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.[32m [repeated 2x across cluster][0m
[36m(train_worker pid=81480)[0m To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.[32m [repeated 2x across cluster][0m
