/
state.go
155 lines (134 loc) · 3.65 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/*
* Copyright (c) 2002-2020 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package retry handles retry operations.
package retry
import (
"fmt"
"time"
"github.com/neo4j/neo4j-go-driver/v4/neo4j/db"
"github.com/neo4j/neo4j-go-driver/v4/neo4j/log"
)
// Router is the routing dependency of the retry logic. OnFailure calls
// Invalidate with State.DatabaseName when it sees a retryable cluster error,
// so that routing tables are updated before the next attempt.
type Router interface {
	// Invalidate discards the routing information held for databaseName.
	Invalidate(databaseName string)
}
// CommitFailedDeadError wraps the error observed when a connection turned out
// to be dead while a transaction was being committed. The retry logic does not
// consider that situation safe to retry, so it stops and reports the original
// error (most probably io.EOF) wrapped in this more recognizable type.
type CommitFailedDeadError struct {
	// inner is the error that was reported when the connection died.
	inner error
}

// Error implements the error interface.
func (e *CommitFailedDeadError) Error() string {
	return fmt.Sprintf("Connection lost during commit: %s", e.inner)
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can look
// through a CommitFailedDeadError (for example to detect io.EOF).
func (e *CommitFailedDeadError) Unwrap() error {
	return e.inner
}
// State holds the bookkeeping for retrying a transaction: the errors seen so
// far, why each one was (or was not) retried, and the settings that bound the
// retry loop. OnFailure classifies a failed attempt; Continue acts on that
// classification and reports whether another attempt should be made.
// NOTE(review): presumably driven as `for s.Continue() { ...; s.OnFailure(...) }`
// by the session code — confirm against callers.
type State struct {
	// LastErrWasRetryable is true when OnFailure classified the most recent
	// error as retryable. On timeout it is left unchanged.
	LastErrWasRetryable bool

	// LastErr is the error passed to the most recent OnFailure call, wrapped in
	// a *CommitFailedDeadError when the connection died during commit.
	LastErr error

	// stop is set by OnFailure when no further attempt should be made.
	stop bool

	// Errs collects, in order, every error Continue has processed.
	Errs []error

	// Causes collects the non-empty causes ("Timeout", "Connection lost", ...)
	// recorded alongside Errs, so it can be shorter than Errs.
	Causes []string

	// MaxTransactionRetryTime bounds how long retrying may go on, measured
	// from the first failure.
	MaxTransactionRetryTime time.Duration

	// Log, LogName and LogId are used to emit debug messages about retries.
	Log     log.Logger
	LogName string
	LogId   string

	// Now returns the current time and is used for the timeout check.
	// NOTE(review): presumably a field so tests can fake the clock.
	Now func() time.Time

	// Sleep waits for the throttled delay before a retry.
	Sleep func(time.Duration)

	// Throttle is advanced (next) before each throttled retry; its delay is
	// what gets passed to Sleep.
	Throttle Throttler

	// MaxDeadConnections is the number of lost-connection failures tolerated.
	// Retrying stops once more than this many have been seen.
	MaxDeadConnections int

	// Router is asked to invalidate routing information for DatabaseName when
	// a retryable cluster error is seen.
	Router Router

	// DatabaseName is the database name passed to Router.Invalidate.
	DatabaseName string

	// start is the time of the first failure; zero until OnFailure first runs.
	start time.Time

	// cause describes why the last error is being retried or why retrying
	// stopped; empty when no specific cause was determined.
	cause string

	// deadErrors counts failures on connections that were no longer alive
	// (outside of commit).
	deadErrors int

	// skipSleep makes Continue retry without a throttled delay; OnFailure sets
	// it for lost connections.
	skipSleep bool
}
// OnFailure records a failed attempt and classifies it for Continue.
//
// conn is the connection the attempt used (nil when none could be obtained),
// err is the error that made the attempt fail, and isCommitting reports
// whether the failure happened while committing. On return, LastErr,
// LastErrWasRetryable and the internal stop/cause/skipSleep fields describe
// the outcome:
//   - once MaxTransactionRetryTime has elapsed since the first failure,
//     retrying stops with cause "Timeout";
//   - a missing connection is retryable;
//   - a dead connection during commit stops retrying and LastErr is wrapped
//     in a *CommitFailedDeadError; outside of commit it is retryable without
//     a delay until more than MaxDeadConnections have been seen;
//   - retryable cluster errors invalidate the routing table for DatabaseName
//     and are retried, as are retryable transient errors;
//   - anything else stops retrying.
func (s *State) OnFailure(conn db.Connection, err error, isCommitting bool) {
	s.LastErr = err
	s.cause = ""
	s.skipSleep = false

	// The retry budget is measured from the first failure.
	if s.start.IsZero() {
		s.start = s.Now()
	}
	if elapsed := s.Now().Sub(s.start); elapsed > s.MaxTransactionRetryTime {
		// LastErrWasRetryable is not reset on this path, so it still reflects
		// the previous error.
		s.stop, s.cause = true, "Timeout"
		return
	}

	// Classify this error from scratch.
	s.LastErrWasRetryable = false

	if conn == nil {
		s.LastErrWasRetryable, s.cause = true, "No available connection"
		return
	}

	if !conn.IsAlive() {
		if isCommitting {
			// Not safe to retry a commit on a connection that died. The error
			// is most probably io.EOF, so wrap it to make it recognizable.
			s.stop = true
			s.LastErr = &CommitFailedDeadError{inner: s.LastErr}
			return
		}
		s.deadErrors++
		s.stop = s.deadErrors > s.MaxDeadConnections
		s.LastErrWasRetryable, s.cause = true, "Connection lost"
		// Continue retries immediately instead of consulting the throttler.
		s.skipSleep = true
		return
	}

	neoErr, isNeoErr := err.(*db.Neo4jError)
	switch {
	case isNeoErr && neoErr.IsRetriableCluster():
		// Force routing tables to be updated before the next attempt.
		s.Router.Invalidate(s.DatabaseName)
		s.cause = "Cluster error"
		s.LastErrWasRetryable = true
	case isNeoErr && neoErr.IsRetriableTransient():
		s.cause = "Transient error"
		s.LastErrWasRetryable = true
	default:
		s.stop = true
	}
}
// Continue reports whether the caller should make another attempt.
//
// Before any failure has been recorded it simply returns true. Otherwise it
// appends LastErr to Errs (and the cause, when one was determined, to Causes).
// It then either returns false when OnFailure decided to stop, or logs the
// retry and returns true. Unless OnFailure asked to skip the delay (lost
// connection), it first advances the throttler and sleeps for its delay.
func (s *State) Continue() bool {
	if s.LastErr == nil && !s.stop {
		// Nothing has failed yet: go ahead with the first attempt.
		return true
	}

	s.Errs = append(s.Errs, s.LastErr)
	if len(s.cause) > 0 {
		s.Causes = append(s.Causes, s.cause)
	}

	if s.stop {
		return false
	}

	if s.skipSleep {
		s.Log.Debugf(s.LogName, s.LogId, "Retrying transaction (%s): %s", s.cause, s.LastErr)
		return true
	}

	s.Throttle = s.Throttle.next()
	wait := s.Throttle.delay()
	s.Log.Debugf(s.LogName, s.LogId,
		"Retrying transaction (%s): %s [after %s]", s.cause, s.LastErr, wait)
	s.Sleep(wait)
	return true
}