Skip to content

Commit

Permalink
titus - first step of retry interceptor - just moves the deadline int…
Browse files Browse the repository at this point in the history
…o an interceptor
  • Loading branch information
tomaslin committed Mar 26, 2018
1 parent 35ae2f7 commit 75b0cd5
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.clouddriver.titus.v3client;

import io.grpc.*;

import java.util.concurrent.TimeUnit;

public class GrpcRetryInterceptor implements ClientInterceptor {

private static long deadline;

public GrpcRetryInterceptor(long deadline) {
this.deadline = deadline;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
callOptions.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
return next.newCall(method, callOptions);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Log
Expand Down Expand Up @@ -96,6 +95,7 @@ public RegionScopedV3TitusClient(TitusRegion titusRegion,
.negotiationType(NegotiationType.TLS)
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
.intercept(new GrpcMetricsInterceptor(registry, titusRegion))
.intercept(new GrpcRetryInterceptor(DEFAULT_CONNECT_TIMEOUT))
.build();

this.grpcBlockingStub = JobManagementServiceGrpc.newBlockingStub(channel);
Expand All @@ -106,7 +106,7 @@ public RegionScopedV3TitusClient(TitusRegion titusRegion,

@Override
public Job getJob(String jobId) {
return new Job(grpcBlockingStub.withDeadlineAfter(DEFAULT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS).findJob(JobId.newBuilder().setId(jobId).build()), getTasks(Arrays.asList(jobId)).get(jobId));
return new Job(grpcBlockingStub.findJob(JobId.newBuilder().setId(jobId).build()), getTasks(Arrays.asList(jobId)).get(jobId));
}

@Override
Expand Down Expand Up @@ -149,7 +149,7 @@ public String submitJob(SubmitJobRequest submitJobRequest) {
for (TitusJobCustomizer customizer : titusJobCustomizers) {
customizer.customize(jobDescription);
}
return grpcBlockingStub.withDeadlineAfter(DEFAULT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS).createJob(jobDescription.getGrpcJobDescriptor()).getId();
return grpcBlockingStub.createJob(jobDescription.getGrpcJobDescriptor()).getId();
}

@Override
Expand All @@ -161,7 +161,7 @@ public Task getTask(String taskId) {

@Override
public void resizeJob(ResizeJobRequest resizeJobRequest) {
grpcBlockingStub.withDeadlineAfter(DEFAULT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS).updateJobCapacity(JobCapacityUpdate.newBuilder()
grpcBlockingStub.updateJobCapacity(JobCapacityUpdate.newBuilder()
.setJobId(resizeJobRequest.getJobId())
.setCapacity(Capacity.newBuilder()
.setDesired(resizeJobRequest.getInstancesDesired())
Expand All @@ -174,18 +174,18 @@ public void resizeJob(ResizeJobRequest resizeJobRequest) {

@Override
public void activateJob(ActivateJobRequest activateJobRequest) {
grpcBlockingStub.withDeadlineAfter(DEFAULT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS).updateJobStatus(JobStatusUpdate.newBuilder().setId(activateJobRequest.getJobId()).setEnableStatus(activateJobRequest.getInService()).build());
grpcBlockingStub.updateJobStatus(JobStatusUpdate.newBuilder().setId(activateJobRequest.getJobId()).setEnableStatus(activateJobRequest.getInService()).build());
}

@Override
public void terminateJob(TerminateJobRequest terminateJobRequest) {
grpcBlockingStub.withDeadlineAfter(DEFAULT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS).killJob(JobId.newBuilder().setId(terminateJobRequest.getJobId()).build());
grpcBlockingStub.killJob(JobId.newBuilder().setId(terminateJobRequest.getJobId()).build());
}

@Override
public void terminateTasksAndShrink(TerminateTasksAndShrinkJobRequest terminateTasksAndShrinkJob) {
terminateTasksAndShrinkJob.getTaskIds().forEach(id ->
grpcBlockingStub.withDeadlineAfter(DEFAULT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS).killTask(TaskKillRequest.newBuilder().setTaskId(id).setShrink(terminateTasksAndShrinkJob.isShrink()).build())
grpcBlockingStub.killTask(TaskKillRequest.newBuilder().setTaskId(id).setShrink(terminateTasksAndShrinkJob.isShrink()).build())
);
}

Expand Down Expand Up @@ -215,7 +215,7 @@ private List<Job> getJobs(JobQuery.Builder jobQuery) {
do {
jobQuery.setPage(Page.newBuilder().setPageNumber(currentPage).setPageSize(100));
JobQuery criteria = jobQuery.build();
JobQueryResult resultPage = grpcBlockingStub.withDeadlineAfter(DEFAULT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS).findJobs(criteria);
JobQueryResult resultPage = grpcBlockingStub.findJobs(criteria);
grpcJobs.addAll(resultPage.getItemsList());
totalPages = resultPage.getPagination().getTotalPages();
currentPage++;
Expand All @@ -232,7 +232,7 @@ private Map<String, List<com.netflix.titus.grpc.protogen.Task>> getTasks(List<St
TaskQueryResult taskResults;
int currentTaskPage = 0;
do {
taskResults = grpcBlockingStub.withDeadlineAfter(DEFAULT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS).findTasks(
taskResults = grpcBlockingStub.findTasks(
TaskQuery.newBuilder()
.putFilteringCriteria("jobIds", jobIds.stream().collect(Collectors.joining(",")))
.setPage(Page.newBuilder().setPageNumber(currentTaskPage).setPageSize(100)
Expand All @@ -243,4 +243,5 @@ private Map<String, List<com.netflix.titus.grpc.protogen.Task>> getTasks(List<St
} while (taskResults.getPagination().getHasMore());
return tasks.stream().collect(Collectors.groupingBy(task -> task.getJobId()));
}

}

0 comments on commit 75b0cd5

Please sign in to comment.