From 38055a0f8959c9df42b9088af600cacd4b5ff547 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BD=D1=82=D0=BE=D0=BD=D0=BE=D0=B2=20=D0=9A=D0=BE?= =?UTF-8?q?=D0=BD=D1=81=D1=82=D0=B0=D0=BD=D1=82=D0=B8=D0=BD=20=D0=9E=D0=BB?= =?UTF-8?q?=D0=B5=D0=B3=D0=BE=D0=B2=D0=B8=D1=87?= Date: Wed, 2 Apr 2025 18:04:28 +0300 Subject: [PATCH 1/2] Added YqlWriter thread synchronization --- src/main/java/tech/ydb/app/YqlWriter.java | 32 ++++++++++++++++++----- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/src/main/java/tech/ydb/app/YqlWriter.java b/src/main/java/tech/ydb/app/YqlWriter.java index 10cf293..0b72339 100644 --- a/src/main/java/tech/ydb/app/YqlWriter.java +++ b/src/main/java/tech/ydb/app/YqlWriter.java @@ -26,6 +26,8 @@ */ public class YqlWriter implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(YqlWriter.class); + private final Object lock = new Object(); + private volatile boolean running = false; private final YdbService ydb; private final int errorThreshold; @@ -106,13 +108,21 @@ public Writer(CdcMsgParser parser, int batchSize, String threadName) { } public void start() { - thread.start(); - logger.info("writer {} started", thread.getName()); + synchronized (lock) { + if (isRunning()) return; + thread.start(); + running = true; + logger.info("writer {} started", thread.getName()); + } } public void stop() { - thread.interrupt(); - logger.info("writer {} stopped", thread.getName()); + synchronized (lock) { + if (!isRunning()) return; + running = false; + thread.interrupt(); + logger.info("writer {} stopped", thread.getName()); + } } public void join() throws InterruptedException { @@ -129,8 +139,11 @@ public void addMesssage(Message msg) { } lastReaded = msg.getWrittenAt(); } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - logger.warn("worker thread was interrupted"); + synchronized (lock) { + running = false; + Thread.currentThread().interrupt(); + logger.warn("worker thread was interrupted"); + } } } @@ -184,7 +197,9 @@ public void run() { lastStatus = Status.of(StatusCode.CLIENT_INTERNAL_ERROR, ex, Issue.of(ex.getMessage(), Issue.Severity.ERROR)); } catch (InterruptedException ex) { - // stoppping + synchronized (lock) { + running = false; + } } } @@ -239,5 +254,8 @@ public void write(Random rnd, YqlQuery query, Instant lastMsgCreated) throws Int lastWrited = lastMsgCreated; } + private boolean isRunning() { + return running && thread.isAlive(); + } } } From 23d0ad3d4fd52ad5b76b2ce6cec78404a0bba589 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Sun, 8 Jun 2025 14:01:05 +0100 Subject: [PATCH 2/2] Added lock-free thread safety for YqlWriter --- src/main/java/tech/ydb/app/YqlWriter.java | 60 ++++++++++++----------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/src/main/java/tech/ydb/app/YqlWriter.java b/src/main/java/tech/ydb/app/YqlWriter.java index 0b72339..a3e6266 100644 --- a/src/main/java/tech/ydb/app/YqlWriter.java +++ b/src/main/java/tech/ydb/app/YqlWriter.java @@ -8,6 +8,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -26,8 +27,6 @@ */ public class YqlWriter implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(YqlWriter.class); - private final Object lock = new Object(); - private volatile boolean running = false; private final YdbService ydb; private final int errorThreshold; @@ -40,6 +39,9 @@ public class YqlWriter implements AutoCloseable { private final AtomicLong lastPrinted = new AtomicLong(); private final AtomicLong writtenCount = new AtomicLong(); + private final AtomicBoolean isStarted = new AtomicBoolean(false); + private final AtomicBoolean isStoppped = new AtomicBoolean(false); + public YqlWriter(YdbService ydb, Supplier parser, XmlConfig.Cdc config) { this.ydb = ydb; this.errorThreshold = config.getErrorThreshold(); @@ -73,12 +75,29 @@ public Instant getLastReaded() { } public void start() { - lastPrinted.set(System.currentTimeMillis()); - writers.forEach(Writer::start); + if (isStoppped.get()) { + logger.error("writer is already stopped"); + return; + } + if (isStarted.compareAndExchange(false, true)) { + lastPrinted.set(System.currentTimeMillis()); + writers.forEach(Writer::start); + } else { + logger.warn("writer is already started"); + } } @Override public void close() { + if (!isStoppped.compareAndExchange(false, true)) { + logger.error("writer is already stopped"); + return; + } + + if (!isStarted.get()) { + return; + } + writers.forEach(Writer::stop); try { @@ -108,21 +127,13 @@ public Writer(CdcMsgParser parser, int batchSize, String threadName) { } public void start() { - synchronized (lock) { - if (isRunning()) return; - thread.start(); - running = true; - logger.info("writer {} started", thread.getName()); - } + thread.start(); + logger.info("writer {} started", thread.getName()); } public void stop() { - synchronized (lock) { - if (!isRunning()) return; - running = false; - thread.interrupt(); - logger.info("writer {} stopped", thread.getName()); - } + thread.interrupt(); + logger.info("writer {} stopped", thread.getName()); } public void join() throws InterruptedException { @@ -139,11 +150,8 @@ public void addMesssage(Message msg) { } lastReaded = msg.getWrittenAt(); } catch (InterruptedException ex) { - synchronized (lock) { - running = false; - Thread.currentThread().interrupt(); - logger.warn("worker thread was interrupted"); - } + Thread.currentThread().interrupt(); + logger.warn("worker thread was interrupted"); } } @@ -197,16 +205,14 @@ public void run() { lastStatus = Status.of(StatusCode.CLIENT_INTERNAL_ERROR, ex, Issue.of(ex.getMessage(), Issue.Severity.ERROR)); } catch (InterruptedException ex) { - synchronized (lock) { - running = false; - } + // stopping } } private void printDebugStats() { long now = System.currentTimeMillis(); long printedAt = lastPrinted.get(); - if (printedAt > 0L && (now - printedAt > 1000L) + if (printedAt > 0L && (now - printedAt > 1000L) && lastPrinted.compareAndSet(printedAt, now)) { long ms = now - printedAt; long written = writtenCount.getAndSet(0); @@ -253,9 +259,5 @@ public void write(Random rnd, YqlQuery query, Instant lastMsgCreated) throws Int query.clear(); lastWrited = lastMsgCreated; } - - private boolean isRunning() { - return running && thread.isAlive(); - } } }