Skip to content

Commit 63f3ab1

Browse files
committed
fix spelling and kafka params map name
1 parent c27194e commit 63f3ab1

File tree

3 files changed

+13
-13
lines changed

3 files changed

+13
-13
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
266266
*/
267267
public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
268268
String remoteSqlPluginPath, String pluginLoadMode, Map<String, AbstractSideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
269-
Set<URL> pluginClassPatshSets = Sets.newHashSet();
269+
Set<URL> pluginClassPathSets = Sets.newHashSet();
270270
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
271271
for (AbstractTableInfo tableInfo : sqlTree.getTableInfoMap().values()) {
272272

@@ -304,26 +304,26 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
304304
registerTableCache.put(tableInfo.getName(), regTable);
305305

306306
URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractSourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
307-
pluginClassPatshSets.add(sourceTablePathUrl);
307+
pluginClassPathSets.add(sourceTablePathUrl);
308308
} else if (tableInfo instanceof AbstractTargetTableInfo) {
309309

310310
TableSink tableSink = StreamSinkFactory.getTableSink((AbstractTargetTableInfo) tableInfo, localSqlPluginPath);
311311
TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
312312
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
313313

314314
URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractTargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
315-
pluginClassPatshSets.add(sinkTablePathUrl);
315+
pluginClassPathSets.add(sinkTablePathUrl);
316316
} else if (tableInfo instanceof AbstractSideTableInfo) {
317317
String sideOperator = ECacheType.ALL.name().equals(((AbstractSideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
318318
sideTableMap.put(tableInfo.getName(), (AbstractSideTableInfo) tableInfo);
319319

320320
URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, AbstractSideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
321-
pluginClassPatshSets.add(sideTablePathUrl);
321+
pluginClassPathSets.add(sideTablePathUrl);
322322
} else {
323323
throw new RuntimeException("not support table type:" + tableInfo.getType());
324324
}
325325
}
326-
return pluginClassPatshSets;
326+
return pluginClassPathSets;
327327
}
328328

329329
/**

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkTableInfo.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class KafkaSinkTableInfo extends AbstractTargetTableInfo {
5858

5959
private String bootstrapServers;
6060

61-
public Map<String, String> kafkaParam = new HashMap<String, String>();
61+
public Map<String, String> kafkaParams = new HashMap<String, String>();
6262

6363
private String topic;
6464

@@ -73,15 +73,15 @@ public class KafkaSinkTableInfo extends AbstractTargetTableInfo {
7373
private String updateMode;
7474

7575
public void addKafkaParam(String key, String value) {
76-
kafkaParam.put(key, value);
76+
kafkaParams.put(key, value);
7777
}
7878

7979
public String getKafkaParam(String key) {
80-
return kafkaParam.get(key);
80+
return kafkaParams.get(key);
8181
}
8282

8383
public Set<String> getKafkaParamKeys() {
84-
return kafkaParam.keySet();
84+
return kafkaParams.keySet();
8585
}
8686

8787
public String getBootstrapServers() {

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {
6969

7070
private String fieldDelimiter;
7171

72-
public Map<String, String> kafkaParam = new HashMap<>();
72+
public Map<String, String> kafkaParams = new HashMap<>();
7373

7474

7575
public String getBootstrapServers() {
@@ -113,15 +113,15 @@ public void setTopicIsPattern(Boolean topicIsPattern) {
113113
}
114114

115115
public void addKafkaParam(Map<String, String> kafkaParam) {
116-
kafkaParam.putAll(kafkaParam);
116+
kafkaParams.putAll(kafkaParam);
117117
}
118118

119119
public String getKafkaParam(String key) {
120-
return kafkaParam.get(key);
120+
return kafkaParams.get(key);
121121
}
122122

123123
public Set<String> getKafkaParamKeys() {
124-
return kafkaParam.keySet();
124+
return kafkaParams.keySet();
125125
}
126126

127127
public String getSourceDataType() {

0 commit comments

Comments
 (0)