In [4]:
import ray
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
import time
import psutil

# Initialize Ray
# ignore_reinit_error=True lets this cell be re-run without ray.init()
# raising because Ray is already initialised in this process.
# dashboard_port pins the Ray dashboard to port 8265.
ray.init(ignore_reinit_error=True, dashboard_port=8265)

# Load California Housing Dataset and increase the size
def load_data(repeat=6):
    """Load the California Housing data, enlarge it, and split it 80/20.

    The feature matrix and target vector are repeated ``repeat`` times
    (stacked row-wise) to create a larger workload, then handed to
    ``train_test_split`` with ``test_size=0.2`` and ``random_state=42``.

    NOTE(review): the rows are duplicated *before* the split, so copies of
    the same sample can land in both the training and the test partition.
    The test partition is therefore not an independent hold-out set.

    Args:
        repeat: Number of copies of the original dataset to stack. Must be
            at least 1. Defaults to 6.

    Returns:
        The result of ``train_test_split``: ``X_train, X_test, y_train,
        y_test``.

    Raises:
        ValueError: If ``repeat`` is smaller than 1.
    """
    if repeat < 1:
        raise ValueError(f"repeat must be >= 1, got {repeat}")

    data = fetch_california_housing()
    X, y = data.data, data.target

    # Enlarge the dataset by stacking `repeat` copies of every row.
    X = np.tile(X, (repeat, 1))  # repeat the feature rows
    y = np.tile(y, repeat)       # repeat the targets in the same order

    return train_test_split(X, y, test_size=0.2, random_state=42)

# Linear regression training function
@ray.remote
def train_worker(X, y):
    """Fit a linear regression on one data shard inside a Ray task.

    Args:
        X: Feature rows of the shard.
        y: Target values matching ``X``.

    Returns:
        A ``(score, training_time)`` tuple. ``score`` is the R^2 of the
        fitted model evaluated on the same ``X``/``y`` it was trained on
        (a training score, not a hold-out score). ``training_time`` is the
        elapsed time in seconds for fitting plus scoring.
    """
    # perf_counter() is monotonic and high-resolution; time.time() can be too
    # coarse to measure a fit that only takes a few milliseconds.
    start_time = time.perf_counter()
    model = LinearRegression()
    model.fit(X, y)
    score = model.score(X, y)  # R^2 score on the training shard
    training_time = time.perf_counter() - start_time
    return score, training_time

# Fault Tolerance Test with Average Calculation
def measure_fault_tolerance(num_workers, num_failures, X_train, y_train, num_trials=5):
    """Run sharded training repeatedly while cancelling some of the tasks.

    For each trial the training data is split into ``num_workers`` shards,
    one ``train_worker`` task is launched per shard, and ``num_failures``
    randomly chosen tasks are force-cancelled. The R^2 scores and training
    times of the remaining tasks are averaged and printed.

    Args:
        num_workers: Number of shards / remote tasks per trial.
        num_failures: Number of tasks to cancel per trial. If it exceeds the
            number of launched tasks, every task is cancelled.
        X_train: Training features.
        y_train: Training targets.
        num_trials: Number of times the experiment is repeated.

    Returns:
        The mean over all trials of the per-trial average R^2 score. A trial
        in which no task survived contributes 0. NaN is returned when no
        trial was run.
    """
    print(f"\nFault Tolerance Test: {num_workers} Workers with {num_failures} Failures (over {num_trials} trials)")

    # The shards are identical for every trial, so split only once.
    split_X = np.array_split(X_train, num_workers)
    split_y = np.array_split(y_train, num_workers)

    trial_results = []

    for trial in range(num_trials):
        # Launch one task per shard.
        workers = [
            train_worker.remote(X_part, y_part)
            for X_part, y_part in zip(split_X, split_y)
        ]

        # Simulate failures: a cancelled task is removed from `workers`, so
        # its result is never requested below.
        for _ in range(min(num_failures, len(workers))):
            failed_worker = workers.pop(np.random.randint(len(workers)))
            ray.cancel(failed_worker, force=True)
            print(f"Simulated failure for task: {failed_worker}")

        # Collect results from surviving workers.
        results = []
        training_times = []
        for worker in workers:
            try:
                outcome = ray.get(worker)
            except ray.exceptions.RayError as e:
                # Best effort: a task that failed is reported and skipped.
                print(f"Task {worker} failed with error: {e}")
            else:
                result, worker_training_time = outcome
                results.append(result)
                training_times.append(worker_training_time)

        if results:
            avg_score = np.mean(results)
            avg_training_time = np.mean(training_times)
            trial_results.append(avg_score)
            print(f"Trial {trial + 1}: Average R^2 Score after failures: {avg_score:.4f}, Average Training Time: {avg_training_time:.2f} seconds")
        else:
            print(f"Trial {trial + 1}: No surviving workers completed their tasks.")
            trial_results.append(0)

    # Overall average; skip np.mean on an empty list (it warns and yields NaN).
    overall_avg_score = np.mean(trial_results) if trial_results else float("nan")
    print(f"Overall Average R^2 Score after failures: {overall_avg_score:.4f}\n")
    return overall_avg_score

# Resource monitoring function
def monitor_resource_usage():
    """Print the CPU and memory figures reported by psutil and Ray.

    Reports the logical CPU count from psutil, the CPU resources Ray lists
    as available, and the total and used system memory in GB, where "used"
    is computed as total minus available.
    """
    bytes_per_gb = 1024 ** 3

    logical_cpus = psutil.cpu_count(logical=True)
    free_ray_cpus = ray.available_resources().get("CPU", 0)
    memory_total_gb = psutil.virtual_memory().total / bytes_per_gb
    memory_available_gb = psutil.virtual_memory().available / bytes_per_gb
    memory_used_gb = memory_total_gb - memory_available_gb

    report_lines = (
        f"Total CPU Cores: {logical_cpus}",
        f"Available CPU Cores: {free_ray_cpus}",
        f"Total Memory: {memory_total_gb:.2f} GB",
        f"Used Memory: {memory_used_gb:.2f} GB",
    )
    for line in report_lines:
        print(line)

# Measure communication overhead
def measure_communication_overhead(num_workers, X_train, y_train):
    """Time one full round of sharded training across ``num_workers`` tasks.

    The measured interval covers splitting the data, submitting one
    ``train_worker`` task per shard, and waiting for all results. It is an
    end-to-end figure that includes the workers' compute time, not only
    data transfer and scheduling.

    Args:
        num_workers: Number of shards / remote tasks to launch.
        X_train: Training features.
        y_train: Training targets.

    Returns:
        The elapsed time in seconds, which is also printed.
    """
    # perf_counter() is monotonic, so the interval cannot be distorted by
    # system clock adjustments.
    start_time = time.perf_counter()

    split_X = np.array_split(X_train, num_workers)
    split_y = np.array_split(y_train, num_workers)

    # Launch workers and block until every result is available.
    workers = [
        train_worker.remote(X_part, y_part)
        for X_part, y_part in zip(split_X, split_y)
    ]
    ray.get(workers)

    communication_time = time.perf_counter() - start_time
    print(f"Communication Overhead for {num_workers} workers: {communication_time:.2f} seconds")
    return communication_time

# Main execution
if __name__ == "__main__":
    # Run the benchmarks inside try/finally so the Ray runtime is shut down
    # even when loading the data or one of the measurements raises.
    try:
        # Only the training split is used by the benchmarks below.
        X_train, X_test, y_train, y_test = load_data()

        # Test fault tolerance with different configurations (2, 4, 8, and 10 workers)
        for num_workers in [2, 4, 8, 10]:
            measure_fault_tolerance(num_workers=num_workers, num_failures=2, X_train=X_train, y_train=y_train, num_trials=3)

            # Measure communication overhead
            measure_communication_overhead(num_workers=num_workers, X_train=X_train, y_train=y_train)

        # Monitor resource usage
        print("\nResource Usage:")
        monitor_resource_usage()
    finally:
        ray.shutdown()

2024-12-07 01:09:50,531	INFO worker.py:1634 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2024-12-07 01:09:50,582	INFO worker.py:1819 -- Connected to Ray cluster.


Fault Tolerance Test: 4 Workers with 1 Failures (over 3 trials)
Simulated failure for task: ObjectRef(3b86534cae58b4adffffffffffffffffffffffff0200000001000000)
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: a50339a56b26a0fbce2376c7c0dc3a24ed20cd7c02000000 Worker ID: 8969c3f864a32de23aceaadeb313403d7db92bfe4ad4916b0917c81d Node ID: ddf46cc29f954199e1fdd9d4ca119f09663a730b83b3f48dcd7b7dc7 Worker IP address: 127.0.0.1 Worker port: 10014 Worker PID: 90932 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned. RPC Error message: Connection reset; RPC Error details: 


[36m(pid=90988)[0m 
[36m(pid=90988)[0m A module that was compiled using NumPy 1.x cannot be run in
[36m(pid=90988)[0m NumPy 2.1.3 as it may crash. To support both 1.x and 2.x
[36m(pid=90988)[0m versions of NumPy, modules must be compiled with NumPy 2.0.
[36m(pid=90988)[0m Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.
[36m(pid=90988)[0m 
[36m(pid=90988)[0m If you are a user of the module, the easiest solution will be to
[36m(pid=90988)[0m downgrade to 'numpy<2' or try to upgrade the affected module.
[36m(pid=90988)[0m We expect that some modules will need time to support NumPy 2.
[36m(pid=90988)[0m 
[36m(pid=90988)[0m Traceback (most recent call last):  File "C:\Users\PMLS\anaconda3\Lib\site-packages\ray\_private\workers\default_worker.py", line 289, in <module>
[36m(pid=90988)[0m     worker.main_loop()
[36m(pid=90988)[0m   File "C:\Users\PMLS\anaconda3\Lib\site-packages\ray\_private\worker.py", line 920, in main_loop
[36m(pid=90988)[0m 

Trial 1: Average R^2 Score after failures: 0.6190, Average Training Time: 0.01 seconds
Simulated failure for task: ObjectRef(8f663780d3989ddbffffffffffffffffffffffff0200000001000000)
Trial 2: Average R^2 Score after failures: 0.6129, Average Training Time: 0.01 seconds
Simulated failure for task: ObjectRef(be6acd8b66376146ffffffffffffffffffffffff0200000001000000)
Trial 3: Average R^2 Score after failures: 0.6265, Average Training Time: 0.01 seconds
Overall Average R^2 Score after failures: 0.6195

Fault Tolerance Test: 8 Workers with 2 Failures (over 3 trials)
Simulated failure for task: ObjectRef(c453a022e4a6723cffffffffffffffffffffffff0200000001000000)
Simulated failure for task: ObjectRef(ba8a006fe2c26cd1ffffffffffffffffffffffff0200000001000000)
Trial 1: Average R^2 Score after failures: 0.6317, Average Training Time: 0.01 seconds
Simulated failure for task: ObjectRef(3f2f06a8fa342b1bffffffffffffffffffffffff0200000001000000)
Simulated failure for task: ObjectRef(e1dfc33e08f0b074ffff

2024-12-07 01:09:56,688	ERROR worker.py:422 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): The worker died unexpectedly while executing this task. Check python-core-worker-*.log files for more information.


Simulated failure for task: ObjectRef(cdc13c092b12eb09ffffffffffffffffffffffff0200000001000000)
Simulated failure for task: ObjectRef(fb88e879dfe75486ffffffffffffffffffffffff0200000001000000)
Trial 3: Average R^2 Score after failures: 0.6314, Average Training Time: 0.01 seconds
Overall Average R^2 Score after failures: 0.6316

Communication Overhead for 4 workers: 0.14 seconds
Communication Overhead for 8 workers: 0.25 seconds

Resource Usage:
Total CPU Cores: 12
Available CPU Cores: 11.0
Total Memory: 7.73 GB
Used Memory: 6.80 GB


[36m(pid=90928)[0m A module that was compiled using NumPy 1.x cannot be run in[32m [repeated 4x across cluster][0m
[36m(pid=90928)[0m NumPy 2.1.3 as it may crash. To support both 1.x and 2.x[32m [repeated 4x across cluster][0m
[36m(pid=90928)[0m versions of NumPy, modules must be compiled with NumPy 2.0.[32m [repeated 4x across cluster][0m
[36m(pid=90928)[0m Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.[32m [repeated 4x across cluster][0m
[36m(pid=90928)[0m If you are a user of the module, the easiest solution will be to[32m [repeated 4x across cluster][0m
[36m(pid=90928)[0m downgrade to 'numpy<2' or try to upgrade the affected module.[32m [repeated 4x across cluster][0m
[36m(pid=90928)[0m We expect that some modules will need time to support NumPy 2.[32m [repeated 4x across cluster][0m
[36m(pid=90928)[0m Traceback (most recent call last):  File "C:\Users\PMLS\anaconda3\Lib\site-packages\ray\_private\workers\default_worker.py", line 2