# integrate.ai API Sample Notebook to run tasks with an AWS task runner.

This is an example notebook that demonstrates creating taskbuilders and running tasks using an AWS task runner. 
For details about required setup and configuration for task runners, see [Using integrate.ai](https://documentation.integrateai.net/#using-integrate-ai).

## Setup
### Set environment variables (or replace inline) with your IAI credentials
Generate and manage this token on the Tokens page of the UI. 

In [1]:
from integrate_ai_sdk.api import connect
import os

# Read the integrate.ai token from the IAI_TOKEN environment variable instead of
# hard-coding it in the notebook (tokens are generated on the Tokens page of the UI).
IAI_TOKEN = os.environ.get("IAI_TOKEN")
if not IAI_TOKEN:
    # Fail early with an actionable message rather than sending an empty token to the API.
    raise RuntimeError("Set the IAI_TOKEN environment variable to your integrate.ai token before running this notebook.")
client = connect(token=IAI_TOKEN)


### Set your AWS variables

**Important: The task runner expects your data to be in the bucket that was created when the task runner was provisioned.**

This bucket name takes the form of: `s3://{aws_taskrunner_profile}-{aws_taskrunner_name}.integrate.ai`

For example: `myworkspace-mytaskrunner.integrate.ai`

You can download sample data from the integrate.ai sample bucket [s3://public.s3.integrate.ai/integrate_ai_examples/vfl/](s3://public.s3.integrate.ai/integrate_ai_examples/vfl/).

In [2]:
# Workspace name, and the task runner name exactly as supplied in the UI when the task runner was created.
aws_taskrunner_profile = "<myworkspace>"
aws_taskrunner_name = "<mytaskrunner>"

# The task runner expects its data in this bucket; models and predictions below are stored here too.
base_aws_bucket = "{}-{}.integrate.ai".format(aws_taskrunner_profile, aws_taskrunner_name)
_bucket_uri = "s3://" + base_aws_bucket

# HFL datapaths: one training silo per client, both clients share the same test file.
train_path1 = _bucket_uri + "/synthetic/train_silo0.parquet"
test_path1 = _bucket_uri + "/synthetic/test.parquet"
train_path2 = _bucket_uri + "/synthetic/train_silo1.parquet"
test_path2 = _bucket_uri + "/synthetic/test.parquet"

# EDA / PRL / VFL datapaths
_prl_vfl_dir = _bucket_uri + "/synthetic_prl_vfl"
active_train_path = _prl_vfl_dir + "/active_train.parquet"
active_test_path = _prl_vfl_dir + "/active_test.parquet"
passive_train_path = _prl_vfl_dir + "/passive_train.parquet"
passive_test_path = _prl_vfl_dir + "/passive_test.parquet"

# Destination for trained models
aws_storage_path = _bucket_uri + "/model"

# Destinations for VFL predictions (full path including the file name)
vfl_predict_active_storage_path = _bucket_uri + "/vfl_predict/active_predictions.csv"
vfl_predict_passive_storage_path = _bucket_uri + "/vfl_predict/passive_predictions.csv"

base_aws_bucket  # Displayed for reference

'<myworkspace>-<mytaskrunner>.integrate.ai'

### Set up the taskbuilder 

Specify the task runner name (e.g. `mytaskrunner`) as `task_runner_id`. 

In [3]:
# SDK imports for building and running task groups (each name imported once).
import integrate_ai_sdk
from integrate_ai_sdk.taskgroup.base import SessionTaskGroup
from integrate_ai_sdk.taskgroup.taskbuilder import taskrunner_context
from integrate_ai_sdk.taskgroup.taskbuilder.integrate_ai import IntegrateAiTaskBuilder

# Task builder bound to the AWS task runner; task_runner_id is the task runner name set above.
iai_tb_aws = IntegrateAiTaskBuilder(client=client, task_runner_id=aws_taskrunner_name)

# Task 1: Perform EDA in Individual Mode

This example task demonstrates how to run an exploratory data analysis (EDA) session in Individual mode.

In [4]:
# Dataset configuration for Individual-mode EDA, keyed by the dataset names the EDA tasks use

dataset_config = dict(dataset_one=[], dataset_two=[])

In [5]:
# Create the EDA session and start it. With startup_mode="external" the session's tasks
# are presumably launched separately, via the task group created in the next cell.

eda_session = client.create_eda_session(
    startup_mode="external",
    data_config=dataset_config,
    name="Testing notebook - EDA",
    description="I am testing EDA session creation with a task runner through a notebook",
).start()

eda_session.id  # EDA session ID, displayed for reference

'1e4a4add14'

In [None]:
# Build a task group with one server task (fls) and one EDA task per dataset, then start it

eda_task_group_context = (
    SessionTaskGroup(eda_session)
    .add_task(iai_tb_aws.fls(storage_path=aws_storage_path))
    .add_task(iai_tb_aws.eda(dataset_name="dataset_one", dataset_path=active_train_path))
    .add_task(iai_tb_aws.eda(dataset_name="dataset_two", dataset_path=passive_train_path))
    .start()
)

In [None]:
# Print each task's status as indented JSON, then follow the task logs

import json

for task_context in eda_task_group_context.contexts.values():
    status_json = json.dumps(task_context.status(), indent=4)
    print(status_json)

eda_task_group_context.monitor_task_logs()

In [None]:
# Wait for the EDA tasks to complete (success = True).
# Must use the EDA task group context started above: `task_group_context` is not
# defined until the HFL section, so referencing it here fails on a fresh kernel.

eda_task_group_context.wait(60*5, 2)

### Task 1: EDA session complete

In [None]:
# Session complete: retrieve the results for all datasets in the session.
# Individual datasets can be selected by name, e.g. eda_session.results()["dataset_one"];
# a tuple of names is not a valid key.

results = eda_session.results()
results

In [None]:
results_means = results.mean()
results_means.shape

In [None]:
# Inspect column x0 of dataset_one: keep its count and display its mean
dataset_one = eda_session.results()["dataset_one"]
x0_values = dataset_one["x0"]
dataset_one_count = x0_values.count()
x0_values.mean()

# Task 2: Create an HFL FFNet Training Session

The documentation for [creating a session](https://documentation.integrateai.net/#tutorial-ffnet-model-training-with-a-sample-local-dataset-iai_ffnet) gives additional context into the parameters that are used during training session creation.<br />
For this session we are going to be using two training clients and two rounds. 

You can find the model config and data schema details in the [integrate.ai end user tutorial](https://documentation.integrateai.net/#understanding-models).

In [None]:
# Model and data configurations for the HFL FFNet session

model_config = dict(
    experiment_name="test_synthetic_tabular",
    experiment_description="test_synthetic_tabular",
    strategy=dict(name="FedAvg", params={}),
    # input_size matches the 15 predictors in data_schema below
    model=dict(params=dict(input_size=15, hidden_layer_sizes=[6, 6, 6], output_size=2)),
    balance_train_datasets=False,
    ml_task=dict(type="classification", params=dict(loss_weights=None)),
    optimizer=dict(name="SGD", params=dict(learning_rate=0.2, momentum=0.0)),
    differential_privacy_params=dict(epsilon=4, max_grad_norm=7),
    # To disable this and save the model from the last round, set metric to None
    save_best_model=dict(metric="loss", mode="min"),
    seed=23,  # for reproducibility
)

data_schema = dict(
    predictors=[f"x{i}" for i in range(15)],  # x0 .. x14
    target="y",
)

In [None]:
# Create the HFL training session (two clients, two rounds) and start it

training_session = client.create_fl_session(
    package_name="iai_ffnet",
    model_config=model_config,
    data_config=data_schema,
    min_num_clients=2,
    num_rounds=2,
    startup_mode="external",
    name="Testing notebook",
    description="I am testing session creation with a task runner through a notebook",
).start()

training_session.id  # Training session ID, displayed for reference

In [None]:
# Task group: one server task (fls) plus one HFL client task per training silo (CPU only)

task_group = (
    SessionTaskGroup(training_session)
    .add_task(iai_tb_aws.fls(storage_path=aws_storage_path))
    .add_task(iai_tb_aws.hfl(use_gpu=False, train_path=train_path1, test_path=test_path1))
    .add_task(iai_tb_aws.hfl(use_gpu=False, train_path=train_path2, test_path=test_path2))
)

In [None]:
# Start the HFL task group; the returned context is used below to check status, follow logs, and wait.
task_group_context = task_group.start()

In [None]:
# Print each submitted task's status as indented JSON, then follow the task logs

for task_context in task_group_context.contexts.values():
    status_json = json.dumps(task_context.status(), indent=4)
    print(status_json)

task_group_context.monitor_task_logs()

In [None]:
# Block until the tasks complete (success = True). The arguments are presumably a timeout
# in seconds (5 minutes) and a polling interval; TODO confirm against the SDK docs.

task_group_context.wait(5 * 60, 2)

### Task 2: HFL Session Complete!
Now you can view the training metrics and start making predictions

In [None]:
# Fetch the HFL session metrics and display them as a dict

hfl_metrics = training_session.metrics()
hfl_metrics.as_dict()

In [None]:
# Plot the HFL session metrics

hfl_metrics_for_plot = training_session.metrics()
fig = hfl_metrics_for_plot.plot()

# Task 3: Create a PRL Session for linking two or more datasets

To create a PRL session, specify a `prl_data_config` dictionary that gives the client names and the columns to use as identifiers for linking the datasets to each other. The number of expected clients is inferred from the number of entries under `clients` (in this example, two). These client names are referenced by the tasks in the PRL session and by any sessions that use the PRL session downstream.

For this session, two clients provide data, named `active_client` and `passive_client`. Their datasets are linked by the "id" column.

In [None]:
# PRL dataset configuration: each client name maps to the columns used to link the datasets

prl_data_config = dict(
    clients=dict(
        active_client=dict(id_columns=["id"]),
        passive_client=dict(id_columns=["id"]),
    )
)

In [None]:
# Create the PRL session and start it

prl_session = client.create_prl_session(
    data_config=prl_data_config,
    startup_mode="external",
    name="Testing notebook - PRL",
    description="I am testing PRL session creation with a task runner through a notebook",
).start()

prl_session.id  # PRL session ID, displayed for reference

In [None]:
# Task group: one server task (fls) plus one PRL task per client named in prl_data_config

task_group = (
    SessionTaskGroup(prl_session)
    .add_task(iai_tb_aws.fls(storage_path=aws_storage_path))
    .add_task(iai_tb_aws.prl(client_name="active_client", train_path=active_train_path, test_path=active_test_path))
    .add_task(iai_tb_aws.prl(client_name="passive_client", train_path=passive_train_path, test_path=passive_test_path))
)

task_group_context = task_group.start()

In [None]:
# Print each PRL task's status as indented JSON, then follow the task logs

for task_context in task_group_context.contexts.values():
    status_json = json.dumps(task_context.status(), indent=4)
    print(status_json)

task_group_context.monitor_task_logs()

In [None]:
# Block until the PRL tasks complete (success = True). The arguments are presumably a timeout
# in seconds (5 minutes) and a polling interval; TODO confirm against the SDK docs.

task_group_context.wait(5 * 60, 2)

### Task 3: PRL Session Complete!
Now you can view the overlap stats for the datasets.

In [None]:
# Fetch the PRL session metrics (dataset overlap stats) as a dict and display them

prl_metrics = prl_session.metrics()
metrics = prl_metrics.as_dict()
metrics

## Task 3a: Create a VFL Training Session using the PRL session from Task 3

To create a VFL train session, specify the `prl_session_id` indicating the session you just ran to link the datasets together. The `vfl_mode` needs to be set to `train`.

For more information about vertical federated learning with a SplitNN model strategy, see [VFL SplitNN Model Training](https://documentation.integrateai.net/#vfl-splitnn-model-training). 

In [None]:
# Model and data configurations for the VFL SplitNN training session

model_config = dict(
    strategy=dict(name="SplitNN", params={}),
    model=dict(
        # One feature model per client; input_size matches that client's predictor count below
        feature_models=dict(
            passive_client=dict(params=dict(input_size=7, hidden_layer_sizes=[6], output_size=5)),
            active_client=dict(params=dict(input_size=8, hidden_layer_sizes=[6], output_size=5)),
        ),
        label_model=dict(params=dict(hidden_layer_sizes=[5], output_size=2)),
    ),
    ml_task=dict(type="classification", params=dict(loss_weights=None)),
    optimizer=dict(name="SGD", params=dict(learning_rate=0.2, momentum=0.0)),
    seed=23,  # for reproducibility
)

data_config = dict(
    # Odd-numbered features, no label
    passive_client=dict(
        label_client=False,
        predictors=[f"x{i}" for i in range(1, 15, 2)],
        target=None,
    ),
    # Even-numbered features plus the label column
    active_client=dict(
        label_client=True,
        predictors=[f"x{i}" for i in range(0, 15, 2)],
        target="y",
    ),
)

In [None]:
# Create the VFL training session, linked to the PRL session above, and start it

vfl_train_session = client.create_vfl_session(
    vfl_mode='train',
    prl_session_id=prl_session.id,
    package_name="iai_ffnet",
    model_config=model_config,
    data_config=data_config,
    min_num_clients=2,
    num_rounds=2,
    startup_mode="external",
    name="Testing notebook - VFL Train",
    description="I am testing VFL Train session creation with a task runner through a notebook",
).start()

vfl_train_session.id  # VFL training session ID, displayed for reference

In [None]:
# Task group: one server task (fls) plus one VFL training task per client, then start it

vfl_task_group_context = (
    SessionTaskGroup(vfl_train_session)
    .add_task(iai_tb_aws.fls(storage_path=aws_storage_path))
    .add_task(
        iai_tb_aws.vfl_train(
            client_name="active_client",
            train_path=active_train_path,
            test_path=active_test_path,
            batch_size=1024,
            storage_path=aws_storage_path,
        )
    )
    .add_task(
        iai_tb_aws.vfl_train(
            client_name="passive_client",
            train_path=passive_train_path,
            test_path=passive_test_path,
            batch_size=1024,
            storage_path=aws_storage_path,
        )
    )
    .start()
)


In [None]:
# Print each VFL training task's status as indented JSON, then follow the task logs

for task_context in vfl_task_group_context.contexts.values():
    status_json = json.dumps(task_context.status(), indent=4)
    print(status_json)

vfl_task_group_context.monitor_task_logs()

In [None]:
# Block until the VFL training tasks complete (success = True). The arguments are presumably a
# timeout in seconds (5 minutes) and a polling interval; TODO confirm against the SDK docs.

vfl_task_group_context.wait(5 * 60, 2)

### Task 3a: VFL Session Complete!
Now you can view the VFL training metrics and start making predictions

In [None]:
vfl_train_metrics = vfl_train_session.metrics()
metrics = vfl_train_metrics.as_dict()
metrics

In [None]:
vfl_metrics_for_plot = vfl_train_session.metrics()
fig = vfl_metrics_for_plot.plot()

## Task 3b: Make a Prediction on the trained VFL model

To create a VFL predict session, specify the `prl_session_id` of the session you ran to link the datasets together. You also need to pass the ID of the VFL train session that was run with the same `prl_session_id`, as `training_session_id`. 

The `vfl_mode` must be set to `predict`.

In [None]:
# Create the VFL predict session from the PRL session and the trained VFL session, then start it

vfl_predict_session = client.create_vfl_session(
    vfl_mode="predict",
    prl_session_id=prl_session.id,
    training_session_id=vfl_train_session.id,
    data_config=data_config,
    startup_mode="external",
    name="Testing notebook - VFL Predict",
    description="I am testing VFL Predict session creation with an AWS task runner through a notebook",
).start()

vfl_predict_session.id  # VFL predict session ID, displayed for reference

In [None]:
# Task group: one server task (fls) plus one VFL predict task per client; each client's
# predictions are written to its own storage path. Then start it.

vfl_predict_task_group_context = (
    SessionTaskGroup(vfl_predict_session)
    .add_task(iai_tb_aws.fls(storage_path=aws_storage_path))
    .add_task(
        iai_tb_aws.vfl_predict(
            client_name="active_client",
            dataset_path=active_test_path,
            batch_size=1024,
            raw_output=True,
            storage_path=vfl_predict_active_storage_path,
        )
    )
    .add_task(
        iai_tb_aws.vfl_predict(
            client_name="passive_client",
            dataset_path=passive_test_path,
            batch_size=1024,
            raw_output=True,
            storage_path=vfl_predict_passive_storage_path,
        )
    )
    .start()
)

In [None]:
# Print each VFL predict task's status as indented JSON, then follow the task logs

for task_context in vfl_predict_task_group_context.contexts.values():
    status_json = json.dumps(task_context.status(), indent=4)
    print(status_json)

vfl_predict_task_group_context.monitor_task_logs()

In [None]:
# Block until the VFL predict tasks complete (success = True). The arguments are presumably a
# timeout in seconds (5 minutes) and a polling interval; TODO confirm against the SDK docs.

vfl_predict_task_group_context.wait(5 * 60, 2)

### Task 3b: VFL Predict Session Complete!

Now you can view the VFL predictions and evaluate the performance as needed

In [None]:
# Fetch the VFL predict session metrics as a dict and display them

vfl_predict_metrics = vfl_predict_session.metrics()
metrics = vfl_predict_metrics.as_dict()
metrics

In [None]:
import pandas as pd

# Load the active client's predictions written to S3 by the VFL predict task above.
# pandas reads s3:// URLs directly; this requires the optional s3fs package to be installed.
# (The previous Azure branch referenced names never defined in this AWS-only notebook.)
print(vfl_predict_active_storage_path)
df_pred = pd.read_csv(vfl_predict_active_storage_path)

df_pred.head()

# Task 4: Using a custom model in an HFL session

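Choose a name for your custom model, and set the paths for the model package, the model and data configurations, and the datasets. For more information about creating a custom model, see the [integrate.ai documentation](https://documentation.integrateai.net/).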

Note that the name for your custom model must be unique. This means that the name for your custom model cannot already be in the Package Name column of the Custom Models Packages Table in the Model Library Page of the UI.

In [None]:
from datetime import datetime

# Custom model settings: replace the <...> placeholders with your own paths before running.
# Package names must be unique in the Model Library, so a timestamp suffix is appended
# to avoid clashing with packages uploaded by earlier runs.
package_name = f"lstm_custom_model_{datetime.now().strftime('%Y%m%d%H%M%S')}"
package_path = "<path/to/custom/model/package>"
model_config_path = "<path/to/model_config.json>"
data_config_path = "<path/to/data_config.json>"
# Local copy of the dataset used by upload_model, and the copy in the task runner's bucket used by the HFL tasks
local_dataset_path = "<path/to/local/dataset>"
aws_dataset_path = f"s3://{base_aws_bucket}/<path/to/dataset>"

print(package_path)
print(package_name)

In [None]:
import json

# Load the custom model's sample model configuration and data schema from their JSON files
with open(model_config_path, "r") as model_config_file:
    lstm_model_config = json.load(model_config_file)

with open(data_config_path, "r") as data_config_file:
    data_schema = json.load(data_config_file)

### Upload customized model

In [None]:
client.upload_model(
    package_name=package_name,
    package_path=package_path,
    description="A custom LSTM model.",
    task="classification",
    sample_model_config_path=model_config_path,
    sample_data_config_path=data_config_path,
    dataset_path=local_dataset_path,
    batch_size=256,
    test_only=False,
)

### Create an HFL session with the custom package

In [None]:
# Model configuration for the custom package; the model params come from the loaded sample config

model_config = dict(
    strategy=dict(name="FedAvg", params={}),
    model=dict(params=lstm_model_config),
    ml_task=dict(type="classification", params={}),
    optimizer=dict(name="SGD", params=dict(learning_rate=0.9, momentum=0.9)),
    differential_privacy_params=dict(epsilon=4, max_grad_norm=7),
    seed=23,  # for reproducibility
)

In [None]:
# Create the HFL training session for the custom package (two clients, five rounds) and start it

training_session = client.create_fl_session(
    package_name=package_name,
    model_config=model_config,
    data_config=data_schema,
    min_num_clients=2,
    num_rounds=5,
    startup_mode="external",
    name="LSTM custom model notebook",
    description="Training a custom LSTM model using a jupyter notebook.",
).start()

training_session.id  # Training session ID, displayed for reference

In [None]:
# Task group: one server task (fls) plus two HFL client tasks that approve the custom package; then start it

task_group_context = (
    SessionTaskGroup(training_session)
    .add_task(iai_tb_aws.fls(storage_path=aws_storage_path))
    .add_task(iai_tb_aws.hfl(approve_custom_package=True, train_path=aws_dataset_path, test_path=aws_dataset_path))
    .add_task(iai_tb_aws.hfl(approve_custom_package=True, train_path=aws_dataset_path, test_path=aws_dataset_path))
    .start()
)

In [None]:
# Print each task's status as indented JSON, then follow the task logs
for task_context in task_group_context.contexts.values():
    status_json = json.dumps(task_context.status(), indent=4)
    print(status_json)
task_group_context.monitor_task_logs()

In [None]:
# Block until the tasks complete; arguments presumably a timeout in seconds (5 minutes) and a polling interval — TODO confirm
task_group_context.wait(5 * 60, 2)

### Task 4: Custom model session complete!

Now you can retrieve the metrics for the session.

In [None]:
custom_model_metrics = training_session.metrics()
custom_model_metrics.as_dict()

# Task 5: Create a linear inference session

In [None]:
# Model and data configurations for the linear (logistic regression) inference session

model_config_logit = dict(
    strategy=dict(name="LogitRegInference", params={}),
    seed=23,  # for reproducibility
)

data_config_logit = dict(
    target="y",
    shared_predictors=["x1", "x2"],
    chunked_predictors=["x0", "x3", "x10", "x11"],
)

In [None]:
# Create the linear inference session (two clients, five rounds) and start it

training_session_logit = client.create_fl_session(
    package_name="iai_linear_inference",
    model_config=model_config_logit,
    data_config=data_config_logit,
    min_num_clients=2,
    num_rounds=5,
    startup_mode="external",
    name="Testing linear inference session",
    description="I am testing linear inference session creation using a task runner through a notebook",
).start()

training_session_logit.id  # Session ID, displayed for reference

In [None]:
# Create and start a task group: one server task (fls) plus one HFL task per client.
# The second client uses train_path2/test_path2 defined in the AWS variables cell
# (train_path2_aws/test_path2_aws do not exist in this notebook).

task_group_context = (
    SessionTaskGroup(training_session_logit)
    .add_task(iai_tb_aws.fls(storage_path=aws_storage_path))
    .add_task(iai_tb_aws.hfl(train_path=train_path1, test_path=test_path1))
    .add_task(iai_tb_aws.hfl(train_path=train_path2, test_path=test_path2))
    .start()
)


In [None]:
# Print each task's status as indented JSON, then follow the task logs
for task_context in task_group_context.contexts.values():
    status_json = json.dumps(task_context.status(), indent=4)
    print(status_json)
task_group_context.monitor_task_logs()

In [None]:
# Block until the tasks complete; arguments presumably a timeout in seconds (5 minutes) and a polling interval — TODO confirm
task_group_context.wait(5 * 60, 2)