diff --git a/openjob-common/pom.xml b/openjob-common/pom.xml
index cc86fa87..60ff139d 100644
--- a/openjob-common/pom.xml
+++ b/openjob-common/pom.xml
@@ -5,7 +5,7 @@
openjob
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
openjob-common
diff --git a/openjob-server/openjob-server-admin/pom.xml b/openjob-server/openjob-server-admin/pom.xml
index d4ac0dbd..e7b2c4da 100644
--- a/openjob-server/openjob-server-admin/pom.xml
+++ b/openjob-server/openjob-server-admin/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
openjob-server-admin
diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/delay/ListDelayInstanceLogRequest.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/delay/ListDelayInstanceLogRequest.java
index 2ff9a04f..2aac5ec1 100644
--- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/delay/ListDelayInstanceLogRequest.java
+++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/delay/ListDelayInstanceLogRequest.java
@@ -29,5 +29,5 @@ public class ListDelayInstanceLogRequest {
private Long time = 0L;
@ApiModelProperty(value = "Page size", required = true)
- private Long size = 20L;
+ private Integer size = 20;
}
diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/ListProcessorLogRequest.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/ListProcessorLogRequest.java
index 96eb5781..27402d40 100644
--- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/ListProcessorLogRequest.java
+++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/ListProcessorLogRequest.java
@@ -37,5 +37,5 @@ public class ListProcessorLogRequest {
private Long time = 0L;
@ApiModelProperty(value = "Page size", required = true)
- private Long size = 20L;
+ private Integer size = 20;
}
diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayInstanceServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayInstanceServiceImpl.java
index 41a665e9..fec5e10e 100644
--- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayInstanceServiceImpl.java
+++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayInstanceServiceImpl.java
@@ -18,7 +18,7 @@
import io.openjob.server.common.util.PageUtil;
import io.openjob.server.common.vo.PageVO;
import io.openjob.server.log.dao.LogDAO;
-import io.openjob.server.log.dto.ProcessorLog;
+import io.openjob.server.log.dto.ProcessorLogDTO;
import io.openjob.server.repository.dao.DelayInstanceDAO;
import io.openjob.server.repository.dto.DelayInstancePageDTO;
import io.openjob.server.repository.entity.DelayInstance;
@@ -30,7 +30,6 @@
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
-import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -91,7 +90,7 @@ public ListDelayInstanceLogVO listProcessorLog(ListDelayInstanceLogRequest reque
AtomicLong nextTime = new AtomicLong(0L);
Integer isComplete = CommonConstant.NO;
try {
- List processorLogs = this.logDAO.queryByPage(request.getTaskId(), request.getTime(), request.getSize());
+ List processorLogs = this.logDAO.queryByScroll(request.getTaskId(), request.getTime(), request.getSize());
if (!CollectionUtils.isEmpty(processorLogs)) {
// Processor list and nextTime.
@@ -108,7 +107,7 @@ public ListDelayInstanceLogVO listProcessorLog(ListDelayInstanceLogRequest reque
}
}
}
- } catch (SQLException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobInstanceServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobInstanceServiceImpl.java
index c81b00da..f473d0bb 100644
--- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobInstanceServiceImpl.java
+++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobInstanceServiceImpl.java
@@ -21,7 +21,7 @@
import io.openjob.server.common.util.PageUtil;
import io.openjob.server.common.vo.PageVO;
import io.openjob.server.log.dao.LogDAO;
-import io.openjob.server.log.dto.ProcessorLog;
+import io.openjob.server.log.dto.ProcessorLogDTO;
import io.openjob.server.repository.dao.JobInstanceDAO;
import io.openjob.server.repository.dao.JobInstanceLogDAO;
import io.openjob.server.repository.dto.JobInstancePageDTO;
@@ -34,7 +34,6 @@
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
-import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -101,7 +100,7 @@ public ListProcessorLogVO getProcessorList(ListProcessorLogRequest request) {
Integer isComplete = CommonConstant.NO;
try {
String taskId = TaskUtil.getRandomUniqueId(request.getJobId(), request.getJobInstanceId(), 0L, 0L);
- List processorLogs = this.logDAO.queryByPage(taskId, request.getTime(), request.getSize());
+ List processorLogs = this.logDAO.queryByScroll(taskId, request.getTime(), request.getSize());
if (!CollectionUtils.isEmpty(processorLogs)) {
// Processor list and nextTime.
@@ -118,7 +117,7 @@ public ListProcessorLogVO getProcessorList(ListProcessorLogRequest request) {
}
}
}
- } catch (SQLException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/util/LogFormatUtil.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/util/LogFormatUtil.java
index 6d2108cf..f845f9fe 100644
--- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/util/LogFormatUtil.java
+++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/util/LogFormatUtil.java
@@ -2,13 +2,11 @@
import io.openjob.common.constant.LogFieldConstant;
import io.openjob.common.util.DateUtil;
-import io.openjob.server.log.dto.ProcessorLog;
-import io.openjob.server.log.dto.ProcessorLogField;
+import io.openjob.server.log.dto.ProcessorLogDTO;
+import io.openjob.server.log.dto.ProcessorLogFieldDTO;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
import java.util.stream.Collectors;
/**
@@ -29,9 +27,9 @@ public class LogFormatUtil {
* @param processorLog processorLog
* @return String
*/
- public static String formatLog(ProcessorLog processorLog) {
+ public static String formatLog(ProcessorLogDTO processorLog) {
Map fieldMap = processorLog.getFields().stream()
- .collect(Collectors.toMap(ProcessorLogField::getName, ProcessorLogField::getValue));
+ .collect(Collectors.toMap(ProcessorLogFieldDTO::getName, ProcessorLogFieldDTO::getValue));
String location = fieldMap.get(LogFieldConstant.LOCATION);
String message = String.format(
LOG_FORMAT,
diff --git a/openjob-server/openjob-server-cluster/pom.xml b/openjob-server/openjob-server-cluster/pom.xml
index 399f1e4a..54084e31 100644
--- a/openjob-server/openjob-server-cluster/pom.xml
+++ b/openjob-server/openjob-server-cluster/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceTaskLogService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceTaskLogService.java
index d40178f9..2c9c5963 100644
--- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceTaskLogService.java
+++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceTaskLogService.java
@@ -5,13 +5,12 @@
import io.openjob.common.request.WorkerJobInstanceTaskLogFieldRequest;
import io.openjob.common.request.WorkerJobInstanceTaskLogRequest;
import io.openjob.server.log.dao.LogDAO;
-import io.openjob.server.log.dto.ProcessorLog;
+import io.openjob.server.log.dto.ProcessorLogDTO;
import io.openjob.server.log.mapper.LogMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -37,7 +36,7 @@ public JobInstanceTaskLogService(LogDAO logDAO) {
* @param logReq log request.
*/
public void handleInstanceTaskLog(WorkerJobInstanceTaskLogRequest logReq) {
- List processorLogList = logReq.getFieldList().stream().map(fields -> {
+ List processorLogList = logReq.getFieldList().stream().map(fields -> {
// Field map.
Map> fieldMap = fields.stream()
.collect(Collectors.groupingBy(WorkerJobInstanceTaskLogFieldRequest::getName));
@@ -54,7 +53,7 @@ public void handleInstanceTaskLog(WorkerJobInstanceTaskLogRequest logReq) {
String timeStamp = Optional.ofNullable(fieldMap.get(LogFieldConstant.TIME_STAMP))
.orElseGet(() -> Lists.newArrayList(new WorkerJobInstanceTaskLogFieldRequest())).get(0).getValue();
- ProcessorLog processorLog = new ProcessorLog();
+ ProcessorLogDTO processorLog = new ProcessorLogDTO();
processorLog.setTaskId(taskId);
processorLog.setWorkerAddress(workerAddress);
processorLog.setTime(Long.valueOf(timeStamp));
@@ -64,7 +63,7 @@ public void handleInstanceTaskLog(WorkerJobInstanceTaskLogRequest logReq) {
try {
logDAO.batchAdd(processorLogList);
- } catch (SQLException e) {
+ } catch (Exception e) {
log.error("Batch add task log failed!", e);
throw new RuntimeException(e);
}
diff --git a/openjob-server/openjob-server-common/pom.xml b/openjob-server/openjob-server-common/pom.xml
index cce0e1da..18399465 100644
--- a/openjob-server/openjob-server-common/pom.xml
+++ b/openjob-server/openjob-server-common/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
openjob-server-common
diff --git a/openjob-server/openjob-server-common/src/main/java/io/openjob/server/common/util/JsonUtil.java b/openjob-server/openjob-server-common/src/main/java/io/openjob/server/common/util/JsonUtil.java
new file mode 100644
index 00000000..7aa4cf44
--- /dev/null
+++ b/openjob-server/openjob-server-common/src/main/java/io/openjob/server/common/util/JsonUtil.java
@@ -0,0 +1,59 @@
+package io.openjob.server.common.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * @author stelin swoft@qq.com
+ * @since 1.0.2
+ */
+public class JsonUtil {
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ /**
+ * Json encode
+ *
+ * @param object object
+ * @return String
+ */
+ public static String encode(Object object) {
+ try {
+ return OBJECT_MAPPER.writeValueAsString(object);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Json encode failed!", e);
+ }
+ }
+
+ /**
+ * Json decode
+ *
+ * @param json json
+ * @param typeReference typeReference
+ * @param T
+ * @return T
+ */
+ public static T decode(String json, TypeReference typeReference) {
+ try {
+ return OBJECT_MAPPER.readValue(json, typeReference);
+ } catch (Exception e) {
+ throw new RuntimeException("Json decode failed!", e);
+ }
+ }
+
+ /**
+ * Json decode
+ *
+ * @param json json
+ * @param targetClass targetClass
+ * @param T
+ * @return T
+ */
+ public static T decode(String json, Class targetClass) {
+ try {
+ return OBJECT_MAPPER.readValue(json, targetClass);
+ } catch (Exception e) {
+ throw new RuntimeException("Json decode failed!", e);
+ }
+ }
+}
diff --git a/openjob-server/openjob-server-dispatcher/pom.xml b/openjob-server/openjob-server-dispatcher/pom.xml
index 41d7cde7..f5c72de9 100644
--- a/openjob-server/openjob-server-dispatcher/pom.xml
+++ b/openjob-server/openjob-server-dispatcher/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
diff --git a/openjob-server/openjob-server-log/pom.xml b/openjob-server/openjob-server-log/pom.xml
index db107b2d..68b903dc 100644
--- a/openjob-server/openjob-server-log/pom.xml
+++ b/openjob-server/openjob-server-log/pom.xml
@@ -5,7 +5,7 @@
openjob-server
io.openjob
- 1.0.1
+ 1.0.2
4.0.0
openjob-server-log
@@ -32,6 +32,23 @@
openjob-server-common
+
+ org.elasticsearch.client
+ elasticsearch-rest-client
+ 7.0.0
+
+
+
+ org.elasticsearch
+ elasticsearch
+ 7.0.0
+
+
+ org.elasticsearch.client
+ elasticsearch-rest-high-level-client
+ 7.0.0
+
+
com.h2database
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogAutoConfiguration.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogAutoConfiguration.java
index c32a7f84..2df7d255 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogAutoConfiguration.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogAutoConfiguration.java
@@ -1,9 +1,11 @@
package io.openjob.server.log.autoconfigure;
+import io.openjob.server.log.client.Elasticsearch7Client;
import io.openjob.server.log.client.H2Client;
import io.openjob.server.log.client.MysqlClient;
import io.openjob.server.log.constant.LogStorageConstant;
import io.openjob.server.log.dao.LogDAO;
+import io.openjob.server.log.dao.impl.Elasticsearch7DAOImpl;
import io.openjob.server.log.dao.impl.H2LogDAOImpl;
import io.openjob.server.log.dao.impl.MysqlLogDAOImpl;
import org.springframework.beans.factory.annotation.Autowired;
@@ -51,4 +53,17 @@ public LogDAO h2LogDAO(MysqlClient mysqlClient) {
return new MysqlLogDAOImpl(mysqlClient);
}
}
+
+ @ConditionalOnProperty(prefix = "openjob.log.storage", name = "selector", havingValue = LogStorageConstant.ELASTICSEARCH7)
+ public class Elasticsearch7autoconfiguration {
+ @Bean
+ public Elasticsearch7Client mysqlClient() {
+ return new Elasticsearch7Client(logProperties.getStorage().getElasticsearch7());
+ }
+
+ @Bean
+ public LogDAO h2LogDAO(Elasticsearch7Client elasticsearch7Client, LogProperties.Elasticsearch7Properties properties) {
+ return new Elasticsearch7DAOImpl(elasticsearch7Client, properties);
+ }
+ }
}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogProperties.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogProperties.java
index 522dee23..74f739d9 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogProperties.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/autoconfigure/LogProperties.java
@@ -21,10 +21,7 @@ public static class Storage {
private H2Properties h2 = new H2Properties();
private MysqlProperties mysql = new MysqlProperties();
private TidbProperties tidb = new TidbProperties();
- private ElasticsearchProperties elasticsearch = new ElasticsearchProperties();
private Elasticsearch7Properties elasticsearch7 = new Elasticsearch7Properties();
- private SlsProperties sls = new SlsProperties();
- private ClsProperties cls = new ClsProperties();
}
@Data
@@ -42,30 +39,16 @@ public static class TidbProperties {
private JdbcProperties properties = new JdbcProperties();
}
- @Data
- public static class ElasticsearchProperties {
-
- }
-
@Data
public static class Elasticsearch7Properties {
-
- }
-
- /**
- * Aliyun sls
- */
- @Data
- public static class SlsProperties {
-
- }
-
- /**
- * Qcloud cls
- */
- @Data
- public static class ClsProperties {
-
+ private String clusterNodes;
+ private String username;
+ private String password;
+ private String protocol = "http";
+ private Integer connectTimeout = 3000;
+ private Integer socketTimeout = 3000;
+ private Integer responseTimeout = 3000;
+ private Integer bufferLimit = 104857600;
}
@Data
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/AbstractJdbcHikariClient.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/AbstractJdbcHikariClient.java
index 267512b7..1b35ff6c 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/AbstractJdbcHikariClient.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/AbstractJdbcHikariClient.java
@@ -19,6 +19,7 @@ public abstract class AbstractJdbcHikariClient implements Client {
/**
* Init
+ *
* @param properties properties
*/
public void init(LogProperties.JdbcProperties properties) {
@@ -39,6 +40,7 @@ public void init(LogProperties.JdbcProperties properties) {
/**
* Init table.
+ *
* @throws SQLException SQLException
*/
public void initTable() throws SQLException {
@@ -63,7 +65,7 @@ public void initTable() throws SQLException {
* @return Connection
*/
public Connection getConnection() throws SQLException {
- return dataSource.getConnection();
+ return this.dataSource.getConnection();
}
@Override
@@ -73,6 +75,6 @@ public void connect() {
@Override
public void shutdown() {
-
+ this.dataSource.close();
}
}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Client.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Client.java
index 8c09821c..160522e2 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Client.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Client.java
@@ -1,12 +1,14 @@
package io.openjob.server.log.client;
+import org.springframework.beans.factory.InitializingBean;
+
import java.io.IOException;
/**
* @author stelin swoft@qq.com
* @since 1.0.0
*/
-public interface Client {
+public interface Client extends InitializingBean {
/**
* Connect
*
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Elasticsearch7Client.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Elasticsearch7Client.java
new file mode 100644
index 00000000..d573dc4b
--- /dev/null
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/Elasticsearch7Client.java
@@ -0,0 +1,117 @@
+package io.openjob.server.log.client;
+
+import io.openjob.server.log.autoconfigure.LogProperties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * @author stelin swoft@qq.com
+ * @since 1.0.2
+ */
+public class Elasticsearch7Client implements Client {
+ /**
+ * Host split count
+ */
+ private static final Integer HOST_SPLIT_COUNT = 2;
+
+ private final LogProperties.Elasticsearch7Properties properties;
+ private RestHighLevelClient client;
+ private RequestOptions requestOptions;
+
+ public Elasticsearch7Client(LogProperties.Elasticsearch7Properties elasticsearch7Properties) {
+ this.properties = elasticsearch7Properties;
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ // Connect client
+ this.connect();
+
+ // Request options
+ this.clientRequestOptions();
+ }
+
+ @Override
+ public void connect() throws Exception {
+ // Cluster hosts
+ RestClientBuilder builder = RestClient.builder(this.clientClusterHosts());
+
+ // Cluster credentials
+ this.clientCredentials(builder);
+ client = new RestHighLevelClient(builder);
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ this.client.close();
+ }
+
+ public RestHighLevelClient getClient() {
+ return client;
+ }
+
+ public RequestOptions getRequestOptions() {
+ return requestOptions;
+ }
+
+ /**
+ * Client credentials
+ *
+ * @param builder builder
+ */
+ private void clientCredentials(RestClientBuilder builder) {
+ if (Objects.isNull(this.properties.getUsername()) || Objects.isNull(this.properties.getPassword())) {
+ return;
+ }
+
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(this.properties.getUsername(), this.properties.getPassword()));
+ builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider));
+ }
+
+ /**
+ * Client hosts
+ *
+ * @return HttpHost[]
+ */
+ private HttpHost[] clientClusterHosts() {
+ if (StringUtils.isBlank(this.properties.getClusterNodes())) {
+ throw new RuntimeException("Elasticsearch7 `clusterNodes` can not be empty!");
+ }
+
+ if (StringUtils.isBlank(this.properties.getProtocol())) {
+ throw new RuntimeException("Elasticsearch7 `protocol` can not be empty!");
+ }
+
+ return Arrays.stream(this.properties.getClusterNodes().split(",")).map(cn -> {
+ String[] clusterSplit = cn.split(":");
+ if (clusterSplit.length != HOST_SPLIT_COUNT) {
+ throw new RuntimeException(String.format("Elasticsearch7 `clusterNodes` is invalid(clusterNode=%s)!", cn));
+ }
+ return new HttpHost(clusterSplit[0], Integer.parseInt(clusterSplit[1]), this.properties.getProtocol());
+ }).toArray(HttpHost[]::new);
+ }
+
+ /**
+ * Client default request options
+ */
+ private void clientRequestOptions() {
+ RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
+ builder.setHttpAsyncResponseConsumerFactory(
+ new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(this.properties.getBufferLimit()));
+ this.requestOptions = builder.build();
+ }
+}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/H2Client.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/H2Client.java
index bb6dbbcd..19163ec8 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/H2Client.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/H2Client.java
@@ -22,7 +22,10 @@ public class H2Client extends AbstractJdbcHikariClient {
*/
public H2Client(LogProperties.H2Properties h2Properties) {
this.h2Properties = h2Properties;
+ }
+ @Override
+ public void afterPropertiesSet() {
// Driver
LogProperties.JdbcProperties properties = h2Properties.getProperties();
if (Objects.isNull(properties.getDriver())) {
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/MysqlClient.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/MysqlClient.java
index e68881d7..e516f5e7 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/MysqlClient.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/client/MysqlClient.java
@@ -19,7 +19,10 @@ public class MysqlClient extends AbstractJdbcHikariClient {
*/
public MysqlClient(LogProperties.MysqlProperties mysqlProperties) {
this.mysqlProperties = mysqlProperties;
+ }
+ @Override
+ public void afterPropertiesSet() {
// Driver
LogProperties.JdbcProperties properties = mysqlProperties.getProperties();
if (Objects.isNull(properties.getDriver())) {
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/constant/LogStorageConstant.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/constant/LogStorageConstant.java
index b4bc865f..e51b37f7 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/constant/LogStorageConstant.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/constant/LogStorageConstant.java
@@ -8,8 +8,5 @@ public class LogStorageConstant {
public static final String H2 = "h2";
public static final String MYSQL = "mysql";
public static final String TIDB = "tidb";
- public static final String ELASTICSEARCH = "elasticsearch";
public static final String ELASTICSEARCH7 = "elasticsearch7";
- public static final String SLS = "sls";
- public static final String CLS = "cls";
}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/LogDAO.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/LogDAO.java
index d5c1b277..531c472a 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/LogDAO.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/LogDAO.java
@@ -1,6 +1,7 @@
package io.openjob.server.log.dao;
-import io.openjob.server.log.dto.ProcessorLog;
+import io.openjob.server.common.dto.PageDTO;
+import io.openjob.server.log.dto.ProcessorLogDTO;
import java.sql.SQLException;
import java.util.List;
@@ -15,9 +16,9 @@ public interface LogDAO {
* Batch add.
*
* @param jobInstanceTaskLogs jobInstanceTaskLogs
- * @throws SQLException SQLException
+ * @throws Exception Exception
*/
- void batchAdd(List jobInstanceTaskLogs) throws SQLException;
+ void batchAdd(List jobInstanceTaskLogs) throws Exception;
/**
* Query by page.
@@ -26,7 +27,27 @@ public interface LogDAO {
* @param time time
* @param size size
* @return List
- * @throws SQLException SQLException
+ * @throws Exception Exception
*/
- List queryByPage(String taskUniqueId, Long time, Long size) throws SQLException;
+ List queryByScroll(String taskUniqueId, Long time, Integer size) throws Exception;
+
+ /**
+ * Query by page size
+ *
+ * @param taskUniqueId taskUniqueId
+ * @param searchKey searchKey
+ * @param page page
+ * @param size size
+ * @return PageDTO
+ * @throws Exception Exception
+ */
+ PageDTO queryByPageSize(String taskUniqueId, String searchKey, Integer page, Integer size) throws Exception;
+
+ /**
+ * Delete log before days
+ *
+ * @param beforeDays beforeDays
+ * @throws Exception Exception
+ */
+ void deleteByDays(Integer beforeDays) throws Exception;
}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/Elasticsearch7DAOImpl.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/Elasticsearch7DAOImpl.java
new file mode 100644
index 00000000..0b9c6010
--- /dev/null
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/Elasticsearch7DAOImpl.java
@@ -0,0 +1,214 @@
+package io.openjob.server.log.dao.impl;
+
+import io.openjob.common.constant.LogFieldConstant;
+import io.openjob.server.common.dto.PageDTO;
+import io.openjob.server.common.util.JsonUtil;
+import io.openjob.server.log.autoconfigure.LogProperties;
+import io.openjob.server.log.client.Elasticsearch7Client;
+import io.openjob.server.log.dao.LogDAO;
+import io.openjob.server.log.dto.ProcessorLogDTO;
+import io.openjob.server.log.dto.ProcessorLogElasticDTO;
+import io.openjob.server.log.dto.ProcessorLogFieldDTO;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.MultiMatchQueryBuilder;
+import org.elasticsearch.index.query.Operator;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Elasticsearch7 client document
+ * ...
+ *
+ * @author stelin swoft@qq.com
+ * @since 1.0.2
+ */
+@Slf4j
+public class Elasticsearch7DAOImpl implements LogDAO {
+ private final Elasticsearch7Client elasticsearch7Client;
+ private final LogProperties.Elasticsearch7Properties properties;
+
+ public Elasticsearch7DAOImpl(Elasticsearch7Client elasticsearch7Client, LogProperties.Elasticsearch7Properties properties) {
+ this.elasticsearch7Client = elasticsearch7Client;
+ this.properties = properties;
+ }
+
+ @Override
+ public void batchAdd(List jobInstanceTaskLogs) {
+ BulkRequest bulkRequest = new BulkRequest();
+ jobInstanceTaskLogs.forEach(p -> {
+ try {
+ ProcessorLogElasticDTO processorLogElasticDTO = new ProcessorLogElasticDTO();
+ processorLogElasticDTO.setTaskId(p.getTaskId());
+ processorLogElasticDTO.setWorkerAddress(p.getWorkerAddress());
+ processorLogElasticDTO.setTime(p.getTime());
+
+ // Field map
+ Map fieldMap = new HashMap<>(32);
+ p.getFields().forEach(plf -> {
+ if (LogFieldConstant.MESSAGE.equals(plf.getName())) {
+ processorLogElasticDTO.setMessage(plf.getValue());
+ return;
+ }
+
+ if (LogFieldConstant.THROWABLE.equals(plf.getName())) {
+ processorLogElasticDTO.setThrowable(plf.getValue());
+ return;
+ }
+
+ fieldMap.put(plf.getName(), plf.getValue());
+ });
+ processorLogElasticDTO.setFields(fieldMap);
+
+ // Json
+ String jsonLog = JsonUtil.encode(processorLogElasticDTO);
+
+ // Index request
+ IndexRequest indexRequest = new IndexRequest(this.getCreateIndex());
+ indexRequest.id(UUID.randomUUID().toString()).source(jsonLog, XContentType.JSON);
+ bulkRequest.add(indexRequest);
+ } catch (Exception exception) {
+ throw new RuntimeException("Elasticsearch7 format ", exception);
+ }
+ });
+
+ // Async bulk
+ bulkRequest.timeout(TimeValue.timeValueMillis(this.properties.getSocketTimeout()));
+ this.elasticsearch7Client.getClient().bulkAsync(bulkRequest, this.elasticsearch7Client.getRequestOptions(), new BulkListener());
+ }
+
+ @Override
+ public List queryByScroll(String taskUniqueId, Long time, Integer size) throws Exception {
+ SearchRequest searchRequest = new SearchRequest(this.getSearchIndex());
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+
+ // Bool query builder
+ BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
+
+ // `taskId=?`
+ // Append `.keyword` to exact Match
+ boolBuilder.must(QueryBuilders.termQuery("taskId" + ".keyword", taskUniqueId));
+
+ // `time >= ?`
+ RangeQueryBuilder timeQueryBuilder = QueryBuilders.rangeQuery("time");
+ timeQueryBuilder.gte(time);
+ boolBuilder.must(timeQueryBuilder);
+
+ searchSourceBuilder.query(boolBuilder);
+ searchSourceBuilder.size(size);
+ searchSourceBuilder.sort(new FieldSortBuilder("time").order(SortOrder.DESC));
+ searchRequest.source(searchSourceBuilder);
+ return this.queryResult(searchRequest, 0, size).getList();
+ }
+
+ @Override
+ public PageDTO queryByPageSize(String taskUniqueId, String searchKey, Integer page, Integer size) throws IOException {
+
+ SearchRequest searchRequest = new SearchRequest(this.getSearchIndex());
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+
+ // Bool query builder
+ BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
+
+ // `taskId=?` and ( message=? or throwable=?)
+ // Append `.keyword` to exact Match
+ boolBuilder.must(QueryBuilders.termQuery("taskId" + ".keyword", taskUniqueId));
+ if (StringUtils.isNotBlank(searchKey)) {
+ MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(searchKey, LogFieldConstant.MESSAGE, LogFieldConstant.THROWABLE);
+ multiMatchQueryBuilder.operator(Operator.OR);
+ boolBuilder.must(multiMatchQueryBuilder);
+ }
+
+ // From
+ searchSourceBuilder.query(boolBuilder);
+ searchSourceBuilder.from((page - 1) * size);
+ searchSourceBuilder.size(size);
+ searchSourceBuilder.sort(new FieldSortBuilder("time").order(SortOrder.DESC));
+ searchRequest.source(searchSourceBuilder);
+ return this.queryResult(searchRequest, page, size);
+ }
+
+ private PageDTO queryResult(SearchRequest searchRequest, Integer page, Integer size) throws IOException {
+ SearchResponse searchResponse = this.elasticsearch7Client.getClient().search(searchRequest, this.elasticsearch7Client.getRequestOptions());
+ SearchHit[] searchHit = searchResponse.getHits().getHits();
+
+ // Processor log list
+ List processorLogList = Arrays.stream(searchHit).map(h -> {
+ String sourceJson = h.getSourceAsString();
+ ProcessorLogElasticDTO processorLogElasticDTO = JsonUtil.decode(sourceJson, ProcessorLogElasticDTO.class);
+
+ // Append search field
+ Map fieldsMap = processorLogElasticDTO.getFields();
+ fieldsMap.put(LogFieldConstant.MESSAGE, Optional.ofNullable(processorLogElasticDTO.getMessage()).orElse(""));
+ fieldsMap.put(LogFieldConstant.THROWABLE, Optional.ofNullable(processorLogElasticDTO.getThrowable()).orElse(""));
+
+ // Processor log
+ ProcessorLogDTO processorLogDTO = new ProcessorLogDTO();
+ processorLogDTO.setTime(processorLogElasticDTO.getTime());
+ processorLogDTO.setWorkerAddress(processorLogElasticDTO.getWorkerAddress());
+ processorLogDTO.setTaskId(processorLogElasticDTO.getTaskId());
+
+ // Processor log field
+ List fieldList = new ArrayList<>();
+ fieldsMap.forEach((n, v) -> fieldList.add(new ProcessorLogFieldDTO(n, v)));
+ processorLogDTO.setFields(fieldList);
+ return processorLogDTO;
+ }).collect(Collectors.toList());
+
+ // Page
+ PageDTO pageDTO = new PageDTO<>();
+ pageDTO.setPage(page);
+ pageDTO.setSize(size);
+ pageDTO.setList(processorLogList);
+ pageDTO.setTotal(searchResponse.getHits().getTotalHits().value);
+ return pageDTO;
+ }
+
+ @Override
+ public void deleteByDays(Integer beforeDays) {
+
+ }
+
+ public static class BulkListener implements ActionListener {
+ @Override
+ public void onResponse(BulkResponse bulkItemResponses) {
+
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ log.error("Elasticsearch7 bulk failed!", e);
+ }
+ }
+
+ private String getCreateIndex() {
+ return String.format("openjob_%s", "20230522");
+ }
+
+ private String getSearchIndex() {
+ return "openjob*";
+ }
+}
diff --git a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/JdbcDAOImpl.java b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/JdbcDAOImpl.java
index 920c3198..a8418fc5 100644
--- a/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/JdbcDAOImpl.java
+++ b/openjob-server/openjob-server-log/src/main/java/io/openjob/server/log/dao/impl/JdbcDAOImpl.java
@@ -3,10 +3,11 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import io.openjob.server.common.dto.PageDTO;
import io.openjob.server.log.client.AbstractJdbcHikariClient;
import io.openjob.server.log.dao.LogDAO;
-import io.openjob.server.log.dto.ProcessorLog;
-import io.openjob.server.log.dto.ProcessorLogField;
+import io.openjob.server.log.dto.ProcessorLogDTO;
+import io.openjob.server.log.dto.ProcessorLogFieldDTO;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -30,7 +31,7 @@ public JdbcDAOImpl(AbstractJdbcHikariClient jdbcHikariClient) {
}
@Override
- public void batchAdd(List processorLogList) throws SQLException {
+ public void batchAdd(List processorLogList) throws Exception {
String sql = "INSERT INTO `processor_log` ("
+ "`task_id`,"
+ "`worker_address`,"
@@ -42,7 +43,7 @@ public void batchAdd(List processorLogList) throws SQLException {
try (Connection connection = this.jdbcHikariClient.getConnection()) {
ps = connection.prepareStatement(sql);
connection.setAutoCommit(false);
- for (ProcessorLog processorLog : processorLogList) {
+ for (ProcessorLogDTO processorLog : processorLogList) {
ps.setString(1, processorLog.getTaskId());
ps.setString(2, processorLog.getWorkerAddress());
ps.setString(3, this.getContent(processorLog.getFields()));
@@ -61,7 +62,7 @@ public void batchAdd(List processorLogList) throws SQLException {
}
@Override
- public List queryByPage(String taskUniqueId, Long time, Long size) throws SQLException {
+ public List queryByScroll(String taskUniqueId, Long time, Integer size) throws Exception {
ResultSet rs = null;
String sql = "SELECT * FROM `processor_log` WHERE `task_id`=? AND `time` > ? limit ?";
try (Connection connection = this.jdbcHikariClient.getConnection(); PreparedStatement ps = connection.prepareStatement(sql)) {
@@ -70,7 +71,7 @@ public List queryByPage(String taskUniqueId, Long time, Long size)
ps.setLong(3, size);
rs = ps.executeQuery();
- List taskLogList = new ArrayList<>();
+ List taskLogList = new ArrayList<>();
while (rs.next()) {
taskLogList.add(convert(rs));
}
@@ -82,9 +83,19 @@ public List queryByPage(String taskUniqueId, Long time, Long size)
}
}
- private String getContent(List fields) {
+ @Override
+ public PageDTO queryByPageSize(String taskUniqueId, String searchKey, Integer page, Integer size) {
+ return null;
+ }
+
+ @Override
+ public void deleteByDays(Integer beforeDays) {
+
+ }
+
+ private String getContent(List fields) {
Map fieldMap = fields.stream()
- .collect(Collectors.toMap(ProcessorLogField::getName, ProcessorLogField::getValue));
+ .collect(Collectors.toMap(ProcessorLogFieldDTO::getName, ProcessorLogFieldDTO::getValue));
// Format log fields
try {
@@ -95,8 +106,8 @@ private String getContent(List fields) {
}
}
- private ProcessorLog convert(ResultSet rs) throws SQLException {
- ProcessorLog taskLog = new ProcessorLog();
+ private ProcessorLogDTO convert(ResultSet rs) throws SQLException {
+ ProcessorLogDTO taskLog = new ProcessorLogDTO();
taskLog.setTaskId(rs.getString("task_id"));
taskLog.setWorkerAddress(rs.getString("worker_address"));
taskLog.setTime(rs.getLong("time"));
@@ -108,8 +119,8 @@ private ProcessorLog convert(ResultSet rs) throws SQLException {
Map fieldMap = mapper.readValue(content, new TypeReference