|
1 | 1 | /* |
2 | | - * Copyright (c) 2005, 2024, Oracle and/or its affiliates. All rights reserved. |
| 2 | + * Copyright (c) 2005, 2025, Oracle and/or its affiliates. All rights reserved. |
3 | 3 | * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 | 4 | * |
5 | 5 | * This code is free software; you can redistribute it and/or modify it |
|
60 | 60 | import java.util.Set; |
61 | 61 | import java.util.Timer; |
62 | 62 | import java.util.TimerTask; |
| 63 | +import java.util.concurrent.CountDownLatch; |
63 | 64 | import java.util.concurrent.Executor; |
| 65 | +import java.util.concurrent.TimeUnit; |
64 | 66 |
|
65 | 67 | import static java.nio.charset.StandardCharsets.ISO_8859_1; |
66 | 68 | import static sun.net.httpserver.Utils.isValidName; |
@@ -93,7 +95,7 @@ class ServerImpl { |
93 | 95 | private final Set<HttpConnection> rspConnections; |
94 | 96 | private List<Event> events; |
95 | 97 | private final Object lolock = new Object(); |
96 | | - private volatile boolean finished = false; |
| 98 | + private final CountDownLatch finishedLatch = new CountDownLatch(1); |
97 | 99 | private volatile boolean terminating = false; |
98 | 100 | private boolean bound = false; |
99 | 101 | private boolean started = false; |
@@ -179,7 +181,7 @@ public void bind (InetSocketAddress addr, int backlog) throws IOException { |
179 | 181 | } |
180 | 182 |
|
181 | 183 | public void start () { |
182 | | - if (!bound || started || finished) { |
| 184 | + if (!bound || started || finished()) { |
183 | 185 | throw new IllegalStateException ("server in wrong state"); |
184 | 186 | } |
185 | 187 | if (executor == null) { |
@@ -222,45 +224,75 @@ public HttpsConfigurator getHttpsConfigurator () { |
222 | 224 | return httpsConfig; |
223 | 225 | } |
224 | 226 |
|
| 227 | + private final boolean finished(){ |
| 228 | + // if the latch is 0, the server is finished |
| 229 | + return finishedLatch.getCount() == 0; |
| 230 | + } |
| 231 | + |
225 | 232 | public final boolean isFinishing() { |
226 | | - return finished; |
| 233 | + return finished(); |
227 | 234 | } |
228 | 235 |
|
| 236 | + /** |
| 237 | + * This method stops the server by adding a stop request event and |
| 238 | + * waiting for the server until the event is triggered or until the maximum delay is triggered. |
| 239 | + * <p> |
| 240 | + * This ensures that the server is stopped immediately after all exchanges are complete. HttpConnections will be forcefully closed if active exchanges do not |
| 241 | + * complete within the imparted delay. |
| 242 | + * |
| 243 | + * @param delay maximum delay to wait for exchanges completion, in seconds |
| 244 | + */ |
229 | 245 | public void stop (int delay) { |
230 | 246 | if (delay < 0) { |
231 | 247 | throw new IllegalArgumentException ("negative delay parameter"); |
232 | 248 | } |
| 249 | + |
| 250 | + logger.log(Level.TRACE, "stopping"); |
| 251 | + // posting a stop event, which will flip finished flag if it finishes |
| 252 | + // before the timeout in this method |
233 | 253 | terminating = true; |
| 254 | + |
| 255 | + addEvent(new Event.StopRequested()); |
| 256 | + |
234 | 257 | try { schan.close(); } catch (IOException e) {} |
235 | 258 | selector.wakeup(); |
236 | | - long latest = System.currentTimeMillis() + delay * 1000; |
237 | | - while (System.currentTimeMillis() < latest) { |
238 | | - delay(); |
239 | | - if (finished) { |
240 | | - break; |
| 259 | + |
| 260 | + try { |
| 261 | + // waiting for the duration of the delay, unless released before |
| 262 | + finishedLatch.await(delay, TimeUnit.SECONDS); |
| 263 | + |
| 264 | + } catch (InterruptedException e) { |
| 265 | + logger.log(Level.TRACE, "Error in awaiting the delay"); |
| 266 | + |
| 267 | + } finally { |
| 268 | + |
| 269 | + logger.log(Level.TRACE, "closing connections"); |
| 270 | + finishedLatch.countDown(); |
| 271 | + selector.wakeup(); |
| 272 | + synchronized (allConnections) { |
| 273 | + for (HttpConnection c : allConnections) { |
| 274 | + c.close(); |
| 275 | + } |
241 | 276 | } |
242 | | - } |
243 | | - finished = true; |
244 | | - selector.wakeup(); |
245 | | - synchronized (allConnections) { |
246 | | - for (HttpConnection c : allConnections) { |
247 | | - c.close(); |
| 277 | + allConnections.clear(); |
| 278 | + idleConnections.clear(); |
| 279 | + newlyAcceptedConnections.clear(); |
| 280 | + timer.cancel(); |
| 281 | + if (reqRspTimeoutEnabled) { |
| 282 | + timer1.cancel(); |
248 | 283 | } |
249 | | - } |
250 | | - allConnections.clear(); |
251 | | - idleConnections.clear(); |
252 | | - newlyAcceptedConnections.clear(); |
253 | | - timer.cancel(); |
254 | | - if (reqRspTimeoutEnabled) { |
255 | | - timer1.cancel(); |
256 | | - } |
257 | | - if (dispatcherThread != null && dispatcherThread != Thread.currentThread()) { |
258 | | - try { |
259 | | - dispatcherThread.join(); |
260 | | - } catch (InterruptedException e) { |
261 | | - Thread.currentThread().interrupt(); |
262 | | - logger.log (Level.TRACE, "ServerImpl.stop: ", e); |
| 284 | + logger.log(Level.TRACE, "connections closed"); |
| 285 | + |
| 286 | + if (dispatcherThread != null && dispatcherThread != Thread.currentThread()) { |
| 287 | + logger.log(Level.TRACE, "waiting for dispatcher thread"); |
| 288 | + try { |
| 289 | + dispatcherThread.join(); |
| 290 | + } catch (InterruptedException e) { |
| 291 | + Thread.currentThread().interrupt(); |
| 292 | + logger.log(Level.TRACE, "ServerImpl.stop: ", e); |
| 293 | + } |
263 | 294 | } |
| 295 | + logger.log(Level.TRACE, "server stopped"); |
264 | 296 | } |
265 | 297 | } |
266 | 298 |
|
@@ -382,15 +414,34 @@ void addEvent (Event r) { |
382 | 414 | class Dispatcher implements Runnable { |
383 | 415 |
|
384 | 416 | private void handleEvent (Event r) { |
| 417 | + |
| 418 | + // Stopping marking the state as finished if stop is requested, |
| 419 | + // termination is in progress and exchange count is 0 |
| 420 | + if (r instanceof Event.StopRequested) { |
| 421 | + logger.log(Level.TRACE, "Handling Stop Requested Event"); |
| 422 | + |
| 423 | + // checking if terminating is set to true |
| 424 | + final boolean terminatingCopy = terminating; |
| 425 | + assert terminatingCopy; |
| 426 | + |
| 427 | + if (getExchangeCount() == 0 && reqConnections.isEmpty()) { |
| 428 | + finishedLatch.countDown(); |
| 429 | + } else { |
| 430 | + logger.log(Level.TRACE, "Some requests are still pending"); |
| 431 | + } |
| 432 | + return; |
| 433 | + } |
| 434 | + |
385 | 435 | ExchangeImpl t = r.exchange; |
386 | 436 | HttpConnection c = t.getConnection(); |
| 437 | + |
387 | 438 | try { |
388 | | - if (r instanceof WriteFinishedEvent) { |
| 439 | + if (r instanceof Event.WriteFinished) { |
389 | 440 |
|
390 | 441 | logger.log(Level.TRACE, "Write Finished"); |
391 | 442 | int exchanges = endExchange(); |
392 | | - if (terminating && exchanges == 0) { |
393 | | - finished = true; |
| 443 | + if (terminating && exchanges == 0 && reqConnections.isEmpty()) { |
| 444 | + finishedLatch.countDown(); |
394 | 445 | } |
395 | 446 | LeftOverInputStream is = t.getOriginalInputStream(); |
396 | 447 | if (!is.isEOF()) { |
@@ -440,11 +491,12 @@ void reRegister (HttpConnection c) { |
440 | 491 | } |
441 | 492 |
|
442 | 493 | public void run() { |
443 | | - while (!finished) { |
| 494 | + // finished() will be true when there are no active exchange after terminating |
| 495 | + while (!finished()) { |
444 | 496 | try { |
445 | 497 | List<Event> list = null; |
446 | 498 | synchronized (lolock) { |
447 | | - if (events.size() > 0) { |
| 499 | + if (!events.isEmpty()) { |
448 | 500 | list = events; |
449 | 501 | events = new ArrayList<>(); |
450 | 502 | } |
@@ -591,18 +643,18 @@ private void closeConnection(HttpConnection conn) { |
591 | 643 | conn.close(); |
592 | 644 | allConnections.remove(conn); |
593 | 645 | switch (conn.getState()) { |
594 | | - case REQUEST: |
595 | | - reqConnections.remove(conn); |
596 | | - break; |
597 | | - case RESPONSE: |
598 | | - rspConnections.remove(conn); |
599 | | - break; |
600 | | - case IDLE: |
601 | | - idleConnections.remove(conn); |
602 | | - break; |
603 | | - case NEWLY_ACCEPTED: |
604 | | - newlyAcceptedConnections.remove(conn); |
605 | | - break; |
| 646 | + case REQUEST: |
| 647 | + reqConnections.remove(conn); |
| 648 | + break; |
| 649 | + case RESPONSE: |
| 650 | + rspConnections.remove(conn); |
| 651 | + break; |
| 652 | + case IDLE: |
| 653 | + idleConnections.remove(conn); |
| 654 | + break; |
| 655 | + case NEWLY_ACCEPTED: |
| 656 | + newlyAcceptedConnections.remove(conn); |
| 657 | + break; |
606 | 658 | } |
607 | 659 | assert !reqConnections.remove(conn); |
608 | 660 | assert !rspConnections.remove(conn); |
@@ -925,19 +977,16 @@ void logReply (int code, String requestStr, String text) { |
925 | 977 | logger.log (Level.DEBUG, message); |
926 | 978 | } |
927 | 979 |
|
928 | | - void delay () { |
929 | | - Thread.yield(); |
930 | | - try { |
931 | | - Thread.sleep (200); |
932 | | - } catch (InterruptedException e) {} |
933 | | - } |
934 | | - |
935 | 980 | private int exchangeCount = 0; |
936 | 981 |
|
937 | 982 | synchronized void startExchange () { |
938 | 983 | exchangeCount ++; |
939 | 984 | } |
940 | 985 |
|
| 986 | + synchronized int getExchangeCount() { |
| 987 | + return exchangeCount; |
| 988 | + } |
| 989 | + |
941 | 990 | synchronized int endExchange () { |
942 | 991 | exchangeCount --; |
943 | 992 | assert exchangeCount >= 0; |
|
0 commit comments