Skip to content

Commit 6ddfba9

Browse files
committed
Merge branch 'feat_1.8_oceanbase' into 'v1.8.0_dev'
oceanbase-sink插件和维表插件 See merge request dt-insight-engine/flinkStreamSQL!13
2 parents fe59047 + 8aacd7e commit 6ddfba9

File tree

22 files changed

+820
-27
lines changed

22 files changed

+820
-27
lines changed

core/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,6 @@
116116
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
117117
<version>${flink.version}</version>
118118
</dependency>
119-
120-
<dependency>
121-
<groupId>junit</groupId>
122-
<artifactId>junit</artifactId>
123-
<version>4.12</version>
124-
</dependency>
125119
<dependency>
126120
<groupId>com.aiweiergou</groupId>
127121
<artifactId>tools-logger</artifactId>

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
282282

283283
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
284284
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
285-
.map((Tuple2<Boolean, Row> f0) -> {
286-
return f0.f1;
287-
})
285+
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
288286
.returns(typeInfo);
289287

290288
String fields = String.join(",", typeInfo.getFieldNames());

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void exec(String sql, Map<String, AbstractSideTableInfo> sideTableMap, St
121121
SideSQLParser sideSQLParser = new SideSQLParser();
122122
sideSQLParser.setLocalTableCache(localTableCache);
123123
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
124-
Object pollObj = null;
124+
Object pollObj;
125125

126126
//need clean
127127
boolean preIsSideJoin = false;

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,7 @@
3939
public class WaterMarkerAssigner {
4040

4141
public boolean checkNeedAssignWaterMarker(AbstractSourceTableInfo tableInfo){
42-
if(Strings.isNullOrEmpty(tableInfo.getEventTimeField())){
43-
return false;
44-
}
45-
46-
return true;
42+
return !Strings.isNullOrEmpty(tableInfo.getEventTimeField());
4743
}
4844

4945
public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo typeInfo, AbstractSourceTableInfo sourceTableInfo){

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.io.File;
4646
import java.io.IOException;
4747
import java.net.URLDecoder;
48+
import java.nio.charset.StandardCharsets;
4849
import java.util.LinkedList;
4950
import java.util.List;
5051
import java.util.Map;
@@ -63,8 +64,7 @@ public class LauncherMain {
6364

6465
private static String getLocalCoreJarPath(String localSqlRootJar) throws Exception {
6566
String jarPath = PluginUtil.getCoreJarFileName(localSqlRootJar, CORE_JAR);
66-
String corePath = localSqlRootJar + SP + jarPath;
67-
return corePath;
67+
return localSqlRootJar + SP + jarPath;
6868
}
6969

7070
public static void main(String[] args) throws Exception {
@@ -81,14 +81,14 @@ public static void main(String[] args) throws Exception {
8181
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
8282

8383
if(mode.equals(ClusterMode.local.name())) {
84-
String[] localArgs = argList.toArray(new String[argList.size()]);
84+
String[] localArgs = argList.toArray(new String[0]);
8585
Main.main(localArgs);
8686
return;
8787
}
8888

8989
String pluginRoot = launcherOptions.getLocalSqlPluginPath();
9090
File jarFile = new File(getLocalCoreJarPath(pluginRoot));
91-
String[] remoteArgs = argList.toArray(new String[argList.size()]);
91+
String[] remoteArgs = argList.toArray(new String[0]);
9292
PackagedProgram program = new PackagedProgram(jarFile, Lists.newArrayList(), remoteArgs);
9393

9494
String savePointPath = confProperties.getProperty(ConfigConstrant.SAVE_POINT_PATH_KEY);
@@ -119,7 +119,6 @@ private static String[] parseJson(String[] args) throws IOException {
119119
list.add("-" + entry.getKey());
120120
list.add(entry.getValue().toString());
121121
}
122-
String[] array = list.toArray(new String[list.size()]);
123-
return array;
122+
return list.toArray(new String[0]);
124123
}
125124
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.oceanbase</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
<relativePath>../pom.xml</relativePath>
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>sql.side.all.oceanbase</artifactId>
14+
<name>oceanbase-all-side</name>
15+
<packaging>jar</packaging>
16+
<version>1.0-SNAPSHOT</version>
17+
18+
<properties>
19+
<sql.side.oceanbase.core.version>1.0-SNAPSHOT</sql.side.oceanbase.core.version>
20+
</properties>
21+
22+
<dependencies>
23+
<dependency>
24+
<groupId>com.dtstack.flink</groupId>
25+
<artifactId>sql.side.oceanbase.core</artifactId>
26+
<version>${sql.side.oceanbase.core.version}</version>
27+
</dependency>
28+
</dependencies>
29+
30+
<build>
31+
<plugins>
32+
<plugin>
33+
<groupId>org.apache.maven.plugins</groupId>
34+
<artifactId>maven-shade-plugin</artifactId>
35+
<version>1.4</version>
36+
<executions>
37+
<execution>
38+
<phase>package</phase>
39+
<goals>
40+
<goal>shade</goal>
41+
</goals>
42+
<configuration>
43+
<createDependencyReducedPom>false</createDependencyReducedPom>
44+
<artifactSet>
45+
<excludes>
46+
47+
</excludes>
48+
</artifactSet>
49+
<filters>
50+
<filter>
51+
<artifact>*:*</artifact>
52+
<excludes>
53+
<exclude>META-INF/*.SF</exclude>
54+
<exclude>META-INF/*.DSA</exclude>
55+
<exclude>META-INF/*.RSA</exclude>
56+
</excludes>
57+
</filter>
58+
</filters>
59+
</configuration>
60+
</execution>
61+
</executions>
62+
</plugin>
63+
64+
<plugin>
65+
<artifactId>maven-antrun-plugin</artifactId>
66+
<version>1.2</version>
67+
<executions>
68+
<execution>
69+
<id>copy-resources</id>
70+
<!-- here the phase you need -->
71+
<phase>package</phase>
72+
<goals>
73+
<goal>run</goal>
74+
</goals>
75+
<configuration>
76+
<tasks>
77+
<copy todir="${basedir}/../../../plugins/oceanbaseallside">
78+
<fileset dir="target/">
79+
<include name="${project.artifactId}-${project.version}.jar"/>
80+
</fileset>
81+
</copy>
82+
83+
<move file="${basedir}/../../../plugins/oceanbaseallside/${project.artifactId}-${project.version}.jar"
84+
tofile="${basedir}/../../../plugins/oceanbaseallside/${project.name}-${git.branch}.jar"/>
85+
</tasks>
86+
</configuration>
87+
</execution>
88+
</executions>
89+
</plugin>
90+
</plugins>
91+
</build>
92+
93+
94+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstatck.flink.sql.side.oceanbase;
20+
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.FieldInfo;
23+
import com.dtstack.flink.sql.side.JoinInfo;
24+
import com.dtstack.flink.sql.side.rdb.all.AbstractRdbAllReqRow;
25+
import com.dtstack.flink.sql.util.DtStringUtil;
26+
import com.google.common.collect.Maps;
27+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.sql.Connection;
32+
import java.sql.DriverManager;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
/**
37+
* @author : tiezhu
38+
* @date : 2020/3/26
39+
*/
40+
public class OceanbaseAllReqRow extends AbstractRdbAllReqRow {
41+
42+
private static final Logger LOG = LoggerFactory.getLogger(OceanbaseAllReqRow.class);
43+
44+
private static final String OCEAN_BASE_DRIVER = "com.mysql.jdbc.Driver";
45+
46+
public OceanbaseAllReqRow(RowTypeInfo rowTypeInfo,
47+
JoinInfo joinInfo,
48+
List<FieldInfo> outFieldInfoList,
49+
AbstractSideTableInfo sideTableInfo) {
50+
super(new OceanbaseAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
51+
}
52+
53+
@Override
54+
public Connection getConn(String dbUrl, String userName, String password) {
55+
try {
56+
Class.forName(OCEAN_BASE_DRIVER);
57+
Map<String, String> addParams = Maps.newHashMap();
58+
addParams.put("useCursorFetch", "true");
59+
String targetDbUrl = DtStringUtil.addJdbcParam(dbUrl, addParams, true);
60+
return DriverManager.getConnection(targetDbUrl, userName, password);
61+
} catch (Exception e) {
62+
LOG.error("oceanbase get connect error", e);
63+
throw new RuntimeException(e);
64+
}
65+
}
66+
67+
@Override
68+
public int getFetchSize() {
69+
return Integer.MIN_VALUE;
70+
}
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstatck.flink.sql.side.oceanbase;
20+
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.FieldInfo;
23+
import com.dtstack.flink.sql.side.JoinInfo;
24+
import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
25+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
26+
27+
import java.util.List;
28+
29+
/**
30+
* @author : tiezhu
31+
* @date : 2020/3/26
32+
*/
33+
public class OceanbaseAllSideInfo extends RdbAllSideInfo {
34+
public OceanbaseAllSideInfo(RowTypeInfo rowTypeInfo,
35+
JoinInfo joinInfo,
36+
List<FieldInfo> outFieldInfoList,
37+
AbstractSideTableInfo sideTableInfo) {
38+
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
39+
}
40+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.oceanbase</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
<relativePath>../pom.xml</relativePath>
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>sql.side.async.oceanbase</artifactId>
14+
<name>oceanbase-async-side</name>
15+
<packaging>jar</packaging>
16+
<version>1.0-SNAPSHOT</version>
17+
18+
<properties>
19+
<sql.side.oceanbase.core.version>1.0-SNAPSHOT</sql.side.oceanbase.core.version>
20+
</properties>
21+
22+
<dependencies>
23+
<dependency>
24+
<groupId>com.dtstack.flink</groupId>
25+
<artifactId>sql.side.oceanbase.core</artifactId>
26+
<version>${sql.side.oceanbase.core.version}</version>
27+
</dependency>
28+
</dependencies>
29+
30+
<build>
31+
<plugins>
32+
<plugin>
33+
<groupId>org.apache.maven.plugins</groupId>
34+
<artifactId>maven-shade-plugin</artifactId>
35+
<version>1.4</version>
36+
<executions>
37+
<execution>
38+
<phase>package</phase>
39+
<goals>
40+
<goal>shade</goal>
41+
</goals>
42+
<configuration>
43+
<createDependencyReducedPom>false</createDependencyReducedPom>
44+
<artifactSet>
45+
<excludes>
46+
47+
</excludes>
48+
</artifactSet>
49+
<filters>
50+
<filter>
51+
<artifact>*:*</artifact>
52+
<excludes>
53+
<exclude>META-INF/*.SF</exclude>
54+
<exclude>META-INF/*.DSA</exclude>
55+
<exclude>META-INF/*.RSA</exclude>
56+
</excludes>
57+
</filter>
58+
</filters>
59+
</configuration>
60+
</execution>
61+
</executions>
62+
</plugin>
63+
64+
<plugin>
65+
<artifactId>maven-antrun-plugin</artifactId>
66+
<version>1.2</version>
67+
<executions>
68+
<execution>
69+
<id>copy-resources</id>
70+
<!-- here the phase you need -->
71+
<phase>package</phase>
72+
<goals>
73+
<goal>run</goal>
74+
</goals>
75+
<configuration>
76+
<tasks>
77+
<copy todir="${basedir}/../../../plugins/oceanbaseasyncside">
78+
<fileset dir="target/">
79+
<include name="${project.artifactId}-${project.version}.jar"/>
80+
</fileset>
81+
</copy>
82+
83+
<move file="${basedir}/../../../plugins/oceanbaseasyncside/${project.artifactId}-${project.version}.jar"
84+
tofile="${basedir}/../../../plugins/oceanbaseasyncside/${project.name}-${git.branch}.jar"/>
85+
</tasks>
86+
</configuration>
87+
</execution>
88+
</executions>
89+
</plugin>
90+
</plugins>
91+
</build>
92+
</project>

0 commit comments

Comments
 (0)