Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion algo/src/main/java/org/neo4j/graphalgo/PageRankProc.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,16 @@ private PageRankResult evaluate(

PageRankAlgorithm prAlgo;
if(weightPropertyKey != null) {
final boolean cacheWeights = configuration.get("cacheWeights", false);
prAlgo = PageRankAlgorithm.weightedOf(
tracker,
graph,
dampingFactor,
sourceNodeIds,
Pools.DEFAULT,
concurrency,
batchSize);
batchSize,
cacheWeights);
} else {
prAlgo = PageRankAlgorithm.of(
tracker,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package org.neo4j.graphalgo.impl;

import org.neo4j.graphalgo.api.Graph;
import org.neo4j.graphalgo.api.WeightedRelationshipConsumer;
import org.neo4j.graphalgo.core.utils.ParallelUtil;
import org.neo4j.graphalgo.core.utils.Pools;
import org.neo4j.graphalgo.impl.pagerank.HugeComputeStep;
import org.neo4j.graphdb.Direction;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -22,7 +19,9 @@ public class WeightedDegreeCentrality extends Algorithm<WeightedDegreeCentrality
private final ExecutorService executor;
private final int concurrency;
private volatile AtomicInteger nodeQueue = new AtomicInteger();

private double[] degrees;
private double[][] weights;

public WeightedDegreeCentrality(
Graph graph,
Expand All @@ -40,14 +39,19 @@ public WeightedDegreeCentrality(
nodeCount = Math.toIntExact(graph.nodeCount());
this.direction = direction;
degrees = new double[nodeCount];
weights = new double[nodeCount][];
}

public WeightedDegreeCentrality compute() {
public WeightedDegreeCentrality compute(boolean cacheWeights) {
nodeQueue.set(0);

List<DegreeTask> tasks = new ArrayList<>();
List<Runnable> tasks = new ArrayList<>();
for (int i = 0; i < concurrency; i++) {
tasks.add(new DegreeTask());
if(cacheWeights) {
tasks.add(new DegreeAndWeightsTask());
} else {
tasks.add(new DegreeTask());
}
}
ParallelUtil.runWithConcurrency(concurrency, tasks, executor);

Expand Down Expand Up @@ -79,6 +83,36 @@ public void run() {
if(weight > 0) {
weightedDegree[0] += weight;
}

return true;
});

degrees[nodeId] = weightedDegree[0];

}
}
}

private class DegreeAndWeightsTask implements Runnable {
@Override
public void run() {
for (; ; ) {
final int nodeId = nodeQueue.getAndIncrement();
if (nodeId >= nodeCount || !running()) {
return;
}

weights[nodeId] = new double[graph.degree(nodeId, direction)];

int[] index = {0};
double[] weightedDegree = new double[1];
graph.forEachRelationship(nodeId, direction, (sourceNodeId, targetNodeId, relationId, weight) -> {
if(weight > 0) {
weightedDegree[0] += weight;
}

weights[nodeId][index[0]] = weight;
index[0]++;
return true;
});

Expand All @@ -91,6 +125,9 @@ public void run() {
public double[] degrees() {
return degrees;
}
public double[][] weights() {
return weights;
}

public Stream<DegreeCentrality.Result> resultStream() {
return IntStream.range(0, nodeCount)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package org.neo4j.graphalgo.impl.pagerank;

import org.neo4j.graphalgo.api.Degrees;
import org.neo4j.graphalgo.api.RelationshipIterator;
import org.neo4j.graphalgo.api.RelationshipWeights;
import org.neo4j.graphdb.Direction;

import java.util.Arrays;
import java.util.stream.IntStream;

import static org.neo4j.graphalgo.core.utils.ArrayUtil.binaryLookup;

public abstract class BaseComputeStep implements ComputeStep {
private static final int S_INIT = 0;
private static final int S_CALC = 1;
Expand All @@ -20,7 +15,6 @@ public abstract class BaseComputeStep implements ComputeStep {
int[] starts;
private int[] lengths;
private int[] sourceNodeIds;
final RelationshipIterator relationshipIterator;
final Degrees degrees;

private final double alpha;
Expand All @@ -38,14 +32,12 @@ public abstract class BaseComputeStep implements ComputeStep {
BaseComputeStep(
double dampingFactor,
int[] sourceNodeIds,
RelationshipIterator relationshipIterator,
Degrees degrees,
int partitionSize,
int startNode) {
this.dampingFactor = dampingFactor;
this.alpha = 1.0 - dampingFactor;
this.sourceNodeIds = sourceNodeIds;
this.relationshipIterator = relationshipIterator;
this.degrees = degrees;
this.partitionSize = partitionSize;
this.startNode = startNode;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.neo4j.graphalgo.impl.pagerank;

public class DegreeCache {

public final static DegreeCache EMPTY = new DegreeCache(new double[0], new double[0][0]);

private double[] aggregatedDegrees;
private double[][] weights;

public DegreeCache(double[] aggregatedDegrees, double[][] weights) {
this.aggregatedDegrees = aggregatedDegrees;
this.weights = weights;
}

double[] aggregatedDegrees() {
return aggregatedDegrees;
}

double[][] weights() {
return weights;
}

boolean hasCachedValues() {
return weights.length > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
import java.util.concurrent.ExecutorService;

public interface DegreeComputer {
double[] degree(ExecutorService executor, int concurrency);
DegreeCache degree(ExecutorService executor, int concurrency);
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,7 @@ private void initializeSteps() {
? this.executor : null;

WeightedDegreeCentrality degreeCentrality = new WeightedDegreeCentrality(graph, executor, concurrency, Direction.OUTGOING);
degreeCentrality.compute();

DegreeComputer degreeComputer = pageRankVariant.degreeComputer(graph);
degreeCentrality.compute(false);

computeSteps = createComputeSteps(
concurrency,
Expand Down Expand Up @@ -270,7 +268,7 @@ private ComputeSteps createComputeSteps(
Iterator<Partition> parts = partitions.iterator();

DegreeComputer degreeComputer = pageRankVariant.degreeComputer(graph);
double[] aggregatedDegrees = degreeComputer.degree(pool, concurrency);
DegreeCache degreeCache = degreeComputer.degree(pool, concurrency);

while (parts.hasNext()) {
Partition partition = parts.next();
Expand All @@ -296,7 +294,7 @@ private ComputeSteps createComputeSteps(
tracker,
partitionCount,
start,
aggregatedDegrees));
degreeCache));
}

long[] startArray = starts.toArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public class HugeWeightedComputeStep extends HugeBaseComputeStep implements Huge
HugeRelationshipWeights relationshipWeights,
AllocationTracker tracker,
int partitionSize,
long startNode, double[] aggregatedDegrees) {
long startNode,
DegreeCache degreeCache) {
super(dampingFactor,
sourceNodeIds,
relationshipIterator,
Expand All @@ -32,7 +33,7 @@ public class HugeWeightedComputeStep extends HugeBaseComputeStep implements Huge
partitionSize,
startNode);
this.relationshipWeights = relationshipWeights;
this.aggregatedDegrees = aggregatedDegrees;
this.aggregatedDegrees = degreeCache.aggregatedDegrees();
}

void singleIteration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
public class NoOpDegreeComputer implements DegreeComputer {

@Override
public double[] degree(ExecutorService executor, int concurrency) {
return new double[0];
public DegreeCache degree(ExecutorService executor, int concurrency) {
return DegreeCache.EMPTY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

final class NonWeightedComputeStep extends BaseComputeStep implements RelationshipConsumer {

private final RelationshipIterator relationshipIterator;

NonWeightedComputeStep(
double dampingFactor,
Expand All @@ -17,7 +18,8 @@ final class NonWeightedComputeStep extends BaseComputeStep implements Relationsh
Degrees degrees,
int partitionSize,
int startNode) {
super(dampingFactor, sourceNodeIds, relationshipIterator, degrees, partitionSize, startNode);
super(dampingFactor, sourceNodeIds, degrees, partitionSize, startNode);
this.relationshipIterator = relationshipIterator;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import org.neo4j.graphalgo.api.*;
import org.neo4j.graphalgo.core.utils.paged.AllocationTracker;

import java.util.concurrent.ExecutorService;

public class NonWeightedPageRankVariant implements PageRankVariant {
public ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds, RelationshipIterator relationshipIterator, Degrees degrees, RelationshipWeights relationshipWeights, int partitionCount, int start, double[] aggregatedDegrees) {
public ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds,
RelationshipIterator relationshipIterator,
WeightedRelationshipIterator weightedRelationshipIterator,
Degrees degrees,
int partitionCount, int start,
DegreeCache degreeCache) {
return new NonWeightedComputeStep(
dampingFactor,
sourceNodeIds,
Expand All @@ -18,7 +21,7 @@ public ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds,
}

@Override
public HugeNonWeightedComputeStep createHugeComputeStep(double dampingFactor, long[] sourceNodeIds, HugeRelationshipIterator relationshipIterator, HugeDegrees degrees, HugeRelationshipWeights relationshipWeights, AllocationTracker tracker, int partitionCount, long start, double[] aggregatedDegrees) {
public HugeNonWeightedComputeStep createHugeComputeStep(double dampingFactor, long[] sourceNodeIds, HugeRelationshipIterator relationshipIterator, HugeDegrees degrees, HugeRelationshipWeights relationshipWeights, AllocationTracker tracker, int partitionCount, long start, DegreeCache aggregatedDegrees) {
return new HugeNonWeightedComputeStep(
dampingFactor,
sourceNodeIds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public class PageRank extends Algorithm<PageRank> implements PageRankAlgorithm {
graph,
dampingFactor,
sourceNodeIds,
pageRankVariant);
pageRankVariant
);
}

/**
Expand Down Expand Up @@ -218,8 +219,8 @@ private ComputeSteps createComputeSteps(
double dampingFactor,
int[] sourceNodeIds,
RelationshipIterator relationshipIterator,
WeightedRelationshipIterator weightedRelationshipIterator,
Degrees degrees,
RelationshipWeights relationshipWeights,
List<Partition> partitions,
ExecutorService pool,
PageRankVariant pageRankVariant,
Expand All @@ -238,7 +239,7 @@ private ComputeSteps createComputeSteps(
partitions.size());
Iterator<Partition> parts = partitions.iterator();

double[] aggregatedDegrees = degreeComputer.degree(pool, concurrency);
DegreeCache degreeCache = degreeComputer.degree(pool, concurrency);

while (parts.hasNext()) {
Partition partition = parts.next();
Expand All @@ -256,11 +257,11 @@ private ComputeSteps createComputeSteps(
dampingFactor,
sourceNodeIds,
relationshipIterator,
weightedRelationshipIterator,
degrees,
relationshipWeights,
partitionCount,
start,
aggregatedDegrees
degreeCache
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ public interface PageRankAlgorithm {
static PageRankAlgorithm weightedOf(
Graph graph,
double dampingFactor,
LongStream sourceNodeIds) {
return weightedOf(AllocationTracker.EMPTY, dampingFactor, sourceNodeIds, graph);
LongStream sourceNodeIds
) {
return weightedOf(AllocationTracker.EMPTY, dampingFactor, sourceNodeIds, graph, false);
}

static PageRankAlgorithm weightedOf(
AllocationTracker tracker,
double dampingFactor,
LongStream sourceNodeIds,
Graph graph) {
WeightedPageRankVariant computeStepFactory = new WeightedPageRankVariant();
Graph graph,
boolean cacheWeights) {
WeightedPageRankVariant computeStepFactory = new WeightedPageRankVariant(cacheWeights);
if (graph instanceof HugeGraph) {
HugeGraph huge = (HugeGraph) graph;
return new HugePageRank(tracker, huge, dampingFactor, sourceNodeIds, computeStepFactory);
Expand Down Expand Up @@ -127,8 +129,9 @@ static PageRankAlgorithm weightedOf(
LongStream sourceNodeIds,
ExecutorService pool,
int concurrency,
int batchSize) {
WeightedPageRankVariant computeStepFactory = new WeightedPageRankVariant();
int batchSize,
boolean cacheWeights) {
WeightedPageRankVariant pageRankVariant = new WeightedPageRankVariant(cacheWeights);
if (graph instanceof HugeGraph) {
HugeGraph huge = (HugeGraph) graph;
return new HugePageRank(
Expand All @@ -139,7 +142,7 @@ static PageRankAlgorithm weightedOf(
huge,
dampingFactor,
sourceNodeIds,
computeStepFactory
pageRankVariant
);
}

Expand All @@ -150,6 +153,6 @@ static PageRankAlgorithm weightedOf(
graph,
dampingFactor,
sourceNodeIds,
computeStepFactory);
pageRankVariant);
}
}
Loading