Skip to content

Commit

Permalink
Add UDP group
Browse files Browse the repository at this point in the history
add UDP group IPV4 & IPV6 join group, leave group.
  • Loading branch information
x1244 committed Jan 1, 2024
1 parent 7cdb4ec commit baba125
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 2 deletions.
1 change: 1 addition & 0 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ int main(int argc, char** argv) {
- TCP客户端: [evpp/TcpClient_test.cpp](evpp/TcpClient_test.cpp)
- UDP服务端: [evpp/UdpServer_test.cpp](evpp/UdpServer_test.cpp)
- UDP客户端: [evpp/UdpClient_test.cpp](evpp/UdpClient_test.cpp)
- UDP组播接收:[evpp/UdpGroupDest_test.cpp](evpp/UdpGroupDest_test.cpp)
- HTTP服务端: [examples/http_server_test.cpp](examples/http_server_test.cpp)
- HTTP客户端: [examples/http_client_test.cpp](examples/http_client_test.cpp)
- WebSocket服务端: [examples/websocket_server_test.cpp](examples/websocket_server_test.cpp)
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ int main(int argc, char** argv) {
- [evpp/TcpClient_test.cpp](evpp/TcpClient_test.cpp)
- [evpp/UdpServer_test.cpp](evpp/UdpServer_test.cpp)
- [evpp/UdpClient_test.cpp](evpp/UdpClient_test.cpp)
- [evpp/UdpGroupDest_test.cpp](evpp/UdpGroupDest_test.cpp)
- [examples/http_server_test.cpp](examples/http_server_test.cpp)
- [examples/http_client_test.cpp](examples/http_client_test.cpp)
- [examples/websocket_server_test.cpp](examples/websocket_server_test.cpp)
Expand Down
25 changes: 25 additions & 0 deletions base/hsocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,31 @@ HV_INLINE int udp_broadcast(int sockfd, int on DEFAULT(1)) {
return setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, (const char*)&on, sizeof(int));
}

HV_INLINE int udp_joingroupv4(int sockfd, const char* group, const char* local_host) {
struct ip_mreq mreq;
mreq.imr_multiaddr.s_addr = inet_addr(group);
mreq.imr_interface.s_addr = inet_addr(local_host);
return setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mreq, sizeof(mreq));
}
HV_INLINE int udp_leavegroupv4(int sockfd, const char* group, const char* local_host) {
struct ip_mreq mreq;
mreq.imr_multiaddr.s_addr = inet_addr(group);
mreq.imr_interface.s_addr = inet_addr(local_host);
return setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const char*)&mreq, sizeof(mreq));
}
HV_INLINE int udp_joingroupv6(int sockfd, const char* group, ULONG interface) {
struct ipv6_mreq mreq;
mreq.ipv6mr_interface = interface;
inet_pton(AF_INET6, group, &mreq.ipv6mr_multiaddr);
return setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, (const char*)&mreq, sizeof(mreq));
}
HV_INLINE int udp_leavegroupv6(int sockfd, const char* group, ULONG interface) {
struct ipv6_mreq mreq;
mreq.ipv6mr_interface = interface;
inet_pton(AF_INET6, group, &mreq.ipv6mr_multiaddr);
return setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, (const char*)&mreq, sizeof(mreq));
}

HV_INLINE int ip_v6only(int sockfd, int on DEFAULT(1)) {
#ifdef IPV6_V6ONLY
return setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&on, sizeof(int));
Expand Down
4 changes: 2 additions & 2 deletions evpp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ hloop.h中的c接口被封装成了c++的类,参考了muduo和evpp。
├── TcpClient.h TCP客户端类
├── TcpServer.h TCP服务端类
├── UdpClient.h UDP客户端类
── UdpServer.h UDP服务端类
── UdpServer.h UDP服务端类
└── UdpGroup.h UDP组播类
```
170 changes: 170 additions & 0 deletions evpp/UdpGroup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#ifndef HV_UDP_GROUP_HPP_
#define HV_UDP_GROUP_HPP_
#include "hsocket.h"

#include "EventLoopThreadPool.h"
#include "Channel.h"

namespace hv {

template<class TSocketChannel = SocketChannel>
class UdpGroupEventLoopTmpl {
public:
typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;

UdpGroupEventLoopTmpl(EventLoopPtr loop = NULL) {
loop_ = loop ? loop : std::make_shared<EventLoop>();
port = 0;
#if WITH_KCP
kcp_setting = NULL;
#endif
}

virtual ~UdpGroupEventLoopTmpl() {
#if WITH_KCP
HV_FREE(kcp_setting);
#endif
}

const EventLoopPtr& loop() {
return loop_;
}

//use createsocket() for server OR use createsocketRemote() for client
//@retval >=0 bindfd, <0 error
int createsocket(int port, const char* host = "0.0.0.0") {
hio_t* io = hloop_create_udp_server(loop_->loop(), host, port);
if (io == NULL) return -1;
this->host = host;
this->port = port;
channel = std::make_shared<TSocketChannel>(io);
return channel->fd();
}

// join group
int joinGroup(const char* g){
if(channel == NULL || channel->isClosed()){
return -1;
}
return udp_joingroupv4(channel->fd(), g, host.c_str());
}
// leave group
int leaveGroup(const char* g){
if(channel == NULL || channel->isClosed()){
return -1;
}
return udp_leavegroupv4(channel->fd(), g, host.c_str());
}

// closesocket thread-safe
void closesocket() {
if (channel) {
channel->close(true);
}
}

int startRecv() {
if (channel == NULL || channel->isClosed()) {
return -1;
}
channel->onread = [this](Buffer* buf) {
if (onMessage) {
onMessage(channel, buf);
}
};
#if WITH_KCP
if (kcp_setting) {
hio_set_kcp(channel->io(), kcp_setting);
}
#endif
return channel->startRead();
}

int stopRecv() {
if (channel == NULL) return -1;
return channel->stopRead();
}

// start thread-safe
void start() {
loop_->runInLoop(std::bind(&UdpGroupEventLoopTmpl::startRecv, this));
}

#if WITH_KCP
void setKcp(kcp_setting_t* setting) {
if (setting == NULL) {
HV_FREE(kcp_setting);
return;
}
if (kcp_setting == NULL) {
HV_ALLOC_SIZEOF(kcp_setting);
}
*kcp_setting = *setting;
}
#endif

public:
std::string host;
int port;
TSocketChannelPtr channel;
#if WITH_KCP
kcp_setting_t* kcp_setting;
#endif
// Callback
std::function<void(const TSocketChannelPtr&, Buffer*)> onMessage;
private:
EventLoopPtr loop_;
};

template<class TSocketChannel = SocketChannel>
class UdpGroupTmpl : private EventLoopThread, public UdpGroupEventLoopTmpl<TSocketChannel> {
public:
UdpGroupTmpl(EventLoopPtr loop = NULL)
: EventLoopThread(loop)
, UdpGroupEventLoopTmpl<TSocketChannel>(EventLoopThread::loop())
, is_loop_owner(loop == NULL)
{}
virtual ~UdpGroupTmpl() {
stop(true);
}

const EventLoopPtr& loop() {
return EventLoopThread::loop();
}

// join group
int joinGroup(const char* g){
return UdpGroupEventLoopTmpl<TSocketChannel>::joinGroup(g);
}

// leave group
int leaveGroup(const char* g){
return UdpGroupEventLoopTmpl<TSocketChannel>::leaveGroup(g);
}

// start thread-safe
void start(bool wait_threads_started = true) {
if (isRunning()) {
UdpGroupEventLoopTmpl<TSocketChannel>::start();
} else {
EventLoopThread::start(wait_threads_started, std::bind(&UdpGroupTmpl::startRecv, this));
}
}

// stop thread-safe
void stop(bool wait_threads_stopped = true) {
UdpGroupEventLoopTmpl<TSocketChannel>::closesocket();
if (is_loop_owner) {
EventLoopThread::stop(wait_threads_stopped);
}
}

private:
bool is_loop_owner;
};

typedef UdpGroupTmpl<SocketChannel> UdpGroup;

}

#endif // HV_UDP_GROUP_HPP_
57 changes: 57 additions & 0 deletions evpp/UdpGroupDest_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* UdpGroupDest_test.cpp
*
* @build make evpp
* @client bin/UdpClient_test 9997 230.1.1.25
* @server bin/UdpGroupDest_test 9997 230.1.1.25
*
*/

#include <iostream>

#include "UdpGroup.h"

using namespace hv;

int main(int argc, char* argv[]) {
if (argc < 2) {
printf("Usage: %s port\n", argv[0]);
return -10;
}
int port = atoi(argv[1]);
const char* group = "230.1.1.25";
if (argc > 2) {
group = argv[2];
}

UdpGroup ug;
int bindfd = ug.createsocket(port);
if (bindfd < 0) {
return -20;
}
int rst = ug.joinGroup(group);
printf("udp group bind on port %d, bindfd=%d rst=%d ...\n", port, bindfd, rst);
ug.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
// echo
printf(" >>> %.*s\n", (int)buf->size(), (char*)buf->data());
};
ug.start();

std::string str;
while (std::getline(std::cin, str)) {
if (str == "close") {
ug.closesocket();
break;
} else if (str == "leave") {
int c = ug.leaveGroup(group);
printf("leave group rst=%d\n", c);
} else if (str == "join") {
int c = ug.joinGroup(group);
printf("join group rst=%d\n", c);
} else {
break;
}
}

return 0;
}

0 comments on commit baba125

Please sign in to comment.