1
1
package net .schmizz .keepalive ;
2
2
3
3
import net .schmizz .sshj .Config ;
4
+ import net .schmizz .sshj .common .LoggerFactory ;
4
5
import net .schmizz .sshj .connection .ConnectionException ;
5
6
import net .schmizz .sshj .connection .ConnectionImpl ;
6
7
import net .schmizz .sshj .transport .TransportException ;
@@ -29,8 +30,8 @@ public class BoundedKeepAliveProvider extends KeepAliveProvider {
29
30
protected final KeepAliveMonitor monitor ;
30
31
31
32
32
- public BoundedKeepAliveProvider (Config config , int numberOfThreads ) {
33
- this .monitor = new KeepAliveMonitor (config , numberOfThreads );
33
+ public BoundedKeepAliveProvider (LoggerFactory loggerFactory , int numberOfThreads ) {
34
+ this .monitor = new KeepAliveMonitor (loggerFactory , numberOfThreads );
34
35
}
35
36
36
37
public void setKeepAliveInterval (int interval ) {
@@ -76,40 +77,38 @@ public void startKeepAlive() {
76
77
}
77
78
78
79
protected static class KeepAliveMonitor {
80
+ private final Logger logger ;
79
81
80
- private final int numberOfThreads ;
81
- private final PriorityBlockingQueue <Wrapper > Q =
82
+ private final PriorityBlockingQueue <Wrapper > q =
82
83
new PriorityBlockingQueue <>(32 , Comparator .comparingLong (w -> w .nextTimeMillis ));
83
- private long idleSleepMillis = 100 ;
84
84
private static final List <Thread > workerThreads = new ArrayList <>();
85
+
86
+ private volatile long idleSleepMillis = 100 ;
87
+ private final int numberOfThreads ;
88
+
85
89
volatile boolean started = false ;
86
- private final Logger logger ;
87
90
88
91
private final ReentrantLock lock = new ReentrantLock ();
89
92
private final Condition shutDown = lock .newCondition ();
90
93
private final AtomicInteger shutDownCnt = new AtomicInteger (0 );
91
94
92
- public KeepAliveMonitor (Config config , int numberOfThreads ) {
95
+ public KeepAliveMonitor (LoggerFactory loggerFactory , int numberOfThreads ) {
93
96
this .numberOfThreads = numberOfThreads ;
94
- logger = config . getLoggerFactory () .getLogger (KeepAliveMonitor .class );
97
+ logger = loggerFactory .getLogger (KeepAliveMonitor .class );
95
98
}
96
99
97
100
// made public for test
98
101
public void register (KeepAlive keepAlive ) {
99
102
if (!started ) {
100
103
start ();
101
104
}
102
- Q .add (new Wrapper (keepAlive ));
105
+ q .add (new Wrapper (keepAlive ));
103
106
}
104
107
105
108
public void setIdleSleepMillis (long idleSleepMillis ) {
106
109
this .idleSleepMillis = idleSleepMillis ;
107
110
}
108
111
109
- void unregister (KeepAlive keepAlive ) {
110
- Q .removeIf (w -> keepAlive == w .keepAlive );
111
- }
112
-
113
112
private void sleep () {
114
113
sleep (idleSleepMillis );
115
114
}
@@ -140,7 +139,7 @@ private void doStart() {
140
139
while (!Thread .currentThread ().isInterrupted ()) {
141
140
Wrapper wrapper ;
142
141
143
- if (Q .isEmpty () || (wrapper = Q .poll ()) == null ) {
142
+ if (q .isEmpty () || (wrapper = q .poll ()) == null ) {
144
143
sleep ();
145
144
continue ;
146
145
}
@@ -154,7 +153,7 @@ private void doStart() {
154
153
155
154
try {
156
155
wrapper .keepAlive .doKeepAlive ();
157
- Q .add (wrapper .reschedule ());
156
+ q .add (wrapper .reschedule ());
158
157
} catch (Exception e ) {
159
158
// If we weren't interrupted, kill the transport, then this exception was unexpected.
160
159
// Else we're in shutdown-mode already, so don't forcibly kill the transport.
0 commit comments