-
Notifications
You must be signed in to change notification settings - Fork 29
/
EventSource.java
899 lines (823 loc) · 33.3 KB
/
EventSource.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
package com.launchdarkly.eventsource;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Proxy.Type;
import java.net.URI;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.time.Duration;
import java.util.Arrays;
import java.util.Locale;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import static com.launchdarkly.eventsource.Helpers.pow2;
import static com.launchdarkly.eventsource.ReadyState.CLOSED;
import static com.launchdarkly.eventsource.ReadyState.CONNECTING;
import static com.launchdarkly.eventsource.ReadyState.OPEN;
import static com.launchdarkly.eventsource.ReadyState.RAW;
import static com.launchdarkly.eventsource.ReadyState.SHUTDOWN;
import static java.lang.String.format;
import okhttp3.Authenticator;
import okhttp3.Call;
import okhttp3.ConnectionPool;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okio.BufferedSource;
import okio.Okio;
/**
* Client for <a href="https://www.w3.org/TR/2015/REC-eventsource-20150203/">Server-Sent Events</a>
* aka EventSource
*/
public class EventSource implements Closeable {
final Logger logger; // visible for tests
/**
* The default value for {@link Builder#reconnectTime(Duration)}: 1 second.
*/
public static final Duration DEFAULT_RECONNECT_TIME = Duration.ofSeconds(1);
/**
* The default value for {@link Builder#maxReconnectTime(Duration)}: 30 seconds.
*/
public static final Duration DEFAULT_MAX_RECONNECT_TIME = Duration.ofSeconds(30);
/**
* The default value for {@link Builder#connectTimeout(Duration)}: 10 seconds.
*/
public static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10);
/**
* The default value for {@link Builder#writeTimeout(Duration)}: 5 seconds.
*/
public static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(5);
/**
* The default value for {@link Builder#readTimeout(Duration)}: 5 minutes.
*/
public static final Duration DEFAULT_READ_TIMEOUT = Duration.ofMinutes(5);
/**
* The default value for {@link Builder#backoffResetThreshold(Duration)}: 60 seconds.
*/
public static final Duration DEFAULT_BACKOFF_RESET_THRESHOLD = Duration.ofSeconds(60);
private static final Headers defaultHeaders =
new Headers.Builder().add("Accept", "text/event-stream").add("Cache-Control", "no-cache").build();
private final String name;
private volatile HttpUrl url;
private final Headers headers;
private final String method;
private final RequestBody body;
private final RequestTransformer requestTransformer;
private final ExecutorService eventExecutor;
private final ExecutorService streamExecutor;
volatile Duration reconnectTime; // visible for tests
final Duration maxReconnectTime; // visible for tests
final Duration backoffResetThreshold; // visible for tests
private volatile String lastEventId;
private final AsyncEventHandler handler;
private final ConnectionErrorHandler connectionErrorHandler;
private final AtomicReference<ReadyState> readyState;
private final OkHttpClient client;
private volatile Call call;
private final Random jitter = new Random();
private Response response;
private BufferedSource bufferedSource;
EventSource(Builder builder) {
this.name = builder.name == null ? "" : builder.name;
if (builder.logger == null) {
String loggerName = (builder.loggerBaseName == null ? EventSource.class.getCanonicalName() : builder.loggerBaseName) +
(name.isEmpty() ? "" : ("." + name));
this.logger = new SLF4JLogger(loggerName);
} else {
this.logger = builder.logger;
}
this.url = builder.url;
this.headers = addDefaultHeaders(builder.headers);
this.method = builder.method;
this.body = builder.body;
this.requestTransformer = builder.requestTransformer;
this.lastEventId = builder.lastEventId;
this.reconnectTime = builder.reconnectTime;
this.maxReconnectTime = builder.maxReconnectTime;
this.backoffResetThreshold = builder.backoffResetThreshold;
ThreadFactory eventsThreadFactory = createThreadFactory("okhttp-eventsource-events", builder.threadPriority);
this.eventExecutor = Executors.newSingleThreadExecutor(eventsThreadFactory);
ThreadFactory streamThreadFactory = createThreadFactory("okhttp-eventsource-stream", builder.threadPriority);
this.streamExecutor = Executors.newSingleThreadExecutor(streamThreadFactory);
this.handler = new AsyncEventHandler(this.eventExecutor, builder.handler, logger);
this.connectionErrorHandler = builder.connectionErrorHandler;
this.readyState = new AtomicReference<>(RAW);
this.client = builder.clientBuilder.build();
}
private ThreadFactory createThreadFactory(final String type, final Integer threadPriority) {
final ThreadFactory backingThreadFactory = Executors.defaultThreadFactory();
final AtomicLong count = new AtomicLong(0);
return runnable -> {
Thread thread = backingThreadFactory.newThread(runnable);
thread.setName(format(Locale.ROOT, "%s-[%s]-%d", type, name, count.getAndIncrement()));
thread.setDaemon(true);
if (threadPriority != null) {
thread.setPriority(threadPriority);
}
return thread;
};
}
/**
* Attempts to connect to the remote event source if not already connected. This method returns
* immediately; the connection happens on a worker thread.
*/
public void start() {
if (!readyState.compareAndSet(RAW, CONNECTING)) {
logger.info("Start method called on this already-started EventSource object. Doing nothing");
return;
}
logger.debug("readyState change: {} -> {}", RAW, CONNECTING);
logger.info("Starting EventSource client using URI: " + url);
streamExecutor.execute(this::connect);
}
/**
* Drops the current stream connection (if any) and attempts to reconnect.
* <p>
* This method returns immediately after dropping the current connection; the reconnection happens on
* a worker thread.
* <p>
* If a connection attempt is already in progress but has not yet connected, or if {@link #close()} has
* previously been called, this method has no effect. If {@link #start()} has never been called, it is
* the same as calling {@link #start()}.
*/
public void restart() {
ReadyState previousState = readyState.getAndUpdate(t -> t == ReadyState.OPEN ? ReadyState.CLOSED : t);
if (previousState == OPEN) {
closeCurrentStream(previousState);
} else if (previousState == RAW) {
start();
}
// if already connecting or already shutdown or in the process of closing, do nothing
}
/**
* Returns an enum indicating the current status of the connection.
* @return a {@link ReadyState} value
*/
public ReadyState getState() {
return readyState.get();
}
/**
* Drops the current stream connection (if any) and permanently shuts down the EventSource.
*/
@Override
public void close() {
ReadyState currentState = readyState.getAndSet(SHUTDOWN);
logger.debug("readyState change: {} -> {}", currentState, SHUTDOWN);
if (currentState == SHUTDOWN) {
return;
}
closeCurrentStream(currentState);
eventExecutor.shutdownNow();
streamExecutor.shutdownNow();
// COVERAGE: these null guards are here for safety but in practice the values are never null and there
// is no way to cause them to be null in unit tests
if (client.connectionPool() != null) {
client.connectionPool().evictAll();
}
if (client.dispatcher() != null) {
client.dispatcher().cancelAll();
if (client.dispatcher().executorService() != null) {
client.dispatcher().executorService().shutdownNow();
}
}
}
private void closeCurrentStream(ReadyState previousState) {
if (previousState == ReadyState.OPEN) {
handler.onClosed();
}
if (call != null) {
// The call.cancel() must precede the bufferedSource.close().
// Otherwise, an IllegalArgumentException "Unbalanced enter/exit" error is thrown by okhttp.
// https://github.com/google/ExoPlayer/issues/1348
call.cancel();
logger.debug("call cancelled", null);
}
}
Request buildRequest() {
Request.Builder builder = new Request.Builder()
.headers(headers)
.url(url)
.method(method, body);
if (lastEventId != null && !lastEventId.isEmpty()) {
builder.addHeader("Last-Event-ID", lastEventId);
}
Request request = builder.build();
return requestTransformer == null ? request : requestTransformer.transformRequest(request);
}
private void connect() {
response = null;
bufferedSource = null;
int reconnectAttempts = 0;
ConnectionErrorHandler.Action errorHandlerAction = null;
ConnectionHandler connectionHandler = new ConnectionHandler() {
@Override
public void setReconnectionTime(Duration reconnectionTime) {
EventSource.this.setReconnectionTime(reconnectionTime);
}
@Override
public void setLastEventId(String lastEventId) {
EventSource.this.setLastEventId(lastEventId);
}
};
try {
while (!Thread.currentThread().isInterrupted() && readyState.get() != SHUTDOWN) {
long connectedTime = -1;
ReadyState currentState = readyState.getAndSet(CONNECTING);
logger.debug("readyState change: {} -> {}", currentState, CONNECTING);
try {
call = client.newCall(buildRequest());
response = call.execute();
if (response.isSuccessful()) {
connectedTime = System.currentTimeMillis();
currentState = readyState.getAndSet(OPEN);
if (currentState != CONNECTING) {
// COVERAGE: there is no way to simulate this condition in unit tests
logger.warn("Unexpected readyState change: " + currentState + " -> " + OPEN);
} else {
logger.debug("readyState change: {} -> {}", currentState, OPEN);
}
logger.info("Connected to Event Source stream.");
handler.onOpen();
if (bufferedSource != null) {
bufferedSource.close();
}
bufferedSource = Okio.buffer(response.body().source());
EventParser parser = new EventParser(url.uri(), handler, connectionHandler, logger);
// COVERAGE: the isInterrupted() condition is not encountered in unit tests and it's unclear if it can ever happen
for (String line; !Thread.currentThread().isInterrupted() && (line = bufferedSource.readUtf8LineStrict()) != null; ) {
parser.line(line);
}
} else {
logger.debug("Unsuccessful response: {}", response);
errorHandlerAction = dispatchError(new UnsuccessfulResponseException(response.code()));
}
} catch (EOFException eofe) {
logger.warn("Connection unexpectedly closed.");
} catch (IOException ioe) {
ReadyState state = readyState.get();
if (state == SHUTDOWN) {
errorHandlerAction = ConnectionErrorHandler.Action.SHUTDOWN;
} else if (state == CLOSED) { // this happens if it's being restarted
errorHandlerAction = ConnectionErrorHandler.Action.PROCEED;
} else {
// COVERAGE: there is no way to simulate this condition in unit tests - closing the stream causes EOFException
logger.debug("Connection problem: {}", ioe);
errorHandlerAction = dispatchError(ioe);
}
} finally {
ReadyState nextState = CLOSED;
if (errorHandlerAction == ConnectionErrorHandler.Action.SHUTDOWN) {
if (readyState.get() != SHUTDOWN) {
logger.info("Connection has been explicitly shut down by error handler");
}
nextState = SHUTDOWN;
}
currentState = readyState.getAndSet(nextState);
logger.debug("readyState change: {} -> {}", currentState, nextState);
if (response != null && response.body() != null) {
response.close();
logger.debug("response closed", null);
}
if (bufferedSource != null) {
try {
bufferedSource.close();
logger.debug("buffered source closed", null);
} catch (IOException e) {
// COVERAGE: there is no way to simulate this condition in unit tests
logger.warn("Exception when closing bufferedSource: " + e.toString());
}
}
if (currentState == ReadyState.OPEN) {
handler.onClosed();
}
if (nextState != SHUTDOWN) {
// Reset the backoff if we had a successful connection that stayed good for at least
// backoffResetThresholdMs milliseconds.
if (connectedTime >= 0 && (System.currentTimeMillis() - connectedTime) >= backoffResetThreshold.toMillis()) {
reconnectAttempts = 0;
}
maybeWaitWithBackoff(++reconnectAttempts);
}
}
}
} catch (RejectedExecutionException ignored) {
// COVERAGE: there is no way to simulate this condition in unit tests
call = null;
response = null;
bufferedSource = null;
logger.debug("Rejected execution exception ignored: {}", ignored);
// During shutdown, we tried to send a message to the event handler
// Do not reconnect; the executor has been shut down
}
}
private ConnectionErrorHandler.Action dispatchError(Throwable t) {
ConnectionErrorHandler.Action action = connectionErrorHandler.onConnectionError(t);
if (action != ConnectionErrorHandler.Action.SHUTDOWN) {
handler.onError(t);
}
return action;
}
private void maybeWaitWithBackoff(int reconnectAttempts) {
if (!reconnectTime.isZero() && !reconnectTime.isNegative() && reconnectAttempts > 0) {
try {
Duration sleepTime = backoffWithJitter(reconnectAttempts);
logger.info("Waiting " + sleepTime.toMillis() + " milliseconds before reconnecting...");
Thread.sleep(sleepTime.toMillis());
} catch (InterruptedException ignored) {
}
}
}
Duration backoffWithJitter(int reconnectAttempts) {
long maxTimeLong = Math.min(maxReconnectTime.toMillis(), reconnectTime.toMillis() * pow2(reconnectAttempts));
// 2^31 milliseconds is much longer than any reconnect time we would reasonably want to use, so we can pin this to int
int maxTimeInt = maxTimeLong > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)maxTimeLong;
return Duration.ofMillis(maxTimeInt / 2 + jitter.nextInt(maxTimeInt) / 2);
}
private static Headers addDefaultHeaders(Headers custom) {
Headers.Builder builder = new Headers.Builder();
for (String name : defaultHeaders.names()) {
if (!custom.names().contains(name)) { // skip the default if they set any custom values for this key
for (String value: defaultHeaders.values(name)) {
builder.add(name, value);
}
}
}
for (String name : custom.names()) {
for (String value : custom.values(name)) {
builder.add(name, value);
}
}
return builder.build();
}
// setReconnectionTime and setLastEventId are used only by our internal ConnectionHandler, in response
// to stream events. From an application's point of view, these properties can only be set at
// configuration time via the builder.
private void setReconnectionTime(Duration reconnectionTime) {
this.reconnectTime = reconnectionTime;
}
private void setLastEventId(String lastEventId) {
this.lastEventId = lastEventId;
}
/**
* Returns the ID value, if any, of the last known event.
* <p>
* This can be set initially with {@link Builder#lastEventId(String)}, and is updated whenever an event
* is received that has an ID. Whether event IDs are supported depends on the server; it may ignore this
* value.
*
* @return the last known event ID, or null
* @see Builder#lastEventId(String)
* @since 2.0.0
*/
public String getLastEventId() {
return lastEventId;
}
/**
* Returns the current stream endpoint as an OkHttp HttpUrl.
*
* @return the endpoint URL
* @since 1.9.0
* @see #getUri()
*/
public HttpUrl getHttpUrl() {
return this.url;
}
/**
* Returns the current stream endpoint as a java.net.URI.
*
* @return the endpoint URI
* @see #getHttpUrl()
*/
public URI getUri() {
return this.url.uri();
}
/**
* Interface for an object that can modify the network request that the EventSource will make.
* Use this in conjunction with {@link EventSource.Builder#requestTransformer(EventSource.RequestTransformer)}
* if you need to set request properties other than the ones that are already supported by the builder (or if,
* for whatever reason, you need to determine the request properties dynamically rather than setting them
* to fixed values initially). For example:
* <pre><code>
* public class RequestTagger implements EventSource.RequestTransformer {
* public Request transformRequest(Request input) {
* return input.newBuilder().tag("hello").build();
* }
* }
*
* EventSource es = new EventSource.Builder(handler, uri).requestTransformer(new RequestTagger()).build();
* </code></pre>
*
* @since 1.9.0
*/
public static interface RequestTransformer {
/**
* Returns a request that is either the same as the input request or based on it. When
* this method is called, EventSource has already set all of its standard properties on
* the request.
*
* @param input the original request
* @return the request that will be used
*/
public Request transformRequest(Request input);
}
/**
* Builder for {@link EventSource}.
*/
public static final class Builder {
private String name;
private Duration reconnectTime = DEFAULT_RECONNECT_TIME;
private Duration maxReconnectTime = DEFAULT_MAX_RECONNECT_TIME;
private Duration backoffResetThreshold = DEFAULT_BACKOFF_RESET_THRESHOLD;
private String lastEventId;
private final HttpUrl url;
private final EventHandler handler;
private ConnectionErrorHandler connectionErrorHandler = ConnectionErrorHandler.DEFAULT;
private Integer threadPriority = null;
private Headers headers = Headers.of();
private Proxy proxy;
private Authenticator proxyAuthenticator = null;
private String method = "GET";
private RequestTransformer requestTransformer = null;
private RequestBody body = null;
private OkHttpClient.Builder clientBuilder;
private Logger logger = null;
private String loggerBaseName = null;
/**
* Creates a new builder.
*
* @param handler the event handler
* @param uri the endpoint as a java.net.URI
* @throws IllegalArgumentException if either argument is null, or if the endpoint is not HTTP or HTTPS
*/
public Builder(EventHandler handler, URI uri) {
this(handler, uri == null ? null : HttpUrl.get(uri));
}
/**
* Creates a new builder.
*
* @param handler the event handler
* @param url the endpoint as an OkHttp HttpUrl
* @throws IllegalArgumentException if either argument is null, or if the endpoint is not HTTP or HTTPS
*
* @since 1.9.0
*/
public Builder(EventHandler handler, HttpUrl url) {
if (handler == null) {
throw new IllegalArgumentException("handler must not be null");
}
if (url == null) {
throw new IllegalArgumentException("URI/URL must not be null");
}
this.url = url;
this.handler = handler;
this.clientBuilder = createInitialClientBuilder();
}
private static OkHttpClient.Builder createInitialClientBuilder() {
OkHttpClient.Builder b = new OkHttpClient.Builder()
.connectionPool(new ConnectionPool(1, 1, TimeUnit.SECONDS))
.connectTimeout(DEFAULT_CONNECT_TIMEOUT)
.readTimeout(DEFAULT_READ_TIMEOUT)
.writeTimeout(DEFAULT_WRITE_TIMEOUT)
.retryOnConnectionFailure(true);
try {
b.sslSocketFactory(new ModernTLSSocketFactory(), defaultTrustManager());
} catch (GeneralSecurityException e) {
// TLS is not available, so don't set up the socket factory, swallow the exception
// COVERAGE: There is no way to cause this to happen in unit tests
}
return b;
}
/**
* Set the HTTP method used for this EventSource client to use for requests to establish the EventSource.
* <p>
* Defaults to "GET".
*
* @param method the HTTP method name; if null or empty, "GET" is used as the default
* @return the builder
*/
public Builder method(String method) {
this.method = (method != null && method.length() > 0) ? method.toUpperCase() : "GET";
return this;
}
/**
* Sets the request body to be used for this EventSource client to use for requests to establish the EventSource.
*
* @param body the body to use in HTTP requests
* @return the builder
*/
public Builder body(RequestBody body) {
this.body = body;
return this;
}
/**
* Specifies an object that will be used to customize outgoing requests. See {@link RequestTransformer} for details.
*
* @param requestTransformer the transformer object
* @return the builder
*
* @since 1.9.0
*/
public Builder requestTransformer(RequestTransformer requestTransformer) {
this.requestTransformer = requestTransformer;
return this;
}
/**
* Set the name for this EventSource client to be used when naming the logger and threadpools. This is mainly useful when
* multiple EventSource clients exist within the same process.
* <p>
* The name only affects logging when using the default SLF4J integration; if you have specified a custom
* {@link #logger(Logger)}, the name will not be included in log messages unless your logger implementation adds it.
*
* @param name the name (without any whitespaces)
* @return the builder
*/
public Builder name(String name) {
this.name = name;
return this;
}
/**
* Sets the ID value of the last event received.
* <p>
* This will be sent to the remote server on the initial connection request, allowing the server to
* skip past previously sent events if it supports this behavior. Once the connection is established,
* this value will be updated whenever an event is received that has an ID. Whether event IDs are
* supported depends on the server; it may ignore this value.
*
* @param lastEventId the last event identifier
* @return the builder
* @since 2.0.0
*/
public Builder lastEventId(String lastEventId) {
this.lastEventId = lastEventId;
return this;
}
/**
* Sets the minimum delay between connection attempts. The actual delay may be slightly less or
* greater, since there is a random jitter. When there is a connection failure, the delay will
* start at this value and will increase exponentially up to the {@link #maxReconnectTime(Duration)}
* value with each subsequent failure, unless it is reset as described in
* {@link Builder#backoffResetThreshold(Duration)}.
*
* @param reconnectTime the minimum delay; null to use the default
* @return the builder
* @see EventSource#DEFAULT_RECONNECT_TIME
*/
public Builder reconnectTime(Duration reconnectTime) {
this.reconnectTime = reconnectTime == null ? DEFAULT_RECONNECT_TIME : reconnectTime;
return this;
}
/**
* Sets the maximum delay between connection attempts. See {@link #reconnectTime(Duration)}.
* The default value is 30 seconds.
*
* @param maxReconnectTime the maximum delay; null to use the default
* @return the builder
* @see EventSource#DEFAULT_MAX_RECONNECT_TIME
*/
public Builder maxReconnectTime(Duration maxReconnectTime) {
this.maxReconnectTime = maxReconnectTime == null ? DEFAULT_MAX_RECONNECT_TIME : maxReconnectTime;
return this;
}
/**
* Sets the minimum amount of time that a connection must stay open before the EventSource resets its
* backoff delay. If a connection fails before the threshold has elapsed, the delay before reconnecting
* will be greater than the last delay; if it fails after the threshold, the delay will start over at
* the initial minimum value. This prevents long delays from occurring on connections that are only
* rarely restarted.
*
* @param backoffResetThreshold the minimum time that a connection must stay open to avoid resetting
* the delay; null to use the default
* @return the builder
* @see EventSource#DEFAULT_BACKOFF_RESET_THRESHOLD
*/
public Builder backoffResetThreshold(Duration backoffResetThreshold) {
this.backoffResetThreshold = backoffResetThreshold == null ? DEFAULT_BACKOFF_RESET_THRESHOLD : backoffResetThreshold;
return this;
}
/**
* Set the headers to be sent when establishing the EventSource connection.
*
* @param headers headers to be sent with the EventSource request
* @return the builder
*/
public Builder headers(Headers headers) {
this.headers = headers;
return this;
}
/**
* Set a custom HTTP client that will be used to make the EventSource connection.
* If you're setting this along with other connection-related items (ie timeouts, proxy),
* you should do this first to avoid overwriting values.
*
* @param client the HTTP client
* @return the builder
*/
public Builder client(OkHttpClient client) {
this.clientBuilder = client.newBuilder();
return this;
}
/**
* Set the HTTP proxy address to be used to make the EventSource connection
*
* @param proxyHost the proxy hostname
* @param proxyPort the proxy port
* @return the builder
*/
public Builder proxy(String proxyHost, int proxyPort) {
proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
return this;
}
/**
* Set the {@link Proxy} to be used to make the EventSource connection.
*
* @param proxy the proxy
* @return the builder
*/
public Builder proxy(Proxy proxy) {
this.proxy = proxy;
return this;
}
/**
* Sets the Proxy Authentication mechanism if needed. Defaults to no auth.
*
* @param proxyAuthenticator the authentication mechanism
* @return the builder
*/
public Builder proxyAuthenticator(Authenticator proxyAuthenticator) {
this.proxyAuthenticator = proxyAuthenticator;
return this;
}
/**
* Sets the connection timeout.
*
* @param connectTimeout the connection timeout; null to use the default
* @return the builder
* @see EventSource#DEFAULT_CONNECT_TIMEOUT
*/
public Builder connectTimeout(Duration connectTimeout) {
this.clientBuilder.connectTimeout(connectTimeout == null ? DEFAULT_CONNECT_TIMEOUT : connectTimeout);
return this;
}
/**
* Sets the write timeout.
*
* @param writeTimeout the write timeout; null to use the default
* @return the builder
* @see EventSource#DEFAULT_WRITE_TIMEOUT
*/
public Builder writeTimeout(Duration writeTimeout) {
this.clientBuilder.writeTimeout(writeTimeout == null ? DEFAULT_WRITE_TIMEOUT : writeTimeout);
return this;
}
/**
* Sets the read timeout. If a read timeout happens, the {@code EventSource}
* will restart the connection.
*
* @param readTimeout the read timeout; null to use the default
* @return the builder
* @see EventSource#DEFAULT_READ_TIMEOUT
*/
public Builder readTimeout(Duration readTimeout) {
this.clientBuilder.readTimeout(readTimeout == null ? DEFAULT_READ_TIMEOUT : readTimeout);
return this;
}
/**
* Sets the {@link ConnectionErrorHandler} that should process connection errors.
*
* @param handler the error handler
* @return the builder
*/
public Builder connectionErrorHandler(ConnectionErrorHandler handler) {
this.connectionErrorHandler = handler;
return this;
}
/**
* Specifies the priority for threads created by {@code EventSource}.
* <p>
* If this is left unset, or set to {@code null}, threads will inherit the default priority
* provided by {@code Executors.defaultThreadFactory()}.
*
* @param threadPriority the thread priority, or null to ue the default
* @return the builder
* @since 2.2.0
*/
public Builder threadPriority(Integer threadPriority) {
this.threadPriority = threadPriority;
return this;
}
/**
* Specifies any type of configuration actions you want to perform on the OkHttpClient builder.
* <p>
* {@link ClientConfigurer} is an interface with a single method, {@link ClientConfigurer#configure(okhttp3.OkHttpClient.Builder)},
* that will be called with the {@link okhttp3.OkHttpClient.Builder} instance being used by EventSource.
* In Java 8, this can be a lambda.
* <p>
* It is not guaranteed to be called in any particular order relative to other configuration
* actions specified by this Builder, so if you are using more than one method, do not attempt
* to overwrite the same setting in two ways.
* <pre><code>
* // Java 8 example (lambda)
* eventSourceBuilder.clientBuilderActions(b -> {
* b.sslSocketFactory(mySocketFactory, myTrustManager);
* });
*
* // Java 7 example (anonymous class)
* eventSourceBuilder.clientBuilderActions(new EventSource.Builder.ClientConfigurer() {
* public void configure(OkHttpClient.Builder v) {
* b.sslSocketFactory(mySocketFactory, myTrustManager);
* }
* });
* </code></pre>
* @param configurer a ClientConfigurer (or lambda) that will act on the HTTP client builder
* @return the builder
* @since 1.10.0
*/
public Builder clientBuilderActions(ClientConfigurer configurer) {
configurer.configure(clientBuilder);
return this;
}
/**
* Specifies a custom logger to receive EventSource logging.
* <p>
* If you do not provide a logger, the default is to send log output to SLF4J.
*
* @param logger a {@link Logger} implementation, or null to use the default (SLF4J)
* @return the builder
* @since 2.3.0
*/
public Builder logger(Logger logger) {
this.logger = logger;
return this;
}
/**
* Specifies the base logger name to use for SLF4J logging.
* <p>
* The default is {@code com.launchdarkly.eventsource.EventSource}, plus any name suffix specified
* by {@link #name(String)}. If you instead use {@link #logger(Logger)} to specify some other log
* destination rather than SLF4J, this name is unused.
*
* @param loggerBaseName the SLF4J logger name, or null to use the default
* @return the builder
* @since 2.3.0
*/
public Builder loggerBaseName(String loggerBaseName) {
this.loggerBaseName = loggerBaseName;
return this;
}
/**
* Constructs an {@link EventSource} using the builder's current properties.
* @return the new EventSource instance
*/
public EventSource build() {
if (proxy != null) {
clientBuilder.proxy(proxy);
}
if (proxyAuthenticator != null) {
clientBuilder.proxyAuthenticator(proxyAuthenticator);
}
return new EventSource(this);
}
protected OkHttpClient.Builder getClientBuilder() {
return clientBuilder;
}
private static X509TrustManager defaultTrustManager() throws GeneralSecurityException {
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init((KeyStore) null);
TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
if (trustManagers.length != 1 || !(trustManagers[0] instanceof X509TrustManager)) {
// COVERAGE: There is no way to cause this to happen in unit tests
throw new IllegalStateException("Unexpected default trust managers:"
+ Arrays.toString(trustManagers));
}
return (X509TrustManager) trustManagers[0];
}
/**
* An interface for use with {@link EventSource.Builder#clientBuilderActions(ClientConfigurer)}.
* @since 1.10.0
*/
public static interface ClientConfigurer {
/**
* This method is called with the OkHttp {@link okhttp3.OkHttpClient.Builder} that will be used for
* the EventSource, allowing you to call any configuration methods you want.
* @param builder the client builder
*/
public void configure(OkHttpClient.Builder builder);
}
}
}