diff --git a/src/main/java/tech/ydb/app/YqlWriter.java b/src/main/java/tech/ydb/app/YqlWriter.java index 10cf293..a3e6266 100644 --- a/src/main/java/tech/ydb/app/YqlWriter.java +++ b/src/main/java/tech/ydb/app/YqlWriter.java @@ -8,6 +8,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -38,6 +39,9 @@ public class YqlWriter implements AutoCloseable { private final AtomicLong lastPrinted = new AtomicLong(); private final AtomicLong writtenCount = new AtomicLong(); + private final AtomicBoolean isStarted = new AtomicBoolean(false); + private final AtomicBoolean isStoppped = new AtomicBoolean(false); + public YqlWriter(YdbService ydb, Supplier parser, XmlConfig.Cdc config) { this.ydb = ydb; this.errorThreshold = config.getErrorThreshold(); @@ -71,12 +75,29 @@ public Instant getLastReaded() { } public void start() { - lastPrinted.set(System.currentTimeMillis()); - writers.forEach(Writer::start); + if (isStoppped.get()) { + logger.error("writer is already stopped"); + return; + } + if (isStarted.compareAndExchange(false, true)) { + lastPrinted.set(System.currentTimeMillis()); + writers.forEach(Writer::start); + } else { + logger.warn("writer is already started"); + } } @Override public void close() { + if (!isStoppped.compareAndExchange(false, true)) { + logger.error("writer is already stopped"); + return; + } + + if (!isStarted.get()) { + return; + } + writers.forEach(Writer::stop); try { @@ -184,14 +205,14 @@ public void run() { lastStatus = Status.of(StatusCode.CLIENT_INTERNAL_ERROR, ex, Issue.of(ex.getMessage(), Issue.Severity.ERROR)); } catch (InterruptedException ex) { - // stoppping + // stopping } } private void printDebugStats() { long now = System.currentTimeMillis(); long printedAt = lastPrinted.get(); - if (printedAt > 0L && (now - printedAt > 1000L) + if (printedAt > 0L && (now - printedAt > 1000L) && lastPrinted.compareAndSet(printedAt, now)) { long ms = now - printedAt; long written = writtenCount.getAndSet(0); @@ -238,6 +259,5 @@ public void write(Random rnd, YqlQuery query, Instant lastMsgCreated) throws Int query.clear(); lastWrited = lastMsgCreated; } - } }