55 commits
96e3337
add per job submit mode
zoudaokoulife Nov 16, 2018
1ef29b0
add per job submit mode
zoudaokoulife Nov 17, 2018
b4a2693
bugfix
Nov 17, 2018
c94f2bf
add per job submit mode
zoudaokoulife Nov 17, 2018
b70d90d
Merge remote-tracking branch 'origin/v1.5.0_dev' into v1.5.0_dev
zoudaokoulife Nov 17, 2018
e9c870a
modify readme doc
zoudaokoulife Nov 17, 2018
c3a92d2
exclude slf4j
zoudaokoulife Nov 17, 2018
53c8b20
support json
Nov 19, 2018
277bedc
update kafka source
zhihui-ge Nov 19, 2018
9a17a6e
Merge branch 'v1.5.0_dev' of github.com:DTStack/flinkStreamSQL into v…
Nov 19, 2018
b138bac
fix jar conflict
Nov 19, 2018
582ea99
Merge branch 'master' of github.com:DTStack/flinkStreamSQL into v1.5.…
Nov 20, 2018
35f8567
add init log info to hbase sink
zoudaokoulife Nov 20, 2018
5f29429
Merge branch 'v1.5.0_dev' of github.com:DTStack/flinkStreamSQL into v…
Nov 21, 2018
a425993
Merge pull request #47 from zhihui-ge/master
Nov 21, 2018
b55c733
Merge branch 'v1.5.0_dev' of github.com:DTStack/flinkStreamSQL into v…
Nov 21, 2018
0af546d
TableInfoParser remove sourceTableInfoMap static
Nov 22, 2018
d5cd251
Merge branch 'v1.5.0_dev' of https://github.com/DTStack/flinkStreamSQ…
Nov 22, 2018
0abed52
Merge branch 'v1.5.0_dev' of github.com:DTStack/flinkStreamSQL into v…
Nov 22, 2018
3b5298c
fix hbase thread bug
Nov 22, 2018
88f4287
support the pool configuration of redis
Nov 22, 2018
31c53e7
add metric
Nov 22, 2018
5da0748
modify HBaseClient init ThreadPoolExecutor pool size
zoudaokoulife Nov 22, 2018
d00f5b4
support the pool configuration of redis
Nov 22, 2018
63ba7d0
Merge branch 'v1.5.0_dev' of github.com:DTStack/flinkStreamSQL into v…
Nov 22, 2018
3d6f248
fix mysql output bug(type of float)
zoudaokoulife Nov 23, 2018
57c64d9
comment
Nov 23, 2018
545ec18
Merge branch 'v1.5.0_dev' of https://github.com/DTStack/flinkStreamSQ…
Nov 23, 2018
eb90a4b
add rdb module and add sqlserver side all request
todd5167 Nov 23, 2018
154cf4c
fix mysql output bug(type of float)
zoudaokoulife Nov 23, 2018
756b79c
Merge remote-tracking branch 'origin/v1.5.0_dev' into v1.5.0_dev
zoudaokoulife Nov 23, 2018
86af0ec
fix mysql output bug(type of BigDecimal)
zoudaokoulife Nov 23, 2018
3be4537
move side sign parser to absclass
zoudaokoulife Nov 26, 2018
a64d886
youhua
Nov 26, 2018
96c5617
create rdb module and extract common code
todd5167 Nov 26, 2018
251440b
sqlserver side depend on rdb
todd5167 Nov 26, 2018
742312a
sqlserver side depend on rdb module
todd5167 Nov 26, 2018
3cea722
pom.xml
Nov 26, 2018
34f28c5
mysql module depend on rdb
todd5167 Nov 27, 2018
e6035bf
Update README.md
yangsishu Nov 27, 2018
f9b6efd
fetch size
Nov 27, 2018
2c2ae4d
add metric
Nov 27, 2018
d865312
Merge branch 'v1.5.0_dev' of github.com:DTStack/flinkStreamSQL into v…
Nov 27, 2018
1bf0b00
abstract output metric
Nov 28, 2018
197d521
commnet
Nov 28, 2018
c1f29d2
Update README.md
yangsishu Nov 28, 2018
2a533b8
rename package
Nov 28, 2018
235d742
fix bugs
Nov 28, 2018
bf3d368
add netty
Nov 28, 2018
f696bfb
rename jarname
Nov 29, 2018
56f70ae
add elasticsearch metric
Nov 29, 2018
3832dca
add sqlserver result table
todd5167 Nov 29, 2018
25cc345
extract outputformat
todd5167 Nov 30, 2018
f487182
modify structure
Dec 1, 2018
e4f52cd
Merge branch 'master' of github.com:DTStack/flinkStreamSQL into v1.5.…
Dec 1, 2018
50 changes: 47 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -5,6 +5,8 @@
> > * Custom `create function` syntax
> > * Joins between streams and side (dimension) tables
> > * Supports all native Flink SQL syntax
> > * Exposes input and output performance metrics to Prometheus


# Supported
* Source tables: Kafka 0.9 and 1.x
@@ -17,6 +19,7 @@
* Add Kafka result (sink) tables
* Add SQL support for CEP
* Side-table snapshots
* SQL optimization (predicate pushdown, etc.)

## 1 Quick Start
### 1.1 Run Modes
@@ -55,8 +58,9 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
* **mode**
  * Description: execution mode, i.e. how the job is run on the Flink cluster
    * local: local mode
    * standalone: a standalone Flink cluster
    * yarn: a Flink cluster on YARN
    * standalone: submit to a standalone Flink cluster
    * yarn: submit to a Flink cluster on YARN (i.e. to an existing Flink cluster)
    * yarnPer: submit in YARN per-job mode (i.e. a new Flink application is created per job)
  * Required: no
  * Default: local

@@ -101,6 +105,11 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
  * sql.max.concurrent.checkpoints: maximum number of concurrently generated checkpoints
  * sql.checkpoint.cleanup.mode: by default checkpoints are not retained in external storage; [true (external checkpoints are deleted after the job is cancelled) | false (external checkpoints must be deleted manually)]
  * flinkCheckpointDataURI: external storage path for checkpoints, set according to your environment, e.g. hdfs://, file://
  * jobmanager.memory.mb: JobManager memory in per-job mode (in MB, default: 768)
  * taskmanager.memory.mb: TaskManager memory in per-job mode (in MB, default: 768)
  * taskmanager.num: number of TaskManager instances in per-job mode (default: 1)
  * taskmanager.slots: number of slots per TaskManager in per-job mode (default: 1)
  * [Prometheus-related parameters](docs/prometheus.md): in per-job mode, metrics can be pushed to an external monitoring system, using Prometheus pushgateway as an example
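As an illustration, the per-job resource parameters above can be combined into one confProp value; the JSON-string form is assumed from the project's submit examples, and the concrete numbers are placeholders:

```json
{
  "jobmanager.memory.mb": 1024,
  "taskmanager.memory.mb": 2048,
  "taskmanager.num": 2,
  "taskmanager.slots": 2
}
```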


* **flinkconf**
@@ -122,6 +131,16 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
  * Description: flag indicating whether non-restored state is allowed when restoring from a savepoint
  * Required: no
  * Default: false

* **flinkJarPath**
  * Description: local directory where the Flink jars are stored, required for per-job submission
  * Required: no
  * Default: false

* **queue**
  * Description: YARN queue to submit to in per-job mode
  * Required: no
  * Default: false

## 2 Structure
### 2.1 Source-table plugins
@@ -139,8 +158,33 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
* [mysql side-table plugin](docs/mysqlSide.md)
* [mongo side-table plugin](docs/mongoSide.md)
* [redis side-table plugin](docs/redisSide.md)

## 3 Performance Metrics (new)

### Kafka plugin
* Business latency: flink_taskmanager_job_task_operator_dtEventDelay (unit: s)
  The difference between the event time carried in the data and the current time when it enters Flink.

* Dirty data per source: flink_taskmanager_job_task_operator_dtDirtyData
  Records fetched from Kafka that fail to parse are counted as dirty data.

* Input TPS per source: flink_taskmanager_job_task_operator_dtNumRecordsInRate
  Records received from Kafka (before parsing) per second.

* Input RPS per source: flink_taskmanager_job_task_operator_dtNumRecordsInResolveRate
  Records received from Kafka (after parsing) per second.

* Input BPS per source: flink_taskmanager_job_task_operator_dtNumBytesInRate
  Bytes received from Kafka per second.

* Per-partition lag for Kafka sources: flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag
  Currently collected by the kafka10 and kafka11 plugins.

* Output RPS per sink: flink_taskmanager_job_task_operator_dtNumRecordsOutRate
  External records written per second.


## 3 Examples
## 4 Examples

```

33 changes: 33 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/enums/EDatabaseType.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.flink.sql.enums;

/**
* Database type
*
* Company: www.dtstack.com
* @author jiangbo
*/
public enum EDatabaseType {

MYSQL,
SQLSERVER,
ORACLE,

}
10 changes: 5 additions & 5 deletions core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java
@@ -22,7 +22,7 @@

import com.dtstack.flink.sql.enums.ETableType;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.table.TableInfoParserFactory;
import com.dtstack.flink.sql.table.TableInfoParser;
import com.dtstack.flink.sql.util.DtStringUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
@@ -75,7 +75,7 @@ public static SqlTree parseSql(String sql) throws Exception {

List<String> sqlArr = DtStringUtil.splitIgnoreQuota(sql, SQL_DELIMITER);
SqlTree sqlTree = new SqlTree();

TableInfoParser tableInfoParser = new TableInfoParser();
for(String childSql : sqlArr){
if(Strings.isNullOrEmpty(childSql)){
continue;
@@ -112,7 +112,7 @@ public static SqlTree parseSql(String sql) throws Exception {
throw new RuntimeException("can't find table " + tableName);
}

TableInfo tableInfo = TableInfoParserFactory.parseWithTableType(ETableType.SOURCE.getType(),
TableInfo tableInfo = tableInfoParser.parseWithTableType(ETableType.SOURCE.getType(),
createTableResult, LOCAL_SQL_PLUGIN_ROOT);
sqlTree.addTableInfo(tableName, tableInfo);
}
@@ -125,7 +125,7 @@ public static SqlTree parseSql(String sql) throws Exception {
throw new RuntimeException("can't find table " + tableName);
}

TableInfo tableInfo = TableInfoParserFactory.parseWithTableType(ETableType.SINK.getType(),
TableInfo tableInfo = tableInfoParser.parseWithTableType(ETableType.SINK.getType(),
createTableResult, LOCAL_SQL_PLUGIN_ROOT);
sqlTree.addTableInfo(tableName, tableInfo);
}
@@ -141,7 +141,7 @@ public static SqlTree parseSql(String sql) throws Exception {
throw new RuntimeException("can't find table " + tableName);
}

TableInfo tableInfo = TableInfoParserFactory.parseWithTableType(ETableType.SOURCE.getType(),
TableInfo tableInfo = tableInfoParser.parseWithTableType(ETableType.SOURCE.getType(),
createTableResult, LOCAL_SQL_PLUGIN_ROOT);
sqlTree.addTableInfo(tableName, tableInfo);
}
@@ -106,7 +106,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl

}else if (pollObj instanceof JoinInfo){
preIsSideJoin = true;
jionFun(pollObj, localTableCache, sideTableMap, tableEnv, replaceInfoList);
joinFun(pollObj, localTableCache, sideTableMap, tableEnv, replaceInfoList);
}
}

@@ -545,11 +545,11 @@ public void registerTmpTable(CreateTmpTableParser.SqlParserResult result,

}else if (pollObj instanceof JoinInfo){
preIsSideJoin = true;
jionFun(pollObj, localTableCache, sideTableMap, tableEnv, replaceInfoList);
joinFun(pollObj, localTableCache, sideTableMap, tableEnv, replaceInfoList);
}
}
}
private void jionFun(Object pollObj, Map<String, Table> localTableCache,
private void joinFun(Object pollObj, Map<String, Table> localTableCache,
Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
List<FieldReplaceInfo> replaceInfoList) throws Exception{
JoinInfo joinInfo = (JoinInfo) pollObj;
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dtstack.flink.sql.sink;

import com.dtstack.flink.sql.metric.MetricConstant;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

/**
* Created by sishu.yss on 2018/11/28.
*/
public abstract class MetricOutputFormat extends RichOutputFormat<Tuple2>{

protected transient Counter outRecords;

protected transient Meter outRecordsRate;

public void initMetric() {
outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
}

}
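initMetric above wires a Counter into a MeterView, which derives a per-second rate from the counter over a sliding time window (20 seconds here). Outside of Flink, the same idea can be sketched with plain Java; the class and method names below are illustrative, not part of the project:

```java
import java.util.ArrayDeque;

// Minimal sketch of a counter plus a MeterView-style per-second rate.
// Assumption: tick() is called once per second (Flink's real MeterView
// does this via a background update thread).
class SimpleMeter {
    private long count = 0;                        // total events, like Flink's Counter
    private final ArrayDeque<Long> window = new ArrayDeque<>();
    private final int windowSeconds;

    SimpleMeter(int windowSeconds) {
        this.windowSeconds = windowSeconds;
    }

    void inc(long n) { count += n; }

    long getCount() { return count; }

    // Snapshot the counter once per second; keep at most windowSeconds+1 snapshots.
    void tick() {
        window.addLast(count);
        if (window.size() > windowSeconds + 1) {
            window.removeFirst();
        }
    }

    // Rate = (newest snapshot - oldest snapshot) / elapsed seconds.
    double getRate() {
        if (window.size() < 2) return 0.0;
        return (window.peekLast() - window.peekFirst()) / (double) (window.size() - 1);
    }
}
```

A sink extending MetricOutputFormat would call `outRecords.inc()` per written record, and the meter turns that into the dtNumRecordsOutRate metric listed in the README.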
@@ -25,6 +25,8 @@
import com.dtstack.flink.sql.util.MathUtil;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Reason:
@@ -35,6 +37,19 @@

public abstract class AbsSideTableParser extends AbsTableParser {

private final static String SIDE_SIGN_KEY = "sideSignKey";

private final static Pattern SIDE_TABLE_SIGN = Pattern.compile("(?i)^PERIOD\\s+FOR\\s+SYSTEM_TIME$");

static {
keyPatternMap.put(SIDE_SIGN_KEY, SIDE_TABLE_SIGN);
keyHandlerMap.put(SIDE_SIGN_KEY, AbsSideTableParser::dealSideSign);
}

private static void dealSideSign(Matcher matcher, TableInfo tableInfo){
// FIXME SIDE_TABLE_SIGN is currently only used as a marker for side tables; this handler does nothing
}

// Parse create-table attributes ==> get the cache configuration
protected void parseCacheProp(SideTableInfo sideTableInfo, Map<String, Object> props){
if(props.containsKey(SideTableInfo.CACHE_KEY.toLowerCase())){
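The side-table marker is matched case-insensitively and anchored at both ends, with arbitrary whitespace allowed between the keywords. A standalone check of the pattern's behavior, reusing the same regex as AbsSideTableParser.SIDE_TABLE_SIGN (the helper class and method are illustrative only):

```java
import java.util.regex.Pattern;

class SideSignCheck {
    // Same pattern as AbsSideTableParser.SIDE_TABLE_SIGN: case-insensitive,
    // anchored, with \s+ between the keywords.
    static final Pattern SIDE_TABLE_SIGN =
            Pattern.compile("(?i)^PERIOD\\s+FOR\\s+SYSTEM_TIME$");

    // True when the field text is exactly the side-table marker.
    static boolean isSideSign(String fieldText) {
        return SIDE_TABLE_SIGN.matcher(fieldText).matches();
    }
}
```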
@@ -42,22 +42,22 @@
* @author xuchao
*/

public class TableInfoParserFactory {
public class TableInfoParser {

private final static String TYPE_KEY = "type";

private final static String SIDE_TABLE_SIGN = "(?i)^PERIOD\\s+FOR\\s+SYSTEM_TIME$";

private final static Pattern SIDE_PATTERN = Pattern.compile(SIDE_TABLE_SIGN);

private static Map<String, AbsTableParser> sourceTableInfoMap = Maps.newConcurrentMap();
private Map<String, AbsTableParser> sourceTableInfoMap = Maps.newConcurrentMap();

private static Map<String, AbsTableParser> targetTableInfoMap = Maps.newConcurrentMap();
private Map<String, AbsTableParser> targetTableInfoMap = Maps.newConcurrentMap();

private static Map<String, AbsTableParser> sideTableInfoMap = Maps.newConcurrentMap();
private Map<String, AbsTableParser> sideTableInfoMap = Maps.newConcurrentMap();

//Parsing loaded plugin
public static TableInfo parseWithTableType(int tableType, CreateTableParser.SqlParserResult parserResult,
public TableInfo parseWithTableType(int tableType, CreateTableParser.SqlParserResult parserResult,
String localPluginRoot) throws Exception {
AbsTableParser absTableParser = null;
Map<String, Object> props = parserResult.getPropMap();
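The parser caches were changed from static to instance fields (see the commit "TableInfoParser remove sourceTableInfoMap static"), so each SqlParser run presumably gets its own cache instead of mutating one JVM-wide map shared by every job. A toy illustration of the difference (class names below are illustrative, not the project's):

```java
import java.util.HashMap;
import java.util.Map;

// Before: a static map is shared by every instance in the JVM.
class StaticCacheParser {
    static Map<String, String> cache = new HashMap<>();
    void put(String k, String v) { cache.put(k, v); }
}

// After: each instance owns its map, so state is isolated per run.
class InstanceCacheParser {
    Map<String, String> cache = new HashMap<>();
    void put(String k, String v) { cache.put(k, v); }
}
```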
18 changes: 18 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java
@@ -21,8 +21,10 @@
package com.dtstack.flink.sql.util;

import com.dtstack.flink.sql.enums.ColumnType;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.math.BigDecimal;
import java.util.ArrayList;
@@ -42,6 +44,9 @@ public class DtStringUtil {

private static final Pattern NO_VERSION_PATTERN = Pattern.compile("([a-zA-Z]+).*");

private static ObjectMapper objectMapper = new ObjectMapper();


/**
* Split the specified string by the delimiter, ignoring delimiters inside quotes
* @param str
@@ -207,4 +212,17 @@ public static String addJdbcParam(String dbUrl, Map<String, String> addParams, b

return preStr + "?" + sb.toString();
}

    public static boolean isJosn(String str) {
        boolean flag = false;
        if (StringUtils.isNotBlank(str)) {
            try {
                objectMapper.readValue(str, Map.class);
                flag = true;
            } catch (Throwable e) {
                flag = false;
            }
        }
        return flag;
    }
}
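The visible tail of addJdbcParam (`return preStr + "?" + sb.toString();`) suggests it rebuilds the URL as a prefix plus joined key=value pairs. A simplified standalone sketch of that idea — an approximation only, since the project's version also takes an isForce flag and handles URLs that already carry parameters:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class JdbcUrlUtil {
    // Append extra key=value parameters to a JDBC URL.
    // Simplifying assumption: dbUrl does not already contain a '?'.
    static String addJdbcParam(String dbUrl, Map<String, String> addParams) {
        if (addParams == null || addParams.isEmpty()) {
            return dbUrl;
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : addParams.entrySet()) {
            if (sb.length() > 0) {
                sb.append("&");
            }
            sb.append(e.getKey()).append("=").append(e.getValue());
        }
        return dbUrl + "?" + sb;
    }
}
```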
2 changes: 1 addition & 1 deletion docs/kafkaSource.md
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ CREATE TABLE tableName(
|bootstrapServers | kafka bootstrap-server address list (comma-separated)|yes||
|zookeeperQuorum | kafka ZooKeeper address list (comma-separated)|yes||
|topic | name of the topic to read|yes||
|offsetReset | initial offset position of the topic [latest\|earliest]|no|latest|
|offsetReset | initial offset position of the topic [latest\|earliest\|explicit offsets ({"0":12312,"1":12321,"2":12312}, i.e. {"partition_no":offset_value})]|no|latest|
|parallelism | parallelism|no|1|
## 5. Examples:
7 changes: 7 additions & 0 deletions docs/prometheus.md
@@ -0,0 +1,7 @@
## confProp parameters required to use the Prometheus pushgateway
* metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
* metrics.reporter.promgateway.host: address of the Prometheus pushgateway
* metrics.reporter.promgateway.port: port of the Prometheus pushgateway
* metrics.reporter.promgateway.jobName: instance name
* metrics.reporter.promgateway.randomJobNameSuffix: whether to append a random string to the instance name (default: true)
* metrics.reporter.promgateway.deleteOnShutdown: whether to delete the metrics on shutdown (default: false)
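A minimal sketch of a confProp value wiring these parameters together; the host, port, and jobName values are placeholders for your own environment:

```json
{
  "metrics.reporter.promgateway.class": "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter",
  "metrics.reporter.promgateway.host": "pushgateway.example.com",
  "metrics.reporter.promgateway.port": "9091",
  "metrics.reporter.promgateway.jobName": "flinkStreamSQL",
  "metrics.reporter.promgateway.randomJobNameSuffix": "true",
  "metrics.reporter.promgateway.deleteOnShutdown": "false"
}
```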
3 changes: 1 addition & 2 deletions elasticsearch5/elasticsearch5-sink/pom.xml
@@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>sql.sink.elasticsearch5</artifactId>
<name>elasticsearch5-sink</name>
<name>elasticsearch-sink</name>

<dependencies>
<dependency>
@@ -48,7 +48,6 @@
<artifactSet>
<excludes>
<exclude>org.apache.logging.log4j:log4j-to-slf4j</exclude>
<exclude>org.slf4j:slf4j-api</exclude>
</excludes>
</artifactSet>
<filters>