Skip to content

Commit

Permalink
[FLINK-35786] Fix NPE BlobServer / shutdownHook
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Jul 9, 2024
1 parent 93d7f45 commit 050767c
Showing 1 changed file with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public class BlobServer extends Thread
private final AtomicLong tempFileCounter = new AtomicLong(0);

/** The server socket listening for incoming connections. */
private final ServerSocket serverSocket;
// can be null if BlobServer is shut down before constructor completion
@Nullable private final ServerSocket serverSocket;

/** Blob Server configuration. */
private final Configuration blobServiceConfiguration;
Expand Down Expand Up @@ -354,10 +355,12 @@ public void close() throws IOException {
if (shutdownRequested.compareAndSet(false, true)) {
Exception exception = null;

try {
this.serverSocket.close();
} catch (IOException ioe) {
exception = ioe;
if (serverSocket != null) {
try {
this.serverSocket.close();
} catch (IOException ioe) {
exception = ioe;
}
}

// wake the thread up, in case it is waiting on some operation
Expand Down Expand Up @@ -394,10 +397,14 @@ public void close() throws IOException {
ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

if (LOG.isInfoEnabled()) {
LOG.info(
"Stopped BLOB server at {}:{}",
serverSocket.getInetAddress().getHostAddress(),
getPort());
if (serverSocket != null) {
LOG.info(
"Stopped BLOB server at {}:{}",
serverSocket.getInetAddress().getHostAddress(),
getPort());
} else {
LOG.info("Stopped BLOB server before initializing the socket");
}
}

ExceptionUtils.tryRethrowIOException(exception);
Expand Down

0 comments on commit 050767c

Please sign in to comment.