-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
RelationshipLinkStep.java
174 lines (160 loc) · 6.3 KB
/
RelationshipLinkStep.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;
import java.util.function.Predicate;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.NodeType;
import org.neo4j.unsafe.impl.batchimport.staging.ForkedProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
/**
 * Step that completes relationship chains by filling in their "prev" pointers. The "next" pointers
 * are already in place from when the relationship records were first created; doing the prev pointers
 * at that point would mean random access, so it is deferred to this step, which leans on
 * {@link NodeRelationshipCache} to do it.
 *
 * Work on a batch is shared between forked processors: a processor handles a node of a relationship
 * only when {@code nodeId % processors == id}.
 */
public abstract class RelationshipLinkStep extends ForkedProcessorStep<RelationshipRecord[]>
{
    /** Value returned by {@link #process(RelationshipRecord, int, int)} when neither end of a record qualifies. */
    private static final int NO_CHANGE = -1;

    protected final NodeRelationshipCache cache;
    private final int nodeTypes;
    private final Predicate<RelationshipRecord> filter;
    private final boolean forwards;
    private final RelationshipLinkingProgress progress;

    /**
     * @param control passed on to the super class.
     * @param config passed on to the super class.
     * @param cache used to ask whether a node is dense.
     * @param filter optional (may be {@code null}) predicate, only consulted for records on dense nodes.
     * @param nodeTypes node type selection handed to {@link NodeType#matchesDense(int, boolean)}.
     * @param forwards {@code true} to walk each batch from first to last element, {@code false} for last to first.
     * @param additionalStatsProvider passed on to the super class; a {@link RelationshipLinkingProgress}
     * among them is picked up and used for progress reporting.
     */
    public RelationshipLinkStep( StageControl control, Configuration config,
            NodeRelationshipCache cache, Predicate<RelationshipRecord> filter, int nodeTypes, boolean forwards,
            StatsProvider... additionalStatsProvider )
    {
        super( control, "LINK", config, additionalStatsProvider );
        this.forwards = forwards;
        this.nodeTypes = nodeTypes;
        this.filter = filter;
        this.cache = cache;
        this.progress = findLinkingProgressStatsProvider();
    }

    /**
     * Looks through the stats providers known to this step (the inherited {@code additionalStatsProvider},
     * not the constructor argument) for a {@link RelationshipLinkingProgress}. One is expected to be injected
     * from the outside so that global linking progress can be tracked across multiple passes; when none is
     * found a fresh instance is used instead.
     */
    private RelationshipLinkingProgress findLinkingProgressStatsProvider()
    {
        for ( StatsProvider candidate : additionalStatsProvider )
        {
            if ( candidate instanceof RelationshipLinkingProgress )
            {
                return (RelationshipLinkingProgress) candidate;
            }
        }
        return new RelationshipLinkingProgress();
    }

    /**
     * Runs {@link #process(RelationshipRecord, int, int)} on every non-null, in-use record of the batch,
     * in the direction chosen at construction, and reports the summed change count to the progress tracker.
     *
     * @param id id of the processor executing this call.
     * @param processors total number of processors sharing the batch.
     * @param batch records to link.
     */
    @Override
    protected void forkedProcess( int id, int processors, RelationshipRecord[] batch )
    {
        final int length = batch.length;
        int linked = 0;
        for ( int visited = 0; visited < length; visited++ )
        {
            // Same elements either way, only the visiting order differs
            int index = forwards ? visited : length - 1 - visited;
            RelationshipRecord record = batch[index];
            if ( record == null || !record.inUse() )
            {
                continue;
            }

            int outcome = process( record, id, processors );
            if ( outcome == NO_CHANGE )
            {
                // Nothing to change for this record. Flagging it here is fine since
                // every processor will arrive at the same verdict for it.
                record.setInUse( false );
            }
            else
            {
                linked += outcome;
            }
        }
        progress.add( linked );
    }

    /**
     * Links the ends of the given record that both qualify for a change and belong to this processor.
     *
     * @param record relationship record to link.
     * @param id id of the processor executing this call.
     * @param processors total number of processors.
     * @return {@code -1} if no end of the record qualifies for a change, otherwise the number of ends
     * linked by this call: a loop counts as 2, and 0 means this processor had nothing to link.
     */
    public int process( RelationshipRecord record, int id, int processors )
    {
        long firstNode = record.getFirstNode();
        long secondNode = record.getSecondNode();
        boolean ownsFirst = firstNode % processors == id;
        boolean ownsSecond = secondNode % processors == id;
        if ( !(ownsFirst || ownsSecond) )
        {
            // Neither node is ours. Answer 0 and not -1, because -1 makes the caller flag the record
            // as not in use. Leaving this early also spares us the (costly) dense lookups in the cache.
            return 0;
        }

        boolean firstQualifies = shouldChange( cache.isDense( firstNode ), record );
        if ( firstNode == secondNode )
        {
            // Loop: one node plays both the start and the end role
            if ( !firstQualifies )
            {
                return NO_CHANGE;
            }
            if ( ownsFirst )
            {
                linkLoop( record );
                return 2;
            }
            return 0;
        }

        int linked = 0;
        if ( firstQualifies && ownsFirst )
        {
            linkStart( record );
            linked++;
        }
        // The end node is looked at only after the start node has been dealt with
        boolean secondQualifies = shouldChange( cache.isDense( secondNode ), record );
        if ( secondQualifies && ownsSecond )
        {
            linkEnd( record );
            linked++;
        }
        return firstQualifies || secondQualifies ? linked : NO_CHANGE;
    }

    /**
     * Called for a non-loop record whose start node qualifies and belongs to the calling processor.
     */
    protected abstract void linkStart( RelationshipRecord record );

    /**
     * Called for a non-loop record whose end node qualifies and belongs to the calling processor.
     */
    protected abstract void linkEnd( RelationshipRecord record );

    /**
     * Called for a record whose start and end node are the same, when that node qualifies and
     * belongs to the calling processor.
     */
    protected abstract void linkLoop( RelationshipRecord record );

    /**
     * Decides whether one end of a record qualifies for linking: the node's density must match the
     * configured node types and, for dense nodes only, the record must pass the filter if one was given.
     */
    private boolean shouldChange( boolean isDense, RelationshipRecord record )
    {
        if ( !NodeType.matchesDense( nodeTypes, isDense ) )
        {
            return false;
        }
        // Special case: the filter is applied for dense nodes only
        return !isDense || filter == null || filter.test( record );
    }
}