Skip to content

Commit

Permalink
Fix compile errors in olap package
Browse files Browse the repository at this point in the history
I'm not sure that I got the GraphComputer.Persist and
GraphComputer.ResultGraph settings correct in the test VertexPrograms.
The expression changes were relatively straightforward though, with
one minor exception in the form of a second-degree traversal.
  • Loading branch information
dalaro committed Apr 10, 2015
1 parent 7eaadfd commit 2587f97
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 29 deletions.
60 changes: 38 additions & 22 deletions titan-test/src/main/java/com/thinkaurelius/titan/olap/OLAPTest.java
@@ -1,8 +1,8 @@
package com.thinkaurelius.titan.olap; package com.thinkaurelius.titan.olap;


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.thinkaurelius.titan.core.*; import com.thinkaurelius.titan.core.*;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.ScanJob; import com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.ScanJob;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.ScanMetrics; import com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.ScanMetrics;
Expand All @@ -12,13 +12,12 @@
import org.apache.tinkerpop.gremlin.process.computer.*; import org.apache.tinkerpop.gremlin.process.computer.*;
import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce; import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram; import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
import org.apache.tinkerpop.gremlin.process.graph.AnonymousGraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.util.StreamFactory; import org.apache.tinkerpop.gremlin.util.StreamFactory;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -29,6 +28,7 @@
import static com.thinkaurelius.titan.testutil.TitanAssert.assertCount; import static com.thinkaurelius.titan.testutil.TitanAssert.assertCount;
import static org.junit.Assert.*; import static org.junit.Assert.*;



/** /**
* @author Matthias Broecheler (me@matthiasb.com) * @author Matthias Broecheler (me@matthiasb.com)
*/ */
Expand Down Expand Up @@ -65,7 +65,7 @@ private int generateRandomGraph(int numV) {
mgmt.makePropertyKey("numvals").dataType(Integer.class).make(); mgmt.makePropertyKey("numvals").dataType(Integer.class).make();
finishSchema(); finishSchema();
int numE = 0; int numE = 0;
Vertex[] vs = new Vertex[numV]; TitanVertex[] vs = new TitanVertex[numV];
for (int i=0;i<numV;i++) { for (int i=0;i<numV;i++) {
vs[i] = tx.addVertex("uid",i+1); vs[i] = tx.addVertex("uid",i+1);
int numVals = random.nextInt(5)+1; int numVals = random.nextInt(5)+1;
Expand Down Expand Up @@ -99,9 +99,9 @@ public void testVertexScan() throws Exception {


@Override @Override
public void process(TitanVertex vertex, ScanMetrics metrics) { public void process(TitanVertex vertex, ScanMetrics metrics) {
long outDegree = vertex.outE("knows").count().next(); long outDegree = vertex.query().labels("knows").direction(Direction.OUT).count();
assertEquals(0,vertex.inE("knows").count().next().longValue()); assertEquals(0, vertex.query().labels("knows").direction(Direction.IN).count());
assertEquals(1, vertex.properties("uid").count().next().longValue()); assertEquals(1, vertex.query().labels("uid").propertyCount());
assertTrue(vertex.<Integer>property("uid").orElse(0) > 0); assertTrue(vertex.<Integer>property("uid").orElse(0) > 0);
metrics.incrementCustom(DEGREE_COUNT,outDegree); metrics.incrementCustom(DEGREE_COUNT,outDegree);
metrics.incrementCustom(VERTEX_COUNT); metrics.incrementCustom(VERTEX_COUNT);
Expand All @@ -124,9 +124,9 @@ public void getQueries(QueryContainer queries) {
@Override @Override
public void process(TitanVertex vertex, ScanMetrics metrics) { public void process(TitanVertex vertex, ScanMetrics metrics) {
metrics.incrementCustom(VERTEX_COUNT); metrics.incrementCustom(VERTEX_COUNT);
assertEquals(1,vertex.properties("numvals").count().next().longValue()); assertEquals(1 ,vertex.query().labels("numvals").propertyCount());
int numvals = vertex.value("numvals"); int numvals = vertex.value("numvals");
assertEquals(numvals,vertex.properties("values").count().next().longValue()); assertEquals(numvals, vertex.query().labels("values").propertyCount());
} }


@Override @Override
Expand Down Expand Up @@ -166,15 +166,15 @@ public void removeGhostVertices() throws Exception {
assertNotNull(v3); assertNotNull(v3);
v1 = getV(xx, v1id); v1 = getV(xx, v1id);
assertNotNull(v1); assertNotNull(v1);
v3.property("name","deleted"); v3.property("name", "deleted");
v3.addEdge("knows",v1); v3.addEdge("knows", v1);
xx.commit(); xx.commit();


newTx(); newTx();
assertNull(getV(tx,v3id)); assertNull(getV(tx,v3id));
v1 = getV(tx, v1id); v1 = getV(tx, v1id);
assertNotNull(v1); assertNotNull(v1);
assertEquals(v3id,v1.in("knows").next().id()); assertEquals(v3id,v1.query().direction(Direction.IN).labels("knows").vertices().iterator().next().longId());
tx.commit(); tx.commit();
mgmt.commit(); mgmt.commit();


Expand Down Expand Up @@ -219,6 +219,7 @@ public void degreeCountingDistance() throws Exception {
int numE = generateRandomGraph(numV); int numE = generateRandomGraph(numV);
clopen(); clopen();


// TODO does this iteration over TitanGraphComputer.ResultMode values imply that DegreeVariation's ResultGraph/Persist should also change?
for (TitanGraphComputer.ResultMode mode : TitanGraphComputer.ResultMode.values()) { for (TitanGraphComputer.ResultMode mode : TitanGraphComputer.ResultMode.values()) {
final TitanGraphComputer computer = graph.compute(); final TitanGraphComputer computer = graph.compute();
computer.resultMode(mode); computer.resultMode(mode);
Expand All @@ -228,18 +229,21 @@ public void degreeCountingDistance() throws Exception {
System.out.println("Execution time (ms) ["+numV+"|"+numE+"]: " + result.memory().getRuntime()); System.out.println("Execution time (ms) ["+numV+"|"+numE+"]: " + result.memory().getRuntime());
assertEquals(2,result.memory().getIteration()); assertEquals(2,result.memory().getIteration());


Graph gview = null; TitanGraphTransaction gview = null;
switch (mode) { switch (mode) {
case LOCALTX: gview = result.graph(); break; case LOCALTX: gview = (TitanGraph) result.graph(); break;
case PERSIST: newTx(); gview = tx; break; case PERSIST: newTx(); gview = tx; break;
case NONE: break; case NONE: break;
default: throw new AssertionError(mode); default: throw new AssertionError(mode);
} }
if (gview == null) continue; if (gview == null) continue;


for (Vertex v : gview.V().toList()) { for (TitanVertex v : gview.query().vertices()) {
long degree2 = ((Integer)v.value(DegreeCounter.DEGREE)).longValue(); long degree2 = ((Integer)v.value(DegreeCounter.DEGREE)).longValue();
long actualDegree2 = v.out().out().count().next(); long actualDegree2 = 0;
for (TitanVertex w : v.query().direction(Direction.OUT).vertices()) {
actualDegree2 += Iterables.size(w.query().direction(Direction.OUT).vertices());
}
assertEquals(actualDegree2,degree2); assertEquals(actualDegree2,degree2);
} }
if (mode== TitanGraphComputer.ResultMode.LOCALTX) { if (mode== TitanGraphComputer.ResultMode.LOCALTX) {
Expand All @@ -253,7 +257,7 @@ public static class DegreeCounter extends StaticVertexProgram<Integer> {


public static final String DEGREE = "degree"; public static final String DEGREE = "degree";
public static final MessageCombiner<Integer> ADDITION = (a,b) -> a+b; public static final MessageCombiner<Integer> ADDITION = (a,b) -> a+b;
public static final MessageScope.Local<Integer> DEG_MSG = MessageScope.Local.of(AnonymousGraphTraversal.Tokens.__::inE); public static final MessageScope.Local<Integer> DEG_MSG = MessageScope.Local.of(__::inE);


private final int length; private final int length;


Expand All @@ -277,7 +281,7 @@ public void execute(Vertex vertex, Messenger<Integer> messenger, Memory memory)
messenger.sendMessage(DEG_MSG, 1); messenger.sendMessage(DEG_MSG, 1);
} else { } else {
int degree = StreamFactory.stream(messenger.receiveMessages(DEG_MSG)).reduce(0, (a, b) -> a + b); int degree = StreamFactory.stream(messenger.receiveMessages(DEG_MSG)).reduce(0, (a, b) -> a + b);
vertex.property(VertexProperty.Cardinality.single, DEGREE, degree); vertex.property(VertexProperty.Cardinality.single, DEGREE, degree);
if (memory.getIteration()<length) messenger.sendMessage(DEG_MSG, degree); if (memory.getIteration()<length) messenger.sendMessage(DEG_MSG, degree);
} }
} }
Expand All @@ -303,6 +307,18 @@ public Set<MessageScope> getMessageScopes(Memory memory) {
else return Collections.EMPTY_SET; else return Collections.EMPTY_SET;
} }


// TODO i'm not sure these preferences are correct

@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}

@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}

@Override @Override
public Features getFeatures() { public Features getFeatures() {
return new Features() { return new Features() {
Expand Down Expand Up @@ -425,7 +441,7 @@ public void testPageRank() throws ExecutionException, InterruptedException {
} }


double correctPRSum = 0; double correctPRSum = 0;
Iterator<Vertex> iv = tx.query().vertices(); Iterator<TitanVertex> iv = tx.query().vertices().iterator();
while (iv.hasNext()) { while (iv.hasNext()) {
correctPRSum += correctPR[iv.next().<Integer>value("distance")]; correctPRSum += correctPR[iv.next().<Integer>value("distance")];
} }
Expand Down Expand Up @@ -482,8 +498,8 @@ public void testShortestDistance() throws Exception {
assertCount(numV,tx.query().vertices()); assertCount(numV,tx.query().vertices());
assertCount(numE,tx.query().edges()); assertCount(numE,tx.query().edges());


log.debug("seed inE count: {}", vertex.inE().count().next()); log.debug("seed inE count: {}", vertex.query().direction(Direction.IN).count());
log.debug("seed outE count: {}", vertex.outE().count().next()); log.debug("seed outE count: {}", vertex.query().direction(Direction.OUT).count());


clopen(); clopen();


Expand Down
@@ -1,18 +1,20 @@
package com.thinkaurelius.titan.olap; package com.thinkaurelius.titan.olap;


import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.Memory; import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope; import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger; import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder; import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram; import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.util.StreamFactory; import org.apache.tinkerpop.gremlin.util.StreamFactory;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;


import java.util.Set; import java.util.Set;


import static com.tinkerpop.gremlin.process.graph.AnonymousGraphTraversal.Tokens.__; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;


/** /**
* This implementation is only intended for testing. * This implementation is only intended for testing.
Expand Down Expand Up @@ -75,13 +77,13 @@ public void execute(Vertex vertex, Messenger<Double> messenger, Memory memory) {
} else if (1 == memory.getIteration()) { } else if (1 == memory.getIteration()) {
double initialPageRank = 1D / vertexCount; double initialPageRank = 1D / vertexCount;
double edgeCount = StreamFactory.stream(messenger.receiveMessages(inE)).reduce(0D, (a, b) -> a + b); double edgeCount = StreamFactory.stream(messenger.receiveMessages(inE)).reduce(0D, (a, b) -> a + b);
vertex.property(VertexProperty.Cardinality.single, PAGE_RANK, initialPageRank); vertex.property(VertexProperty.Cardinality.single, PAGE_RANK, initialPageRank);
vertex.property(VertexProperty.Cardinality.single, OUTGOING_EDGE_COUNT, edgeCount); vertex.property(VertexProperty.Cardinality.single, OUTGOING_EDGE_COUNT, edgeCount);
messenger.sendMessage(outE, initialPageRank / edgeCount); messenger.sendMessage(outE, initialPageRank / edgeCount);
} else { } else {
double newPageRank = StreamFactory.stream(messenger.receiveMessages(outE)).reduce(0D, (a, b) -> a + b); double newPageRank = StreamFactory.stream(messenger.receiveMessages(outE)).reduce(0D, (a, b) -> a + b);
newPageRank = (dampingFactor * newPageRank) + ((1D - dampingFactor) / vertexCount); newPageRank = (dampingFactor * newPageRank) + ((1D - dampingFactor) / vertexCount);
vertex.property(VertexProperty.Cardinality.single, PAGE_RANK, newPageRank); vertex.property(VertexProperty.Cardinality.single, PAGE_RANK, newPageRank);
messenger.sendMessage(outE, newPageRank / vertex.<Double>value(OUTGOING_EDGE_COUNT)); messenger.sendMessage(outE, newPageRank / vertex.<Double>value(OUTGOING_EDGE_COUNT));
} }
} }
Expand All @@ -96,6 +98,16 @@ public Set<MessageScope> getMessageScopes(Memory memory) {
return ImmutableSet.of(outE, inE); return ImmutableSet.of(outE, inE);
} }


@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.ORIGINAL;
}

@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}

@Override @Override
public Features getFeatures() { public Features getFeatures() {
return new Features() { return new Features() {
Expand Down
@@ -1,6 +1,7 @@
package com.thinkaurelius.titan.olap; package com.thinkaurelius.titan.olap;


import org.apache.tinkerpop.gremlin.process.Traversal; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.computer.Memory; import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner; import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope; import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
Expand All @@ -19,11 +20,12 @@


import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.function.Supplier; import java.util.function.Supplier;


import static com.tinkerpop.gremlin.process.graph.AnonymousGraphTraversal.Tokens.__; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;


public class ShortestDistanceVertexProgram extends StaticVertexProgram<Long> { public class ShortestDistanceVertexProgram extends StaticVertexProgram<Long> {


Expand Down Expand Up @@ -90,6 +92,16 @@ public Set<MessageScope> getMessageScopes(final Memory memory) {
return set; return set;
} }


@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.ORIGINAL;
}

@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}

@Override @Override
public void setup(final Memory memory) { public void setup(final Memory memory) {


Expand All @@ -106,7 +118,7 @@ public void execute(final Vertex vertex, Messenger<Long> messenger, final Memory
messenger.sendMessage(incidentMessageScope, 0L); messenger.sendMessage(incidentMessageScope, 0L);
} }
} else { } else {
Iterable<Long> distances = messenger.receiveMessages(incidentMessageScope); Iterator<Long> distances = messenger.receiveMessages(incidentMessageScope);


// Find minimum distance among all incoming messages, or null if no messages came in // Find minimum distance among all incoming messages, or null if no messages came in
Long shortestDistanceSeenOnThisIteration = Long shortestDistanceSeenOnThisIteration =
Expand Down

0 comments on commit 2587f97

Please sign in to comment.