From 38055a0f8959c9df42b9088af600cacd4b5ff547 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BD=D1=82=D0=BE=D0=BD=D0=BE=D0=B2=20=D0=9A=D0=BE?= =?UTF-8?q?=D0=BD=D1=81=D1=82=D0=B0=D0=BD=D1=82=D0=B8=D0=BD=20=D0=9E=D0=BB?= =?UTF-8?q?=D0=B5=D0=B3=D0=BE=D0=B2=D0=B8=D1=87?= Date: Wed, 2 Apr 2025 18:04:28 +0300 Subject: [PATCH] Added YqlWriter thread synchronization --- src/main/java/tech/ydb/app/YqlWriter.java | 32 ++++++++++++++++++----- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/src/main/java/tech/ydb/app/YqlWriter.java b/src/main/java/tech/ydb/app/YqlWriter.java index 10cf293..0b72339 100644 --- a/src/main/java/tech/ydb/app/YqlWriter.java +++ b/src/main/java/tech/ydb/app/YqlWriter.java @@ -26,6 +26,8 @@ */ public class YqlWriter implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(YqlWriter.class); + private final Object lock = new Object(); + private volatile boolean running = false; private final YdbService ydb; private final int errorThreshold; @@ -106,13 +108,21 @@ public Writer(CdcMsgParser parser, int batchSize, String threadName) { } public void start() { - thread.start(); - logger.info("writer {} started", thread.getName()); + synchronized (lock) { + if (isRunning()) return; + thread.start(); + running = true; + logger.info("writer {} started", thread.getName()); + } } public void stop() { - thread.interrupt(); - logger.info("writer {} stopped", thread.getName()); + synchronized (lock) { + if (!isRunning()) return; + running = false; + thread.interrupt(); + logger.info("writer {} stopped", thread.getName()); + } } public void join() throws InterruptedException { @@ -129,8 +139,11 @@ public void addMesssage(Message msg) { } lastReaded = msg.getWrittenAt(); } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - logger.warn("worker thread was interrupted"); + synchronized (lock) { + running = false; + Thread.currentThread().interrupt(); + logger.warn("worker thread was interrupted"); + } } } @@ -184,7 +197,9 @@ public void run() { lastStatus = Status.of(StatusCode.CLIENT_INTERNAL_ERROR, ex, Issue.of(ex.getMessage(), Issue.Severity.ERROR)); } catch (InterruptedException ex) { - // stoppping + synchronized (lock) { + running = false; + } } } @@ -239,5 +254,8 @@ public void write(Random rnd, YqlQuery query, Instant lastMsgCreated) throws Int lastWrited = lastMsgCreated; } + private boolean isRunning() { + return running && thread.isAlive(); + } } }