Skip to content

Commit

Permalink
Merge branch 'master' of github.com:sippy/rtpproxy
Browse files Browse the repository at this point in the history
  • Loading branch information
sobomax committed Nov 11, 2020
2 parents e905936 + a58ac65 commit 3a82dd9
Showing 1 changed file with 139 additions and 28 deletions.
167 changes: 139 additions & 28 deletions pertools/udp_storm/udp_storm.c
Original file line number Diff line number Diff line change
@@ -1,25 +1,41 @@
#define _GNU_SOURCE

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <assert.h>
#include <errno.h>
#include <err.h>
#include <fcntl.h>
#include <netdb.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <elperiodic.h>

#if !defined(SA_LEN)
#define SA_LEN(sa) \
(((sa)->sa_family == AF_INET) ? \
sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))
#endif

struct sender_arg {
char *host;
int port;
int s;
const _Atomic(double) *prp;
char *sendbuf;
int sendlen;
_Atomic(uint64_t) *top;
};

struct receiver_arg {
int s;
const _Atomic(double) *prp;
_Atomic(uint64_t) *tup;
};

int
Expand All @@ -46,43 +62,91 @@ resolve(struct sockaddr *ia, int pf, const char *host,

void sender(void *prt)
{
int s, n;
struct sockaddr ia;
int n;
useconds_t sleeptime;
struct sender_arg *sender_arg;
char *port;
double prate;

{int b=0; while (b);}
sender_arg = (struct sender_arg *)prt;

asprintf(&port, "%d", sender_arg->port);
n = resolve(&ia, AF_INET, sender_arg->host, port, AI_PASSIVE);

s = socket(AF_INET, SOCK_DGRAM, 0);
connect(s, &ia, SA_LEN(&ia));
prate = atomic_load(sender_arg->prp);
sleeptime = (double)1e+6 / prate;

for (;;) {
n = send(s, sender_arg->sendbuf, sender_arg->sendlen, 0);
n = send(sender_arg->s, sender_arg->sendbuf, sender_arg->sendlen, 0);
if (n > 0)
atomic_fetch_add(sender_arg->top, 1);
/*printf("send: %d\n", n);*/
usleep(10000);
usleep(sleeptime);
}
}

#define RECV_UDP_MAXLEN (10*1024)
#define RECV_UDP_MAXPKTS 16

void receiver(void *prt)
{
int n, alen;
struct receiver_arg *receiver_arg;
struct rdata {
struct mmsghdr mhdr;
struct iovec miov;
char rbuf[1024];
};
struct mmsghdr *mstart;
struct iovec *iovecs;
char *rbstart;

alen = sizeof(struct rdata) * RECV_UDP_MAXPKTS;
mstart = malloc(alen);
memset(mstart, '\0', alen);
iovecs = (void *)((char *)mstart +
(offsetof(struct rdata, miov) * RECV_UDP_MAXPKTS));
rbstart = (char *)mstart +
(offsetof(struct rdata, rbuf) * RECV_UDP_MAXPKTS);

for (int i = 0; i < RECV_UDP_MAXPKTS; i++) {
iovecs[i].iov_base = rbstart;
iovecs[i].iov_len = 1024;
mstart[i].msg_hdr.msg_iov = &iovecs[i];
mstart[i].msg_hdr.msg_iovlen = 1;
rbstart += 1024;
}

receiver_arg = (struct receiver_arg *)prt;

for (;;) {
n = recvmmsg(receiver_arg->s, mstart, RECV_UDP_MAXPKTS,
MSG_WAITFORONE, NULL);
if (n > 0) {
//assert(n < 6);
atomic_fetch_add(receiver_arg->tup, n);
}
}
}

int main(int argc, char **argv)
{
int min_port, max_port, nthreads;
pthread_t thread;
struct sender_arg sender_arg, *sa;
const char *host;
struct sender_arg sender_arg;
void *thread_ret;
char ch, *datafile;
char sendbuf[88], databuf[1024 * 8];
FILE *f;
_Atomic(uint64_t) totalout = ATOMIC_VAR_INIT(0);
_Atomic(uint64_t) totalin = ATOMIC_VAR_INIT(0);
void *elp;
_Atomic(double) prate = ATOMIC_VAR_INIT(100.0);
double pr;

min_port = 6000;
max_port = 7000;
sender_arg.host = "1.2.3.4";
host = "1.2.3.4";
datafile = NULL;
nthreads = -1;
while ((ch = getopt(argc, argv, "p:P:h:f:t:")) != -1)
while ((ch = getopt(argc, argv, "p:P:h:f:t:r:")) != -1)
switch (ch) {
case 'p':
min_port = atoi(optarg);
Expand All @@ -93,7 +157,7 @@ int main(int argc, char **argv)
break;

case 'h':
sender_arg.host = optarg;
host = optarg;
break;

case 'f':
Expand All @@ -103,6 +167,12 @@ int main(int argc, char **argv)
case 't':
nthreads = atoi(optarg);
break;

case 'r':
pr = strtod(optarg, NULL);
assert(pr > 0.0);
atomic_store(&prate, pr);
break;
}

if (nthreads <= 0) {
Expand All @@ -125,19 +195,60 @@ int main(int argc, char **argv)
sender_arg.sendbuf = databuf;
fclose(f);
}
elp = prdic_init(1.0, 0.0);
if (elp == NULL) {
errx(1, "prdic_init() failed");
}

int n;
char *cport;
struct sockaddr ia;

asprintf(&cport, "%d", 5060);
n = resolve(&ia, AF_INET, host, cport, AI_PASSIVE);
if (n != 0) {
errx(1, "resolve() failed");
}

pthread_t rthreads[nthreads], sthreads[nthreads];
for (int port = min_port, i = 0; i < nthreads; i++) {
struct {
struct sender_arg snd;
struct receiver_arg rcv;
} *sa;

for (sender_arg.port = min_port; nthreads > 0; nthreads--) {
sa = malloc(sizeof(*sa));
sa->host = sender_arg.host;
sa->port = sender_arg.port;
sa->sendbuf = sender_arg.sendbuf;
sa->sendlen = sender_arg.sendlen;
pthread_create(&thread, NULL, (void *(*)(void *))&sender, (void *)sa);
sender_arg.port++;
if (sender_arg.port > max_port)
sender_arg.port = min_port;
sa->snd.sendbuf = sender_arg.sendbuf;
sa->snd.sendlen = sender_arg.sendlen;
sa->snd.top = &totalout;
sa->rcv.prp = sa->snd.prp = &prate;
sa->rcv.tup = &totalin;

sa->snd.s = socket(AF_INET, SOCK_DGRAM, 0);
connect(sa->snd.s, &ia, SA_LEN(&ia));
sa->rcv.s = sa->snd.s;

pthread_create(&sthreads[i], NULL, (void *(*)(void *))&sender,
(void *)&(sa->snd));
pthread_create(&rthreads[i], NULL, (void *(*)(void *))&receiver,
(void *)&(sa->rcv));
port++;
if (port > max_port)
port = min_port;
}
for (uint64_t iout = 0, iin = 0;;) {
uint64_t cout, cin;

cout = atomic_load(&totalout);
cin = atomic_load(&totalin);
fprintf(stdout, "out=%d in=%d\n", (int)(cout - iout), (int)(cin - iin));
prdic_procrastinate(elp);
iout = cout;
iin = cin;
}
for (int i = 0; i < nthreads; i++) {
pthread_join(rthreads[i], &thread_ret);
pthread_join(sthreads[i], &thread_ret);
}
pthread_join(thread, &thread_ret);
for(;;);
return (0);
}

0 comments on commit 3a82dd9

Please sign in to comment.