-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
CheckPointThreshold.java
145 lines (131 loc) · 5.55 KB
/
CheckPointThreshold.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.checkpoint;
import java.time.Clock;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.LogProvider;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.check_point_policy;
/**
* A check point threshold provides information if a check point is required or not.
*/
public interface CheckPointThreshold
{
/**
* This method initialize the threshold by providing the initial transaction id
*
* @param transactionId the latest transaction committed id
*/
void initialize( long transactionId );
/**
* This method can be used for querying the threshold about the necessity of a check point.
*
* @param lastCommittedTransactionId the latest transaction committed id
* @param consumer will be called with the description about this threshold only if the return value is true
* @return true is a check point is needed, false otherwise.
*/
boolean isCheckPointingNeeded( long lastCommittedTransactionId, Consumer<String> consumer );
/**
* This method notifies the threshold that a check point has happened. This must be called every time a check point
* has been written in the transaction log in order to make sure that the threshold updates its condition.
* <p>
* This is important since we might have multiple thresholds or forced check points.
*
* @param transactionId the latest transaction committed id used by the check point
*/
void checkPointHappened( long transactionId );
/**
* Return any desired checking frequencies, as a number of milliseconds between calls to
* {@link #isCheckPointingNeeded(long, Consumer)}, if this {@link CheckPointThreshold} instance has any opinion on
* the matter, or return {@link LongStream#empty()} is fine with some default checking frequency.
* <p>
* This is returned as an {@link LongStream} because a threshold might be composed of multiple other thresholds.
* It is up to the caller to figure out how to best schedule this threshold, if the stream contains more than one
* frequency. One way could be to use the lowest frequency, e.g. with {@link LongStream#min()}.
*
* @return A stream desired scheduling frequencies, if any specific ones are desired by this threshold.
*/
LongStream checkFrequencyMillis();
/**
* Create and configure a {@link CheckPointThreshold} based on the given configurations.
*/
static CheckPointThreshold createThreshold( Config config, Clock clock, LogProvider logProvider )
{
String policyName = config.get( check_point_policy );
CheckPointThresholdPolicy policy;
try
{
policy = CheckPointThresholdPolicy.loadPolicy( policyName );
}
catch ( NoSuchElementException e )
{
logProvider.getLog( CheckPointThreshold.class ).warn(
"Could not load check point policy '" + check_point_policy.name() + "=" + policyName + "'. " +
"Using default policy instead.", e );
policy = new PeriodicThresholdPolicy();
}
return policy.createThreshold( config, clock, logProvider );
}
/**
* Create a new {@link CheckPointThreshold} which will trigger if any of the given thresholds triggers.
*/
static CheckPointThreshold or( final CheckPointThreshold... thresholds )
{
return new CheckPointThreshold()
{
@Override
public void initialize( long transactionId )
{
for ( CheckPointThreshold threshold : thresholds )
{
threshold.initialize( transactionId );
}
}
@Override
public boolean isCheckPointingNeeded( long transactionId, Consumer<String> consumer )
{
for ( CheckPointThreshold threshold : thresholds )
{
if ( threshold.isCheckPointingNeeded( transactionId, consumer ) )
{
return true;
}
}
return false;
}
@Override
public void checkPointHappened( long transactionId )
{
for ( CheckPointThreshold threshold : thresholds )
{
threshold.checkPointHappened( transactionId );
}
}
@Override
public LongStream checkFrequencyMillis()
{
return Stream.of( thresholds ).flatMapToLong( CheckPointThreshold::checkFrequencyMillis );
}
};
}
}