/
client.cpp
317 lines (270 loc) · 11.6 KB
/
client.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
#include <st.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
#include <linux/version.h>
// @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-reception
#include <sys/epoll.h>
// @see https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/msg_zerocopy.c
#include <linux/errqueue.h>
#ifndef SO_EE_ORIGIN_ZEROCOPY
#define SO_EE_ORIGIN_ZEROCOPY 5
#endif
#ifndef SO_ZEROCOPY
#define SO_ZEROCOPY 60
#endif
#ifndef SO_EE_CODE_ZEROCOPY_COPIED
#define SO_EE_CODE_ZEROCOPY_COPIED 1
#endif
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY 0x4000000
#endif
#include <netinet/udp.h>
// Define macro for UDP GSO.
// @see https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/udpgso.c
#ifndef UDP_SEGMENT
#define UDP_SEGMENT 103
#endif
// Coroutine entry point: receive datagrams on the socket forever and print
// each one as a "Pong" line.
// @param arg The st_netfd_t of the UDP socket, as handed to st_thread_create.
// @return NULL; the loop has no exit, so the return only satisfies the signature.
void* receiver(void* arg)
{
    st_netfd_t stfd = (st_netfd_t)arg;

    for (;;) {
        sockaddr_in peer;
        memset(&peer, 0, sizeof(sockaddr_in));

        char buf[1500];
        memset(buf, 0, sizeof(buf));

        iovec iov;
        iov.iov_base = buf;
        iov.iov_len = sizeof(buf);

        msghdr msg;
        memset(&msg, 0, sizeof(msghdr));
        msg.msg_name = (sockaddr_in*)&peer;
        msg.msg_namelen = sizeof(sockaddr_in);
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;

        int r0 = st_recvmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT);
        assert(r0 > 0);

        // Bound the payload print by the received length: a datagram that fills
        // the whole buffer leaves no NUL terminator, so a plain %s would read
        // past the end of buf. Shorter payloads still stop at the first NUL.
        printf("Pong %s:%d %d bytes, flags %#x, %.*s\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port), r0,
            (unsigned int)msg.msg_flags, r0, buf);
    }

    return NULL;
}
// Drain MSG_ZEROCOPY completion notifications from the socket error queue
// until at least nn_confirm sends have been acknowledged.
// @param stfd The socket that the zerocopy sends were issued on.
// @param nn_confirm The number of send completions to wait for; a value
//      of zero or less returns immediately.
// Each notification carries an inclusive id range [ee_info, ee_data], so one
// read may confirm several sends at once.
void parse_reception(st_netfd_t stfd, int nn_confirm)
{
    int left = nn_confirm;
    while (left > 0) {
        msghdr msg;
        memset(&msg, 0, sizeof(msghdr));

        // Reception from kernel, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-reception
        // See do_recv_completion at https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/msg_zerocopy.c#L393
        char control[100];
        msg.msg_control = control;
        msg.msg_controllen = sizeof(control);

        // Note that the r0 is 0, the reception is in the control.
        int r0 = st_recvmsg(stfd, &msg, MSG_ERRQUEUE, ST_UTIME_NO_TIMEOUT);
        assert(r0 >= 0);
        assert(msg.msg_flags == MSG_ERRQUEUE);

        // Notification parsing, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-parsing
        cmsghdr* cm = CMSG_FIRSTHDR(&msg);
        // CMSG_FIRSTHDR yields NULL when no control message was delivered.
        assert(cm != NULL);
        // Both the level and the type must match; checking them with OR would
        // accept nearly any control message.
        assert(cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR);

        sock_extended_err* serr = (sock_extended_err*)(void*)CMSG_DATA(cm);
        assert(serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY);

        // The confirmed ids form the inclusive range [lo, hi].
        uint32_t hi = serr->ee_data;
        uint32_t lo = serr->ee_info;
        uint32_t range = hi - lo + 1;
        left -= (int)range;

        // The casts make every argument match its %d conversion; msg_controllen
        // in particular is a size_t.
        printf("Reception %d bytes, flags %#x, cmsg(level %#x, type %#x), serr(errno %#x, origin %#x, code %#x), range %d [%d, %d]\n",
            (int)msg.msg_controllen, (unsigned int)msg.msg_flags, (unsigned int)cm->cmsg_level, (unsigned int)cm->cmsg_type,
            (unsigned int)serr->ee_errno, (unsigned int)serr->ee_origin, (unsigned int)serr->ee_code, (int)range, (int)lo, (int)hi);

        // Deferred copies, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#deferred-copies
        if (serr->ee_code == SO_EE_CODE_ZEROCOPY_COPIED) {
            printf("Warning: Defered copies, should stop zerocopy\n");
        }
    }
}
// Print the command line help to stdout.
// @param argc Not used; kept so the signature mirrors main().
// @param argv Only argv[0] is read, as the program name shown in the help.
void usage(int argc, char** argv)
{
    // Fixed help lines printed between the "Usage" header and the example.
    static const char* const help_lines[] = {
        "Options:\n",
        " --help Print this help and exit.\n",
        " --host=string The host to send to.\n",
        " --port=int The port to send to.\n",
        " --pong=bool Whether response pong, true|false\n",
        " --zerocopy=bool Whether use zerocopy to sendmsg, true|false\n",
        " --copy=int The copies of message, 1 means sendmmsg(msg+msg)\n",
        " --loop=int The number of loop to send out messages\n",
        " --batch=bool Whether read reception by batch, true|false\n",
        " --mix=bool Whether mix msg with zerocopy and those without, true|false\n",
        " --size=int Each message size in bytes.\n",
        " --gso=int The GSO size in bytes, 0 to disable it.\n",
        " --iovs=int The number of iovs to send, at least 1.\n",
        " --sndbuf=int The SO_SNDBUF size in bytes, 0 to ignore.\n",
        "For example:\n",
    };
    const size_t nn_lines = sizeof(help_lines) / sizeof(help_lines[0]);
    const char* program = argv[0];

    (void)argc;

    printf("Usage: %s <options>\n", program);
    for (size_t i = 0; i < nn_lines; i++) {
        fputs(help_lines[i], stdout);
    }
    printf(" %s --host=127.0.0.1 --port=8000 --pong=true --zerocopy=true --copy=0 --loop=1 --batch=true --mix=true --size=1400 --gso=0 --iovs=1 --sndbuf=0\n", program);
}
// UDP test client for MSG_ZEROCOPY, UDP GSO and sendmmsg on top of State Threads.
//
// Parses the command line, creates one UDP socket, optionally starts a
// receiver coroutine for pongs, then sends `loop` rounds of messages to
// host:port. With zerocopy enabled, the kernel completion notifications are
// read either after each round or in one batch after the last round.
// Finally the main coroutine sleeps forever so the receiver keeps running.
//
// @return 0 (not reached in practice because of the final endless sleep);
//      the process exits with -1 on bad arguments and 1 on a send error.
int main(int argc, char** argv)
{
    option longopts[] = {
        { "host", required_argument, NULL, 'o' },
        { "port", required_argument, NULL, 'p' },
        { "pong", required_argument, NULL, 'n' },
        { "zerocopy", required_argument, NULL, 'z' },
        { "copy", required_argument, NULL, 'c' },
        { "loop", required_argument, NULL, 'l' },
        { "batch", required_argument, NULL, 'b' },
        { "mix", required_argument, NULL, 'm' },
        { "size", required_argument, NULL, 's' },
        { "gso", required_argument, NULL, 'g' },
        { "iovs", required_argument, NULL, 'i' },
        { "sndbuf", required_argument, NULL, 'u' },
        { "help", no_argument, NULL, 'h' },
        { NULL, 0, NULL, 0 }
    };

    char* host = NULL;
    int port = 0; int nn_copies = 0; int loop = 1; int size = 1500; int gso = 0; int nn_iovs = 0; int sndbuf = 0;
    bool pong = false; bool zerocopy = false; bool batch = false; bool mix = false;

    // getopt_long returns an int; storing it in a char makes the comparison
    // with -1 fail forever on platforms where char is unsigned. The short
    // option string also lists "i:" so that -i works like --iovs.
    int ch;
    while ((ch = getopt_long(argc, argv, "o:p:n:z:c:l:b:m:s:g:i:u:h", longopts, NULL)) != -1) {
        switch (ch) {
            case 'o': host = (char*)optarg; break;
            case 'p': port = atoi(optarg); break;
            case 'n': pong = !strcmp(optarg,"true"); break;
            case 'z': zerocopy = !strcmp(optarg,"true"); break;
            case 'c': nn_copies = atoi(optarg); break;
            case 'l': loop = atoi(optarg); break;
            case 'b': batch = !strcmp(optarg,"true"); break;
            case 'm': mix = !strcmp(optarg,"true"); break;
            case 's': size = atoi(optarg); break;
            case 'g': gso = atoi(optarg); break;
            case 'i': nn_iovs = atoi(optarg); break;
            case 'u': sndbuf = atoi(optarg); break;
            case 'h': usage(argc, argv); exit(0);
            default: usage(argc, argv); exit(-1);
        }
    }

    // host may still be NULL here; passing NULL to %s is undefined behavior.
    printf("Server listen %s:%d, pong %d, zerocopy %d, copies %d, loop %d, batch %d, mix %d, size %d, gso %d, iovs %d, sndbuf %d\n",
        host ? host : "(null)", port, pong, zerocopy, nn_copies, loop, batch, mix, size, gso, nn_iovs, sndbuf);

    // Reject missing required options and values that would otherwise be
    // used as negative or zero array sizes below.
    if (!host || !port || nn_iovs <= 0 || size <= 0 || nn_copies < 0) {
        usage(argc, argv);
        exit(-1);
    }

    // Keep the calls outside assert(): with NDEBUG the whole assert
    // expression, including the initialization itself, is compiled out.
    int init_r0 = st_set_eventsys(ST_EVENTSYS_ALT);
    assert(!init_r0);
    init_r0 = st_init();
    assert(!init_r0);
    (void)init_r0;

    int fd = socket(PF_INET, SOCK_DGRAM, 0);
    // 0 is a valid descriptor; only a negative value signals failure.
    assert(fd >= 0);

    // @see https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/msg_zerocopy.c
    if (zerocopy) {
        int one = 1;
        int r0 = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one));
// MSG_ZEROCOPY for UDP was added in commit b5947e5d1e71 ("udp: msg_zerocopy") in Linux 5.0.
// @see https://lore.kernel.org/netdev/CA+FuTSfBFqRViKfG5crEv8xLMgAkp3cZ+yeuELK5TVv61xT=Yw@mail.gmail.com/
#if LINUX_VERSION_CODE < KERNEL_VERSION(5,0,0)
        if (r0 == -1) {
            printf("MSG_ZEROCOPY should be kernel 5.0+, kernel %#x, errno=%d\n", LINUX_VERSION_CODE, 524);
            exit(-1);
        }
#endif
        assert(!r0);
        (void)r0;

        printf("epoll events EPOLLERR=%#x, EPOLLHUP=%#x\n", EPOLLERR, EPOLLHUP);
    }

    // Report SO_SNDBUF before and after applying the user value, if any.
    if (true) {
        int dv = 0;
        socklen_t len = sizeof(dv);
        int r0 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &dv, &len);

        int r1 = 0;
        if (sndbuf > 0) {
            r1 = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf));
        }

        int nv = 0;
        int r2 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nv, &len);
        printf("socket SO_SNDBUF default=%d, user=%d, now=%d, r0=%d, r1=%d, r2=%d\n", dv, sndbuf, nv, r0, r1, r2);
    }

    st_netfd_t stfd = st_netfd_open_socket(fd);
    assert(stfd);
    printf("Client fd=%d\n", fd);

    if (pong) {
        st_thread_t r0 = st_thread_create(receiver, stfd, 0, 0);
        assert(r0);
        (void)r0;
    }

    sockaddr_in peer;
    memset(&peer, 0, sizeof(sockaddr_in));
    peer.sin_family = AF_INET;
    peer.sin_port = htons(port);
    peer.sin_addr.s_addr = inet_addr(host);

    // One spare zero byte after the payload keeps the buffer NUL-terminated
    // for the %s prints below, even when size <= 5; only `size` bytes are sent.
    // The buffer is never freed: zerocopy sends reference it until completion.
    size_t nn_buf = (size_t)size + 1;
    char* buf = new char[nn_buf];
    memset(buf, 0, nn_buf);
    memcpy(buf, "Hello", size < 5? size : 5);

    iovec iov;
    iov.iov_base = buf;
    iov.iov_len = size;

    // The iovec array, the control buffer and the mmsghdr array are identical
    // in every round, so allocate them once instead of leaking a fresh set
    // of new[] allocations per iteration.
    iovec* iovs = new iovec[nn_iovs];
    for (int i = 0; i < nn_iovs; i++) {
        memcpy(iovs + i, &iov, sizeof(iovec));
    }

    size_t nn_control = 0;
    char* control = NULL;
    if (gso > 0) {
        nn_control = CMSG_SPACE(sizeof(uint16_t));
        control = new char[nn_control];
        memset(control, 0, nn_control);
    }

    mmsghdr* hdrs = (nn_copies > 0)? new mmsghdr[nn_copies + 1] : NULL;

    // Total payload bytes per message, as an int to match the %d prints.
    int nn_bytes = (int)(iov.iov_len * nn_iovs);
    int send_flags = zerocopy? MSG_ZEROCOPY : 0;

    int nn_confirm = 0;
    for (int k = 0; k < loop; k++) {
        msghdr msg;
        memset(&msg, 0, sizeof(msghdr));
        msg.msg_name = (sockaddr_in*)&peer;
        msg.msg_namelen = sizeof(sockaddr_in);
        msg.msg_iov = iovs;
        msg.msg_iovlen = nn_iovs;

        if (control) {
            msg.msg_control = control;
            msg.msg_controllen = nn_control;

            cmsghdr* cm = CMSG_FIRSTHDR(&msg);
            cm->cmsg_level = SOL_UDP;
            cm->cmsg_type = UDP_SEGMENT;
            cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));

            // Copy the segment size in, rather than storing through a casted
            // pointer, to avoid any alignment or aliasing assumption.
            uint16_t gso_size = (uint16_t)gso;
            memcpy(CMSG_DATA(cm), &gso_size, sizeof(gso_size));
        }

        int r0;
        if (nn_copies == 0) {
            r0 = st_sendmsg(stfd, &msg, send_flags, ST_UTIME_NO_TIMEOUT);
        } else {
            for (int i = 0; i < nn_copies + 1; i++) {
                mmsghdr* p = hdrs + i;
                memcpy(&p->msg_hdr, &msg, sizeof(msghdr));
                p->msg_len = 0;
            }

            // The sendmmsg is removed by https://github.com/ossrs/srs/commit/34dae0fe0ddf2e95353ca8cbdc799f4abf96aead
            r0 = st_sendmmsg(stfd, hdrs, nn_copies + 1, send_flags, ST_UTIME_NO_TIMEOUT);
        }

        if (r0 > 0) {
            printf("Ping %s:%d %d bytes, control %d, copies=%d, r0=%d, %s\n", host, port, nn_bytes,
                (int)msg.msg_controllen, nn_copies, r0, buf);
        } else {
            printf("Ping %d bytes, error r0=%d, errno=%d\n", nn_bytes, r0, errno); exit(1);
        }

        // Zerocopy notification ids count successful send calls, not bytes:
        // the single-message path counts as one send, while on the multi-message
        // path r0 is used as the number of messages sent. Waiting for a byte
        // count would block forever after the first round.
        // NOTE(review): assumes st_sendmsg returns bytes and st_sendmmsg
        // returns a message count - confirm against the ST library in use.
        if (zerocopy) {
            int nn_sends = (nn_copies == 0)? 1 : r0;
            if (batch) {
                nn_confirm += nn_sends;
            } else {
                parse_reception(stfd, nn_sends);
            }
        }

        if (mix) {
            r0 = st_sendmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT);
            assert(r0 > 0);
            printf("Mix %s:%d %d bytes, r0=%d, %s\n", host, port, nn_bytes, r0, buf);
        }
    }

    // @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-batching
    // Only zerocopy sends produce notifications; without zerocopy there is
    // nothing to wait for.
    if (zerocopy && batch) {
        parse_reception(stfd, nn_confirm);
    }

    st_sleep(-1);

    return 0;
}