### Import libraries

In [None]:
import os
import getpass
import proactive

### Creating the gateway

In [None]:
# Gateway setup: build the HTTP endpoint of the try.activeeon.com server
# (port 8080) and open a ProActive gateway to it.
print("Logging on proactive-server 1...")
proactive_host, proactive_port = 'try.activeeon.com', '8080'
proactive_url = f"http://{proactive_host}:{proactive_port}"
print("Creating gateway ")
gateway = proactive.ProActiveGateway(proactive_url, debug=False)
print("Gateway created")

### Connecting

In [None]:
# Authenticate against the scheduler. Replace the placeholders with your
# credentials, or use the getpass variant below so the password is not
# stored in the notebook.
print("Connecting on: " + proactive_url)
gateway.connect(username="YOUR_LOGIN", password="YOUR_PASSWORD")  # put your login here!
# Or uncomment the following line to protect your password
# gateway.connect(username="", password=getpass.getpass(prompt='Password: '))
# Explicit check instead of `assert`: assertions are stripped under `python -O`,
# which would let the notebook continue with an unauthenticated gateway.
if not gateway.isConnected():
    raise RuntimeError("Could not connect to " + proactive_url)
print("Connected")

### Creating a fork environment

In [None]:
# Build one fork environment from ./fork_env_shared.py. The same object is
# attached to every task of the workflow via setForkEnvironment, so the
# message no longer claims it is only for the "import" task.
print("Creating the shared fork environment...")
proactive_fork_env = gateway.createDefaultForkEnvironment()
proactive_fork_env.setImplementationFromFile("./fork_env_shared.py")
print("Fork environment created.")

### Creating task _init_

In [None]:
# Task "init": load the Boston housing data, append the target as a LABEL
# column, and publish the frame as split-oriented JSON bytes in the
# "dataframe_json" workflow variable for the downstream tasks.
# The unused bz2 compression, the no-op dataframe.head() and the unused
# imports were dropped from the task script.
# NOTE(review): sklearn.datasets.load_boston was removed in scikit-learn 1.2;
# this task needs an older scikit-learn in the fork environment (or another
# dataset) to run.
print("Creating the initialization task...")
init_task = gateway.createPythonTask()
init_task.setTaskName("init")
init_task.setTaskImplementation("""import pandas as pd
from sklearn.datasets import load_boston

boston = load_boston()
dataframe_load = pd.DataFrame(boston.data)
dataframe_load.columns = boston.feature_names
data_label = boston.target
dataframe = dataframe_load.assign(LABEL=data_label)

dataframe_json = dataframe.to_json(orient='split').encode()

variables.put("dataframe_json", dataframe_json)""")
init_task.setForkEnvironment(proactive_fork_env)
print("Task created.")

### Creating task _split_

In [None]:
# Task "split": read the frame published by "init" and split it into train
# and test partitions, published as "X_train_json" / "X_test_json".
# The task script imports only what it uses: the former unused
# `from sklearn.datasets import load_boston` raises ImportError on
# scikit-learn >= 1.2 and would fail this task needlessly.
print("Creating the split task...")
split_task = gateway.createPythonTask()
split_task.setTaskName("split")
split_task.setTaskImplementation("""import pandas as pd
from sklearn.model_selection import train_test_split

dataframe_json = variables.get("dataframe_json")

dataframe = pd.read_json(dataframe_json, orient='split')

# An integer test_size is an absolute row count: 30 rows go to the test set.
X_train, X_test = train_test_split(dataframe, test_size=30)

X_train_json = X_train.to_json(orient='split').encode()
X_test_json = X_test.to_json(orient='split').encode()

variables.put("X_train_json", X_train_json)
variables.put("X_test_json", X_test_json)""")
split_task.setForkEnvironment(proactive_fork_env)
split_task.addDependency(init_task)
print("Task created.")

### Creating task _train_

In [None]:
# Task "train": fit a LinearRegression on the training partition (features =
# every column except LABEL), pickle it to a file and publish the file path and
# label column name for the "predict" task.
# Fixes in the task script:
#   - X_train.filter("LABEL", axis=1) iterates the characters of the string and
#     matches no column, producing an empty label frame; select [[columns]].
#   - the model file is written inside `with` so the handle is flushed/closed.
#   - unused imports (including load_boston, gone in scikit-learn >= 1.2) removed.
# NOTE(review): assumes /shared is visible to whichever node runs "predict"
# (presumably provided by the fork environment) - confirm.
print("Creating the training task...")
train_task = gateway.createPythonTask()
train_task.setTaskName("train")
train_task.setTaskImplementation("""import pandas as pd
import pickle
from sklearn.linear_model import LinearRegression

X_train_json = variables.get("X_train_json")

# Fit the model on the training partition
columns="LABEL"
X_train = pd.read_json(X_train_json, orient='split')
dataframe_train = X_train.drop(columns, axis=1, inplace=False)
dataframe_label = X_train[[columns]]
model = LinearRegression()
model.fit(dataframe_train, dataframe_label)
# save the model to disk
filename = '/shared/finalized_model.sav'
with open(filename, 'wb') as model_file:
    pickle.dump(model, model_file)

variables.put("filename", filename)
variables.put("columns", columns)""")
train_task.setForkEnvironment(proactive_fork_env)
train_task.addDependency(split_task)
print("Task created.")

### Creating task _predict_

In [None]:
# Task "predict": reload the pickled model written by "train" and print its
# score on the test partition from "split". For a LinearRegression, score()
# is the R^2 coefficient of determination.
# Fixes in the task script:
#   - label selected with [[columns]]; filter() with a bare string matches no
#     column and yields an empty label frame.
#   - the model file is opened inside `with` so the handle is closed.
#   - unused imports (including load_boston, gone in scikit-learn >= 1.2) removed.
# pickle.load is acceptable here only because the file was produced by this
# workflow's own "train" task; never point it at untrusted files.
print("Creating the prediction task...")
predict_task = gateway.createPythonTask()
predict_task.setTaskName("predict")
predict_task.setTaskImplementation("""import pandas as pd
import pickle

columns = variables.get("columns")
filename = variables.get("filename")
X_test_json = variables.get("X_test_json")

X_test = pd.read_json(X_test_json, orient='split')

dataframe_test = X_test.drop(columns, axis=1, inplace=False)
dataframe_label = X_test[[columns]]
with open(filename, 'rb') as model_file:
    loaded_model = pickle.load(model_file)
result = loaded_model.score(dataframe_test, dataframe_label)

print('The prediction result is = ' + str(result))""")
predict_task.setForkEnvironment(proactive_fork_env)
# Needs X_test_json from "split" and the model file/variables from "train".
predict_task.addDependency(split_task)
predict_task.addDependency(train_task)
print("Task created.")

### Creating the workflow

In [None]:
# Create the (still empty) job that will hold the four-task ML pipeline.
print("Creating a proactive job...")
proactive_job = gateway.createJob()
proactive_job.setJobName("PythonMLWorkflow")
print("Job created.")

### Adding tasks to the workflow

In [None]:
# Register every pipeline task with the job, in pipeline order; execution order
# itself comes from the addDependency calls made on the tasks.
print("Adding tasks to the job...")
for pipeline_task in (init_task, split_task, train_task, predict_task):
    proactive_job.addTask(pipeline_task)
print("Tasks added.")

### Submitting the job to the scheduler

In [None]:
# Hand the job to the scheduler and keep its id for the result queries below.
print("Submitting the job to the proactive scheduler...")
job_id = gateway.submitJob(proactive_job, debug=False)
print(f"job_id: {job_id}")

### Getting job results and outputs

In [None]:
# Fetch and display the job result, then the tasks' textual output.
# NOTE(review): getJobResult presumably waits for the job to finish - confirm
# against the ProActive client documentation.
print("Getting job results...")
job_result = gateway.getJobResult(job_id)
print(job_result)

print("Getting job outputs...")
job_outputs = gateway.printJobOutput(job_id)
print(job_outputs)

### Disconnecting

In [None]:
# Log out of the scheduler and release the gateway. terminate() runs in a
# `finally` so the gateway is released even if disconnect() raises.
# NOTE(review): terminate() presumably shuts down the client's local backend -
# confirm against the ProActive client documentation.
print("Disconnecting")
try:
    gateway.disconnect()
    print("Disconnected")
finally:
    gateway.terminate()
print("Finished")