Permalink
Browse files

New transaction maid implementation

  • Loading branch information...
timf committed Feb 13, 2013
1 parent 4d73b68 commit 5721a2aad05c49033dda8f2e7f053e5c1ec7d144
Showing with 114 additions and 68 deletions.
  1. +1 −1 pom.xml
  2. +113 −67 src/main/java/org/dasein/persist/Transaction.java
View
@@ -102,7 +102,7 @@
<dependency>
<groupId>org.dasein</groupId>
<artifactId>dasein-util</artifactId>
- <version>2012.08</version>
+ <version>2013.02</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -21,16 +21,17 @@
package org.dasein.persist;
// J2SE imports
+import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
import java.util.Date;
import java.util.EmptyStackException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -43,6 +44,7 @@
// Apache imports
import org.apache.log4j.Logger;
+import org.dasein.util.DaseinUtilTasks;
/**
* Represents a database transaction to applications managing their
@@ -81,59 +83,66 @@
static private final AtomicBoolean maidLaunched = new AtomicBoolean(false);
- static private final boolean tracking = true;
+ static private final Properties properties = new Properties();
+
+ static public final String MAID_DISABLED = "dasein.persist.maid.disabled";
+ static public final String MAID_FREQUENCY = "dasein.persist.maid.frequency";
+ static public final String MAID_MAXSECONDS = "dasein.persist.maid.maxseconds";
+ static public final String MAID_WARNSECONDS = "dasein.persist.maid.warnseconds";
+
+ static private final boolean tracking;
+ static {
+ loadProperties();
+ tracking = !isMaidDisabled();
+ }
/**
- * Cleans up transactions that somehow never got cleaned up.
+ * Clean up transactions.
*/
- static private void clean() {
- while( true ) {
- ArrayList<Transaction> closing;
- int count;
-
- if( logger.isInfoEnabled() ) {
- logger.info("There are " + connections.get() + " open connections right now (high point: " + highPoint.get() + ").");
- }
- try { Thread.sleep(1000L); }
- catch( InterruptedException ignore ) { /* ignore me */ }
- count = transactions.size();
- if( count < 1 ) {
- continue;
- }
- if( logger.isInfoEnabled() ) {
- logger.info("Running the maid service on " + count + " transactions.");
+ private static class TransactionMaid implements Runnable {
+ @Override
+ public void run() {
+ int cycleCount = 0;
+ long warn = getMaidWarnMs();
+ long max = getMaidMaxMs();
+ long freq = getMaidFrequencyMs();
+ while (true) {
+ if (cycleCount % 21 == 20) {
+ warn = getMaidWarnMs();
+ max = getMaidMaxMs();
+ freq = getMaidFrequencyMs();
+ }
+ try {
+ Thread.sleep(freq);
+ _clean(warn, max);
+ } catch (Throwable t) {
+ logger.error("Problem running transaction maid: " + t.getMessage(), t);
+ } finally {
+ cycleCount += 1;
+ }
}
- closing = new ArrayList<Transaction>();
- long now = System.currentTimeMillis();
+ }
+ private static void _clean(long warnMs, long maxMs) {
+ if (transactions.isEmpty()) {
+ return;
+ }
+ final long now = System.currentTimeMillis();
// ConcurrentHashMap provides a weakly-consistent view for this iterator
- for( Transaction xaction : transactions.values() ) {
- long diff = (now - xaction.openTime)/1000L;
-
- if( diff > 10 ) {
- Thread t = xaction.executionThread;
-
- logger.warn("Open transaction " + xaction.transactionId + " has been open for " + diff + " seconds.");
- logger.warn("Transaction " + xaction.transactionId + " state: " + xaction.state);
- if( t== null ) {
- logger.warn("Thread: no execution thread active");
- }
- else {
- logger.warn("Thread " + t.getName() + " (" + t.getState() + "):");
- for( StackTraceElement elem : t.getStackTrace() ) {
- logger.warn("\t" + elem.toString());
- }
+ for (Transaction xaction : transactions.values()) {
+ final long diff = now - xaction.openTime;
+ if (diff > maxMs) {
+ logger.error("Transaction " + xaction.transactionId + " has been open for " + diff/1000L + " seconds, forcing a close: " + xaction.state);
+ try {
+ xaction.rollback(true);
+ xaction.close();
+ } catch (Throwable t) {
+ logger.error("Problem rolling back offending transaction " + xaction.transactionId + ": " + t.getMessage());
}
- }
- if( diff > 600 ) {
- closing.add(xaction);
+ } else if (diff > warnMs) {
+ logger.warn("Transaction " + xaction.transactionId + " has been open for " + diff/1000L + " seconds.");
}
}
- for( Transaction xaction: closing ) {
- logger.warn("Encountered a stale transaction (" + xaction.transactionId + "), forcing a close: " + xaction.state);
- xaction.logStackTrace();
- xaction.close();
- }
}
}
@@ -147,34 +156,29 @@ static public Transaction getInstance() {
}
static public Transaction getInstance(boolean readOnly) {
- Transaction xaction;
final int xid = nextTransactionId.incrementAndGet();
- if( xid == Integer.MAX_VALUE ) {
+ if( xid == Integer.MAX_VALUE ) { // contention here will cause a few screwy values
nextTransactionId.set(0);
}
- xaction = new Transaction(xid, readOnly);
+ Transaction xaction = new Transaction(xid, readOnly);
if (maidLaunched.compareAndSet(false, true)) {
- final Thread maid = new Thread() {
- public void run() {
- clean();
- }
- };
- maid.setName("Transaction Maid");
- maid.setDaemon(true);
- maid.setPriority(Thread.MIN_PRIORITY + 1);
- maid.start();
+ loadProperties();
+ if (!isMaidDisabled()) {
+ DaseinUtilTasks.submit(new TransactionMaid());
+ }
}
return xaction;
}
static public void report() {
MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
StringBuilder sb = new StringBuilder();
- sb.append("Dasein Connection Report (" + new Date() + "):");
- sb.append("Open connections: " + connections);
- sb.append(", High connections: " + highPoint);
- sb.append(", Transaction cache size: " + transactions.size());
- sb.append(", Event cache size: " + eventCache.size());
+ sb.append("Dasein Connection Report (" + new Date() + "): ");
+ if (tracking) {
+ sb.append("Open connections: " + connections.get() + ", ");
+ sb.append("Transaction cache size: " + transactions.size() + ", ");
+ }
+ sb.append("Event cache size: " + eventCache.size());
sb.append(", Heap memory usage: " + bean.getHeapMemoryUsage());
sb.append(", Non-heap memory usage: " + bean.getNonHeapMemoryUsage());
sb.append(", Free memory: " + (Runtime.getRuntime().freeMemory() / 1024000L) + "MB");
@@ -192,7 +196,6 @@ static public void report() {
*/
private volatile boolean dirty = false;
- private volatile Thread executionThread = null;
/**
* A stack of events that are part of this transaction.
*/
@@ -325,7 +328,6 @@ public void commit() throws PersistenceException {
StringBuilder holder = new StringBuilder();
boolean success = false;
- executionThread = Thread.currentThread();
state = "PREPARING";
try {
Execution event = getEvent(cls);
@@ -397,7 +399,6 @@ public void commit() throws PersistenceException {
logger.warn("FAILED TRANSACTION (" + transactionId + "): " + holder.toString());
rollback();
}
- executionThread = null;
}
}
finally {
@@ -409,7 +410,6 @@ public void commit() throws PersistenceException {
StringBuilder holder = new StringBuilder();
boolean success = false;
- executionThread = Thread.currentThread();
state = "PREPARING";
try {
Map<String,Object> res;
@@ -454,7 +454,6 @@ public void commit() throws PersistenceException {
logger.warn("FAILED TRANSACTION (" + transactionId + "): " + holder.toString());
rollback();
}
- executionThread = null;
}
}
finally {
@@ -687,4 +686,51 @@ protected void rollback(boolean cancelStatements) {
finally {
}
}
+
+ static void loadProperties() {
+ try {
+ InputStream is = DaseinSequencer.class.getResourceAsStream(DaseinSequencer.PROPERTIES);
+ try {
+ if( is != null ) {
+ properties.load(is);
+ }
+ } finally {
+ if( is != null ) {
+ is.close();
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("Problem loading dasein persist transaction properties: " + t.getMessage());
+ }
+ }
+
+
+ static boolean isMaidDisabled() {
+ return properties.containsKey(MAID_DISABLED) && properties.getProperty(MAID_DISABLED).equalsIgnoreCase("true");
+ }
+
+ static private long getMsFromSecondsProperty(String propKey, int defaultSeconds) {
+ String secondsStr = properties.getProperty(propKey);
+ if (secondsStr != null) {
+ try {
+ int seconds = Integer.parseInt(secondsStr);
+ return (long)seconds * 1000L;
+ } catch (NumberFormatException ignore) {
+ logger.error("Value for '" + propKey + "' is not an integer, using default: " + defaultSeconds);
+ }
+ }
+ return (long)defaultSeconds * 1000L;
+ }
+
+ static long getMaidFrequencyMs() {
+ return getMsFromSecondsProperty(MAID_FREQUENCY, 5);
+ }
+
+ static long getMaidWarnMs() {
+ return getMsFromSecondsProperty(MAID_WARNSECONDS, 10);
+ }
+
+ static long getMaidMaxMs() {
+ return getMsFromSecondsProperty(MAID_MAXSECONDS, 60);
+ }
}

0 comments on commit 5721a2a

Please sign in to comment.