Permalink
Browse files

Add persistence for vm lists in AsyncRequest

  • Loading branch information...
oldpatricka committed Jun 22, 2011
1 parent b6a77a7 commit e732ad490a5985312ce3a29df897db05d87a1ce4
@@ -344,3 +344,21 @@ destpath VARCHAR(512),
on_image SMALLINT NOT NULL
);
+CREATE TABLE async_requests_allocated_vms
+(
+id VARCHAR(512) NOT NULL,
+vmid INT NOT NULL
+);
+
+CREATE TABLE async_requests_finished_vms
+(
+id VARCHAR(512) NOT NULL,
+vmid INT NOT NULL
+);
+
+CREATE TABLE async_requests_to_be_preempted
+(
+id VARCHAR(512) NOT NULL,
+vmid INT NOT NULL
+);
+
@@ -1,6 +1,8 @@
package org.globus.workspace.async;
+import java.io.InterruptedIOException;
import java.io.Serializable;
+import java.lang.reflect.Array;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
@@ -11,6 +13,7 @@
import org.nimbustools.api.repr.Caller;
import org.nimbustools.api.repr.ctx.Context;
import org.nimbustools.api.repr.vm.NIC;
+import sun.text.normalizer.IntTrie;
public class AsyncRequest implements Comparable<AsyncRequest>, Serializable {
@@ -119,7 +122,15 @@ public Integer getAllocatedInstances() {
public void addAllocatedVM(int createdId) {
this.allocatedVMs.add(createdId);
}
-
+
+ public void addFinishedVM(int createdId) {
+ this.finishedVMs.add(createdId);
+ }
+
+ public void addToBePreempted(int createdId) {
+ this.toBePreempted.add(createdId);
+ }
+
public Integer getUnallocatedInstances(){
return this.getNeededInstances() - getAllocatedInstances();
}
@@ -278,6 +289,36 @@ public boolean finishVM(int vmid) {
return result;
}
+ public int[] getAllocatedVMs() {
+ int[] allocated = new int[this.allocatedVMs.size()];
+ int i=0;
+ for (int vm : this.allocatedVMs) {
+ allocated[i] = vm;
+ i++;
+ }
+ return allocated;
+ }
+
+ public int[] getFinishedVMs() {
+ int[] finished = new int[this.finishedVMs.size()];
+ int i=0;
+ for (int vm : this.finishedVMs) {
+ finished[i] = vm;
+ i++;
+ }
+ return finished;
+ }
+
+ public int[] getToBePreempted() {
+ int[] preempted = new int[this.toBePreempted.size()];
+ int i=0;
+ for (int vm : this.toBePreempted) {
+ preempted[i] = vm;
+ i++;
+ }
+ return preempted;
+ }
+
public Calendar getCreationTime() {
return this.creationTime;
}
@@ -277,6 +277,42 @@
"(async_id, binding_index, vmid, sourcepath, destpath, on_image) " +
" VALUES (?,?,?,?,?,?)";
+ public static final String SQL_INSERT_ASYNC_REQUESTS_ALLOCATED_VMS =
+ "INSERT INTO async_requests_allocated_vms " +
+ "(id, vmid) VALUES (?,?)";
+
+ public static final String SQL_INSERT_ASYNC_REQUESTS_FINISHED_VMS =
+ "INSERT INTO async_requests_finished_vms " +
+ "(id, vmid) VALUES (?,?)";
+
+ public static final String SQL_INSERT_ASYNC_REQUESTS_TO_BE_PREEMPTED =
+ "INSERT INTO async_requests_to_be_preempted " +
+ "(id, vmid) VALUES (?,?)";
+
+ public static final String SQL_DELETE_ASYNC_REQUESTS_ALLOCATED_VMS =
+ "DELETE FROM async_requests_allocated_vms " +
+ "WHERE id=?";
+
+ public static final String SQL_DELETE_ASYNC_REQUESTS_FINISHED_VMS =
+ "DELETE FROM async_requests_finished_vms " +
+ "WHERE id=?";
+
+ public static final String SQL_DELETE_ASYNC_REQUESTS_TO_BE_PREEMPTED =
+ "DELETE FROM async_requests_to_be_preempted " +
+ "WHERE id=?";
+
+ public static final String SQL_LOAD_ASYNC_REQUESTS_ALLOCATED_VMS =
+ "SELECT vmid FROM async_requests_allocated_vms " +
+ "WHERE id=?";
+
+ public static final String SQL_LOAD_ASYNC_REQUESTS_FINISHED_VMS =
+ "SELECT vmid FROM async_requests_finished_vms " +
+ "WHERE id=?";
+
+ public static final String SQL_LOAD_ASYNC_REQUESTS_TO_BE_PREEMPTED =
+ "SELECT vmid FROM async_requests_to_be_preempted " +
+ "WHERE id=?";
+
public static final String SQL_UPDATE_ASYNC_REQUEST =
"UPDATE async_requests SET id=?, max_bid=?, spot=?, group_id=?, persistent=?, creator_dn=?, creator_is_superuser=?, ssh_key_name=?, creation_time=?, nics=?, status=? WHERE id=?";
@@ -60,6 +60,7 @@
import org.nimbustools.api.repr.vm.NIC;
import org.nimbustools.api.services.rm.DoesNotExistException;
import org.nimbustools.api.services.rm.ManageException;
+import org.springframework.scheduling.annotation.Async;
public class PersistenceAdapterImpl implements WorkspaceConstants,
PersistenceAdapterConstants,
@@ -3254,11 +3255,20 @@ public void addAsyncRequest(AsyncRequest asyncRequest)
pstmt = AsyncRequestMapPersistenceUtil.getDeleteAsyncRequestVMs(asyncRequest, c);
pstmt.executeUpdate();
+ AsyncRequestMapPersistenceUtil.removeAllocatedVMs(asyncRequest, c);
+ AsyncRequestMapPersistenceUtil.removeFinishedVMs(asyncRequest, c);
+ AsyncRequestMapPersistenceUtil.removeToBePreempted(asyncRequest, c);
+ AsyncRequestMapPersistenceUtil.putAllocatedVMs(asyncRequest, c);
+ AsyncRequestMapPersistenceUtil.putFinishedVMs(asyncRequest, c);
+ AsyncRequestMapPersistenceUtil.putToBePreempted(asyncRequest, c);
}
else {
logger.debug("Persisting request: " + asyncRequest.getId());
pstmt = AsyncRequestMapPersistenceUtil.getInsertAsyncRequest(asyncRequest, this.repr, c);
pstmt.executeUpdate();
+ AsyncRequestMapPersistenceUtil.putAllocatedVMs(asyncRequest, c);
+ AsyncRequestMapPersistenceUtil.putFinishedVMs(asyncRequest, c);
+ AsyncRequestMapPersistenceUtil.putToBePreempted(asyncRequest, c);
}
AsyncRequestMapPersistenceUtil.putAsyncRequestBindings(asyncRequest, c);
@@ -3314,6 +3324,9 @@ public AsyncRequest getAsyncRequest(String id)
asyncRequest = AsyncRequestMapPersistenceUtil.rsToAsyncRequest(rs, this.repr, c);
VirtualMachine[] bindings = AsyncRequestMapPersistenceUtil.getAsyncVMs(asyncRequest.getId(), c);
asyncRequest.setBindings(bindings);
+ AsyncRequestMapPersistenceUtil.addAllocatedVMs(asyncRequest, c);
+ AsyncRequestMapPersistenceUtil.addFinishedVMs(asyncRequest, c);
+ AsyncRequestMapPersistenceUtil.addToBePreempted(asyncRequest, c);
} catch (SQLException e) {
logger.error("",e);
@@ -28,18 +28,21 @@
import org.globus.workspace.service.binding.vm.VirtualMachineDeployment;
import org.globus.workspace.service.binding.vm.VirtualMachinePartition;
import org.nimbustools.api._repr._Caller;
+import org.nimbustools.api.repr.AsyncCreateRequest;
import org.nimbustools.api.repr.Caller;
import org.nimbustools.api.repr.CannotTranslateException;
import org.nimbustools.api.repr.ReprFactory;
import org.nimbustools.api.repr.ctx.Context;
import org.nimbustools.api.repr.vm.NIC;
import org.nimbustools.api.services.rm.ManageException;
+import org.springframework.scheduling.annotation.Async;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
+import java.util.concurrent.ConcurrentNavigableMap;
public class AsyncRequestMapPersistenceUtil
implements PersistenceAdapterConstants {
@@ -76,7 +79,7 @@ public static PreparedStatement getInsertAsyncRequest(AsyncRequest asyncRequest,
pstmt.setLong(9, asyncRequest.getCreationTime().getTimeInMillis());
}
else {
- pstmt.setInt(9,0);
+ pstmt.setInt(9, 0);
}
DataConvert dataConvert = new DataConvert(repr);
String nics = dataConvert.nicsAsString(asyncRequest.getRequestedNics());
@@ -86,6 +89,114 @@ public static PreparedStatement getInsertAsyncRequest(AsyncRequest asyncRequest,
return pstmt;
}
+ public static void putAllocatedVMs(AsyncRequest asyncRequest, Connection c) throws SQLException {
+
+ for (int vmid : asyncRequest.getAllocatedVMs()) {
+ PreparedStatement pstmt = c.prepareStatement(SQL_INSERT_ASYNC_REQUESTS_ALLOCATED_VMS);
+ pstmt.setString(1, asyncRequest.getId());
+ pstmt.setInt(2, vmid);
+ pstmt.executeUpdate();
+ pstmt.close();
+ }
+ }
+
+ public static void putFinishedVMs(AsyncRequest asyncRequest, Connection c) throws SQLException {
+
+ for (int vmid : asyncRequest.getFinishedVMs()) {
+ PreparedStatement pstmt = c.prepareStatement(SQL_INSERT_ASYNC_REQUESTS_FINISHED_VMS);
+ pstmt.setString(1, asyncRequest.getId());
+ pstmt.setInt(2, vmid);
+ pstmt.executeUpdate();
+ pstmt.close();
+ }
+ }
+
+ public static void putToBePreempted(AsyncRequest asyncRequest, Connection c) throws SQLException {
+
+ for (int vmid : asyncRequest.getToBePreempted()) {
+ PreparedStatement pstmt = c.prepareStatement(SQL_INSERT_ASYNC_REQUESTS_TO_BE_PREEMPTED);
+ pstmt.setString(1, asyncRequest.getId());
+ pstmt.setInt(2, vmid);
+ pstmt.executeUpdate();
+ pstmt.close();
+ }
+ }
+
+ public static void removeAllocatedVMs(AsyncRequest asyncRequest, Connection c) throws SQLException {
+
+ PreparedStatement pstmt = c.prepareStatement(SQL_DELETE_ASYNC_REQUESTS_ALLOCATED_VMS);
+ pstmt.setString(1, asyncRequest.getId());
+ pstmt.executeUpdate();
+ pstmt.close();
+ }
+
+ public static void removeFinishedVMs(AsyncRequest asyncRequest, Connection c) throws SQLException {
+
+ PreparedStatement pstmt = c.prepareStatement(SQL_DELETE_ASYNC_REQUESTS_FINISHED_VMS);
+ pstmt.setString(1, asyncRequest.getId());
+ pstmt.executeUpdate();
+ pstmt.close();
+ }
+
+ public static void removeToBePreempted(AsyncRequest asyncRequest, Connection c) throws SQLException {
+
+ PreparedStatement pstmt = c.prepareStatement(SQL_DELETE_ASYNC_REQUESTS_TO_BE_PREEMPTED);
+ pstmt.setString(1, asyncRequest.getId());
+ pstmt.executeUpdate();
+ pstmt.close();
+ }
+
+ public static void addAllocatedVMs(AsyncRequest asyncRequest, Connection c) throws SQLException {
+
+ PreparedStatement pstmt = c.prepareStatement(SQL_LOAD_ASYNC_REQUESTS_ALLOCATED_VMS);
+ pstmt.setString(1, asyncRequest.getId());
+ ResultSet rs = pstmt.executeQuery();
+
+ if (rs == null || !rs.next()) {
+ return;
+ }
+
+ do {
+ asyncRequest.addAllocatedVM(rs.getInt(1));
+ } while (rs.next());
+
+ pstmt.close();
+ }
+
+ public static void addFinishedVMs(AsyncRequest asyncRequest, Connection c) throws SQLException {
+
+ PreparedStatement pstmt = c.prepareStatement(SQL_LOAD_ASYNC_REQUESTS_FINISHED_VMS);
+ pstmt.setString(1, asyncRequest.getId());
+ ResultSet rs = pstmt.executeQuery();
+
+ if (rs == null || !rs.next()) {
+ return;
+ }
+
+ do {
+ asyncRequest.addFinishedVM(rs.getInt(1));
+ } while (rs.next());
+
+ pstmt.close();
+ }
+
+ public static void addToBePreempted(AsyncRequest asyncRequest, Connection c) throws SQLException {
+
+ PreparedStatement pstmt = c.prepareStatement(SQL_LOAD_ASYNC_REQUESTS_TO_BE_PREEMPTED);
+ pstmt.setString(1, asyncRequest.getId());
+ ResultSet rs = pstmt.executeQuery();
+
+ if (rs == null || !rs.next()) {
+ return;
+ }
+
+ do {
+ asyncRequest.addToBePreempted(rs.getInt(1));
+ } while (rs.next());
+
+ pstmt.close();
+ }
+
public static PreparedStatement getUpdateAsyncRequest(AsyncRequest asyncRequest, ReprFactory repr, Connection c)
throws SQLException {
@@ -365,6 +476,10 @@ public static void putAsyncRequestVMDeployment(AsyncRequest asyncRequest, int bi
VirtualMachine binding = asyncRequest.getBindings()[binding_index];
VirtualMachineDeployment dep = binding.getDeployment();
+ if (dep == null) {
+ return;
+ }
+
PreparedStatement pstmt = c.prepareStatement(SQL_INSERT_ASYNC_REQUESTS_VM_DEPLOYMENT);
pstmt.setString(1, asyncRequest.getId());
@@ -382,6 +497,10 @@ public static void putAsyncRequestVMDeployment(AsyncRequest asyncRequest, int bi
public static void putAsyncRequestVMPartitions(AsyncRequest asyncRequest, int binding_index, Connection c) throws SQLException {
VirtualMachine binding = asyncRequest.getBindings()[binding_index];
+ VirtualMachinePartition[] partitions = binding.getPartitions();
+ if (partitions == null) {
+ return;
+ }
for (VirtualMachinePartition partition : binding.getPartitions()) {
@@ -344,3 +344,21 @@ destpath VARCHAR(512),
on_image SMALLINT NOT NULL
);
+CREATE TABLE async_requests_allocated_vms
+(
+id VARCHAR(512) NOT NULL,
+vmid INT NOT NULL
+);
+
+CREATE TABLE async_requests_finished_vms
+(
+id VARCHAR(512) NOT NULL,
+vmid INT NOT NULL
+);
+
+CREATE TABLE async_requests_to_be_preempted
+(
+id VARCHAR(512) NOT NULL,
+vmid INT NOT NULL
+);
+
@@ -126,13 +126,15 @@ public void persistOne() throws Exception {
testVM.setUnPropagateRequired(true);
VirtualMachine[] testBindings = new VirtualMachine[1];
testBindings[0] = testVM;
+ int testAllocatedVM = 42;
DataConvert dataConvert = new DataConvert(this.locator.getReprFactory());
NIC[] testNICs = dataConvert.getNICs(testVM);
logger.debug("Nics: " + testNICs[0]);
//public AsyncRequest(String id, boolean spotinstances, Double spotPrice, boolean persistent, Caller caller, String groupID, VirtualMachine[] bindings, Context context, NIC[] requestedNics, String sshKeyName, Calendar creationTime) {
AsyncRequest testRequest = new AsyncRequest(testID, testSpotinstances, testMaxBid, testIsPersistent, testCaller, testGroupID, testBindings, context, testNICs, testSshKeyName, testCreationTime);
+ testRequest.addAllocatedVM(testAllocatedVM);
asyncRequestMap.addOrReplace(testRequest);
allRequests = asyncRequestMap.getAll();
@@ -153,5 +155,6 @@ public void persistOne() throws Exception {
assertEquals(testCreationTime, gotRequest.getCreationTime());
assertEquals(testVM.getID(), gotRequest.getBindings()[0].getID());
assertEquals(testNICs[0].getIpAddress(), gotRequest.getRequestedNics()[0].getIpAddress());
+ assertEquals(testAllocatedVM, gotRequest.getAllocatedVMs()[0]);
}
}
Oops, something went wrong.

0 comments on commit e732ad4

Please sign in to comment.