Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions lts-core/src/main/java/com/lts/core/support/RetryScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public abstract class RetryScheduler<T> {
private ScheduledFuture<?> scheduledFuture;
private AtomicBoolean start = new AtomicBoolean(false);
private FailStore failStore;
// 名称主要是用来记录日志
private String name;

// 批量发送的消息数
private int batchSize = 5;
Expand All @@ -47,6 +49,7 @@ public RetryScheduler(Application application, String storePath) {
FailStoreFactory failStoreFactory = ExtensionLoader.getExtensionLoader(FailStoreFactory.class).getAdaptiveExtension();
failStore = failStoreFactory.getFailStore(application.getConfig(), storePath);
}

public RetryScheduler(Application application, String storePath, int batchSize) {
this(application, storePath);
this.batchSize = batchSize;
Expand All @@ -57,15 +60,19 @@ protected RetryScheduler(Application application, int batchSize) {
this.batchSize = batchSize;
}

public void setName(String name) {
this.name = name;
}

public void start() {
try {
if (start.compareAndSet(false, true)) {
// 这个时间后面再去优化
scheduledFuture = RETRY_EXECUTOR_SERVICE.scheduleWithFixedDelay(new CheckRunner(), 10, 30, TimeUnit.SECONDS);
LOGGER.error("Start retry scheduler success!");
LOGGER.info("Start {} retry scheduler success!", name);
}
} catch (Throwable t) {
LOGGER.error("Start retry scheduler failed!", t);
LOGGER.error("Start {} retry scheduler failed!", name, t);
}
}

Expand All @@ -74,10 +81,10 @@ public void stop() {
if (start.compareAndSet(false, true)) {
scheduledFuture.cancel(true);
RETRY_EXECUTOR_SERVICE.shutdown();
LOGGER.error("Stop retry scheduler success!");
LOGGER.info("Stop {} retry scheduler success!", name);
}
} catch (Throwable t) {
LOGGER.error("Stop retry scheduler failed!", t);
LOGGER.error("Stop {} retry scheduler failed!", name, t);
}
}

Expand Down Expand Up @@ -117,7 +124,7 @@ public void run() {
values.add(kvPair.getValue());
}
if (retry(values)) {
LOGGER.info("本地数据发送成功, {}", JSONUtils.toJSONString(values));
LOGGER.info("{} local files send success, {}", name, JSONUtils.toJSONString(values));
failStore.delete(keys);
} else {
break;
Expand All @@ -137,7 +144,7 @@ public void run() {
} while (CollectionUtils.isNotEmpty(kvPairs));

} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
LOGGER.error("Run {} retry scheduler error.", name, e);
}
}

Expand All @@ -152,7 +159,7 @@ public void inSchedule(String key, T value) {
failStore.close();
}
} catch (FailStoreException e) {
LOGGER.error(e.getMessage(), e);
LOGGER.error("{} in schedule error. ", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ protected boolean retry(List<Job> jobs) {
return false;
}
};
retryScheduler.setName(RetryJobClient.class.getSimpleName());
super.innerStart();
retryScheduler.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ protected boolean retry(List<BizLog> list) {
return sendBizLog(list);
}
};
retryScheduler.setName(BizLogger.class.getSimpleName());
this.retryScheduler.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected boolean retry(List<JobResult> jobResults) {
return retrySendJobResults(jobResults);
}
};

retryScheduler.setName("JobPush");
retryScheduler.start();

// 线程安全的
Expand Down