Skip to content

Commit

Permalink
add zabbix agent communications protocol (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomsun28 committed Feb 13, 2023
1 parent 0136eea commit 3766073
Show file tree
Hide file tree
Showing 13 changed files with 677 additions and 0 deletions.
6 changes: 6 additions & 0 deletions collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@
<artifactId>json-path</artifactId>
<version>2.6.0</version>
</dependency>
<!-- netty all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.87.Final</version>
</dependency>
<!-- lru hashmap -->
<dependency>
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.zmops.open.collector.dispatch.entrance.zabbix.agent;

import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.ZabbixProtocolDataCodec;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
* @author nantian Zabbix protocol type
*/
public class TcpClient {

private final String host;
private final int port;
private Channel channel;
private EventLoopGroup group;

public TcpClient(String host, int port) {
this.host = host;
this.port = port;
}

public void shutdown() throws InterruptedException {
channel.closeFuture().sync();
group.shutdownGracefully();

}

public void start() throws Exception {
group = new NioEventLoopGroup();

Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ZabbixProtocolDataCodec());
pipeline.addLast(new TcpClientHandler());
}
});

final ChannelFuture future = b.connect(host, port).sync();

future.addListener((ChannelFutureListener) arg0 -> {
if (future.isSuccess()) {
System.out.println("服务器连接成功...");
} else {
System.out.println("服务器连接失败!");
future.cause().printStackTrace();
group.shutdownGracefully();
}
});

this.channel = future.channel();
}

public Channel getChannel() {
return channel;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.zmops.open.collector.dispatch.entrance.zabbix.agent;

import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
* @author nantian Zabbix protocol type
*/
public class TcpClientHandler extends SimpleChannelInboundHandler<ZabbixResponse> {


@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ZabbixResponse response) throws Exception {

}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.zmops.open.collector.dispatch.entrance.zabbix.agent;


import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixProtocolType;
import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixRequest;
import io.netty.channel.Channel;

import java.util.Collections;
import java.util.Random;
import java.util.UUID;

/**
* @author nantian Zabbix protocol type
*/
public class ZabbixAgentClient {

public static void main(String[] args) throws Exception {

TcpClient zabbixAgent = new TcpClient("172.16.3.77", 10051);
zabbixAgent.start();

Channel channel = zabbixAgent.getChannel();


ZabbixRequest request = new ZabbixRequest();
request.setType(ZabbixProtocolType.ACTIVE_CHECKS);
request.setHost("ZabbixAgentTest");

channel.writeAndFlush(request);
zabbixAgent.shutdown();

Thread.sleep(2000);

// Zabbix Server 会主动断开,每次提交数据都需要重新创建连接
//==========================================================

TcpClient zabbixAgent2 = new TcpClient("172.16.3.77", 10051);
zabbixAgent2.start();

Channel channel2 = zabbixAgent2.getChannel();

ZabbixRequest requestData = new ZabbixRequest();
requestData.setType(ZabbixProtocolType.AGENT_DATA);
requestData.setHost("ZabbixAgentTest");
// agent 启动时生成, 32位
requestData.setSession(UUID.randomUUID().toString().replace("-", ""));

Random random = new Random();

ZabbixRequest.AgentData data = new ZabbixRequest.AgentData();
data.setItemid(43488);
data.setValue("" + random.nextInt(1000));
data.setClock(System.currentTimeMillis() / 1000);
data.setNs(76808644);

requestData.setAgentDataList(Collections.singletonList(data));

channel2.writeAndFlush(requestData);
zabbixAgent2.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.zmops.open.collector.dispatch.entrance.zabbix.protocol;

/**
* Decode content is not the Zabbix protocol
* @author nantian Zabbix protocol type
*/
public class ZabbixErrorProtocolException extends Exception {

public ZabbixErrorProtocolException(final String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.zmops.open.collector.dispatch.entrance.zabbix.protocol;

import io.netty.channel.CombinedChannelDuplexHandler;

/**
* Zabbix protocol type
* @author nantian
*/
public class ZabbixProtocolDataCodec extends CombinedChannelDuplexHandler<ZabbixProtocolDecoder, ZabbixProtocolEncoder> {
public ZabbixProtocolDataCodec() {
init(new ZabbixProtocolDecoder(), new ZabbixProtocolEncoder());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.zmops.open.collector.dispatch.entrance.zabbix.protocol;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixResponse;
import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixResponseJsonDeserializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
* @author nantian zabbix server response data protocol decoder
*/
public class ZabbixProtocolDecoder extends ByteToMessageDecoder {
private static final int HEADER_LEN = 9;
private static final byte[] PROTOCOL = new byte[]{'Z', 'B', 'X', 'D'};

private static final int HEADER_FLAG_LEN = 4;

private final Gson requestParser = new GsonBuilder()
.registerTypeAdapter(ZabbixResponse.class, new ZabbixResponseJsonDeserializer()).create();

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list)
throws Exception {
try {
// Decode header and get payload
String payload = decodeToPayload(channelHandlerContext, byteBuf);
if (payload == null) {
return;
}

System.out.println("ZabbixServer 响应数据为:" + payload);

// Parse content and add to list
ZabbixResponse response = requestParser.fromJson(payload, ZabbixResponse.class);
list.add(response);
} catch (Exception e) {
errorProtocol(channelHandlerContext, byteBuf, "Parsing zabbix request data error", e);
}
}

/**
* Decode protocol to payload string
*/
public String decodeToPayload(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
throws InterruptedException, ZabbixErrorProtocolException {
int readable = byteBuf.readableBytes();
int baseIndex = byteBuf.readerIndex();
if (readable < HEADER_LEN) {
byteBuf.readerIndex(baseIndex);
return null;
}

// Read header
ByteBuf headerBuf = byteBuf.readSlice(HEADER_LEN);
if (headerBuf.getByte(0) != PROTOCOL[0] || headerBuf.getByte(1) != PROTOCOL[1]
|| headerBuf.getByte(2) != PROTOCOL[2] || headerBuf.getByte(3) != PROTOCOL[3]) {
throw new ZabbixErrorProtocolException("header is not right");
}

// Only support communications protocol
if (headerBuf.getByte(HEADER_FLAG_LEN) != 1) {
throw new ZabbixErrorProtocolException("header flags only support communications protocol");
}

// Check payload
int dataLength = headerBuf.getByte(5) & 0xFF
| (headerBuf.getByte(6) & 0xFF) << 8
| (headerBuf.getByte(7) & 0xFF) << 16
| (headerBuf.getByte(8) & 0xFF) << 24;
int totalLength = HEADER_LEN + dataLength + 4;
// If not receive all data, reset buffer and re-decode after content receive finish
if (readable < totalLength) {
byteBuf.readerIndex(baseIndex);
return null;
}

if (dataLength <= 0) {
throw new ZabbixErrorProtocolException("content could not be empty");
}

// Skip protocol extensions
byteBuf.skipBytes(4);

// Reading content
ByteBuf payload = byteBuf.readSlice(dataLength);

return payload.toString(StandardCharsets.UTF_8);
}

/**
* Close connection if protocol error
*/
protected void errorProtocol(ChannelHandlerContext context, ByteBuf byteBuf, String reason, Throwable ex) throws InterruptedException {
// Skip all content
byteBuf.skipBytes(byteBuf.readableBytes());
// Close connection
context.close();
}

}
Loading

0 comments on commit 3766073

Please sign in to comment.