Skip to content

Commit

Permalink
Push old code: busypolling and multiprocessing fix
Browse files Browse the repository at this point in the history
  • Loading branch information
blechschmidt committed Feb 13, 2018
1 parent 4d3f666 commit 4e66c1b
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 43 deletions.
150 changes: 109 additions & 41 deletions main.c
Expand Up @@ -33,6 +33,7 @@ void print_help()
fprintf(stderr, ""
"Usage: %s [options] [domainlist]\n"
" -b --bindto Bind to IP address and port. (Default: 0.0.0.0:0)\n"
" --busypoll Increase performance using busy polling instead of epoll.\n"
" -c --resolve-count Number of resolves for a name before giving up. (Default: 50)\n"
" --drop-user User to drop privileges to when running as root. (Default: nobody)\n"
" --flush Flush the output file whenever a response was received.\n"
Expand Down Expand Up @@ -122,20 +123,19 @@ void cleanup()

free(context.stat_messages);

free(context.sockets.pipes);

free(context.sockets.master_pipes_read);

free(context.lookup_pool.data);
free(context.lookup_space);

for (size_t i = 0; i < context.cmd_args.num_processes * 2; i++)
{
if(context.sockets.pipes && context.sockets.pipes[i] >= 0)
{
close(context.sockets.pipes[i]);
}
}
free(context.sockets.pipes);
free(context.sockets.master_pipes_read);
}

void log_msg(const char* format, ...)
Expand Down Expand Up @@ -297,7 +297,7 @@ buffer_t massdns_resolvers_from_file(char *filename)
void set_sndbuf(int fd)
{
if(context.cmd_args.sndbuf
&& setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &context.cmd_args.sndbuf, sizeof(context.cmd_args.sndbuf)) == 0)
&& setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &context.cmd_args.sndbuf, sizeof(context.cmd_args.sndbuf)) != 0)
{
log_msg("Failed to adjust send buffer size: %s\n", strerror(errno));
}
Expand All @@ -306,7 +306,7 @@ void set_sndbuf(int fd)
void set_rcvbuf(int fd)
{
if(context.cmd_args.rcvbuf
&& setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &context.cmd_args.rcvbuf, sizeof(context.cmd_args.rcvbuf)) == 0)
&& setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &context.cmd_args.rcvbuf, sizeof(context.cmd_args.rcvbuf)) != 0)
{
log_msg("Failed to adjust receive buffer size: %s\n", strerror(errno));
}
Expand Down Expand Up @@ -388,9 +388,18 @@ void query_sockets_setup()
bool next_query(char **qname)
{
static char line[512];
static size_t line_index = 0;

while (fgets(line, sizeof(line), context.domainfile))
{
if(line_index >= context.cmd_args.num_processes)
{
line_index = 0;
}
if (context.fork_index != line_index++)
{
continue;
}
trim_end(line);
if (strcmp(line, "") == 0)
{
Expand Down Expand Up @@ -444,7 +453,7 @@ char *canonicalized_name_copy(const char *qname)
void end_warmup()
{
context.state = STATE_QUERYING;
if(context.cmd_args.extreme <= 1)
if(context.cmd_args.extreme <= 1 && !context.cmd_args.busypoll)
{
// Reduce our CPU load from epoll interrupts by removing the EPOLLOUT event
#ifdef PCAP_SUPPORT
Expand Down Expand Up @@ -841,6 +850,7 @@ void lookup_done(lookup_t *lookup)
hashmapRemove(context.map, lookup->key);

// Return lookup to pool.
// According to ISO/IEC 9899:TC2 §6.7.2.1 (13), structs are not padded at the beginning
((lookup_key_t**)context.lookup_pool.data)[context.lookup_pool.len++] = lookup->key;


Expand Down Expand Up @@ -1365,6 +1375,11 @@ void setup_pipes()
context.sockets.master_pipes_read[i].type = SOCKET_TYPE_CONTROL;
context.sockets.master_pipes_read[i].data = (void*)i;

if(context.cmd_args.busypoll)
{
continue;
}

// Add all pipes the main process can read from to the epoll descriptor
struct epoll_event ev;
bzero(&ev, sizeof(ev));
Expand Down Expand Up @@ -1402,9 +1417,9 @@ void read_control_message(socket_info_t *socket_info)
{
size_t process = (size_t)socket_info->data;
ssize_t read_result = read(socket_info->descriptor, context.stat_messages + process, sizeof(stats_exchange_t));
if(read_result < sizeof(stats_exchange_t))
if(read_result > 0 && read_result < sizeof(stats_exchange_t))
{
log_msg("Atomic read failed %ld.\n", read_result);
log_msg("Atomic read failed: Read %ld bytes.\n", read_result);
}

}
Expand Down Expand Up @@ -1443,7 +1458,10 @@ void run()

init_pipes();
context.fork_index = split_process(context.cmd_args.num_processes);
context.epollfd = epoll_create(1);
if(!context.cmd_args.busypoll)
{
context.epollfd = epoll_create(1);
}
#ifdef PCAP_SUPPORT
if(context.cmd_args.use_pcap)
{
Expand Down Expand Up @@ -1490,6 +1508,13 @@ void run()
}
}

context.domainfile = fopen(context.cmd_args.domains, "r");
if (context.domainfile == NULL)
{
log_msg("Failed to open domain file \"%s\".\n", context.cmd_args.domains);
clean_exit(EXIT_FAILURE);
}

if(context.cmd_args.output == OUTPUT_BINARY)
{
binfile_write_head();
Expand All @@ -1504,50 +1529,81 @@ void run()

privilege_drop();

add_sockets(context.epollfd, socket_events, EPOLL_CTL_ADD, &context.sockets.interfaces4);
add_sockets(context.epollfd, socket_events, EPOLL_CTL_ADD, &context.sockets.interfaces6);
if(!context.cmd_args.busypoll)
{
add_sockets(context.epollfd, socket_events, EPOLL_CTL_ADD, &context.sockets.interfaces4);
add_sockets(context.epollfd, socket_events, EPOLL_CTL_ADD, &context.sockets.interfaces6);
}


clock_gettime(CLOCK_MONOTONIC, &context.stats.start_time);
check_progress();

while(context.state < STATE_DONE)
if(!context.cmd_args.busypoll)
{
int ready = epoll_wait(context.epollfd, pevents, sizeof(pevents) / sizeof(pevents[0]), 1);
if (ready < 0)
{
log_msg("Epoll failure: %s\n", strerror(errno));
}
else if(ready == 0) // Epoll timeout
while(context.state < STATE_DONE)
{
timed_ring_handle(&context.ring, ring_timeout);
}
else if (ready > 0)
{
for (int i = 0; i < ready; i++)

int ready = epoll_wait(context.epollfd, pevents, sizeof(pevents) / sizeof(pevents[0]), 1);
if (ready < 0)
{
socket_info_t *socket_info = pevents[i].data.ptr;
if ((pevents[i].events & EPOLLOUT) && socket_info->type == SOCKET_TYPE_QUERY)
{
can_send();
timed_ring_handle(&context.ring, ring_timeout);
}
if ((pevents[i].events & EPOLLIN) && socket_info->type == SOCKET_TYPE_QUERY)
log_msg("Epoll failure: %s\n", strerror(errno));
}
else if (ready == 0) // Epoll timeout
{
timed_ring_handle(&context.ring, ring_timeout);
}
else if (ready > 0)
{
for (int i = 0; i < ready; i++)
{
can_read(socket_info);
}
socket_info_t *socket_info = pevents[i].data.ptr;
if ((pevents[i].events & EPOLLOUT) && socket_info->type == SOCKET_TYPE_QUERY)
{
can_send();
timed_ring_handle(&context.ring, ring_timeout);
}
if ((pevents[i].events & EPOLLIN) && socket_info->type == SOCKET_TYPE_QUERY)
{
can_read(socket_info);
}
#ifdef PCAP_SUPPORT
else if((pevents[i].events & EPOLLIN) && socket_info == &context.pcap_info)
{
pcap_can_read();
}
else if((pevents[i].events & EPOLLIN) && socket_info == &context.pcap_info)
{
pcap_can_read();
}
#endif
else if((pevents[i].events & EPOLLIN) && socket_info->type == SOCKET_TYPE_CONTROL)
{
read_control_message(socket_info);
else if ((pevents[i].events & EPOLLIN) && socket_info->type == SOCKET_TYPE_CONTROL)
{
read_control_message(socket_info);
}
}
timed_ring_handle(&context.ring, ring_timeout);
}
}
}
else
{
while(context.state < STATE_DONE)
{
can_send();
for(size_t i = 0; i < context.sockets.interfaces4.len; i++)
{
can_read(((socket_info_t*)context.sockets.interfaces4.data) + i);
}
for(size_t i = 0; i < context.sockets.interfaces6.len; i++)
{
can_read(((socket_info_t*)context.sockets.interfaces4.data) + i);
}
timed_ring_handle(&context.ring, ring_timeout);

if(context.cmd_args.num_processes > 1 && context.fork_index == 0)
{
for (size_t i = 1; i < context.cmd_args.num_processes; i++)
{
read_control_message(context.sockets.master_pipes_read + i);
}
}
}
}
}
Expand Down Expand Up @@ -1605,6 +1661,10 @@ int parse_cmd(int argc, char **argv)
print_help();
clean_exit(EXIT_SUCCESS);
}
else if (strcmp(argv[i], "--busypoll") == 0)
{
context.cmd_args.busypoll = true;
}
else if (strcmp(argv[i], "--resolvers") == 0 || strcmp(argv[i], "-r") == 0)
{
if (context.cmd_args.resolvers == NULL)
Expand Down Expand Up @@ -1852,7 +1912,7 @@ int parse_cmd(int argc, char **argv)
{
// If we can seek through the domain file, we seek to the end and store the file size
// in order to be able to report an estimate progress of resolving.
context.domainfile = fopen(argv[i], "r");
context.domainfile = fopen(context.cmd_args.domains, "r");
if (context.domainfile == NULL)
{
log_msg("Failed to open domain file \"%s\".\n", argv[i]);
Expand All @@ -1872,6 +1932,7 @@ int parse_cmd(int argc, char **argv)
context.domainfile_size = -1;
}
}
fclose(context.domainfile);
}
}
else
Expand Down Expand Up @@ -1909,6 +1970,13 @@ int parse_cmd(int argc, char **argv)
clean_exit(EXIT_FAILURE);
}
}

if(context.domainfile == stdin && context.cmd_args.num_processes > 0)
{
log_msg("In order to use multiprocessing, the domain list needs to be supplied as file.\n");
clean_exit(EXIT_FAILURE);
}

return 0;
}

Expand Down
3 changes: 1 addition & 2 deletions massdns.h
Expand Up @@ -165,15 +165,14 @@ typedef struct
bool use_pcap;
size_t num_processes;
size_t socket_count;
bool busypoll;
} cmd_args;

struct
{
buffer_t interfaces4; // Sockets used for receiving queries
buffer_t interfaces6; // Sockets used for receiving queries
buffer_t queries; // Sockets used for sending out queries
int *pipes;
socket_info_t read_pipe;
socket_info_t write_pipe;
socket_info_t *master_pipes_read;
} sockets;
Expand Down

0 comments on commit 4e66c1b

Please sign in to comment.