Skip to content

Commit

Permalink
moved resolver clients to own python file. added scamper component. r…
Browse files Browse the repository at this point in the history
…emoved dead code
  • Loading branch information
gubser committed Jul 7, 2015
1 parent 5dd90dd commit 5df8764
Show file tree
Hide file tree
Showing 12 changed files with 3,313 additions and 181 deletions.
19 changes: 15 additions & 4 deletions pathspider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import mplane.tls
import mplane.utils
import pathspider.client
import pathspider.client.resolver
import time

here = os.path.abspath(os.path.dirname(__file__))
Expand Down Expand Up @@ -34,9 +35,19 @@ def run_standalone(args, config):

def run_client(args, config):
tls_state = mplane.tls.TlsState(config)
resolver = pathspider.client.BtDhtResolverClient(tls_state, "http://localhost:18888/")

ecnspider = pathspider.client.EcnSpiderClient(1000, tls_state, [('local', "http://localhost:18888/")], resolver)
resolver = pathspider.client.resolver.BtDhtResolverClient(tls_state, "http://localhost:18888/")
reasoner = pathspider.client.Reasoner()

if False:
ecnspider = pathspider.client.EcnSpiderClient(1000, tls_state, [('local', "http://localhost:18888/")], resolver, reasoner)
elif False:
import pickle
chunk = pickle.load(open('compiled_chunk.pickle', 'rb'))
reasoner.process(chunk)
print("done")
else:
imp = pathspider.client.TraceboxImp('local', tls_state, "http://localhost:18888/")
imp.add('192.33.91.96', 22)

while True:
time.sleep(10)
Expand All @@ -46,7 +57,7 @@ def main():
# parse command line
parser = argparse.ArgumentParser()
parser.add_argument('--version', action='version', version='%(prog)s '+version)
parser.add_argument('--mode', '-m', choices=['standalone', 'client', 'service'], required=True, help='Set the operating mode.')
parser.add_argument('--mode', '-m', choices=['standalone', 'client', 'service', 'reasoner'], required=True, help='Set the operating mode.')
parser.add_argument('--config', '-c', default='AUTO', help='Set pathspider configuration file.')

args = parser.parse_args()
Expand Down
215 changes: 40 additions & 175 deletions pathspider/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
Ecnspider2: Qofspider-based tool for measuring ECN-linked connectivity
Derived from ECN Spider (c) 2014 Damiano Boppart <hat.guy.repo@gmail.com>
Simple client for collecting endpoint addresses from the BitTorrent network
and performing ECN measurements on them.
.. moduleauthor:: Elio Gubser <elio.gubser@alumni.ethz.ch>
This program is free software; you can redistribute it and/or modify
Expand All @@ -27,139 +24,20 @@
import mplane.tls
import mplane.utils
import mplane.client
import mplane.supervisor
import argparse
import configparser
import time
import collections
import threading
import itertools
import numpy as np
import pandas as pd
from ipaddress import ip_address

# Stage identifiers (presumably the processing stage a job is in).
# NOTE(review): no use of these names is visible in this view - confirm they are still needed.
STAGE_RESOLVER = 0x00
STAGE_ECNSPIDER = 0x01
# NOTE(review): "TRACBOX" looks like a misspelling of "TRACEBOX"; renaming would break importers.
STAGE_TRACBOX = 0x02
STAGE_ERROR = 0x03

# Status codes (presumably the lifecycle state of a job).
STATUS_QUEUED = 0x00
STATUS_RUNNING = 0x01
STATUS_FINISHED = 0x02
STATUS_ERROR = 0xFF

def take(count, iterable):
    """
    Iterate over at most count elements in iterable.

    Iteration ends quietly when iterable is exhausted before count elements
    were produced. A count of zero or less yields nothing.
    """
    # Local import keeps this block independent of the file's import list.
    from itertools import islice

    # islice() ends cleanly on exhaustion. Calling next() directly inside a
    # generator leaks StopIteration, which PEP 479 (Python 3.7+) converts
    # into a RuntimeError whenever the input is shorter than count.
    # islice() rejects negative counts, whereas the previous range() loop
    # simply produced nothing for them, so clamp to zero.
    yield from islice(iterable, max(count, 0))
import pickle

class NotFinishedException(Exception):
pass

class TimeoutException(Exception):
class NotFinishedException(Exception):
pass

# Named record with the fields identifier, ip and port.
# NOTE(review): not referenced anywhere in the visible code - confirm it is still used.
ResolverResult = collections.namedtuple('ResolverResult', ['identifier', 'ip', 'port'])

class ResolverClient:
    """
    Base class for clients which request addresses from a resolver component.

    The constructor creates an mplane HTTP initiator client and retrieves the
    capabilities offered at resolver_url. Subclasses implement request() and
    use _fetch_result() to poll for the outcome of an invoked capability.
    """

    def __init__(self, tls_state, resolver_url):
        """
        :param tls_state: TLS state handed to mplane.client.HttpInitiatorClient.
        :param resolver_url: URL of the resolver component.
        """
        self.url = resolver_url
        self.client = mplane.client.HttpInitiatorClient(tls_state)
        self.client.retrieve_capabilities(self.url)
        # timestamp of the last capability refresh; 0 forces a refresh on first poll
        self.last_updated = 0
        self.lock = threading.RLock()

    def _fetch_result(self, token, request_timeout):
        """
        Poll the component until the result for token is available.

        :param token: token of a previously invoked capability.
        :param request_timeout: polling budget in seconds, counted in 5 second steps.
        :returns: list of result rows (dicts), or None if the component
                  answered with an mplane Exception message.
        :raises TimeoutException: if no result arrived within request_timeout.
        """
        time_spent = 0
        while time_spent < request_timeout:
            with self.lock:
                try:
                    # limit polling to once every 5 seconds
                    if self.last_updated + 5 < time.time():
                        # update capabilities information
                        self.client.retrieve_capabilities(self.url)
                        self.last_updated = time.time()
                except Exception:
                    # Best effort: the component may be temporarily unreachable.
                    # Only catch Exception so that KeyboardInterrupt and
                    # SystemExit still terminate the polling loop.
                    print(str(self.url) + " unreachable. Retrying in 5 seconds")

                # check results
                result = self.client.result_for(token)
                if isinstance(result, mplane.model.Exception):
                    print(repr(result))
                    self.client.forget(token)
                    return None
                elif isinstance(result, mplane.model.Receipt):
                    # still being processed, keep polling
                    pass
                elif isinstance(result, mplane.model.Result):
                    addrs = list(result.schema_dict_iterator())
                    self.client.forget(token)
                    return addrs
                else:
                    # other result, just print it out
                    print(result)

            time.sleep(5)
            time_spent += 5

        raise TimeoutException("Could not complete address retrieval within timeout period.")

    def request(self, count, ipv='ip4', when = 'now ... future', request_timeout = 30):
        """
        Request count addresses from the resolver. Must be overridden.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("You have to implement the request() function in your subclass of ResolverClient.")

class BtDhtResolverClient(ResolverClient):
    """
    Resolver client which requests addresses through the
    'btdhtresolver-<ipv>' capability of the resolver component.
    """

    def __init__(self, tls_state, resolver_url):
        super().__init__(tls_state, resolver_url)

    def request(self, count, ipv='ip4', when = 'now ... future', request_timeout = 30):
        """
        Request count addresses and wait for the result.

        :param count: value sent as the 'btdhtresolver.count' parameter.
        :param ipv: address family suffix used in the capability label and
                    the result column name, e.g. 'ip4'.
        :param when: temporal scope passed to invoke_capability.
        :param request_timeout: polling budget in seconds for the result.
        :returns: list of (address, port) tuples; empty if the component
                  answered with an exception message.
        :raises KeyError: if the component does not offer the capability.
        :raises ValueError: if no request token could be acquired.
        """
        token = None
        label = 'btdhtresolver-'+ipv
        with self.lock:
            try:
                spec = self.client.invoke_capability(label, when, { "btdhtresolver.count": count })
                token = spec.get_token()
            except KeyError:
                print("Specified URL does not support '"+label+"' capability.")
                # bare raise keeps the original traceback
                raise

        if token is None:
            raise ValueError("Could not acquire request token.")

        rows = self._fetch_result(token, request_timeout)
        if rows is None:
            # _fetch_result() returns None when the component reported an
            # exception; iterating None would fail with a TypeError.
            return []

        return [(row['destination.'+ipv], row['destination.port']) for row in rows]

class WebResolverClient(ResolverClient):
    """
    Resolver client which keeps a queue of URLs and asks the resolver
    component to resolve them to (address, port) pairs.
    """

    def __init__(self, tls_state, resolver_url, urls = None):
        """
        :param tls_state: TLS state handed to the base class.
        :param resolver_url: URL of the resolver component.
        :param urls: optional iterable of URLs to queue initially.
        """
        super().__init__(tls_state, resolver_url)
        # NOTE: the base class already creates self.lock; kept for identical behaviour.
        self.lock = threading.RLock()
        self.queued = collections.deque()
        if urls is not None:
            self.queued.extend(urls)

    def extend(self, urls):
        """Append urls to the queue of URLs to resolve."""
        with self.lock:
            self.queued.extend(urls)

    def request(self, count, ipv='ip4', when = 'now ... future', request_timeout = 30):
        """
        Send up to count queued URLs to the resolver and wait for the result.

        :param count: maximum number of queued URLs to send.
        :param ipv: address family suffix used in the capability label and
                    the result column name, e.g. 'ip4'.
        :param when: temporal scope passed to invoke_capability.
        :param request_timeout: polling budget in seconds for the result.
        :returns: list of (address, port) tuples; empty if the component
                  answered with an exception message.
        :raises KeyError: if the component does not offer the capability.
        :raises ValueError: if no request token could be acquired.
        """
        token = None
        # NOTE(review): this label is the BT-DHT resolver's; it looks copy-pasted.
        # Confirm the capability name the web resolver component registers.
        label = 'btdhtresolver-'+ipv
        with self.lock:
            try:
                # NOTE(review): take() only reads from the deque, so the same
                # URLs are sent again on the next call - confirm whether they
                # should be removed from self.queued here.
                urls = list(take(count, self.queued))
                spec = self.client.invoke_capability(label, when, { "destination.url": urls })
                token = spec.get_token()
            except KeyError:
                print("Specified URL does not support '"+label+"' capability.")
                # bare raise keeps the original traceback
                raise

        if token is None:
            raise ValueError("Could not acquire request token.")

        rows = self._fetch_result(token, request_timeout)
        if rows is None:
            # _fetch_result() returns None when the component reported an
            # exception; iterating None would fail with a TypeError.
            return []

        return [(row['destination.'+ipv], row['destination.port']) for row in rows]




class EcnSpiderImp:
QUEUED_MIN_LENGTH = 10

Expand Down Expand Up @@ -241,6 +119,8 @@ def add_chunk(self, addrs, chunk_id, ipv, when):
self.queued.append(EcnSpiderImp.ChunkJob(chunk_id, addrs, ipv, when, None))

class TraceboxImp:
TraceboxJob = collections.namedtuple('TraceboxJob', ['ip', 'port', 'ipv', 'probe', 'when'])

def __init__(self, name, tls_state, url):
self.name = name
self.url = url
Expand All @@ -251,16 +131,20 @@ def __init__(self, name, tls_state, url):
self.pending = None
self.finished = {}

self.lock = threading.RLock()
self.worker_thread = threading.Thread(target=self.worker, name='TraceboxImp-'+self.name, daemon=True)
self.worker_thread.start()

def worker(self):
while True:
with self.lock:
if self.pending_token is None and len(self.queued) > 0:
print("sending tracebox request")
self.pending = self.queued.popleft()
label = ''
label = 'scamper-tracebox-specific-'+self.pending.ipv
try:
spec = self.client.invoke_capability(label, self.pending.when,
{ } )
{ 'destination.'+self.pending.ipv: self.pending.ip, 'scamper.tracebox.dport': self.pending.port, 'scamper.tracebox.probe': self.pending.probe } )
self.pending_token = spec.get_token()
except KeyError as e:
print("Specified URL does not support '"+label+"' capability.")
Expand Down Expand Up @@ -289,10 +173,10 @@ def worker(self):
elif isinstance(result, mplane.model.Receipt):
pass
elif isinstance(result, mplane.model.Result):
print("received result for chunk id: ", self.pending.chunk_id)
print("received result for: ", self.pending.ip)
# add to results
self.client.forget(self.pending_token)
self.finished[self.pending.url] = list(result.schema_dict_iterator())
self.finished[self.pending.ip] = list(result.schema_dict_iterator())
self.pending_token = None
self.pending = None
else:
Expand All @@ -302,15 +186,20 @@ def worker(self):
time.sleep(5)


def add(self, url):
self.queued.append(url)
def add(self, ip, port, when='now ... future', mode='tcp'):
if mode == 'tcp':
probe = 'IP/TCP/ECE'
else:
raise NotImplementedError("This mode is not implemented.")

self.queued.append(TraceboxImp.TraceboxJob(ip, port, 'ip'+str(ip_address(ip).version), probe, when))

class EcnSpiderClient:
# queued chunks - in queue to be sent to ecnspider component, feeder thread loads addresses
# pending chunks - currently processing
# finished chunks - finished, interrupted chunks or chunks which encountered an exception, consumer thread takes finished chunks and does further investigation

def __init__(self, count, tls_state, ecnspiders_name_and_urls, resolver, chunk_size = 200, ipv='ip4'):
def __init__(self, count, tls_state, ecnspiders_name_and_urls, resolver, reasoner, chunk_size = 200, ipv='ip4'):
self.chunk_size = chunk_size
self.ipv = ipv
self.imps = [EcnSpiderImp(name, tls_state, url) for name, url in ecnspiders_name_and_urls]
Expand All @@ -321,6 +210,7 @@ def __init__(self, count, tls_state, ecnspiders_name_and_urls, resolver, chunk_s
self.imp_merger_thread = threading.Thread(target=self.imp_merger, daemon=True, name="consumer")

self.resolver = resolver
self.reasoner = reasoner

self.count = 0

Expand Down Expand Up @@ -358,62 +248,37 @@ def imp_merger(self):
time.sleep(5)
continue

merged_chunks = {}
for chunk_id in chunks_finished:
merged_chunk = {}
compiled_chunk = {}
for imp in self.imps:
with imp.lock:
merged_chunk[imp.name] = imp.finished.pop(chunk_id)

merged_chunks[chunk_id] = merged_chunk
compiled_chunk[imp.name] = pd.DataFrame(imp.finished.pop(chunk_id))

print("merged results:", merged_chunks)
pickle.dump(compiled_chunk, open("compiled_chunk.pickle", 'wb'))
exit(0)

#self.reasoner.process(compiled_chunk)

class Reasoner:
def __init__(self):
pass

"""
class Client:
def __init__(self, count, tls_state, resolver_url, probes_url):
self.count = count
self.probes = []
for probe_url in probes_url:
probe = mplane.client.HttpInitiatorClient(tls_state)
probe.retrieve_capabilities(probe_url)
self.probes.append(probe)
def process(self, compiled_chunk: dict, ipv='ip4'):
"""
results = { 'ams': [result_row, result_row...], 'sin': [result_row....], ...}
"""

self.stage_resolver = collections.deque()
self.stage_ecnspider = collections.deque()
self.stage_tracebox = collections.deque()
self.stage_finished = collections.deque()

def add_webresolver(self, hostname):
self.stage_resolver.append(WebresolverOrder(hostname, STATUS_QUEUED, None, None))
# get all different ip addresses
ips = set([row['destination.'+ipv] for row in itertools.chain(*compiled_chunk.values())])
print("reasoner: got {} ips".format(len(ips)))

def add_btdhtresolver(self, count, ipv=4):
self.stage_resolver.append((count, ipv))

def handle_message(self, msg, identity):
if isinstance(msg, mplane.model.Capability):
pass

elif isinstance(msg, mplane.model.Receipt):
pass

elif (isinstance(msg, mplane.model.Result) or
isinstance(msg, mplane.model.Exception)):
pass

elif isinstance(msg, mplane.model.Withdrawal):
pass
elif isinstance(msg, mplane.model.Envelope):
for imsg in msg.messages():
self.handle_message(imsg, identity)
else:
raise ValueError("Internal error: unknown message "+repr(msg))

"""
def retrieve_addresses(client, ipv, count, label, url, unique = True, when = "now ... future"):
try:
spec = client.invoke_capability(label, when, { "btdhtspider.count": count, "btdhtspider.unique": unique })
Expand Down

0 comments on commit 5df8764

Please sign in to comment.