From 71a357e3c096bdf1a1b29f1bfe3aeae61e4a77b7 Mon Sep 17 00:00:00 2001
From: stelin <794774870@qq.com>
Date: Tue, 23 May 2023 23:39:48 +0800
Subject: [PATCH 1/7] :zap:add test
---
openjob-server/openjob-server-log/pom.xml | 17 +++
.../java/io/openjob/server/log/EsTest.java | 100 ++++++++++++++++++
.../java/io/openjob/server/log/Product.java | 16 +++
3 files changed, 133 insertions(+)
create mode 100644 openjob-server/openjob-server-log/src/test/java/io/openjob/server/log/EsTest.java
create mode 100644 openjob-server/openjob-server-log/src/test/java/io/openjob/server/log/Product.java
diff --git a/openjob-server/openjob-server-log/pom.xml b/openjob-server/openjob-server-log/pom.xml
index db107b2d..23ba7b3b 100644
--- a/openjob-server/openjob-server-log/pom.xml
+++ b/openjob-server/openjob-server-log/pom.xml
@@ -32,6 +32,23 @@
openjob-server-common
+
+ org.elasticsearch.client
+ elasticsearch-rest-client
+ 7.0.0
+
+
+
+ org.elasticsearch
+ elasticsearch
+ 7.0.0
+
+
+ org.elasticsearch.client
+ elasticsearch-rest-high-level-client
+ 7.0.0
+
+
com.h2database
diff --git a/openjob-server/openjob-server-log/src/test/java/io/openjob/server/log/EsTest.java b/openjob-server/openjob-server-log/src/test/java/io/openjob/server/log/EsTest.java
new file mode 100644
index 00000000..9af71a52
--- /dev/null
+++ b/openjob-server/openjob-server-log/src/test/java/io/openjob/server/log/EsTest.java
@@ -0,0 +1,100 @@
+package io.openjob.server.log;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author stelin swoft@qq.com
+ * @since 1.0.1
+ */
+public class EsTest {
+ @Test
+ public void test() throws IOException {
+ RestHighLevelClient client = new RestHighLevelClient(
+ RestClient.builder(
+ new HttpHost("localhost", 9200, "http")));
+
+ Map cnt = new HashMap<>();
+ cnt.put("key", "key2");
+ cnt.put("key2", "key236");
+ cnt.put("key3", "key326");
+ Product product1 = new Product();
+ product1.setId("id1");
+ product1.setName("name2");
+ product1.setContent(cnt);
+
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ String jsonString = objectMapper.writeValueAsString(product1);
+
+ IndexRequest indexRequest = new IndexRequest("product2");
+ indexRequest.id(product1.getId())
+ .source(jsonString, XContentType.JSON);
+
+ // 同步执行,并使用自定义RequestOptions(COMMON_OPTIONS)。
+ IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
+
+ long version = indexResponse.getVersion();
+ System.out.println(version);
+ }
+
+ @Test
+ public void test2() throws IOException {
+ RestHighLevelClient client = new RestHighLevelClient(
+ RestClient.builder(
+ new HttpHost("localhost", 9200, "http")));
+
+ Map cnt = new HashMap<>();
+ cnt.put("key", "key2");
+ cnt.put("key2", "key236");
+ cnt.put("key3", "key326");
+ Product product1 = new Product();
+ product1.setId("id12");
+ product1.setName("name12");
+ product1.setContent(cnt);
+
+ Map cnt2 = new HashMap<>();
+ cnt.put("key", "key2");
+ cnt.put("key2", "key236");
+ cnt.put("key3", "key326");
+ Product product2 = new Product();
+ product2.setId("id13");
+ product2.setName("name13");
+ product2.setContent(cnt2);
+
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ String jsonString = objectMapper.writeValueAsString(product1);
+
+ ObjectMapper objectMapper2 = new ObjectMapper();
+ String jsonString2 = objectMapper2.writeValueAsString(product2);
+
+ IndexRequest indexRequest = new IndexRequest("product2");
+ indexRequest.id(product1.getId())
+ .source(jsonString, XContentType.JSON);
+
+ IndexRequest indexRequest2 = new IndexRequest("product2");
+ indexRequest2.id(product2.getId())
+ .source(jsonString2, XContentType.JSON);
+
+ BulkRequest bulkRequest = new BulkRequest();
+ bulkRequest.add(indexRequest);
+ bulkRequest.add(indexRequest2);
+
+
+ BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
+ }
+}
diff --git a/openjob-server/openjob-server-log/src/test/java/io/openjob/server/log/Product.java b/openjob-server/openjob-server-log/src/test/java/io/openjob/server/log/Product.java
new file mode 100644
index 00000000..6ccb67de
--- /dev/null
+++ b/openjob-server/openjob-server-log/src/test/java/io/openjob/server/log/Product.java
@@ -0,0 +1,16 @@
+package io.openjob.server.log;
+
+import lombok.Data;
+
+import java.util.Map;
+
+/**
+ * @author stelin swoft@qq.com
+ * @since 1.0.1
+ */
+@Data
+public class Product {
+ private String id;
+ private String name;
+ private Map content;
+}
From bdf67bb9c74adf34819dbf264f01bc2d84f2bf97 Mon Sep 17 00:00:00 2001
From: stelin <794774870@qq.com>
Date: Wed, 24 May 2023 10:46:42 +0800
Subject: [PATCH 2/7] :zap:updat version
---
openjob-common/pom.xml | 2 +-
openjob-server/openjob-server-admin/pom.xml | 2 +-
openjob-server/openjob-server-cluster/pom.xml | 2 +-
openjob-server/openjob-server-common/pom.xml | 2 +-
openjob-server/openjob-server-dispatcher/pom.xml | 2 +-
openjob-server/openjob-server-log/pom.xml | 2 +-
openjob-server/openjob-server-openapi/pom.xml | 2 +-
openjob-server/openjob-server-repository/pom.xml | 2 +-
openjob-server/openjob-server-scheduler/pom.xml | 2 +-
openjob-server/openjob-server-starter/pom.xml | 2 +-
openjob-server/pom.xml | 2 +-
openjob-worker/openjob-worker-core/pom.xml | 2 +-
openjob-worker/openjob-worker-spring-boot-starter/pom.xml | 2 +-
openjob-worker/pom.xml | 4 ++--
pom.xml | 2 +-
15 files changed, 16 insertions(+), 16 deletions(-)
diff --git a/openjob-common/pom.xml b/openjob-common/pom.xml
index cc86fa87..60ff139d 100644
--- a/openjob-common/pom.xml
+++ b/openjob-common/pom.xml
@@ -5,7 +5,7 @@
openjob
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
openjob-common
diff --git a/openjob-server/openjob-server-admin/pom.xml b/openjob-server/openjob-server-admin/pom.xml
index d4ac0dbd..e7b2c4da 100644
--- a/openjob-server/openjob-server-admin/pom.xml
+++ b/openjob-server/openjob-server-admin/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
openjob-server-admin
diff --git a/openjob-server/openjob-server-cluster/pom.xml b/openjob-server/openjob-server-cluster/pom.xml
index 399f1e4a..54084e31 100644
--- a/openjob-server/openjob-server-cluster/pom.xml
+++ b/openjob-server/openjob-server-cluster/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
diff --git a/openjob-server/openjob-server-common/pom.xml b/openjob-server/openjob-server-common/pom.xml
index cce0e1da..18399465 100644
--- a/openjob-server/openjob-server-common/pom.xml
+++ b/openjob-server/openjob-server-common/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
openjob-server-common
diff --git a/openjob-server/openjob-server-dispatcher/pom.xml b/openjob-server/openjob-server-dispatcher/pom.xml
index 41d7cde7..f5c72de9 100644
--- a/openjob-server/openjob-server-dispatcher/pom.xml
+++ b/openjob-server/openjob-server-dispatcher/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
diff --git a/openjob-server/openjob-server-log/pom.xml b/openjob-server/openjob-server-log/pom.xml
index 23ba7b3b..68b903dc 100644
--- a/openjob-server/openjob-server-log/pom.xml
+++ b/openjob-server/openjob-server-log/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
openjob-server-log
diff --git a/openjob-server/openjob-server-openapi/pom.xml b/openjob-server/openjob-server-openapi/pom.xml
index 64a47e6b..e72cd747 100644
--- a/openjob-server/openjob-server-openapi/pom.xml
+++ b/openjob-server/openjob-server-openapi/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
openjob-server-openapi
diff --git a/openjob-server/openjob-server-repository/pom.xml b/openjob-server/openjob-server-repository/pom.xml
index 9b5ea416..250c1439 100644
--- a/openjob-server/openjob-server-repository/pom.xml
+++ b/openjob-server/openjob-server-repository/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
openjob-server-repository
diff --git a/openjob-server/openjob-server-scheduler/pom.xml b/openjob-server/openjob-server-scheduler/pom.xml
index d94efd0c..03eae2fe 100644
--- a/openjob-server/openjob-server-scheduler/pom.xml
+++ b/openjob-server/openjob-server-scheduler/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
diff --git a/openjob-server/openjob-server-starter/pom.xml b/openjob-server/openjob-server-starter/pom.xml
index 457ed91e..e320801c 100644
--- a/openjob-server/openjob-server-starter/pom.xml
+++ b/openjob-server/openjob-server-starter/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
diff --git a/openjob-server/pom.xml b/openjob-server/pom.xml
index 99536a8b..9c4e8ae0 100644
--- a/openjob-server/pom.xml
+++ b/openjob-server/pom.xml
@@ -5,7 +5,7 @@
openjob
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
openjob-server
diff --git a/openjob-worker/openjob-worker-core/pom.xml b/openjob-worker/openjob-worker-core/pom.xml
index 170054c6..5a55a3bb 100644
--- a/openjob-worker/openjob-worker-core/pom.xml
+++ b/openjob-worker/openjob-worker-core/pom.xml
@@ -5,7 +5,7 @@
openjob-worker
io.openjob.worker
- 1.0.1
+ 1.0.2
4.0.0
diff --git a/openjob-worker/openjob-worker-spring-boot-starter/pom.xml b/openjob-worker/openjob-worker-spring-boot-starter/pom.xml
index b25f0717..7eb7e155 100644
--- a/openjob-worker/openjob-worker-spring-boot-starter/pom.xml
+++ b/openjob-worker/openjob-worker-spring-boot-starter/pom.xml
@@ -5,7 +5,7 @@
openjob-worker
io.openjob.worker
- 1.0.1
+ 1.0.2
4.0.0
diff --git a/openjob-worker/pom.xml b/openjob-worker/pom.xml
index babbb69d..107ec67b 100644
--- a/openjob-worker/pom.xml
+++ b/openjob-worker/pom.xml
@@ -5,7 +5,7 @@
openjob
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
@@ -25,7 +25,7 @@
io.openjob.worker
openjob-worker-core
- 1.0.1
+ 1.0.2
diff --git a/pom.xml b/pom.xml
index 4bdbbddb..a420843f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,7 +13,7 @@
io.openjob
openjob
pom
- 1.0.1
+ 1.0.2
Openjob Build ${version}
Openjob build with Maven
From 129348827738f767079851cd338be5ca0c47d0d0 Mon Sep 17 00:00:00 2001
From: stelin <794774870@qq.com>
Date: Wed, 24 May 2023 18:03:33 +0800
Subject: [PATCH 3/7] :zap:add es
---
.../impl/DelayInstanceServiceImpl.java | 7 +-
.../service/impl/JobInstanceServiceImpl.java | 7 +-
.../server/admin/util/LogFormatUtil.java | 10 +-
.../service/JobInstanceTaskLogService.java | 9 +-
.../autoconfigure/LogAutoConfiguration.java | 15 ++
.../log/autoconfigure/LogProperties.java | 33 +----
.../log/client/AbstractJdbcHikariClient.java | 6 +-
.../io/openjob/server/log/client/Client.java | 4 +-
.../log/client/Elasticsearch7Client.java | 113 ++++++++++++++
.../openjob/server/log/client/H2Client.java | 3 +
.../server/log/client/MysqlClient.java | 3 +
.../log/constant/LogStorageConstant.java | 3 -
.../io/openjob/server/log/dao/LogDAO.java | 25 +++-
.../log/dao/impl/Elasticsearch7DAOImpl.java | 138 ++++++++++++++++++
.../server/log/dao/impl/JdbcDAOImpl.java | 34 +++--
...ProcessorLog.java => ProcessorLogDTO.java} | 4 +-
.../log/dto/ProcessorLogElasticDTO.java | 17 +++
...ogField.java => ProcessorLogFieldDTO.java} | 4 +-
.../openjob/server/log/mapper/LogMapper.java | 4 +-
.../log/dao/Elasticsearch7DAOImplTest.java | 60 ++++++++
.../scheduler/DelayInstanceScheduler.java | 7 +-
.../scheduler/DelayZsetScheduler.java | 11 +-
22 files changed, 435 insertions(+), 82 deletions(-)
create mode 100644 openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Elasticsearch7Client.java
create mode 100644 openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/Elasticsearch7DAOImpl.java
rename openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dto/{ProcessorLog.java => ProcessorLogDTO.java} (74%)
create mode 100644 openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dto/ProcessorLogElasticDTO.java
rename openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dto/{ProcessorLogField.java => ProcessorLogFieldDTO.java} (70%)
create mode 100644 openjob-server/openjob-server-log/src/test/java/io/openjob/server/log/dao/Elasticsearch7DAOImplTest.java
diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayInstanceServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayInstanceServiceImpl.java
index 41a665e9..3ac0c425 100644
--- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayInstanceServiceImpl.java
+++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayInstanceServiceImpl.java
@@ -18,7 +18,7 @@
import io.openjob.server.common.util.PageUtil;
import io.openjob.server.common.vo.PageVO;
import io.openjob.server.log.dao.LogDAO;
-import io.openjob.server.log.dto.ProcessorLog;
+import io.openjob.server.log.dto.ProcessorLogDTO;
import io.openjob.server.repository.dao.DelayInstanceDAO;
import io.openjob.server.repository.dto.DelayInstancePageDTO;
import io.openjob.server.repository.entity.DelayInstance;
@@ -30,7 +30,6 @@
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
-import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -91,7 +90,7 @@ public ListDelayInstanceLogVO listProcessorLog(ListDelayInstanceLogRequest reque
AtomicLong nextTime = new AtomicLong(0L);
Integer isComplete = CommonConstant.NO;
try {
- List processorLogs = this.logDAO.queryByPage(request.getTaskId(), request.getTime(), request.getSize());
+ List processorLogs = this.logDAO.queryByPage(request.getTaskId(), request.getTime(), request.getSize());
if (!CollectionUtils.isEmpty(processorLogs)) {
// Processor list and nextTime.
@@ -108,7 +107,7 @@ public ListDelayInstanceLogVO listProcessorLog(ListDelayInstanceLogRequest reque
}
}
}
- } catch (SQLException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobInstanceServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobInstanceServiceImpl.java
index c81b00da..4a8d5302 100644
--- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobInstanceServiceImpl.java
+++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobInstanceServiceImpl.java
@@ -21,7 +21,7 @@
import io.openjob.server.common.util.PageUtil;
import io.openjob.server.common.vo.PageVO;
import io.openjob.server.log.dao.LogDAO;
-import io.openjob.server.log.dto.ProcessorLog;
+import io.openjob.server.log.dto.ProcessorLogDTO;
import io.openjob.server.repository.dao.JobInstanceDAO;
import io.openjob.server.repository.dao.JobInstanceLogDAO;
import io.openjob.server.repository.dto.JobInstancePageDTO;
@@ -34,7 +34,6 @@
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
-import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -101,7 +100,7 @@ public ListProcessorLogVO getProcessorList(ListProcessorLogRequest request) {
Integer isComplete = CommonConstant.NO;
try {
String taskId = TaskUtil.getRandomUniqueId(request.getJobId(), request.getJobInstanceId(), 0L, 0L);
- List processorLogs = this.logDAO.queryByPage(taskId, request.getTime(), request.getSize());
+ List processorLogs = this.logDAO.queryByPage(taskId, request.getTime(), request.getSize());
if (!CollectionUtils.isEmpty(processorLogs)) {
// Processor list and nextTime.
@@ -118,7 +117,7 @@ public ListProcessorLogVO getProcessorList(ListProcessorLogRequest request) {
}
}
}
- } catch (SQLException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/util/LogFormatUtil.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/util/LogFormatUtil.java
index 6d2108cf..f845f9fe 100644
--- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/util/LogFormatUtil.java
+++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/util/LogFormatUtil.java
@@ -2,13 +2,11 @@
import io.openjob.common.constant.LogFieldConstant;
import io.openjob.common.util.DateUtil;
-import io.openjob.server.log.dto.ProcessorLog;
-import io.openjob.server.log.dto.ProcessorLogField;
+import io.openjob.server.log.dto.ProcessorLogDTO;
+import io.openjob.server.log.dto.ProcessorLogFieldDTO;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
import java.util.stream.Collectors;
/**
@@ -29,9 +27,9 @@ public class LogFormatUtil {
* @param processorLog processorLog
* @return String
*/
- public static String formatLog(ProcessorLog processorLog) {
+ public static String formatLog(ProcessorLogDTO processorLog) {
Map fieldMap = processorLog.getFields().stream()
- .collect(Collectors.toMap(ProcessorLogField::getName, ProcessorLogField::getValue));
+ .collect(Collectors.toMap(ProcessorLogFieldDTO::getName, ProcessorLogFieldDTO::getValue));
String location = fieldMap.get(LogFieldConstant.LOCATION);
String message = String.format(
LOG_FORMAT,
diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceTaskLogService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceTaskLogService.java
index d40178f9..2c9c5963 100644
--- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceTaskLogService.java
+++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceTaskLogService.java
@@ -5,13 +5,12 @@
import io.openjob.common.request.WorkerJobInstanceTaskLogFieldRequest;
import io.openjob.common.request.WorkerJobInstanceTaskLogRequest;
import io.openjob.server.log.dao.LogDAO;
-import io.openjob.server.log.dto.ProcessorLog;
+import io.openjob.server.log.dto.ProcessorLogDTO;
import io.openjob.server.log.mapper.LogMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -37,7 +36,7 @@ public JobInstanceTaskLogService(LogDAO logDAO) {
* @param logReq log request.
*/
public void handleInstanceTaskLog(WorkerJobInstanceTaskLogRequest logReq) {
- List processorLogList = logReq.getFieldList().stream().map(fields -> {
+ List processorLogList = logReq.getFieldList().stream().map(fields -> {
// Field map.
Map> fieldMap = fields.stream()
.collect(Collectors.groupingBy(WorkerJobInstanceTaskLogFieldRequest::getName));
@@ -54,7 +53,7 @@ public void handleInstanceTaskLog(WorkerJobInstanceTaskLogRequest logReq) {
String timeStamp = Optional.ofNullable(fieldMap.get(LogFieldConstant.TIME_STAMP))
.orElseGet(() -> Lists.newArrayList(new WorkerJobInstanceTaskLogFieldRequest())).get(0).getValue();
- ProcessorLog processorLog = new ProcessorLog();
+ ProcessorLogDTO processorLog = new ProcessorLogDTO();
processorLog.setTaskId(taskId);
processorLog.setWorkerAddress(workerAddress);
processorLog.setTime(Long.valueOf(timeStamp));
@@ -64,7 +63,7 @@ public void handleInstanceTaskLog(WorkerJobInstanceTaskLogRequest logReq) {
try {
logDAO.batchAdd(processorLogList);
- } catch (SQLException e) {
+ } catch (Exception e) {
log.error("Batch add task log failed!", e);
throw new RuntimeException(e);
}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogAutoConfiguration.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogAutoConfiguration.java
index c32a7f84..2df7d255 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogAutoConfiguration.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogAutoConfiguration.java
@@ -1,9 +1,11 @@
package io.openjob.server.log.autoconfigure;
+import io.openjob.server.log.client.Elasticsearch7Client;
import io.openjob.server.log.client.H2Client;
import io.openjob.server.log.client.MysqlClient;
import io.openjob.server.log.constant.LogStorageConstant;
import io.openjob.server.log.dao.LogDAO;
+import io.openjob.server.log.dao.impl.Elasticsearch7DAOImpl;
import io.openjob.server.log.dao.impl.H2LogDAOImpl;
import io.openjob.server.log.dao.impl.MysqlLogDAOImpl;
import org.springframework.beans.factory.annotation.Autowired;
@@ -51,4 +53,17 @@ public LogDAO h2LogDAO(MysqlClient mysqlClient) {
return new MysqlLogDAOImpl(mysqlClient);
}
}
+
+ @ConditionalOnProperty(prefix = "openjob.log.storage", name = "selector", havingValue = LogStorageConstant.ELASTICSEARCH7)
+ public class Elasticsearch7autoconfiguration {
+ @Bean
+ public Elasticsearch7Client mysqlClient() {
+ return new Elasticsearch7Client(logProperties.getStorage().getElasticsearch7());
+ }
+
+ @Bean
+ public LogDAO h2LogDAO(Elasticsearch7Client elasticsearch7Client, LogProperties.Elasticsearch7Properties properties) {
+ return new Elasticsearch7DAOImpl(elasticsearch7Client, properties);
+ }
+ }
}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogProperties.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogProperties.java
index 522dee23..74f739d9 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogProperties.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogProperties.java
@@ -21,10 +21,7 @@ public static class Storage {
private H2Properties h2 = new H2Properties();
private MysqlProperties mysql = new MysqlProperties();
private TidbProperties tidb = new TidbProperties();
- private ElasticsearchProperties elasticsearch = new ElasticsearchProperties();
private Elasticsearch7Properties elasticsearch7 = new Elasticsearch7Properties();
- private SlsProperties sls = new SlsProperties();
- private ClsProperties cls = new ClsProperties();
}
@Data
@@ -42,30 +39,16 @@ public static class TidbProperties {
private JdbcProperties properties = new JdbcProperties();
}
- @Data
- public static class ElasticsearchProperties {
-
- }
-
@Data
public static class Elasticsearch7Properties {
-
- }
-
- /**
- * Aliyun sls
- */
- @Data
- public static class SlsProperties {
-
- }
-
- /**
- * Qcloud cls
- */
- @Data
- public static class ClsProperties {
-
+ private String clusterNodes;
+ private String username;
+ private String password;
+ private String protocol = "http";
+ private Integer connectTimeout = 3000;
+ private Integer socketTimeout = 3000;
+ private Integer responseTimeout = 3000;
+ private Integer bufferLimit = 104857600;
}
@Data
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/AbstractJdbcHikariClient.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/AbstractJdbcHikariClient.java
index 267512b7..1b35ff6c 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/AbstractJdbcHikariClient.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/AbstractJdbcHikariClient.java
@@ -19,6 +19,7 @@ public abstract class AbstractJdbcHikariClient implements Client {
/**
* Init
+ *
* @param properties properties
*/
public void init(LogProperties.JdbcProperties properties) {
@@ -39,6 +40,7 @@ public void init(LogProperties.JdbcProperties properties) {
/**
* Init table.
+ *
* @throws SQLException SQLException
*/
public void initTable() throws SQLException {
@@ -63,7 +65,7 @@ public void initTable() throws SQLException {
* @return Connection
*/
public Connection getConnection() throws SQLException {
- return dataSource.getConnection();
+ return this.dataSource.getConnection();
}
@Override
@@ -73,6 +75,6 @@ public void connect() {
@Override
public void shutdown() {
-
+ this.dataSource.close();
}
}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Client.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Client.java
index 8c09821c..160522e2 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Client.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Client.java
@@ -1,12 +1,14 @@
package io.openjob.server.log.client;
+import org.springframework.beans.factory.InitializingBean;
+
import java.io.IOException;
/**
* @author stelin swoft@qq.com
* @since 1.0.0
*/
-public interface Client {
+public interface Client extends InitializingBean {
/**
* Connect
*
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Elasticsearch7Client.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Elasticsearch7Client.java
new file mode 100644
index 00000000..bf8fc7dc
--- /dev/null
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Elasticsearch7Client.java
@@ -0,0 +1,113 @@
+package io.openjob.server.log.client;
+
+import io.openjob.server.log.autoconfigure.LogProperties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * @author stelin swoft@qq.com
+ * @since 1.0.2
+ */
+public class Elasticsearch7Client implements Client {
+
+ private final LogProperties.Elasticsearch7Properties properties;
+ private RestHighLevelClient client;
+ private RequestOptions requestOptions;
+
+ public Elasticsearch7Client(LogProperties.Elasticsearch7Properties elasticsearch7Properties) {
+ this.properties = elasticsearch7Properties;
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ // Connect client
+ this.connect();
+
+ // Request options
+ this.clientRequestOptions();
+ }
+
+ @Override
+ public void connect() throws Exception {
+ // Cluster hosts
+ RestClientBuilder builder = RestClient.builder(this.clientClusterHosts());
+
+ // Cluster credentials
+ this.clientCredentials(builder);
+ client = new RestHighLevelClient(builder);
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ this.client.close();
+ }
+
+ public RestHighLevelClient getClient() {
+ return client;
+ }
+
+ public RequestOptions getRequestOptions() {
+ return requestOptions;
+ }
+
+ /**
+ * Client credentials
+ *
+ * @param builder builder
+ */
+ private void clientCredentials(RestClientBuilder builder) {
+ if (Objects.isNull(this.properties.getUsername()) || Objects.isNull(this.properties.getPassword())) {
+ return;
+ }
+
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(this.properties.getUsername(), this.properties.getPassword()));
+ builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider));
+ }
+
+ /**
+ * Client hosts
+ *
+ * @return HttpHost[]
+ */
+ private HttpHost[] clientClusterHosts() {
+ if (StringUtils.isBlank(this.properties.getClusterNodes())) {
+ throw new RuntimeException("Elasticsearch7 `clusterNodes` can not be empty!");
+ }
+
+ if (StringUtils.isBlank(this.properties.getProtocol())) {
+ throw new RuntimeException("Elasticsearch7 `protocol` can not be empty!");
+ }
+
+ return Arrays.stream(this.properties.getClusterNodes().split(",")).map(cn -> {
+ String[] clusterSplit = cn.split(":");
+ if (clusterSplit.length != 2) {
+ throw new RuntimeException(String.format("Elasticsearch7 `clusterNodes` is invalid(clusterNode=%s)!", cn));
+ }
+ return new HttpHost(clusterSplit[0], Integer.parseInt(clusterSplit[1]), this.properties.getProtocol());
+ }).toArray(HttpHost[]::new);
+ }
+
+ /**
+ * Client default request options
+ */
+ private void clientRequestOptions() {
+ RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
+ builder.setHttpAsyncResponseConsumerFactory(
+ new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(this.properties.getBufferLimit()));
+ this.requestOptions = builder.build();
+ }
+}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/H2Client.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/H2Client.java
index bb6dbbcd..19163ec8 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/H2Client.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/H2Client.java
@@ -22,7 +22,10 @@ public class H2Client extends AbstractJdbcHikariClient {
*/
public H2Client(LogProperties.H2Properties h2Properties) {
this.h2Properties = h2Properties;
+ }
+ @Override
+ public void afterPropertiesSet() {
// Driver
LogProperties.JdbcProperties properties = h2Properties.getProperties();
if (Objects.isNull(properties.getDriver())) {
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/MysqlClient.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/MysqlClient.java
index e68881d7..e516f5e7 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/MysqlClient.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/MysqlClient.java
@@ -19,7 +19,10 @@ public class MysqlClient extends AbstractJdbcHikariClient {
*/
public MysqlClient(LogProperties.MysqlProperties mysqlProperties) {
this.mysqlProperties = mysqlProperties;
+ }
+ @Override
+ public void afterPropertiesSet() {
// Driver
LogProperties.JdbcProperties properties = mysqlProperties.getProperties();
if (Objects.isNull(properties.getDriver())) {
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/constant/LogStorageConstant.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/constant/LogStorageConstant.java
index b4bc865f..e51b37f7 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/constant/LogStorageConstant.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/constant/LogStorageConstant.java
@@ -8,8 +8,5 @@ public class LogStorageConstant {
public static final String H2 = "h2";
public static final String MYSQL = "mysql";
public static final String TIDB = "tidb";
- public static final String ELASTICSEARCH = "elasticsearch";
public static final String ELASTICSEARCH7 = "elasticsearch7";
- public static final String SLS = "sls";
- public static final String CLS = "cls";
}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/LogDAO.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/LogDAO.java
index d5c1b277..3b3442bb 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/LogDAO.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/LogDAO.java
@@ -1,6 +1,6 @@
package io.openjob.server.log.dao;
-import io.openjob.server.log.dto.ProcessorLog;
+import io.openjob.server.log.dto.ProcessorLogDTO;
import java.sql.SQLException;
import java.util.List;
@@ -17,7 +17,7 @@ public interface LogDAO {
* @param jobInstanceTaskLogs jobInstanceTaskLogs
* @throws SQLException SQLException
*/
- void batchAdd(List jobInstanceTaskLogs) throws SQLException;
+ void batchAdd(List jobInstanceTaskLogs) throws Exception;
/**
* Query by page.
@@ -28,5 +28,24 @@ public interface LogDAO {
* @return List
* @throws SQLException SQLException
*/
- List queryByPage(String taskUniqueId, Long time, Long size) throws SQLException;
+ List queryByPage(String taskUniqueId, Long time, Long size) throws Exception;
+
+ /**
+ * Query by page size
+ *
+ * @param taskUniqueId taskUniqueId
+ * @param searchKey searchKey
+ * @param page page
+ * @param size size
+ * @return List
+ * @throws Exception Exception
+ */
+ List queryByPageSize(String taskUniqueId, String searchKey, Long page, Long size) throws Exception;
+
+ /**
+ * Delete log before days
+ *
+ * @param beforeDays beforeDays
+ */
+ void deleteByDays(Integer beforeDays);
}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/Elasticsearch7DAOImpl.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/Elasticsearch7DAOImpl.java
new file mode 100644
index 00000000..7cfb66da
--- /dev/null
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/Elasticsearch7DAOImpl.java
@@ -0,0 +1,138 @@
+package io.openjob.server.log.dao.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.openjob.server.log.autoconfigure.LogProperties;
+import io.openjob.server.log.client.Elasticsearch7Client;
+import io.openjob.server.log.dao.LogDAO;
+import io.openjob.server.log.dto.ProcessorLogDTO;
+import io.openjob.server.log.dto.ProcessorLogElasticDTO;
+import io.openjob.server.log.dto.ProcessorLogFieldDTO;
+import lombok.extern.slf4j.Slf4j;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Elasticsearch7 client document
+ * ...
+ *
+ * @author stelin swoft@qq.com
+ * @since 1.0.2
+ */
+@Slf4j
+public class Elasticsearch7DAOImpl implements LogDAO {
+ private final Elasticsearch7Client elasticsearch7Client;
+ private final LogProperties.Elasticsearch7Properties properties;
+
+ public Elasticsearch7DAOImpl(Elasticsearch7Client elasticsearch7Client, LogProperties.Elasticsearch7Properties properties) {
+ this.elasticsearch7Client = elasticsearch7Client;
+ this.properties = properties;
+ }
+
+ @Override
+ public void batchAdd(List jobInstanceTaskLogs) {
+ BulkRequest bulkRequest = new BulkRequest();
+ jobInstanceTaskLogs.forEach(p -> {
+ try {
+ ProcessorLogElasticDTO processorLogElasticDTO = new ProcessorLogElasticDTO();
+ processorLogElasticDTO.setTaskId(p.getTaskId());
+ processorLogElasticDTO.setWorkerAddress(p.getWorkerAddress());
+ processorLogElasticDTO.setTime(p.getTime());
+ processorLogElasticDTO.setFields(p.getFields().stream().collect(Collectors.toMap(ProcessorLogFieldDTO::getName, ProcessorLogFieldDTO::getValue)));
+
+ // Json
+ ObjectMapper objectMapper = new ObjectMapper();
+ String jsonLog = objectMapper.writeValueAsString(processorLogElasticDTO);
+
+ // Index request
+ IndexRequest indexRequest = new IndexRequest(this.getCreateIndex());
+ indexRequest.id(UUID.randomUUID().toString()).source(jsonLog, XContentType.JSON);
+ bulkRequest.add(indexRequest);
+ } catch (Exception exception) {
+ throw new RuntimeException("Elasticsearch7 format ", exception);
+ }
+ });
+
+ // Async bulk
+ bulkRequest.timeout(TimeValue.timeValueMillis(this.properties.getSocketTimeout()));
+ this.elasticsearch7Client.getClient().bulkAsync(bulkRequest, this.elasticsearch7Client.getRequestOptions(), new BulkListener());
+ }
+
+ @Override
+ public List queryByPage(String taskUniqueId, Long time, Long size) throws Exception {
+ SearchRequest searchRequest = new SearchRequest(this.getSearchIndex());
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+
+ // Bool query builder
+ BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
+
+ // `taskId=?`
+ // Append `.keyword` to exact Match
+ boolBuilder.must(QueryBuilders.termQuery("taskId" + ".keyword", taskUniqueId));
+
+ // `time >= ?`
+ RangeQueryBuilder timeQueryBuilder = QueryBuilders.rangeQuery("time");
+ timeQueryBuilder.gte(time);
+ boolBuilder.must(timeQueryBuilder);
+
+ searchSourceBuilder.query(boolBuilder);
+ searchSourceBuilder.size(size.intValue());
+ searchSourceBuilder.sort(new FieldSortBuilder("time").order(SortOrder.DESC));
+ searchRequest.source(searchSourceBuilder);
+ SearchResponse searchResponse = this.elasticsearch7Client.getClient().search(searchRequest, this.elasticsearch7Client.getRequestOptions());
+
+ SearchHit[] searchHit = searchResponse.getHits().getHits();
+ if (searchHit.length > 0) {
+ for (SearchHit hit : searchHit) {
+ System.out.println(hit.getSourceAsString());
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public List queryByPageSize(String taskUniqueId, String searchKey, Long page, Long size) throws Exception {
+ return null;
+ }
+
+ @Override
+ public void deleteByDays(Integer beforeDays) {
+
+ }
+
+ public static class BulkListener implements ActionListener {
+ @Override
+ public void onResponse(BulkResponse bulkItemResponses) {
+
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ log.error("Elasticsearch7 bulk failed!", e);
+ }
+ }
+
+ private String getCreateIndex() {
+ return String.format("openjob_%s", "20230522");
+ }
+
+ private String getSearchIndex() {
+ return "openjob*";
+ }
+}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/JdbcDAOImpl.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/JdbcDAOImpl.java
index 920c3198..55abce27 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/JdbcDAOImpl.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/JdbcDAOImpl.java
@@ -5,8 +5,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.openjob.server.log.client.AbstractJdbcHikariClient;
import io.openjob.server.log.dao.LogDAO;
-import io.openjob.server.log.dto.ProcessorLog;
-import io.openjob.server.log.dto.ProcessorLogField;
+import io.openjob.server.log.dto.ProcessorLogDTO;
+import io.openjob.server.log.dto.ProcessorLogFieldDTO;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -30,7 +30,7 @@ public JdbcDAOImpl(AbstractJdbcHikariClient jdbcHikariClient) {
}
@Override
- public void batchAdd(List processorLogList) throws SQLException {
+ public void batchAdd(List processorLogList) throws Exception {
String sql = "INSERT INTO `processor_log` ("
+ "`task_id`,"
+ "`worker_address`,"
@@ -42,7 +42,7 @@ public void batchAdd(List processorLogList) throws SQLException {
try (Connection connection = this.jdbcHikariClient.getConnection()) {
ps = connection.prepareStatement(sql);
connection.setAutoCommit(false);
- for (ProcessorLog processorLog : processorLogList) {
+ for (ProcessorLogDTO processorLog : processorLogList) {
ps.setString(1, processorLog.getTaskId());
ps.setString(2, processorLog.getWorkerAddress());
ps.setString(3, this.getContent(processorLog.getFields()));
@@ -61,7 +61,7 @@ public void batchAdd(List processorLogList) throws SQLException {
}
@Override
- public List queryByPage(String taskUniqueId, Long time, Long size) throws SQLException {
+ public List queryByPage(String taskUniqueId, Long time, Long size) throws Exception {
ResultSet rs = null;
String sql = "SELECT * FROM `processor_log` WHERE `task_id`=? AND `time` > ? limit ?";
try (Connection connection = this.jdbcHikariClient.getConnection(); PreparedStatement ps = connection.prepareStatement(sql)) {
@@ -70,7 +70,7 @@ public List queryByPage(String taskUniqueId, Long time, Long size)
ps.setLong(3, size);
rs = ps.executeQuery();
- List taskLogList = new ArrayList<>();
+ List taskLogList = new ArrayList<>();
while (rs.next()) {
taskLogList.add(convert(rs));
}
@@ -82,9 +82,19 @@ public List queryByPage(String taskUniqueId, Long time, Long size)
}
}
- private String getContent(List fields) {
+ @Override
+ public List queryByPageSize(String taskUniqueId, String searchKey, Long page, Long size) throws Exception {
+ return null;
+ }
+
+ @Override
+ public void deleteByDays(Integer beforeDays) {
+
+ }
+
+ private String getContent(List fields) {
Map fieldMap = fields.stream()
- .collect(Collectors.toMap(ProcessorLogField::getName, ProcessorLogField::getValue));
+ .collect(Collectors.toMap(ProcessorLogFieldDTO::getName, ProcessorLogFieldDTO::getValue));
// Format log fields
try {
@@ -95,8 +105,8 @@ private String getContent(List fields) {
}
}
- private ProcessorLog convert(ResultSet rs) throws SQLException {
- ProcessorLog taskLog = new ProcessorLog();
+ private ProcessorLogDTO convert(ResultSet rs) throws SQLException {
+ ProcessorLogDTO taskLog = new ProcessorLogDTO();
taskLog.setTaskId(rs.getString("task_id"));
taskLog.setWorkerAddress(rs.getString("worker_address"));
taskLog.setTime(rs.getLong("time"));
@@ -108,8 +118,8 @@ private ProcessorLog convert(ResultSet rs) throws SQLException {
Map fieldMap = mapper.readValue(content, new TypeReference