From baba125d77254d3e8e67af16df06b369d596e8a5 Mon Sep 17 00:00:00 2001 From: x1244 Date: Mon, 1 Jan 2024 21:25:43 +0800 Subject: [PATCH] Add UDP group add UDP group IPV4 & IPV6 join group, leave group. --- README-CN.md | 1 + README.md | 1 + base/hsocket.h | 25 ++++++ evpp/README.md | 4 +- evpp/UdpGroup.h | 170 +++++++++++++++++++++++++++++++++++++ evpp/UdpGroupDest_test.cpp | 57 +++++++++++++ 6 files changed, 256 insertions(+), 2 deletions(-) create mode 100644 evpp/UdpGroup.h create mode 100644 evpp/UdpGroupDest_test.cpp diff --git a/README-CN.md b/README-CN.md index 74953ebaf..bcbf98a61 100644 --- a/README-CN.md +++ b/README-CN.md @@ -420,6 +420,7 @@ int main(int argc, char** argv) { - TCP客户端: [evpp/TcpClient_test.cpp](evpp/TcpClient_test.cpp) - UDP服务端: [evpp/UdpServer_test.cpp](evpp/UdpServer_test.cpp) - UDP客户端: [evpp/UdpClient_test.cpp](evpp/UdpClient_test.cpp) +- UDP组播接收:[evpp/UdpGroupDest_test.cpp](evpp/UdpGroupDest_test.cpp) - HTTP服务端: [examples/http_server_test.cpp](examples/http_server_test.cpp) - HTTP客户端: [examples/http_client_test.cpp](examples/http_client_test.cpp) - WebSocket服务端: [examples/websocket_server_test.cpp](examples/websocket_server_test.cpp) diff --git a/README.md b/README.md index 8b0f8a51b..6c0494a44 100644 --- a/README.md +++ b/README.md @@ -360,6 +360,7 @@ int main(int argc, char** argv) { - [evpp/TcpClient_test.cpp](evpp/TcpClient_test.cpp) - [evpp/UdpServer_test.cpp](evpp/UdpServer_test.cpp) - [evpp/UdpClient_test.cpp](evpp/UdpClient_test.cpp) +- [evpp/UdpGroupDest_test.cpp](evpp/UdpGroupDest_test.cpp) - [examples/http_server_test.cpp](examples/http_server_test.cpp) - [examples/http_client_test.cpp](examples/http_client_test.cpp) - [examples/websocket_server_test.cpp](examples/websocket_server_test.cpp) diff --git a/base/hsocket.h b/base/hsocket.h index d8a5158aa..79cbc57d5 100644 --- a/base/hsocket.h +++ b/base/hsocket.h @@ -199,6 +199,31 @@ HV_INLINE int udp_broadcast(int sockfd, int on DEFAULT(1)) { return setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, (const char*)&on, sizeof(int)); } +HV_INLINE int udp_joingroupv4(int sockfd, const char* group, const char* local_host) { + struct ip_mreq mreq; + mreq.imr_multiaddr.s_addr = inet_addr(group); + mreq.imr_interface.s_addr = inet_addr(local_host); + return setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mreq, sizeof(mreq)); +} +HV_INLINE int udp_leavegroupv4(int sockfd, const char* group, const char* local_host) { + struct ip_mreq mreq; + mreq.imr_multiaddr.s_addr = inet_addr(group); + mreq.imr_interface.s_addr = inet_addr(local_host); + return setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const char*)&mreq, sizeof(mreq)); +} +HV_INLINE int udp_joingroupv6(int sockfd, const char* group, ULONG interface) { + struct ipv6_mreq mreq; + mreq.ipv6mr_interface = interface; + inet_pton(AF_INET6, group, &mreq.ipv6mr_multiaddr); + return setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, (const char*)&mreq, sizeof(mreq)); +} +HV_INLINE int udp_leavegroupv6(int sockfd, const char* group, ULONG interface) { + struct ipv6_mreq mreq; + mreq.ipv6mr_interface = interface; + inet_pton(AF_INET6, group, &mreq.ipv6mr_multiaddr); + return setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, (const char*)&mreq, sizeof(mreq)); +} + HV_INLINE int ip_v6only(int sockfd, int on DEFAULT(1)) { #ifdef IPV6_V6ONLY return setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&on, sizeof(int)); diff --git a/evpp/README.md b/evpp/README.md index cac9ad935..d12eb85a8 100644 --- a/evpp/README.md +++ b/evpp/README.md @@ -19,6 +19,6 @@ hloop.h中的c接口被封装成了c++的类,参考了muduo和evpp。 ├── TcpClient.h TCP客户端类 ├── TcpServer.h TCP服务端类 ├── UdpClient.h UDP客户端类 -└── UdpServer.h UDP服务端类 - +├── UdpServer.h UDP服务端类 +└── UdpGroup.h UDP组播类 ``` diff --git a/evpp/UdpGroup.h b/evpp/UdpGroup.h new file mode 100644 index 000000000..3de29515a --- /dev/null +++ b/evpp/UdpGroup.h @@ -0,0 +1,170 @@ +#ifndef HV_UDP_GROUP_HPP_ +#define HV_UDP_GROUP_HPP_ +#include "hsocket.h" + +#include "EventLoopThreadPool.h" +#include "Channel.h" + +namespace hv { + +template +class UdpGroupEventLoopTmpl { +public: + typedef std::shared_ptr TSocketChannelPtr; + + UdpGroupEventLoopTmpl(EventLoopPtr loop = NULL) { + loop_ = loop ? loop : std::make_shared(); + port = 0; +#if WITH_KCP + kcp_setting = NULL; +#endif + } + + virtual ~UdpGroupEventLoopTmpl() { +#if WITH_KCP + HV_FREE(kcp_setting); +#endif + } + + const EventLoopPtr& loop() { + return loop_; + } + + //use createsocket() for server OR use createsocketRemote() for client + //@retval >=0 bindfd, <0 error + int createsocket(int port, const char* host = "0.0.0.0") { + hio_t* io = hloop_create_udp_server(loop_->loop(), host, port); + if (io == NULL) return -1; + this->host = host; + this->port = port; + channel = std::make_shared(io); + return channel->fd(); + } + + // join group + int joinGroup(const char* g){ + if(channel == NULL || channel->isClosed()){ + return -1; + } + return udp_joingroupv4(channel->fd(), g, host.c_str()); + } + // leave group + int leaveGroup(const char* g){ + if(channel == NULL || channel->isClosed()){ + return -1; + } + return udp_leavegroupv4(channel->fd(), g, host.c_str()); + } + + // closesocket thread-safe + void closesocket() { + if (channel) { + channel->close(true); + } + } + + int startRecv() { + if (channel == NULL || channel->isClosed()) { + return -1; + } + channel->onread = [this](Buffer* buf) { + if (onMessage) { + onMessage(channel, buf); + } + }; +#if WITH_KCP + if (kcp_setting) { + hio_set_kcp(channel->io(), kcp_setting); + } +#endif + return channel->startRead(); + } + + int stopRecv() { + if (channel == NULL) return -1; + return channel->stopRead(); + } + + // start thread-safe + void start() { + loop_->runInLoop(std::bind(&UdpGroupEventLoopTmpl::startRecv, this)); + } + +#if WITH_KCP + void setKcp(kcp_setting_t* setting) { + if (setting == NULL) { + HV_FREE(kcp_setting); + return; + } + if (kcp_setting == NULL) { + HV_ALLOC_SIZEOF(kcp_setting); + } + *kcp_setting = *setting; + } +#endif + +public: + std::string host; + int port; + TSocketChannelPtr channel; +#if WITH_KCP + kcp_setting_t* kcp_setting; +#endif + // Callback + std::function onMessage; +private: + EventLoopPtr loop_; +}; + +template +class UdpGroupTmpl : private EventLoopThread, public UdpGroupEventLoopTmpl { +public: + UdpGroupTmpl(EventLoopPtr loop = NULL) + : EventLoopThread(loop) + , UdpGroupEventLoopTmpl(EventLoopThread::loop()) + , is_loop_owner(loop == NULL) + {} + virtual ~UdpGroupTmpl() { + stop(true); + } + + const EventLoopPtr& loop() { + return EventLoopThread::loop(); + } + + // join group + int joinGroup(const char* g){ + return UdpGroupEventLoopTmpl::joinGroup(g); + } + + // leave group + int leaveGroup(const char* g){ + return UdpGroupEventLoopTmpl::leaveGroup(g); + } + + // start thread-safe + void start(bool wait_threads_started = true) { + if (isRunning()) { + UdpGroupEventLoopTmpl::start(); + } else { + EventLoopThread::start(wait_threads_started, std::bind(&UdpGroupTmpl::startRecv, this)); + } + } + + // stop thread-safe + void stop(bool wait_threads_stopped = true) { + UdpGroupEventLoopTmpl::closesocket(); + if (is_loop_owner) { + EventLoopThread::stop(wait_threads_stopped); + } + } + +private: + bool is_loop_owner; +}; + +typedef UdpGroupTmpl UdpGroup; + +} + +#endif // HV_UDP_GROUP_HPP_ diff --git a/evpp/UdpGroupDest_test.cpp b/evpp/UdpGroupDest_test.cpp new file mode 100644 index 000000000..39cc5aa75 --- /dev/null +++ b/evpp/UdpGroupDest_test.cpp @@ -0,0 +1,57 @@ +/* + * UdpGroupDest_test.cpp + * + * @build make evpp + * @client bin/UdpClient_test 9997 230.1.1.25 + * @server bin/UdpGroupDest_test 9997 230.1.1.25 + * + */ + +#include + +#include "UdpGroup.h" + +using namespace hv; + +int main(int argc, char* argv[]) { + if (argc < 2) { + printf("Usage: %s port\n", argv[0]); + return -10; + } + int port = atoi(argv[1]); + const char* group = "230.1.1.25"; + if (argc > 2) { + group = argv[2]; + } + + UdpGroup ug; + int bindfd = ug.createsocket(port); + if (bindfd < 0) { + return -20; + } + int rst = ug.joinGroup(group); + printf("udp group bind on port %d, bindfd=%d rst=%d ...\n", port, bindfd, rst); + ug.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) { + // echo + printf(" >>> %.*s\n", (int)buf->size(), (char*)buf->data()); + }; + ug.start(); + + std::string str; + while (std::getline(std::cin, str)) { + if (str == "close") { + ug.closesocket(); + break; + } else if (str == "leave") { + int c = ug.leaveGroup(group); + printf("leave group rst=%d\n", c); + } else if (str == "join") { + int c = ug.joinGroup(group); + printf("join group rst=%d\n", c); + } else { + break; + } + } + + return 0; +}