Skip to content

Commit

Permalink
(titus): toggle scaling enabled on enable/disable; guard against asyn…
Browse files Browse the repository at this point in the history
…c policy upserts
  • Loading branch information
anotherchrisberry authored and tomaslin committed Mar 26, 2018
1 parent 1484b06 commit 7414308
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public List<ScalingPolicyResult> getJobScalingPolicies(String jobId) {
.getJobScalingPolicies(request).getItemsList();
}

@Override
public ScalingPolicyResult getScalingPolicy(String policyId) {
return autoScalingServiceBlockingStub.getScalingPolicy(ScalingPolicyID.newBuilder().setId(policyId).build()).getItems(0);
}

@Override
public ScalingPolicyID upsertScalingPolicy(PutPolicyRequest policy) {
return autoScalingServiceBlockingStub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ public void activateJob(ActivateJobRequest activateJobRequest){
execute("activateJob", titusRestAdapter.activateJob(activateJobRequest));
}

@Override
public void setAutoscaleEnabled(boolean shouldEnable) {
// noop, only supported in V3
}

@Override
public void terminateJob(TerminateJobRequest terminateJobRequest) {
if (terminateJobRequest.getUser() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public interface TitusAutoscalingClient {

List<ScalingPolicyResult> getJobScalingPolicies(String jobId);

ScalingPolicyResult getScalingPolicy(String policyId);

ScalingPolicyID upsertScalingPolicy(PutPolicyRequest policy);

void deleteScalingPolicy(DeletePolicyRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ public interface TitusClient {
*/
public void activateJob(ActivateJobRequest activateJobRequest);

/**
*
* @param shouldEnable
*/
public void setAutoscaleEnabled(boolean shouldEnable);

/**
*
* @param terminateJobRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ abstract class AbstractEnableDisableAtomicOperation implements AtomicOperation<V
new ActivateJobRequest()
.withUser('spinnaker')
.withJobId(job.id)
.withInService(disable ? false : true)
.withInService(!disable)
)

if (job.tasks) {
Expand All @@ -99,6 +99,8 @@ abstract class AbstractEnableDisableAtomicOperation implements AtomicOperation<V
)
}

provider.setAutoscaleEnabled(!disable)

task.updateStatus phaseName, "Finished ${presentParticipling} ServerGroup $serverGroupName."

return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ import com.netflix.spinnaker.clouddriver.data.task.TaskRepository
import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperation
import com.netflix.spinnaker.clouddriver.titus.TitusClientProvider
import com.netflix.spinnaker.clouddriver.titus.deploy.description.UpsertTitusScalingPolicyDescription
import com.netflix.spinnaker.kork.core.RetrySupport
import com.netflix.titus.grpc.protogen.DeletePolicyRequest
import com.netflix.titus.grpc.protogen.PutPolicyRequest
import com.netflix.titus.grpc.protogen.PutPolicyRequest.Builder
import com.netflix.titus.grpc.protogen.ScalingPolicy
import com.netflix.titus.grpc.protogen.ScalingPolicyID
import com.netflix.titus.grpc.protogen.ScalingPolicyResult
import com.netflix.titus.grpc.protogen.ScalingPolicyStatus.ScalingPolicyState
import groovy.util.logging.Slf4j
import org.springframework.beans.factory.annotation.Autowired

@Slf4j
class UpsertTitusScalingPolicyAtomicOperation implements AtomicOperation<Map> {

UpsertTitusScalingPolicyDescription description
Expand All @@ -45,6 +50,9 @@ class UpsertTitusScalingPolicyAtomicOperation implements AtomicOperation<Map> {
@Autowired
TitusClientProvider titusClientProvider

@Autowired
RetrySupport retrySupport

@Override
Map operate(List priorOutputs) {
task.updateStatus BASE_PHASE, "Initializing Upsert Scaling Policy..."
Expand All @@ -54,31 +62,66 @@ class UpsertTitusScalingPolicyAtomicOperation implements AtomicOperation<Map> {
throw new UnsupportedOperationException("Autoscaling is not supported for this account/region")
}

ScalingPolicy.Builder builder = description.toScalingPolicyBuilder()

Builder requestBuilder = PutPolicyRequest.newBuilder()
.setScalingPolicy(builder)
.setJobId(description.jobId)

task.updateStatus BASE_PHASE, "Create Scaling Policy request constructed, sending..."

ScalingPolicyID result = client.upsertScalingPolicy(requestBuilder.build())

task.updateStatus BASE_PHASE, "Create Scaling Policy succeeded; new policy ID: ${result.id}"
ScalingPolicyResult previousPolicy

if (description.scalingPolicyID) {

previousPolicy = client.getScalingPolicy(description.scalingPolicyID)

task.updateStatus BASE_PHASE, "Deleting previous scaling policy (${description.scalingPolicyID})..."

DeletePolicyRequest.Builder deleteRequestBuilder = DeletePolicyRequest.newBuilder()
.setId(ScalingPolicyID.newBuilder().setId(description.scalingPolicyID))

client.deleteScalingPolicy(deleteRequestBuilder.build())

task.updateStatus BASE_PHASE, "Deleted old scaling policy (${description.scalingPolicyID})"
task.updateStatus BASE_PHASE, "Deleted previous scaling policy (${description.scalingPolicyID}); monitoring deletion"

retrySupport.retry({ ->
ScalingPolicyResult updatedPolicy = client.getScalingPolicy(description.scalingPolicyID)
if (!updatedPolicy || (updatedPolicy.getPolicyState().state != ScalingPolicyState.Deleted)) {
throw new IllegalStateException("Previous policy was not deleted after 45 seconds")
}
}, 5000, 10, false)

task.updateStatus BASE_PHASE, "Previous scaling policy successfully deleted"
}

ScalingPolicy.Builder builder = description.toScalingPolicyBuilder()

Builder requestBuilder = PutPolicyRequest.newBuilder()
.setScalingPolicy(builder)
.setJobId(description.jobId)

task.updateStatus BASE_PHASE, "Create Scaling Policy request constructed, sending..."

ScalingPolicyID result = client.upsertScalingPolicy(requestBuilder.build())

task.updateStatus BASE_PHASE, "Create Scaling Policy succeeded; new policy ID: ${result.id}; monitoring creation..."

// make sure the new policy was applied
try {
verifyNewPolicyState(client, result)
} catch (IllegalStateException e) {
if (previousPolicy) {
log.info("New policy creation failed; attempting to restore previous policy")
client.upsertScalingPolicy(PutPolicyRequest.newBuilder().setScalingPolicy(previousPolicy.scalingPolicy).build())
}
throw e
}

task.updateStatus BASE_PHASE, "Scaling policy successfully created"

return [scalingPolicyID: result.id]
}

private void verifyNewPolicyState(client, result) {
retrySupport.retry({ ->
ScalingPolicyResult updatedPolicy = client.getScalingPolicy(result.id)
if (!updatedPolicy || (updatedPolicy.getPolicyState().state != ScalingPolicyState.Applied)) {
throw new IllegalStateException("New policy did not transition to applied state within 45 seconds")
}
}, 5000, 10, false)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,18 @@ public void activateJob(ActivateJobRequest activateJobRequest) {
grpcBlockingStub.updateJobStatus(JobStatusUpdate.newBuilder().setId(activateJobRequest.getJobId()).setEnableStatus(activateJobRequest.getInService()).build());
}

@Override
public void setAutoscaleEnabled(boolean shouldEnable) {
grpcBlockingStub.updateJobProcesses(
JobProcessesUpdate.newBuilder().setServiceJobProcesses(
ServiceJobSpec.ServiceJobProcesses.newBuilder()
.setDisableDecreaseDesired(shouldEnable)
.setDisableIncreaseDesired(shouldEnable)
.build()
).build()
);
}

@Override
public void terminateJob(TerminateJobRequest terminateJobRequest) {
grpcBlockingStub.killJob(JobId.newBuilder().setId(terminateJobRequest.getJobId()).build());
Expand Down

0 comments on commit 7414308

Please sign in to comment.