Skip to content

Commit

Permalink
work in progress... read src/TODO to pick up progress
Browse files Browse the repository at this point in the history
  • Loading branch information
Niv Veertu committed Feb 12, 2020
1 parent d38678d commit c9341b0
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 98 deletions.
25 changes: 25 additions & 0 deletions src/TODO
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
current situation
---------------------
regular pipeline and freestyle jobs work as expected
weird behaviour to consider: if you run a basic job, cancel it before jenkins node has not started yet (but after start vm has been sent to controller)
What happens is that jenkins cancels the job but the node and vm get generated and connected to one another.
They die when jenkins kills the idle node, or they get picked up if same job is started again.
We need to think if this is a normal behaviour for us or not.

current issues
----------------
- daemon instance behaviour on jenkins restarts and config changes:
config changes at the moment do not create new instance daemons (good)
restarts lose all data saved in the daemons and zombie vms are left
- handle errors better on getStatus() called from instance daemon

to be tested
--------------
- cache builder jobs
- dynamic slave jobs
- restart resilience
- check regression for running two parallel jobs in pipeline that one fails while the other runs

more to do
-------------
- add tests for zombie vms
25 changes: 17 additions & 8 deletions src/main/java/com/veertu/plugin/anka/AnkaMgmtCloud.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class AnkaMgmtCloud extends Cloud {

private SaveImageRequestsHolder saveImageRequestsHolder = SaveImageRequestsHolder.getInstance();
private InstanceDaemon daemon;
public static Map<String, InstanceDaemon> cloudToDaemonMap;

@DataBoundConstructor
public AnkaMgmtCloud(String ankaMgmtUrl,
Expand Down Expand Up @@ -86,16 +87,25 @@ public AnkaMgmtCloud(String ankaMgmtUrl,
initEvents();
}

private void initEvents() {
public void initEvents() {
if (!eventsInit) {
if (ankaAPI != null) {
daemon = new InstanceDaemon();
AnkaEvents.addListener(Event.nodeStarted, daemon );
AnkaEvents.addListener(Event.VMStarted, daemon );
AnkaEvents.addListener(Event.nodeTerminated, daemon );
AnkaEvents.addListener(Event.saveImage, daemon );
new Thread(daemon).run();
if (cloudToDaemonMap == null)
cloudToDaemonMap = new HashMap<>();
InstanceDaemon runningDaemon = cloudToDaemonMap.get(getCloudName());
if (runningDaemon != null) {
daemon = runningDaemon;
}
else {
daemon = new InstanceDaemon();
AnkaEvents.addListener(Event.nodeStarted, daemon );
AnkaEvents.addListener(Event.VMStarted, daemon );
AnkaEvents.addListener(Event.nodeTerminated, daemon );
AnkaEvents.addListener(Event.saveImage, daemon );
new Thread(daemon).start();
}
eventsInit = true;
cloudToDaemonMap.put(getCloudName(), daemon);
}
}
}
Expand Down Expand Up @@ -190,7 +200,6 @@ public Collection<NodeProvisioner.PlannedNode> provision(Label label, int excess
break;
}
try {

NodeProvisioner.PlannedNode newNode = AnkaPlannedNode.createInstance(this, t);
plannedNodes.add(newNode);
excessWorkload -= t.getNumberOfExecutors();
Expand Down
64 changes: 26 additions & 38 deletions src/main/java/com/veertu/plugin/anka/AnkaOnDemandSlave.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,13 @@ private static AnkaOnDemandSlave createJNLPSlave(AnkaMgmtCloud cloud, final Anka
launcher,
template.getNodeProperties(), template, vm);

new Thread(new Runnable() {

@Override
public void run() {
try {
vm.waitForBoot(template.getSchedulingTimeout());
} catch (InterruptedException | IOException | AnkaMgmtException e) {
vm.terminate();
throw new RuntimeException(new AnkaMgmtException(e));
}



}
}).run();

try {
vm.waitForBoot(template.getSchedulingTimeout());
} catch (InterruptedException | IOException | AnkaMgmtException e) {
vm.terminate();
throw new RuntimeException(new AnkaMgmtException(e));
}

slave.setDisplayName(vm.getName());
return slave;
Expand All @@ -134,29 +125,26 @@ private static AnkaOnDemandSlave createSSHSlave(AnkaMgmtCloud cloud, final AnkaC
template.getNodeProperties(), template, vm);

final AnkaOnDemandSlave finalSlave = slave;
new Thread(new Runnable() {

@Override
public void run() {
AnkaMgmtCloud.Log("vm %s is booting...", vm.getId());
try {
vm.waitForBoot(template.getSchedulingTimeout());
} catch (InterruptedException | IOException | AnkaMgmtException e) {
vm.terminate();
throw new RuntimeException(new AnkaMgmtException(e));
}
AnkaMgmtCloud.Log("vm %s %s is booted, creating ssh launcher", vm.getId(), vm.getName());
SSHLauncher launcher = new SSHLauncher(vm.getConnectionIp(), vm.getConnectionPort(),
template.getCredentialsId(),
template.getJavaArgs(), null, null, null, launchTimeoutSeconds, maxNumRetries, retryWaitTime, null);

finalSlave.setLauncher(launcher);

AnkaMgmtCloud.Log("launcher created for vm %s %s", vm.getId(), vm.getName());
String name = vm.getName();
finalSlave.setDisplayName(name);
}
}).run();




AnkaMgmtCloud.Log("vm %s is booting...", vm.getId());
try {
vm.waitForBoot(template.getSchedulingTimeout());
} catch (InterruptedException | IOException | AnkaMgmtException e) {
vm.terminate();
throw new RuntimeException(new AnkaMgmtException(e));
}
AnkaMgmtCloud.Log("vm %s %s is booted, creating ssh launcher", vm.getId(), vm.getName());
SSHLauncher launcher = new SSHLauncher(vm.getConnectionIp(), vm.getConnectionPort(),
template.getCredentialsId(),
template.getJavaArgs(), null, null, null, launchTimeoutSeconds, maxNumRetries, retryWaitTime, null);
finalSlave.setLauncher(launcher);

AnkaMgmtCloud.Log("launcher created for vm %s %s", vm.getId(), vm.getName());
String name = vm.getName();
finalSlave.setDisplayName(name);

return slave;
} catch (Exception e) {
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/com/veertu/plugin/anka/AnkaPlannedNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ public Node call() throws Exception {
e.printStackTrace();
throw e;
}
if (slave == null) {
return null;
}

slave.register();

if (template.getLaunchMethod().toLowerCase().equals(LaunchMethod.SSH)) {
return slave;
}
if (slave == null) {
return null;
}
long startTime = System.currentTimeMillis(); // fetch starting time
while (true) {
try {
Expand Down
116 changes: 68 additions & 48 deletions src/main/java/com/veertu/plugin/anka/InstanceDaemon.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import com.veertu.ankaMgmtSdk.exceptions.AnkaMgmtException;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class InstanceDaemon implements EventHandler, Runnable {

// private List<AnkaMgmtVm> instances = new LinkedList<>();
private final Object mutex = new Object();
private Map<String, VMNodeWrapper> instanceMap = new HashMap<>();
private Map<String, VMNodeWrapper> nodeMap = new HashMap<>();
Expand All @@ -22,7 +22,6 @@ public void vmStarted(AnkaMgmtVm vm) {
}
}


public void vmAttachedToNode(AbstractAnkaSlave node) {
synchronized (mutex) {
VMNodeWrapper instance = instanceMap.get(node.getVM().getId());
Expand All @@ -36,6 +35,7 @@ public void nodeTerminated(AbstractAnkaSlave node) {
VMNodeWrapper instance = nodeMap.get(node.getNodeName());
if (instance.state != State.pushing) {
instance.state = State.shouldTerminate;
nodeMap.remove(node.getNodeName());
}
}
}
Expand All @@ -50,77 +50,97 @@ public void saveImageSent(AbstractAnkaSlave node) {
public void run() {
while (true) {
try {
for (VMNodeWrapper instance: instanceMap.values()) {
checkInstance(instance);
Thread.sleep(100); // 100 ms between each request
Thread.sleep(5000);
try {
Iterator<VMNodeWrapper> it = instanceMap.values().iterator();
while (it.hasNext()) {
VMNodeWrapper instance = it.next();
if (removeInstance(instance)) {
it.remove();
}
Thread.sleep(100); // 100 ms between each request
}
} catch (Exception e) {
AnkaMgmtCloud.Log("Got exception running anka daemon instance loop");
e.printStackTrace();
}
Thread.sleep(5000); // run every 5 seconds
} catch (Exception e) {
AnkaMgmtCloud.Log("Got exception running instance loop");
AnkaMgmtCloud.Log("Got exception while waiting between anka daemon instance loop executions");
e.printStackTrace();
}
}
}

private void checkInstance(VMNodeWrapper instance) throws AnkaMgmtException {
String status = instance.vm.getStatus();
if (status != null) {
switch (instance.state) {
case started:
switch (status) {
case "Scheduling":
case "Pulling":
case "Started":
return;
}
AnkaMgmtCloud.Log("VM %s is in unexpected state %s", instance.vm.getId(), status);

break;
case pushing:
switch (status) {
case "Pushing":
return;
}
AnkaMgmtCloud.Log("VM %s is in unexpected state %s", instance.vm.getId(), status);
break;
case shouldTerminate:
switch (status) {
case "Terminating":
case "Terminated":
return;
case "Started":
case "Scheduling":
instance.vm.terminate();
return;
}
AnkaMgmtCloud.Log("VM %s is in unexpected state %s", instance.vm.getId(), status);
break;
private boolean removeInstance(VMNodeWrapper instance) throws AnkaMgmtException {
try {
String status = instance.vm.getStatus();
if (status != null) {
switch (instance.state) {
case started:
switch (status) {
case "Scheduling":
case "Pulling":
case "Started":
return false;
case "Terminated":
case "Terminating":
AnkaMgmtCloud.Log("VM %s is in unexpected state %s (jenkins instance state: started)", instance.vm.getId(), status);
return true;
}
AnkaMgmtCloud.Log("VM %s is in unexpected state %s (jenkins instance state: started)", instance.vm.getId(), status);
break;
case pushing:
switch (status) {
case "Pushing":
return false;
}
AnkaMgmtCloud.Log("VM %s is in unexpected state: %s (jenkins instance state: pushing)", instance.vm.getId(), status);
break;
case shouldTerminate:
switch (status) {
case "Terminating":
case "Terminated":
return false;
default:
AnkaMgmtCloud.Log("VM %s is in unexpected state %s (jenkins instance state: shouldTerminate)", instance.vm.getId(), status);
AnkaMgmtCloud.Log("Terminating VM %s", instance.vm.getId());
instance.vm.terminate();
return true;
}
}
}
} catch (NullPointerException e) {
AnkaMgmtCloud.Log("VM %s does not exist in controller", instance.vm.getId());
if (instance.node != null)
nodeMap.remove(instance.node.getNodeName());
return true;
}

return false;
}

@Override
public void handle(AnkaEvent e) {

switch (e.getClass().getCanonicalName()) {
case "com.veertu.plugin.anka.events.NodeStarted":
String eventName = e.getClass().getCanonicalName();
switch (eventName) {
case "com.veertu.plugin.anka.NodeStarted":
NodeStarted event = (NodeStarted) e;
this.vmAttachedToNode(event.getNode());
break;
case "com.veertu.plugin.anka.events.VMStarted":
case "com.veertu.plugin.anka.VMStarted":
VMStarted startEvent = (VMStarted) e;
this.vmStarted(startEvent.getVm());
break;
case "com.veertu.plugin.anka.events.SaveImageEvent":
case "com.veertu.plugin.anka.SaveImageEvent":
SaveImageEvent saveEvent = (SaveImageEvent) e;
this.saveImageSent(saveEvent.getNode());
break;
case "com.veertu.plugin.anka.events.NodeTerminated":
case "com.veertu.plugin.anka.NodeTerminated":
NodeTerminated nodeTerminatedEvent = (NodeTerminated) e;
this.nodeTerminated(nodeTerminatedEvent.getNode());
break;

default:
AnkaMgmtCloud.Log("Could not identify event name. Got: %s ", eventName);
break;
}

}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/veertu/plugin/anka/events.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static void fire(Event event, final AnkaEvent e) {
public void run() {
hl.handle(e);
}
}).run();
}).start();
}
}

Expand Down

0 comments on commit c9341b0

Please sign in to comment.