Skip to content

Commit 79c38d3

Browse files
committed
Merge branch 'hotfix_1.8_24468'
# Conflicts: # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
2 parents e5a51e0 + 3993942 commit 79c38d3

File tree

78 files changed

+1641
-734
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+1641
-734
lines changed

.gitlab-ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ build:
22
stage: test
33
script:
44
- mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
5-
- mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.branch.name="v1.8.0_dev" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
5+
- mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
66
- sh ci/sonar_notify.sh
77
only:
88
- v1.8.0_dev

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
267267
String connInfo = "address:" + tableInfo.getAddress() + ";userName:" + tableInfo.getUserName()
268268
+ ",pwd:" + tableInfo.getPassword();
269269
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
270-
Thread.sleep(5 * 1000);
270+
Thread.sleep(LOAD_DATA_ERROR_SLEEP_TIME);
271271
} catch (InterruptedException e1) {
272272
LOG.error("", e1);
273273
}

core/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,6 @@
116116
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
117117
<version>${flink.version}</version>
118118
</dependency>
119-
120-
<dependency>
121-
<groupId>junit</groupId>
122-
<artifactId>junit</artifactId>
123-
<version>4.12</version>
124-
</dependency>
125119
<dependency>
126120
<groupId>com.aiweiergou</groupId>
127121
<artifactId>tools-logger</artifactId>

core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,13 @@ public JobExecutionResult execute(String jobName) throws Exception {
113113
LOG.info("Running job on local embedded Flink mini cluster");
114114
}
115115

116-
MiniCluster exec = new MiniCluster(configBuilder.build());
117-
try {
116+
try (MiniCluster exec = new MiniCluster(configBuilder.build());) {
118117
exec.start();
119-
return exec.executeJobBlocking(jobGraph);
120-
}
121-
finally {
118+
JobExecutionResult jobExecutionResult = exec.executeJobBlocking(jobGraph);
122119
transformations.clear();
123-
exec.closeAsync();
120+
return jobExecutionResult;
121+
} catch (Exception e) {
122+
throw new RuntimeException(e);
124123
}
125124
}
126125
}

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
282282

283283
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
284284
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
285-
.map((Tuple2<Boolean, Row> f0) -> {
286-
return f0.f1;
287-
})
285+
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
288286
.returns(typeInfo);
289287

290288
String fields = String.join(",", typeInfo.getFieldNames());

core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.flink.metrics.Meter;
2626
import org.apache.flink.metrics.MeterView;
2727
import org.apache.flink.table.runtime.types.CRow;
28-
import org.apache.flink.types.Row;
2928

3029

3130
/**

core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,12 @@
2626
import org.apache.commons.lang.StringUtils;
2727
import java.lang.reflect.InvocationTargetException;
2828
import java.lang.reflect.Field;
29+
import java.nio.charset.StandardCharsets;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.io.File;
32-
import java.io.FileInputStream;
3333
import java.net.URLEncoder;
34-
import java.util.stream.Stream;
3534

36-
import org.apache.commons.codec.Charsets;
3735
import org.apache.flink.util.FileUtils;
3836

3937

@@ -104,8 +102,8 @@ public List<String> getProgramExeArgList() throws Exception {
104102
continue;
105103
} else if (OPTION_SQL.equalsIgnoreCase(key)) {
106104
File file = new File(value.toString());
107-
String content = FileUtils.readFile(file, "UTF-8");
108-
value = URLEncoder.encode(content, Charsets.UTF_8.name());
105+
String content = FileUtils.readFile(file, StandardCharsets.UTF_8.name());
106+
value = URLEncoder.encode(content, StandardCharsets.UTF_8.name());
109107
}
110108
args.add("-" + key);
111109
args.add(value.toString());

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
2424
import org.apache.calcite.config.Lex;
25-
import org.apache.calcite.sql.*;
25+
import org.apache.calcite.sql.SqlBasicCall;
26+
import org.apache.calcite.sql.SqlJoin;
27+
import org.apache.calcite.sql.SqlKind;
28+
import org.apache.calcite.sql.SqlNode;
29+
import org.apache.calcite.sql.SqlSelect;
2630
import org.apache.calcite.sql.parser.SqlParseException;
2731
import org.apache.calcite.sql.parser.SqlParser;
2832
import com.google.common.collect.Lists;

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@
2121
package com.dtstack.flink.sql.parser;
2222

2323
import org.apache.calcite.config.Lex;
24-
import org.apache.calcite.sql.*;
24+
import org.apache.calcite.sql.SqlBasicCall;
25+
import org.apache.calcite.sql.SqlInsert;
26+
import org.apache.calcite.sql.SqlJoin;
27+
import org.apache.calcite.sql.SqlKind;
28+
import org.apache.calcite.sql.SqlMatchRecognize;
29+
import org.apache.calcite.sql.SqlNode;
30+
import org.apache.calcite.sql.SqlOrderBy;
31+
import org.apache.calcite.sql.SqlSelect;
2532
import org.apache.calcite.sql.parser.SqlParseException;
2633
import org.apache.calcite.sql.parser.SqlParser;
2734
import org.apache.commons.lang3.StringUtils;

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
5757

5858
public static final String ASYNC_REQ_POOL_KEY = "asyncPoolSize";
5959

60-
private String cacheType = "none";//None or LRU or ALL
60+
private String cacheType = "none";
6161

6262
private int cacheSize = 10000;
6363

64-
private long cacheTimeout = 60 * 1000;//
64+
private long cacheTimeout = 60_000L;
6565

6666
private int asyncCapacity=100;
6767

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<CRow, CRow> impl
4747

4848
private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);
4949

50+
public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;
51+
5052
protected BaseSideInfo sideInfo;
5153

5254
private ScheduledExecutorService es;

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.google.common.base.Strings;
2626

2727
import java.io.Serializable;
28-
import java.util.Map;
2928

3029
/**
3130
* Join信息

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void exec(String sql, Map<String, AbstractSideTableInfo> sideTableMap, St
121121
SideSQLParser sideSQLParser = new SideSQLParser();
122122
sideSQLParser.setLocalTableCache(localTableCache);
123123
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
124-
Object pollObj = null;
124+
Object pollObj;
125125

126126
//need clean
127127
boolean preIsSideJoin = false;

0 commit comments

Comments
 (0)