Browse files

Replace ehcache persistence for Spot instances with Derby persistence

  • Loading branch information...
1 parent c7f75a9 commit 393f95d2199194a63c1b0536c688a5d93a645d4c @oldpatricka oldpatricka committed Jun 20, 2011
Showing with 750 additions and 63 deletions.
  1. +1 −1 service/service/java/source/etc/workspace-service/other/main.xml
  2. +23 −0 service/service/java/source/share/lib/workspace_service_derby_schema.sql
  3. +5 −1 service/service/java/source/src/org/globus/workspace/async/AsyncRequest.java
  4. +41 −54 service/service/java/source/src/org/globus/workspace/async/AsyncRequestMap.java
  5. +36 −0 service/service/java/source/src/org/globus/workspace/persistence/DataConvert.java
  6. +11 −0 service/service/java/source/src/org/globus/workspace/persistence/PersistenceAdapter.java
  7. +23 −1 service/service/java/source/src/org/globus/workspace/persistence/PersistenceAdapterConstants.java
  8. +239 −4 service/service/java/source/src/org/globus/workspace/persistence/PersistenceAdapterImpl.java
  9. +157 −0 ...ervice/java/source/src/org/globus/workspace/persistence/impls/AsyncRequestMapPersistenceUtil.java
  10. +7 −0 ...service/java/source/src/org/globus/workspace/persistence/impls/VirtualMachinePersistenceUtil.java
  11. +9 −0 service/service/java/tests/suites/basic/AsyncRequestMapSuite.xml
  12. +11 −2 service/service/java/tests/suites/basic/home/services/etc/nimbus/workspace-service/other/main.xml
  13. +23 −0 ...service/java/tests/suites/basic/home/services/share/nimbus/lib/workspace_service_derby_schema.sql
  14. +156 −0 ...e/java/tests/suites/basic/src/org/globus/workspace/testing/suites/basic/AsyncRequestMapSuite.java
  15. +1 −0 service/service/java/tests/suites/build.properties
  16. +7 −0 ...rvice/java/tests/suites/failure/home/services/share/nimbus/lib/workspace_service_derby_schema.sql
View
2 service/service/java/source/etc/workspace-service/other/main.xml
@@ -614,7 +614,7 @@
<bean id="nimbus-rm.si.pricingmodel" class="$ASYNC{si.pricingmodel}" />
<bean id="nimbus-rm.si.asyncreqmap" class="org.globus.workspace.async.AsyncRequestMap">
- <constructor-arg value="$COMMON{caches.dir}" />
+ <constructor-arg ref="nimbus-rm.persistence.PersistenceAdapter" />
</bean>
<bean id="nimbus-rm.async.manager"
View
23 service/service/java/source/share/lib/workspace_service_derby_schema.sql
@@ -250,3 +250,26 @@ site_capacity INT NOT NULL,
repo_user VARCHAR(512) NOT NULL,
instance_mem SMALLINT NOT NULL
);
+
+--
+-- Persistence for AsyncRequests
+CREATE TABLE async_requests
+(
+id VARCHAR(512) NOT NULL PRIMARY KEY,
+max_bid DOUBLE,
+spot SMALLINT,
+persistent SMALLINT,
+creator_dn VARCHAR(512),
+creator_is_superuser SMALLINT,
+group_id VARCHAR(512),
+ssh_key_name VARCHAR(512),
+creation_time BIGINT,
+nics VARCHAR(512)
+);
+
+-- Persistence for AsyncRequest list of NICs
+CREATE TABLE async_requests_vms
+(
+id VARCHAR(512),
+vmid INT
+);
View
6 service/service/java/source/src/org/globus/workspace/async/AsyncRequest.java
@@ -143,7 +143,7 @@ public AsyncRequestStatus getStatus() {
public boolean isAlive(){
return this.statusIsOpenOrActive() || (this.status.isCancelled() && !allocatedVMs.isEmpty());
}
-
+
public boolean setStatus(AsyncRequestStatus status) {
if(statusIsOpenOrActive()){
this.status = status;
@@ -203,6 +203,10 @@ public void setProblem(Throwable problem) {
this.problem = problem;
}
+ public void setBindings(VirtualMachine[] toBind) {
+ this.bindings = toBind;
+ }
+
@Override
public String toString() {
return "SIRequest [id " + id + ", status= " + status + ", requestedInstances="
View
95 service/service/java/source/src/org/globus/workspace/async/AsyncRequestMap.java
@@ -16,15 +16,18 @@
package org.globus.workspace.async;
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.Element;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.globus.workspace.persistence.PersistenceAdapter;
+import org.globus.workspace.persistence.WorkspaceDatabaseException;
+import org.springframework.beans.factory.parsing.Problem;
import org.springframework.core.io.Resource;
+import org.springframework.scheduling.annotation.Async;
+import sun.tools.asm.CatchData;
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -38,46 +41,26 @@
private static final Log logger =
LogFactory.getLog(AsyncRequestMap.class.getName());
- private static final String CACHE_NAME = "nimbus-siCache";
- private static final String DISK_PROPKEY = "ehcache.disk.store.dir";
- private static final String SHUTDOWN_PROPKEY = "net.sf.ehcache.enableShutdownHook";
// -----------------------------------------------------------------------------------------
// INSTANCE VARIABLES
// -----------------------------------------------------------------------------------------
- private final Cache cache;
+ private PersistenceAdapter persistence;
// -----------------------------------------------------------------------------------------
// CONSTRUCTORS
// -----------------------------------------------------------------------------------------
- public AsyncRequestMap(Resource diskStoreResource) throws IOException {
+ public AsyncRequestMap(PersistenceAdapter persistenceAdapter) throws IOException {
- if (diskStoreResource == null) {
- throw new IllegalArgumentException("diskStoreResource may not be null");
+ if (persistenceAdapter == null) {
+ throw new IllegalArgumentException("persistenceAdapter may not be null");
}
- final String diskStorePath = diskStoreResource.getFile().getAbsolutePath();
-
- // TODO: Do dynamically. This must not be in conflict with other ehcache+diskstore
- // users (and there is at least one other), could not see how to set
- // on per-manager basis via spring (specifically, the to-disk part).
- System.setProperty(DISK_PROPKEY, diskStorePath);
-
- // TODO: We need a shutdown hook with disk-based. This creates a jvm
- // shutdown hook and is the least-recommended solution.
- System.setProperty(SHUTDOWN_PROPKEY, "true");
-
- final URL url = this.getClass().getResource("ehcache.xml");
- final CacheManager cacheManager = new CacheManager(url);
- this.cache = cacheManager.getCache(CACHE_NAME);
- if (this.cache == null) {
- throw new IllegalArgumentException(
- "cacheManager does not provide '" + CACHE_NAME + "'");
- }
+ this.persistence = persistenceAdapter;
this.loadAllFromDisk();
}
@@ -86,56 +69,60 @@ public AsyncRequestMap(Resource diskStoreResource) throws IOException {
// IMPL
// -----------------------------------------------------------------------------------------
- synchronized void addOrReplace(AsyncRequest asyncRequest) {
+ synchronized public void addOrReplace(AsyncRequest asyncRequest) {
if (asyncRequest == null) {
throw new IllegalArgumentException("asyncRequest is missing");
}
final String id = asyncRequest.getId();
if (id == null) {
throw new IllegalArgumentException("asyncRequest ID is missing");
}
- final Element el = new Element(id, asyncRequest);
- this.cache.put(el);
- this.cache.flush();
+
+ try {
+ this.persistence.addAsyncRequest(asyncRequest);
+ } catch(WorkspaceDatabaseException e) {
+ logger.error("Problem persisting AsyncRequest: ", e);
+ }
+
logger.debug("saved spot request, id: '" + id + "'");
}
- synchronized AsyncRequest getByID(String id) {
+ synchronized public AsyncRequest getByID(String id) {
if (id == null) {
return null;
}
- final Element el = this.cache.get(id);
- if (el == null) {
- return null;
- }
- final AsyncRequest req = (AsyncRequest) el.getObjectValue();
- if (req != null) {
- return req;
+
+ try {
+ final AsyncRequest asyncRequest = this.persistence.getAsyncRequest(id);
+ if (asyncRequest != null) {
+ return asyncRequest;
+ }
+ } catch(WorkspaceDatabaseException e) {
+ logger.error("Couldn't retrieve " + id + " from persistence");
}
+
+
logger.fatal("illegal object extension, no null values allowed");
return null;
}
- synchronized Collection<AsyncRequest> getAll() {
- final List allIDs = this.cache.getKeys();
- Collection<AsyncRequest> all = new HashSet<AsyncRequest>();
- for (int i = 0; i < allIDs.size(); i++) {
- String id = (String)allIDs.get(i);
- all.add(this.getByID(id));
+ synchronized public Collection<AsyncRequest> getAll() {
+
+ Collection<AsyncRequest> all = null;
+ try {
+ all = this.persistence.getAllAsyncRequests();
+ } catch(WorkspaceDatabaseException e) {
+ logger.error("Unable to load spot instances from persistence");
}
+
return all;
}
private void loadAllFromDisk() throws IOException {
- final List allIDs = this.cache.getKeys();
+ Collection<AsyncRequest> all = this.getAll();
int count = 0;
- for (int i = 0; i < allIDs.size(); i++) {
- String id = (String)allIDs.get(i);
- AsyncRequest ar = this.getByID(id);
- if (ar == null) {
- throw new IOException("SI Cache is inconsistent or corrupted");
- }
- count += 1;
+ if (all != null) {
+ count = all.size();
}
logger.info("Found " + count + " spot requests on disk.");
}
View
36 service/service/java/source/src/org/globus/workspace/persistence/DataConvert.java
@@ -390,6 +390,11 @@ public State getState(InstanceResource resource)
throw new CannotTranslateException("no network?");
}
+ return getNICs(network);
+ }
+
+ public NIC[] getNICs(String network) throws CannotTranslateException {
+
// Create objects
if (network.equalsIgnoreCase("NONE")) {
@@ -468,6 +473,37 @@ private static String nicPart(String part) {
}
+ public String nicsAsString(NIC[] nics) {
+
+ // NIC string format:
+ // Name;Assocaition;MAC;Network Mode;IP method
+ // ;IP address;gateway;broadcast;subnetmask;dns;hostname
+ // ;null;null;null;null (maintain old protocol)
+
+ String nicString = "";
+ for (NIC nic : nics) {
+ nicString += nic.getName() + ";";
+ nicString += nic.getNetworkName() + ";";
+ nicString += nic.getMAC() + ";";
+ nicString += "null" + ";"; //No network mode
+ nicString += nic.getAcquisitionMethod() + ";";
+ nicString += nic.getIpAddress() + ";";
+ nicString += nic.getGateway() + ";";
+ nicString += nic.getBroadcast() + ";";
+ nicString += nic.getNetmask() + ";";
+ nicString += "null" + ";"; //No dns
+ nicString += nic.getHostname() + ";";
+ nicString += "null;null;null";
+
+ if (nic != nics[nics.length-1]) {
+ nicString += ";;";
+ }
+
+ }
+ return nicString;
+ }
+
+
// -------------------------------------------------------------------------
// ALLOCATION RELATED
// -------------------------------------------------------------------------
View
11 service/service/java/source/src/org/globus/workspace/persistence/PersistenceAdapter.java
@@ -16,10 +16,12 @@
package org.globus.workspace.persistence;
+import java.util.ArrayList;
import java.util.Calendar;
import java.util.Hashtable;
import java.util.List;
+import org.globus.workspace.async.AsyncRequest;
import org.globus.workspace.async.backfill.Backfill;
import org.globus.workspace.creation.IdempotentReservation;
import org.globus.workspace.network.AssociationEntry;
@@ -311,4 +313,13 @@ public void addIdempotentReservation(IdempotentReservation reservation)
*/
public void removeIdempotentReservation(String creatorId, String clientToken)
throws WorkspaceDatabaseException;
+
+ public void addAsyncRequest(AsyncRequest asyncRequest)
+ throws WorkspaceDatabaseException;
+
+ public AsyncRequest getAsyncRequest(String id)
+ throws WorkspaceDatabaseException;
+
+ public ArrayList<AsyncRequest> getAllAsyncRequests()
+ throws WorkspaceDatabaseException;
}
View
24 ...service/java/source/src/org/globus/workspace/persistence/PersistenceAdapterConstants.java
@@ -243,7 +243,26 @@
public static final String SQL_DELETE_IDEMPOTENT_CREATION =
"DELETE FROM idempotency WHERE creator_dn=? AND client_token=?";
-
+
+ public static final String SQL_INSERT_ASYNC_REQUEST =
+ "INSERT INTO async_requests (id, max_bid, spot, group_id, persistent, creator_dn, creator_is_superuser, ssh_key_name, creation_time, nics) " +
+ " VALUES (?,?,?,?,?,?,?,?,?,?)";
+
+ public static final String SQL_LOAD_ASYNC_REQUEST =
+ "SELECT id, max_bid, spot, group_id, persistent, creator_dn, creator_is_superuser, ssh_key_name, creation_time, nics FROM async_requests WHERE id=?";
+
+ public static final String SQL_LOAD_ALL_ASYNC_REQUESTS =
+ "SELECT id, max_bid, spot, group_id, persistent, creator_dn, creator_is_superuser, ssh_key_name, creation_time, nics FROM async_requests";
+
+ public static final String SQL_LOAD_ASYNC_REQUESTS_VMS =
+ "SELECT vmid FROM async_requests_vms WHERE id=?";
+
+ public static final String SQL_INSERT_ASYNC_REQUESTS_VMS =
+ "INSERT INTO async_requests_vms (id,vmid) VALUES (?,?)";
+
+ public static final String SQL_UPDATE_ASYNC_REQUEST =
+ "UPDATE async_requests SET id=?";
+
public static final String[] PREPARED_STATEMENTS = {
SQL_SELECT_RESOURCES,
SQL_SELECT_ALL_ASSOCIATIONS,
@@ -304,5 +323,8 @@
SQL_SELECT_IDEMPOTENT_CREATION,
SQL_INSERT_IDEMPOTENT_CREATION,
SQL_DELETE_IDEMPOTENT_CREATION,
+// SQL_INSERT_ASYNC_REQUEST,
+// SQL_LOAD_ALL_ASYNC_REQUESTS,
+// SQL_LOAD_ASYNC_REQUEST,
};
}
View
243 service/service/java/source/src/org/globus/workspace/persistence/PersistenceAdapterImpl.java
@@ -36,16 +36,15 @@
import org.apache.commons.logging.LogFactory;
import org.globus.workspace.Lager;
import org.globus.workspace.WorkspaceConstants;
+import org.globus.workspace.async.AsyncRequest;
+import org.globus.workspace.async.AsyncRequestMap;
import org.globus.workspace.creation.IdempotentInstance;
import org.globus.workspace.creation.IdempotentReservation;
import org.globus.workspace.creation.defaults.IdempotentInstanceImpl;
import org.globus.workspace.creation.defaults.IdempotentReservationImpl;
import org.globus.workspace.network.Association;
import org.globus.workspace.network.AssociationEntry;
-import org.globus.workspace.persistence.impls.AssociationPersistenceUtil;
-import org.globus.workspace.persistence.impls.IdempotencyPersistenceUtil;
-import org.globus.workspace.persistence.impls.VMPersistence;
-import org.globus.workspace.persistence.impls.VirtualMachinePersistenceUtil;
+import org.globus.workspace.persistence.impls.*;
import org.globus.workspace.async.backfill.Backfill;
import org.globus.workspace.scheduler.defaults.ResourcepoolEntry;
import org.globus.workspace.service.CoschedResource;
@@ -55,8 +54,10 @@
import org.globus.workspace.service.binding.vm.VirtualMachine;
import org.globus.workspace.service.binding.vm.VirtualMachinePartition;
import org.nimbustools.api._repr._SpotPriceEntry;
+import org.nimbustools.api.repr.CannotTranslateException;
import org.nimbustools.api.repr.ReprFactory;
import org.nimbustools.api.repr.SpotPriceEntry;
+import org.nimbustools.api.repr.vm.NIC;
import org.nimbustools.api.services.rm.DoesNotExistException;
import org.nimbustools.api.services.rm.ManageException;
@@ -1503,6 +1504,77 @@ public void load(int id, InstanceResource resource)
}
}
+ public VirtualMachine loadVM(int id) throws SQLException, DoesNotExistException, WorkspaceDatabaseException {
+
+ if (this.dbTrace) {
+ logger.trace(Lager.id(id) + ": load virtual machine");
+ }
+
+ Connection c = getConnection();
+ PreparedStatement[] pstmts = VirtualMachinePersistenceUtil.getVMQuery(id, c);
+ ResultSet rs = pstmts[0].executeQuery();
+ if (rs == null || !rs.next()) {
+ logger.error("resource with id=" + id + " not found");
+ throw new DoesNotExistException();
+ }
+
+ final VirtualMachine vm =
+ VirtualMachinePersistenceUtil.newVM(id, rs);
+
+ if (this.dbTrace) {
+ logger.trace(Lager.id(id) +
+ ", created vm:\n" + vm.toString());
+ }
+
+ rs.close();
+
+ rs = pstmts[1].executeQuery();
+ if (rs == null || !rs.next()) {
+ logger.debug("resource with id=" + id + " has no" +
+ " deployment information");
+ } else {
+ VirtualMachinePersistenceUtil.addDeployment(vm, rs);
+ if (this.dbTrace) {
+ logger.trace("added deployment info to vm object");
+ }
+ rs.close();
+ }
+
+ rs = pstmts[2].executeQuery();
+
+ if (rs == null || !rs.next()) {
+ logger.warn("resource with id=" + id + " has no" +
+ " partitions");
+ } else {
+ final ArrayList partitions = new ArrayList(8);
+ do {
+ partitions.add(VirtualMachinePersistenceUtil.
+ getPartition(rs));
+ } while (rs.next());
+
+ final VirtualMachinePartition[] parts =
+ (VirtualMachinePartition[]) partitions.toArray(
+ new VirtualMachinePartition[partitions.size()]);
+ vm.setPartitions(parts);
+ }
+
+ rs = pstmts[3].executeQuery();
+
+ if (rs == null || !rs.next()) {
+ if (this.lager.dbLog) {
+ logger.debug("resource with id=" + id + " has no" +
+ " customization needs");
+ }
+ } else {
+ do {
+ vm.addFileCopyNeed(
+ VirtualMachinePersistenceUtil.getNeed(rs));
+ } while (rs.next());
+ }
+
+ return vm;
+ }
+
public void loadGroup(String id, GroupResource resource)
throws DoesNotExistException, WorkspaceDatabaseException {
@@ -3149,4 +3221,167 @@ public void removeIdempotentReservation(String creatorId, String clientToken)
}
}
}
+
+ public void addAsyncRequest(AsyncRequest asyncRequest)
+ throws WorkspaceDatabaseException {
+
+ if (asyncRequest == null) {
+ throw new IllegalArgumentException("asyncRequest may not be null");
+ }
+
+
+ Connection c = null;
+ PreparedStatement pstmt = null;
+ PreparedStatement[] pstmts = null;
+ try {
+ c = getConnection();
+ logger.debug("SETTING spot = " + asyncRequest.isSpotRequest());
+ pstmt = AsyncRequestMapPersistenceUtil.getInsertAsyncRequest(asyncRequest, this.repr, c);
+ pstmt.executeUpdate();
+
+ VirtualMachine[] bindings = asyncRequest.getBindings();
+ if (bindings != null) {
+
+ for (VirtualMachine vm : asyncRequest.getBindings()) {
+
+ pstmts = VirtualMachinePersistenceUtil.
+ getInsertVM(vm, vm.getID(), c);
+
+ if (this.dbTrace) {
+ logger.trace("creating VirtualMachine db " +
+ "entry for " + Lager.id(vm.getID()) + ": " +
+ pstmts.length + " inserts");
+ }
+
+ for (int i = 0; i < pstmts.length; i++) {
+ pstmts[i].executeUpdate();
+ }
+
+ pstmt = AsyncRequestMapPersistenceUtil.getInsertAsyncRequestVM(asyncRequest.getId(), vm.getID(), c);
+ pstmt.executeUpdate();
+ }
+ }
+ c.commit();
+
+ } catch (ManageException e) {
+ logger.error("",e);
+ throw new WorkspaceDatabaseException(e);
+ } catch (SQLException e) {
+ logger.error("",e);
+ throw new WorkspaceDatabaseException(e);
+ } finally {
+ try {
+ if (pstmt != null) {
+ pstmt.close();
+ }
+ if (pstmts != null) {
+ for (PreparedStatement p : pstmts) {
+ p.close();
+ }
+ }
+ if (c != null) {
+ returnConnection(c);
+ }
+ } catch (SQLException sql) {
+ logger.error("SQLException in finally cleanup", sql);
+ }
+ }
+ }
+
+ public AsyncRequest getAsyncRequest(String id)
+ throws WorkspaceDatabaseException {
+
+ if (id == null) {
+ throw new IllegalArgumentException("id may not be null");
+ }
+
+ AsyncRequest asyncRequest = null;
+
+ Connection c = null;
+ PreparedStatement pstmt = null;
+ ResultSet rs = null;
+ try {
+ c = getConnection();
+ pstmt = AsyncRequestMapPersistenceUtil.getAsyncRequest(id, c);
+ rs = pstmt.executeQuery();
+
+ if (rs == null || !rs.next()) {
+ logger.debug("No Asyncrequest with ID " + id);
+ return null;
+ }
+
+ asyncRequest = AsyncRequestMapPersistenceUtil.rsToAsyncRequest(rs, this.repr, c);
+ ArrayList<VirtualMachine> bindings = new ArrayList<VirtualMachine>();
+ for (int vmid : AsyncRequestMapPersistenceUtil.getVMIDs(asyncRequest.getId(), c)) {
+ bindings.add(loadVM(vmid));
+ }
+ VirtualMachine[] newBindings = bindings.toArray(new VirtualMachine[bindings.size()]);
+ asyncRequest.setBindings(newBindings);
+
+ } catch (SQLException e) {
+ logger.error("",e);
+ throw new WorkspaceDatabaseException(e);
+ } catch (DoesNotExistException e) {
+ logger.error("",e);
+ throw new WorkspaceDatabaseException(e);
+ } catch (CannotTranslateException e) {
+ logger.error("",e);
+ throw new WorkspaceDatabaseException(e);
+ } finally {
+ try {
+ if (pstmt != null) {
+ pstmt.close();
+ }
+ if (c != null) {
+ returnConnection(c);
+ }
+ } catch (SQLException sql) {
+ logger.error("SQLException in finally cleanup", sql);
+ }
+ }
+
+ return asyncRequest;
+ }
+
+ public ArrayList<AsyncRequest> getAllAsyncRequests()
+ throws WorkspaceDatabaseException {
+
+ ArrayList<AsyncRequest> asyncRequests = new ArrayList<AsyncRequest>();
+
+ Connection c = null;
+ PreparedStatement pstmt = null;
+ ResultSet rs = null;
+ try {
+ c = getConnection();
+ pstmt = AsyncRequestMapPersistenceUtil.getAllAsyncRequests(c);
+ rs = pstmt.executeQuery();
+
+ if (rs == null || !rs.next()) {
+ logger.debug("No AsyncRequests");
+ return asyncRequests;
+ }
+
+ do {
+ String id = rs.getString("id");
+ asyncRequests.add(getAsyncRequest(id));
+ } while(rs.next());
+
+ } catch (SQLException e) {
+ logger.error("",e);
+ throw new WorkspaceDatabaseException(e);
+ } finally {
+ try {
+ if (pstmt != null) {
+ pstmt.close();
+ }
+ if (c != null) {
+ returnConnection(c);
+ }
+ } catch (SQLException sql) {
+ logger.error("SQLException in finally cleanup", sql);
+ }
+ }
+
+ return asyncRequests;
+ }
}
View
157 ...ava/source/src/org/globus/workspace/persistence/impls/AsyncRequestMapPersistenceUtil.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 1999-2008 University of Chicago
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.globus.workspace.persistence.impls;
+
+import org.globus.workspace.async.AsyncRequest;
+import org.globus.workspace.network.Association;
+import org.globus.workspace.network.AssociationEntry;
+import org.globus.workspace.persistence.DataConvert;
+import org.globus.workspace.persistence.PersistenceAdapterConstants;
+import org.nimbustools.api._repr._Caller;
+import org.nimbustools.api.repr.Caller;
+import org.nimbustools.api.repr.CannotTranslateException;
+import org.nimbustools.api.repr.ReprFactory;
+import org.nimbustools.api.repr.ctx.Context;
+import org.nimbustools.api.repr.vm.NIC;
+import org.nimbustools.api.services.rm.ManageException;
+
+import javax.xml.crypto.Data;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.*;
+import java.util.logging.Logger;
+
+public class AsyncRequestMapPersistenceUtil
+ implements PersistenceAdapterConstants {
+
+ public static PreparedStatement getInsertAsyncRequest(AsyncRequest asyncRequest, ReprFactory repr, Connection c)
+ throws SQLException {
+
+ final PreparedStatement pstmt = c.prepareStatement(SQL_INSERT_ASYNC_REQUEST);
+ pstmt.setString(1, asyncRequest.getId());
+ pstmt.setDouble(2, asyncRequest.getMaxBid());
+ if (asyncRequest.isSpotRequest()) {
+ pstmt.setInt(3, 1);
+ }
+ else {
+ pstmt.setInt(3, 0);
+ }
+ pstmt.setString(4, asyncRequest.getGroupID());
+ if (asyncRequest.isPersistent()) {
+ pstmt.setInt(5, 1);
+ }
+ else {
+ pstmt.setInt(5, 0);
+ }
+ pstmt.setString(6, asyncRequest.getCaller().getIdentity());
+ if (asyncRequest.getCaller().isSuperUser()) {
+ pstmt.setInt(7, 1);
+ }
+ else {
+ pstmt.setInt(7, 0);
+ }
+ pstmt.setString(8, asyncRequest.getSshKeyName());
+
+ if (asyncRequest.getCreationTime() != null) {
+ pstmt.setLong(9, asyncRequest.getCreationTime().getTimeInMillis());
+ }
+ else {
+ pstmt.setInt(9,0);
+ }
+ DataConvert dataConvert = new DataConvert(repr);
+ String nics = dataConvert.nicsAsString(asyncRequest.getRequestedNics());
+ pstmt.setString(10, nics);
+ return pstmt;
+ }
+
+ public static PreparedStatement getAsyncRequest(String id, Connection c)
+ throws SQLException {
+
+ final PreparedStatement pstmt = c.prepareStatement(SQL_LOAD_ASYNC_REQUEST);
+
+ pstmt.setString(1, id);
+
+ return pstmt;
+ }
+
+ public static PreparedStatement getAllAsyncRequests(Connection c)
+ throws SQLException {
+
+ final PreparedStatement pstmt = c.prepareStatement(SQL_LOAD_ALL_ASYNC_REQUESTS);
+ return pstmt;
+ }
+
+ public static AsyncRequest rsToAsyncRequest(ResultSet rs, ReprFactory repr, Connection c)
+ throws SQLException, CannotTranslateException {
+
+ final String id = rs.getString("id");
+ final Double maxBid = rs.getDouble("max_bid");
+ final boolean isSpotInstance = rs.getBoolean("spot");
+ final String groupID = rs.getString("group_id");
+ final boolean isPersistent = rs.getBoolean("persistent");
+ final String creatorDN = rs.getString("creator_dn");
+ final boolean isSuperuser = rs.getBoolean("creator_is_superuser");
+ _Caller caller = repr._newCaller();
+ caller.setIdentity(creatorDN);
+ caller.setSuperUser(isSuperuser);
+ // NOTE: this context isn't persisted because it doesn't seem to be used
+ final Context context = repr._newContext();
+
+ final long t = rs.getLong("creation_time");
+ final String sshKeyName = rs.getString("ssh_key_name");
+ final Calendar creationTime = Calendar.getInstance();
+ creationTime.setTimeInMillis(t);
+ final String nicsAsString = rs.getString("nics");
+
+ DataConvert dataConvert = new DataConvert(repr);
+ NIC[] nics = null;
+ nics = dataConvert.getNICs(nicsAsString);
+
+ //public AsyncRequest(String id, boolean spotinstances, Double spotPrice, boolean persistent, Caller caller, String groupID, VirtualMachine[] bindings, Context context, NIC[] requestedNics, String sshKeyName, Calendar creationTime) {
+ //AsyncRequest testRequest = new AsyncRequest(testID, testSpotinstances, testMaxBid, false, null, testGroupID, null, null, null, null, null);
+ return new AsyncRequest(id, isSpotInstance, maxBid, isPersistent, caller, groupID, null, context, nics, sshKeyName, creationTime);
+ }
+
+ public static ArrayList<Integer> getVMIDs(String id, Connection c) throws SQLException{
+
+ final PreparedStatement pstmt = c.prepareStatement(SQL_LOAD_ASYNC_REQUESTS_VMS);
+ pstmt.setString(1, id);
+ ResultSet rs = pstmt.executeQuery();
+
+ ArrayList<Integer> ids = new ArrayList<Integer>();
+
+ if (rs == null || !rs.next()) {
+ return ids;
+ }
+
+ do {
+ ids.add(rs.getInt(1));
+ } while (rs.next());
+
+ return ids;
+ }
+
+ public static PreparedStatement getInsertAsyncRequestVM(String id, int vmid, Connection c) throws SQLException {
+
+ final PreparedStatement pstmt = c.prepareStatement(SQL_INSERT_ASYNC_REQUESTS_VMS);
+ pstmt.setString(1, id);
+ pstmt.setInt(2, vmid);
+ return pstmt;
+ }
+}
View
7 ...java/source/src/org/globus/workspace/persistence/impls/VirtualMachinePersistenceUtil.java
@@ -45,6 +45,13 @@
if (vm == null) {
throw new ProgrammingError("vm is null");
}
+ return getInsertVM(vm, id, c);
+ }
+
+ public static PreparedStatement[] getInsertVM(VirtualMachine vm,
+ int id,
+ Connection c)
+ throws ManageException, SQLException {
final PreparedStatement pstmt = c.prepareStatement(SQL_INSERT_VM);
View
9 service/service/java/tests/suites/basic/AsyncRequestMapSuite.xml
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd">
+<suite name="BasicSuite" parallel="none">
+ <test verbose="1" name="nimbus" annotations="JDK">
+ <classes>
+ <class name="org.globus.workspace.testing.suites.basic.AsyncRequestMapSuite"/>
+ </classes>
+ </test>
+</suite>
View
13 ...service/java/tests/suites/basic/home/services/etc/nimbus/workspace-service/other/main.xml
@@ -97,7 +97,16 @@
<constructor-arg ref="other.timerManager" />
<constructor-arg ref="nimbus-rm.loglevels" />
<constructor-arg ref="nimbus-rm.service.binding.BindNetwork" />
- <constructor-arg ref="nimbus-rm.creation.idempotent" />
+
+ <!-- idempotent requests can be disabled by replacing this
+ last arg with a null reference, like:
+
+ <constructor-arg><null/></constructor-arg>
+
+ You can also comment out the idempotent bean definition below but it
+ is not required.
+ -->
+ <constructor-arg ref="nimbus-rm.creation.idempotent"/>
<property name="accountingEventAdapter"
ref="nimbus-rm.accounting" />
@@ -605,7 +614,7 @@
<bean id="nimbus-rm.si.pricingmodel" class="$ASYNC{si.pricingmodel}" />
<bean id="nimbus-rm.si.asyncreqmap" class="org.globus.workspace.async.AsyncRequestMap">
- <constructor-arg value="$COMMON{caches.dir}" />
+ <constructor-arg ref="nimbus-rm.persistence.PersistenceAdapter" />
</bean>
<bean id="nimbus-rm.async.manager"
View
23 ...java/tests/suites/basic/home/services/share/nimbus/lib/workspace_service_derby_schema.sql
@@ -250,3 +250,26 @@ site_capacity INT NOT NULL,
repo_user VARCHAR(512) NOT NULL,
instance_mem SMALLINT NOT NULL
);
+
+--
+-- Persistence for AsyncRequests
+CREATE TABLE async_requests
+(
+id VARCHAR(512) NOT NULL PRIMARY KEY,
+max_bid DOUBLE,
+spot SMALLINT,
+persistent SMALLINT,
+creator_dn VARCHAR(512),
+creator_is_superuser SMALLINT,
+group_id VARCHAR(512),
+ssh_key_name VARCHAR(512),
+creation_time BIGINT,
+nics VARCHAR(512)
+);
+
+-- Persistence for AsyncRequest list of NICs
+CREATE TABLE async_requests_vms
+(
+id VARCHAR(512),
+vmid INT
+);
View
156 ...ests/suites/basic/src/org/globus/workspace/testing/suites/basic/AsyncRequestMapSuite.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 1999-2010 University of Chicago
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.globus.workspace.testing.suites.basic;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.globus.workspace.async.AsyncRequest;
+import org.globus.workspace.async.AsyncRequestMap;
+import org.globus.workspace.persistence.DataConvert;
+import org.globus.workspace.persistence.PersistenceAdapter;
+import org.globus.workspace.service.binding.vm.VirtualMachine;
+import org.globus.workspace.testing.NimbusTestBase;
+import org.globus.workspace.testing.NimbusTestContextLoader;
+import org.nimbustools.api.repr.Caller;
+import org.nimbustools.api.repr.CreateResult;
+import org.nimbustools.api.repr.ctx.Context;
+import org.nimbustools.api.repr.vm.NIC;
+import org.nimbustools.api.repr.vm.VM;
+import org.nimbustools.api.services.rm.Manager;
+import org.springframework.test.context.ContextConfiguration;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.Test;
+import org.springframework.core.io.FileSystemResource;
+import sun.management.FileSystem;
+
+
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Properties;
+
+import static org.testng.AssertJUnit.*;
+
+@ContextConfiguration(
+ locations={"file:./service/service/java/tests/suites/basic/home/services/etc/nimbus/workspace-service/other/main.xml"},
+ loader=NimbusTestContextLoader.class)
+public class AsyncRequestMapSuite extends NimbusTestBase {
+
+ // -----------------------------------------------------------------------------------------
+ // extends NimbusTestBase
+ // -----------------------------------------------------------------------------------------
+
+ @AfterSuite(alwaysRun=true)
+ @Override
+ public void suiteTeardown() throws Exception {
+ super.suiteTeardown();
+ }
+
+ /**
+ * This is how coordinate your Java test suite code with the conf files to use.
+ * @return absolute path to the value that should be set for $NIMBUS_HOME
+ * @throws Exception if $NIMBUS_HOME cannot be determined
+ */
+ @Override
+ protected String getNimbusHome() throws Exception {
+ return this.determineSuitesPath() + "/basic/home";
+ }
+
+
+ // -----------------------------------------------------------------------------------------
+ // PREREQ TESTS (if any of these fail, nothing else will work at all)
+ // -----------------------------------------------------------------------------------------
+
+ /**
+ * Check if ModuleLocator can be retrieved and used at all.
+ * @throws Exception problem
+ */
+ @Test(groups="prereqs")
+ public void retrieveModuleLocator() throws Exception {
+ logger.debug("retrieveModuleLocator");
+ final Manager rm = this.locator.getManager();
+ final VM[] vms = rm.getGlobalAll();
+
+ // we know there are zero so far because it is in group 'prereqs'
+ assertEquals(0, vms.length);
+ }
+
+ /**
+ * Lease a VM and then destroy it.
+ * @throws Exception problem
+ */
+ @Test(dependsOnGroups="prereqs")
+ public void persistOne() throws Exception {
+ logger.debug("persistOne");
+ final Manager rm = this.locator.getManager();
+ final Caller caller = this.populator().getCaller();
+
+ PersistenceAdapter persistence = (PersistenceAdapter) applicationContext.getBean("nimbus-rm.persistence.PersistenceAdapter");
+ AsyncRequestMap asyncRequestMap = new AsyncRequestMap(persistence);
+
+
+ // Validate that we have a working AsyncRequestMap. It should be empty
+ Collection<AsyncRequest> allRequests = asyncRequestMap.getAll();
+ logger.debug("You have " + allRequests.size() + " requests.");
+ assert(allRequests.size() == 0);
+
+ // Test putting and getting from persistence
+ String testID = "fake-id";
+ Double testMaxBid = 42.0;
+ boolean testSpotinstances = false;
+ String testGroupID = "fake-group-id";
+ boolean testIsPersistent = true;
+ Caller testCaller = this.populator().getSuperuserCaller();
+ Context context = null;
+ String testSshKeyName = "fake-ssh-key";
+ Calendar testCreationTime = Calendar.getInstance();
+ testCreationTime.setTimeInMillis(424242424242l);
+ String testNIC = "FakeName;FakeAssociation;FAKEMAC;NetMode;IPmethod;192.168.1.42;192.168.1.1;192.168.1.2;subnetmask;dns;hostname;null;null;null;null";
+ VirtualMachine testVM = new VirtualMachine();
+ testVM.setID(42);
+ testVM.setName("fakename");
+ testVM.setNetwork(testNIC);
+ testVM.setPropagateRequired(false);
+ testVM.setUnPropagateRequired(true);
+ VirtualMachine[] testBindings = new VirtualMachine[1];
+ testBindings[0] = testVM;
+
+ DataConvert dataConvert = new DataConvert(this.locator.getReprFactory());
+ NIC[] testNICs = dataConvert.getNICs(testVM);
+ logger.debug("Nics: " + testNICs[0]);
+
+ //public AsyncRequest(String id, boolean spotinstances, Double spotPrice, boolean persistent, Caller caller, String groupID, VirtualMachine[] bindings, Context context, NIC[] requestedNics, String sshKeyName, Calendar creationTime) {
+ AsyncRequest testRequest = new AsyncRequest(testID, testSpotinstances, testMaxBid, testIsPersistent, testCaller, testGroupID, testBindings, context, testNICs, testSshKeyName, testCreationTime);
+ asyncRequestMap.addOrReplace(testRequest);
+
+ allRequests = asyncRequestMap.getAll();
+ logger.debug("You have " + allRequests.size() + " requests.");
+ assert(allRequests.size() == 1);
+
+ // Note, the persistence layer just dumps the context, so we don't test for it
+
+ AsyncRequest gotRequest = asyncRequestMap.getByID(testID);
+ assertEquals(testID, gotRequest.getId());
+ assertEquals(testMaxBid, gotRequest.getMaxBid());
+ assertEquals(testSpotinstances, gotRequest.isSpotRequest());
+ assertEquals(testGroupID, gotRequest.getGroupID());
+ assertEquals(testIsPersistent, gotRequest.isPersistent());
+ assertEquals(testCaller, gotRequest.getCaller());
+ assertEquals(testSshKeyName, gotRequest.getSshKeyName());
+ assertEquals(testCreationTime, gotRequest.getCreationTime());
+ assertEquals(testVM.getID(), gotRequest.getBindings()[0].getID());
+ assertEquals(testNICs[0].getIpAddress(), gotRequest.getRequestedNics()[0].getIpAddress());
+ }
+}
View
1 service/service/java/tests/suites/build.properties
@@ -28,6 +28,7 @@ st.basic03=IdemptotentCreationSuite
st.basic04=Issue37Suite
st.basic05=NodeManagementSuite
st.basic06=ParallelIdempotentCreationSuite
+st.basic07=AsyncRequestMapSuite
# FAILURE SUITES
st.failure.dir=${nimbus.suitesdir}/failure
View
7 ...va/tests/suites/failure/home/services/share/nimbus/lib/workspace_service_derby_schema.sql
@@ -250,3 +250,10 @@ site_capacity INT NOT NULL,
repo_user VARCHAR(512) NOT NULL,
instance_mem SMALLINT NOT NULL
);
+
+--
+-- Persistence for AsyncRequests
+CREATE TABLE async_requests
+(
+id VARCHAR(512) NOT NULL PRIMARY KEY
+);

0 comments on commit 393f95d

Please sign in to comment.