Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions src/main/java/tech/ydb/app/YqlWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

Expand Down Expand Up @@ -38,6 +39,9 @@ public class YqlWriter implements AutoCloseable {
private final AtomicLong lastPrinted = new AtomicLong();
private final AtomicLong writtenCount = new AtomicLong();

private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final AtomicBoolean isStoppped = new AtomicBoolean(false);

public YqlWriter(YdbService ydb, Supplier<CdcMsgParser> parser, XmlConfig.Cdc config) {
this.ydb = ydb;
this.errorThreshold = config.getErrorThreshold();
Expand Down Expand Up @@ -71,12 +75,29 @@ public Instant getLastReaded() {
}

public void start() {
lastPrinted.set(System.currentTimeMillis());
writers.forEach(Writer::start);
if (isStoppped.get()) {
logger.error("writer is already stopped");
return;
}
if (isStarted.compareAndExchange(false, true)) {
lastPrinted.set(System.currentTimeMillis());
writers.forEach(Writer::start);
} else {
logger.warn("writer is already started");
}
}

@Override
public void close() {
if (!isStoppped.compareAndExchange(false, true)) {
logger.error("writer is already stopped");
return;
}

if (!isStarted.get()) {
return;
}

writers.forEach(Writer::stop);

try {
Expand Down Expand Up @@ -184,14 +205,14 @@ public void run() {
lastStatus = Status.of(StatusCode.CLIENT_INTERNAL_ERROR, ex,
Issue.of(ex.getMessage(), Issue.Severity.ERROR));
} catch (InterruptedException ex) {
// stoppping
// stopping
}
}

private void printDebugStats() {
long now = System.currentTimeMillis();
long printedAt = lastPrinted.get();
if (printedAt > 0L && (now - printedAt > 1000L)
if (printedAt > 0L && (now - printedAt > 1000L)
&& lastPrinted.compareAndSet(printedAt, now)) {
long ms = now - printedAt;
long written = writtenCount.getAndSet(0);
Expand Down Expand Up @@ -238,6 +259,5 @@ public void write(Random rnd, YqlQuery query, Instant lastMsgCreated) throws Int
query.clear();
lastWrited = lastMsgCreated;
}

}
}