Skip to content

Commit

Permalink
Deduplicated TCP connect code
Browse files Browse the repository at this point in the history
  • Loading branch information
irl committed Oct 18, 2016
1 parent ba2acb7 commit ed181a9
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 38 deletions.
34 changes: 34 additions & 0 deletions pathspider/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import threading
import multiprocessing as mp
import queue
from datetime import datetime
from enum import Enum

from ipaddress import ip_address
Expand Down Expand Up @@ -96,6 +97,8 @@ class Conn(Enum):
TIMEOUT = 2
SKIPPED = 3

Connection = collections.namedtuple("Connection", ["client", "port", "state", "tstart"])

QUEUE_SIZE = 1000
QUEUE_SLEEP = 0.5

Expand Down Expand Up @@ -176,6 +179,8 @@ def __init__(self, worker_count, libtrace_uri, args):
self.lock = threading.Lock()
self.exception = None

self.conn_timeout = 0

def config_zero(self):
"""
Changes the global state or system configuration for the
Expand Down Expand Up @@ -615,6 +620,35 @@ def add_job(self, job):

self.jobqueue.put(job)

def tcp_connect(self, job):
"""
This helper function will perform a TCP connection. It will not perform
any special action in the event that this is the experimental flow,
it only performs a TCP connection. This function expects that
self.conn_timeout has been set to a sensible value.
"""

if self.conn_timeout is None:
self.conn_timeout = 3

tstart = str(datetime.utcnow())

if ":" in job[0]:
sock = socket.socket(socket.AF_INET6)
else:
sock = socket.socket(socket.AF_INET)

try:
sock.settimeout(self.conn_timeout)
sock.connect((job[0], job[1]))

return Connection(sock, sock.getsockname()[1], Conn.OK, tstart)
except TimeoutError:
return Connection(sock, sock.getsockname()[1], Conn.TIMEOUT, tstart)
except OSError:
return Connection(sock, sock.getsockname()[1], Conn.FAILED, tstart)


# def local_address(ipv=4, target="path-ams.corvid.ch", port=53):
# if ipv == 4:
# addrfamily = socket.AF_INET
Expand Down
25 changes: 6 additions & 19 deletions pathspider/plugins/dscp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pathspider.base import SynchronizedSpider
from pathspider.base import PluggableSpider
from pathspider.base import Conn
from pathspider.base import Connection
from pathspider.base import NO_FLOW

from pathspider.observer import Observer
Expand All @@ -20,7 +21,6 @@
from pathspider.observer.tcp import tcp_handshake
from pathspider.observer.tcp import tcp_complete

Connection = collections.namedtuple("Connection", ["client", "port", "state"])
SpiderRecord = collections.namedtuple("SpiderRecord", ["ip", "rport", "port",
"host", "config",
"connstate"])
Expand Down Expand Up @@ -83,38 +83,25 @@ def config_one(self):
'--set-dscp', str(self.args.codepoint)])
logger.debug("Configurator enabled DSCP marking")

def _connect(self, sock, job):
try:
sock.settimeout(self.conn_timeout)
sock.connect((job[0], job[1]))

return Connection(sock, sock.getsockname()[1], Conn.OK)
except TimeoutError:
return Connection(sock, sock.getsockname()[1], Conn.TIMEOUT)
except OSError:
return Connection(sock, sock.getsockname()[1], Conn.FAILED)

def connect(self, job, pcs, config):
"""
Performs a TCP connection.
"""

if ":" in job[0]:
sock = socket.socket(socket.AF_INET6)
else:
sock = socket.socket(socket.AF_INET)

conn = self._connect(sock, job)
conn = self.tcp_connect(job)
sock = conn.client

try:
sock.shutdown(socket.SHUT_RDWR)
sock.close()
# FIXME: This is intended to ensure the connection is done and
# won't see futher packets after the next configuration, but the
# observer functions could also be made more robust too.
except:
pass

return conn


def post_connect(self, job, conn, pcs, config):
"""
Create the SpiderRecord
Expand Down
20 changes: 1 addition & 19 deletions pathspider/plugins/ecn.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from pathspider.observer.tcp import TCP_SAE
from pathspider.observer.tcp import TCP_SAEC

Connection = collections.namedtuple("Connection", ["client", "port", "state", "tstart"])
SpiderRecord = collections.namedtuple("SpiderRecord", ["ip", "rport", "port",
"rank", "host", "config",
"connstate", "tstart", "tstop"])
Expand Down Expand Up @@ -96,24 +95,7 @@ def connect(self, job, pcs, config):
Performs a TCP connection.
"""

job_ip, job_port, job_host, job_rank = job

tstart = str(datetime.utcnow())

if ":" in job_ip:
sock = socket.socket(socket.AF_INET6)
else:
sock = socket.socket(socket.AF_INET)

try:
sock.settimeout(self.conn_timeout)
sock.connect((job_ip, job_port))

return Connection(sock, sock.getsockname()[1], Conn.OK, tstart)
except TimeoutError:
return Connection(sock, sock.getsockname()[1], Conn.TIMEOUT, tstart)
except OSError:
return Connection(sock, sock.getsockname()[1], Conn.FAILED, tstart)
return self.tcp_connect(job)

def post_connect(self, job, conn, pcs, config):
"""
Expand Down

0 comments on commit ed181a9

Please sign in to comment.