Skip to content

Commit fe59047

Browse files
committed
Merge branch 'feat_1.8_rowkey' into 'v1.8.0_dev'
hbase 结果表支持 rowKey See merge request dt-insight-engine/flinkStreamSQL!15
2 parents f78e936 + ac65b81 commit fe59047

File tree

10 files changed

+427
-35
lines changed

10 files changed

+427
-35
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
21+
package com.dtstack.flink.sql.sink.hbase;
22+
23+
24+
import com.dtstack.flink.sql.sink.hbase.enums.EReplaceOpType;
25+
26+
import java.util.Map;
27+
28+
/**
29+
* Reason:
30+
* Date: 2018/8/23
31+
* Company: www.dtstack.com
32+
* @author xuchao
33+
*/
34+
35+
public abstract class AbstractReplaceOperator {
36+
37+
private EReplaceOpType opType;
38+
39+
public AbstractReplaceOperator(EReplaceOpType opType){
40+
this.opType = opType;
41+
}
42+
43+
public String doOperator(Map<String, Object> refData){
44+
String replaceStr = replaceStr(refData);
45+
return doFunc(replaceStr);
46+
}
47+
48+
public String replaceStr(Map<String, Object> refData){
49+
return "";
50+
}
51+
52+
/**
53+
* The processing function to provide custom
54+
* @param replaceStr
55+
* @return
56+
*/
57+
abstract String doFunc(String replaceStr);
58+
}

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.enums.EUpdateMode;
2424
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
2525
import com.google.common.collect.Lists;
26+
import com.google.common.collect.Maps;
2627
import org.apache.commons.lang3.StringUtils;
2728
import org.apache.flink.api.java.tuple.Tuple2;
2829
import org.apache.flink.configuration.Configuration;
@@ -40,7 +41,6 @@
4041
import org.slf4j.LoggerFactory;
4142

4243
import java.io.IOException;
43-
import java.text.SimpleDateFormat;
4444
import java.util.List;
4545
import java.util.Map;
4646
import java.util.Set;
@@ -55,7 +55,7 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
5555

5656
private String host;
5757
private String zkParent;
58-
private String[] rowkey;
58+
private String rowkey;
5959
private String tableName;
6060
private String[] columnNames;
6161
private String updateMode;
@@ -69,8 +69,6 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
6969
private transient Connection conn;
7070
private transient Table table;
7171

72-
public final SimpleDateFormat ROWKEY_DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss");
73-
7472
@Override
7573
public void configure(Configuration parameters) {
7674
LOG.warn("---configure---");
@@ -166,40 +164,29 @@ private Put getPutByRow(Row record) {
166164
}
167165

168166
private String buildRowKey(Row record) {
169-
List<String> rowKeyValues = getRowKeyValues(record);
167+
String rowKeyValues = getRowKeyValues(record);
170168
// all rowkey not null
171-
if (rowKeyValues.size() != rowkey.length) {
169+
if (StringUtils.isBlank(rowKeyValues)) {
172170
LOG.error("row key value must not null,record is ..{}", record);
173171
outDirtyRecords.inc();
174172
return "";
175173
}
176-
return StringUtils.join(rowKeyValues, "-");
174+
return rowKeyValues;
177175
}
178176

179-
private List<String> getRowKeyValues(Row record) {
180-
List<String> rowKeyValues = Lists.newArrayList();
181-
for (int i = 0; i < rowkey.length; ++i) {
182-
String colName = rowkey[i];
183-
int rowKeyIndex = 0;
184-
for (; rowKeyIndex < columnNames.length; ++rowKeyIndex) {
185-
if (columnNames[rowKeyIndex].equals(colName)) {
186-
break;
187-
}
188-
}
177+
private String getRowKeyValues(Row record) {
178+
Map<String, Object> row = rowConvertMap(record);
179+
RowKeyBuilder rowKeyBuilder = new RowKeyBuilder();
180+
rowKeyBuilder.init(rowkey);
181+
return rowKeyBuilder.getRowKey(row);
182+
}
189183

190-
if (rowKeyIndex != columnNames.length && record.getField(rowKeyIndex) != null) {
191-
Object field = record.getField(rowKeyIndex);
192-
if (field == null) {
193-
continue;
194-
} else if (field instanceof java.util.Date) {
195-
java.util.Date d = (java.util.Date) field;
196-
rowKeyValues.add(ROWKEY_DATE_FORMAT.format(d));
197-
} else {
198-
rowKeyValues.add(field.toString());
199-
}
200-
}
184+
private Map<String, Object> rowConvertMap(Row record){
185+
Map<String, Object> rowValue = Maps.newHashMap();
186+
for(int i = 0; i < columnNames.length; i++){
187+
rowValue.put(columnNames[i], record.getField(i));
201188
}
202-
return rowKeyValues;
189+
return rowValue;
203190
}
204191

205192
@Override
@@ -241,7 +228,7 @@ public HbaseOutputFormatBuilder setTable(String tableName) {
241228
return this;
242229
}
243230

244-
public HbaseOutputFormatBuilder setRowkey(String[] rowkey) {
231+
public HbaseOutputFormatBuilder setRowkey(String rowkey) {
245232
format.rowkey = rowkey;
246233
return this;
247234
}

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class HbaseSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
5151
protected String parent;
5252
protected String tableName;
5353
protected String updateMode;
54-
protected String[] rowkey;
54+
protected String rowkey;
5555

5656
public HbaseSink() {
5757
// TO DO NOTHING
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
21+
package com.dtstack.flink.sql.sink.hbase;
22+
23+
import com.dtstack.flink.sql.sink.hbase.enums.EReplaceOpType;
24+
import com.dtstack.flink.sql.util.MD5Utils;
25+
26+
/**
27+
* Reason:
28+
* Date: 2018/8/23
29+
* Company: www.dtstack.com
30+
* @author xuchao
31+
*/
32+
33+
public class Md5ReplaceOperator extends AbstractReplaceOperator {
34+
35+
public Md5ReplaceOperator(EReplaceOpType opType) {
36+
super(opType);
37+
}
38+
39+
@Override
40+
String doFunc(String replaceStr) {
41+
return MD5Utils.getMD5String(replaceStr);
42+
}
43+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
21+
package com.dtstack.flink.sql.sink.hbase;
22+
23+
24+
import com.dtstack.flink.sql.sink.hbase.enums.EReplaceType;
25+
26+
import java.io.Serializable;
27+
import java.util.List;
28+
29+
/**
30+
* Reason:
31+
* Date: 2018/8/23
32+
* Company: www.dtstack.com
33+
*
34+
* @author xuchao
35+
*/
36+
37+
public class ReplaceInfo implements Serializable {
38+
39+
private static final long serialVersionUID = 2058635242957737717L;
40+
41+
private EReplaceType type;
42+
43+
private String param;
44+
45+
private List<ReplaceInfo> subReplaceInfos;
46+
47+
public ReplaceInfo(EReplaceType type){
48+
this.type = type;
49+
}
50+
51+
public EReplaceType getType() {
52+
return type;
53+
}
54+
55+
public void setType(EReplaceType type) {
56+
this.type = type;
57+
}
58+
59+
public String getParam() {
60+
return param;
61+
}
62+
63+
public void setParam(String param) {
64+
this.param = param;
65+
}
66+
67+
public List<ReplaceInfo> getSubReplaceInfos() {
68+
return subReplaceInfos;
69+
}
70+
71+
public void setSubReplaceInfos(List<ReplaceInfo> subReplaceInfos) {
72+
this.subReplaceInfos = subReplaceInfos;
73+
}
74+
}

0 commit comments

Comments
 (0)