Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions algo/src/main/java/org/neo4j/graphalgo/PageRankProc.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.neo4j.graphalgo.impl.PageRankAlgorithm;
import org.neo4j.graphalgo.results.PageRankScore;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.Node;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.Log;
Expand All @@ -42,6 +43,8 @@
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
Expand Down Expand Up @@ -104,7 +107,7 @@ public Stream<PageRankScore> pageRankStream(
@Name(value = "relationship", defaultValue = "") String relationship,
@Name(value = "config", defaultValue = "{}") Map<String, Object> config) {

ProcedureConfiguration configuration = ProcedureConfiguration.create(config);
ProcedureConfiguration configuration = ProcedureConfiguration.create(config);

PageRankScore.Stats.Builder statsBuilder = new PageRankScore.Stats.Builder();
AllocationTracker tracker = AllocationTracker.create();
Expand Down Expand Up @@ -150,13 +153,19 @@ private Graph load(
AllocationTracker tracker,
Class<? extends GraphFactory> graphFactory,
PageRankScore.Stats.Builder statsBuilder, ProcedureConfiguration configuration) {

GraphLoader graphLoader = new GraphLoader(api, Pools.DEFAULT)
.init(log, label, relationship, configuration)
.withAllocationTracker(tracker)
.withDirection(Direction.OUTGOING)
.withoutRelationshipWeights();

Direction direction = configuration.getDirection(Direction.OUTGOING);
if (direction == Direction.BOTH) {
graphLoader.asUndirected(true);
} else {
graphLoader.withDirection(direction);
}


try (ProgressTimer timer = statsBuilder.timeLoad()) {
Graph graph = graphLoader.load(graphFactory);
statsBuilder.withNodes(graph.nodeCount());
Expand All @@ -177,10 +186,14 @@ private PageRankResult evaluate(
final int concurrency = configuration.getConcurrency(Pools.getNoThreadsInDefaultPool());
log.debug("Computing page rank with damping of " + dampingFactor + " and " + iterations + " iterations.");


List<Node> sourceNodes = configuration.get("sourceNodes", new ArrayList<>());
LongStream sourceNodeIds = sourceNodes.stream().mapToLong(Node::getId);
PageRankAlgorithm prAlgo = PageRankAlgorithm.of(
tracker,
graph,
dampingFactor,
sourceNodeIds,
Pools.DEFAULT,
concurrency,
batchSize);
Expand All @@ -189,6 +202,7 @@ private PageRankResult evaluate(
.withLog(log)
.withTerminationFlag(terminationFlag);


statsBuilder.timeEval(() -> prAlgo.compute(iterations));

statsBuilder
Expand Down
70 changes: 40 additions & 30 deletions algo/src/main/java/org/neo4j/graphalgo/impl/HugePageRank.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@
import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.LongArrayList;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.graphalgo.api.HugeDegrees;
import org.neo4j.graphalgo.api.HugeIdMapping;
import org.neo4j.graphalgo.api.HugeNodeIterator;
import org.neo4j.graphalgo.api.HugeRelationshipConsumer;
import org.neo4j.graphalgo.api.HugeRelationshipIterator;
import org.neo4j.graphalgo.api.*;
import org.neo4j.graphalgo.core.utils.ParallelUtil;
import org.neo4j.graphalgo.core.utils.paged.AllocationTracker;
import org.neo4j.graphalgo.core.write.Exporter;
Expand All @@ -39,14 +35,11 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.LongStream;

import static org.neo4j.graphalgo.core.utils.ArrayUtil.binaryLookup;
import static org.neo4j.graphalgo.core.utils.paged.AllocationTracker.humanReadable;
import static org.neo4j.graphalgo.core.utils.paged.MemoryUsage.shallowSizeOfInstance;
import static org.neo4j.graphalgo.core.utils.paged.MemoryUsage.sizeOfDoubleArray;
import static org.neo4j.graphalgo.core.utils.paged.MemoryUsage.sizeOfIntArray;
import static org.neo4j.graphalgo.core.utils.paged.MemoryUsage.sizeOfLongArray;
import static org.neo4j.graphalgo.core.utils.paged.MemoryUsage.sizeOfObjectArray;
import static org.neo4j.graphalgo.core.utils.paged.MemoryUsage.*;


/**
Expand Down Expand Up @@ -111,6 +104,8 @@ public class HugePageRank extends Algorithm<HugePageRank> implements PageRankAlg
private final HugeRelationshipIterator relationshipIterator;
private final HugeDegrees degrees;
private final double dampingFactor;
private final HugeGraph graph;
private LongStream sourceNodeIds;

private Log log;
private ComputeSteps computeSteps;
Expand All @@ -121,21 +116,17 @@ public class HugePageRank extends Algorithm<HugePageRank> implements PageRankAlg
*/
HugePageRank(
AllocationTracker tracker,
HugeIdMapping idMapping,
HugeNodeIterator nodeIterator,
HugeRelationshipIterator relationshipIterator,
HugeDegrees degrees,
double dampingFactor) {
HugeGraph graph,
double dampingFactor,
LongStream sourceNodeIds) {
this(
null,
-1,
ParallelUtil.DEFAULT_BATCH_SIZE,
tracker,
idMapping,
nodeIterator,
relationshipIterator,
degrees,
dampingFactor);
graph,
dampingFactor,
sourceNodeIds);
}

/**
Expand All @@ -148,20 +139,20 @@ public class HugePageRank extends Algorithm<HugePageRank> implements PageRankAlg
int concurrency,
int batchSize,
AllocationTracker tracker,
HugeIdMapping idMapping,
HugeNodeIterator nodeIterator,
HugeRelationshipIterator relationshipIterator,
HugeDegrees degrees,
double dampingFactor) {
HugeGraph graph,
double dampingFactor,
LongStream sourceNodeIds) {
this.executor = executor;
this.concurrency = concurrency;
this.batchSize = batchSize;
this.tracker = tracker;
this.idMapping = idMapping;
this.nodeIterator = nodeIterator;
this.relationshipIterator = relationshipIterator;
this.degrees = degrees;
this.idMapping = graph;
this.nodeIterator = graph;
this.relationshipIterator = graph;
this.degrees = graph;
this.graph = graph;
this.dampingFactor = dampingFactor;
this.sourceNodeIds = sourceNodeIds;
}

/**
Expand Down Expand Up @@ -209,6 +200,7 @@ private void initializeSteps() {
concurrency,
idMapping.nodeCount(),
dampingFactor,
sourceNodeIds.map(graph::toHugeMappedNodeId).filter(mappedId -> mappedId != -1L).toArray(),
relationshipIterator,
degrees,
partitions,
Expand Down Expand Up @@ -246,6 +238,7 @@ private ComputeSteps createComputeSteps(
int concurrency,
long nodeCount,
double dampingFactor,
long[] sourceNodeIds,
HugeRelationshipIterator relationshipIterator,
HugeDegrees degrees,
List<Partition> partitions,
Expand Down Expand Up @@ -281,6 +274,7 @@ private ComputeSteps createComputeSteps(

computeSteps.add(new ComputeStep(
dampingFactor,
sourceNodeIds,
relationshipIterator,
degrees,
tracker,
Expand Down Expand Up @@ -542,6 +536,7 @@ private static final class ComputeStep implements Runnable, HugeRelationshipCons

private long[] starts;
private int[] lengths;
private long[] sourceNodeIds;
private final HugeRelationshipIterator relationshipIterator;
private final HugeDegrees degrees;
private final AllocationTracker tracker;
Expand All @@ -562,13 +557,15 @@ private static final class ComputeStep implements Runnable, HugeRelationshipCons

ComputeStep(
double dampingFactor,
long[] sourceNodeIds,
HugeRelationshipIterator relationshipIterator,
HugeDegrees degrees,
AllocationTracker tracker,
int partitionSize,
long startNode) {
this.dampingFactor = dampingFactor;
this.alpha = 1.0 - dampingFactor;
this.sourceNodeIds = sourceNodeIds;
this.relationshipIterator = relationshipIterator.concurrentCopy();
this.degrees = degrees;
this.tracker = tracker;
Expand Down Expand Up @@ -606,8 +603,21 @@ private void initialize() {
});

tracker.add(sizeOfDoubleArray(partitionSize) << 1);

double[] partitionRank = new double[partitionSize];
Arrays.fill(partitionRank, alpha);
if(sourceNodeIds.length == 0) {
Arrays.fill(partitionRank, alpha);
} else {
Arrays.fill(partitionRank,0);

long[] partitionSourceNodeIds = LongStream.of(sourceNodeIds)
.filter(sourceNodeId -> sourceNodeId >= startNode && sourceNodeId <= endNode)
.toArray();

for (long sourceNodeId : partitionSourceNodeIds) {
partitionRank[Math.toIntExact(sourceNodeId - this.startNode)] = alpha;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@knutwalker is this the correct way to find the place in the array to update?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mneedham yes, that's fine; we usually have `(int) (nodeId - startNode)` for local nodes or `(int) (nodeId - starts[partitionIndex])` for nodes in a different partition.

}
}

this.pageRank = partitionRank;
this.deltas = Arrays.copyOf(partitionRank, partitionSize);
Expand Down
Loading