From 45db8266b15f8b41d973d650657b283e2aa0137c Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Fri, 14 Nov 2025 11:26:21 +0530 Subject: [PATCH 1/4] Executor service shutdown moved to finally block --- .../core/dataexport/ExportManager.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index 4a6a41596e..dc3332fdb0 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -105,20 +105,21 @@ public ExportReport startExport( isFirstBatch, exportReport)); } - executorService.shutdown(); - if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { - logger.info("All tasks completed"); - } else { - logger.error("Timeout occurred while waiting for tasks to complete"); - // TODO: handle this - } processFooter(exportOptions, tableMetadata, bufferedWriter); - } catch (InterruptedException - | IOException - | UnknownTransactionStatusException - | CrudException e) { + } catch (IOException | UnknownTransactionStatusException | CrudException e) { logger.error("Error during export: ", e); } finally { + executorService.shutdown(); + try { + if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { + logger.info("All tasks completed"); + } else { + logger.error("Timeout occurred while waiting for tasks to complete"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Interrupted while waiting for executor termination", e); + } bufferedWriter.flush(); } } catch (ExportOptionsValidationException | IOException | ScalarDbDaoException e) { From e84bc49f79246af1833133048c4398ee357dc3fc Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Fri, 14 Nov 2025 11:48:13 +0530 Subject: [PATCH 2/4] Process footer changes --- .../core/dataexport/ExportManager.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index dc3332fdb0..58aea61a32 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -105,8 +105,7 @@ public ExportReport startExport( isFirstBatch, exportReport)); } - processFooter(exportOptions, tableMetadata, bufferedWriter); - } catch (IOException | UnknownTransactionStatusException | CrudException e) { + } catch (UnknownTransactionStatusException | CrudException e) { logger.error("Error during export: ", e); } finally { executorService.shutdown(); @@ -120,7 +119,18 @@ public ExportReport startExport( Thread.currentThread().interrupt(); logger.error("Interrupted while waiting for executor termination", e); } - bufferedWriter.flush(); + // Process footer after all tasks are complete + try { + processFooter(exportOptions, tableMetadata, bufferedWriter); + } catch (IOException e) { + logger.error("Error processing footer", e); + } + // Flush buffered writer + try { + bufferedWriter.flush(); + } catch (IOException e) { + logger.error("Error flushing writer", e); + } } } catch (ExportOptionsValidationException | IOException | ScalarDbDaoException e) { logger.error("Error during export: {}", e.getMessage()); From 27b548476c502abe2a55351f1ffbbf2ae77051df Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Mon, 17 Nov 2025 11:41:33 +0530 Subject: [PATCH 3/4] move process footer insude termination if condition --- .../dataloader/core/dataexport/ExportManager.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index 58aea61a32..80d6e5cceb 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -112,6 +112,12 @@ public ExportReport startExport( try { if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { logger.info("All tasks completed"); + // Process footer after all tasks are complete + try { + processFooter(exportOptions, tableMetadata, bufferedWriter); + } catch (IOException e) { + logger.error("Error processing footer", e); + } } else { logger.error("Timeout occurred while waiting for tasks to complete"); } @@ -119,12 +125,7 @@ public ExportReport startExport( Thread.currentThread().interrupt(); logger.error("Interrupted while waiting for executor termination", e); } - // Process footer after all tasks are complete - try { - processFooter(exportOptions, tableMetadata, bufferedWriter); - } catch (IOException e) { - logger.error("Error processing footer", e); - } + // Flush buffered writer try { bufferedWriter.flush(); From 342a5cfaaaf4630b537c3eab90028a0c14e3ae27 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Thu, 20 Nov 2025 10:03:32 +0530 Subject: [PATCH 4/4] feedback changes --- .../core/dataexport/ExportManager.java | 42 +++++++------------ 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index a27854dcf5..517cd34b06 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -80,11 +80,11 @@ public ExportReport startExport( ExecutorService executorService = Executors.newFixedThreadPool(exportOptions.getMaxThreadCount()); - BufferedWriter bufferedWriter = new BufferedWriter(writer); boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON; try (TransactionManagerCrudOperable.Scanner scanner = - createScanner(exportOptions, dao, manager)) { + createScanner(exportOptions, dao, manager); + BufferedWriter bufferedWriter = new BufferedWriter(writer)) { Iterator iterator = scanner.iterator(); AtomicBoolean isFirstBatch = new AtomicBoolean(true); @@ -103,32 +103,22 @@ public ExportReport startExport( isFirstBatch, exportReport)); } - } catch (UnknownTransactionStatusException | CrudException e) { - logger.error("Error during export: ", e); - } finally { executorService.shutdown(); - try { - if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { - logger.info("All tasks completed"); - // Process footer after all tasks are complete - try { - processFooter(exportOptions, tableMetadata, bufferedWriter); - } catch (IOException e) { - logger.error("Error processing footer", e); - } - } else { - logger.error("Timeout occurred while waiting for tasks to complete"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("Interrupted while waiting for executor termination", e); + if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { + logger.info("All tasks completed"); + processFooter(exportOptions, tableMetadata, bufferedWriter); + } else { + logger.error("Timeout occurred while waiting for tasks to complete"); + // TODO: handle this } - - // Flush buffered writer - try { - bufferedWriter.flush(); - } catch (IOException e) { - logger.error("Error flushing writer", e); + } catch (InterruptedException + | IOException + | UnknownTransactionStatusException + | CrudException e) { + logger.error("Error during export: ", e); + } finally { + if (!executorService.isShutdown()) { + executorService.shutdownNow(); } } } catch (ExportOptionsValidationException | IOException | ScalarDbDaoException e) {