# Ray Cluster

In [None]:
# Import pieces from codeflare-sdk
from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration
from codeflare_sdk.cluster.auth import TokenAuthentication

In [None]:
# Authenticate with a bearer token so the SDK acts with your user's permissions.
# If this cell is skipped, the SDK checks the default kubeconfig first and then the
# in-cluster config; KubeConfigFileAuthentication can point at a specific kubeconfig path.
# Never commit a real token into the notebook.
# NOTE(review): skip_tls=True presumably disables TLS verification of the API server;
# prefer False outside throwaway/test clusters.
auth = TokenAuthentication(token="", server="", skip_tls=True)
auth.login()

In [None]:
# Namespace of the pod this notebook runs in, read from the mounted service-account
# file; the Ray cluster below is created in this same namespace. There is nothing to
# edit here when running in-cluster. Elsewhere, set NAMESPACE by hand instead.
# Reading the file directly (instead of "!cat") makes a missing file raise an error
# rather than silently storing cat's error message as the namespace.
with open('/var/run/secrets/kubernetes.io/serviceaccount/namespace') as namespace_file:
    NAMESPACE = namespace_file.read().strip()
if not NAMESPACE:
    raise RuntimeError("Service-account namespace file is empty; set NAMESPACE manually")

In [None]:
# Describe the Ray cluster and create the SDK cluster object (and its AppWrapper).
# 2 workers with 4 CPUs each, memory 8 (units per the SDK, presumably GB), no GPUs,
# and InstaScale disabled.
cluster_config = ClusterConfiguration(
    name='raytest',  # must match the raytest.yaml copied over in the next cell
    namespace=NAMESPACE,
    num_workers=2,   # keep in sync with ScalingConfig(num_workers=...) in train_fn
    min_cpus=4,
    max_cpus=4,
    min_memory=8,
    max_memory=8,
    num_gpus=0,
    image="quay.io/project-codeflare/ray:latest-py39-cu118",
    instascale=False,
)
cluster = Cluster(cluster_config)

In [None]:
import shutil
from pathlib import Path

import update_yaml

# update_yaml is a local helper module; it presumably writes a namespace-specific
# raytest.yaml into the working directory (TODO confirm).
update_yaml.namespace_specific_yaml(NAMESPACE)

# Overwrite the AppWrapper the SDK generated for the 'raytest' cluster with that file.
# shutil.copy raises if the copy fails, so a bad path stops the notebook here.
appwrapper_path = Path.home() / ".codeflare" / "appwrapper" / "raytest.yaml"
shutil.copy("raytest.yaml", appwrapper_path)

In [None]:
# Bring up the cluster, then wait for it to become ready before connecting to it
# (wait_ready() is expected to block until the cluster is usable).
cluster.up()
cluster.wait_ready()

In [None]:
cluster.details()  # show the SDK's summary of this cluster

In [None]:
# Dashboard URI (for a browser) and cluster URI (passed to ray.init() later).
ray_dashboard_uri = cluster.cluster_dashboard_uri()
ray_cluster_uri = cluster.cluster_uri()
print(ray_dashboard_uri, ray_cluster_uri, sep="\n")

# Data

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import class_weight
import pickle
from pathlib import Path

In [None]:
# Load the transaction dataset and preview its first rows.
Data = pd.read_csv(filepath_or_buffer='data/card_transdata.csv')
Data.head(5)

In [None]:
# Set the input (X) and output (y) data.
# The only output is whether a transaction is fraudulent. Every remaining field, except
# the two columns dropped below, is an input to the model.

X = Data.drop(columns = ['repeat_retailer','distance_from_home', 'fraud'])
y = Data['fraud']

# Split the data so there is something to test the trained model with:
# - the last 20% of rows (shuffle=False keeps file order) are held out as the test set;
# - the remaining rows are split 80/20 into training and validation sets, stratified on
#   the label so both keep the same fraud ratio. A fixed seed makes this reproducible.
RANDOM_SEED = 42
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, shuffle = False)

X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size = 0.2, stratify = y_train, random_state = RANDOM_SEED
)

# Standardize each feature to zero mean and unit variance, which is much easier for the
# model to learn from than raw (and potentially large) values.
# Fit the scaler on the training data only; fitting on everything would leak information
# about the global distribution (influenced by the validation/test sets) into training.
# The validation and test sets are then transformed with that same fitted scaler so all
# three splits live in the same feature space.
scaler = StandardScaler()

X_train = scaler.fit_transform(X_train.values)
X_val = scaler.transform(X_val.values)
X_test = scaler.transform(X_test.values)

# The dataset is unbalanced (many more non-fraud than fraudulent transactions), so weight
# classes inversely to their frequency ('balanced') to count the rare fraud cases more.
# Keys are the actual class labels as ints, which is what Keras' class_weight expects.
fraud_classes = np.unique(y_train)
balanced_weights = class_weight.compute_class_weight('balanced', classes = fraud_classes, y = y_train)
class_weights = {int(label): weight for label, weight in zip(fraud_classes, balanced_weights)}

# Get S3 info

In [None]:
import boto3
from os import environ

# S3 connection settings, read from environment variables (None for any that are unset).
s3_endpoint_url, s3_access_key, s3_secret_key, s3_bucket_name = (
    environ.get(var_name)
    for var_name in (
        'AWS_S3_ENDPOINT',
        'AWS_ACCESS_KEY_ID',
        'AWS_SECRET_ACCESS_KEY',
        'AWS_S3_BUCKET',
    )
)

# Training

In [None]:
!pip install pyarrow

In [None]:
# Before proceeding, make sure the cluster exists and its URI is not empty.
assert ray_cluster_uri, "Ray cluster needs to be started and set before proceeding"

import ray
from ray.air.config import ScalingConfig

# Extra libraries installed on the cluster for model training.
# NOTE(review): train_fn also uses boto3 on the workers; add it here if the Ray image
# does not already ship it.
runtime_env = {"pip": ["tensorflow==2.10"]}

# Reset any existing Ray context, then connect to the Ray cluster.
# This works for in-cluster notebook servers (RHODS/ODH) but not for local machines;
# to connect from a laptop, see demo-notebooks/additional-demos/local_interactive.ipynb.
ray.shutdown()
ray.init(address=ray_cluster_uri, runtime_env=runtime_env)

print("Ray cluster is up and running: ", ray.is_initialized())

In [None]:
@ray.remote
def train_fn():
    """Train the fraud classifier with Ray Train on the cluster and upload it to S3.

    Runs as a Ray task, so TensorFlow only has to be available on the cluster
    (installed through ``runtime_env``), not in the notebook kernel. Inside the
    task, a ``TensorflowTrainer`` runs ``train_model`` on 2 Ray workers under
    ``tf.distribute.MultiWorkerMirroredStrategy``.

    Notebook globals used: ``X_train`` (scaled feature matrix), ``y_train``
    (labels), ``class_weights`` (per-class loss weights) and the ``s3_*``
    connection settings.

    Returns:
        The result object returned by ``TensorflowTrainer.fit()``.
    """
    from keras.models import Sequential
    from keras.layers import Dense, Dropout, BatchNormalization, Activation
    from ray import train
    from ray.train.tensorflow import TensorflowTrainer
    from ray.train.tensorflow.keras import ReportCheckpointCallback
    import tensorflow as tf
    import tempfile
    import os

    # Feature count from the training matrix itself, so the raw DataFrame X does not
    # have to be shipped to the cluster along with this function.
    n_features = X_train.shape[1]

    def build_model():
        """Return an uncompiled binary classifier: Dense(32) stack ending in a sigmoid unit."""
        model = Sequential()
        model.add(Dense(32, activation = 'relu', input_dim = n_features))
        model.add(Dropout(0.2))
        model.add(Dense(32))
        model.add(BatchNormalization())
        model.add(Activation('relu'))
        model.add(Dropout(0.2))
        model.add(Dense(32))
        model.add(BatchNormalization())
        model.add(Activation('relu'))
        model.add(Dropout(0.2))
        model.add(Dense(1, activation = 'sigmoid'))
        return model

    def train_model(config: dict):
        """Per-worker training loop run by ``TensorflowTrainer``.

        Args:
            config: ``batch_size`` (default 32, multiplied by the number of
                workers before batching this worker's shard) and ``epochs``
                (default 2).

        Returns:
            A one-element list holding the Keras ``History.history`` dict.
        """
        batch_size = config.get("batch_size", 32)
        batch_size = batch_size * train.get_context().get_world_size()
        epochs = config.get("epochs", 2)

        # Build and compile inside the strategy scope so variables are mirrored across workers.
        strategy = tf.distribute.MultiWorkerMirroredStrategy()
        with strategy.scope():
            model = build_model()
            model.compile(optimizer='adam',loss='binary_crossentropy',metrics=['accuracy'])
            model.summary()

        # This worker's shard of the Ray dataset, as (features "x", label "y") batches.
        dataset = train.get_dataset_shard("train")
        tf_dataset = dataset.to_tf(
            feature_columns="x", label_columns="y", batch_size=batch_size
        )

        # class_weight compensates for the fraud / non-fraud imbalance computed in the
        # data-preparation cell.
        history = model.fit(
            tf_dataset,
            epochs=epochs,
            class_weight=class_weights,
            callbacks=[ReportCheckpointCallback()],
        )
        results = [history.history]

        # Every worker saves (the TF multi-worker guide has all workers take part in
        # saving), but only rank 0 uploads, so workers don't race to overwrite the same
        # S3 object with identical weights.
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            model_location = os.path.join(temp_checkpoint_dir, "model.keras")
            # NOTE(review): Keras versions predating the ".keras" single-file format save
            # any non-".h5" path as a SavedModel *directory*; if TF 2.10 does so here,
            # upload_file below will fail on a directory — confirm.
            model.save(model_location)
            if train.get_context().get_world_rank() == 0:
                s3 = boto3.client(
                    's3', endpoint_url=s3_endpoint_url,
                    aws_access_key_id=s3_access_key, aws_secret_access_key=s3_secret_key,
                )
                s3.upload_file(model_location, s3_bucket_name, 'model.keras')

        return results

    # Run everything distributed.
    config = {"batch_size": 32, "epochs": 2}
    # One row per sample: "x" is the scaled feature vector, "y" the label.
    reshaped_dataset = [
        {"x": features, "y": label} for features, label in zip(X_train, y_train.values)
    ]
    train_dataset = ray.data.from_items(reshaped_dataset)
    # num_workers matches num_workers in the ClusterConfiguration.
    scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

    trainer = TensorflowTrainer(
        train_loop_per_worker=train_model,
        train_loop_config=config,
        scaling_config=scaling_config,
        datasets={"train": train_dataset},
    )
    result = trainer.fit()

    return result

In [None]:
# Submit train_fn as a Ray task, then block until training has finished.
training_ref = train_fn.remote()
result = ray.get(training_ref)

# Save model

In [None]:
!pip install onnx tf2onnx tensorflow

In [None]:
import tf2onnx
import onnx
import keras
import os

In [None]:
# Download the file we trained
# Fetch the model that train_fn uploaded to S3 into the working directory.
s3 = boto3.client(
    's3',
    endpoint_url=s3_endpoint_url,
    aws_access_key_id=s3_access_key,
    aws_secret_access_key=s3_secret_key,
)
s3.download_file(Bucket=s3_bucket_name, Key="model.keras", Filename='model.keras')

In [None]:
# Load the trained Keras model, convert it to ONNX and write it to models/fraud/.
onnx_dir = "models/fraud"
model = keras.models.load_model("model.keras")
model_proto, _ = tf2onnx.convert.from_keras(model)
os.makedirs(onnx_dir, exist_ok=True)
onnx.save(model_proto, onnx_dir + "/model.onnx")

In [None]:
# And push it back up to S3
s3.upload_file(Filename="models/fraud/model.onnx", Bucket=s3_bucket_name, Key='model.onnx')

# Shut down cluster

In [None]:
cluster.down()  # tear down the Ray cluster created above

In [None]:
auth.logout()  # end the session opened by auth.login()