Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add elasticsearch7 #95

Merged
merged 7 commits into from May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion openjob-common/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>openjob</artifactId>
<groupId>io.openjob</groupId>
<version>1.0.1</version>
<version>1.0.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>openjob-common</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion openjob-server/openjob-server-admin/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>openjob-server</artifactId>
<groupId>io.openjob</groupId>
<version>1.0.1</version>
<version>1.0.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>openjob-server-admin</artifactId>
Expand Down
Expand Up @@ -29,5 +29,5 @@ public class ListDelayInstanceLogRequest {
private Long time = 0L;

@ApiModelProperty(value = "Page size", required = true)
private Long size = 20L;
private Integer size = 20;
}
Expand Up @@ -37,5 +37,5 @@ public class ListProcessorLogRequest {
private Long time = 0L;

@ApiModelProperty(value = "Page size", required = true)
private Long size = 20L;
private Integer size = 20;
}
Expand Up @@ -18,7 +18,7 @@
import io.openjob.server.common.util.PageUtil;
import io.openjob.server.common.vo.PageVO;
import io.openjob.server.log.dao.LogDAO;
import io.openjob.server.log.dto.ProcessorLog;
import io.openjob.server.log.dto.ProcessorLogDTO;
import io.openjob.server.repository.dao.DelayInstanceDAO;
import io.openjob.server.repository.dto.DelayInstancePageDTO;
import io.openjob.server.repository.entity.DelayInstance;
Expand All @@ -30,7 +30,6 @@
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -91,7 +90,7 @@ public ListDelayInstanceLogVO listProcessorLog(ListDelayInstanceLogRequest reque
AtomicLong nextTime = new AtomicLong(0L);
Integer isComplete = CommonConstant.NO;
try {
List<ProcessorLog> processorLogs = this.logDAO.queryByPage(request.getTaskId(), request.getTime(), request.getSize());
List<ProcessorLogDTO> processorLogs = this.logDAO.queryByScroll(request.getTaskId(), request.getTime(), request.getSize());

if (!CollectionUtils.isEmpty(processorLogs)) {
// Processor list and nextTime.
Expand All @@ -108,7 +107,7 @@ public ListDelayInstanceLogVO listProcessorLog(ListDelayInstanceLogRequest reque
}
}
}
} catch (SQLException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}

Expand Down
Expand Up @@ -21,7 +21,7 @@
import io.openjob.server.common.util.PageUtil;
import io.openjob.server.common.vo.PageVO;
import io.openjob.server.log.dao.LogDAO;
import io.openjob.server.log.dto.ProcessorLog;
import io.openjob.server.log.dto.ProcessorLogDTO;
import io.openjob.server.repository.dao.JobInstanceDAO;
import io.openjob.server.repository.dao.JobInstanceLogDAO;
import io.openjob.server.repository.dto.JobInstancePageDTO;
Expand All @@ -34,7 +34,6 @@
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -101,7 +100,7 @@ public ListProcessorLogVO getProcessorList(ListProcessorLogRequest request) {
Integer isComplete = CommonConstant.NO;
try {
String taskId = TaskUtil.getRandomUniqueId(request.getJobId(), request.getJobInstanceId(), 0L, 0L);
List<ProcessorLog> processorLogs = this.logDAO.queryByPage(taskId, request.getTime(), request.getSize());
List<ProcessorLogDTO> processorLogs = this.logDAO.queryByScroll(taskId, request.getTime(), request.getSize());

if (!CollectionUtils.isEmpty(processorLogs)) {
// Processor list and nextTime.
Expand All @@ -118,7 +117,7 @@ public ListProcessorLogVO getProcessorList(ListProcessorLogRequest request) {
}
}
}
} catch (SQLException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}

Expand Down
Expand Up @@ -2,13 +2,11 @@

import io.openjob.common.constant.LogFieldConstant;
import io.openjob.common.util.DateUtil;
import io.openjob.server.log.dto.ProcessorLog;
import io.openjob.server.log.dto.ProcessorLogField;
import io.openjob.server.log.dto.ProcessorLogDTO;
import io.openjob.server.log.dto.ProcessorLogFieldDTO;
import org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/**
Expand All @@ -29,9 +27,9 @@ public class LogFormatUtil {
* @param processorLog processorLog
* @return String
*/
public static String formatLog(ProcessorLog processorLog) {
public static String formatLog(ProcessorLogDTO processorLog) {
Map<String, String> fieldMap = processorLog.getFields().stream()
.collect(Collectors.toMap(ProcessorLogField::getName, ProcessorLogField::getValue));
.collect(Collectors.toMap(ProcessorLogFieldDTO::getName, ProcessorLogFieldDTO::getValue));
String location = fieldMap.get(LogFieldConstant.LOCATION);
String message = String.format(
LOG_FORMAT,
Expand Down
2 changes: 1 addition & 1 deletion openjob-server/openjob-server-cluster/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>openjob-server</artifactId>
<groupId>io.openjob</groupId>
<version>1.0.1</version>
<version>1.0.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Expand Up @@ -5,13 +5,12 @@
import io.openjob.common.request.WorkerJobInstanceTaskLogFieldRequest;
import io.openjob.common.request.WorkerJobInstanceTaskLogRequest;
import io.openjob.server.log.dao.LogDAO;
import io.openjob.server.log.dto.ProcessorLog;
import io.openjob.server.log.dto.ProcessorLogDTO;
import io.openjob.server.log.mapper.LogMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -37,7 +36,7 @@ public JobInstanceTaskLogService(LogDAO logDAO) {
* @param logReq log request.
*/
public void handleInstanceTaskLog(WorkerJobInstanceTaskLogRequest logReq) {
List<ProcessorLog> processorLogList = logReq.getFieldList().stream().map(fields -> {
List<ProcessorLogDTO> processorLogList = logReq.getFieldList().stream().map(fields -> {
// Field map.
Map<String, List<WorkerJobInstanceTaskLogFieldRequest>> fieldMap = fields.stream()
.collect(Collectors.groupingBy(WorkerJobInstanceTaskLogFieldRequest::getName));
Expand All @@ -54,7 +53,7 @@ public void handleInstanceTaskLog(WorkerJobInstanceTaskLogRequest logReq) {
String timeStamp = Optional.ofNullable(fieldMap.get(LogFieldConstant.TIME_STAMP))
.orElseGet(() -> Lists.newArrayList(new WorkerJobInstanceTaskLogFieldRequest())).get(0).getValue();

ProcessorLog processorLog = new ProcessorLog();
ProcessorLogDTO processorLog = new ProcessorLogDTO();
processorLog.setTaskId(taskId);
processorLog.setWorkerAddress(workerAddress);
processorLog.setTime(Long.valueOf(timeStamp));
Expand All @@ -64,7 +63,7 @@ public void handleInstanceTaskLog(WorkerJobInstanceTaskLogRequest logReq) {

try {
logDAO.batchAdd(processorLogList);
} catch (SQLException e) {
} catch (Exception e) {
log.error("Batch add task log failed!", e);
throw new RuntimeException(e);
}
Expand Down
2 changes: 1 addition & 1 deletion openjob-server/openjob-server-common/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>openjob-server</artifactId>
<groupId>io.openjob</groupId>
<version>1.0.1</version>
<version>1.0.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>openjob-server-common</artifactId>
Expand Down
@@ -0,0 +1,59 @@
package io.openjob.server.common.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* @author stelin swoft@qq.com
* @since 1.0.2
*/
public class JsonUtil {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
* Json encode
*
* @param object object
* @return String
*/
public static String encode(Object object) {
try {
return OBJECT_MAPPER.writeValueAsString(object);
} catch (JsonProcessingException e) {
throw new RuntimeException("Json encode failed!", e);
}
}

/**
* Json decode
*
* @param json json
* @param typeReference typeReference
* @param <T> T
* @return T
*/
public static <T> T decode(String json, TypeReference<T> typeReference) {
try {
return OBJECT_MAPPER.readValue(json, typeReference);
} catch (Exception e) {
throw new RuntimeException("Json decode failed!", e);
}
}

/**
* Json decode
*
* @param json json
* @param targetClass targetClass
* @param <T> T
* @return T
*/
public static <T> T decode(String json, Class<T> targetClass) {
try {
return OBJECT_MAPPER.readValue(json, targetClass);
} catch (Exception e) {
throw new RuntimeException("Json decode failed!", e);
}
}
}
2 changes: 1 addition & 1 deletion openjob-server/openjob-server-dispatcher/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>openjob-server</artifactId>
<groupId>io.openjob</groupId>
<version>1.0.1</version>
<version>1.0.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
19 changes: 18 additions & 1 deletion openjob-server/openjob-server-log/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>openjob-server</artifactId>
<groupId>io.openjob</groupId>
<version>1.0.1</version>
<version>1.0.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>openjob-server-log</artifactId>
Expand All @@ -32,6 +32,23 @@
<artifactId>openjob-server-common</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.0.0</version>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.0.0</version>
</dependency>

<!-- H2 -->
<dependency>
<groupId>com.h2database</groupId>
Expand Down
@@ -1,9 +1,11 @@
package io.openjob.server.log.autoconfigure;

import io.openjob.server.log.client.Elasticsearch7Client;
import io.openjob.server.log.client.H2Client;
import io.openjob.server.log.client.MysqlClient;
import io.openjob.server.log.constant.LogStorageConstant;
import io.openjob.server.log.dao.LogDAO;
import io.openjob.server.log.dao.impl.Elasticsearch7DAOImpl;
import io.openjob.server.log.dao.impl.H2LogDAOImpl;
import io.openjob.server.log.dao.impl.MysqlLogDAOImpl;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -51,4 +53,17 @@ public LogDAO h2LogDAO(MysqlClient mysqlClient) {
return new MysqlLogDAOImpl(mysqlClient);
}
}

@ConditionalOnProperty(prefix = "openjob.log.storage", name = "selector", havingValue = LogStorageConstant.ELASTICSEARCH7)
public class Elasticsearch7autoconfiguration {
@Bean
public Elasticsearch7Client mysqlClient() {
return new Elasticsearch7Client(logProperties.getStorage().getElasticsearch7());
}

@Bean
public LogDAO h2LogDAO(Elasticsearch7Client elasticsearch7Client, LogProperties.Elasticsearch7Properties properties) {
return new Elasticsearch7DAOImpl(elasticsearch7Client, properties);
}
}
}
Expand Up @@ -21,10 +21,7 @@ public static class Storage {
private H2Properties h2 = new H2Properties();
private MysqlProperties mysql = new MysqlProperties();
private TidbProperties tidb = new TidbProperties();
private ElasticsearchProperties elasticsearch = new ElasticsearchProperties();
private Elasticsearch7Properties elasticsearch7 = new Elasticsearch7Properties();
private SlsProperties sls = new SlsProperties();
private ClsProperties cls = new ClsProperties();
}

@Data
Expand All @@ -42,30 +39,16 @@ public static class TidbProperties {
private JdbcProperties properties = new JdbcProperties();
}

@Data
public static class ElasticsearchProperties {

}

@Data
public static class Elasticsearch7Properties {

}

/**
* Aliyun sls
*/
@Data
public static class SlsProperties {

}

/**
* Qcloud cls
*/
@Data
public static class ClsProperties {

private String clusterNodes;
private String username;
private String password;
private String protocol = "http";
private Integer connectTimeout = 3000;
private Integer socketTimeout = 3000;
private Integer responseTimeout = 3000;
private Integer bufferLimit = 104857600;
}

@Data
Expand Down