Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Implement scaling support
 to take into account whether an instance index is required or not

- for indexed deployments deploy N number of apps with instance index of 0 to N-1, where N is the count deployment property

- for non-indexed deployments use Kubernetes scaling to launch N number of pods for the app, where N is the count deployment property

Resolves #37

Switching to use he new AppDeployer.INSTANCE_INDEX_PROPERTY_KEY constant
  • Loading branch information
trisberg authored and ilayaperumalg committed Jul 8, 2016
1 parent 03aa4e5 commit 8589335
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -20,7 +20,7 @@
<java.version>1.8</java.version>
<kubernetes-client.version>1.3.83</kubernetes-client.version>
<kubernetes-assertions.version>2.2.110</kubernetes-assertions.version>
<spring-cloud-deployer-spi.version>1.0.1.RELEASE</spring-cloud-deployer-spi.version>
<spring-cloud-deployer-spi.version>1.0.2.BUILD-SNAPSHOT</spring-cloud-deployer-spi.version>
</properties>

<dependencies>
Expand Down
Expand Up @@ -51,15 +51,16 @@ public class AbstractKubernetesDeployer {
* Creates a map of labels for a given ID. This will allow Kubernetes services
* to "select" the right ReplicationControllers.
*/
Map<String, String> createIdMap(String appId, AppDeploymentRequest request) {
Map<String, String> createIdMap(String appId, AppDeploymentRequest request, Integer instanceIndex) {
//TODO: handling of app and group ids
Map<String, String> map = new HashMap<>();
map.put(SPRING_APP_KEY, appId);
String groupId = request.getDeploymentProperties().get(AppDeployer.GROUP_PROPERTY_KEY);
if (groupId != null) {
map.put(SPRING_GROUP_KEY, groupId);
}
map.put(SPRING_DEPLOYMENT_KEY, createDeploymentId(request));
String appInstanceId = instanceIndex == null ? appId : appId + "-" + instanceIndex;
map.put(SPRING_DEPLOYMENT_KEY, appInstanceId);
return map;
}

Expand All @@ -78,7 +79,6 @@ String createDeploymentId(AppDeploymentRequest request) {

AppStatus buildAppStatus(KubernetesDeployerProperties properties, String id, PodList list) {
AppStatus.Builder statusBuilder = AppStatus.of(id);

if (list == null) {
statusBuilder.with(new KubernetesAppInstanceStatus(id, null, properties));
} else {
Expand Down
Expand Up @@ -27,6 +27,6 @@
*/
public interface ContainerFactory {

Container create(String appId, AppDeploymentRequest request, Integer externalPort);
Container create(String appId, AppDeploymentRequest request, Integer externalPort, Integer instanceIndex);

}
Expand Up @@ -31,6 +31,8 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.cloud.deployer.spi.app.AppDeployer;
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;
import org.springframework.util.Assert;

Expand All @@ -56,10 +58,8 @@ public DefaultContainerFactory(KubernetesDeployerProperties properties) {
}

@Override
public Container create(String appId, AppDeploymentRequest request, Integer port) {
ContainerBuilder container = new ContainerBuilder();
public Container create(String appId, AppDeploymentRequest request, Integer port, Integer instanceIndex) {
String image = null;
//TODO: what's the proper format for a Docker URI?
try {
image = request.getResource().getURI().getSchemeSpecificPart();
} catch (IOException e) {
Expand All @@ -73,8 +73,14 @@ public Container create(String appId, AppDeploymentRequest request, Integer port
Assert.isTrue(strings.length == 2, "Invalid environment variable declared: " + envVar);
envVars.add(new EnvVar(strings[0], strings[1], null));
}
if (instanceIndex != null) {
envVars.add(new EnvVar(AppDeployer.INSTANCE_INDEX_PROPERTY_KEY, instanceIndex.toString(), null));
}

container.withName(appId)
String appInstanceId = instanceIndex == null ? appId : appId + "-" + instanceIndex;

ContainerBuilder container = new ContainerBuilder();
container.withName(appInstanceId)
.withImage(image)
.withEnv(envVars)
.withArgs(createCommandArgs(request));
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.springframework.cloud.deployer.spi.kubernetes;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -26,6 +27,7 @@
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.PodSpecBuilder;
Expand Down Expand Up @@ -71,8 +73,6 @@ public KubernetesAppDeployer(KubernetesDeployerProperties properties,
public String deploy(AppDeploymentRequest request) {

String appId = createDeploymentId(request);
Map<String, String> idMap = createIdMap(appId, request);

logger.debug("Deploying app: {}", appId);

try {
Expand All @@ -86,11 +86,30 @@ public String deploy(AppDeploymentRequest request) {
if (parameters.containsKey(SERVER_PORT_KEY)) {
externalPort = Integer.valueOf(parameters.get(SERVER_PORT_KEY));
}
logger.debug("Creating service: {} on {}", appId, externalPort);
createService(appId, request, idMap, externalPort);

logger.debug("Creating repl controller: {} on {}", appId, externalPort);
createReplicationController(appId, request, idMap, externalPort);
String countProperty = request.getDeploymentProperties().get(COUNT_PROPERTY_KEY);
int count = (countProperty != null) ? Integer.parseInt(countProperty) : 1;

String indexedProperty = request.getDeploymentProperties().get(INDEXED_PROPERTY_KEY);
boolean indexed = (indexedProperty != null) ? Boolean.valueOf(indexedProperty).booleanValue() : false;

if (indexed) {
for (int index=0 ; index < count ; index++) {
String indexedId = appId + "-" + index;
Map<String, String> idMap = createIdMap(appId, request, index);
logger.debug("Creating service: {} on {} with index {}", appId, externalPort, index);
createService(indexedId, request, idMap, externalPort);
logger.debug("Creating repl controller: {} on {} with index {}", appId, externalPort, index);
createReplicationController(indexedId, request, idMap, externalPort, 1, index);
}
}
else {
Map<String, String> idMap = createIdMap(appId, request, null);
logger.debug("Creating service: {} on {}", appId, externalPort);
createService(appId, request, idMap, externalPort);
logger.debug("Creating repl controller: {} on {}", appId, externalPort);
createReplicationController(appId, request, idMap, externalPort, count, null);
}

return appId;
} catch (RuntimeException e) {
Expand All @@ -102,43 +121,50 @@ public String deploy(AppDeploymentRequest request) {

@Override
public void undeploy(String appId) {
logger.debug("Undeploying module: {}", appId);

try {
if ("LoadBalancer".equals(client.services().withName(appId).get().getSpec().getType())) {
Service svc = client.services().withName(appId).get();
int tries = 0;
int maxWait = properties.getMinutesToWaitForLoadBalancer() * 6; // we check 6 times per minute
while (tries++ < maxWait) {
if (svc.getStatus() != null && svc.getStatus().getLoadBalancer() != null &&
svc.getStatus().getLoadBalancer().getIngress() != null &&
svc.getStatus().getLoadBalancer().getIngress().isEmpty()) {
if (tries % 6 == 0) {
logger.warn("Waiting for LoadBalancer to complete before deleting it ...");
}
logger.debug("Waiting for LoadBalancer, try {}", tries);
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
logger.debug("Undeploying app: {}", appId);

List<ReplicationController> apps =
client.replicationControllers().withLabel(SPRING_APP_KEY, appId).list().getItems();
for (ReplicationController rc : apps) {
String appIdToDelete = rc.getMetadata().getName();
logger.debug("Deleting svc, rc and pods for: {}", appIdToDelete);

Service svc = client.services().withName(appIdToDelete).get();
try {
if (svc != null && "LoadBalancer".equals(svc.getSpec().getType())) {
int tries = 0;
int maxWait = properties.getMinutesToWaitForLoadBalancer() * 6; // we check 6 times per minute
while (tries++ < maxWait) {
if (svc.getStatus() != null && svc.getStatus().getLoadBalancer() != null &&
svc.getStatus().getLoadBalancer().getIngress() != null &&
svc.getStatus().getLoadBalancer().getIngress().isEmpty()) {
if (tries % 6 == 0) {
logger.warn("Waiting for LoadBalancer to complete before deleting it ...");
}
logger.debug("Waiting for LoadBalancer, try {}", tries);
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
}
svc = client.services().withName(appIdToDelete).get();
} else {
break;
}
svc = client.services().withName(appId).get();
} else {
break;
}
logger.debug("LoadBalancer Ingress: {}", svc.getStatus().getLoadBalancer().getIngress());
}
logger.debug("LoadBalancer Ingress: {}", svc.getStatus().getLoadBalancer().getIngress());
Boolean svcDeleted = client.services().withName(appIdToDelete).delete();
logger.debug("Deleted service for: {} {}", appIdToDelete, svcDeleted);
Boolean rcDeleted = client.replicationControllers().withName(appIdToDelete).delete();
logger.debug("Deleted replication controller for: {} {}", appIdToDelete, rcDeleted);
Map<String, String> selector = new HashMap<>();
selector.put(SPRING_APP_KEY, appIdToDelete);
Boolean podDeleted = client.pods().withLabels(selector).delete();
logger.debug("Deleted pods for: {} {}", appIdToDelete, podDeleted);
} catch (RuntimeException e) {
logger.error(e.getMessage(), e);
throw e;
}
Boolean svcDeleted = client.services().withName(appId).delete();
logger.debug("Deleted service for: {} {}", appId, svcDeleted);
Boolean rcDeleted = client.replicationControllers().withName(appId).delete();
logger.debug("Deleted replication controller for: {} {}", appId, rcDeleted);
Map<String, String> selector = new HashMap<>();
selector.put(SPRING_APP_KEY, appId);
Boolean podDeleted = client.pods().withLabels(selector).delete();
logger.debug("Deleted pods for: {} {}", appId, podDeleted);
} catch (RuntimeException e) {
logger.error(e.getMessage(), e);
throw e;
}
}

Expand All @@ -147,6 +173,13 @@ public AppStatus status(String appId) {
Map<String, String> selector = new HashMap<>();
selector.put(SPRING_APP_KEY, appId);
PodList list = client.pods().withLabels(selector).list();
if (logger.isDebugEnabled()) {
logger.debug("Building AppStatus for app: {}", appId);
logger.debug("Pods for appId {}: {}", appId, list.getItems().size());
for (Pod pod : list.getItems()) {
logger.debug("Pod: {}", pod.getMetadata().getName());
}
}
AppStatus status = buildAppStatus(properties, appId, list);
logger.debug("Status for app: {} is {}", appId, status);

Expand All @@ -155,40 +188,38 @@ public AppStatus status(String appId) {

private ReplicationController createReplicationController(
String appId, AppDeploymentRequest request,
Map<String, String> idMap, int externalPort) {
String countProperty = request.getDeploymentProperties().get(COUNT_PROPERTY_KEY);
int count = (countProperty != null) ? Integer.parseInt(countProperty) : 1;
Map<String, String> idMap, int externalPort, int replicas, Integer instanceIndex) {
ReplicationController rc = new ReplicationControllerBuilder()
.withNewMetadata()
.withName(appId)
.withLabels(idMap)
.addToLabels(SPRING_MARKER_KEY, SPRING_MARKER_VALUE)
.withName(appId)
.withLabels(idMap)
.addToLabels(SPRING_MARKER_KEY, SPRING_MARKER_VALUE)
.endMetadata()
.withNewSpec()
.withReplicas(count)
.withSelector(idMap)
.withNewTemplate()
.withNewMetadata()
.withLabels(idMap)
.addToLabels(SPRING_MARKER_KEY, SPRING_MARKER_VALUE)
.endMetadata()
.withSpec(createPodSpec(appId, request, externalPort))
.endTemplate()
.withReplicas(replicas)
.withSelector(idMap)
.withNewTemplate()
.withNewMetadata()
.withLabels(idMap)
.addToLabels(SPRING_MARKER_KEY, SPRING_MARKER_VALUE)
.endMetadata()
.withSpec(createPodSpec(appId, request, Integer.valueOf(externalPort), instanceIndex))
.endTemplate()
.endSpec()
.build();

return client.replicationControllers().create(rc);
}

private PodSpec createPodSpec(String appId, AppDeploymentRequest request, int port) {
private PodSpec createPodSpec(String appId, AppDeploymentRequest request, Integer port, Integer instanceIndex) {
PodSpecBuilder podSpec = new PodSpecBuilder();

// Add image secrets if set
if (properties.getImagePullSecret() != null) {
podSpec.addNewImagePullSecret(properties.getImagePullSecret());
}

Container container = containerFactory.create(appId, request, port);
Container container = containerFactory.create(appId, request, port, instanceIndex);

// add memory and cpu resource limits
ResourceRequirements req = new ResourceRequirements();
Expand Down
Expand Up @@ -67,8 +67,8 @@ public DeploymentState getState() {
* Maps Kubernetes phases/states onto Spring Cloud Deployer states
*/
private DeploymentState mapState() {
logger.debug("Phase [" + pod.getStatus().getPhase() + "]");
logger.debug("ContainerStatus [" + containerStatus + "]");
logger.debug("{} - Phase [ {} ]", pod.getMetadata().getName(), pod.getStatus().getPhase());
logger.debug("{} - ContainerStatus [ {} ]", pod.getMetadata().getName(), containerStatus);
switch (pod.getStatus().getPhase()) {

case "Pending":
Expand Down
Expand Up @@ -39,7 +39,6 @@
import io.fabric8.kubernetes.api.model.extensions.JobSpecBuilder;
import io.fabric8.kubernetes.api.model.extensions.JobStatus;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;

/**
* A task launcher that targets Kubernetes.
Expand Down Expand Up @@ -80,7 +79,7 @@ public String launch(AppDeploymentRequest request) {
}
deleteJob(appId);
}
Map<String, String> idMap = createIdMap(appId, request);
Map<String, String> idMap = createIdMap(appId, request, null);

logger.debug("Launching job: {}", appId);
try {
Expand Down Expand Up @@ -117,7 +116,7 @@ public TaskStatus status(String id) {
}

private Container createContainer(String appId, AppDeploymentRequest request) {
Container container = containerFactory.create(appId, request, null);
Container container = containerFactory.create(appId, request, null, null);
// add memory and cpu resource limits
ResourceRequirements req = new ResourceRequirements();
req.setLimits(deduceResourceLimits(properties, request));
Expand Down

0 comments on commit 8589335

Please sign in to comment.