support time slot for batch write #33

Merged 5 commits on Nov 2, 2021

NebulaBatchExecutor.java
@@ -6,16 +6,10 @@

package org.apache.flink.connector.nebula.sink;

import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.net.Session;
import java.util.Map;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NebulaBatchExecutor<T> {

NebulaBatchOutputFormat.java
@@ -21,13 +21,18 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -45,6 +50,10 @@ public class NebulaBatchOutputFormat<T> extends RichOutputFormat<T> implements F
private ExecutionOptions executionOptions;
private List<String> errorBuffer = new ArrayList<>();

private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile boolean closed = false;

public NebulaBatchOutputFormat(NebulaGraphConnectionProvider graphProvider,
NebulaMetaConnectionProvider metaProvider) {
this.graphProvider = graphProvider;
@@ -102,13 +111,28 @@ public void open(int i, int i1) throws IOException {
executionOptions.getLabel());
nebulaBatchExecutor = new NebulaEdgeBatchExecutor(executionOptions, vidType, schema);
}
// start the scheduled task: submit the buffered records every batchIntervalMs.
// If batchIntervalMs is 0 or batch is 1, do not start the scheduled task.
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatch() != 1) {
this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory(
"nebula-write-output-format"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
synchronized (NebulaBatchOutputFormat.this) {
if (!closed) {
commit();
}
} },
executionOptions.getBatchIntervalMs(),
executionOptions.getBatchIntervalMs(),
TimeUnit.MILLISECONDS);
}
}

/**
* write one record to buffer
*/
@Override
public final synchronized void writeRecord(T row) throws IOException {
public final synchronized void writeRecord(T row) {
nebulaBatchExecutor.addToBatch(row);

if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) {
@@ -119,7 +143,7 @@ public final synchronized void writeRecord(T row) throws IOException {
/**
* commit batch insert statements
*/
private synchronized void commit() throws IOException {
private synchronized void commit() {
String errorExec = nebulaBatchExecutor.executeBatch(session);
if (errorExec != null) {
errorBuffer.add(errorExec);
@@ -132,21 +156,28 @@ private synchronized void commit() throws IOException {
* commit the batch write operator before release connection
*/
@Override
public final synchronized void close() throws IOException {
if (numPendingRow.get() > 0) {
commit();
}
if (!errorBuffer.isEmpty()) {
LOG.error("insert error statements: {}", errorBuffer);
}
if (session != null) {
session.release();
}
if (nebulaPool != null) {
nebulaPool.close();
}
if (metaClient != null) {
metaClient.close();
public final synchronized void close() {
if (!closed) {
closed = true;
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
scheduler.shutdown();
}
if (numPendingRow != null && numPendingRow.get() > 0) {
commit();
}
if (!errorBuffer.isEmpty()) {
LOG.error("insert error statements: {}", errorBuffer);
}
if (session != null) {
session.release();
}
if (nebulaPool != null) {
nebulaPool.close();
}
if (metaClient != null) {
metaClient.close();
}
}
}

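To summarize the change to this output format: flushing is now triggered two ways, by batch size in writeRecord and by a wall-clock timer started in open, and close cancels the timer before a final commit. The timer only starts when batchIntervalMs is non-zero and batch is not 1. Below is a minimal, self-contained sketch of that pattern built only on java.util.concurrent; the class, field, and method names (PeriodicBatchWriter, buffer, the String record type, the println standing in for executeBatch) are illustrative stand-ins, not the connector's API.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class PeriodicBatchWriter {
    private final List<String> buffer = new ArrayList<>();
    private final int batchSize;
    private final long batchIntervalMs;
    private ScheduledExecutorService scheduler;
    private ScheduledFuture<?> scheduledFuture;
    private volatile boolean closed = false;

    public PeriodicBatchWriter(int batchSize, long batchIntervalMs) {
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
    }

    public void open() {
        // Start the time-based trigger only when an interval is configured.
        if (batchIntervalMs > 0 && batchSize != 1) {
            scheduler = Executors.newScheduledThreadPool(1);
            scheduledFuture = scheduler.scheduleWithFixedDelay(() -> {
                synchronized (PeriodicBatchWriter.this) {
                    if (!closed) {
                        commit();
                    }
                }
            }, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    public synchronized void writeRecord(String row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            commit(); // size-based trigger
        }
    }

    private synchronized void commit() {
        if (buffer.isEmpty()) {
            return; // same guard as the empty-batch checks added to the executors below
        }
        System.out.println("submitting " + buffer.size() + " rows"); // stands in for executeBatch
        buffer.clear();
    }

    public synchronized void close() {
        if (!closed) {
            closed = true;
            if (scheduledFuture != null) {
                scheduledFuture.cancel(false);
                scheduler.shutdown();
            }
            commit(); // flush whatever is still buffered before shutting down
        }
    }
}
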
@@ -47,6 +47,9 @@ void addToBatch(T record) {

@Override
String executeBatch(Session session) {
if (nebulaEdgeList.size() == 0) {
return null;
}
NebulaEdges nebulaEdges = new NebulaEdges(executionOptions.getLabel(),
executionOptions.getFields(), nebulaEdgeList, executionOptions.getPolicy(),
executionOptions.getPolicy());

@@ -13,7 +13,6 @@
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.connector.nebula.utils.NebulaEdge;
import org.apache.flink.connector.nebula.utils.NebulaUtils;
import org.apache.flink.connector.nebula.utils.PolicyEnum;

@@ -13,7 +13,6 @@
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.connector.nebula.utils.NebulaUtils;
import org.apache.flink.connector.nebula.utils.NebulaVertex;
import org.apache.flink.connector.nebula.utils.PolicyEnum;

@@ -41,12 +41,12 @@ public void open(Configuration parameters) throws Exception {
}

@Override
public void close() throws Exception {
public void close() {
outPutFormat.close();
}

@Override
public void invoke(T value, Context context) throws Exception {
public void invoke(T value, Context context) {
checkErrorAndRethrow();
outPutFormat.writeRecord(value);
}

@@ -49,6 +49,9 @@ void addToBatch(T record) {

@Override
String executeBatch(Session session) {
if (nebulaVertexList.size() == 0) {
return null;
}
NebulaVertices nebulaVertices = new NebulaVertices(executionOptions.getLabel(),
executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy());
// generate the write ngql statement

EdgeExecutionOptions.java
@@ -6,6 +6,7 @@

package org.apache.flink.connector.nebula.statement;

import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_BATCH_INTERVAL_MS;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_ROW_INFO_INDEX;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_SCAN_LIMIT;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_WRITE_BATCH;
@@ -41,10 +42,10 @@ public class EdgeExecutionOptions extends ExecutionOptions {
private EdgeExecutionOptions(String graphSpace, String executeStatement, List<String> fields,
List<Integer> positions, boolean noColumn, int limit,
long startTime, long endTime, long batch, PolicyEnum policy,
WriteModeEnum mode,
String edge, int srcIndex, int dstIndex, int rankIndex) {
WriteModeEnum mode, String edge, int srcIndex, int dstIndex,
int rankIndex, long batchIntervalMs) {
super(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime,
endTime, batch, policy, mode);
endTime, batch, policy, mode, batchIntervalMs);
this.edge = edge;
this.srcIndex = srcIndex;
this.dstIndex = dstIndex;
@@ -88,6 +89,7 @@ public static class ExecutionOptionBuilder {
private long startTime = 0;
private long endTime = Long.MAX_VALUE;
private int batch = DEFAULT_WRITE_BATCH;
private long batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
private PolicyEnum policy = null;
private WriteModeEnum mode = WriteModeEnum.INSERT;
private int srcIndex = DEFAULT_ROW_INFO_INDEX;
@@ -171,6 +173,11 @@ public ExecutionOptionBuilder setWriteMode(WriteModeEnum mode) {
return this;
}

public ExecutionOptionBuilder setBathIntervalMs(long batchIntervalMs) {
this.batchIntervalMs = batchIntervalMs;
return this;
}

public ExecutionOptions builder() {
if (graphSpace == null || graphSpace.trim().isEmpty()) {
throw new IllegalArgumentException("graph space can not be empty.");
@@ -180,8 +187,7 @@ public ExecutionOptions builder() {
}
return new EdgeExecutionOptions(graphSpace, executeStatement, fields, positions,
noColumn, limit, startTime, endTime, batch, policy, mode, edge, srcIndex,
dstIndex,
rankIndex);
dstIndex, rankIndex, batchIntervalMs);
}
}
}
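
As a usage note on the builder change above: the new time slot is opt-in and is set through the builder method, which is spelled setBathIntervalMs in this PR. A hedged sketch follows; the setGraphSpace, setEdge, and setBatch setters and the names "flinkSink" and "friend" are assumed for illustration and are not part of this diff.

import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;

// Sketch: flush edge batches every 500 ms even when fewer than 200 rows have arrived.
ExecutionOptions edgeOptions = new EdgeExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSink")    // assumed existing setter, hypothetical space name
        .setEdge("friend")             // assumed existing setter, hypothetical edge type
        .setBatch(200)                 // size-based trigger
        .setBathIntervalMs(500L)       // time-based trigger added by this PR
        .builder();
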
ExecutionOptions.java
@@ -132,6 +132,11 @@ public abstract class ExecutionOptions implements Serializable {
*/
private WriteModeEnum writeMode;

/**
* interval between write submissions, in milliseconds
*/
private long batchIntervalMs;


protected ExecutionOptions(String graphSpace,
String executeStatement,
@@ -143,7 +148,8 @@ protected ExecutionOptions(String graphSpace,
long endTime,
long batch,
PolicyEnum policy,
WriteModeEnum writeMode) {
WriteModeEnum writeMode,
long batchIntervalMs) {
this.graphSpace = graphSpace;

this.executeStatement = executeStatement;
@@ -156,6 +162,7 @@ protected ExecutionOptions(String graphSpace,
this.batch = batch;
this.policy = policy;
this.writeMode = writeMode;
this.batchIntervalMs = batchIntervalMs;
}

public String getGraphSpace() {
@@ -206,6 +213,10 @@ public WriteModeEnum getWriteMode() {
return writeMode;
}

public long getBatchIntervalMs() {
return batchIntervalMs;
}

@Override
public String toString() {
return "ExecutionOptions{"
@@ -220,6 +231,7 @@ public String toString() {
+ ", batch=" + batch
+ ", policy=" + policy
+ ", mode=" + writeMode
+ ", batchIntervalMs=" + batchIntervalMs
+ '}';
}
}

VertexExecutionOptions.java
@@ -6,6 +6,7 @@

package org.apache.flink.connector.nebula.statement;

import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_BATCH_INTERVAL_MS;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_ROW_INFO_INDEX;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_SCAN_LIMIT;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_WRITE_BATCH;
@@ -39,9 +40,10 @@ public VertexExecutionOptions(String graphSpace,
PolicyEnum policy,
WriteModeEnum mode,
String tag,
int idIndex) {
int idIndex,
long batchIntervalMs) {
super(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime,
endTime, batch, policy, mode);
endTime, batch, policy, mode, batchIntervalMs);
this.tag = tag;
this.idIndex = idIndex;
}
@@ -71,6 +73,7 @@ public static class ExecutionOptionBuilder {
private long startTime = 0;
private long endTime = Long.MAX_VALUE;
private int batch = DEFAULT_WRITE_BATCH;
private long batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
private PolicyEnum policy = null;
private WriteModeEnum mode = WriteModeEnum.INSERT;
private int idIndex = DEFAULT_ROW_INFO_INDEX;
@@ -146,6 +149,11 @@ public ExecutionOptionBuilder setWriteMode(WriteModeEnum mode) {
return this;
}

public ExecutionOptionBuilder setBathIntervalMs(long batchIntervalMs) {
this.batchIntervalMs = batchIntervalMs;
return this;
}

public ExecutionOptions builder() {
if (graphSpace == null || graphSpace.trim().isEmpty()) {
throw new IllegalArgumentException("graph space can not be empty.");
@@ -155,7 +163,7 @@ public ExecutionOptions builder() {
}
return new VertexExecutionOptions(graphSpace, executeStatement, fields,
positions, noColumn, limit, startTime, endTime, batch, policy, mode, tag,
idIndex);
idIndex, batchIntervalMs);
}
}
}
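
The vertex-side builder gains the same option with the same spelling; a parallel, equally hedged sketch follows (setGraphSpace, setTag, and setBatch are assumed from the existing builder, and "player" is a hypothetical tag name). Leaving the interval unset keeps DEFAULT_BATCH_INTERVAL_MS = 0, i.e. the old purely size-based flushing, as the constant added below shows.

import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;

// Sketch: vertex batches are submitted when 200 rows accumulate, and at least
// every 500 ms while rows are still pending.
ExecutionOptions vertexOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSink")    // assumed existing setter, hypothetical space name
        .setTag("player")              // assumed existing setter, hypothetical tag name
        .setBatch(200)
        .setBathIntervalMs(500L)
        .builder();
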
NebulaConstant.java
@@ -36,6 +36,7 @@ public class NebulaConstant {
// default value for read & write
public static final int DEFAULT_SCAN_LIMIT = 2000;
public static final int DEFAULT_WRITE_BATCH = 2000;
public static final long DEFAULT_BATCH_INTERVAL_MS = 0;
public static final int DEFAULT_ROW_INFO_INDEX = -1;

// default value for connection