This repository has been archived by the owner on Feb 27, 2023. It is now read-only.
forked from hector-client/hector
-
Notifications
You must be signed in to change notification settings - Fork 31
/
CassandraHostRetryService.java
130 lines (103 loc) · 4.33 KB
/
CassandraHostRetryService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package me.prettyprint.cassandra.connection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ExceptionsTranslator;
import me.prettyprint.hector.api.exceptions.HCassandraInternalException;
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.exceptions.HectorTransportException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CassandraHostRetryService extends BackgroundCassandraHostService {
private static Logger log = LoggerFactory.getLogger(CassandraHostRetryService.class);
public static final int DEF_QUEUE_SIZE = -1;
public static final int DEF_RETRY_DELAY = 10;
private final LinkedBlockingQueue<CassandraHost> downedHostQueue;
private final ExceptionsTranslator exceptionsTranslator;
public CassandraHostRetryService(HConnectionManager connectionManager,
CassandraHostConfigurator cassandraHostConfigurator) {
super(connectionManager, cassandraHostConfigurator);
this.exceptionsTranslator = connectionManager.exceptionsTranslator;
this.retryDelayInSeconds = cassandraHostConfigurator.getRetryDownedHostsDelayInSeconds();
downedHostQueue = new LinkedBlockingQueue<CassandraHost>(cassandraHostConfigurator.getRetryDownedHostsQueueSize() < 1
? Integer.MAX_VALUE : cassandraHostConfigurator.getRetryDownedHostsQueueSize());
sf = executor.scheduleWithFixedDelay(new RetryRunner(), this.retryDelayInSeconds,this.retryDelayInSeconds, TimeUnit.SECONDS);
log.info("Downed Host Retry service started with queue size {} and retry delay {}s",
cassandraHostConfigurator.getRetryDownedHostsQueueSize(),
retryDelayInSeconds);
}
@Override
void shutdown() {
log.info("Downed Host retry shutdown hook called");
if ( sf != null ) {
sf.cancel(true);
}
if ( executor != null ) {
executor.shutdownNow();
}
log.info("Downed Host retry shutdown complete");
}
public void add(CassandraHost cassandraHost) {
downedHostQueue.add(cassandraHost);
if ( log.isInfoEnabled() ) {
log.info("Host detected as down was added to retry queue: {}", cassandraHost.getName());
}
}
public boolean contains(CassandraHost cassandraHost) {
return downedHostQueue.contains(cassandraHost);
}
public Set<CassandraHost> getDownedHosts() {
return Collections.unmodifiableSet(new HashSet<CassandraHost>(downedHostQueue));
}
@Override
public void applyRetryDelay() {
sf.cancel(false);
executor.schedule(new RetryRunner(), retryDelayInSeconds, TimeUnit.SECONDS);
}
public void flushQueue() {
downedHostQueue.clear();
log.info("Downed Host retry queue flushed.");
}
class RetryRunner implements Runnable {
@Override
public void run() {
CassandraHost cassandraHost = downedHostQueue.poll();
if ( cassandraHost == null ) {
if ( log.isDebugEnabled() ) {
log.debug("Retry service fired... nothing to do.");
}
return;
}
boolean reconnected = verifyConnection(cassandraHost);
log.info("Downed Host retry status {} with host: {}", reconnected, cassandraHost.getName());
if ( reconnected ) {
reconnected = connectionManager.addCassandraHost(cassandraHost);
}
if ( !reconnected && cassandraHost != null ) {
downedHostQueue.add(cassandraHost);
}
}
private boolean verifyConnection(CassandraHost cassandraHost) {
if ( cassandraHost == null ) {
return false;
}
boolean found = false;
HThriftClient client = new HThriftClient(cassandraHost);
try {
client.open();
found = client.getCassandra().describe_cluster_name() != null;
client.close();
} catch (HectorTransportException he) {
log.warn("Downed {} host still appears to be down: {}", cassandraHost, he.getMessage());
} catch (Exception ex) {
log.error("Downed Host retry failed attempt to verify CassandraHost", ex);
}
return found;
}
}
}