Skip to content

Commit 304f180

Browse files
committed
code opt
1 parent f544d1a commit 304f180

File tree

15 files changed

+60
-90
lines changed

15 files changed

+60
-90
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
267267
String connInfo = "address:" + tableInfo.getAddress() + ";userName:" + tableInfo.getUserName()
268268
+ ",pwd:" + tableInfo.getPassword();
269269
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
270-
Thread.sleep(5 * 1000);
270+
Thread.sleep(LOAD_DATA_ERROR_SLEEP_TIME);
271271
} catch (InterruptedException e1) {
272272
LOG.error("", e1);
273273
}

core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,16 @@ public JobExecutionResult execute(String jobName) throws Exception {
113113
LOG.info("Running job on local embedded Flink mini cluster");
114114
}
115115

116-
MiniCluster exec = new MiniCluster(configBuilder.build());
116+
MiniCluster exec = null;
117117
try {
118+
exec = new MiniCluster(configBuilder.build());
118119
exec.start();
119120
return exec.executeJobBlocking(jobGraph);
120-
}
121-
finally {
121+
} finally {
122122
transformations.clear();
123-
exec.closeAsync();
123+
if (null != exec) {
124+
exec.closeAsync();
125+
}
124126
}
125127
}
126128
}

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
5757

5858
public static final String ASYNC_REQ_POOL_KEY = "asyncPoolSize";
5959

60-
private String cacheType = "none";//None or LRU or ALL
60+
private String cacheType = "none";
6161

6262
private int cacheSize = 10000;
6363

64-
private long cacheTimeout = 60 * 1000;//
64+
private long cacheTimeout = 60_000L;
6565

6666
private int asyncCapacity=100;
6767

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<CRow, CRow> impl
4747

4848
private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);
4949

50+
public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;
51+
5052
protected BaseSideInfo sideInfo;
5153

5254
private ScheduledExecutorService es;

core/src/main/java/com/dtstack/flink/sql/util/DateUtil.java

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ public static Date stringToDate(String strDate) {
7575
return null;
7676
}
7777
try {
78-
;
7978
return localDateTimetoDate(LocalDateTime.parse(strDate, DATE_TIME_FORMATTER));
8079
} catch (DateTimeParseException ignored) {
8180
}
@@ -113,13 +112,13 @@ public static long getTodayStart(long day) {
113112
if (("" + day).length() > 10) {
114113
cal.setTime(new Date(day));
115114
} else {
116-
cal.setTime(new Date(day * 1000));
115+
cal.setTime(new Date(day * 1000L));
117116
}
118117
cal.set(Calendar.HOUR_OF_DAY, 0);
119118
cal.set(Calendar.MINUTE, 0);
120119
cal.set(Calendar.SECOND, 0);
121120
cal.set(Calendar.MILLISECOND, 0);
122-
firstDay = cal.getTimeInMillis() / 1000;
121+
firstDay = cal.getTimeInMillis() / 1000L;
123122
return firstDay;
124123
}
125124

@@ -131,7 +130,7 @@ public static long getTodayStart(long day) {
131130
*/
132131
public static long getTodayStart(long day,String scope) {
133132
if("MS".equals(scope)){
134-
return getTodayStart(day)*1000;
133+
return getTodayStart(day)*1000L;
135134
}else if("S".equals(scope)){
136135
return getTodayStart(day);
137136
}else{
@@ -151,13 +150,13 @@ public static long getNextDayStart(long day) {
151150
if (("" + day).length() > 10) {
152151
cal.setTime(new Date(day));
153152
} else {
154-
cal.setTime(new Date(day * 1000));
153+
cal.setTime(new Date(day * 1000L));
155154
}
156155
cal.set(Calendar.HOUR_OF_DAY, 0);
157156
cal.set(Calendar.MINUTE, 0);
158157
cal.set(Calendar.SECOND, 0);
159158
cal.set(Calendar.MILLISECOND, 0);
160-
nextDay = (cal.getTimeInMillis() + daySpanMill) / 1000;
159+
nextDay = (cal.getTimeInMillis() + daySpanMill) / 1000L;
161160
return nextDay;
162161
}
163162

@@ -169,7 +168,7 @@ public static long getNextDayStart(long day) {
169168
*/
170169
public static long getNextDayStart(long day,String scope) {
171170
if("MS".equals(scope)){
172-
return getNextDayStart(day)*1000;
171+
return getNextDayStart(day)*1000L;
173172
}else if("S".equals(scope)){
174173
return getNextDayStart(day);
175174
}else{
@@ -186,13 +185,13 @@ public static long getNextDayStart(long day,String scope) {
186185
public static long getMonthFirst(long day) {
187186
long firstDay = 0L;
188187
Calendar cal = Calendar.getInstance();
189-
cal.setTime(new Date(day * 1000));
188+
cal.setTime(new Date(day * 1000L));
190189
cal.set(Calendar.DAY_OF_MONTH, 1);
191190
cal.set(Calendar.HOUR_OF_DAY, 0);
192191
cal.set(Calendar.MINUTE, 0);
193192
cal.set(Calendar.SECOND, 0);
194193
cal.set(Calendar.MILLISECOND, 0);
195-
firstDay = cal.getTimeInMillis() / 1000;
194+
firstDay = cal.getTimeInMillis() / 1000L;
196195
return firstDay;
197196
}
198197

@@ -224,7 +223,7 @@ public static int getYear(long day) {
224223
public static long getWeekFirst(long day) {
225224
long firstDay = 0L;
226225
Calendar cal = Calendar.getInstance();
227-
cal.setTime(new Date(day * 1000));
226+
cal.setTime(new Date(day * 1000L));
228227
cal.setFirstDayOfWeek(Calendar.MONDAY);
229228
cal.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY);
230229
cal.set(Calendar.HOUR_OF_DAY, 0);
@@ -243,7 +242,7 @@ public static long getWeekFirst(long day) {
243242
*/
244243
public static int getWeekOfYear(long day) {
245244
Calendar cal = Calendar.getInstance();
246-
cal.setTime(new Date(day * 1000));
245+
cal.setTime(new Date(day * 1000L));
247246
return cal.get(Calendar.WEEK_OF_YEAR);
248247
}
249248

@@ -363,7 +362,7 @@ public static long getDateMillToFormat(String day, String inFormat) throws Parse
363362
Date date = sdf.parse(day);
364363
Calendar calendar = Calendar.getInstance();
365364
calendar.setTime(date);
366-
return calendar.getTimeInMillis()/1000;
365+
return calendar.getTimeInMillis()/1000L;
367366
}
368367

369368
/**
@@ -383,7 +382,7 @@ public static long getFirstDay4Month(int year, int month) {
383382
cal.set(Calendar.MINUTE, 0);
384383
cal.set(Calendar.SECOND, 0);
385384
cal.set(Calendar.MILLISECOND, 0);
386-
firstDay = cal.getTimeInMillis() / 1000;
385+
firstDay = cal.getTimeInMillis() / 1000L;
387386
return firstDay;
388387
}
389388

@@ -405,7 +404,7 @@ public static long getLastDay4Month(int year, int month) {
405404
cal.set(Calendar.MINUTE, 0);
406405
cal.set(Calendar.SECOND, 0);
407406
cal.set(Calendar.MILLISECOND, 0);
408-
lastDay = cal.getTimeInMillis() / 1000;
407+
lastDay = cal.getTimeInMillis() / 1000L;
409408
return lastDay;
410409
}
411410

@@ -444,7 +443,7 @@ public static long getMillByOneDay() {
444443
cal.set(Calendar.MINUTE, 0);
445444
cal.set(Calendar.SECOND, 0);
446445
cal.set(Calendar.MILLISECOND, 0);
447-
return cal.getTimeInMillis() / 1000;
446+
return cal.getTimeInMillis() / 1000L;
448447
}
449448

450449
/**
@@ -458,7 +457,7 @@ public static long getMillByYesDay() {
458457
cal.set(Calendar.MINUTE, 0);
459458
cal.set(Calendar.SECOND, 0);
460459
cal.set(Calendar.MILLISECOND, 0);
461-
return cal.getTimeInMillis() / 1000;
460+
return cal.getTimeInMillis() / 1000L;
462461
}
463462

464463
/**
@@ -472,7 +471,7 @@ public static long getMillByLastWeekDay() {
472471
cal.set(Calendar.MINUTE, 0);
473472
cal.set(Calendar.SECOND, 0);
474473
cal.set(Calendar.MILLISECOND, 0);
475-
return cal.getTimeInMillis() / 1000;
474+
return cal.getTimeInMillis() / 1000L;
476475
}
477476

478477
/**
@@ -534,7 +533,7 @@ public static long getMillToDay(Calendar cal,int dateT){
534533
cal.set(Calendar.MINUTE, 0);
535534
cal.set(Calendar.SECOND, 0);
536535
cal.set(Calendar.MILLISECOND, 0);
537-
return cal.getTimeInMillis()/1000;
536+
return cal.getTimeInMillis()/1000L;
538537
}
539538

540539
/**
@@ -563,7 +562,7 @@ public static String getDate(long day, String format) {
563562
if (("" + day).length() > 10) {
564563
cal.setTime(new Date(day));
565564
} else {
566-
cal.setTime(new Date(day * 1000));
565+
cal.setTime(new Date(day * 1000L));
567566
}
568567
SimpleDateFormat sf = new SimpleDateFormat(format);
569568
return sf.format(cal.getTime());
@@ -619,7 +618,7 @@ public static Date stringToDate(String day, String format) {
619618
*/
620619
public static String longToString(long day, String format) throws ParseException {
621620
if (("" + day).length() <= 10){
622-
day=day*1000;
621+
day=day*1000L;
623622
}
624623
SimpleDateFormat dateFormat = new SimpleDateFormat(format);
625624
String date = dateFormat.format(day);
@@ -634,7 +633,7 @@ public static String longToString(long day, String format) throws ParseException
634633
*/
635634
public static int getMinusDate(int day, int minusDay) {
636635
Calendar cal = Calendar.getInstance();
637-
cal.setTime(new Date(day * 1000));
636+
cal.setTime(new Date(day * 1000L));
638637
cal.set(Calendar.DATE, cal.get(Calendar.DATE) - minusDay);
639638
cal.set(Calendar.HOUR_OF_DAY, 0);
640639
cal.set(Calendar.MINUTE, 0);
@@ -663,7 +662,7 @@ public static int getWeeksBetweenTwoDates(long startDay, long endDay) {
663662

664663
public static int getMaxWeekOfYear(long startDay) {
665664
Calendar cal = Calendar.getInstance();
666-
cal.setTime(new Date(startDay * 1000));
665+
cal.setTime(new Date(startDay * 1000L));
667666
return cal.getMaximum(Calendar.WEEK_OF_YEAR);
668667
}
669668

@@ -696,7 +695,7 @@ public static long getMinuteStart(long time) {
696695
if (("" + time).length() > 10) {
697696
cal.setTime(new Date(time));
698697
} else {
699-
cal.setTime(new Date(time * 1000));
698+
cal.setTime(new Date(time * 1000L));
700699
}
701700
cal.set(Calendar.SECOND, 0);
702701
cal.set(Calendar.MILLISECOND, 0);
@@ -719,7 +718,7 @@ public static long getHourStart(long time) {
719718
cal.set(Calendar.SECOND, 0);
720719
cal.set(Calendar.MILLISECOND, 0);
721720
cal.set(Calendar.MINUTE, 0);
722-
firstDay = cal.getTimeInMillis() / 1000;
721+
firstDay = cal.getTimeInMillis() / 1000L;
723722
return firstDay;
724723
}
725724

@@ -757,7 +756,7 @@ public static java.sql.Timestamp columnToTimestamp(Object column) {
757756
if (column == null) {
758757
return null;
759758
} else if(column instanceof String) {
760-
return new java.sql.Timestamp(stringToDate((String)column).getTime());
759+
return null == stringToDate((String) column) ? null : new java.sql.Timestamp(stringToDate((String) column).getTime());
761760
} else if (column instanceof Integer) {
762761
Integer rawData = (Integer) column;
763762
return new java.sql.Timestamp(rawData.longValue());
@@ -791,7 +790,7 @@ public static Timestamp getTimestampFromStr(String timeStr) {
791790
Instant instant = Instant.from(ISO_INSTANT.parse(timeStr));
792791
return new Timestamp(instant.getEpochSecond() * MILLIS_PER_SECOND);
793792
}
794-
return new Timestamp(stringToDate(timeStr).getTime());
793+
return null == stringToDate(timeStr) ? null : new Timestamp(stringToDate(timeStr).getTime());
795794
}
796795

797796
public static java.sql.Date getDateFromStr(String dateStr) {
@@ -803,7 +802,7 @@ public static java.sql.Date getDateFromStr(String dateStr) {
803802
Instant instant = Instant.from(ISO_INSTANT.parse(dateStr));
804803
return new java.sql.Date(instant.toEpochMilli());
805804
}
806-
return new java.sql.Date(stringToDate(dateStr).getTime());
805+
return null == stringToDate(dateStr) ? null : new java.sql.Date(stringToDate(dateStr).getTime());
807806
}
808807

809808
}

core/src/main/java/com/dtstack/flink/sql/util/JDBCUtils.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,13 @@
1919

2020
package com.dtstack.flink.sql.util;
2121

22-
import org.slf4j.Logger;
23-
import org.slf4j.LoggerFactory;
24-
2522
import java.sql.DriverManager;
2623

2724
public class JDBCUtils {
28-
29-
private static final Logger LOG = LoggerFactory.getLogger(ClassUtil.class);
30-
31-
public final static String LOCK_STR = "jdbc_lock_str";
25+
private static final Object LOCK = new Object();
3226

3327
public static void forName(String clazz, ClassLoader classLoader) {
34-
synchronized (LOCK_STR){
28+
synchronized (LOCK){
3529
try {
3630
Class.forName(clazz, true, classLoader);
3731
DriverManager.setLoginTimeout(10);

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws IO
187187
try {
188188
String connInfo = "url: " + tableInfo.getAddress() + "; userName: " + tableInfo.getUserName() + ", pwd:" + tableInfo.getPassword();
189189
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
190-
Thread.sleep(5 * 1000);
190+
Thread.sleep(LOAD_DATA_ERROR_SLEEP_TIME);
191191
} catch (InterruptedException e1) {
192192
LOG.error("", e1);
193193
}

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/util/Es6Util.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -176,13 +176,6 @@ public static BoolQueryBuilder buildFilterCondition(BoolQueryBuilder boolQueryBu
176176
case "NOT_EQUALS":
177177
return boolQueryBuilder.mustNot(QueryBuilders.termQuery(textConvertToKeyword(info.getFieldName(), sideInfo), removeSpaceAndApostrophe(info.getCondition())[0]));
178178
default:
179-
try {
180-
throw new Exception("elasticsearch6 does not support this operation: " + info.getOperatorKind());
181-
} catch (Exception e) {
182-
183-
e.printStackTrace();
184-
LOG.error(e.getMessage());
185-
}
186179
return boolQueryBuilder;
187180
}
188181

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/util/MathUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public static BigDecimal getBigDecimal(Object obj) {
216216
} else if (obj instanceof BigInteger) {
217217
return new BigDecimal((BigInteger) obj);
218218
} else if (obj instanceof Number) {
219-
return new BigDecimal(((Number) obj).doubleValue());
219+
return BigDecimal.valueOf(((Number) obj).doubleValue());
220220
}
221221
throw new RuntimeException("not support type of " + obj.getClass() + " convert to BigDecimal.");
222222
}

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,10 @@ public ReplaceInfo getReplaceInfo(String field){
120120
}
121121

122122
private List<ReplaceInfo> makeFormula(String formula){
123-
if(formula == null || formula.length() <= 0){
124-
Lists.newArrayList();
123+
if (formula == null || formula.length() <= 0) {
124+
return Lists.newArrayList();
125125
}
126+
126127
List<ReplaceInfo> result = Lists.newArrayList();
127128
for(String meta: splitIgnoreQuotaBrackets(formula, "\\+")){
128129
Matcher matcher = Md5Operator.matcher(meta.trim());

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkPartition.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public void open(int parallelInstanceId, int parallelInstances) {
2020
@Override
2121
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
2222
Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
23+
2324
if(key == null){
2425
return partitions[this.parallelInstanceId % partitions.length];
2526
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ private static ApplicationId getYarnClusterApplicationId(YarnClient yarnClient)
158158

159159
}
160160

161-
if (StringUtils.isEmpty(applicationId.toString())) {
161+
if (null == applicationId) {
162162
throw new RuntimeException("No flink session found on yarn cluster.");
163163
}
164164
return applicationId;

0 commit comments

Comments
 (0)