Skip to content

Commit

Permalink
Titus: only poll once for every job and map v3 states to v2
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaslin committed Mar 26, 2018
1 parent c923baa commit 8e2d9ec
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,9 @@ class TitusClusterCachingAgent implements CachingAgent, CustomScheduledAgent {
private Map<String, String> getTitusHealth(Job.TaskSummary task) {
TaskState taskState = task.state
HealthState healthState = HealthState.Unknown
if (taskState in [TaskState.STOPPED, TaskState.FAILED, TaskState.CRASHED, TaskState.FINISHED, TaskState.DEAD, TaskState.TERMINATING, TaskState.KILLINITIATED, TaskState.DISCONNECTED]) {
if (taskState in [TaskState.STOPPED, TaskState.FAILED, TaskState.CRASHED, TaskState.FINISHED, TaskState.DEAD, TaskState.TERMINATING]) {
healthState = HealthState.Down
} else if (taskState in [TaskState.STARTING, TaskState.DISPATCHED, TaskState.PENDING, TaskState.QUEUED, TaskState.ACCEPTED, TaskState.LAUNCHED, TaskState.STARTINITIATED]) {
} else if (taskState in [TaskState.STARTING, TaskState.DISPATCHED, TaskState.PENDING, TaskState.QUEUED]) {
healthState = HealthState.Starting
} else {
healthState = HealthState.Unknown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,12 @@ public Job(com.netflix.titus.grpc.protogen.Job grpcJob, List<com.netflix.titus.g

labels = grpcJob.getJobDescriptor().getAttributesMap();
user = grpcJob.getJobDescriptor().getOwner().getTeamEmail();
tasks = grpcTasks.stream().map( grpcTask -> new TaskSummary(grpcTask)).collect(Collectors.toList());

if(grpcTasks != null) {
tasks = grpcTasks.stream().map(grpcTask -> new TaskSummary(grpcTask)).collect(Collectors.toList());
} else {
tasks = new ArrayList<>();
}

appName = grpcJob.getJobDescriptor().getApplicationName();
name = grpcJob.getJobDescriptor().getAttributesOrDefault("name", appName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,27 @@ public enum TaskState {
ALL,
RUNNING, DISPATCHED, FAILED, STOPPED, CRASHED, FINISHED,
STARTING, QUEUED,
TERMINATING, DEAD, PENDING, // Deprecated
ACCEPTED, LAUNCHED, STARTINITIATED, STARTED, KILLINITIATED, DISCONNECTED;// v3
TERMINATING, DEAD, PENDING; // Deprecated

public static TaskState from(String taskStateStr) {
for (TaskState taskState : TaskState.values()) {
if (taskState.name().equalsIgnoreCase(taskStateStr)) return taskState;
if (taskState.name().equals(taskStateStr)) return taskState;
}
switch (taskStateStr) {
case "Accepted":
return TaskState.QUEUED;
case "Launched":
return TaskState.DISPATCHED;
case "StartInitiated":
return TaskState.STARTING;
case "Started":
return TaskState.RUNNING;
case "KillInitiated":
case "Disconnected":
case "Finished":
return TaskState.FINISHED;
default:
return null;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -101,7 +102,7 @@ public RegionScopedV3TitusClient(TitusRegion titusRegion,

@Override
public Job getJob(String jobId) {
return new Job(grpcBlockingStub.findJob(JobId.newBuilder().setId(jobId).build()), getTasks(jobId));
return new Job(grpcBlockingStub.findJob(JobId.newBuilder().setId(jobId).build()), getTasks(Arrays.asList(jobId)).get(jobId));
}

@Override
Expand Down Expand Up @@ -207,32 +208,36 @@ private List<Job> getJobs(JobQuery.Builder jobQuery) {
int currentPage = 0;
int totalPages;
List<Job> jobs = new ArrayList<>();
List<com.netflix.titus.grpc.protogen.Job> grpcJobs = new ArrayList<>();
do {
jobQuery.setPage(Page.newBuilder().setPageNumber(currentPage).setPageSize(100));
JobQuery criteria = jobQuery.build();
JobQueryResult resultPage = grpcBlockingStub.findJobs(criteria);
jobs.addAll(resultPage.getItemsList().stream().map(grpcJob -> new Job(grpcJob, getTasks(grpcJob.getId()))).collect(Collectors.toList()));
grpcJobs.addAll(resultPage.getItemsList());
totalPages = resultPage.getPagination().getTotalPages();
currentPage++;
} while (totalPages > currentPage);
return jobs;
List<String> jobIds = grpcJobs.stream().map(grpcJob -> grpcJob.getId()).collect(
Collectors.toList()
);
Map<String, List<com.netflix.titus.grpc.protogen.Task>> tasks = getTasks(jobIds);
return grpcJobs.stream().map(grpcJob -> new Job(grpcJob, tasks.get(grpcJob.getId()))).collect(Collectors.toList());
}

private List<com.netflix.titus.grpc.protogen.Task> getTasks(String jobId) {
private Map<String, List<com.netflix.titus.grpc.protogen.Task>> getTasks(List<String> jobIds) {
List<com.netflix.titus.grpc.protogen.Task> tasks = new ArrayList<>();
TaskQueryResult taskResults;
int currentTaskPage = 0;
do {
taskResults = grpcBlockingStub.findTasks(
TaskQuery.newBuilder()
.putFilteringCriteria("jobIds", jobId)
.putFilteringCriteria("jobIds", jobIds.stream().collect(Collectors.joining(",")))
.setPage(Page.newBuilder().setPageNumber(currentTaskPage).setPageSize(100)
).build()
);
tasks.addAll(taskResults.getItemsList());
currentTaskPage++;
} while (taskResults.getPagination().getHasMore());
return tasks;
return tasks.stream().collect(Collectors.groupingBy(task -> task.getJobId()));
}

}

0 comments on commit 8e2d9ec

Please sign in to comment.