diff --git a/src/main/java/tech/ydb/app/YqlWriter.java b/src/main/java/tech/ydb/app/YqlWriter.java index 10cf293..0b72339 100644 --- a/src/main/java/tech/ydb/app/YqlWriter.java +++ b/src/main/java/tech/ydb/app/YqlWriter.java @@ -26,6 +26,8 @@ */ public class YqlWriter implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(YqlWriter.class); + private final Object lock = new Object(); + private volatile boolean running = false; private final YdbService ydb; private final int errorThreshold; @@ -106,13 +108,21 @@ public Writer(CdcMsgParser parser, int batchSize, String threadName) { } public void start() { - thread.start(); - logger.info("writer {} started", thread.getName()); + synchronized (lock) { + if (isRunning()) return; + thread.start(); + running = true; + logger.info("writer {} started", thread.getName()); + } } public void stop() { - thread.interrupt(); - logger.info("writer {} stopped", thread.getName()); + synchronized (lock) { + if (!isRunning()) return; + running = false; + thread.interrupt(); + logger.info("writer {} stopped", thread.getName()); + } } public void join() throws InterruptedException { @@ -129,8 +139,11 @@ public void addMesssage(Message msg) { } lastReaded = msg.getWrittenAt(); } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - logger.warn("worker thread was interrupted"); + synchronized (lock) { + running = false; + Thread.currentThread().interrupt(); + logger.warn("worker thread was interrupted"); + } } } @@ -184,7 +197,9 @@ public void run() { lastStatus = Status.of(StatusCode.CLIENT_INTERNAL_ERROR, ex, Issue.of(ex.getMessage(), Issue.Severity.ERROR)); } catch (InterruptedException ex) { - // stoppping + synchronized (lock) { + running = false; + } } } @@ -239,5 +254,8 @@ public void write(Random rnd, YqlQuery query, Instant lastMsgCreated) throws Int lastWrited = lastMsgCreated; } + private boolean isRunning() { + return running && thread.isAlive(); + } } }