### Implementing Ray for Hyperparameter Tuning and Model Comparison

### Introduction

<div class="alert alert-info">
  <strong><a href="https://www.ray.io/" target="_blank">Ray</a></strong> is a powerful distributed computing framework designed for scalable and efficient execution of Python applications across clusters. It simplifies the development of distributed systems by automating orchestration, scheduling, fault tolerance, and resource management.
</div>


### Key Points:


*   **`ray.init()`**
Starts the Ray runtime and connects to a Ray cluster.

*   **`@ray.remote`**
A decorator that marks a Python function or class to run as a task (remote function) or an actor (remote class) in a separate process.

* **`.remote`**
The suffix used to call remote functions and classes; remote operations are asynchronous.

* **`ray.put()`**
Stores an object in the in-memory object store and returns an object reference, which can be used to pass the object to any remote function or method call.

* **`ray.get()`**
Retrieves a remote object from the object store, given its object reference.




### Project Context

This notebook implements several machine learning workflows to show how Ray can speed them up.

# Import Libraries

In [None]:
!pip install ray

Collecting ray
  Downloading ray-2.32.0-cp310-cp310-manylinux2014_x86_64.whl (65.7 MB)
Installing collected packages: ray
Successfully installed ray-2.32.0


In [None]:
!pip install "ray[default]" joblib



In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Ray for parallel processing
import ray
import itertools
import time
import joblib
from ray.util.joblib import register_ray

# Scikit-learn libraries
from collections import Counter
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, OrdinalEncoder
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split, cross_val_predict, GridSearchCV, RandomizedSearchCV, cross_val_score
from sklearn.compose import ColumnTransformer, make_column_transformer, make_column_selector
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.impute import SimpleImputer
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression

# Loading Data

In [None]:
df = pd.read_csv('https://raw.githubusercontent.com/tesalonikahtp/dataset/main/job_app.csv')
df.head()

Unnamed: 0.1,Unnamed: 0,Age,Accessibility,EdLevel,Employment,Gender,MentalHealth,MainBranch,YearsCode,YearsCodePro,Country,PreviousSalary,HaveWorkedWith,ComputerSkills,Employed
0,0,<35,No,Master,1,Man,No,Dev,7,4,Sweden,51552.0,C++;Python;Git;PostgreSQL,4,0
1,1,<35,No,Undergraduate,1,Man,No,Dev,12,5,Spain,46482.0,Bash/Shell;HTML/CSS;JavaScript;Node.js;SQL;Typ...,12,1
2,2,<35,No,Master,1,Man,No,Dev,15,6,Germany,77290.0,C;C++;Java;Perl;Ruby;Git;Ruby on Rails,7,0
3,3,<35,No,Undergraduate,1,Man,No,Dev,9,6,Canada,46135.0,Bash/Shell;HTML/CSS;JavaScript;PHP;Ruby;SQL;Gi...,13,0
4,4,>35,No,PhD,0,Man,No,NotDev,40,30,Singapore,160932.0,C++;Python,2,0


In [None]:
df = df.drop(columns=['Unnamed: 0'])

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 73462 entries, 0 to 73461
Data columns (total 14 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   Age             73462 non-null  object 
 1   Accessibility   73462 non-null  object 
 2   EdLevel         73462 non-null  object 
 3   Employment      73462 non-null  int64  
 4   Gender          73462 non-null  object 
 5   MentalHealth    73462 non-null  object 
 6   MainBranch      73462 non-null  object 
 7   YearsCode       73462 non-null  int64  
 8   YearsCodePro    73462 non-null  int64  
 9   Country         73462 non-null  object 
 10  PreviousSalary  73462 non-null  float64
 11  HaveWorkedWith  73399 non-null  object 
 12  ComputerSkills  73462 non-null  int64  
 13  Employed        73462 non-null  int64  
dtypes: float64(1), int64(5), object(8)
memory usage: 7.8+ MB


In [None]:
df = df.dropna(subset=['HaveWorkedWith'])

Dealing With Categorical Data
- - -

In [None]:
text_col = 'HaveWorkedWith'
categorical_cols = ['Age', 'Accessibility', 'EdLevel', 'Gender', 'MentalHealth', 'MainBranch', 'Country']
ordinal_cols = ['YearsCodePro']

preprocessor = ColumnTransformer(
    transformers=[
        ('tfidf', TfidfVectorizer(), text_col),
        ('onehot', OneHotEncoder(handle_unknown='ignore'), categorical_cols),
        ('ordinal', OrdinalEncoder(), ordinal_cols)
    ])

X = preprocessor.fit_transform(df)

# Get feature names for the resulting dataframe
text_feature_names = preprocessor.named_transformers_['tfidf'].get_feature_names_out([text_col])
categorical_feature_names = preprocessor.named_transformers_['onehot'].get_feature_names_out(categorical_cols)
ordinal_feature_names = ordinal_cols

columns = text_feature_names.tolist() + categorical_feature_names.tolist() + ordinal_feature_names

categorical = pd.DataFrame(X.toarray(), columns=columns)

In [None]:
columns_to_remove = [text_col] + categorical_cols + ordinal_cols
df_cleaned = df.drop(columns=columns_to_remove)

df_cleaned_reset = df_cleaned.reset_index(drop=True)
categorical_reset = categorical.reset_index(drop=True)

merged_data = pd.concat([df_cleaned_reset, categorical_reset], axis=1)

merged_data

Unnamed: 0,Employment,YearsCode,PreviousSalary,ComputerSkills,Employed,3d,angular,ansible,apl,asp,...,Country_United Republic of Tanzania,Country_United States of America,Country_Uruguay,Country_Uzbekistan,"Country_Venezuela, Bolivarian Republic of...",Country_Viet Nam,Country_Yemen,Country_Zambia,Country_Zimbabwe,YearsCodePro
0,1,7,51552.0,4,0,0.0,0.000000,0.0,0.0,0.000000,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,4.0
1,1,12,46482.0,12,1,0.0,0.000000,0.0,0.0,0.000000,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,5.0
2,1,15,77290.0,7,0,0.0,0.000000,0.0,0.0,0.000000,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.0
3,1,9,46135.0,13,0,0.0,0.000000,0.0,0.0,0.000000,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.0
4,0,40,160932.0,2,0,0.0,0.000000,0.0,0.0,0.000000,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,30.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
73394,1,7,41058.0,13,1,0.0,0.250169,0.0,0.0,0.261380,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0
73395,1,21,115000.0,11,1,0.0,0.000000,0.0,0.0,0.247904,...,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,16.0
73396,1,4,57720.0,12,1,0.0,0.000000,0.0,0.0,0.000000,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.0
73397,1,5,70000.0,15,1,0.0,0.252840,0.0,0.0,0.264170,...,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0


In [None]:
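# Replace '[', ']' and '<' in column names; regex=False treats each as a literal character
merged_data.columns = merged_data.columns.str.replace('[', '_', regex=False).str.replace(']', '_', regex=False).str.replace('<', '_', regex=False)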
X = merged_data.drop(columns='Employed')
y = merged_data['Employed']

# Ray for Hyperparameter Tuning

Sequential (Without Ray)
- - -

In [None]:
param_grid = {
    'criterion': ['gini', 'entropy'],
    'splitter': ['best'],
    'max_depth': [None],
    'min_samples_split': [2, 10],
    'min_samples_leaf': [1, 4],
    'min_weight_fraction_leaf': [0],
    'max_features': [None, 'log2'],
    'random_state': [None]
}

X = merged_data.drop(columns='Employed')
y = merged_data['Employed']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = DecisionTreeClassifier()

start_time = time.time()

# Hyperparameter Tuning
search = GridSearchCV(model, param_grid=param_grid, cv=5, scoring='f1_macro')
search.fit(X_train, y_train)

best_model = search.best_estimator_

print("Best Hyperparameters:", search.best_params_)

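# GridSearchCV already refits the best estimator on X_train (refit=True by default);
# the explicit fit is kept so the timed work matches the Ray run
best_model.fit(X_train, y_train)

y_pred_train = best_model.predict(X_train)
y_pred_test = best_model.predict(X_test)

f1_train = f1_score(y_train, y_pred_train, average='macro')
f1_test = f1_score(y_test, y_pred_test, average='macro')

time_delta = time.time() - start_time

print("f1 score train:", f1_train)
print("f1 score test:", f1_test)
print("Run time:", time_delta)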

Best Hyperparameters: {'criterion': 'gini', 'max_depth': None, 'max_features': None, 'min_samples_leaf': 4, 'min_samples_split': 10, 'min_weight_fraction_leaf': 0, 'random_state': None, 'splitter': 'best'}
f1 score train: 0.9853189158693128
f1 score test: 0.9693555693330244
Run time: 115.05536413192749


Note : Run time 115s

Parallel (Using Ray)
- - -

In [None]:
# Initialize Ray
ray.init(ignore_reinit_error=True)
register_ray()

param_grid = {
    'criterion': ['gini', 'entropy'],
    'splitter': ['best'],
    'max_depth': [None],
    'min_samples_split': [2, 10],
    'min_samples_leaf': [1, 4],
    'min_weight_fraction_leaf': [0],
    'max_features': [None, 'log2'],
    'random_state': [None]
}

model = DecisionTreeClassifier()

start_time = time.time()

with joblib.parallel_backend('ray'):
    search = GridSearchCV(model, param_grid=param_grid, cv=5, scoring='f1_macro')
    search.fit(X_train, y_train)

best_model = search.best_estimator_

print("Best Hyperparameters:", search.best_params_)

best_model.fit(X_train, y_train)

y_pred_train = best_model.predict(X_train)
y_pred_test = best_model.predict(X_test)

f1_train = f1_score(y_train, y_pred_train, average='macro')
f1_test = f1_score(y_test, y_pred_test, average='macro')

time_delta = time.time() - start_time

print("f1 score train:", f1_train)
print("f1 score test:", f1_test)
print("Run time:", time_delta)

# Shutdown Ray
ray.shutdown()


2024-07-11 04:02:25,722	INFO worker.py:1779 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265


Best Hyperparameters: {'criterion': 'gini', 'max_depth': None, 'max_features': None, 'min_samples_leaf': 4, 'min_samples_split': 10, 'min_weight_fraction_leaf': 0, 'random_state': None, 'splitter': 'best'}
f1 score train: 0.9853700076520305
f1 score test: 0.9690825017497842
Run time: 96.5391104221344


Note : Run time: 96s

# Run Time Results for Hyperparameter Tuning

| Data | Dimension | Using Ray (s) | Serial (s) | Ratio (Ray / Serial) | Improvement (%) |
|----------|----------|:----------:|:---------:|:----------:|:----------:|
| job_application | 73399 rows × 317 columns | 96 | 115 | 0.8348x | 16.52 |
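
The last two columns of the table follow from the two run times. As a small sketch of that arithmetic (the helper name `compare_run_times` is hypothetical, not part of the notebook), the ratio is the Ray time divided by the serial time, and the improvement is the share of serial time saved:

```python
def compare_run_times(ray_seconds, serial_seconds):
    """Return (ratio, improvement_percent) for a Ray run against a serial run."""
    # Ratio below 1 means the Ray run was faster than the serial run
    ratio = ray_seconds / serial_seconds
    # Positive improvement means time was saved; negative means Ray was slower
    improvement = (serial_seconds - ray_seconds) / serial_seconds * 100
    return ratio, improvement

# Run times taken from the hyperparameter tuning notes (96 s with Ray, 115 s serial)
ratio, improvement = compare_run_times(ray_seconds=96, serial_seconds=115)
print(f"{ratio:.4f}x, {improvement:.2f}%")
```

For the tuning run this gives a ratio of about 0.83 and a reduction of about 16.5% in run time.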

# Ray for Comparing 1 model

First Dataset (20640 rows × 8 columns)
---

Sequential (Without Ray)
- - -

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

california_housing = fetch_california_housing()
X_california = california_housing.data
y_california = california_housing.target

X_train, X_test, y_train, y_test = train_test_split(X_california, y_california, test_size=0.2, random_state=42)

start_time = time.time()

model = LinearRegression()
model.fit(X_train, y_train)

delta_time = time.time() - start_time

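# For a regressor, score() returns R-squared, not accuracy
r_squared = model.score(X_test, y_test)
print(f'R-squared without Ray: {r_squared}')
print("Run time:", delta_time)

R-squared without Ray: 0.5757877060324524
Run time: 0.005457401275634766


Note: Run time: 0.04s (the output above shows about 0.005s; timings vary between runs)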

Parallel (Using Ray)
- - -

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing

# Initialize Ray
ray.init()

@ray.remote
def train_model(X_train, y_train):
    model = LinearRegression()
    model.fit(X_train, y_train)
    return model

california_housing = fetch_california_housing()
X_california = california_housing.data
y_california = california_housing.target

X_train, X_test, y_train, y_test = train_test_split(X_california, y_california, test_size=0.2, random_state=42)

start_time = time.time()

model_ref = train_model.remote(X_train, y_train)
model = ray.get(model_ref)

delta_time = time.time() - start_time

r_squared = model.score(X_test, y_test)

print(f'R-squared with Ray: {r_squared}')
print("Run time:", delta_time)

# Shutdown Ray
ray.shutdown()

2024-07-12 02:08:06,197	INFO worker.py:1788 -- Started a local Ray instance.


R-squared with Ray: 0.5757877060324524
Run time: 0.9055943489074707


Note: Run time: 1.2s

Second Dataset (73399 rows × 317 columns)
----

Sequential (Without Ray)
- - -

In [None]:
import time

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

start_time = time.time()

model = LogisticRegression(max_iter=200)
model.fit(X_train, y_train)

time_delta = time.time() - start_time

accuracy = model.score(X_test, y_test)
print(f'Accuracy without Ray: {accuracy}')
print("Run time:", time_delta)

Accuracy without Ray: 0.6722070844686648
Run time: 2.1381962299346924


Note : Run time : 2.1s

Parallel (Using Ray)
- - -

In [None]:
# Initialize Ray
ray.init()

@ray.remote
def train_model(X_train, y_train):
    model = LogisticRegression(max_iter=200)
    model.fit(X_train, y_train)
    return model


X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

start_time = time.time()

model_ref = train_model.remote(X_train, y_train)
model = ray.get(model_ref)

time_delta = time.time() - start_time

accuracy = model.score(X_test, y_test)
print(f'Accuracy with Ray: {accuracy}')
print("Run time:", time_delta)

# Shutdown Ray
ray.shutdown()

2024-07-11 02:53:36,469	INFO worker.py:1779 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265


Accuracy with Ray: 0.6722070844686648
Run time: 3.3988559246063232


Note : Run time : 3.5s

Third Dataset (581012 rows × 54 columns)
---

Sequential (Without Ray)
- - -

In [None]:
from sklearn.datasets import fetch_covtype

data = fetch_covtype()
X = data.data
y = data.target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

start_time = time.time()

model = LogisticRegression(max_iter=200)
model.fit(X_train, y_train)

delta_time = time.time() - start_time

accuracy = model.score(X_test, y_test)
print(f'Accuracy without Ray: {accuracy}')
print("Run time:", delta_time)

Accuracy without Ray: 0.6450005593659371
Run time: 65.14412069320679


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


Note: Run time: 65s

Parallel (Using Ray)
- - -

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import fetch_covtype
from sklearn.model_selection import train_test_split

# Initialize Ray
ray.init()

# Define a function to train a logistic regression model
@ray.remote
def train_model(X_train, y_train):
    model = LogisticRegression(max_iter=200)
    model.fit(X_train, y_train)
    return model

# Load a large dataset
data = fetch_covtype()
X = data.data
y = data.target

# Split dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

start_time = time.time()

# Train logistic regression model in parallel using Ray
model_ref = train_model.remote(X_train, y_train)
model = ray.get(model_ref)

delta_time = time.time() - start_time

# Evaluate the model
accuracy = model.score(X_test, y_test)

print(f'Accuracy with Ray: {accuracy}')
print("Run time:", delta_time)

# Shutdown Ray
ray.shutdown()

2024-07-11 02:54:54,507	INFO worker.py:1779 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
(train_model pid=22579) STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.
(train_model pid=22579)
(train_model pid=22579) Increase the number of iterations (max_iter) or scale the data as shown in:
(train_model pid=22579)     https://scikit-learn.org/stable/modules/preprocessing.html
(train_model pid=22579) Please also refer to the documentation for alternative solver options:
(train_model pid=22579)     https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
(train_model pid=22579)   n_iter_i = _check_optimize_result(


Accuracy with Ray: 0.6462655869469807
Run time: 61.755223512649536


Note: Run time: 61s



# Run Time Results for Modeling

| Data | Dimension | Using Ray (s) | Serial (s) | Ratio (Ray / Serial) | Improvement (%) |
|:----------:|:----------:|:----------:|:----------:|:----------:|:----------:|
| california_housing | 20640 rows × 8 columns | 1.2 | 0.04 | 30x | -2900 |
| job_application | 73399 rows × 317 columns | 3.5 | 2.1 | 1.67x | -66.7 |
| covtype | 581012 rows × 54 columns | 61 | 65 | 0.94x | 6.2 |
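
Each figure in this table comes from a single run, and single-run timings are noisy: for the sequential California housing fit, the cell printed about 0.005 s while the note records 0.04 s. One way to get steadier numbers is to repeat each measurement and report the median. The sketch below is only an illustration; `time_repeated` and `workload` are hypothetical names, and `workload` is a stand-in for a model fit.

```python
import statistics
import time

def time_repeated(fn, repeats=5):
    """Run fn several times and return the wall-clock durations in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return durations

# Stand-in workload (an assumption for illustration); replace with a model fit
def workload():
    sum(i * i for i in range(100_000))

durations = time_repeated(workload, repeats=5)
print("median:", statistics.median(durations))
print("min/max:", min(durations), max(durations))
```

`time.perf_counter()` is used here instead of `time.time()` because it is intended for measuring short intervals.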

# Ray for Comparing Many Models


First Dataset (20640 rows × 8 columns)
---

Sequential (Without Ray)
- - -

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.ensemble import RandomForestRegressor

california_housing = fetch_california_housing()
X_california = california_housing.data
y_california = california_housing.target

X_train, X_test, y_train, y_test = train_test_split(X_california, y_california, test_size=0.2, random_state=42)

models = {
    'Random Forest Regressor': RandomForestRegressor(n_estimators=100, random_state=42),
    'Linear Regression': LinearRegression()
}

start_time = time.time()

def train_and_evaluate_model(model_name, X_train, y_train, X_test, y_test):
    # Build a fresh estimator for the requested model name
    if model_name == 'Random Forest Regressor':
        model = RandomForestRegressor(n_estimators=100, random_state=42)
    elif model_name == 'Linear Regression':
        model = LinearRegression()
    else:
        raise ValueError(f"Unknown model name: {model_name}")

    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    # Both models are regressors, so both are scored with mean squared error
    mse = mean_squared_error(y_test, y_pred)
    return f"MSE: {mse}"

# Initialize a dictionary to store results
results = {}

for model_name in models.keys():
    result = train_and_evaluate_model(model_name, X_train, y_train, X_test, y_test)
    results[model_name] = result

time_delta = time.time() - start_time

for model_name, result in results.items():
    print(f"Test results for {model_name}: {result}")

print("Run time:", time_delta)


Test results for Random Forest Regressor: MSE: 0.2553684927247781
Test results for Linear Regression: MSE: 0.5558915986952422
Run time: 14.082176923751831


Note: Run time: 14s

Parallel (Using Ray)
- - -

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, accuracy_score
from sklearn.ensemble import RandomForestRegressor

# Initialize Ray
ray.init(ignore_reinit_error=True)

california_housing = fetch_california_housing()
X_california = california_housing.data
y_california = california_housing.target

X_train, X_test, y_train, y_test = train_test_split(X_california, y_california, test_size=0.2, random_state=42)

models = {
    'Random Forest Regressor': RandomForestRegressor(n_estimators=100, random_state=42),
    'Linear Regression': LinearRegression()
}

start_time = time.time()

@ray.remote
def train_and_evaluate_model(model_name, X_train, y_train, X_test, y_test):
    # Build a fresh estimator inside the worker for the requested model name
    if model_name == 'Random Forest Regressor':
        model = RandomForestRegressor(n_estimators=100, random_state=42)
    elif model_name == 'Linear Regression':
        model = LinearRegression()
    else:
        raise ValueError(f"Unknown model name: {model_name}")

    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    # Both models are regressors, so both are scored with mean squared error
    mse = mean_squared_error(y_test, y_pred)
    return f"MSE: {mse}"



# Initialize a list to store futures
futures = []

for model_name in models.keys():
    future = train_and_evaluate_model.remote(model_name, X_train, y_train, X_test, y_test)
    futures.append(future)

results = ray.get(futures)

time_delta = time.time() - start_time

for model_name, result in zip(models.keys(), results):
    print(f"Test results for {model_name}: {result}")

print("Run time:", time_delta)

# Shutdown Ray
ray.shutdown()


2024-07-11 09:16:31,753	INFO worker.py:1788 -- Started a local Ray instance.


Test results for Random Forest Regressor: MSE: 0.2553684927247781
Test results for Linear Regression: MSE: 0.5558915986952422
Run time: 16.284026861190796


Note: Run time: 16.7s

Second Dataset (73399 rows × 317 columns)
---

Parallel (Using Ray)
- - -

In [None]:
# Initialize Ray
ray.init(ignore_reinit_error=True)

X = merged_data.drop(columns='Employed')
y = merged_data['Employed']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

models = {
    'Random Forest': RandomForestClassifier(random_state=42),
    'Logistic Regression': LogisticRegression(max_iter=200, random_state=42)
}

start_time = time.time()

@ray.remote
def train_and_evaluate_model(model_name, X_train, y_train, X_test, y_test):
    if model_name == 'Random Forest':
        model = RandomForestClassifier(random_state=42)
    elif model_name == 'Logistic Regression':
        model = LogisticRegression(max_iter=200, random_state=42)
    else:
        raise ValueError(f"Unknown model name: {model_name}")

    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

    return accuracy

# Initialize a list to store futures
futures = []

for model_name in models.keys():
    future = train_and_evaluate_model.remote(model_name, X_train, y_train, X_test, y_test)
    futures.append(future)

results = ray.get(futures)

time_delta = time.time() - start_time

for model_name, score in zip(models.keys(), results):
    print(f"Test Accuracy for {model_name}: {score}")

print("Run time:", time_delta)
# Shutdown Ray
ray.shutdown()

2024-07-11 02:56:05,202	INFO worker.py:1779 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265


Test Accuracy for Random Forest: 0.9402588555858311
Test Accuracy for Logistic Regression: 0.6722070844686648
Run time: 35.52976417541504


Note: Run time: 35s

Sequential
- - -

In [None]:
X = merged_data.drop(columns='Employed')
y = merged_data['Employed']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

models = {
    'Random Forest': RandomForestClassifier(random_state=42),
    'Logistic Regression': LogisticRegression(max_iter=200, random_state=42)
}

start_time = time.time()

results = {}

for model_name, model in models.items():
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    results[model_name] = accuracy

time_delta = time.time() - start_time

for model_name, score in results.items():
    print(f"Test Accuracy for {model_name}: {score}")

print("Run time:", time_delta)


Test Accuracy for Random Forest: 0.9402588555858311
Test Accuracy for Logistic Regression: 0.6722070844686648
Run time: 49.34344124794006


Note: Run time: 49s

Third Dataset (581012 rows × 54 columns)
---

Parallel (Using Ray)
- - -

In [None]:
from sklearn.datasets import fetch_covtype

# Initialize Ray
ray.init(ignore_reinit_error=True)

# Load the dataset
data = fetch_covtype()
X = data.data
y = data.target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

models = {
    'Random Forest': RandomForestClassifier(random_state=42),
    'Logistic Regression': LogisticRegression(max_iter=200, random_state=42)
}

start_time = time.time()

@ray.remote
def train_and_evaluate_model(model_name, X_train, y_train, X_test, y_test):
    if model_name == 'Random Forest':
        model = RandomForestClassifier(random_state=42)
    elif model_name == 'Logistic Regression':
        model = LogisticRegression(max_iter=200, random_state=42)
    else:
        raise ValueError(f"Unknown model name: {model_name}")

    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

    return accuracy

# Initialize a list to store futures
futures = []

for model_name in models.keys():
    future = train_and_evaluate_model.remote(model_name, X_train, y_train, X_test, y_test)
    futures.append(future)

results = ray.get(futures)

time_delta = time.time() - start_time

for model_name, score in zip(models.keys(), results):
    print(f"Test Accuracy for {model_name}: {score}")

print("Run time:", time_delta)

# Shutdown Ray
ray.shutdown()


2024-07-11 02:57:38,732	INFO worker.py:1779 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
(train_and_evaluate_model pid=23755) STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.
(train_and_evaluate_model pid=23755)
(train_and_evaluate_model pid=23755) Increase the number of iterations (max_iter) or scale the data as shown in:
(train_and_evaluate_model pid=23755)     https://scikit-learn.org/stable/modules/preprocessing.html
(train_and_evaluate_model pid=23755) Please also refer to the documentation for alternative solver options:
(train_and_evaluate_model pid=23755)     https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
(train_and_evaluate_model pid=23755)   n_iter_i = _check_optimize_result(


Test Accuracy for Random Forest: 0.9551646687262807
Test Accuracy for Logistic Regression: 0.6462655869469807
Run time: 247.074312210083


Note : Run time: 247s

Serial
- - -

In [None]:
import time
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import fetch_covtype
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier

# Load the dataset
data = fetch_covtype()
X = data.data
y = data.target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

models = {
    'Random Forest': RandomForestClassifier(random_state=42),
    'Logistic Regression': LogisticRegression(max_iter=200, random_state=42)
}

start_time = time.time()

# Dictionary to store results
results = {}

for model_name, model in models.items():
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    results[model_name] = accuracy

time_delta = time.time() - start_time

for model_name, score in results.items():
    print(f"Test Accuracy for {model_name}: {score}")

print("Run time:", time_delta)

Test Accuracy for Random Forest: 0.9551646687262807
Test Accuracy for Logistic Regression: 0.6450005593659371
Run time: 258.74831223487854


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


Note : Run time: 259s

# Run Time Results for Comparing Models

| Data | Dimension | Using Ray (s) | Serial (s) | Ratio (Ray / Serial) | Improvement (%) |
|:----------:|:----------:|:----------:|:----------:|:----------:|:----------:|
| california_housing | 20640 rows × 8 columns | 16.7 | 14.08 | 1.186x | -18.6 |
| job_application | 73399 rows × 317 columns | 35 | 49 | 0.71x | 28.6 |
| covtype | 581012 rows × 54 columns | 247 | 259 | 0.95x | 4.6 |

# Ray for Data Preprocessing

In [None]:
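import ray
import numpy as np
import pandas as pd


ray.init()

@ray.remote
def process_chunk(chunk):
    # Process one chunk: return a copy with a new column holding double the 'value' column
    return chunk.assign(processed=chunk['value'] * 2)


data = pd.DataFrame({
    'id': range(100),
    'value': np.random.randint(1, 100, size=100)
})

# Split the DataFrame into 10 chunks of consecutive rows
chunk_size = 10
chunks = [data.iloc[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

# Launch one Ray task per chunk, then wait for all of them to finish
result_ids = [process_chunk.remote(chunk) for chunk in chunks]
results = ray.get(result_ids)

final_result = pd.concat(results)

ray.shutdown()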

2024-07-11 04:17:26,188	INFO worker.py:1779 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265


In [None]:
data.head()

Unnamed: 0,id,value
0,0,31
1,1,73
2,2,6
3,3,86
4,4,6


In [None]:
final_result.head()

Unnamed: 0,id,value,processed
0,0,31,62
1,1,73,146
2,2,6,12
3,3,86,172
4,4,6,12


# Conclusion

### Advantages of Ray
1.   **Scalability**: Ray is designed for large-scale processing and can make use of many CPUs and GPUs.
2.   **Resource Management**: Ray allocates resources efficiently while tasks are being processed.

### Disadvantages of Ray
1.  **Communication Overhead**: For relatively simple or small-scale tasks, the benefit of Ray may not outweigh the overhead it introduces, so a non-parallel solution can be more efficient in those cases.
2.  **Setup Complexity**: Although the API is simple, managing the execution environment can be complex.


Ray is a powerful tool for improving the efficiency and speed of large, complex workloads, but its use should be weighed carefully against the type of task being run. In the experiments above, Ray was slower for a single small fit (1.2s against 0.04s on california_housing) and faster when two models were trained side by side on job_application (35s against 49s). Ray is best suited to work that can be split into parallel parts that each need heavy processing, and may be less efficient for simpler or smaller tasks.
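
The trade-off described above can be put into a rough cost model. The sketch below is an assumption-laden illustration, not a measurement: it treats Ray's cost as one fixed overhead plus the load of the busiest worker, and all names and numbers in it are hypothetical.

```python
def estimated_parallel_time(task_seconds, num_workers, overhead_seconds):
    """Rough estimate: fixed overhead plus the busiest worker's total load."""
    # Greedy assignment: give each task (longest first) to the least-loaded worker
    loads = [0.0] * num_workers
    for seconds in sorted(task_seconds, reverse=True):
        loads[loads.index(min(loads))] += seconds
    return overhead_seconds + max(loads)

def worth_parallelizing(task_seconds, num_workers, overhead_seconds):
    """True when the estimated parallel time beats running the tasks one by one."""
    serial_time = sum(task_seconds)
    parallel_time = estimated_parallel_time(task_seconds, num_workers, overhead_seconds)
    return parallel_time < serial_time

# Hypothetical task durations in seconds
small_job = [0.005]        # one tiny task: overhead dominates
large_job = [30.0, 20.0]   # two long, independent tasks

print(worth_parallelizing(small_job, num_workers=2, overhead_seconds=1.0))  # False
print(worth_parallelizing(large_job, num_workers=2, overhead_seconds=1.0))  # True
```

Under this model, parallelism pays off only when the tasks are independent and long enough that the time saved exceeds the fixed overhead.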