This repository has been archived by the owner on Aug 23, 2023. It is now read-only.
/
cassandra_session.go
153 lines (137 loc) · 4.47 KB
/
cassandra_session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package cassandra
import (
"fmt"
"sync"
"time"
"github.com/gocql/gocql"
log "github.com/sirupsen/logrus"
)
// Session wraps a gocql session together with the cluster configuration used to
// create it and the settings of the optional dead connection check.
type Session struct {
	// RWMutex guards the session field: readers take the read lock to fetch or use
	// the session, the dead connection check takes the write lock while replacing it.
	sync.RWMutex

	// session is the gocql session currently in use.
	session *gocql.Session
	// cluster is the configuration that sessions are created from.
	cluster *gocql.ClusterConfig

	// connectionCheckInterval is the period between connection check queries;
	// the check is only started when it is greater than zero.
	connectionCheckInterval time.Duration
	// connectionCheckTimeout is how long the check query may keep failing
	// before a new session is created.
	connectionCheckTimeout time.Duration

	// shutdown is closed by Stop to tell the dead connection check to exit.
	shutdown chan struct{}
	// wg tracks the dead connection check goroutine so Stop can wait for it.
	wg sync.WaitGroup

	// addrs is the host list, reported in log messages when reconnecting.
	addrs string
	// logPrefix is prepended to the log messages emitted by the connection check.
	logPrefix string
}
// NewSession opens a Cassandra session from clusterConfig and returns it wrapped in a Session.
//
// timeout and interval configure the dead connection check: when interval is greater
// than zero a background goroutine is started that runs a check query every interval
// and recreates the session once the query has been failing for timeout. addrs and
// logPrefix are only used in log messages.
//
// It returns nil and an error when clusterConfig is nil or the session cannot be created.
func NewSession(clusterConfig *gocql.ClusterConfig,
	timeout time.Duration,
	interval time.Duration,
	addrs string,
	logPrefix string) (*Session, error) {
	const nilConfigMsg = "cassandra.NewSession received nil pointer for ClusterConfig"

	if clusterConfig == nil {
		log.Errorf(nilConfigMsg)
		return nil, fmt.Errorf(nilConfigMsg)
	}

	conn, err := clusterConfig.CreateSession()
	if err != nil {
		log.Errorf("cassandra.NewSession failed to create session: %v", err)
		return nil, err
	}

	wrapped := new(Session)
	wrapped.session = conn
	wrapped.cluster = clusterConfig
	wrapped.shutdown = make(chan struct{})
	wrapped.connectionCheckTimeout = timeout
	wrapped.connectionCheckInterval = interval
	wrapped.addrs = addrs
	wrapped.logPrefix = logPrefix

	// The connection check is optional; a non-positive interval disables it.
	if interval > 0 {
		wrapped.wg.Add(1)
		go wrapped.deadConnectionRefresh()
	}

	return wrapped, nil
}
// Stop signals the dead connection check (if one is running) to exit, waits for it
// to finish and closes the underlying Cassandra session.
//
// Stop must be called at most once: it closes the shutdown channel, and closing
// that channel a second time panics.
func (s *Session) Stop() {
	close(s.shutdown)
	s.wg.Wait()

	// The dead connection check closes the session on its way out, but it is only
	// started when connectionCheckInterval > 0. Close the session here as well so it
	// is not leaked when the check is disabled. The Closed() guard makes this a
	// no-op when the goroutine already closed it.
	s.Lock()
	defer s.Unlock()
	if s.session != nil && !s.session.Closed() {
		s.session.Close()
	}
}
// deadConnectionRefresh runs a query using the current Cassandra session every connectionCheckInterval.
// If the query keeps failing for connectionCheckTimeout or longer, it creates a new session and swaps it in.
// It returns when the shutdown channel is closed, closing the current session on the way out.
//
// We implemented this due to an issue in gocql (https://github.com/gocql/gocql/issues/831). Once that issue is resolved
// we should be able to get rid of this code.
func (s *Session) deadConnectionRefresh() {
	defer s.wg.Done()

	log.Infof("%s: dead connection check enabled with an interval of %s", s.logPrefix, s.connectionCheckInterval.String())

	ticker := time.NewTicker(s.connectionCheckInterval)
	// release the ticker's resources once the check exits
	defer ticker.Stop()

	// totaltime is how long the check query has been failing since its last success
	var totaltime time.Duration

	for {
		// connection to Cassandra has been down for longer than the configured timeout.
		// Requiring totaltime > 0 means at least one check must have failed first; without it
		// a timeout <= 0 would recreate the session in a tight loop even while it is healthy.
		if totaltime > 0 && totaltime >= s.connectionCheckTimeout {
			// hold the write lock for the whole reconnect so CurrentSession blocks until it is done
			s.Lock()
			start := time.Now()

			newSession := s.createSessionUntilShutdown()
			if newSession == nil {
				log.Infof("%s: received shutdown, exiting deadConnectionRefresh", s.logPrefix)
				// s.session still refers to the previous session (it is only replaced on
				// success), so it can be closed here instead of being leaked
				if s.session != nil && !s.session.Closed() {
					s.session.Close()
				}
				// make sure we unlock the sessionLock before returning
				s.Unlock()
				return
			}

			oldSession := s.session
			s.session = newSession
			s.Unlock()

			log.Errorf("%s: reconnecting to cassandra took %s", s.logPrefix, time.Since(start).String())
			totaltime = 0

			// close the replaced session outside the lock so readers are not held up by it
			if oldSession != nil && !oldSession.Closed() {
				oldSession.Close()
			}

			// we connected, so go back to the normal loop
			continue
		}

		select {
		case <-s.shutdown:
			log.Infof("%s: received shutdown, exiting deadConnectionRefresh", s.logPrefix)
			if s.session != nil && !s.session.Closed() {
				s.session.Close()
			}
			return
		case <-ticker.C:
			s.RLock()
			// this query should work on all cassandra deployments, but we may need to revisit this
			err := s.session.Query("SELECT cql_version FROM system.local").Exec()
			s.RUnlock()

			if err == nil {
				totaltime = 0
				continue
			}

			totaltime += s.connectionCheckInterval
			log.Errorf("%s: could not execute connection check query for %v: %v", s.logPrefix, totaltime.String(), err)
		}
	}
}

// createSessionUntilShutdown keeps trying to create a new session, waiting connectionCheckInterval
// between attempts, and returns it once one succeeds. It returns nil if shutdown is signalled first.
// It does not touch s.session or the lock; the caller is responsible for both.
func (s *Session) createSessionUntilShutdown() *gocql.Session {
	for {
		select {
		case <-s.shutdown:
			return nil
		default:
		}

		log.Errorf("%s: creating new session to cassandra using hosts: %v", s.logPrefix, s.addrs)

		newSession, err := s.cluster.CreateSession()
		if err == nil {
			return newSession
		}

		log.Errorf("%s: error while attempting to recreate cassandra session. will retry after %v: %v", s.logPrefix, s.connectionCheckInterval.String(), err)

		// wait before retrying, but wake up early on shutdown so Stop is not delayed
		// by a full interval; the shutdown itself is handled at the top of the loop
		select {
		case <-s.shutdown:
		case <-time.After(s.connectionCheckInterval):
		}
	}
}
// CurrentSession returns the Cassandra session that is currently in use.
//
// The session is read under the read lock. The dead connection check holds the
// write lock while it recreates the session, so if the connection to Cassandra is
// down this call blocks until it has been restored.
func (s *Session) CurrentSession() *gocql.Session {
	s.RLock()
	defer s.RUnlock()

	return s.session
}