Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/k8s executor #87

Open
wants to merge 44 commits into
base: feature/remote-worker
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
9a002c4
New K8s worker creator added
thiagoyeds Jul 9, 2020
b89f64f
Kubernetes Java library dependency added
thiagoyeds Jul 9, 2020
7a3962a
New worker type added to handle K8s
thiagoyeds Jul 9, 2020
85d7f51
Case dealing with added K8s
thiagoyeds Jul 9, 2020
a63d3bf
K8s constant class added
thiagoyeds Jul 9, 2020
aea8eea
K8s client class added
thiagoyeds Jul 9, 2020
2b2625d
K8s configuration class added
thiagoyeds Jul 9, 2020
f1917e1
New task executor added to handle K8s
thiagoyeds Jul 9, 2020
d2371c5
Arrebol K8s json example added
thiagoyeds Jul 9, 2020
d93d673
Fixed constants value
thiagoyeds Jul 9, 2020
db048f3
Confusion fixed when obtaining property
thiagoyeds Jul 9, 2020
2c8cdcf
New constant added to inform k8s cluster namespace to be used
thiagoyeds Jul 9, 2020
944a535
Fixed code to get new k8s cluster namespace info
thiagoyeds Jul 9, 2020
d98f19f
Arrebol k8s json example updated
thiagoyeds Jul 9, 2020
09bfcce
Class changed to K8s client interface
thiagoyeds Jul 9, 2020
2fe20a8
Default K8s client classe impl added
thiagoyeds Jul 9, 2020
b086adf
K8s cluster resource interface added
thiagoyeds Jul 9, 2020
0a49e47
Default K8s cluster resource class added
thiagoyeds Jul 9, 2020
88dc8e4
Method impl of create workers added
thiagoyeds Jul 9, 2020
515ce57
New constructors added in K8s task executor class
thiagoyeds Jul 9, 2020
2aa9b23
List jobs method removed from K8s client interface
thiagoyeds Jul 9, 2020
746ddc5
Command parameter type changed
thiagoyeds Jul 9, 2020
4ea7904
Api exception added in interface methods
thiagoyeds Jul 9, 2020
0352546
Worker type changed to K8sTaskExecutor
thiagoyeds Jul 10, 2020
c7305a6
New constant added to handle successful execution
thiagoyeds Jul 10, 2020
54eb4f8
Hibernate notations removed
thiagoyeds Jul 10, 2020
4b027e3
K8s task executor class impl added
thiagoyeds Jul 10, 2020
82f02a1
Fixed class for changes made to the K8s client interface
thiagoyeds Jul 10, 2020
fd1e1aa
New constant to volume name for used in jobs (opcional)
thiagoyeds Jul 10, 2020
e4e6188
Changed code to check new constant of volume name (opcional)
thiagoyeds Jul 10, 2020
9eb014e
New key added in k8s json example about volume name constant
thiagoyeds Jul 10, 2020
342a55d
Changed code to handle with new constant to volume name (opcional)
thiagoyeds Jul 10, 2020
ce9978c
Changed code to handle with new volume name key
thiagoyeds Jul 10, 2020
5109569
Unused imports removed
thiagoyeds Jul 10, 2020
968e02f
Fixed bug to handle get queue info (classes persistence)
thiagoyeds Jul 14, 2020
4d09024
BackoffLimit attribute added to the job pod
thiagoyeds Jul 20, 2020
b72eaaf
Fixed backoffLimit attibute value to 2
thiagoyeds Jul 20, 2020
8f90457
Implemented method of create workers with worker node param
thiagoyeds Jul 20, 2020
5f3f00a
TODO added in k8s worker specification about requeriments (CPU and RAM)
thiagoyeds Aug 7, 2020
fd932ac
Improved methods name for better
thiagoyeds Aug 7, 2020
3d2100f
Improved var name for better
thiagoyeds Aug 7, 2020
9b1deb0
Fixed broken indentation
thiagoyeds Aug 7, 2020
1e49733
Removed unused method
thiagoyeds Aug 7, 2020
4993498
Improved constants name for better
thiagoyeds Aug 7, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>

<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>8.0.2</version>
</dependency>

</dependencies>

Expand Down
6 changes: 5 additions & 1 deletion src/main/java/org/fogbowcloud/arrebol/ArrebolController.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.fogbowcloud.arrebol.execution.Worker;
import org.fogbowcloud.arrebol.execution.WorkerTypes;
import org.fogbowcloud.arrebol.execution.creator.DockerWorkerCreator;
import org.fogbowcloud.arrebol.execution.creator.K8sWorkerCreator;
import org.fogbowcloud.arrebol.execution.creator.RawWorkerCreator;
import org.fogbowcloud.arrebol.execution.creator.WorkerCreator;
import org.fogbowcloud.arrebol.models.command.CommandState;
Expand All @@ -32,6 +33,7 @@
import org.fogbowcloud.arrebol.scheduler.DefaultScheduler;
import org.fogbowcloud.arrebol.scheduler.FifoSchedulerPolicy;
import org.fogbowcloud.arrebol.utils.ConfValidator;
import org.hibernate.boot.registry.classloading.spi.ClassLoaderService.Work;
import org.springframework.stereotype.Component;

import java.io.BufferedReader;
Expand Down Expand Up @@ -188,7 +190,9 @@ private void buildWorkerCreator(Configuration configuration) throws Exception {
this.workerCreator = new DockerWorkerCreator(configuration);
} else if (poolType.equals(WorkerTypes.RAW.getType())) {
this.workerCreator = new RawWorkerCreator(configuration);
} else {
} else if (poolType.equals(WorkerTypes.K8S.getType())) {
this.workerCreator = new K8sWorkerCreator(configuration);
}else {
String poolTypeMsg = "Worker Pool Type configuration property wrong or missing. Please, verify your configuration file.";
throw new IllegalArgumentException(poolTypeMsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class TaskExecutionResult {
// this class is more like an struct, not very much OO but it ok

public static final int UNDETERMINED_RESULT = Integer.MAX_VALUE;
public static final int SUCCESS_RESULT = 0;
private final RESULT taskResult;
private final int[] exitcodes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

public enum WorkerTypes {
DOCKER("docker"),
RAW("raw");
RAW("raw"),
K8S("k8s");

private String type;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.fogbowcloud.arrebol.execution.creator;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.UUID;

import org.apache.log4j.Logger;
import org.fogbowcloud.arrebol.execution.Worker;
import org.fogbowcloud.arrebol.execution.k8s.K8sConfiguration;
import org.fogbowcloud.arrebol.execution.k8s.K8sTaskExecutor;
import org.fogbowcloud.arrebol.execution.k8s.client.DefaultK8sClient;
import org.fogbowcloud.arrebol.execution.k8s.client.K8sClient;
import org.fogbowcloud.arrebol.execution.k8s.resource.DefaultK8sClusterResource;
import org.fogbowcloud.arrebol.execution.k8s.resource.K8sClusterResource;
import org.fogbowcloud.arrebol.models.configuration.Configuration;
import org.fogbowcloud.arrebol.models.specification.Specification;
import org.fogbowcloud.arrebol.processor.spec.WorkerNode;
import org.fogbowcloud.arrebol.resource.MatchAnyWorker;
import org.fogbowcloud.arrebol.utils.AppUtil;

public class K8sWorkerCreator implements WorkerCreator {

private final K8sConfiguration configuration;
private final Logger LOGGER = Logger.getLogger(K8sWorkerCreator.class);

public K8sWorkerCreator(Configuration configuration) throws Exception {
this.configuration = new K8sConfiguration(configuration);
}

@Override
public Collection<Worker> createWorkers(Integer poolId) {
Collection<Worker> workers = new LinkedList<>();
String address = configuration.getAddress();
int poolSize = configuration.getCapacity();
String namespace = configuration.getNamespace();
String volumeName = configuration.getVolumeName();
for (int i = 0; i < poolSize; i++) {
LOGGER.info("Creating k8s worker with address=" + address);
Worker worker = createK8sWorker(poolId, address, namespace, volumeName);
workers.add(worker);
}

return workers;
}

@Override
public Collection<Worker> createWorkers(Integer poolId, WorkerNode workerNode) {
Collection<Worker> workers = new LinkedList<>();
String namespace = configuration.getNamespace();
String volumeName = configuration.getVolumeName();
for(int i = 0; i < workerNode.getPoolSize(); i++){
LOGGER.info("Creating k8s worker with address=" + workerNode.getAddress());
Worker worker = createK8sWorker(poolId, workerNode.getAddress(), namespace, volumeName);
workers.add(worker);
}
return workers;
}

private Worker createK8sWorker(Integer poolId, String address, String namespace, String volumeName) {
String id = "k8s-executor-" + UUID.randomUUID().toString();
K8sClusterResource k8sClusterResource = new DefaultK8sClusterResource(id, address);
//TODO Add some specification here, something about the CPU and RAM requirements maybe
Specification resourceSpec = null;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to make a note. Maybe we want it to be filled with the requirements we talked about, right? I mean, I think that this is the place we want to specify them.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed before, there are two ways to specify the amount of CPU and RAM consumed by the POD container in the k8s cluster worker, the minimum (requests) and the maximum (limits), I believe that here can enter the maximum information, but the minimum does not fit, as the client that details this information. I will add a TODO to add something to that specification in the future.


K8sClient k8sClient = null;
try {
k8sClient = new DefaultK8sClient(address, namespace, volumeName);
} catch (IOException e) {
LOGGER.error("Error while create k8s client in worker [" + id + "]", e);
}

K8sTaskExecutor k8sTaskExecutor = new K8sTaskExecutor(k8sClusterResource, k8sClient);
Worker worker = new MatchAnyWorker(AppUtil.generateUniqueStringId(), resourceSpec, poolId, k8sTaskExecutor);
LOGGER.info("Created K8s Worker [" + worker.getId() + "]");
return worker;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.fogbowcloud.arrebol.execution.k8s;

import org.fogbowcloud.arrebol.execution.k8s.constants.K8sConstants;
import org.fogbowcloud.arrebol.models.configuration.Configuration;
import org.fogbowcloud.arrebol.models.configuration.Property;

public class K8sConfiguration {
private final Integer capacity;
private final String address;
private final String namespace;
private final String volumeName;

public K8sConfiguration(Configuration configuration) throws Exception {
checkK8sConfigurationProperties(configuration);
Property<Double> capacity = configuration.getProperty(K8sConstants.K8S_CLUSTER_CAPACITY);
Property<String> address = configuration.getProperty(K8sConstants.K8S_CLUSTER_ADDRESS);
Property<String> namespace = configuration.getProperty(K8sConstants.K8S_CLUSTER_NAMESPACE);
Property<String> volumeName = configuration.getProperty(K8sConstants.K8S_CLUSTER_VOLUME_NAME);
this.capacity = capacity.getValue().intValue();
this.address = address.getValue();
this.namespace = namespace.getValue();
this.volumeName = volumeName.getValue();
}

private void checkK8sConfigurationProperties(Configuration configuration) throws Exception {
String verifyMsg = " Please, verify your configuration file.";
String capacityMsg = "K8s cluster capacity configuration property wrong or missing." + verifyMsg;
String addressMsg = "K8s cluster address configuration property wrong or missing." + verifyMsg;
String namespaceMsg = "K8s cluster namespace configuration property wrong or missing." + verifyMsg;

Property<Double> capacity = configuration.getProperty(K8sConstants.K8S_CLUSTER_CAPACITY);
Property<String> address = configuration.getProperty(K8sConstants.K8S_CLUSTER_ADDRESS);
Property<String> namespace = configuration.getProperty(K8sConstants.K8S_CLUSTER_NAMESPACE);

if (address.getValue() == null || address.getValue().trim().isEmpty()) {
throw new Exception(addressMsg);
} else if (capacity.getValue() == null || capacity.getValue().intValue() == 0) {
throw new Exception(capacityMsg);
} else if (namespace.getValue() == null || namespace.getValue().trim().isEmpty()) {
throw new Exception(namespaceMsg);
}
}

public Integer getCapacity() {
return capacity;
}

public String getAddress() {
return address;
}

public String getNamespace() {
return namespace;
}

public String getVolumeName() {
return volumeName;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package org.fogbowcloud.arrebol.execution.k8s;

import static java.lang.Thread.sleep;
import static org.fogbowcloud.arrebol.execution.docker.constants.DockerConstants.ADDRESS_METADATA_KEY;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import javax.persistence.CascadeType;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.OneToOne;
import javax.persistence.Transient;

import org.apache.log4j.Logger;
import org.fogbowcloud.arrebol.execution.TaskExecutionResult;
import org.fogbowcloud.arrebol.execution.TaskExecutor;
import org.fogbowcloud.arrebol.execution.TaskExecutionResult.RESULT;
import org.fogbowcloud.arrebol.execution.k8s.client.DefaultK8sClient;
import org.fogbowcloud.arrebol.execution.k8s.client.K8sClient;
import org.fogbowcloud.arrebol.execution.k8s.resource.DefaultK8sClusterResource;
import org.fogbowcloud.arrebol.execution.k8s.resource.K8sClusterResource;
import org.fogbowcloud.arrebol.models.command.Command;
import org.fogbowcloud.arrebol.models.command.CommandState;
import org.fogbowcloud.arrebol.models.task.RequirementsContants;
import org.fogbowcloud.arrebol.models.task.Task;

import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1Job;

@Entity
public class K8sTaskExecutor implements TaskExecutor {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
@Transient
private final Logger LOGGER = Logger.getLogger(K8sTaskExecutor.class);
@OneToOne(cascade = CascadeType.ALL, orphanRemoval = true, targetEntity = DefaultK8sClusterResource.class)
private K8sClusterResource k8sClusterResource;
@OneToOne(cascade = CascadeType.ALL, orphanRemoval = true, targetEntity = DefaultK8sClient.class)
private K8sClient k8sClient;
@Transient
private static final long POOLING_PERIOD_TIME_MS = 5 * 1000;

public K8sTaskExecutor(K8sClusterResource k8sClusterResource, K8sClient k8sClient) {
this.k8sClusterResource = k8sClusterResource;
this.k8sClient = k8sClient;
}

public K8sTaskExecutor() {
}

@Override
public TaskExecutionResult execute(Task task) {
TaskExecutionResult taskExecutionResult = null;

String jobName = task.getId();

List<Command> commands = task.getTaskSpec().getCommands();
String command = joinCommands(commands);

Map<String, String> requirements = task.getTaskSpec().getRequirements();
String imageId = getImageId(requirements);

LOGGER.debug("Image id: " + imageId);
LOGGER.debug("Command: " + command);

int tasksListSize = task.getTaskSpec().getCommands().size();

try {
V1Job job = k8sClient.createJob(jobName, imageId, command);
boolean jobIsRunning = true;

while (jobIsRunning) {
try {
sleep(POOLING_PERIOD_TIME_MS);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}

job = k8sClient.getJob(jobName);
jobIsRunning = job.getStatus().getActive() != null;
}

if (wasSuccessful(job)) {
finishSuccessfulExecution(task);
taskExecutionResult = getSuccessResultInstance(tasksListSize);
} else {
finishFailExecution(task);
taskExecutionResult = getFailResultInstance(tasksListSize);
}
} catch (ApiException e) {
LOGGER.error("Error while call K8s client API. " + e.getMessage(), e);
finishFailExecution(task);
taskExecutionResult = getFailResultInstance(tasksListSize);
return taskExecutionResult;
}

return taskExecutionResult;
}

@Override
public Map<String, String> getMetadata() {
Map<String, String> metadata = new HashMap<>();
String address = this.k8sClusterResource.getApiAddress();
metadata.put(ADDRESS_METADATA_KEY, address);
return metadata;
}

private String joinCommands(List<Command> commands) {
String result = "";
for (Command command : commands)
result += command.getCommand() + " && ";
result = result.substring(0, result.length() - 4);

return result;
}

private String getImageId(Map<String, String> requirements) throws IllegalArgumentException {
String imageId = null;
if (Objects.nonNull(requirements))
imageId = requirements.get(RequirementsContants.IMAGE_KEY);

return imageId;
}

private boolean wasSuccessful(V1Job job) {
Integer succeededAmount = job.getStatus().getSucceeded();
return succeededAmount != null && succeededAmount > 0;
}

private void finishSuccessfulExecution(Task task) {
for (Command c : task.getTaskSpec().getCommands()) {
c.setState(CommandState.FINISHED);
c.setExitcode(TaskExecutionResult.SUCCESS_RESULT);
}
}

private void finishFailExecution(Task task) {
for (Command c : task.getTaskSpec().getCommands()) {
c.setState(CommandState.FAILED);
c.setExitcode(TaskExecutionResult.UNDETERMINED_RESULT);
}
}

private TaskExecutionResult getFailResultInstance(int size) {
int[] exitCodes = new int[size];
Arrays.fill(exitCodes, TaskExecutionResult.UNDETERMINED_RESULT);
TaskExecutionResult taskExecutionResult = new TaskExecutionResult(RESULT.FAILURE, exitCodes);
return taskExecutionResult;
}

private TaskExecutionResult getSuccessResultInstance(int size) {
int[] exitCodes = new int[size];
Arrays.fill(exitCodes, TaskExecutionResult.SUCCESS_RESULT);
TaskExecutionResult taskExecutionResult = new TaskExecutionResult(RESULT.SUCCESS, exitCodes);
return taskExecutionResult;
}

}
Loading