Skip to content

Commit

Permalink
fixed #84
Browse files Browse the repository at this point in the history
The problem was that the output worker would shut down before the
resolution workers, which caused oq.put() to block because the queue
filled up.

Fixed this by changing the shutting down sequence.
  • Loading branch information
pietdevaere committed Oct 22, 2016
1 parent cfa1cdb commit cb0a116
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions pathspider/util/dnsresolv.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import argparse
from time import sleep
import logging
import random

TIMEOUT = None #: The timeout for DNS resolution.
SLEEP = None #: Time to sleep before each resolution, for crude rate-limiting.
Expand Down Expand Up @@ -57,15 +58,15 @@ def resolve(domain, query='A', max_tries=3):
# answer now is an array of strings of ip's
break

if max_tries == 0: return None
if max_tries <= 0: return None
return answer


def resolve_both(domain):
'''
Gets al A and AAAA records for domain
'''
a = resolve(domain)
a = resolve(domain, 'A')
a4 = resolve(domain, 'AAAA')

return (a, a4)
Expand Down Expand Up @@ -100,13 +101,15 @@ def csv_gen(skip=0, count=0, *args, **kwargs):

def resolution_worker(iq, oq, only_first=False):
logger = logging.getLogger('dnsresolv')

while True:
entry = iq.get()

# Shutdown and cascade
if entry is None:
logger.info("Input cascading shutdown signal")
oq.put(None)
# ad a random value, so you can see that the prompt is still moving
logger.debug("Resolution worker shutting down {}"
.format(random.random()))
iq.task_done()
break

Expand Down Expand Up @@ -146,13 +149,14 @@ def resolution_worker(iq, oq, only_first=False):
# now, if we didn't get an A or AAAA record,
# try to get if for the domain
if aw == None:
a = resolve(domain)
a = resolve(domain, 'A')
if a4w == None:
a4 = resolve(domain, 'AAAA')
else:
logger.error("Internal error: illegal WWW value")
sys.exit(1)


## SECOND: see what records we received, and process them

if a != None:
Expand Down Expand Up @@ -301,12 +305,18 @@ def main(args):
logger.info(logstring.format(num_dom=dc+1, cur=current_rate,
avg=average_rate))

# now enqueue a quit signal
iq.put(None)
# now enqueue a quit signal, one for each worker
for i in range(args.workers):
iq.put(None)

# wait for queues to drain
logger.info('Sent shtudown signal to all resolution workers')
iq.join()
logger.info('All resolution workers have shut down')
logger.info('Sending shut down signal to output worker')
oq.put(None)
ot.join()
logger.info('Output worker has shut down')

t1 = datetime.datetime.now()
time = t1 - t0
Expand Down

0 comments on commit cb0a116

Please sign in to comment.