Skip to content

Commit

Permalink
support zabbix agent (#32)
Browse files Browse the repository at this point in the history
* support query history

* zabbix agent init

* support zabbix agent properties

* support expiredDataCleaner
  • Loading branch information
tomsun28 committed Mar 6, 2023
1 parent db27d07 commit e393010
Show file tree
Hide file tree
Showing 19 changed files with 510 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.zmops.open.collector.dispatch.entrance.zabbix.agent.ZabbixAgentService;
import com.zmops.open.collector.dispatch.timer.Timeout;
import com.zmops.open.collector.dispatch.timer.TimerDispatch;
import com.zmops.open.collector.dispatch.timer.WheelTimerTask;
Expand Down Expand Up @@ -64,6 +65,9 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
* 触发子任务最大数量
*/
private static final int MAX_SUB_TASK_NUM = 50;

private static final long ITEM_ID_START = 10000L;
private static final long ITEM_ID_END = 99999L;
private static final Gson GSON = new Gson();
/**
* Priority queue of index group collection tasks
Expand All @@ -80,6 +84,7 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
* 采集数据导出器
*/
private CommonDataQueue commonDataQueue;
private ZabbixAgentService zabbixAgentService;
/**
* Metric group task and start time mapping map
* 指标组任务与开始时间映射map
Expand All @@ -91,11 +96,13 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
public CommonDispatcher(MetricsCollectorQueue jobRequestQueue,
TimerDispatch timerDispatch,
CommonDataQueue commonDataQueue,
ZabbixAgentService zabbixAgentService,
WorkerPool workerPool,
List<UnitConvert> unitConvertList) {
this.commonDataQueue = commonDataQueue;
this.jobRequestQueue = jobRequestQueue;
this.timerDispatch = timerDispatch;
this.zabbixAgentService = zabbixAgentService;
this.unitConvertList = unitConvertList;
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 2, 1,
TimeUnit.SECONDS,
Expand Down Expand Up @@ -206,7 +213,12 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
if (job.isCyclic()) {
// If it is an asynchronous periodic cyclic task, directly send the collected data of the indicator group to the message middleware
// 若是异步的周期性循环任务,直接发送指标组的采集数据到消息中间件
commonDataQueue.sendMetricsData(metricsData);
if (job.getMonitorId() <= ITEM_ID_END && job.getMonitorId() >= ITEM_ID_START) {
// from zabbix send data to zabbix
zabbixAgentService.sendMetricsData(metricsData);
} else {
commonDataQueue.sendMetricsData(metricsData);
}
if (log.isDebugEnabled()) {
log.debug("Cyclic Job: {}",metricsData.getMetrics());
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* 调度分发任务配置属性
*
* @author tomsun28 from <a href="https://github.com/dromara/hertzbeat">hertzbeat</a>
* @date 2021/10/16 14:54
* @date 2023/03/06 13:17
*/
@Component
@ConfigurationProperties(prefix = "collector.dispatch")
Expand Down Expand Up @@ -72,6 +72,11 @@ public static class EntranceProperties {
*/
private EtcdProperties etcd;

/**
* zabbix properties
*/
private ZabbixProperties zabbix;

public EtcdProperties getEtcd() {
return etcd;
}
Expand All @@ -80,13 +85,21 @@ public void setEtcd(EtcdProperties etcd) {
this.etcd = etcd;
}

public ZabbixProperties getZabbix() {
return zabbix;
}

public void setZabbix(ZabbixProperties zabbix) {
this.zabbix = zabbix;
}

public static class EtcdProperties {

/**
* Whether etcd scheduling is started
* etcd调度是否启动
*/
private boolean enabled = true;
private boolean enabled = false;

/**
* etcd's connection endpoint url
Expand Down Expand Up @@ -194,6 +207,61 @@ public void setJobDir(String jobDir) {
this.jobDir = jobDir;
}
}

public static class ZabbixProperties {
/**
* Whether zabbix scheduling is started
* zabbix调度是否启动
*/
private boolean enabled = false;

/**
* zabbix server host
*/
private String host;

/**
* zabbix server port
*/
private Integer port = 10051;

/**
* zabbix agent host name
*/
private String agentHost = "ArgusDBM";

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public Integer getPort() {
return port;
}

public void setPort(Integer port) {
this.port = port;
}

public String getAgentHost() {
return agentHost;
}

public void setAgentHost(String agentHost) {
this.agentHost = agentHost;
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* 指标组采集
*
* @author tomsun28 from <a href="https://github.com/dromara/hertzbeat">hertzbeat</a>
* @date 2021/10/10 15:35
* @date 2023/03/06 13:17
*/
@Slf4j
@Data
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,85 @@
package com.zmops.open.collector.dispatch.entrance.zabbix.agent;

import com.zmops.open.collector.dispatch.entrance.internal.CollectJobService;
import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixProtocolType;
import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixResponse;
import com.zmops.open.common.entity.job.Configmap;
import com.zmops.open.common.entity.job.Job;
import com.zmops.open.common.entity.job.Metrics;
import com.zmops.open.common.service.AppDefineHouse;
import com.zmops.open.common.support.SpringContextHolder;
import com.zmops.open.common.util.GsonUtil;
import com.zmops.open.common.util.SnowFlakeIdGenerator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* @author nantian Zabbix protocol type
*/
@Slf4j
public class TcpClientHandler extends SimpleChannelInboundHandler<ZabbixResponse> {

private CollectJobService collectJobService;
private Set<Long> runningJobs;

public TcpClientHandler() {
this.collectJobService = SpringContextHolder.getBean(CollectJobService.class);
this.runningJobs = new HashSet<>(8);
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ZabbixResponse response) throws Exception {

if (response != null && response.getType() == ZabbixProtocolType.ACTIVE_CHECKS
&& response.getActiveChecks() != null) {
runningJobs.forEach(jobId -> {
collectJobService.cancelAsyncCollectJob(jobId);
});
runningJobs.clear();
for (ZabbixResponse.ActiveChecks metric : response.getActiveChecks()) {
try {
Map<String, String> paramsMap = metric.getParamsMap();
if (paramsMap.isEmpty()) {
continue;
}
// 构造采集任务Job实体
Job appDefine = AppDefineHouse.getAppDefine(paramsMap.get(ZabbixResponse.APP));
appDefine = GsonUtil.fromJson(GsonUtil.toJson(appDefine), Job.class);
long jobId = SnowFlakeIdGenerator.generateId();
appDefine.setId(jobId);
// set itemId 10000 - 99999 in monitorId
appDefine.setMonitorId(metric.getItemid());
appDefine.setInterval(30);
appDefine.setCyclic(true);
appDefine.setTimestamp(System.currentTimeMillis());
List<Configmap> configmaps = appDefine.getConfigmap().stream().peek(config -> {
String value = paramsMap.get(config.getKey());
config.setValue(value);
}).collect(Collectors.toList());
appDefine.setConfigmap(configmaps);
// filter metric
List<Metrics> metrics = appDefine.getMetrics().stream().filter(item ->
item.getName().equals(paramsMap.get(ZabbixResponse.METRICS)))
.peek(item -> item.setPriority((byte) 0))
.collect(Collectors.toList());
if (metrics.isEmpty()) {
continue;
}
appDefine.setMetrics(metrics);
// 下发采集任务
collectJobService.addAsyncCollectJob(appDefine);
runningJobs.add(jobId);
} catch (Exception e) {
log.error("add zabbix monitor job error {}", e.getMessage(), e);
}
}
}
}

@Override
Expand Down

This file was deleted.

Loading

0 comments on commit e393010

Please sign in to comment.