Skip to content

Commit

Permalink
Initial upgrade to TP3 of core Titan classes. Does not build yet (int…
Browse files Browse the repository at this point in the history
…ermediate commit).
  • Loading branch information
mbroecheler committed Apr 8, 2015
1 parent fce384d commit 24bb478
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 179 deletions.
Expand Up @@ -2,6 +2,8 @@


// TODO is this vestigial now that TP3's VertexProperty.Cardinality exists? // TODO is this vestigial now that TP3's VertexProperty.Cardinality exists?


import org.apache.tinkerpop.gremlin.structure.VertexProperty;

/** /**
* The cardinality of the values associated with given key for a particular element. * The cardinality of the values associated with given key for a particular element.
* *
Expand All @@ -25,5 +27,23 @@ public enum Cardinality {
*/ */
SET; SET;


public org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality convert() {
switch (this) {
case SINGLE: return VertexProperty.Cardinality.single;
case LIST: return VertexProperty.Cardinality.list;
case SET: return VertexProperty.Cardinality.set;
default: throw new AssertionError("Unrecognized cardinality: " + this);
}
}

public static Cardinality convert(org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality cardinality) {
switch (cardinality) {
case single: return SINGLE;
case list: return LIST;
case set: return SET;
default: throw new AssertionError("Unrecognized cardinality: " + cardinality);
}
}



} }
Expand Up @@ -52,7 +52,7 @@ public interface TitanTransaction extends TitanGraphTransaction {
/** /**
* Retrieves the vertex for the specified id. * Retrieves the vertex for the specified id.
* *
* This method is intended for internal use only. Use {@link com.tinkerpop.gremlin.structure.Graph#V(Object...)} instead. * This method is intended for internal use only. Use {@link org.apache.tinkerpop.gremlin.structure.Graph#vertices(Object...)} instead.
* *
* @param id id of the vertex to retrieve * @param id id of the vertex to retrieve
* @return vertex with the given id if it exists, else null * @return vertex with the given id if it exists, else null
Expand Down
Expand Up @@ -258,19 +258,19 @@ private synchronized void closeInternal() {
} }
} }


private synchronized void removeHook() { private synchronized void removeHook() {
if (null == shutdownHook) if (null == shutdownHook)
return; return;


ShutdownThread tmp = shutdownHook; ShutdownThread tmp = shutdownHook;
shutdownHook = null; shutdownHook = null;
// Remove shutdown hook to avoid reference retention // Remove shutdown hook to avoid reference retention
try { try {
Runtime.getRuntime().removeShutdownHook(tmp); Runtime.getRuntime().removeShutdownHook(tmp);
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
log.warn("Failed to remove shutdown hook", e); log.warn("Failed to remove shutdown hook", e);
}
} }
}




// ################### Simple Getters ######################### // ################### Simple Getters #########################
Expand Down
Expand Up @@ -14,6 +14,7 @@
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.StandardScanner; import com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.StandardScanner;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph; import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult; import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
Expand All @@ -24,6 +25,7 @@
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper; import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -267,9 +269,9 @@ public Map<String,Object> apply(@Nullable Map<String,Object> o) {
} else if (resultMode==ResultMode.LOCALTX) { } else if (resultMode==ResultMode.LOCALTX) {
resultgraph = graph.newTransaction(); resultgraph = graph.newTransaction();
for (Map.Entry<Long, Map<String, Object>> vprop : mutatedProperties.entrySet()) { for (Map.Entry<Long, Map<String, Object>> vprop : mutatedProperties.entrySet()) {
Vertex v = resultgraph.V(vprop.getKey()).next(); Vertex v = resultgraph.vertices(vprop.getKey()).next();
for (Map.Entry<String,Object> prop : vprop.getValue().entrySet()) { for (Map.Entry<String,Object> prop : vprop.getValue().entrySet()) {
v.singleProperty(prop.getKey(),prop.getValue()); v.property(VertexProperty.Cardinality.single,prop.getKey(), prop.getValue());
} }
} }
} }
Expand All @@ -278,7 +280,7 @@ public Map<String,Object> apply(@Nullable Map<String,Object> o) {
// update runtime and return the newly computed graph // update runtime and return the newly computed graph
this.memory.setRuntime(System.currentTimeMillis() - time); this.memory.setRuntime(System.currentTimeMillis() - time);
this.memory.complete(); this.memory.complete();
return new ComputerResult(resultgraph, this.memory); return new DefaultComputerResult(resultgraph, this.memory);
}); });
} }


Expand All @@ -301,7 +303,7 @@ public void run() {
for (Map.Entry<Long, Map<String, Object>> vprop : properties) { for (Map.Entry<Long, Map<String, Object>> vprop : properties) {
Vertex v = tx.getVertex(vprop.getKey()); Vertex v = tx.getVertex(vprop.getKey());
for (Map.Entry<String,Object> prop : vprop.getValue().entrySet()) { for (Map.Entry<String,Object> prop : vprop.getValue().entrySet()) {
v.singleProperty(prop.getKey(),prop.getValue()); v.property(VertexProperty.Cardinality.single,prop.getKey(), prop.getValue());
} }
} }
tx.commit(); tx.commit();
Expand All @@ -324,8 +326,8 @@ public String toString() {
public Features features() { public Features features() {
return new Features() { return new Features() {
@Override @Override
public boolean supportsWorkerPersistenceBetweenIterations() { public boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) {
return false; //?
} }


@Override @Override
Expand Down
Expand Up @@ -44,7 +44,7 @@ <V> TitanVertexProperty<V> constructProperty(String key, V value) {
} }


@Override @Override
public <V> Iterator<VertexProperty<V>> propertyIterator(String... keys) { public <V> Iterator<VertexProperty<V>> properties(String... keys) {
if (vertexMemory.elementKeyMap.isEmpty()) return Collections.emptyIterator(); if (vertexMemory.elementKeyMap.isEmpty()) return Collections.emptyIterator();
if (keys==null || keys.length==0) { if (keys==null || keys.length==0) {
return Collections.emptyIterator(); //Do NOT return compute keys as part of all the properties... return Collections.emptyIterator(); //Do NOT return compute keys as part of all the properties...
Expand All @@ -66,14 +66,10 @@ public boolean supports(String key) {
} }


@Override @Override
public <V> TitanVertexProperty<V> property(String key, V value) { public <V> TitanVertexProperty<V> property(VertexProperty.Cardinality cardinality, String key, V value) {
return singleProperty(key,value);
}

@Override
public <V> TitanVertexProperty<V> singleProperty(String key, V value) {
if (!supports(key)) throw GraphComputer.Exceptions.providedKeyIsNotAnElementComputeKey(key); if (!supports(key)) throw GraphComputer.Exceptions.providedKeyIsNotAnElementComputeKey(key);
Preconditions.checkArgument(value != null); Preconditions.checkArgument(value != null);
Preconditions.checkArgument(cardinality== VertexProperty.Cardinality.single,"Only single cardinality is supported, provided: %s",cardinality);
vertexMemory.setProperty(vertexId, key, value); vertexMemory.setProperty(vertexId, key, value);
return constructProperty(key,value); return constructProperty(key,value);
} }
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
Expand Down Expand Up @@ -99,7 +100,8 @@ public void process(TitanVertex vertex, ScanMetrics metrics) {
for (MessageScope scope : scopes) { for (MessageScope scope : scopes) {
if (scope instanceof MessageScope.Local) { if (scope instanceof MessageScope.Local) {
M combinedMsg = null; M combinedMsg = null;
for (M msg : vh.receiveMessages(scope)) { for (Iterator<M> msgIter = vh.receiveMessages(scope); msgIter.hasNext(); ) {
M msg = msgIter.next();
if (combinedMsg==null) combinedMsg=msg; if (combinedMsg==null) combinedMsg=msg;
else combinedMsg = combiner.combine(combinedMsg,msg); else combinedMsg = combiner.combine(combinedMsg,msg);
} }
Expand Down
Expand Up @@ -16,16 +16,14 @@
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.*; import org.apache.tinkerpop.gremlin.structure.*;
import org.apache.tinkerpop.gremlin.structure.io.DefaultIo; import org.apache.tinkerpop.gremlin.structure.io.DefaultIo;
import org.apache.tinkerpop.gremlin.structure.util.AbstractTransaction;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.util.Collection; import java.util.*;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
Expand All @@ -42,10 +40,7 @@ public abstract class TitanBlueprintsGraph implements TitanGraph {
LoggerFactory.getLogger(TitanBlueprintsGraph.class); LoggerFactory.getLogger(TitanBlueprintsGraph.class);




@Override
public Io io() {
return new TitanIo(this);
}


// ########## TRANSACTION HANDLING ########################### // ########## TRANSACTION HANDLING ###########################


Expand All @@ -67,7 +62,7 @@ private TitanBlueprintsTransaction getAutoStartTx() {


TitanBlueprintsTransaction tx = txs.get(); TitanBlueprintsTransaction tx = txs.get();
Preconditions.checkState(tx!=null,"Invalid read-write behavior configured: " + Preconditions.checkState(tx!=null,"Invalid read-write behavior configured: " +
"Should either open transaction or throw exception. [%s]",tinkerpopTxContainer.readWriteBehavior); "Should either open transaction or throw exception.");
return tx; return tx;
} }


Expand Down Expand Up @@ -112,6 +107,11 @@ public Configuration configuration() {
return config.getLocalConfiguration(); return config.getLocalConfiguration();
} }


@Override
public Io io() {
return new TitanIo(this);
}

// ########## TRANSACTIONAL FORWARDING ########################### // ########## TRANSACTIONAL FORWARDING ###########################


@Override @Override
Expand All @@ -125,27 +125,30 @@ public TitanVertex addVertex(Object... keyValues) {
// } // }


@Override @Override
public GraphTraversal<Vertex, Vertex> V(Object... ids) { public Iterator<Vertex> vertices(Object... vertexIds) {
return getAutoStartTx().V(ids); return getAutoStartTx().vertices(vertexIds);
} }


@Override @Override
public GraphTraversal<Edge, Edge> E(Object... ids) { public Iterator<Edge> edges(Object... edgeIds) {
return getAutoStartTx().E(ids); return getAutoStartTx().edges(edgeIds);
} }


@Override @Override
public TitanGraphComputer compute(final Class... graphComputerClass) { public <C extends GraphComputer> C compute(Class<C> graphComputerClass) throws IllegalArgumentException {
if (graphComputerClass.length > 1) if (!graphComputerClass.equals(FulgoraGraphComputer.class)) {
throw Graph.Exceptions.onlyOneOrNoGraphComputerClass(); throw Graph.Exceptions.graphDoesNotSupportProvidedGraphComputer(graphComputerClass);
if (graphComputerClass.length == 0 || graphComputerClass[0].equals(FulgoraGraphComputer.class)) {
StandardTitanGraph graph = (StandardTitanGraph)this;
return new FulgoraGraphComputer(graph,graph.getConfiguration().getConfiguration());
} else { } else {
throw Graph.Exceptions.graphDoesNotSupportProvidedGraphComputer(graphComputerClass[0]); return (C)compute();
} }
} }


@Override
public FulgoraGraphComputer compute() throws IllegalArgumentException {
StandardTitanGraph graph = (StandardTitanGraph)this;
return new FulgoraGraphComputer(graph,graph.getConfiguration().getConfiguration());
}

@Override @Override
public TitanVertex addVertex(String vertexLabel) { public TitanVertex addVertex(String vertexLabel) {
return getAutoStartTx().addVertex(vertexLabel); return getAutoStartTx().addVertex(vertexLabel);
Expand Down Expand Up @@ -246,32 +249,27 @@ public VertexLabel getOrCreateVertexLabel(String name) {






class GraphTransaction implements Transaction { class GraphTransaction extends AbstractTransaction {


private Consumer<Transaction> readWriteBehavior = READ_WRITE_BEHAVIOR.AUTO; public GraphTransaction() {
private Consumer<Transaction> closeBehavior = CLOSE_BEHAVIOR.COMMIT; super(TitanBlueprintsGraph.this);
}


@Override @Override
public void open() { public void doOpen() {
if (isOpen()) throw Exceptions.transactionAlreadyOpen();
startNewTx(); startNewTx();
} }


@Override @Override
public void commit() { public void doCommit() {
getAutoStartTx().commit(); getAutoStartTx().commit();
} }


@Override @Override
public void rollback() { public void doRollback() {
getAutoStartTx().rollback(); getAutoStartTx().rollback();
} }


@Override
public <R> Workload<R> submit(Function<Graph, R> graphRFunction) {
return new Workload<R>(TitanBlueprintsGraph.this,graphRFunction);
}

@Override @Override
public TitanTransaction create() { public TitanTransaction create() {
return newTransaction(); return newTransaction();
Expand All @@ -283,37 +281,28 @@ public boolean isOpen() {
return tx!=null && tx.isOpen(); return tx!=null && tx.isOpen();
} }


@Override
public void readWrite() {
readWriteBehavior.accept(this);
}

@Override @Override
public void close() { public void close() {
close(this); close(this);
} }


void close(Transaction tx) { void close(Transaction tx) {
closeBehavior.accept(tx); closeConsumer.accept(tx);
Preconditions.checkState(!tx.isOpen(),"Invalid close behavior configured: Should close transaction. [%s]",closeBehavior); Preconditions.checkState(!tx.isOpen(),"Invalid close behavior configured: Should close transaction. [%s]",closeConsumer);
} }


@Override @Override
public Transaction onReadWrite(Consumer<Transaction> transactionConsumer) { public Transaction onReadWrite(Consumer<Transaction> transactionConsumer) {
if (transactionConsumer==null) throw Exceptions.onReadWriteBehaviorCannotBeNull();
Preconditions.checkArgument(transactionConsumer instanceof READ_WRITE_BEHAVIOR, Preconditions.checkArgument(transactionConsumer instanceof READ_WRITE_BEHAVIOR,
"Only READ_WRITE_BEHAVIOR instances are accepted argument, got: %s", transactionConsumer); "Only READ_WRITE_BEHAVIOR instances are accepted argument, got: %s", transactionConsumer);
this.readWriteBehavior = transactionConsumer; return super.onReadWrite(transactionConsumer);
return this;
} }


@Override @Override
public Transaction onClose(Consumer<Transaction> transactionConsumer) { public Transaction onClose(Consumer<Transaction> transactionConsumer) {
if (transactionConsumer==null) throw Exceptions.onCloseBehaviorCannotBeNull();
Preconditions.checkArgument(transactionConsumer instanceof CLOSE_BEHAVIOR, Preconditions.checkArgument(transactionConsumer instanceof CLOSE_BEHAVIOR,
"Only CLOSE_BEHAVIOR instances are accepted argument, got: %s", transactionConsumer); "Only CLOSE_BEHAVIOR instances are accepted argument, got: %s", transactionConsumer);
this.closeBehavior = transactionConsumer; return super.onClose(transactionConsumer);
return this;
} }
} }


Expand Down

0 comments on commit 24bb478

Please sign in to comment.