Skip to content

Commit

Permalink
test parallelization
Browse files Browse the repository at this point in the history
  • Loading branch information
shivammathur committed May 19, 2017
1 parent 30119a0 commit 4f0608c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 32 deletions.
60 changes: 32 additions & 28 deletions ippy/ippy.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import threading
import pingparsing
import queue

install_aliases()


Expand Down Expand Up @@ -108,41 +109,14 @@ def get_filepath(self):
file_path = os.path.join(script_dir, self.file)
return file_path

@staticmethod
def worker_func(ping_args, pending, done):
""" Function to perform the actual ping.
:param ping_args: Ping arguments
:param pending: Pending queue
:param done: Done queue
"""
try:
while True:
# Get the next address to ping.
address = pending.get_nowait()

ping = subprocess.Popen(ping_args + [address],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
out, error = ping.communicate()

# Output the result to the 'done' queue.
done.put(({'result': out, 'ip': address}, error))
except queue.Empty:
# No more addresses.
pass
finally:
# Tell the main thread that a worker is about to terminate.
done.put(None)

def run(self):
""" Function to put everything together and parse the ping outputs.
"""
ping_args = self.get_ping_args()
file_path = self.get_filepath()

for _ in range(self.num_workers):
self._workers.append(threading.Thread(target=self.worker_func, args=(ping_args, self._pending, self._done)))
self._workers.append(threading.Thread(target=worker_func, args=(ping_args, self._pending, self._done)))

# Put all the addresses into the 'pending' queue.
with open(file_path, "r") as hostsFile:
Expand Down Expand Up @@ -200,3 +174,33 @@ def result(self):
raise ValueError("Unknown output mode")

return result


def worker_func(ping_args, pending, done):
""" Function to perform the actual ping.
:param ping_args: Ping arguments
:param pending: Pending queue
:param done: Done queue
"""
try:
while True:
# Get the next address to ping.
address = pending.get_nowait()
try:
ping = subprocess.Popen(ping_args + [address],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
out, error = ping.communicate()

# Output the result to the 'done' queue.
done.put(({'result': out, 'ip': address}, error))
except Exception as e:
print(e)

except queue.Empty:
# No more addresses.
pass
finally:
# Tell the main thread that a worker is about to terminate.
done.put(None)
8 changes: 4 additions & 4 deletions tests/test_ippy.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def test_filepath():

def test_run():
ippt = ippy.Ippy()
ippt.set_config(True, 'csv', 10)
ippt.set_config(True, 'csv', 1)
ippt.set_file(file='ip_list.csv')
ippt.run()

Expand All @@ -78,7 +78,7 @@ def test_run():

def test_empty_result():
ippt = ippy.Ippy()
ippt.set_config(False, 'test', 10)
ippt.set_config(False, 'test', 1)
ippt.set_file(file='ip_list.csv')
ippt.run()
with pytest.raises(ValueError):
Expand All @@ -87,14 +87,14 @@ def test_empty_result():

def test_json_result():
ippt = ippy.Ippy()
ippt.set_config(False, 'json', 10)
ippt.set_config(False, 'json', 1)
result = ippt.result()
assert json.loads(result) is not None


def test_csv_result():
ippt = ippy.Ippy()
ippt.set_config(False, 'csv', 10)
ippt.set_config(False, 'csv', 1)
ippt.set_file(file='ip_list.csv')
ippt.run()
result = ippt.result()
Expand Down

0 comments on commit 4f0608c

Please sign in to comment.