Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Refactored to fix minor bugs and improve readability and added docume…

…ntation
  • Loading branch information...
commit 3809cc5ae8dc553ae81c1a2394d2c2236746d1b4 1 parent 8f7bfe1
@pauloricardomg pauloricardomg authored
View
2  service-api/java/source/src/org/nimbustools/api/repr/si/SIConstants.java
@@ -24,5 +24,7 @@
public static Integer getInstanceMem(String instanceType){
return instanceMems.get(instanceType);
}
+
+ public static final String SI_REQUEST_PREFIX = "si-";
}
View
23 service/service/java/source/src/org/globus/workspace/StateChangeInterested.java
@@ -21,16 +21,16 @@
public interface StateChangeInterested {
/**
- * This notification allows the scheduler to be autonomous
+ * This notification allows modules to be autonomous
* from the service layer's actions if it wants to be (instead
* of allowing the resource states to progress, it could time
* every state transition by continually re-adjusting the
* resource's target state when it is time to transition it).
*
* The first state notification (always preceded by a call to
- * schedule) signals that the scheduler may act. This allows
- * the service layer to finalize creation before the scheduler
- * acts on a a resouce.
+ * schedule) signals that creation process has finished.
+ * This allows the service layer to finalize creation
+ * before a module (ie. scheduler) can act on a a resouce.
*
* @param vmid id
* @param state STATE_* in WorkspaceConstants
@@ -42,13 +42,16 @@ public void stateNotification(int vmid, int state)
/**
* Batch state notification
+ *
* NOTE: This version doesn't throw exception when
- * an error occurs during the notification. If error conditions need to be
- * treated, use {@code stateNotification(int vmid, int state)}
- * instead. However, implementations of this interface
- * are recommended to log errors.
- * @param vmids
- * @param state
+ * an error occurs during the notification. If error
+ * conditions need to be treated, use
+ * {@code stateNotification(int vmid, int state)}
+ * instead. However, implementations of this
+ * interface are recommended to log possible errors.
+ *
+ * @param vmids ids of vms
+ * @param stateSTATE_* in WorkspaceConstants
*/
public void stateNotification(int[] vmids, int state);
View
12 ...e/service/java/source/src/org/globus/workspace/creation/defaults/CreationManagerImpl.java
@@ -57,6 +57,7 @@
import org.nimbustools.api.repr.ReprFactory;
import org.nimbustools.api.repr.RequestSI;
import org.nimbustools.api.repr.ctx.Context;
+import org.nimbustools.api.repr.si.SIConstants;
import org.nimbustools.api.repr.vm.NIC;
import org.nimbustools.api.repr.vm.ResourceAllocation;
import org.nimbustools.api.services.rm.AuthorizationException;
@@ -300,7 +301,7 @@ public SIRequest requestSpotInstances(RequestSI req, Caller caller)
final String groupID = this.getGroupID(creatorID, bound.length);
- final String siID = generateID();
+ final String siID = generateRequestID();
SIRequest siRequest = new SIRequest(siID, req.getSpotPrice(), req.isPersistent(), caller, groupID, bound, req.getContext(), req.getRequestedNics(), Calendar.getInstance());
siManager.addRequest(siRequest);
@@ -345,11 +346,12 @@ public SIRequest requestSpotInstances(RequestSI req, Caller caller)
}
/**
- * TODO: Temporary random generation, update to use more advanced method.
- * @return
+ * Generates a random Spot Instance request ID
+ * @return the generated ID
*/
- private String generateID() {
- return "" + Math.random()*10000000;
+ private String generateRequestID() {
+
+ return SIConstants.SI_REQUEST_PREFIX + this.uuidGen.generateRandomBasedUUID().toString();
}
protected String getType(CreateRequest req) {
View
19 service/service/java/source/src/org/globus/workspace/persistence/PersistenceAdapter.java
@@ -206,6 +206,25 @@ public Integer getTotalAvailableMemory()
throws WorkspaceDatabaseException;
+ /**
+ * Gets the total available memory as
+ * a sum of integer available chunks from
+ * each resource pool entry
+ *
+ * This is useful for knowing the exact
+ * amount of memory that is readily available
+ * for allocations of that chunk size (ie. 128MB),
+ * and not incurring the risk of having
+ * 64MB in one VMM and 64MB in another,
+ * what will not suffice to allocate a 128MB
+ * VM (although 128MB are theoretically
+ * available)
+ *
+ * @param multipleOf size of the chunk
+ * @return the total available memory as a
+ * multiple of the chunk size (ie: result%multipleOf = 0)
+ * @throws WorkspaceDatabaseException DB error
+ */
public Integer getTotalAvailableMemory(Integer multipleOf)
throws WorkspaceDatabaseException;
View
4 ...ervice/java/source/src/org/globus/workspace/scheduler/defaults/DefaultSlotManagement.java
@@ -353,7 +353,7 @@ public synchronized Reservation reserveCoscheduledSpace(
//so, free preemptable space
//and decrease i value, so
//previous entry can be reconsidered
- preempManager.freeSpace(neededMem);
+ preempManager.releaseSpace(neededMem);
i--;
} else {
throw e;
@@ -591,7 +591,7 @@ public synchronized void validate() throws Exception {
this.db.replaceResourcepools(new_resourcepools);
- preempManager.start();
+ preempManager.init();
}
}
View
65 ...vice/java/source/src/org/globus/workspace/scheduler/defaults/PreemptableSpaceManager.java
@@ -1,10 +1,69 @@
+/*
+ * Copyright 1999-2010 University of Chicago
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
package org.globus.workspace.scheduler.defaults;
-
+/**
+ * Entity that manages workspaces
+ * that use pre-emptable space
+ *
+ * A pre-emptable-space-enabled {@link SlotManagement}
+ * will normally contain one associated
+ * {@link PreemptableSpaceManager} in order to
+ * release space in emergency situations (ie. there
+ * is no sufficient memory for a non-preemptable
+ * reservation)
+ *
+ */
public interface PreemptableSpaceManager {
- public void freeSpace(Integer memoryToFree);
+ /**
+ * Releases the needed amount of space from
+ * pre-emptables reservations.
+ *
+ * This method is called when the {@link SlotManagement}
+ * doesn't find sufficient space for a non-preemptable
+ * reservation, so it tries to fulfill that request with
+ * space currently allocated to a non-preemptable workspace.
+ *
+ * This method should block until the process of
+ * releasing the needed memory is completed, what
+ * means that who is calling might assume that
+ * the needed space is already released by the end
+ * of this method's execution.
+ *
+ * @param memoryToRelease the minimum amount
+ * of memory that should be released from
+ * pre-emptable reservations. In case this value
+ * is higher than the amount of space currently
+ * managed by this {@link PreemptableSpaceManager},
+ * all the pre-emptable space currently allocated
+ * must be released.
+ *
+ */
+ public void releaseSpace(Integer memoryToRelease);
- public void start();
+
+ /**
+ * Initalizes this {@link PreemptableSpaceManager}.
+ *
+ * This method is called after the associated
+ * {@link SlotManagement} was initialized, indicating
+ * that the resource pool DB was already populated and
+ * can be queried
+ */
+ public void init();
}
View
14 service/service/java/source/src/org/globus/workspace/spotinstances/SIRequestUtils.java
@@ -62,20 +62,6 @@
return activeRequestsEqualPrice;
}
- public static List<SIRequest> filterOpenRequestsEqualPrice(
- Double price, Collection<SIRequest> allRequests) {
-
- List<SIRequest> inactiveRequestsEqualPrice = new ArrayList<SIRequest>();
-
- for (SIRequest siRequest : allRequests) {
- if(siRequest.getStatus().equals(SIRequestStatus.OPEN) && siRequest.getMaxBid().equals(price)){
- inactiveRequestsEqualPrice.add(siRequest);
- }
- }
-
- return inactiveRequestsEqualPrice;
- }
-
public static List<SIRequest> filterAliveRequestsAbovePrice(
Double currentPrice, Collection<SIRequest> allRequests) {
List<SIRequest> aliveRequestsAbovePrice = new ArrayList<SIRequest>();
View
43 service/service/java/source/src/org/globus/workspace/spotinstances/SpotInstancesHome.java
@@ -1,17 +1,60 @@
+/*
+ * Copyright 1999-2010 University of Chicago
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
package org.globus.workspace.spotinstances;
import org.nimbustools.api.repr.Caller;
import org.nimbustools.api.services.rm.DoesNotExistException;
+/**
+ * Frontend interface that provides
+ * RETRIEVE and CANCEL operations to
+ * Spot Instance Requests
+ */
public interface SpotInstancesHome {
+ /**
+ * Cancels a Spot Instance request
+ * @param reqID the id of the request to be canceled
+ * @return the canceled request
+ * @throws DoesNotExistException in case the id argument does not map
+ * to any spot instance request
+ */
public SIRequest cancelRequest(String reqID) throws DoesNotExistException;
+ /**
+ * Retrieves a Spot Instance request and its related information
+ * @param id the id of the request to be retrieved
+ * @return the wanted request
+ * @throws DoesNotExistException in case the id argument does not map
+ * to any spot instance request
+ */
public SIRequest getRequest(String id) throws DoesNotExistException;
+ /**
+ * Retrieves all Spot Instance requests from a caller
+ * @param caller the owner of the Spot Instances' requests
+ * @return an array of spot instance requests from this caller
+ */
public SIRequest[] getRequests(Caller caller);
+ /**
+ * Retrieves current spot price
+ * @return current spot price
+ */
public Double getSpotPrice();
}
View
24 service/service/java/source/src/org/globus/workspace/spotinstances/SpotInstancesManager.java
@@ -1,10 +1,34 @@
+/*
+ * Copyright 1999-2010 University of Chicago
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
package org.globus.workspace.spotinstances;
import org.globus.workspace.StateChangeInterested;
import org.globus.workspace.scheduler.defaults.PreemptableSpaceManager;
+/**
+ * Interface that represents
+ * a Spot Instances module
+ */
public interface SpotInstancesManager extends SpotInstancesHome, PreemptableSpaceManager, StateChangeInterested {
+ /**
+ * Adds a Spot Instances request
+ * to this module
+ * @param request the request to be added
+ */
public void addRequest(SIRequest request);
}
View
572 .../service/java/source/src/org/globus/workspace/spotinstances/SpotInstancesManagerImpl.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 1999-2010 University of Chicago
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
package org.globus.workspace.spotinstances;
import java.util.ArrayList;
@@ -17,6 +32,8 @@
import org.globus.workspace.creation.InternalCreationManager;
import org.globus.workspace.persistence.PersistenceAdapter;
import org.globus.workspace.persistence.WorkspaceDatabaseException;
+import org.globus.workspace.scheduler.defaults.PreemptableSpaceManager;
+import org.globus.workspace.scheduler.defaults.SlotManagement;
import org.globus.workspace.service.InstanceResource;
import org.globus.workspace.service.WorkspaceGroupHome;
import org.globus.workspace.service.WorkspaceHome;
@@ -28,12 +45,13 @@
public class SpotInstancesManagerImpl implements SpotInstancesManager {
-
- private static String MACHINE_TYPE = SIConstants.SI_TYPE_BASIC;
- private static Integer INSTANCE_MEM = SIConstants.getInstanceMem(MACHINE_TYPE);
+ //TODO: Move this constants to a Spring configuration file
+ private static String MACHINE_TYPE = SIConstants.SI_TYPE_BASIC;
private static final Integer MINIMUM_RESERVED_MEMORY = 256;
private static final Double MAX_NON_PREEMP_UTILIZATION = 0.7;
+
+ private static Integer INSTANCE_MEM = SIConstants.getInstanceMem(MACHINE_TYPE);
private static final Log logger =
LogFactory.getLog(SpotInstancesManagerImpl.class.getName());
@@ -46,12 +64,12 @@
protected Double currentPrice;
-
- protected PersistenceAdapter persistence;
- protected Lager lager;
- protected InternalCreationManager creationManager;
+ protected final Lager lager;
+ protected final PersistenceAdapter persistence;
protected final WorkspaceHome home;
protected final WorkspaceGroupHome ghome;
+
+ protected InternalCreationManager creationManager;
public SpotInstancesManagerImpl(PersistenceAdapter persistenceAdapterImpl,
Lager lagerImpl,
@@ -85,17 +103,33 @@ public SpotInstancesManagerImpl(PersistenceAdapter persistenceAdapterImpl,
}
// -------------------------------------------------------------------------
- // Implements org.globus.workspace.spotinstances.SpotInstancesHome
- // -------------------------------------------------------------------------
+ // Implements org.globus.workspace.spotinstances.SpotInstancesManager
+ // -------------------------------------------------------------------------
+ /**
+ * Adds a Spot Instances request
+ * to this module
+ * @param request the request to be added
+ */
public void addRequest(SIRequest request){
allRequests.put(request.getId(), request);
if (this.lager.eventLog) {
logger.info(Lager.ev(-1) + "[Spot Instances] REQUEST ARRIVED: " + request.toString() + ". Changing price and reallocating requests.");
}
- changePriceAndReallocateRequests();
- }
+ changePriceAndAllocateRequests();
+ }
+
+ // -------------------------------------------------------------------------
+ // Implements org.globus.workspace.spotinstances.SpotInstancesHome
+ // -------------------------------------------------------------------------
+ /**
+ * Cancels a Spot Instance request
+ * @param reqID the id of the request to be canceled
+ * @return the canceled request
+ * @throws DoesNotExistException in case the id argument does not map
+ * to any spot instance request
+ */
public SIRequest cancelRequest(String reqID) throws DoesNotExistException {
logger.info(Lager.ev(-1) + "[Spot Instances] Cancelling request with id: " + reqID + ".");
SIRequest siRequest = getRequest(reqID, false);
@@ -105,26 +139,28 @@ public SIRequest cancelRequest(String reqID) throws DoesNotExistException {
if(prevStatus.isActive()){
preempt(siRequest, siRequest.getAllocatedInstances());
}
+
+ changePriceAndAllocateRequests();
return siRequest;
}
+ /**
+ * Retrieves a Spot Instance request and its related information
+ * @param id the id of the request to be retrieved
+ * @return the wanted request
+ * @throws DoesNotExistException in case the id argument does not map
+ * to any spot instance request
+ */
public SIRequest getRequest(String id) throws DoesNotExistException {
return this.getRequest(id, true);
}
- protected SIRequest getRequest(String id, boolean log) throws DoesNotExistException {
- if(log){
- logger.info(Lager.ev(-1) + "[Spot Instances] Retrieving request with id: " + id + ".");
- }
- SIRequest siRequest = allRequests.get(id);
- if(siRequest != null){
- return siRequest;
- } else {
- throw new DoesNotExistException("Spot instance request with id " + id + " does not exists.");
- }
- }
-
+ /**
+ * Retrieves all Spot Instance requests from a caller
+ * @param caller the owner of the Spot Instances' requests
+ * @return an array of spot instance requests from this caller
+ */
public SIRequest[] getRequests(Caller caller) {
logger.info(Lager.ev(-1) + "[Spot Instances] Retrieving requests from caller: " + caller.getIdentity() + ".");
ArrayList<SIRequest> requestsByCaller = new ArrayList<SIRequest>();
@@ -136,25 +172,186 @@ protected SIRequest getRequest(String id, boolean log) throws DoesNotExistExcept
return requestsByCaller.toArray(new SIRequest[0]);
}
+ /**
+ * Retrieves current spot price
+ * @return current spot price
+ */
public Double getSpotPrice() {
return this.currentPrice;
+ }
+
+ // -------------------------------------------------------------------------
+ // Implements org.globus.workspace.scheduler.defaults.PreemptableSpaceManager
+ // -------------------------------------------------------------------------
+
+ /**
+ * Initalizes this {@link PreemptableSpaceManager}.
+ *
+ * This method is called after the associated
+ * {@link SlotManagement} was initialized, indicating
+ * that the resource pool DB was already populated and
+ * can be queried
+ */
+ public void init() {
+ this.calculateMaximumInstances();
}
+
+ /**
+ * Releases the needed amount of space from
+ * pre-emptables reservations.
+ *
+ * This method is called when the {@link SlotManagement}
+ * doesn't find sufficient space for a non-preemptable
+ * reservation, so it tries to fulfill that request with
+ * space currently allocated to a non-preemptable workspace.
+ *
+ * This method should block until the process of
+ * releasing the needed memory is completed, what
+ * means that who is calling might assume that
+ * the needed space is already released by the end
+ * of this method's execution.
+ *
+ * @param memoryToRelease the minimum amount
+ * of memory that should be released from
+ * pre-emptable reservations. In case this value
+ * is higher than the amount of space currently
+ * managed by this {@link PreemptableSpaceManager},
+ * all the pre-emptable space currently allocated
+ * must be released.
+ *
+ */
+ public void releaseSpace(Integer memoryToFree) {
+ if (this.lager.eventLog) {
+ logger.info(Lager.ev(-1) + "[Spot Instances] " + memoryToFree +
+ "MB RAM have to be freed to give space to higher priority requests");
+ }
+
+ Integer usedMemory = maximumInstances*INSTANCE_MEM;
+
+ if(memoryToFree > usedMemory){
+ logger.warn(Lager.ev(-1) + "[Spot Instances] Spot Instances requests are consuming " + usedMemory +
+ "MB RAM , but SIManager was requested to free " + memoryToFree + "MB RAM. " +
+ "Freeing " + usedMemory + "MB RAM.");
+ memoryToFree = usedMemory;
+ }
- protected synchronized void changePriceAndReallocateRequests(){
+ Integer availableMemory = usedMemory - memoryToFree;
+
+ //Since available memory has decreased,
+ //this will cause lower bid workspaces
+ //to be pre-empted
+ changeMaximumInstances(availableMemory);
+ }
+
+ // -------------------------------------------------------------------------
+ // Implements org.globus.workspace.StateChangeInterested
+ // -------------------------------------------------------------------------
+
+ /**
+ * This notification allows modules to be autonomous
+ * from the service layer's actions if it wants to be (instead
+ * of allowing the resource states to progress, it could time
+ * every state transition by continually re-adjusting the
+ * resource's target state when it is time to transition it).
+ *
+ * The first state notification (always preceded by a call to
+ * schedule) signals that creation process has finished.
+ * This allows the service layer to finalize creation
+ * before a module (ie. scheduler) can act on a a resouce.
+ *
+ * @param vmid id
+ * @param state STATE_* in WorkspaceConstants
+ * @throws ManageException problem
+ */
+ public void stateNotification(int vmid, int state) throws ManageException {
+ if(state == WorkspaceConstants.STATE_DESTROYING){
+ SIRequest siRequest = this.getSIRequest(vmid);
+ if(siRequest != null){
+ if (this.lager.eventLog) {
+ logger.info(Lager.ev(-1) + "[Spot Instances] VM '" + vmid + "' from request '" + siRequest.getId() + "' finished.");
+ }
+
+ if(!siRequest.finishVM(vmid)){
+ if(siRequest.getAllocatedInstances().equals(0)){
+ allVMsFinished(siRequest);
+ }
+
+ //Will just change price and reallocate requests
+ //if this was not a pre-emption
+ this.changePriceAndAllocateRequests();
+ }
+
+ } else {
+ if (this.lager.eventLog) {
+ logger.info(Lager.ev(-1) + "[Spot Instances] A non-preemptable VM was destroyed. Recalculating maximum instances.");
+ }
+ this.calculateMaximumInstances();
+ }
+ }
+ }
+
+ /**
+ * Batch state notification
+ *
+ * NOTE: This version doesn't throw exception when
+ * an error occurs during the notification. If error
+ * conditions need to be treated, use
+ * {@code stateNotification(int vmid, int state)}
+ * instead. However, implementations of this
+ * interface are recommended to log possible errors.
+ *
+ * @param vmids ids of vms
+ * @param stateSTATE_* in WorkspaceConstants
+ */
+ public void stateNotification(int[] vmids, int state) {
+ //assume just non-preemptable VM's are being notified here
+ if(state == WorkspaceConstants.STATE_FIRST_LEGAL){
+ if (this.lager.eventLog) {
+ logger.info(Lager.ev(-1) + "[Spot Instances] " + vmids.length + " non-preemptable VMs created. Recalculating maximum instances.");
+ }
+ this.calculateMaximumInstances();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // PRICE SETTING
+ // -------------------------------------------------------------------------
+
+ /**
+ * Updates the spot price, and
+ * allocate/preempts the requests
+ *
+ * This method is called every time the
+ * number of maximum instances,
+ * current requests or allocated instances
+ * change. This happens when:
+ * * A SI Request is added
+ * * The number of maximum instances changes
+ * * An SI instance is terminated
+ * * A SI Request is canceled
+ *
+ */
+ protected synchronized void changePriceAndAllocateRequests(){
changePrice();
- reallocateRequests();
+ allocateRequests();
if(maximumInstances == 0){
changePrice();
}
}
+ /**
+ * Invokes the associated PricingModel in order
+ * to calculate the next price (given current
+ * OPEN and ACTIVE requests), and changes the
+ * price in case the new price is different
+ */
private void changePrice() {
Double newPrice = pricingModel.getNextPrice(maximumInstances, getAliveRequests(), currentPrice);
if(!newPrice.equals(this.currentPrice)){
if (this.lager.eventLog) {
- logger.info(Lager.ev(-1) + "[Spot Instances] PRICE CHANGED. OLD PRICE = " + this.currentPrice + ". NEW PRICE = " + newPrice);
+ logger.info(Lager.ev(-1) + "[Spot Instances] Spot price has changed. Previous price = " + this.currentPrice + ". Current price = " + newPrice);
}
this.currentPrice = newPrice;
}
@@ -162,50 +359,46 @@ private void changePrice() {
// -------------------------------------------------------------------------
// ALLOCATION
- // -------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
- protected void reallocateRequests() {
+ /**
+ * Performs a series of allocations
+ * and pre-emptions in order to satisfy
+ * Spot Price and Maximum Instances
+ * contraints
+ */
+ protected void allocateRequests() {
- preemptLowerBidRequests();
+ preemptActiveLowerBidRequests();
allocateEqualBidRequests();
allocateHigherBidRequests();
}
- private void preemptLowerBidRequests() {
-
- this.currentPrice = pricingModel.getNextPrice(maximumInstances, getAliveRequests(), currentPrice);
- if (this.lager.eventLog) {
- logger.info(Lager.ev(-1) + "[Spot Instances] PRE-EMPTING LOWER BID REQUESTS.");
- }
+ /**
+ * Preempts all ACTIVE requests that have bid
+ * below the current spot price
+ */
+ private void preemptActiveLowerBidRequests() {
Collection<SIRequest> inelegibleRequests = getLowerBidActiveRequests();
+
+ if(!inelegibleRequests.isEmpty() && this.lager.eventLog){
+ logger.info(Lager.ev(-1) + "[Spot Instances] Pre-empting " +
+ inelegibleRequests.size() + " lower bid requests.");
+ }
+
for (SIRequest inelegibleRequest : inelegibleRequests) {
preempt(inelegibleRequest, inelegibleRequest.getAllocatedInstances());
}
}
- private void allocateHigherBidRequests() {
-
- if (this.lager.eventLog) {
- logger.info(Lager.ev(-1) + "[Spot Instances] ALLOCATING HIGHER BID REQUESTS.");
- }
-
- Collection<SIRequest> aliveRequests = getHigherBidAliveRequests();
-
- for (SIRequest aliveRequest : aliveRequests) {
- if(aliveRequest.needsMoreInstances()){
- allocate(aliveRequest, aliveRequest.getUnallocatedInstances());
- }
- }
- }
-
- private void allocateEqualBidRequests() {
-
- if (this.lager.eventLog) {
- logger.info(Lager.ev(-1) + "[Spot Instances] ALLOCATING EQUAL BID REQUESTS.");
- }
+ /**
+ * Allocates requests that have bid equal to current spot price,
+ * if there are available instances, or pre-empt them otherwise
+ */
+ private void allocateEqualBidRequests() {
Integer greaterBidVMs = getGreaterBidInstancesCount();
@@ -220,16 +413,38 @@ private void allocateEqualBidRequests() {
if(allocatedVMs <= availableInstances){
availableInstances -= allocatedVMs;
+ allocateEvenly(availableInstances);
} else {
Integer needToPreempt = allocatedVMs - availableInstances;
if (this.lager.eventLog) {
- logger.info(Lager.ev(-1) + "[Spot Instances] No more VMs for equal bid requests. Pre-empting " + needToPreempt + " VMs.");
+ logger.info(Lager.ev(-1) + "[Spot Instances] No more resources for equal bid requests. " +
+ "Pre-empting " + needToPreempt + " VMs.");
}
preemptProportionaly(activeRequests, needToPreempt, allocatedVMs);
- return;
}
-
- allocateEvenly(availableInstances);
+ }
+
+ /**
+ * Allocates all requests that have bid
+ * above the current spot price
+ */
+ private void allocateHigherBidRequests() {
+
+ Collection<SIRequest> aliveRequests = getHigherBidAliveRequests();
+
+ int count = 0;
+
+ for (SIRequest aliveRequest : aliveRequests) {
+ if(aliveRequest.needsMoreInstances()){
+ allocate(aliveRequest, aliveRequest.getUnallocatedInstances());
+ count++;
+ }
+ }
+
+ if(count > 0 && this.lager.eventLog){
+ logger.info(Lager.ev(-1) + "[Spot Instances] Allocated " +
+ count + " higher bid requests.");
+ }
}
/**
@@ -246,6 +461,14 @@ private void allocateEvenly(Integer availableInstances) {
List<SIRequest> hungryRequests = getEqualBidHungryRequests();
Collections.sort(hungryRequests, getAllocationComparator());
+ if(hungryRequests.isEmpty()){
+ return;
+ } else {
+ if (this.lager.eventLog) {
+ logger.info(Lager.ev(-1) + "[Spot Instances] Allocating " + hungryRequests.size() + "equal bid requests.");
+ }
+ }
+
Map<SIRequest, Integer> allocations = new HashMap<SIRequest, Integer>();
for (SIRequest hungryRequest : hungryRequests) {
allocations.put(hungryRequest, 0);
@@ -326,7 +549,7 @@ private void preemptProportionaly(List<SIRequest> activeRequests, Integer needTo
//This may never happen. But just in case.
if(stillToPreempt > 0){
- logger.error("Unable to pre-empt VMs proportionally. Still " + stillToPreempt +
+ logger.warn("Unable to pre-empt VMs proportionally. Still " + stillToPreempt +
" VMs to pre-empt. Pre-empting best-effort.");
iterator = activeRequests.iterator();
@@ -346,6 +569,13 @@ private void preemptProportionaly(List<SIRequest> activeRequests, Integer needTo
}
}
+ /**
+ * Creates a SIRequest comparator that
+ * prioritizes recent requests with more
+ * allocated instances to be pre-empted
+ * first
+ * @return the generated comparator
+ */
private Comparator<SIRequest> getPreemptionComparator() {
return new Comparator<SIRequest>() {
@@ -365,6 +595,13 @@ public int compare(SIRequest o1, SIRequest o2) {
};
}
+ /**
+ * Creates a SIRequest comparator that
+ * prioritizes older requests with less
+ * allocated instances to be allocated
+ * first
+ * @return the generated comparator
+ */
private Comparator<SIRequest> getAllocationComparator() {
return new Comparator<SIRequest>() {
@@ -384,6 +621,12 @@ public int compare(SIRequest o1, SIRequest o2) {
};
}
+ /**
+ * Preempts (ie. destroys) the desired quantity
+ * of VMs from a given request
+ * @param siRequest the request to be pre-empted
+ * @param quantity the quantity to be pre-empted
+ */
protected void preempt(SIRequest siRequest, int quantity) {
if(siRequest.getAllocatedInstances() == quantity){
@@ -422,6 +665,11 @@ protected void preempt(SIRequest siRequest, int quantity) {
}
}
+ /**
+ * Trigger a status change after
+ * all VMs from a given request are finished
+ * @param siRequest
+ */
private void allVMsFinished(SIRequest siRequest){
if(!siRequest.isPersistent() && (!siRequest.needsMoreInstances() || currentPrice > siRequest.getMaxBid())){
changeStatus(siRequest, SIRequestStatus.CLOSED);
@@ -430,6 +678,14 @@ private void allVMsFinished(SIRequest siRequest){
}
}
+ /**
+ * Changes the status of a given request to FAILED,
+ * and sets the cause of the problem
+ * @param action the action that caused the request to fail (log purposes)
+ * @param siRequest the request that has failed
+ * @param errorStr the error message
+ * @param problem the problem that caused the request to fail
+ */
private void failRequest(String action, SIRequest siRequest, String errorStr, Throwable problem) {
logger.warn(Lager.ev(-1) + "[Spot Instances] Error while " + action + " VMs for request: " +
siRequest.getId() + ". Setting state to FAILED. Problem: " +
@@ -440,6 +696,12 @@ private void failRequest(String action, SIRequest siRequest, String errorStr, Th
}
}
+ /**
+ * Allocates the desired quantity
+ * of VMs to a given request
+ * @param siRequest the request to be pre-empted
+ * @param quantity the quantity to be pre-empted
+ */
protected void allocate(SIRequest siRequest, Integer quantity) {
if(quantity < 1){
@@ -475,6 +737,11 @@ protected void allocate(SIRequest siRequest, Integer quantity) {
}
}
+ /**
+ * Changes the status of a Spot Instance request
+ * @param siRequest the request that will change status
+ * @param newStatus the new status
+ */
private void changeStatus(SIRequest siRequest, SIRequestStatus newStatus) {
SIRequestStatus oldStatus = siRequest.getStatus();
boolean changed = siRequest.setStatus(newStatus);
@@ -487,10 +754,27 @@ private void changeStatus(SIRequest siRequest, SIRequestStatus newStatus) {
// DEFINE SPOT INSTANCES CAPACITY
// -------------------------------------------------------------------------
+ /**
+ * Calculates the maximum number of instances
+ * the Spot Instances module can allocate
+ *
+ * The amount of memory available for SI requests
+ * will depend on the reserved available capacity
+ * for non-preemptable reservations, that is based
+ * on non-preemptable resources' utilization.
+ * For this reason, every time the utilization of
+ * non-preemptable resources change this method
+ * must be called:
+ *
+ * * Initialization
+ * * Creation of non-preemptable VMs
+ * * Destructions of non-preemptable VMs
+ *
+ */
protected synchronized void calculateMaximumInstances() {
if (this.lager.eventLog) {
- logger.info(Lager.ev(-1) + "[Spot Instances] Calculating available SI instances..");
+ logger.info(Lager.ev(-1) + "[Spot Instances] Calculating maximum SI instances..");
}
Integer siMem;
@@ -501,22 +785,14 @@ protected synchronized void calculateMaximumInstances() {
Integer usedNonPreemptableMem = persistence.getUsedNonPreemptableMemory();
//Formula derived from maximum_utilization = usedNonPreemptable
- // ---------------------------------
+ // -----------------------------------------
// usedNonPreemptable + reservedNonPreempMem
Integer reservedNonPreempMem = (int)Math.round((1-MAX_NON_PREEMP_UTILIZATION)*usedNonPreemptableMem/MAX_NON_PREEMP_UTILIZATION);
reservedNonPreempMem = Math.max(reservedNonPreempMem, MINIMUM_RESERVED_MEMORY);
- if(availableMem >= reservedNonPreempMem){
- siMem = (availableMem - reservedNonPreempMem) + usedPreemptableMem;
- } else {
- if (this.lager.eventLog) {
- logger.info(Lager.ev(-1) + "[Spot Instances] Not enough available memory for Spot Instances. Trying to satisfy currently active requests.");
- }
- siMem = Math.max((availableMem+usedPreemptableMem)-reservedNonPreempMem, 0);
- }
+ siMem = Math.max((availableMem+usedPreemptableMem)-reservedNonPreempMem, 0);
if (this.lager.eventLog) {
- logger.info(Lager.ev(-1) + "[Spot Instances] REAL available site memory: " + persistence.getTotalAvailableMemory(1) + "MB");
logger.info(Lager.ev(-1) + "[Spot Instances] Available site memory: " + availableMem + "MB");
logger.info(Lager.ev(-1) + "[Spot Instances] Used non pre-emptable memory: " + usedNonPreemptableMem + "MB");
logger.info(Lager.ev(-1) + "[Spot Instances] Reserved non pre-emptable memory: " + reservedNonPreempMem + "MB");
@@ -529,11 +805,17 @@ protected synchronized void calculateMaximumInstances() {
return;
}
- //TODO
-
changeMaximumInstances(siMem);
}
+ /**
+ * Changes the maximum allowed number of SI instances.
+ * In case the maximum number changes, the
+ * {@code changePriceAndAllocateRequests()} method
+ * is called
+ * @param availableMemory the new amount of memory
+ * available for SI requests
+ */
protected void changeMaximumInstances(Integer availableMemory){
if(availableMemory == null || availableMemory < 0){
@@ -542,110 +824,103 @@ protected void changeMaximumInstances(Integer availableMemory){
Integer newMaxInstances = availableMemory/INSTANCE_MEM;
+ //TODO Also take available network associations
+ // into account
+
if(newMaxInstances != maximumInstances){
if (this.lager.eventLog) {
logger.info(Lager.ev(-1) + "[Spot Instances] Maximum instances changed. Previous maximum instances = " + maximumInstances + ". Current maximum instances = " + newMaxInstances);
}
this.maximumInstances = newMaxInstances;
- changePriceAndReallocateRequests();
+ changePriceAndAllocateRequests();
}
}
// -------------------------------------------------------------------------
- // Implements org.globus.workspace.StateChangeInterested
- // -------------------------------------------------------------------------
-
- public void stateNotification(int vmid, int state) throws ManageException {
- if(state == WorkspaceConstants.STATE_DESTROYING){
- SIRequest siRequest = this.getSIRequest(vmid);
- if(siRequest != null){
- if (this.lager.eventLog) {
- logger.info(Lager.ev(-1) + "[Spot Instances] VM '" + vmid + "' from request '" + siRequest.getId() + "' finished.");
- }
-
- if(!siRequest.finishVM(vmid)){
- if(siRequest.getAllocatedInstances().equals(0)){
- allVMsFinished(siRequest);
- }
- //Will just change price and reallocate requests
- //if this was not a pre-emption
- this.changePriceAndReallocateRequests();
- }
-
- } else {
- if (this.lager.eventLog) {
- logger.info(Lager.ev(-1) + "[Spot Instances] A non-preemptable VM was destroyed. Recalculating maximum instances.");
- }
- this.calculateMaximumInstances();
- }
- }
- }
+ // UTILS - Candidates for moving down to SQL
+ // -------------------------------------------------------------------------
- public void stateNotification(int[] vmids, int state) {
- //assume just non-preemptable VM's are being notified here
- if(state == WorkspaceConstants.STATE_FIRST_LEGAL){
- if (this.lager.eventLog) {
- logger.info(Lager.ev(-1) + "[Spot Instances] " + vmids.length + " non-preemptable VMs created. Recalculating maximum instances.");
- }
- this.calculateMaximumInstances();
+ /**
+ * Retrieves a Spot Instance request and its related information
+ * @param id the id of the request to be retrieved
+ * @param log wether the retrieval is logged or not
+ * @return the wanted request
+ * @throws DoesNotExistException in case the id argument does not map
+ * to any spot instance request
+ */
+ protected SIRequest getRequest(String id, boolean log) throws DoesNotExistException {
+ if(log){
+ logger.info(Lager.ev(-1) + "[Spot Instances] Retrieving request with id: " + id + ".");
+ }
+ SIRequest siRequest = allRequests.get(id);
+ if(siRequest != null){
+ return siRequest;
+ } else {
+ throw new DoesNotExistException("Spot instance request with id " + id + " does not exists.");
}
- }
-
- // -------------------------------------------------------------------------
- // Implements org.globus.workspace.scheduler.defaults.PreemptableSpaceManager
- // -------------------------------------------------------------------------
-
- public void start() {
- this.calculateMaximumInstances();
}
- public void freeSpace(Integer memoryToFree) {
- Integer usedMemory = maximumInstances*INSTANCE_MEM;
-
- if (this.lager.eventLog) {
- logger.info(Lager.ev(-1) + "[Spot Instances] " + memoryToFree +
- "MB RAM have to be freed to give space to higher priority requests");
- }
-
- if(memoryToFree > usedMemory){
- logger.warn(Lager.ev(-1) + "[Spot Instances] Spot Instances requests are consuming " + usedMemory +
- "MB RAM , but SIManager was requested to free " + memoryToFree + "MB RAM. " +
- "Freeing " + usedMemory + "MB RAM.");
- memoryToFree = usedMemory;
+ /**
+ * Retrieves the Spot Instance request associated with
+ * this Virtual Machine ID
+ * @param vmid the id of the vm
+ * @return the request that has this VM allocated
+ */
+ protected SIRequest getSIRequest(int vmid) {
+ for (SIRequest request : allRequests.values()) {
+ if(request.isAllocatedVM(vmid)){
+ return request;
+ }
}
-
- Integer availableMemory = usedMemory - memoryToFree;
- changeMaximumInstances(availableMemory);
+
+ return null;
}
- // -------------------------------------------------------------------------
- // UTILS
- // -------------------------------------------------------------------------
-
- private List<SIRequest> getEqualBidHungryRequests() {
+ /**
+ * Retrieves OPEN or ACTIVE equal bid requests that
+ * still needs more instances
+ * @return list of equal bid hungry alive requests
+ */
+ protected List<SIRequest> getEqualBidHungryRequests() {
return SIRequestUtils.filterHungryAliveRequestsEqualPrice(this.currentPrice, this.allRequests.values());
}
+ /**
+ * Retrieves ACTIVE lower bid requests
+ * @return list of lower bid active requests
+ */
protected Collection<SIRequest> getLowerBidActiveRequests() {
return SIRequestUtils.filterActiveRequestsBelowPrice(this.currentPrice, this.allRequests.values());
}
+ /**
+ * Retrieves ACTIVE equal bid requests
+ * @return list of equal bid active requests
+ */
protected List<SIRequest> getEqualBidActiveRequests(){
return SIRequestUtils.filterActiveRequestsEqualPrice(this.currentPrice, this.allRequests.values());
- }
-
- protected Collection<SIRequest> getEqualBidOpenRequests() {
- return SIRequestUtils.filterOpenRequestsEqualPrice(this.currentPrice, this.allRequests.values());
- }
+ }
+ /**
+ * Retrieves OPEN or ACTIVE higher bid requests
+ * @return list of higher bid active alive requests
+ */
protected Collection<SIRequest> getHigherBidAliveRequests() {
return SIRequestUtils.filterAliveRequestsAbovePrice(this.currentPrice, this.allRequests.values());
}
+ /**
+ * Retrieves OPEN or ACTIVE requests
+ * @return list of alive requests
+ */
protected Collection<SIRequest> getAliveRequests() {
return SIRequestUtils.filterAliveRequests(this.allRequests.values());
}
+ /**
+ * Retrieves the number of needed VMs by greater bid requests
+ * @return number of needed VMs
+ */
protected Integer getGreaterBidInstancesCount() {
Collection<SIRequest> priorityRequests = getHigherBidAliveRequests();
@@ -658,19 +933,10 @@ protected Integer getGreaterBidInstancesCount() {
return instanceCount;
}
- protected SIRequest getSIRequest(int vmid) {
- for (SIRequest request : allRequests.values()) {
- if(request.isAllocatedVM(vmid)){
- return request;
- }
- }
-
- return null;
- }
-
// -------------------------------------------------------------------------
// MODULE SET (avoids circular dependency problem)
// -------------------------------------------------------------------------
+
public void setCreationManager(InternalCreationManager creationManagerImpl) {
if (creationManagerImpl == null) {
throw new IllegalArgumentException("creationManagerImpl may not be null");
Please sign in to comment.
Something went wrong with that request. Please try again.