Skip to content

Commit

Permalink
tweak to TransactionalGraph API. Methods now have better names and ad…
Browse files Browse the repository at this point in the history
…ded TransactionalGraph.getCurrentBufferSize()
  • Loading branch information
okram committed Sep 1, 2011
1 parent 417e2f7 commit efbc6ce
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 54 deletions.
Expand Up @@ -24,30 +24,38 @@ public enum Conclusion {
}

/**
* Transactions in a transactional graph can either be handled automatically when provided a bufferSize > 0.
* If the graph is automatically handling transactions, then every X mutations to the graph will committed, where X is the bufferSize.
* A mutation is atomic up to the write methods of graph/element/index.
* Transactions in a transactional graph can be handled automatically when then transaction buffer size is greater than 0.
* If the graph is automatically handling transactions, then every X mutations to the graph, the mutations will be committed, where X is the provided bufferSize.
* If the graph has a bufferSize of 0, then the user is responsible for starting and stopping transactions.
*
* @param bufferSize 0 for manual transactions and > 0 for automatic transaction handling
* @param bufferSize 0 for manual transaction handling and > 0 for automatic transaction handling
*/
public void setTransactionBuffer(int bufferSize);
public void setMaxBufferSize(int bufferSize);

/**
* Returns the size of the transaction buffer.
* Returns the maximum size of the transaction buffer.
*
* @return the transaction buffer size
* @return the maximum transaction buffer size
*/
public int getTransactionBuffer();
public int getMaxBufferSize();

/**
* Returns the current size of the transaction buffer.
*
* @return the current size of the transaction buffer
*/
public int getCurrentBufferSize();

/**
* Start a transaction in order to manipulate the graph.
* This is required for graph manipulations in manual transaction mode.
*
* @throws RuntimeException If a transaction is already in progress, then a RuntimeException of "nested transaction" is thrown.
*/
public void startTransaction();

/**
* Stop the current transaction. This is possible only in manual transaction mode.
* Stop the current transaction. If the current buffer still has active mutations, then they are committed.
* Specify whether the transaction was successful or not.
* A failing transaction will rollback all updates to before the transaction was started.
*
Expand Down
Expand Up @@ -126,7 +126,7 @@ public GraphSail(final IndexableGraph graph, final String indexedPatterns) {
store.edges = graph.getIndex(Index.EDGES, Edge.class);
store.values = getOrCreateValuesIndex(graph);

store.manualTransactions = store.graph instanceof TransactionalGraph && 0 == ((TransactionalGraph) store.graph).getTransactionBuffer();
store.manualTransactions = store.graph instanceof TransactionalGraph && 0 == ((TransactionalGraph) store.graph).getMaxBufferSize();

store.namespaces = store.getReferenceVertex();
if (null == store.namespaces) {
Expand Down
Expand Up @@ -12,7 +12,7 @@ protected IndexableGraph createGraph() {

Neo4jGraph g = new Neo4jGraph(directory);
g.clear();
g.setTransactionBuffer(0);
g.setMaxBufferSize(0);
return g;
}

Expand Down
Expand Up @@ -29,7 +29,7 @@ protected IndexableGraph createGraph() {


OrientGraph g = new OrientGraph("local:" + directory + "/graph");
g.setTransactionBuffer(0);
g.setMaxBufferSize(0);
return g;
}

Expand Down
Expand Up @@ -310,11 +310,15 @@ public void stopTransaction(final Conclusion conclusion) {
txCounter.set(0);
}

public int getTransactionBuffer() {
public int getMaxBufferSize() {
return txBuffer.get();
}

public void setTransactionBuffer(final int size) {
public int getCurrentBufferSize() {
return txCounter.get();
}

public void setMaxBufferSize(final int size) {
if (null != tx.get()) {
tx.get().success();
tx.get().finish();
Expand Down Expand Up @@ -357,6 +361,7 @@ protected void autoStartTransaction() {

protected void autoStopTransaction(final Conclusion conclusion) {
if (this.txBuffer.get() > 0) {
txCounter.set(txCounter.get() + 1);
if (conclusion == Conclusion.FAILURE) {
tx.get().failure();
tx.get().finish();
Expand All @@ -367,8 +372,6 @@ protected void autoStopTransaction(final Conclusion conclusion) {
tx.get().finish();
tx.remove();
txCounter.set(0);
} else {
txCounter.set(txCounter.get() + 1);
}
}
}
Expand Down
Expand Up @@ -349,15 +349,19 @@ public void stopTransaction(final Conclusion conclusion) {
this.getRawGraph().commit();
}

public void setTransactionBuffer(final int bufferSize) {
public void setMaxBufferSize(final int bufferSize) {
getRawGraph().commit();
getContext(true).txBuffer = bufferSize;
}

public int getTransactionBuffer() {
public int getMaxBufferSize() {
return getContext(true).txBuffer;
}

public int getCurrentBufferSize() {
return getContext(true).txCounter;
}

protected void saveIndexConfiguration() {
getRawGraph().getMetadata().getIndexManager().getConfiguration().save();
}
Expand All @@ -375,14 +379,13 @@ protected boolean autoStartTransaction() {

protected void autoStopTransaction(final Conclusion conclusion) {
if (getContext(true).txBuffer > 0) {
getContext(true).txCounter = getContext(true).txCounter + 1;
final OGraphDatabase db = getRawGraph();
if (getContext(true).txBuffer == 0 || (getContext(true).txCounter % getContext(true).txBuffer == 0)) {
if (conclusion == Conclusion.SUCCESS)
db.commit();
else
db.rollback();
} else {
getContext(true).txCounter = getContext(true).txCounter + 1;
}
}
}
Expand Down
Expand Up @@ -392,6 +392,7 @@ public void startTransaction() {
if (inTransaction.get())
throw new RuntimeException(TransactionalGraph.NESTED_MESSAGE);
inTransaction.set(Boolean.TRUE);
this.txCounter.set(0);
}

public void stopTransaction(final Conclusion conclusion) {
Expand All @@ -402,6 +403,7 @@ public void stopTransaction(final Conclusion conclusion) {
} else {
this.sailConnection.get().rollback();
}
this.txCounter.set(0);
} catch (SailException e) {
throw new RuntimeException(e.getMessage(), e);
}
Expand All @@ -410,6 +412,7 @@ public void stopTransaction(final Conclusion conclusion) {
protected void autoStopTransaction(final Conclusion conclusion) {
if (txBuffer.get() > 0) {
try {
txCounter.set(txCounter.get() + 1);
if (conclusion == Conclusion.FAILURE) {
this.sailConnection.get().commit();
txCounter.set(0);
Expand All @@ -418,16 +421,14 @@ protected void autoStopTransaction(final Conclusion conclusion) {
this.sailConnection.get().commit();
txCounter.set(0);
inTransaction.set(Boolean.FALSE);
} else {
txCounter.set(txCounter.get() + 1);
}
} catch (SailException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}

public void setTransactionBuffer(final int bufferSize) {
public void setMaxBufferSize(final int bufferSize) {
try {
this.sailConnection.get().commit();
} catch (SailException e) {
Expand All @@ -436,10 +437,14 @@ public void setTransactionBuffer(final int bufferSize) {
this.txBuffer.set(bufferSize);
}

public int getTransactionBuffer() {
public int getMaxBufferSize() {
return this.txBuffer.get();
}

public int getCurrentBufferSize() {
return this.txCounter.get();
}


public String toString() {
String type = this.rawGraph.getClass().getSimpleName().toLowerCase();
Expand Down
@@ -1,5 +1,6 @@
package com.tinkerpop.blueprints.pgm;

import com.tinkerpop.blueprints.BaseTest;
import com.tinkerpop.blueprints.pgm.impls.GraphTest;

/**
Expand Down Expand Up @@ -144,4 +145,21 @@ public void testCloseableSequence() {
graph.shutdown();
}

public void testNoConcurrentModificationException() {
if (graphTest.supportsEdgeIndex) {
IndexableGraph graph = (IndexableGraph) graphTest.getGraphInstance();
for (int i = 0; i < 25; i++) {
graph.addEdge(null, graph.addVertex(null), graph.addVertex(null), "test");
}
assertEquals(BaseTest.count(graph.getVertices()), 50);
assertEquals(BaseTest.count(graph.getEdges()), 25);
for (final Edge edge : graph.getIndex(Index.EDGES, Edge.class).get("label", "test")) {
graph.removeEdge(edge);
}
assertEquals(BaseTest.count(graph.getVertices()), 50);
assertEquals(BaseTest.count(graph.getEdges()), 0);
graph.shutdown();
}
}

}
@@ -1,6 +1,5 @@
package com.tinkerpop.blueprints.pgm;

import com.tinkerpop.blueprints.BaseTest;
import com.tinkerpop.blueprints.pgm.impls.GraphTest;

import java.util.HashSet;
Expand Down Expand Up @@ -259,20 +258,4 @@ public void testExceptionOnIndexOverwrite() {
}
printPerformance(graphName, loop, "attempt(s) to overwrite existing indices", this.stopWatch());
}

public void testNoConcurrentModificationException() {
IndexableGraph graph = (IndexableGraph) graphTest.getGraphInstance();
for (int i = 0; i < 25; i++) {
graph.addEdge(null, graph.addVertex(null), graph.addVertex(null), "test");
}
assertEquals(BaseTest.count(graph.getVertices()), 50);
assertEquals(BaseTest.count(graph.getEdges()), 25);
for (final Edge edge : graph.getIndex(Index.EDGES, Edge.class).get("label", "test")) {
graph.removeEdge(edge);
}
assertEquals(BaseTest.count(graph.getVertices()), 50);
assertEquals(BaseTest.count(graph.getEdges()), 0);
graph.shutdown();
}

}
@@ -1,5 +1,6 @@
package com.tinkerpop.blueprints.pgm;

import com.tinkerpop.blueprints.BaseTest;
import com.tinkerpop.blueprints.pgm.impls.GraphTest;

import java.util.Random;
Expand All @@ -20,17 +21,17 @@ public TransactionalGraphTestSuite(final GraphTest graphTest) {
public void testConstructionBufferSizeOne() {
TransactionalGraph graph = (TransactionalGraph) graphTest.getGraphInstance();
this.stopWatch();
assertEquals(graph.getTransactionBuffer(), 1);
assertEquals(graph.getMaxBufferSize(), 1);
printPerformance(graph.toString(), 1, "transaction mode retrieved", this.stopWatch());
graph.shutdown();
}

public void testTransactionsForVertices() {
TransactionalGraph graph = (TransactionalGraph) graphTest.getGraphInstance();
if (graphTest.supportsVertexIteration) {
graph.setTransactionBuffer(1);
graph.setMaxBufferSize(1);
graph.addVertex(null);
graph.setTransactionBuffer(0);
graph.setMaxBufferSize(0);

this.stopWatch();
graph.startTransaction();
Expand Down Expand Up @@ -66,7 +67,7 @@ public void testTransactionsForVertices() {
public void testBruteVertexTransactions() {
TransactionalGraph graph = (TransactionalGraph) graphTest.getGraphInstance();
if (graphTest.supportsVertexIteration) {
graph.setTransactionBuffer(0);
graph.setMaxBufferSize(0);

this.stopWatch();
for (int i = 0; i < 100; i++) {
Expand Down Expand Up @@ -116,11 +117,11 @@ public void testBruteVertexTransactions() {

public void testTransactionsForEdges() {
TransactionalGraph graph = (TransactionalGraph) graphTest.getGraphInstance();
graph.setTransactionBuffer(1);
graph.setMaxBufferSize(1);
Vertex v = graph.addVertex(null);
Vertex u = graph.addVertex(null);
graph.setTransactionBuffer(0);
assertEquals(graph.getTransactionBuffer(), 0);
graph.setMaxBufferSize(0);
assertEquals(graph.getMaxBufferSize(), 0);

this.stopWatch();
graph.startTransaction();
Expand Down Expand Up @@ -166,7 +167,7 @@ public void testTransactionsForEdges() {

public void testBruteEdgeTransactions() {
TransactionalGraph graph = (TransactionalGraph) graphTest.getGraphInstance();
graph.setTransactionBuffer(0);
graph.setMaxBufferSize(0);
this.stopWatch();
for (int i = 0; i < 100; i++) {
graph.startTransaction();
Expand Down Expand Up @@ -237,7 +238,7 @@ public void testBruteEdgeTransactions() {
public void testPropertyTransactions() {
TransactionalGraph graph = (TransactionalGraph) graphTest.getGraphInstance();
if (!graphTest.isRDFModel) {
graph.setTransactionBuffer(0);
graph.setMaxBufferSize(0);

this.stopWatch();
graph.startTransaction();
Expand Down Expand Up @@ -297,7 +298,7 @@ public void testPropertyTransactions() {
public void testIndexTransactions() {
TransactionalGraph graph = (TransactionalGraph) graphTest.getGraphInstance();
if (graphTest.supportsVertexIndex) {
graph.setTransactionBuffer(0);
graph.setMaxBufferSize(0);

this.stopWatch();
graph.startTransaction();
Expand Down Expand Up @@ -370,7 +371,7 @@ public void testAutomaticIndexExceptionRollback() {

public void testNestedManualTransactions() {
TransactionalGraph graph = (TransactionalGraph) graphTest.getGraphInstance();
graph.setTransactionBuffer(0);
graph.setMaxBufferSize(0);
graph.startTransaction();
RuntimeException ex = null;
try {
Expand All @@ -387,7 +388,7 @@ public void testRollbackOpenTxOnGraphClose() {
if (graphTest.isPersistent) {
TransactionalGraph graph = (TransactionalGraph) graphTest.getGraphInstance();
Object v1id = graph.addVertex(null).getId();
graph.setTransactionBuffer(0);
graph.setMaxBufferSize(0);
graph.startTransaction();
Object v2id = graph.addVertex(null).getId();
graph.shutdown();
Expand All @@ -398,6 +399,19 @@ public void testRollbackOpenTxOnGraphClose() {
}
}

public void testBulkTransactions() {
TransactionalGraph graph = (TransactionalGraph) graphTest.getGraphInstance();
graph.setMaxBufferSize(15);
for (int i = 0; i < 3; i++) {
graph.addEdge(null, graph.addVertex(null), graph.addVertex(null), convertId("test"));
}
assertEquals(BaseTest.count(graph.getEdges()), 3);
graph.stopTransaction(TransactionalGraph.Conclusion.FAILURE);
assertEquals(BaseTest.count(graph.getEdges()), 0);

graph.shutdown();
}

public void testCompetingThreads() {
final TransactionalGraph graph = (TransactionalGraph) graphTest.getGraphInstance();
int totalThreads = 250;
Expand All @@ -424,7 +438,7 @@ public void run() {
edges.getAndAdd(1);

} else {
graph.setTransactionBuffer(0);
graph.setMaxBufferSize(0);
graph.startTransaction();
Vertex a = graph.addVertex(null);
Vertex b = graph.addVertex(null);
Expand Down

0 comments on commit efbc6ce

Please sign in to comment.