diff --git a/algo/src/main/java/org/neo4j/graphalgo/PageRankProc.java b/algo/src/main/java/org/neo4j/graphalgo/PageRankProc.java index 231fe4c80..03d3000eb 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/PageRankProc.java +++ b/algo/src/main/java/org/neo4j/graphalgo/PageRankProc.java @@ -201,6 +201,7 @@ private PageRankResult evaluate( PageRankAlgorithm prAlgo; if(weightPropertyKey != null) { + final boolean cacheWeights = configuration.get("cacheWeights", false); prAlgo = PageRankAlgorithm.weightedOf( tracker, graph, @@ -208,7 +209,8 @@ private PageRankResult evaluate( sourceNodeIds, Pools.DEFAULT, concurrency, - batchSize); + batchSize, + cacheWeights); } else { prAlgo = PageRankAlgorithm.of( tracker, diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/WeightedDegreeCentrality.java b/algo/src/main/java/org/neo4j/graphalgo/impl/WeightedDegreeCentrality.java index cacb6821a..ce9ddd2a1 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/WeightedDegreeCentrality.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/WeightedDegreeCentrality.java @@ -1,16 +1,13 @@ package org.neo4j.graphalgo.impl; import org.neo4j.graphalgo.api.Graph; -import org.neo4j.graphalgo.api.WeightedRelationshipConsumer; import org.neo4j.graphalgo.core.utils.ParallelUtil; import org.neo4j.graphalgo.core.utils.Pools; -import org.neo4j.graphalgo.impl.pagerank.HugeComputeStep; import org.neo4j.graphdb.Direction; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -22,7 +19,9 @@ public class WeightedDegreeCentrality extends Algorithm tasks = new ArrayList<>(); + List tasks = new ArrayList<>(); for (int i = 0; i < concurrency; i++) { - tasks.add(new DegreeTask()); + if(cacheWeights) { + tasks.add(new DegreeAndWeightsTask()); + } else { + tasks.add(new DegreeTask()); + } } ParallelUtil.runWithConcurrency(concurrency, tasks, executor); @@ -79,6 +83,36 @@ public void run() { if(weight > 0) { weightedDegree[0] += weight; } + + return true; + }); + + degrees[nodeId] = weightedDegree[0]; + + } + } + } + + private class DegreeAndWeightsTask implements Runnable { + @Override + public void run() { + for (; ; ) { + final int nodeId = nodeQueue.getAndIncrement(); + if (nodeId >= nodeCount || !running()) { + return; + } + + weights[nodeId] = new double[graph.degree(nodeId, direction)]; + + int[] index = {0}; + double[] weightedDegree = new double[1]; + graph.forEachRelationship(nodeId, direction, (sourceNodeId, targetNodeId, relationId, weight) -> { + if(weight > 0) { + weightedDegree[0] += weight; + } + + weights[nodeId][index[0]] = weight; + index[0]++; return true; }); @@ -91,6 +125,9 @@ public void run() { public double[] degrees() { return degrees; } + public double[][] weights() { + return weights; + } public Stream resultStream() { return IntStream.range(0, nodeCount) diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/BaseComputeStep.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/BaseComputeStep.java index ce0ed125e..a636a8c08 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/BaseComputeStep.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/BaseComputeStep.java @@ -1,15 +1,10 @@ package org.neo4j.graphalgo.impl.pagerank; import org.neo4j.graphalgo.api.Degrees; -import org.neo4j.graphalgo.api.RelationshipIterator; -import org.neo4j.graphalgo.api.RelationshipWeights; -import org.neo4j.graphdb.Direction; import java.util.Arrays; import java.util.stream.IntStream; -import static org.neo4j.graphalgo.core.utils.ArrayUtil.binaryLookup; - public abstract class BaseComputeStep implements ComputeStep { private static final int S_INIT = 0; private static final int S_CALC = 1; @@ -20,7 +15,6 @@ public abstract class BaseComputeStep implements ComputeStep { int[] starts; private int[] lengths; private int[] sourceNodeIds; - final RelationshipIterator relationshipIterator; final Degrees degrees; private final double alpha; @@ -38,14 +32,12 @@ public abstract class BaseComputeStep implements ComputeStep { BaseComputeStep( double dampingFactor, int[] sourceNodeIds, - RelationshipIterator relationshipIterator, Degrees degrees, int partitionSize, int startNode) { this.dampingFactor = dampingFactor; this.alpha = 1.0 - dampingFactor; this.sourceNodeIds = sourceNodeIds; - this.relationshipIterator = relationshipIterator; this.degrees = degrees; this.partitionSize = partitionSize; this.startNode = startNode; diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/DegreeCache.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/DegreeCache.java new file mode 100644 index 000000000..18d6a7c03 --- /dev/null +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/DegreeCache.java @@ -0,0 +1,26 @@ +package org.neo4j.graphalgo.impl.pagerank; + +public class DegreeCache { + + public final static DegreeCache EMPTY = new DegreeCache(new double[0], new double[0][0]); + + private double[] aggregatedDegrees; + private double[][] weights; + + public DegreeCache(double[] aggregatedDegrees, double[][] weights) { + this.aggregatedDegrees = aggregatedDegrees; + this.weights = weights; + } + + double[] aggregatedDegrees() { + return aggregatedDegrees; + } + + double[][] weights() { + return weights; + } + + boolean hasCachedValues() { + return weights.length > 0; + } +} diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/DegreeComputer.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/DegreeComputer.java index f65f47201..571265876 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/DegreeComputer.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/DegreeComputer.java @@ -3,5 +3,5 @@ import java.util.concurrent.ExecutorService; public interface DegreeComputer { - double[] degree(ExecutorService executor, int concurrency); + DegreeCache degree(ExecutorService executor, int concurrency); } diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/HugePageRank.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/HugePageRank.java index 19a761496..c68389005 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/HugePageRank.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/HugePageRank.java @@ -205,9 +205,7 @@ private void initializeSteps() { ? this.executor : null; WeightedDegreeCentrality degreeCentrality = new WeightedDegreeCentrality(graph, executor, concurrency, Direction.OUTGOING); - degreeCentrality.compute(); - - DegreeComputer degreeComputer = pageRankVariant.degreeComputer(graph); + degreeCentrality.compute(false); computeSteps = createComputeSteps( concurrency, @@ -270,7 +268,7 @@ private ComputeSteps createComputeSteps( Iterator parts = partitions.iterator(); DegreeComputer degreeComputer = pageRankVariant.degreeComputer(graph); - double[] aggregatedDegrees = degreeComputer.degree(pool, concurrency); + DegreeCache degreeCache = degreeComputer.degree(pool, concurrency); while (parts.hasNext()) { Partition partition = parts.next(); @@ -296,7 +294,7 @@ private ComputeSteps createComputeSteps( tracker, partitionCount, start, - aggregatedDegrees)); + degreeCache)); } long[] startArray = starts.toArray(); diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/HugeWeightedComputeStep.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/HugeWeightedComputeStep.java index f170778e0..c9dd86bbd 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/HugeWeightedComputeStep.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/HugeWeightedComputeStep.java @@ -23,7 +23,8 @@ public class HugeWeightedComputeStep extends HugeBaseComputeStep implements Huge HugeRelationshipWeights relationshipWeights, AllocationTracker tracker, int partitionSize, - long startNode, double[] aggregatedDegrees) { + long startNode, + DegreeCache degreeCache) { super(dampingFactor, sourceNodeIds, relationshipIterator, @@ -32,7 +33,7 @@ public class HugeWeightedComputeStep extends HugeBaseComputeStep implements Huge partitionSize, startNode); this.relationshipWeights = relationshipWeights; - this.aggregatedDegrees = aggregatedDegrees; + this.aggregatedDegrees = degreeCache.aggregatedDegrees(); } void singleIteration() { diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NoOpDegreeComputer.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NoOpDegreeComputer.java index b34123074..9e1e72679 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NoOpDegreeComputer.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NoOpDegreeComputer.java @@ -5,7 +5,7 @@ public class NoOpDegreeComputer implements DegreeComputer { @Override - public double[] degree(ExecutorService executor, int concurrency) { - return new double[0]; + public DegreeCache degree(ExecutorService executor, int concurrency) { + return DegreeCache.EMPTY; } } diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NonWeightedComputeStep.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NonWeightedComputeStep.java index 5a23eebba..284fd10f9 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NonWeightedComputeStep.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NonWeightedComputeStep.java @@ -9,6 +9,7 @@ final class NonWeightedComputeStep extends BaseComputeStep implements RelationshipConsumer { + private final RelationshipIterator relationshipIterator; NonWeightedComputeStep( double dampingFactor, @@ -17,7 +18,8 @@ final class NonWeightedComputeStep extends BaseComputeStep implements Relationsh Degrees degrees, int partitionSize, int startNode) { - super(dampingFactor, sourceNodeIds, relationshipIterator, degrees, partitionSize, startNode); + super(dampingFactor, sourceNodeIds, degrees, partitionSize, startNode); + this.relationshipIterator = relationshipIterator; } diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NonWeightedPageRankVariant.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NonWeightedPageRankVariant.java index 5ce18f26c..8a5766241 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NonWeightedPageRankVariant.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NonWeightedPageRankVariant.java @@ -3,10 +3,13 @@ import org.neo4j.graphalgo.api.*; import org.neo4j.graphalgo.core.utils.paged.AllocationTracker; -import java.util.concurrent.ExecutorService; - public class NonWeightedPageRankVariant implements PageRankVariant { - public ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds, RelationshipIterator relationshipIterator, Degrees degrees, RelationshipWeights relationshipWeights, int partitionCount, int start, double[] aggregatedDegrees) { + public ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds, + RelationshipIterator relationshipIterator, + WeightedRelationshipIterator weightedRelationshipIterator, + Degrees degrees, + int partitionCount, int start, + DegreeCache degreeCache) { return new NonWeightedComputeStep( dampingFactor, sourceNodeIds, @@ -18,7 +21,7 @@ public ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds, } @Override - public HugeNonWeightedComputeStep createHugeComputeStep(double dampingFactor, long[] sourceNodeIds, HugeRelationshipIterator relationshipIterator, HugeDegrees degrees, HugeRelationshipWeights relationshipWeights, AllocationTracker tracker, int partitionCount, long start, double[] aggregatedDegrees) { + public HugeNonWeightedComputeStep createHugeComputeStep(double dampingFactor, long[] sourceNodeIds, HugeRelationshipIterator relationshipIterator, HugeDegrees degrees, HugeRelationshipWeights relationshipWeights, AllocationTracker tracker, int partitionCount, long start, DegreeCache aggregatedDegrees) { return new HugeNonWeightedComputeStep( dampingFactor, sourceNodeIds, diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRank.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRank.java index 44605553a..e76745f95 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRank.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRank.java @@ -106,7 +106,8 @@ public class PageRank extends Algorithm implements PageRankAlgorithm { graph, dampingFactor, sourceNodeIds, - pageRankVariant); + pageRankVariant + ); } /** @@ -218,8 +219,8 @@ private ComputeSteps createComputeSteps( double dampingFactor, int[] sourceNodeIds, RelationshipIterator relationshipIterator, + WeightedRelationshipIterator weightedRelationshipIterator, Degrees degrees, - RelationshipWeights relationshipWeights, List partitions, ExecutorService pool, PageRankVariant pageRankVariant, @@ -238,7 +239,7 @@ private ComputeSteps createComputeSteps( partitions.size()); Iterator parts = partitions.iterator(); - double[] aggregatedDegrees = degreeComputer.degree(pool, concurrency); + DegreeCache degreeCache = degreeComputer.degree(pool, concurrency); while (parts.hasNext()) { Partition partition = parts.next(); @@ -256,11 +257,11 @@ private ComputeSteps createComputeSteps( dampingFactor, sourceNodeIds, relationshipIterator, + weightedRelationshipIterator, degrees, - relationshipWeights, partitionCount, start, - aggregatedDegrees + degreeCache )); } diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRankAlgorithm.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRankAlgorithm.java index ccd9542fe..1fe5e18d4 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRankAlgorithm.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRankAlgorithm.java @@ -37,16 +37,18 @@ public interface PageRankAlgorithm { static PageRankAlgorithm weightedOf( Graph graph, double dampingFactor, - LongStream sourceNodeIds) { - return weightedOf(AllocationTracker.EMPTY, dampingFactor, sourceNodeIds, graph); + LongStream sourceNodeIds + ) { + return weightedOf(AllocationTracker.EMPTY, dampingFactor, sourceNodeIds, graph, false); } static PageRankAlgorithm weightedOf( AllocationTracker tracker, double dampingFactor, LongStream sourceNodeIds, - Graph graph) { - WeightedPageRankVariant computeStepFactory = new WeightedPageRankVariant(); + Graph graph, + boolean cacheWeights) { + WeightedPageRankVariant computeStepFactory = new WeightedPageRankVariant(cacheWeights); if (graph instanceof HugeGraph) { HugeGraph huge = (HugeGraph) graph; return new HugePageRank(tracker, huge, dampingFactor, sourceNodeIds, computeStepFactory); @@ -127,8 +129,9 @@ static PageRankAlgorithm weightedOf( LongStream sourceNodeIds, ExecutorService pool, int concurrency, - int batchSize) { - WeightedPageRankVariant computeStepFactory = new WeightedPageRankVariant(); + int batchSize, + boolean cacheWeights) { + WeightedPageRankVariant pageRankVariant = new WeightedPageRankVariant(cacheWeights); if (graph instanceof HugeGraph) { HugeGraph huge = (HugeGraph) graph; return new HugePageRank( @@ -139,7 +142,7 @@ static PageRankAlgorithm weightedOf( huge, dampingFactor, sourceNodeIds, - computeStepFactory + pageRankVariant ); } @@ -150,6 +153,6 @@ static PageRankAlgorithm weightedOf( graph, dampingFactor, sourceNodeIds, - computeStepFactory); + pageRankVariant); } } diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRankVariant.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRankVariant.java index 327d502b9..d59edbcd1 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRankVariant.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRankVariant.java @@ -2,16 +2,19 @@ import org.neo4j.graphalgo.api.*; import org.neo4j.graphalgo.core.utils.paged.AllocationTracker; -import org.neo4j.graphalgo.impl.WeightedDegreeCentrality; -import org.neo4j.graphdb.Direction; - -import java.util.concurrent.ExecutorService; public interface PageRankVariant { - ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds, RelationshipIterator relationshipIterator, Degrees degrees, RelationshipWeights relationshipWeights, int partitionCount, int start, double[] aggregatedDegrees); - - HugeComputeStep createHugeComputeStep(double dampingFactor, long[] sourceNodeIds, HugeRelationshipIterator relationshipIterator, HugeDegrees degrees, HugeRelationshipWeights relationshipWeights, AllocationTracker tracker, int partitionCount, long start, double[] aggregatedDegrees); + ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds, + RelationshipIterator relationshipIterator, + WeightedRelationshipIterator weightedRelationshipIterator, Degrees degrees, + int partitionCount, int start, + DegreeCache degreeCache); + + HugeComputeStep createHugeComputeStep(double dampingFactor, long[] sourceNodeIds, + HugeRelationshipIterator relationshipIterator, HugeDegrees degrees, + HugeRelationshipWeights relationshipWeights, AllocationTracker tracker, + int partitionCount, long start, DegreeCache aggregatedDegrees); DegreeComputer degreeComputer(Graph graph); diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedComputeStep.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedComputeStep.java index 0ebe3d0c7..05f436b7c 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedComputeStep.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedComputeStep.java @@ -1,44 +1,39 @@ package org.neo4j.graphalgo.impl.pagerank; import org.neo4j.graphalgo.api.Degrees; -import org.neo4j.graphalgo.api.RelationshipConsumer; -import org.neo4j.graphalgo.api.RelationshipIterator; -import org.neo4j.graphalgo.api.RelationshipWeights; +import org.neo4j.graphalgo.api.WeightedRelationshipConsumer; +import org.neo4j.graphalgo.api.WeightedRelationshipIterator; import org.neo4j.graphdb.Direction; -import java.util.Arrays; -import java.util.stream.IntStream; - import static org.neo4j.graphalgo.core.utils.ArrayUtil.binaryLookup; -final class WeightedComputeStep extends BaseComputeStep implements RelationshipConsumer { - private final RelationshipWeights relationshipWeights; +final class WeightedComputeStep extends BaseComputeStep implements WeightedRelationshipConsumer { private final double[] aggregatedDegrees; + private final WeightedRelationshipIterator relationshipIterator; private double sumOfWeights; private double delta; WeightedComputeStep( double dampingFactor, int[] sourceNodeIds, - RelationshipIterator relationshipIterator, + WeightedRelationshipIterator weightedRelationshipIterator, Degrees degrees, - RelationshipWeights relationshipWeights, int partitionSize, - int startNode, double[] aggregatedDegrees) { + int startNode, + DegreeCache degreeCache) { super(dampingFactor, sourceNodeIds, - relationshipIterator, degrees, partitionSize, startNode); - this.relationshipWeights = relationshipWeights; - this.aggregatedDegrees = aggregatedDegrees; + this.relationshipIterator = weightedRelationshipIterator; + this.aggregatedDegrees = degreeCache.aggregatedDegrees(); } void singleIteration() { int startNode = this.startNode; int endNode = this.endNode; - RelationshipIterator rels = this.relationshipIterator; + WeightedRelationshipIterator rels = this.relationshipIterator; for (int nodeId = startNode; nodeId < endNode; ++nodeId) { delta = deltas[nodeId - startNode]; if (delta > 0) { @@ -46,17 +41,15 @@ void singleIteration() { if (degree > 0) { sumOfWeights = aggregatedDegrees[nodeId]; - rels.forEachRelationship(nodeId, Direction.OUTGOING,this); + rels.forEachRelationship(nodeId, Direction.OUTGOING, this); } } } } @Override - public boolean accept(int sourceNodeId, int targetNodeId, long relationId) { - double weight = relationshipWeights.weightOf(sourceNodeId, targetNodeId); - - if(weight > 0) { + public boolean accept(int sourceNodeId, int targetNodeId, long relationId, double weight) { + if (weight > 0) { double proportion = weight / sumOfWeights; int srcRankDelta = (int) (100_000 * (delta * proportion)); if (srcRankDelta != 0) { diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedDegreeComputer.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedDegreeComputer.java index b417ed50a..d45b62c98 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedDegreeComputer.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedDegreeComputer.java @@ -9,15 +9,17 @@ public class WeightedDegreeComputer implements DegreeComputer { private Graph graph; + private boolean cacheWeights; - public WeightedDegreeComputer(Graph graph) { + public WeightedDegreeComputer(Graph graph, boolean cacheWeights) { this.graph = graph; + this.cacheWeights = cacheWeights; } @Override - public double[] degree(ExecutorService executor, int concurrency) { + public DegreeCache degree(ExecutorService executor, int concurrency) { WeightedDegreeCentrality degreeCentrality = new WeightedDegreeCentrality(graph, executor, concurrency, Direction.OUTGOING); - degreeCentrality.compute(); - return degreeCentrality.degrees(); + degreeCentrality.compute(cacheWeights); + return new DegreeCache(degreeCentrality.degrees(), degreeCentrality.weights()); } } diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedPageRankVariant.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedPageRankVariant.java index b26cdd1be..fa1d1b751 100644 --- a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedPageRankVariant.java +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedPageRankVariant.java @@ -2,27 +2,49 @@ import org.neo4j.graphalgo.api.*; import org.neo4j.graphalgo.core.utils.paged.AllocationTracker; -import org.neo4j.graphalgo.impl.WeightedDegreeCentrality; -import org.neo4j.graphdb.Direction; - -import java.util.concurrent.ExecutorService; public class WeightedPageRankVariant implements PageRankVariant { - public ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds, RelationshipIterator relationshipIterator, Degrees degrees, RelationshipWeights relationshipWeights, int partitionCount, int start, double[] aggregatedDegrees) { - return new WeightedComputeStep( - dampingFactor, - sourceNodeIds, - relationshipIterator, - degrees, - relationshipWeights, - partitionCount, - start, - aggregatedDegrees - ); + private boolean cacheWeights; + + public WeightedPageRankVariant(boolean cacheWeights) { + this.cacheWeights = cacheWeights; + } + + + public ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds, + RelationshipIterator relationshipIterator, + WeightedRelationshipIterator weightedRelationshipIterator, + Degrees degrees, + int partitionCount, int start, + DegreeCache degreeCache) { + if(cacheWeights ){ + return new WeightedWithCachedWeightsComputeStep( + dampingFactor, + sourceNodeIds, + relationshipIterator, + degrees, + partitionCount, + start, + degreeCache + ); + } else { + return new WeightedComputeStep( + dampingFactor, + sourceNodeIds, + weightedRelationshipIterator, + degrees, + partitionCount, + start, + degreeCache + ); + } } @Override - public HugeComputeStep createHugeComputeStep(double dampingFactor, long[] sourceNodeIds, HugeRelationshipIterator relationshipIterator, HugeDegrees degrees, HugeRelationshipWeights relationshipWeights, AllocationTracker tracker, int partitionCount, long start, double[] aggregatedDegrees) { + public HugeComputeStep createHugeComputeStep(double dampingFactor, long[] sourceNodeIds, + HugeRelationshipIterator relationshipIterator, HugeDegrees degrees, + HugeRelationshipWeights relationshipWeights, AllocationTracker tracker, + int partitionCount, long start, DegreeCache aggregatedDegrees) { return new HugeWeightedComputeStep( dampingFactor, sourceNodeIds, @@ -38,6 +60,6 @@ public HugeComputeStep createHugeComputeStep(double dampingFactor, long[] source @Override public DegreeComputer degreeComputer(Graph graph) { - return new WeightedDegreeComputer(graph); + return new WeightedDegreeComputer(graph, cacheWeights); } } diff --git a/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedWithCachedWeightsComputeStep.java b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedWithCachedWeightsComputeStep.java new file mode 100644 index 000000000..f16c9dcd7 --- /dev/null +++ b/algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/WeightedWithCachedWeightsComputeStep.java @@ -0,0 +1,84 @@ +package org.neo4j.graphalgo.impl.pagerank; + +import org.neo4j.graphalgo.api.Degrees; +import org.neo4j.graphalgo.api.RelationshipIterator; +import org.neo4j.graphalgo.api.WeightedRelationshipConsumer; +import org.neo4j.graphalgo.api.WeightedRelationshipIterator; +import org.neo4j.graphdb.Direction; + +import static org.neo4j.graphalgo.core.utils.ArrayUtil.binaryLookup; + +final class WeightedWithCachedWeightsComputeStep extends BaseComputeStep implements WeightedRelationshipConsumer { + private final double[] aggregatedDegrees; + private final WeightedRelationshipIterator relationshipIterator; + private double sumOfWeights; + private double delta; + + WeightedWithCachedWeightsComputeStep( + double dampingFactor, + int[] sourceNodeIds, + RelationshipIterator relationshipIterator, + Degrees degrees, + int partitionSize, + int startNode, + DegreeCache degreeCache) { + super(dampingFactor, + sourceNodeIds, + degrees, + partitionSize, + startNode); + this.relationshipIterator = new CachedWeightsRelationshipIterator(relationshipIterator, degreeCache.weights()); + this.aggregatedDegrees = degreeCache.aggregatedDegrees(); + } + + void singleIteration() { + int startNode = this.startNode; + int endNode = this.endNode; + WeightedRelationshipIterator rels = this.relationshipIterator; + for (int nodeId = startNode; nodeId < endNode; ++nodeId) { + delta = deltas[nodeId - startNode]; + if (delta > 0) { + int degree = degrees.degree(nodeId, Direction.OUTGOING); + if (degree > 0) { + sumOfWeights = aggregatedDegrees[nodeId]; + + rels.forEachRelationship(nodeId, Direction.OUTGOING, this); + } + } + } + } + + @Override + public boolean accept(int sourceNodeId, int targetNodeId, long relationId, double weight) { + if (weight > 0) { + double proportion = weight / sumOfWeights; + int srcRankDelta = (int) (100_000 * (delta * proportion)); + if (srcRankDelta != 0) { + int idx = binaryLookup(targetNodeId, starts); + nextScores[idx][targetNodeId - starts[idx]] += srcRankDelta; + } + } + + return true; + } + + private class CachedWeightsRelationshipIterator implements WeightedRelationshipIterator { + private final RelationshipIterator delegate; + private final double[][] weights; + + public CachedWeightsRelationshipIterator(RelationshipIterator relationshipIterator, double[][] weights) { + this.delegate = relationshipIterator; + this.weights = weights; + } + + @Override + public void forEachRelationship(int nodeId, Direction direction, WeightedRelationshipConsumer consumer) { + final int[] index = {0}; + delegate.forEachRelationship(nodeId, direction, (sourceNodeId, targetNodeId, relationId) -> { + consumer.accept(sourceNodeId, targetNodeId, -1, weights[nodeId][index[0]]); + index[0]++; + return true; + }); + } + } +} diff --git a/benchmark/src/main/java/org/neo4j/graphalgo/bench/PageRankBenchmarkLdbc.java b/benchmark/src/main/java/org/neo4j/graphalgo/bench/PageRankBenchmarkLdbc.java index d079d394a..b86bc0c68 100644 --- a/benchmark/src/main/java/org/neo4j/graphalgo/bench/PageRankBenchmarkLdbc.java +++ b/benchmark/src/main/java/org/neo4j/graphalgo/bench/PageRankBenchmarkLdbc.java @@ -42,20 +42,20 @@ @OutputTimeUnit(TimeUnit.MILLISECONDS) public class PageRankBenchmarkLdbc { - @Param({"HEAVY", "HUGE"}) -// @Param({"HEAVY"}) +// @Param({"HEAVY", "HUGE"}) + @Param({"HEAVY"}) GraphImpl graph; - @Param({"true", "false"}) -// @Param({"false"}) +// @Param({"true", "false"}) + @Param({"false"}) boolean parallel; - @Param({"L01", "L10"}) -// @Param({"L10"}) +// @Param({"L01", "L10"}) + @Param({"L01"}) String graphId; -// @Param({"20"}) - @Param({"5", "20"}) + @Param({"5"}) +// @Param({"5", "20"}) int iterations; private GraphDatabaseAPI db; diff --git a/benchmark/src/main/java/org/neo4j/graphalgo/bench/WeightedPageRankBenchmarkLdbc.java b/benchmark/src/main/java/org/neo4j/graphalgo/bench/WeightedPageRankBenchmarkLdbc.java index 4b2afad77..13ace7de9 100644 --- a/benchmark/src/main/java/org/neo4j/graphalgo/bench/WeightedPageRankBenchmarkLdbc.java +++ b/benchmark/src/main/java/org/neo4j/graphalgo/bench/WeightedPageRankBenchmarkLdbc.java @@ -46,8 +46,8 @@ public class WeightedPageRankBenchmarkLdbc { @Param({"HEAVY"}) GraphImpl graph; - @Param({"true", "false"}) -// @Param({"false"}) +// @Param({"true", "false"}) + @Param({"false"}) boolean parallel; // @Param({"L01", "L10"}) @@ -59,6 +59,9 @@ public class WeightedPageRankBenchmarkLdbc { // @Param({"5"}) int iterations; + @Param({"true", "false"}) + boolean cacheWeights; + private GraphDatabaseAPI db; private Graph grph; private int batchSize; @@ -67,20 +70,20 @@ public class WeightedPageRankBenchmarkLdbc { public void setup() throws KernelException, IOException { db = LdbcDownloader.openDb(graphId); -// Transaction tx = db.beginTx(); -// int count = 0; -// for (Relationship relationship : db.getAllRelationships()) { -// long startNodeId = relationship.getStartNodeId(); -// long endNodeId = relationship.getEndNodeId(); -// relationship.setProperty("weight", startNodeId + endNodeId % 100); -// if(++ count % 100000 == 0) { -// tx.success(); tx.close(); -// tx = db.beginTx(); -// } -// } -// -// tx.success(); -// tx.close(); + Transaction tx = db.beginTx(); + int count = 0; + for (Relationship relationship : db.getAllRelationships()) { + long startNodeId = relationship.getStartNodeId(); + long endNodeId = relationship.getEndNodeId(); + relationship.setProperty("weight", startNodeId + endNodeId % 100); + if(++ count % 100000 == 0) { + tx.success(); tx.close(); + tx = db.beginTx(); + } + } + + tx.success(); + tx.close(); grph = new GraphLoader(db, Pools.DEFAULT) .withDirection(Direction.OUTGOING) @@ -106,7 +109,8 @@ public PageRankResult run() throws Exception { LongStream.empty(), Pools.DEFAULT, Pools.getNoThreadsInDefaultPool(), - batchSize) + batchSize, + cacheWeights) .compute(iterations) .result(); } diff --git a/tests/src/test/java/org/neo4j/graphalgo/algo/PageRankProcIntegrationTest.java b/tests/src/test/java/org/neo4j/graphalgo/algo/PageRankProcIntegrationTest.java index 99ed05895..bb9ccb8e1 100644 --- a/tests/src/test/java/org/neo4j/graphalgo/algo/PageRankProcIntegrationTest.java +++ b/tests/src/test/java/org/neo4j/graphalgo/algo/PageRankProcIntegrationTest.java @@ -178,6 +178,18 @@ public void testWeightedPageRankStream() throws Exception { assertMapEquals(weightedExpected, actual); } + @Test + public void testWeightedPageRankWithCachedWeightsStream() throws Exception { + final Map actual = new HashMap<>(); + runQuery( + "CALL algo.pageRank.stream('Label1', 'TYPE1', {graph:'"+graphImpl+"', weightProperty: 'foo', cacheWeights: true}) YIELD node, score", + row -> actual.put( + row.getNode("node").getId(), + (Double) row.get("score"))); + + assertMapEquals(weightedExpected, actual); + } + @Test public void testWeightedPageRankWithAllRelationshipsEqualStream() throws Exception { final Map actual = new HashMap<>(); diff --git a/tests/src/test/java/org/neo4j/graphalgo/impl/DegreeCentralityTest.java b/tests/src/test/java/org/neo4j/graphalgo/impl/DegreeCentralityTest.java index 415c68e09..0062cf538 100644 --- a/tests/src/test/java/org/neo4j/graphalgo/impl/DegreeCentralityTest.java +++ b/tests/src/test/java/org/neo4j/graphalgo/impl/DegreeCentralityTest.java @@ -30,7 +30,6 @@ import org.neo4j.graphalgo.core.heavyweight.HeavyCypherGraphFactory; import org.neo4j.graphalgo.core.heavyweight.HeavyGraphFactory; import org.neo4j.graphalgo.core.huge.HugeGraphFactory; -import org.neo4j.graphalgo.core.neo4jview.GraphViewFactory; import org.neo4j.graphalgo.core.utils.Pools; import org.neo4j.graphdb.Direction; import org.neo4j.graphdb.Label; @@ -229,7 +228,7 @@ public void weightedOutgoingCentrality() throws Exception { } WeightedDegreeCentrality degreeCentrality = new WeightedDegreeCentrality(graph, Pools.DEFAULT, 1, Direction.OUTGOING); - degreeCentrality.compute(); + degreeCentrality.compute(false); IntStream.range(0, expected.size()).forEach(i -> { final long nodeId = graph.toOriginalNodeId(i); @@ -279,7 +278,7 @@ public void excludeNegativeWeights() throws Exception { } WeightedDegreeCentrality degreeCentrality = new WeightedDegreeCentrality(graph, Pools.DEFAULT, 1, Direction.OUTGOING); - degreeCentrality.compute(); + degreeCentrality.compute(false); IntStream.range(0, expected.size()).forEach(i -> { final long nodeId = graph.toOriginalNodeId(i); @@ -377,7 +376,7 @@ public void weightedIncomingCentrality() throws Exception { } WeightedDegreeCentrality degreeCentrality = new WeightedDegreeCentrality(graph, Pools.DEFAULT, 4, Direction.INCOMING); - degreeCentrality.compute(); + degreeCentrality.compute(false); IntStream.range(0, expected.size()).forEach(i -> { final long nodeId = graph.toOriginalNodeId(i); diff --git a/tests/src/test/java/org/neo4j/graphalgo/impl/WeightedDegreeCentralityTest.java b/tests/src/test/java/org/neo4j/graphalgo/impl/WeightedDegreeCentralityTest.java new file mode 100644 index 000000000..fa3c98d31 --- /dev/null +++ b/tests/src/test/java/org/neo4j/graphalgo/impl/WeightedDegreeCentralityTest.java @@ -0,0 +1,202 @@ +/** + * Copyright (c) 2017 "Neo4j, Inc." + * + * This file is part of Neo4j Graph Algorithms . + * + * Neo4j Graph Algorithms is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.graphalgo.impl; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.neo4j.graphalgo.TestDatabaseCreator; +import org.neo4j.graphalgo.api.Graph; +import org.neo4j.graphalgo.api.GraphFactory; +import org.neo4j.graphalgo.core.GraphLoader; +import org.neo4j.graphalgo.core.heavyweight.HeavyCypherGraphFactory; +import org.neo4j.graphalgo.core.heavyweight.HeavyGraphFactory; +import org.neo4j.graphalgo.core.huge.HugeGraphFactory; +import org.neo4j.graphalgo.core.utils.Pools; +import org.neo4j.graphdb.Direction; +import org.neo4j.graphdb.Label; +import org.neo4j.graphdb.Transaction; +import org.neo4j.kernel.internal.GraphDatabaseAPI; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public final class WeightedDegreeCentralityTest { + + private Class graphImpl; + + @Parameterized.Parameters(name = "{1}") + public static Collection data() { +// return Arrays.asList( +// new Object[]{HeavyGraphFactory.class, "HeavyGraphFactory"}, +// new Object[]{HeavyCypherGraphFactory.class, "HeavyCypherGraphFactory"}, +// new Object[]{HugeGraphFactory.class, "HugeGraphFactory"}, +// new Object[]{GraphViewFactory.class, "GraphViewFactory"} +// ); + + return Arrays.asList( + new Object[]{HeavyGraphFactory.class, "HeavyGraphFactory"}, + new Object[]{HugeGraphFactory.class, "HugeGraphFactory"} + ); + } + + private static final String DB_CYPHER = "" + + "CREATE (_:Label0 {name:\"_\"})\n" + + "CREATE (a:Label1 {name:\"a\"})\n" + + "CREATE (b:Label1 {name:\"b\"})\n" + + "CREATE (c:Label1 {name:\"c\"})\n" + + "CREATE (d:Label1 {name:\"d\"})\n" + + "CREATE (e:Label1 {name:\"e\"})\n" + + "CREATE (f:Label1 {name:\"f\"})\n" + + "CREATE (g:Label1 {name:\"g\"})\n" + + "CREATE (h:Label1 {name:\"h\"})\n" + + "CREATE (i:Label1 {name:\"i\"})\n" + + "CREATE (j:Label1 {name:\"j\"})\n" + + "CREATE (k:Label2 {name:\"k\"})\n" + + "CREATE (l:Label2 {name:\"l\"})\n" + + "CREATE (m:Label2 {name:\"m\"})\n" + + "CREATE (n:Label2 {name:\"n\"})\n" + + "CREATE (o:Label2 {name:\"o\"})\n" + + "CREATE (p:Label2 {name:\"p\"})\n" + + "CREATE (q:Label2 {name:\"q\"})\n" + + "CREATE (r:Label2 {name:\"r\"})\n" + + "CREATE (s:Label2 {name:\"s\"})\n" + + "CREATE (t:Label2 {name:\"t\"})\n" + + "CREATE\n" + + " (b)-[:TYPE1 {weight: 2.0}]->(c),\n" + + " (c)-[:TYPE1 {weight: 2.0}]->(b),\n" + + + " (d)-[:TYPE1 {weight: 5.0}]->(a),\n" + + " (d)-[:TYPE1 {weight: 2.0}]->(b),\n" + + + " (e)-[:TYPE1 {weight: 2.0}]->(b),\n" + + " (e)-[:TYPE1 {weight: 7.0}]->(d),\n" + + " (e)-[:TYPE1 {weight: 1.0}]->(f),\n" + + + " (f)-[:TYPE1 {weight: 2.0}]->(b),\n" + + " (f)-[:TYPE1 {weight: 2.0}]->(e),\n" + + + " (a)-[:TYPE3 {weight: -2.0}]->(b),\n" + + + " (b)-[:TYPE3 {weight: 2.0}]->(c),\n" + + " (c)-[:TYPE3 {weight: 2.0}]->(b),\n" + + + " (d)-[:TYPE3 {weight: 2.0}]->(a),\n" + + " (d)-[:TYPE3 {weight: 2.0}]->(b),\n" + + + " (e)-[:TYPE3 {weight: 2.0}]->(b),\n" + + " (e)-[:TYPE3 {weight: 2.0}]->(d),\n" + + " (e)-[:TYPE3 {weight: 2.0}]->(f),\n" + + + " (f)-[:TYPE3 {weight: 2.0}]->(b),\n" + + " (f)-[:TYPE3 {weight: 2.0}]->(e),\n" + + + " (g)-[:TYPE2]->(b),\n" + + " (g)-[:TYPE2]->(e),\n" + + " (h)-[:TYPE2]->(b),\n" + + " (h)-[:TYPE2]->(e),\n" + + " (i)-[:TYPE2]->(b),\n" + + " (i)-[:TYPE2]->(e),\n" + + " (j)-[:TYPE2]->(e),\n" + + " (k)-[:TYPE2]->(e)\n"; + + private static GraphDatabaseAPI db; + + @BeforeClass + public static void setupGraph() { + db = TestDatabaseCreator.createTestDatabase(); + try (Transaction tx = db.beginTx()) { + db.execute(DB_CYPHER).close(); + tx.success(); + } + } + + @AfterClass + public static void shutdownGraph() throws Exception { + if (db!=null) db.shutdown(); + } + + public WeightedDegreeCentralityTest( + Class graphImpl, + String nameIgnoredOnlyForTestName) { + this.graphImpl = graphImpl; + } + + @Test + public void buildWeightsArray() throws Exception { + final Label label = Label.label("Label1"); + final Map expected = new HashMap<>(); + + try (Transaction tx = db.beginTx()) { + expected.put(db.findNode(label, "name", "a").getId(), new double[] {}); + expected.put(db.findNode(label, "name", "b").getId(), new double[] {2.0}); + expected.put(db.findNode(label, "name", "c").getId(), new double[] {2.0}); + expected.put(db.findNode(label, "name", "d").getId(), new double[] {5.0,2.0}); + expected.put(db.findNode(label, "name", "e").getId(), new double[] {2.0,7.0,1.0}); + expected.put(db.findNode(label, "name", "f").getId(), new double[] {2.0,2.0}); + expected.put(db.findNode(label, "name", "g").getId(), new double[] {}); + expected.put(db.findNode(label, "name", "h").getId(), new double[] {}); + expected.put(db.findNode(label, "name", "i").getId(), new double[] {}); + expected.put(db.findNode(label, "name", "j").getId(), new double[] {}); + tx.close(); + } + + final Graph graph; + if (graphImpl.isAssignableFrom(HeavyCypherGraphFactory.class)) { + graph = new GraphLoader(db) + .withLabel("MATCH (n:Label1) RETURN id(n) as id") + .withRelationshipType("MATCH (n:Label1)-[type:TYPE1]->(m:Label1) RETURN id(n) as source,id(m) as target, type.weight AS weight") + .withOptionalRelationshipWeightsFromProperty("weight", 1.0) + .load(graphImpl); + + } else { + graph = new GraphLoader(db) + .withLabel(label) + .withRelationshipType("TYPE1") + .withDirection(Direction.OUTGOING) + .withOptionalRelationshipWeightsFromProperty("weight", 1.0) + .withSort(true) + .load(graphImpl); + } + + WeightedDegreeCentrality degreeCentrality = new WeightedDegreeCentrality(graph, Pools.DEFAULT, 1, Direction.OUTGOING); + degreeCentrality.compute(false); + + IntStream.range(0, expected.size()).forEach(i -> { + final long nodeId = graph.toOriginalNodeId(i); + assertArrayEquals( + "Node#" + nodeId, + expected.get(nodeId), + degreeCentrality.weights()[i], + 0.01 + + ); + }); + } + +}