Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 106 additions & 65 deletions core/src/main/java/org/neo4j/graphalgo/core/huge/HugeGraphImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.neo4j.graphalgo.core.utils.paged.ByteArray;
import org.neo4j.graphalgo.core.utils.paged.HugeLongArray;
import org.neo4j.graphdb.Direction;
import org.neo4j.internal.kernel.api.CursorFactory;
import org.neo4j.internal.kernel.api.NodeCursor;

import java.util.Collection;
import java.util.function.LongPredicate;
Expand All @@ -47,7 +49,7 @@
* <code>degree</code> ~ <code>targetId</code><sub><code>1</code></sub> ~ <code>targetId</code><sub><code>2</code></sub> ~ <code>targetId</code><sub><code>n</code></sub>
* </blockquote>
* The {@code degree} is stored as a fill-sized 4 byte long {@code int}
* (the neo kernel api returns an int for {@link org.neo4j.kernel.api.ReadOperations#nodeGetDegree(long, Direction)}).
* (the neo kernel api returns an int for {@link org.neo4j.internal.kernel.api.helpers.Nodes#countAll(NodeCursor, CursorFactory)}).
* Every target ID is first sorted, then delta encoded, and finally written as variable-length vlongs.
* The delta encoding does not write the actual value but only the difference to the previous value, which plays very nice with the vlong encoding.
* <p>
Expand Down Expand Up @@ -139,23 +141,7 @@ public double weightOf(final long sourceNodeId, final long targetNodeId) {

@Override
public void forEachRelationship(long nodeId, Direction direction, HugeRelationshipConsumer consumer) {
switch (direction) {
case INCOMING:
forEachIncoming(nodeId, consumer);
return;

case OUTGOING:
forEachOutgoing(nodeId, consumer);
return;

case BOTH:
forEachOutgoing(nodeId, consumer);
forEachIncoming(nodeId, consumer);
return;

default:
throw new IllegalArgumentException(direction + "");
}
runForEach(nodeId, direction, consumer, /* reuseCursor */ true);
}

@Override
Expand All @@ -169,13 +155,9 @@ public void forEachRelationship(int nodeId, Direction direction, RelationshipCon
forEachOutgoing(nodeId, consumer);
return;

case BOTH:
default:
forEachOutgoing(nodeId, consumer);
forEachIncoming(nodeId, consumer);
return;

default:
throw new IllegalArgumentException(direction + "");
}
}

Expand All @@ -190,13 +172,9 @@ public void forEachRelationship(int nodeId, Direction direction, WeightedRelatio
forEachOutgoing(nodeId, consumer);
return;

case BOTH:
default:
forEachOutgoing(nodeId, consumer);
forEachIncoming(nodeId, consumer);
return;

default:
throw new IllegalArgumentException(direction + "");
}
}

Expand Down Expand Up @@ -239,40 +217,50 @@ public boolean contains(final long nodeId) {

@Override
public void forEachIncoming(long node, final HugeRelationshipConsumer consumer) {
forEachIncoming(node, inCache, consumer);
runForEach(node, Direction.INCOMING, consumer, /* reuseCursor */ true);
}

@Override
public void forEachIncoming(int nodeId, RelationshipConsumer consumer) {
forEachIncoming((long) nodeId, inAdjacency.newCursor(), toHugeInConsumer(consumer));
runForEach(
Integer.toUnsignedLong(nodeId),
Direction.INCOMING,
toHugeInConsumer(consumer),
/* reuseCursor */ false
);
}

public void forEachIncoming(int nodeId, WeightedRelationshipConsumer consumer) {
forEachIncoming((long) nodeId, inAdjacency.newCursor(), toHugeInConsumer(consumer));
}

private void forEachIncoming(long node, ByteArray.DeltaCursor newCursor, final HugeRelationshipConsumer consumer) {
ByteArray.DeltaCursor cursor = cursor(node, newCursor, inOffsets, inAdjacency);
consumeNodes(node, cursor, consumer);
runForEach(
Integer.toUnsignedLong(nodeId),
Direction.INCOMING,
toHugeInConsumer(consumer),
/* reuseCursor */ false
);
}

@Override
public void forEachOutgoing(long node, final HugeRelationshipConsumer consumer) {
forEachOutgoing(node, outCache, consumer);
runForEach(node, Direction.OUTGOING, consumer, /* reuseCursor */ true);
}

@Override
public void forEachOutgoing(int nodeId, RelationshipConsumer consumer) {
forEachOutgoing((long) nodeId, outAdjacency.newCursor(), toHugeOutConsumer(consumer));
runForEach(
Integer.toUnsignedLong(nodeId),
Direction.OUTGOING,
toHugeOutConsumer(consumer),
/* reuseCursor */ false
);
}

public void forEachOutgoing(int nodeId, WeightedRelationshipConsumer consumer) {
forEachOutgoing((long) nodeId, outAdjacency.newCursor(), toHugeOutConsumer(consumer));
}

private void forEachOutgoing(long node, ByteArray.DeltaCursor newCursor, final HugeRelationshipConsumer consumer) {
ByteArray.DeltaCursor cursor = cursor(node, newCursor, outOffsets, outAdjacency);
consumeNodes(node, cursor, consumer);
runForEach(
Integer.toUnsignedLong(nodeId),
Direction.OUTGOING,
toHugeOutConsumer(consumer),
/* reuseCursor */ false
);
}

@Override
Expand All @@ -293,48 +281,101 @@ public HugeRelationshipIntersect intersectionCopy() {
return new HugeGraphIntersectImpl(outAdjacency, outOffsets);
}

/*
/**
* O(n) !
*/
@Override
public boolean exists(int sourceNodeId, int targetNodeId, Direction direction) {
return exists(
Integer.toUnsignedLong(sourceNodeId),
Integer.toUnsignedLong(targetNodeId),
direction,
// Graph interface should be thread-safe
false
);
}

/**
* O(n) !
*/
@Override
public boolean exists(long sourceNodeId, long targetNodeId, Direction direction) {
return exists(
sourceNodeId,
targetNodeId,
direction,
// HugeGraph interface make no promises about thread-safety (that's what concurrentCopy is for)
true
);
}

private boolean exists(long sourceNodeId, long targetNodeId, Direction direction, boolean reuseCursor) {
ExistsConsumer consumer = new ExistsConsumer(targetNodeId);
switch (direction) {
case OUTGOING:
forEachOutgoing(sourceNodeId, consumer);
case INCOMING:
forEachIncoming(sourceNodeId, consumer);
default:
forEachRelationship(sourceNodeId, Direction.BOTH, consumer);
}
runForEach(sourceNodeId, direction, consumer, reuseCursor);
return consumer.found;
}

@Override
public int getTarget(int nodeId, int index, Direction direction) {
return Math.toIntExact(getTarget(Integer.toUnsignedLong(nodeId), Integer.toUnsignedLong(index), direction));
return Math.toIntExact(getTarget(
Integer.toUnsignedLong(nodeId),
Integer.toUnsignedLong(index),
direction,
// Graph interface should be thread-safe
false
));
}

/*
* O(n) !
*/
@Override
public long getTarget(long sourceNodeId, long index, Direction direction) {
return getTarget(
sourceNodeId,
index,
direction,
// HugeGraph interface make no promises about thread-safety (that's what concurrentCopy is for)
true
);
}

private long getTarget(long sourceNodeId, long index, Direction direction, boolean reuseCursor) {
GetTargetConsumer consumer = new GetTargetConsumer(index);
switch (direction) {
case OUTGOING:
forEachOutgoing(sourceNodeId, consumer);
case INCOMING:
forEachIncoming(sourceNodeId, consumer);
default:
forEachRelationship(sourceNodeId, Direction.BOTH, consumer);
}
runForEach(sourceNodeId, direction, consumer, reuseCursor);
return consumer.target;
}

@Override
public boolean exists(int sourceNodeId, int targetNodeId, Direction direction) {
return exists(Integer.toUnsignedLong(sourceNodeId), Integer.toUnsignedLong(targetNodeId), direction);
private void runForEach(
long sourceNodeId, Direction direction,
HugeRelationshipConsumer consumer,
boolean reuseCursor) {
if (direction == Direction.BOTH) {
runForEach(sourceNodeId, Direction.OUTGOING, consumer, reuseCursor);
runForEach(sourceNodeId, Direction.INCOMING, consumer, reuseCursor);
return;
}
ByteArray.DeltaCursor cursor = forEachCursor(sourceNodeId, direction, reuseCursor);
consumeNodes(sourceNodeId, cursor, consumer);
}

private ByteArray.DeltaCursor forEachCursor(
long sourceNodeId,
Direction direction,
boolean reuseCursor) {
if (direction == Direction.OUTGOING) {
return cursor(
sourceNodeId,
reuseCursor ? outCache : outAdjacency.newCursor(),
outOffsets,
outAdjacency);
} else {
return cursor(
sourceNodeId,
reuseCursor ? inCache : inAdjacency.newCursor(),
inOffsets,
inAdjacency);
}
}

@Override
Expand Down