diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java index 1e0288f..0e7da5e 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java @@ -6,16 +6,10 @@ package org.apache.flink.connector.nebula.sink; -import com.vesoft.nebula.client.graph.data.ResultSet; import com.vesoft.nebula.client.graph.net.Session; import java.util.Map; -import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; import org.apache.flink.connector.nebula.statement.ExecutionOptions; -import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; -import org.apache.flink.connector.nebula.utils.NebulaConstant; import org.apache.flink.connector.nebula.utils.VidTypeEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public abstract class NebulaBatchExecutor { diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java index 31737c3..9686fc6 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java @@ -21,6 +21,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; @@ -28,6 +32,7 @@ import 
org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; import org.apache.flink.connector.nebula.statement.ExecutionOptions; import org.apache.flink.connector.nebula.utils.VidTypeEnum; +import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +50,10 @@ public class NebulaBatchOutputFormat extends RichOutputFormat implements F private ExecutionOptions executionOptions; private List errorBuffer = new ArrayList<>(); + private transient ScheduledExecutorService scheduler; + private transient ScheduledFuture scheduledFuture; + private transient volatile boolean closed = false; + public NebulaBatchOutputFormat(NebulaGraphConnectionProvider graphProvider, NebulaMetaConnectionProvider metaProvider) { this.graphProvider = graphProvider; @@ -102,13 +111,28 @@ public void open(int i, int i1) throws IOException { executionOptions.getLabel()); nebulaBatchExecutor = new NebulaEdgeBatchExecutor(executionOptions, vidType, schema); } + // start the schedule task: submit the buffer records every batchInterval. + // If batchIntervalMs is 0, do not start the scheduler task. 
+ if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatch() != 1) { + this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory( + "nebula-write-output-format")); + this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> { + synchronized (NebulaBatchOutputFormat.this) { + if (!closed) { + commit(); + } + } }, + executionOptions.getBatchIntervalMs(), + executionOptions.getBatchIntervalMs(), + TimeUnit.MILLISECONDS); + } } /** * write one record to buffer */ @Override - public final synchronized void writeRecord(T row) throws IOException { + public final synchronized void writeRecord(T row) { nebulaBatchExecutor.addToBatch(row); if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) { @@ -119,7 +143,7 @@ public final synchronized void writeRecord(T row) throws IOException { /** * commit batch insert statements */ - private synchronized void commit() throws IOException { + private synchronized void commit() { String errorExec = nebulaBatchExecutor.executeBatch(session); if (errorExec != null) { errorBuffer.add(errorExec); @@ -132,21 +156,28 @@ private synchronized void commit() throws IOException { * commit the batch write operator before release connection */ @Override - public final synchronized void close() throws IOException { - if (numPendingRow.get() > 0) { - commit(); - } - if (!errorBuffer.isEmpty()) { - LOG.error("insert error statements: {}", errorBuffer); - } - if (session != null) { - session.release(); - } - if (nebulaPool != null) { - nebulaPool.close(); - } - if (metaClient != null) { - metaClient.close(); + public final synchronized void close() { + if (!closed) { + closed = true; + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + scheduler.shutdown(); + } + if (numPendingRow != null && numPendingRow.get() > 0) { + commit(); + } + if (!errorBuffer.isEmpty()) { + LOG.error("insert error statements: {}", errorBuffer); + } + if (session != null) { + session.release(); + } + 
if (nebulaPool != null) { + nebulaPool.close(); + } + if (metaClient != null) { + metaClient.close(); + } } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java index 9c98e70..afb278b 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java @@ -47,6 +47,9 @@ void addToBatch(T record) { @Override String executeBatch(Session session) { + if (nebulaEdgeList.size() == 0) { + return null; + } NebulaEdges nebulaEdges = new NebulaEdges(executionOptions.getLabel(), executionOptions.getFields(), nebulaEdgeList, executionOptions.getPolicy(), executionOptions.getPolicy()); diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverter.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverter.java index b51e3f6..fd6fe92 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverter.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverter.java @@ -13,7 +13,6 @@ import java.util.List; import java.util.Map; import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; -import org.apache.flink.connector.nebula.utils.NebulaConstant; import org.apache.flink.connector.nebula.utils.NebulaEdge; import org.apache.flink.connector.nebula.utils.NebulaUtils; import org.apache.flink.connector.nebula.utils.PolicyEnum; diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverter.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverter.java index 1c2c019..bc03b50 100644 --- 
a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverter.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverter.java @@ -13,7 +13,6 @@ import java.util.List; import java.util.Map; import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; -import org.apache.flink.connector.nebula.utils.NebulaConstant; import org.apache.flink.connector.nebula.utils.NebulaUtils; import org.apache.flink.connector.nebula.utils.NebulaVertex; import org.apache.flink.connector.nebula.utils.PolicyEnum; diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaSinkFunction.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaSinkFunction.java index 1104bce..acd6f11 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaSinkFunction.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaSinkFunction.java @@ -41,12 +41,12 @@ public void open(Configuration parameters) throws Exception { } @Override - public void close() throws Exception { + public void close() { outPutFormat.close(); } @Override - public void invoke(T value, Context context) throws Exception { + public void invoke(T value, Context context) { checkErrorAndRethrow(); outPutFormat.writeRecord(value); } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java index 5e08e9a..5716290 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java @@ -49,6 +49,9 @@ void addToBatch(T record) { @Override String executeBatch(Session session) { + if (nebulaVertexList.size() == 0) { + return null; + } NebulaVertices nebulaVertices = new 
NebulaVertices(executionOptions.getLabel(), executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy()); // generate the write ngql statement diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/statement/EdgeExecutionOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/statement/EdgeExecutionOptions.java index 8581ad6..6d56f4b 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/statement/EdgeExecutionOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/statement/EdgeExecutionOptions.java @@ -6,6 +6,7 @@ package org.apache.flink.connector.nebula.statement; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_BATCH_INTERVAL_MS; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_ROW_INFO_INDEX; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_SCAN_LIMIT; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_WRITE_BATCH; @@ -41,10 +42,10 @@ public class EdgeExecutionOptions extends ExecutionOptions { private EdgeExecutionOptions(String graphSpace, String executeStatement, List fields, List positions, boolean noColumn, int limit, long startTime, long endTime, long batch, PolicyEnum policy, - WriteModeEnum mode, - String edge, int srcIndex, int dstIndex, int rankIndex) { + WriteModeEnum mode, String edge, int srcIndex, int dstIndex, + int rankIndex, long batchIntervalMs) { super(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime, - endTime, batch, policy, mode); + endTime, batch, policy, mode, batchIntervalMs); this.edge = edge; this.srcIndex = srcIndex; this.dstIndex = dstIndex; @@ -88,6 +89,7 @@ public static class ExecutionOptionBuilder { private long startTime = 0; private long endTime = Long.MAX_VALUE; private int batch = DEFAULT_WRITE_BATCH; + private long batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS; private PolicyEnum policy = null; private 
WriteModeEnum mode = WriteModeEnum.INSERT; private int srcIndex = DEFAULT_ROW_INFO_INDEX; @@ -171,6 +173,11 @@ public ExecutionOptionBuilder setWriteMode(WriteModeEnum mode) { return this; } + public ExecutionOptionBuilder setBatchIntervalMs(long batchIntervalMs) { + this.batchIntervalMs = batchIntervalMs; + return this; + } + public ExecutionOptions builder() { if (graphSpace == null || graphSpace.trim().isEmpty()) { throw new IllegalArgumentException("graph space can not be empty."); } @@ -180,8 +187,7 @@ public ExecutionOptions builder() { } return new EdgeExecutionOptions(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime, endTime, batch, policy, mode, edge, srcIndex, - dstIndex, - rankIndex); + dstIndex, rankIndex, batchIntervalMs); } } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/statement/ExecutionOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/statement/ExecutionOptions.java index 6be81a0..b5dc1b3 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/statement/ExecutionOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/statement/ExecutionOptions.java @@ -132,6 +132,11 @@ public abstract class ExecutionOptions implements Serializable { */ private WriteModeEnum writeMode; + /** + * interval between write submissions + */ + private long batchIntervalMs; + protected ExecutionOptions(String graphSpace, String executeStatement, @@ -143,7 +148,8 @@ protected ExecutionOptions(String graphSpace, long endTime, long batch, PolicyEnum policy, - WriteModeEnum writeMode) { + WriteModeEnum writeMode, + long batchIntervalMs) { this.graphSpace = graphSpace; this.executeStatement = executeStatement; @@ -156,6 +162,7 @@ protected ExecutionOptions(String graphSpace, this.batch = batch; this.policy = policy; this.writeMode = writeMode; + this.batchIntervalMs = batchIntervalMs; } public String getGraphSpace() { @@ -206,6 +213,10 @@ public WriteModeEnum getWriteMode() 
{ return writeMode; } + public long getBatchIntervalMs() { + return batchIntervalMs; + } + @Override public String toString() { return "ExecutionOptions{" @@ -220,6 +231,7 @@ public String toString() { + ", batch=" + batch + ", policy=" + policy + ", mode=" + writeMode + + ", batchIntervalMs=" + batchIntervalMs + '}'; } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java index 683874f..a672b24 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java @@ -6,6 +6,7 @@ package org.apache.flink.connector.nebula.statement; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_BATCH_INTERVAL_MS; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_ROW_INFO_INDEX; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_SCAN_LIMIT; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_WRITE_BATCH; @@ -39,9 +40,10 @@ public VertexExecutionOptions(String graphSpace, PolicyEnum policy, WriteModeEnum mode, String tag, - int idIndex) { + int idIndex, + long batchIntervalMs) { super(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime, - endTime, batch, policy, mode); + endTime, batch, policy, mode, batchIntervalMs); this.tag = tag; this.idIndex = idIndex; } @@ -71,6 +73,7 @@ public static class ExecutionOptionBuilder { private long startTime = 0; private long endTime = Long.MAX_VALUE; private int batch = DEFAULT_WRITE_BATCH; + private long batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS; private PolicyEnum policy = null; private WriteModeEnum mode = WriteModeEnum.INSERT; private int idIndex = DEFAULT_ROW_INFO_INDEX; @@ -146,6 +149,11 @@ public 
ExecutionOptionBuilder setWriteMode(WriteModeEnum mode) { return this; } + public ExecutionOptionBuilder setBatchIntervalMs(long batchIntervalMs) { + this.batchIntervalMs = batchIntervalMs; + return this; + } + public ExecutionOptions builder() { if (graphSpace == null || graphSpace.trim().isEmpty()) { throw new IllegalArgumentException("graph space can not be empty."); } @@ -155,7 +163,7 @@ public ExecutionOptions builder() { } return new VertexExecutionOptions(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime, endTime, batch, policy, mode, tag, - idIndex); + idIndex, batchIntervalMs); } } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java index 17060bf..da4d1e3 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java @@ -36,6 +36,7 @@ public class NebulaConstant { // default value for read & write public static final int DEFAULT_SCAN_LIMIT = 2000; public static final int DEFAULT_WRITE_BATCH = 2000; + public static final long DEFAULT_BATCH_INTERVAL_MS = 0; public static final int DEFAULT_ROW_INFO_INDEX = -1; // default value for connection