Skip to content

Commit ac65b81

Browse files
author
dapeng
committed
rowkey的字段拼接形式修改
1 parent e04f99b commit ac65b81

File tree

4 files changed

+14
-18
lines changed

4 files changed

+14
-18
lines changed

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
5555

5656
private String host;
5757
private String zkParent;
58-
private String[] rowkey;
58+
private String rowkey;
5959
private String tableName;
6060
private String[] columnNames;
6161
private String updateMode;
@@ -164,25 +164,21 @@ private Put getPutByRow(Row record) {
164164
}
165165

166166
private String buildRowKey(Row record) {
167-
List<String> rowKeyValues = getRowKeyValues(record);
167+
String rowKeyValues = getRowKeyValues(record);
168168
// all rowkey not null
169-
if (rowKeyValues.size() != rowkey.length) {
169+
if (StringUtils.isBlank(rowKeyValues)) {
170170
LOG.error("row key value must not null,record is ..{}", record);
171171
outDirtyRecords.inc();
172172
return "";
173173
}
174-
return StringUtils.join(rowKeyValues, "-");
174+
return rowKeyValues;
175175
}
176176

177-
private List<String> getRowKeyValues(Row record) {
178-
List<String> rowKeyValues = Lists.newArrayList();
177+
private String getRowKeyValues(Row record) {
179178
Map<String, Object> row = rowConvertMap(record);
180-
for (String key : rowkey) {
181-
RowKeyBuilder rowKeyBuilder = new RowKeyBuilder();
182-
rowKeyBuilder.init(key);
183-
rowKeyValues.add(rowKeyBuilder.getRowKey(row));
184-
}
185-
return rowKeyValues;
179+
RowKeyBuilder rowKeyBuilder = new RowKeyBuilder();
180+
rowKeyBuilder.init(rowkey);
181+
return rowKeyBuilder.getRowKey(row);
186182
}
187183

188184
private Map<String, Object> rowConvertMap(Row record){
@@ -232,7 +228,7 @@ public HbaseOutputFormatBuilder setTable(String tableName) {
232228
return this;
233229
}
234230

235-
public HbaseOutputFormatBuilder setRowkey(String[] rowkey) {
231+
public HbaseOutputFormatBuilder setRowkey(String rowkey) {
236232
format.rowkey = rowkey;
237233
return this;
238234
}

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class HbaseSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
5151
protected String parent;
5252
protected String tableName;
5353
protected String updateMode;
54-
protected String[] rowkey;
54+
protected String rowkey;
5555

5656
public HbaseSink() {
5757
// TO DO NOTHING

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
6969
hbaseTableInfo.setHost((String) props.get(HBASE_ZOOKEEPER_QUORUM.toLowerCase()));
7070
hbaseTableInfo.setParent((String)props.get(ZOOKEEPER_PARENT.toLowerCase()));
7171
String rk = (String) props.get(HBASE_ROWKEY.toLowerCase());
72-
hbaseTableInfo.setRowkey(StringUtils.split(rk, ","));
72+
hbaseTableInfo.setRowkey(rk);
7373
String updateMode = (String) props.getOrDefault(UPDATE_KEY, EUpdateMode.APPEND.name());
7474
hbaseTableInfo.setUpdateMode(updateMode);
7575
return hbaseTableInfo;

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseTableInfo.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class HbaseTableInfo extends AbstractTargetTableInfo {
4040

4141
private String parent;
4242

43-
private String[] rowkey;
43+
private String rowkey;
4444

4545
private Map<String, String> columnNameFamily;
4646

@@ -90,11 +90,11 @@ public void setParent(String parent) {
9090
this.parent = parent;
9191
}
9292

93-
public String[] getRowkey() {
93+
public String getRowkey() {
9494
return rowkey;
9595
}
9696

97-
public void setRowkey(String[] rowkey) {
97+
public void setRowkey(String rowkey) {
9898
this.rowkey = rowkey;
9999
}
100100

0 commit comments

Comments
 (0)