Skip to content

Commit c27194e

Browse files
committed
code opt
1 parent 304f180 commit c27194e

File tree

29 files changed

+62
-72
lines changed

29 files changed

+62
-72
lines changed

core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.flink.metrics.Meter;
2626
import org.apache.flink.metrics.MeterView;
2727
import org.apache.flink.table.runtime.types.CRow;
28-
import org.apache.flink.types.Row;
2928

3029

3130
/**

core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.io.File;
32-
import java.io.FileInputStream;
3332
import java.net.URLEncoder;
34-
import java.util.stream.Stream;
3533

3634
import org.apache.commons.codec.Charsets;
3735
import org.apache.flink.util.FileUtils;

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
2424
import org.apache.calcite.config.Lex;
25-
import org.apache.calcite.sql.*;
25+
import org.apache.calcite.sql.SqlBasicCall;
26+
import org.apache.calcite.sql.SqlJoin;
27+
import org.apache.calcite.sql.SqlKind;
28+
import org.apache.calcite.sql.SqlNode;
29+
import org.apache.calcite.sql.SqlSelect;
2630
import org.apache.calcite.sql.parser.SqlParseException;
2731
import org.apache.calcite.sql.parser.SqlParser;
2832
import com.google.common.collect.Lists;

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@
2121
package com.dtstack.flink.sql.parser;
2222

2323
import org.apache.calcite.config.Lex;
24-
import org.apache.calcite.sql.*;
24+
import org.apache.calcite.sql.SqlBasicCall;
25+
import org.apache.calcite.sql.SqlInsert;
26+
import org.apache.calcite.sql.SqlJoin;
27+
import org.apache.calcite.sql.SqlKind;
28+
import org.apache.calcite.sql.SqlMatchRecognize;
29+
import org.apache.calcite.sql.SqlNode;
30+
import org.apache.calcite.sql.SqlOrderBy;
31+
import org.apache.calcite.sql.SqlSelect;
2532
import org.apache.calcite.sql.parser.SqlParseException;
2633
import org.apache.calcite.sql.parser.SqlParser;
2734
import org.apache.commons.lang3.StringUtils;

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.google.common.base.Strings;
2626

2727
import java.io.Serializable;
28-
import java.util.Map;
2928

3029
/**
3130
* Join信息

core/src/main/java/com/dtstack/flink/sql/util/DateUtil.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,21 @@
2323
import java.sql.Timestamp;
2424
import java.text.ParseException;
2525
import java.text.SimpleDateFormat;
26-
import java.time.*;
26+
27+
import java.time.Instant;
28+
import java.time.LocalDate;
29+
import java.time.LocalDateTime;
30+
import java.time.LocalTime;
31+
import java.time.ZoneId;
32+
import java.time.ZoneOffset;
2733
import java.time.format.DateTimeFormatter;
2834
import java.time.format.DateTimeParseException;
29-
import java.util.*;
35+
36+
import java.util.Calendar;
37+
import java.util.Date;
38+
import java.util.Locale;
39+
import java.util.SimpleTimeZone;
40+
import java.util.TimeZone;
3041
import java.util.regex.Pattern;
3142

3243
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
@@ -756,7 +767,8 @@ public static java.sql.Timestamp columnToTimestamp(Object column) {
756767
if (column == null) {
757768
return null;
758769
} else if(column instanceof String) {
759-
return null == stringToDate((String) column) ? null : new java.sql.Timestamp(stringToDate((String) column).getTime());
770+
Date date = stringToDate((String) column);
771+
return null == date ? null : new java.sql.Timestamp(date.getTime());
760772
} else if (column instanceof Integer) {
761773
Integer rawData = (Integer) column;
762774
return new java.sql.Timestamp(rawData.longValue());
@@ -790,7 +802,8 @@ public static Timestamp getTimestampFromStr(String timeStr) {
790802
Instant instant = Instant.from(ISO_INSTANT.parse(timeStr));
791803
return new Timestamp(instant.getEpochSecond() * MILLIS_PER_SECOND);
792804
}
793-
return null == stringToDate(timeStr) ? null : new Timestamp(stringToDate(timeStr).getTime());
805+
Date date = stringToDate(timeStr);
806+
return null == date ? null : new Timestamp(date.getTime());
794807
}
795808

796809
public static java.sql.Date getDateFromStr(String dateStr) {
@@ -802,7 +815,8 @@ public static java.sql.Date getDateFromStr(String dateStr) {
802815
Instant instant = Instant.from(ISO_INSTANT.parse(dateStr));
803816
return new java.sql.Date(instant.toEpochMilli());
804817
}
805-
return null == stringToDate(dateStr) ? null : new java.sql.Date(stringToDate(dateStr).getTime());
818+
Date date = stringToDate(dateStr);
819+
return null == date ? null : new java.sql.Date(date.getTime());
806820
}
807821

808822
}

core/src/main/java/com/dtstack/flink/sql/util/MathUtil.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,6 @@
2323
import java.math.BigInteger;
2424
import java.sql.Date;
2525
import java.sql.Timestamp;
26-
import java.text.ParseException;
27-
import java.text.SimpleDateFormat;
28-
29-
30-
import java.time.Instant;
31-
import java.time.LocalDate;
32-
import java.time.LocalTime;
33-
import java.time.ZoneOffset;
34-
import java.util.TimeZone;
35-
import java.util.regex.Pattern;
36-
37-
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
3826

3927
/**
4028
* Convert val to specified numeric type

core/src/main/java/com/dtstack/flink/sql/util/ParseUtils.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,23 @@
3737
package com.dtstack.flink.sql.util;
3838

3939
import com.google.common.collect.HashBasedTable;
40-
import org.apache.calcite.sql.*;
40+
41+
import org.apache.calcite.sql.SqlBasicCall;
42+
import org.apache.calcite.sql.SqlIdentifier;
43+
import org.apache.calcite.sql.SqlJoin;
44+
import org.apache.calcite.sql.SqlKind;
45+
import org.apache.calcite.sql.SqlNode;
4146
import org.apache.commons.lang3.StringUtils;
4247
import org.apache.flink.api.java.tuple.Tuple2;
4348

4449
import java.util.Arrays;
4550
import java.util.List;
46-
import java.util.Map;
4751
import java.util.Set;
4852

49-
import static org.apache.calcite.sql.SqlKind.*;
53+
import static org.apache.calcite.sql.SqlKind.AS;
54+
import static org.apache.calcite.sql.SqlKind.IDENTIFIER;
55+
import static org.apache.calcite.sql.SqlKind.JOIN;
56+
5057

5158
/**
5259
* @Auther: jiangjunjie

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import org.apache.flink.types.Row;
3030
import org.apache.flink.util.Preconditions;
3131
import java.sql.Timestamp;
32-
import java.lang.Long;
33-
3432
/**
3533
* define watermarker
3634
* Date: 2018/6/29

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
3434

35-
import java.util.ArrayList;
3635
import java.util.List;
3736
import java.util.Map;
3837
import java.util.stream.Collectors;

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020

2121
package com.dtstack.flink.sql.side.hbase;
2222

23-
import com.dtstack.flink.sql.side.*;
23+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
24+
import com.dtstack.flink.sql.side.BaseAllReqRow;
25+
import com.dtstack.flink.sql.side.FieldInfo;
26+
import com.dtstack.flink.sql.side.JoinInfo;
2427
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
2528
import org.apache.calcite.sql.JoinType;
2629
import org.apache.commons.collections.map.HashedMap;
@@ -34,15 +37,23 @@
3437
import org.apache.hadoop.hbase.Cell;
3538
import org.apache.hadoop.hbase.CellUtil;
3639
import org.apache.hadoop.hbase.TableName;
37-
import org.apache.hadoop.hbase.client.*;
40+
import org.apache.hadoop.hbase.client.Connection;
41+
import org.apache.hadoop.hbase.client.ConnectionFactory;
42+
import org.apache.hadoop.hbase.client.Result;
43+
import org.apache.hadoop.hbase.client.ResultScanner;
44+
import org.apache.hadoop.hbase.client.Scan;
45+
import org.apache.hadoop.hbase.client.Table;
3846
import org.apache.hadoop.hbase.util.Bytes;
3947
import org.slf4j.Logger;
4048
import org.slf4j.LoggerFactory;
4149

4250
import java.io.IOException;
4351
import java.sql.SQLException;
4452
import java.sql.Timestamp;
45-
import java.util.*;
53+
import java.util.Calendar;
54+
import java.util.HashMap;
55+
import java.util.List;
56+
import java.util.Map;
4657
import java.util.concurrent.atomic.AtomicReference;
4758

4859
public class HbaseAllReqRow extends BaseAllReqRow {

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@
44
import com.dtstack.flink.sql.format.SerializationMetricWrapper;
55
import com.dtstack.flink.sql.sink.kafka.serialization.JsonCRowSerializationSchema;
66
import org.apache.flink.api.common.serialization.SerializationSchema;
7-
import org.apache.flink.formats.json.JsonRowSerializationSchema;
87
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
98
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
109
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
1110
import org.apache.flink.table.runtime.types.CRow;
12-
import org.apache.flink.types.Row;
1311
import org.slf4j.Logger;
1412
import org.slf4j.LoggerFactory;
1513

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/AvroCRowSerializationSchema.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import java.util.List;
5454
import java.util.Map;
5555
import java.util.Objects;
56-
import java.util.Optional;
5756
import java.util.TimeZone;
5857
import java.util.stream.Collectors;
5958

@@ -287,6 +286,7 @@ private Object convertFlinkType(Schema schema, Object object) {
287286
case DOUBLE:
288287
case BOOLEAN:
289288
return object;
289+
default:
290290
}
291291
throw new RuntimeException("Unsupported Avro type:" + schema);
292292
}

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/CsvCRowSerializationSchema.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,7 @@
4747
import java.math.BigDecimal;
4848
import java.math.BigInteger;
4949
import java.util.Arrays;
50-
import java.util.Iterator;
5150
import java.util.Objects;
52-
import java.util.stream.IntStream;
53-
import java.util.stream.Stream;
5451

5552
/**
5653
* Serialization schema that serializes an object of Flink types into a CSV bytes.
@@ -132,9 +129,9 @@ public Builder setFieldDelimiter(char c) {
132129

133130
public Builder setLineDelimiter(String delimiter) {
134131
Preconditions.checkNotNull(delimiter, "Delimiter must not be null.");
135-
if (!delimiter.equals("\n") && !delimiter.equals("\r") && !delimiter.equals("\r\n")) {
132+
if (!("\n".equals(delimiter)) && !("\r".equals(delimiter)) && !("\r\n".equals(delimiter))) {
136133
throw new IllegalArgumentException(
137-
"Unsupported new line delimiter. Only \\n, \\r, or \\r\\n are supported.");
134+
"Unsupported new line delimiter. Only \\n, \\r, or \\r\\n are supported.");
138135
}
139136
this.csvSchema = this.csvSchema.rebuild().setLineSeparator(delimiter).build();
140137
return this;

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package com.dtstack.flink.sql.source.kafka.table;
2121

22-
import com.dtstack.flink.sql.format.FormatType;
2322
import com.dtstack.flink.sql.table.AbstractSourceTableInfo;
2423
import com.google.common.base.Preconditions;
2524

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
2525
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
2626
import org.apache.flink.table.runtime.types.CRow;
27-
import org.apache.flink.types.Row;
2827
import org.slf4j.Logger;
2928
import org.slf4j.LoggerFactory;
3029

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09Factory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
2424
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
2525
import org.apache.flink.table.runtime.types.CRow;
26-
import org.apache.flink.types.Row;
2726

2827
import java.util.Optional;
2928
import java.util.Properties;

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,15 @@
2121

2222
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
2323
import com.dtstack.flink.sql.table.AbstractSourceTableInfo;
24-
import com.dtstack.flink.sql.util.DtStringUtil;
2524
import org.apache.commons.lang3.StringUtils;
2625
import org.apache.flink.api.common.typeinfo.TypeInformation;
2726
import org.apache.flink.streaming.api.datastream.DataStreamSource;
2827
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2928
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
30-
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
3129
import org.apache.flink.table.api.Table;
3230
import org.apache.flink.table.api.java.StreamTableEnvironment;
3331
import org.apache.flink.types.Row;
3432

35-
import java.util.Map;
3633
import java.util.Properties;
3734

3835
/**

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010Factory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
2424
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
2525
import org.apache.flink.table.runtime.types.CRow;
26-
import org.apache.flink.types.Row;
2726

2827
import java.util.Optional;
2928
import java.util.Properties;

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer011Factory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
2424
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
2525
import org.apache.flink.table.runtime.types.CRow;
26-
import org.apache.flink.types.Row;
2726

2827
import java.util.Optional;
2928
import java.util.Properties;

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020

2121
package com.dtstack.flink.sql.launcher;
2222

23-
import com.aiweiergou.tool.logger.api.ChangeLogLevelProcess;
2423
import com.dtstack.flink.sql.constrant.ConfigConstrant;
25-
import com.dtstack.flink.sql.launcher.perjob.PerJobClusterClientBuilder;
2624
import com.google.common.collect.Lists;
2725
import com.alibaba.fastjson.JSON;
2826
import com.alibaba.fastjson.TypeReference;
@@ -43,14 +41,9 @@
4341
import org.apache.flink.runtime.jobgraph.JobGraph;
4442
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
4543
import org.apache.flink.util.FileUtils;
46-
import org.slf4j.Logger;
47-
import org.slf4j.LoggerFactory;
4844

49-
import java.io.BufferedReader;
5045
import java.io.File;
51-
import java.io.FileInputStream;
5246
import java.io.IOException;
53-
import java.io.InputStreamReader;
5447
import java.net.URLDecoder;
5548
import java.util.LinkedList;
5649
import java.util.List;

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,12 @@
3232
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
3333
import org.apache.flink.yarn.YarnClusterDescriptor;
3434
import org.apache.hadoop.fs.Path;
35-
import org.apache.hadoop.security.UserGroupInformation;
3635
import org.apache.hadoop.yarn.client.api.YarnClient;
3736
import org.apache.hadoop.yarn.conf.YarnConfiguration;
3837
import org.slf4j.Logger;
3938
import org.slf4j.LoggerFactory;
4039

4140
import java.io.File;
42-
import java.io.IOException;
4341
import java.net.MalformedURLException;
4442
import java.net.URL;
4543
import java.util.ArrayList;

mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import org.apache.commons.collections.CollectionUtils;
3838
import org.apache.commons.lang3.StringUtils;
3939
import org.apache.flink.api.java.typeutils.RowTypeInfo;
40-
import com.google.common.collect.Lists;
41-
import com.google.common.collect.Maps;
4240
import org.apache.flink.table.runtime.types.CRow;
4341
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
4442
import org.apache.flink.types.Row;

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,6 @@
4444
import com.mongodb.async.client.MongoClients;
4545
import com.mongodb.async.client.MongoCollection;
4646
import com.mongodb.async.client.MongoDatabase;
47-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
48-
import com.google.common.collect.Lists;
49-
import org.apache.flink.configuration.Configuration;
50-
import org.apache.flink.streaming.api.functions.async.ResultFuture;
51-
import org.apache.flink.table.runtime.types.CRow;
52-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
53-
import org.apache.flink.types.Row;
5447
import org.bson.Document;
5548
import org.slf4j.Logger;
5649
import org.slf4j.LoggerFactory;

mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoOutputFormat.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
import org.slf4j.LoggerFactory;
3636

3737
import java.io.IOException;
38-
import java.util.ArrayList;
39-
import java.util.List;
4038

4139
/**
4240
* Reason:

0 commit comments

Comments
 (0)