
Commit e83f8c3

Merge branch 'v1.8.0_dev' into v1.9.0_dev
# Conflicts:
#	core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java
#	core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java
#	hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java
#	kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkPartition.java
#	kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/CustomerKeyedSerializationSchema.java
#	kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09.java
#	kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09Factory.java
#	kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
#	kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010Factory.java
#	kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer011Factory.java
#	launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java
#	mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java
#	rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
2 parents: 37844f4 + f063639


62 files changed: +1411 −250 lines


.gitlab-ci.yml

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@ build:
   stage: test
   script:
     - mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
-    - mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.branch.name="v1.8.0_dev" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
+    - mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
     - sh ci/sonar_notify.sh
   only:
     - v1.8.0_dev

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 1 addition & 1 deletion
@@ -261,7 +261,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
                 String connInfo = "address:" + tableInfo.getAddress() + ";userName:" + tableInfo.getUserName()
                         + ",pwd:" + tableInfo.getPassword();
                 LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
-                Thread.sleep(5 * 1000);
+                Thread.sleep(LOAD_DATA_ERROR_SLEEP_TIME);
             } catch (InterruptedException e1) {
                 LOG.error("", e1);
             }

core/pom.xml

Lines changed: 0 additions & 6 deletions
@@ -117,12 +117,6 @@
             <artifactId>joda-time</artifactId>
             <version>2.5</version>
         </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.12</version>
-        </dependency>
         <dependency>
             <groupId>com.aiweiergou</groupId>
             <artifactId>tools-logger</artifactId>

core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java

Lines changed: 5 additions & 6 deletions
@@ -117,14 +117,13 @@ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
             LOG.info("Running job on local embedded Flink mini cluster");
         }
 
-        MiniCluster exec = new MiniCluster(configBuilder.build());
-        try {
+        try (MiniCluster exec = new MiniCluster(configBuilder.build());) {
             exec.start();
-            return exec.executeJobBlocking(jobGraph);
-        }
-        finally {
+            JobExecutionResult jobExecutionResult = exec.executeJobBlocking(jobGraph);
             transformations.clear();
-            exec.closeAsync();
+            return jobExecutionResult;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
     }
 }
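The change above moves MiniCluster cleanup from an explicit finally/closeAsync into try-with-resources, so the cluster is closed even when executeJobBlocking throws. A minimal standalone sketch of the same pattern on a generic AutoCloseable; the LocalCluster type and job names here are illustrative stand-ins, not part of the commit:

import java.util.concurrent.Callable;

public class TryWithResourcesSketch {

    // Stand-in for MiniCluster: any AutoCloseable resource that runs work.
    static class LocalCluster implements AutoCloseable {
        void start() { System.out.println("cluster started"); }

        <T> T runBlocking(Callable<T> job) throws Exception { return job.call(); }

        @Override
        public void close() { System.out.println("cluster closed"); }
    }

    public static void main(String[] args) {
        // The resource is closed automatically, whether the job succeeds or throws.
        try (LocalCluster cluster = new LocalCluster()) {
            cluster.start();
            String result = cluster.runBlocking(() -> "job finished");
            System.out.println(result);
        } catch (Exception e) {
            // Mirrors the commit: checked exceptions are rewrapped as unchecked.
            throw new RuntimeException(e);
        }
    }
}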

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 7 additions & 12 deletions
@@ -95,11 +95,8 @@ public class ExecuteProcessHelper {
 
     public static ParamsInfo parseParams(String[] args) throws Exception {
         LOG.info("------------program params-------------------------");
-        System.out.println("------------program params-------------------------");
         Arrays.stream(args).forEach(arg -> LOG.info("{}", arg));
-        Arrays.stream(args).forEach(System.out::println);
         LOG.info("-------------------------------------------");
-        System.out.println("----------------------------------------");
 
         OptionParser optionParser = new OptionParser(args);
         Options options = optionParser.getOptions();
@@ -226,12 +223,10 @@ private static void sqlTranslation(String localSqlPluginPath,
                 //sql-dimensional table contains the dimension table of execution
                 sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, null);
             } else {
-                System.out.println("----------exec sql without dimension join-----------");
-                System.out.println("----------real sql exec is--------------------------");
-                System.out.println(result.getExecSql());
+                LOG.info("----------exec sql without dimension join-----------");
+                LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
                 FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
                 if (LOG.isInfoEnabled()) {
-                    System.out.println();
                     LOG.info("exec sql: " + result.getExecSql());
                 }
             }
@@ -270,7 +265,7 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
      */
    public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
                                         String remoteSqlPluginPath, String pluginLoadMode, Map<String, AbstractSideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
-        Set<URL> pluginClassPatshSets = Sets.newHashSet();
+        Set<URL> pluginClassPathSets = Sets.newHashSet();
         WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
         for (AbstractTableInfo tableInfo : sqlTree.getTableInfoMap().values()) {
 
@@ -304,26 +299,26 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
                 registerTableCache.put(tableInfo.getName(), regTable);
 
                 URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractSourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
-                pluginClassPatshSets.add(sourceTablePathUrl);
+                pluginClassPathSets.add(sourceTablePathUrl);
             } else if (tableInfo instanceof AbstractTargetTableInfo) {
 
                 TableSink tableSink = StreamSinkFactory.getTableSink((AbstractTargetTableInfo) tableInfo, localSqlPluginPath);
                 TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
                 tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
 
                 URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractTargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
-                pluginClassPatshSets.add(sinkTablePathUrl);
+                pluginClassPathSets.add(sinkTablePathUrl);
             } else if (tableInfo instanceof AbstractSideTableInfo) {
                 String sideOperator = ECacheType.ALL.name().equals(((AbstractSideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
                 sideTableMap.put(tableInfo.getName(), (AbstractSideTableInfo) tableInfo);
 
                 URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, AbstractSideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
-                pluginClassPatshSets.add(sideTablePathUrl);
+                pluginClassPathSets.add(sideTablePathUrl);
             } else {
                 throw new RuntimeException("not support table type:" + tableInfo.getType());
             }
         }
-        return pluginClassPatshSets;
+        return pluginClassPathSets;
     }
 
     /**
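This diff drops the duplicated System.out.println calls and keeps only SLF4J logging with "{}" placeholders. A small hedged sketch of that parameterized-logging style; the class name and messages are illustrative, only the pattern matches the commit:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

    public static void logExecSql(String execSql) {
        // Placeholders defer message formatting until the level is actually enabled.
        LOG.info("----------exec sql without dimension join-----------");
        LOG.info("----------real sql exec is--------------------------\n{}", execSql);

        if (LOG.isInfoEnabled()) {
            // The guard is only worthwhile when building the message itself is costly.
            LOG.info("exec sql: {}", execSql);
        }
    }
}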

core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java

Lines changed: 3 additions & 5 deletions
@@ -26,14 +26,12 @@
 import org.apache.commons.lang.StringUtils;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.io.File;
-import java.io.FileInputStream;
 import java.net.URLEncoder;
-import java.util.stream.Stream;
 
-import org.apache.commons.codec.Charsets;
 import org.apache.flink.util.FileUtils;
 
 
@@ -104,8 +102,8 @@ public List<String> getProgramExeArgList() throws Exception {
                 continue;
             } else if (OPTION_SQL.equalsIgnoreCase(key)) {
                 File file = new File(value.toString());
-                String content = FileUtils.readFile(file, "UTF-8");
-                value = URLEncoder.encode(content, Charsets.UTF_8.name());
+                String content = FileUtils.readFile(file, StandardCharsets.UTF_8.name());
+                value = URLEncoder.encode(content, StandardCharsets.UTF_8.name());
             }
             args.add("-" + key);
             args.add(value.toString());
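The commit replaces the commons-codec Charsets helper and the bare "UTF-8" literal with java.nio.charset.StandardCharsets. A self-contained sketch of the same read-then-URL-encode step using only JDK classes; the class name, method name, and use of Files.readAllBytes are illustrative assumptions, while the project itself goes through Flink's FileUtils:

import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class SqlArgEncoder {

    // Reads a SQL script and URL-encodes it so it can be passed as a single program argument.
    public static String readAndEncode(String sqlFilePath) throws IOException {
        String content = new String(Files.readAllBytes(Paths.get(sqlFilePath)), StandardCharsets.UTF_8);
        return URLEncoder.encode(content, StandardCharsets.UTF_8.name());
    }
}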

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 5 additions & 1 deletion
@@ -22,7 +22,11 @@
 
 import com.dtstack.flink.sql.util.DtStringUtil;
 import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.*;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import com.google.common.collect.Lists;

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 8 additions & 1 deletion
@@ -21,7 +21,14 @@
 package com.dtstack.flink.sql.parser;
 
 import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.*;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.commons.lang3.StringUtils;

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 2 additions & 2 deletions
@@ -57,11 +57,11 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
 
     public static final String ASYNC_REQ_POOL_KEY = "asyncPoolSize";
 
-    private String cacheType = "none";//None or LRU or ALL
+    private String cacheType = "none";
 
     private int cacheSize = 10000;
 
-    private long cacheTimeout = 60 * 1000;//
+    private long cacheTimeout = 60_000L;
 
     private int asyncCapacity=100;
 
core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 2 additions & 0 deletions
@@ -50,6 +50,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<Tuple2<Boolean,R
 
     private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);
 
+    public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;
+
     protected BaseSideInfo sideInfo;
 
     private ScheduledExecutorService es;
core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 0 additions & 1 deletion
@@ -25,7 +25,6 @@
 import com.google.common.base.Strings;
 
 import java.io.Serializable;
-import java.util.Map;
 
 /**
  * Join信息

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 1 deletion
@@ -120,7 +120,7 @@ public void exec(String sql, Map<String, AbstractSideTableInfo> sideTableMap, St
         SideSQLParser sideSQLParser = new SideSQLParser();
         sideSQLParser.setLocalTableCache(localTableCache);
         Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
-        Object pollObj = null;
+        Object pollObj;
 
         //need clean
         boolean preIsSideJoin = false;
