Skip to content
This repository has been archived by the owner on Apr 19, 2019. It is now read-only.

Commit

Permalink
Move URL truncation and blacklisting to url plugin
Browse files Browse the repository at this point in the history
The plugin is the only thing in Omnipresence that makes use of these
features to begin with, so it's more logical that they live there.
  • Loading branch information
kxz committed Jul 29, 2015
1 parent 6a0e519 commit 4b53dd2
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 127 deletions.
66 changes: 66 additions & 0 deletions omnipresence/plugins/url/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,18 @@


import re
import sys
from urlparse import urlparse

import ipaddress
from twisted.internet import defer, reactor
from twisted.web.client import IAgent, _ReadBodyProtocol
from zope.interface import implements


#
# Utility methods
#

# Based on django.utils.html.urlize from the Django project.
TRAILING_PUNCTUATION = ['.', ',', ':', ';', '.)', '"', "'", '!']
Expand Down Expand Up @@ -36,3 +47,58 @@ def extract_urls(text):
# Yield the resulting URL.
if SIMPLE_URL_RE.match(middle):
yield middle


#
# Twisted HTTP machinery
#

class TruncatingReadBodyProtocol(_ReadBodyProtocol):
    """Body-collecting protocol that keeps at most *max_bytes* bytes.

    Incoming data is handed on to `_ReadBodyProtocol` until the byte
    budget is used up; anything that arrives afterwards is dropped.  A
    false *max_bytes* (``None``, the default, or ``0``) is replaced by
    `sys.maxsize`.
    """

    def __init__(self, status, message, deferred, max_bytes=None):
        _ReadBodyProtocol.__init__(self, status, message, deferred)
        limit = max_bytes or sys.maxsize
        self.max_bytes = limit
        # Number of bytes that may still be buffered.
        self.remaining = limit

    def dataReceived(self, data):
        """Buffer as much of *data* as the remaining budget allows."""
        if self.remaining <= 0:
            # Budget exhausted: ignore everything from here on.
            return
        accepted = data[:self.remaining]
        _ReadBodyProtocol.dataReceived(self, accepted)
        self.remaining -= len(accepted)


class BlacklistedHost(Exception):
    """Error signalling that a `BlacklistingAgent` was asked to fetch a
    resource that lives on a blacklisted host.

    The offending *hostname* and the *ip* it corresponds to are stored
    as attributes of the same names.
    """

    def __init__(self, hostname, ip):
        self.hostname, self.ip = hostname, ip

    def __str__(self):
        template = 'host {} corresponds to blacklisted IP {}'
        return template.format(self.hostname, self.ip)


class BlacklistingAgent(object):
    """Wrap an `~twisted.web.client.Agent`, refusing requests whose
    host resolves to a loopback, private, or link-local IP address."""
    implements(IAgent)

    def __init__(self, agent, resolve=None):
        # The wrapped agent that performs the permitted requests.
        self.agent = agent
        # Callable invoked with a hostname; its (possibly Deferred)
        # result is treated as an IP address string.  Falls back to
        # the reactor's resolver when none is supplied.
        self.resolve = resolve or reactor.resolve

    @defer.inlineCallbacks
    def request(self, method, uri, headers=None, bodyProducer=None):
        """Pass the request for *uri* on to the wrapped agent, unless
        its host resolves to a blacklisted address, in which case the
        returned Deferred fails with `BlacklistedHost`."""
        host = urlparse(uri).hostname
        resolved = yield self.resolve(host)
        # `ipaddress` wants a Unicode string; decoding with 'replace'
        # substitutes undecodable bytes instead of raising
        # `UnicodeDecodeError`.
        address = ipaddress.ip_address(resolved.decode('ascii', 'replace'))
        blacklisted = (address.is_private or address.is_loopback
                       or address.is_link_local)
        if blacklisted:
            raise BlacklistedHost(host, address)
        result = yield self.agent.request(method, uri, headers, bodyProducer)
        defer.returnValue(result)
68 changes: 65 additions & 3 deletions omnipresence/plugins/url/test_url.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# -*- coding: utf-8 -*-
"""Unit tests for the url event plugin."""
# pylint: disable=missing-docstring,too-few-public-methods


from __future__ import unicode_literals
from twisted.internet.defer import Deferred, succeed
from twisted.python.failure import Failure
from twisted.trial import unittest
from twisted.web.client import ResponseDone
from twisted.web.test.test_agent import (AgentTestsMixin,
FakeReactorAndConnectMixin)

from . import extract_urls
from . import (extract_urls, TruncatingReadBodyProtocol,
BlacklistingAgent, BlacklistedHost)


class ExtractURLsTestCase(unittest.TestCase):
Expand Down Expand Up @@ -36,6 +40,8 @@ def test_word_with_dot(self):
def test_parentheses(self):
self._assert_urls('http://example.com/a_(b)',
['http://example.com/a_(b)'])
self._assert_urls('(http://example.com/a_(b))',
['http://example.com/a_(b)'])
self._assert_urls('(see http://example.com/a_(b))',
['http://example.com/a_(b)'])

Expand Down Expand Up @@ -64,3 +70,59 @@ def test_catastrophic_backtracking(self):
self._assert_urls(
'http://i.ebayimg.com/00/s/MTAwOFgxMDI0/$(KGrHqYOKo0E6fEy4,lqBOt,yzoor!~~60_12.JPG',
['http://i.ebayimg.com/00/s/MTAwOFgxMDI0/$(KGrHqYOKo0E6fEy4,lqBOt,yzoor!~~60_12.JPG'])


class TruncatingReadBodyProtocolTestCase(unittest.TestCase):
    # Every test drives a `TruncatingReadBodyProtocol` limited to 8 bytes.

    def _assert_delivery(self, data, expected):
        """Feed *data* to a fresh protocol, finish the response, and
        check that the delivered body equals *expected*."""
        body = Deferred()
        body.addCallback(self.assertEqual, expected)
        receiver = TruncatingReadBodyProtocol(200, 'OK', body, 8)
        receiver.dataReceived(data)
        receiver.connectionLost(Failure(ResponseDone()))
        return body

    def test_complete(self):
        # Input exactly at the limit is delivered unchanged.
        payload = '#' * 8
        return self._assert_delivery(payload, payload)

    def test_truncated(self):
        # Input beyond the limit is cut down to the first 8 bytes.
        return self._assert_delivery('#' * 16, '#' * 8)


class BlacklistingAgentTestCase(unittest.TestCase,
                                FakeReactorAndConnectMixin, AgentTestsMixin):
    # <https://twistedmatrix.com/trac/ticket/4024>... one wishes.
    #
    # Based in part on `twisted.web.test.test_agent.RedirectAgentTests`.

    # Hosts whose requests are expected to fail with `BlacklistedHost`.
    sample_hosts = ('localhost', '0.0.0.0', '10.0.0.1', '127.0.0.1',
                    '169.254.0.1', '172.16.0.1', '192.168.0.1')

    @staticmethod
    def resolve(hostname):
        """Stub resolver handed to the agent under test.

        Maps ``localhost`` to ``127.0.0.1`` and ``foo.test`` to
        ``8.8.8.8``; any other name is returned unchanged, so IP
        literals resolve to themselves.  The result is always an
        already-fired Deferred.
        """
        if hostname == 'localhost':
            return succeed('127.0.0.1')
        elif hostname == 'foo.test':
            return succeed('8.8.8.8')
        return succeed(hostname)

    def makeAgent(self):
        """Return a `BlacklistingAgent` around the mixin-provided agent,
        using the stub resolver instead of the reactor's."""
        return BlacklistingAgent(self.buildAgentForWrapperTest(self.reactor),
                                 resolve=self.resolve)

    def setUp(self):
        self.reactor = self.Reactor()
        self.agent = self.makeAgent()

    def test_no_blacklist(self):
        # NOTE(review): the Deferred returned by request() is discarded,
        # so a failure delivered through it would not fail this test.
        # Consider asserting on it -- TODO confirm the intended check.
        self.agent.request('GET', 'http://foo.test/')

    def _assert_blacklist(self, method, uri):
        """Assert that requesting *uri* has already failed with
        `BlacklistedHost`."""
        # failureResultOf() performs the check itself; its return value
        # is not needed here.
        self.failureResultOf(self.agent.request(method, uri),
                             BlacklistedHost)

    def test_blacklist(self):
        # Every scheme/host/method combination must be rejected.
        for protocol in ('http', 'https'):
            for host in self.sample_hosts:
                uri = '{}://{}/'.format(protocol, host)
                for method in ('GET', 'POST'):
                    self._assert_blacklist(method, uri)
69 changes: 0 additions & 69 deletions omnipresence/test/test_http.py

This file was deleted.

57 changes: 2 additions & 55 deletions omnipresence/web/http.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
"""Wrappers for Twisted's HTTP request machinery."""


import sys
from urlparse import urlparse

import ipaddress
from twisted.internet import defer, reactor
from twisted.web.client import (
IAgent, Agent, ContentDecoderAgent, RedirectAgent, GzipDecoder,
_ReadBodyProtocol, PartialDownloadError)
from twisted.web.client import (Agent, ContentDecoderAgent, RedirectAgent,
GzipDecoder, PartialDownloadError)
from twisted.web.http_headers import Headers
from zope.interface import implements

from .. import __version__, __source__

Expand All @@ -19,53 +13,6 @@
USER_AGENT = 'Omnipresence/{} (+bot; {})'.format(__version__, __source__)


class TruncatingReadBodyProtocol(_ReadBodyProtocol):
    """Body-collecting protocol that keeps at most *max_bytes* bytes.

    Incoming data is handed on to `_ReadBodyProtocol` until the byte
    budget is used up; anything that arrives afterwards is dropped.  A
    false *max_bytes* (``None``, the default, or ``0``) is replaced by
    `sys.maxsize`.
    """

    def __init__(self, status, message, deferred, max_bytes=None):
        _ReadBodyProtocol.__init__(self, status, message, deferred)
        limit = max_bytes or sys.maxsize
        self.max_bytes = limit
        # Number of bytes that may still be buffered.
        self.remaining = limit

    def dataReceived(self, data):
        """Buffer as much of *data* as the remaining budget allows."""
        if self.remaining <= 0:
            # Budget exhausted: ignore everything from here on.
            return
        accepted = data[:self.remaining]
        _ReadBodyProtocol.dataReceived(self, accepted)
        self.remaining -= len(accepted)


class BlacklistedHost(Exception):
    """Error carrying a *hostname* and the blacklisted *ip* that it
    corresponds to; both are stored as attributes of the same names."""

    def __init__(self, hostname, ip):
        self.hostname, self.ip = hostname, ip

    def __str__(self):
        template = 'host {} corresponds to blacklisted IP {}'
        return template.format(self.hostname, self.ip)


class BlacklistingAgent(object):
    """Wrap an `~twisted.web.client.Agent`, refusing requests whose
    host resolves to a loopback, private, or link-local IP address."""
    implements(IAgent)

    def __init__(self, agent, resolve=None):
        # The wrapped agent that performs the permitted requests.
        self.agent = agent
        # Callable invoked with a hostname; its (possibly Deferred)
        # result is treated as an IP address string.  Falls back to
        # the reactor's resolver when none is supplied.
        self.resolve = resolve or reactor.resolve

    @defer.inlineCallbacks
    def request(self, method, uri, headers=None, bodyProducer=None):
        # Resolve the URI's host first so the address can be vetted
        # before the wrapped agent is asked to do anything.
        host = urlparse(uri).hostname
        resolved = yield self.resolve(host)
        # `ipaddress` wants a Unicode string; decoding with 'replace'
        # substitutes undecodable bytes instead of raising
        # `UnicodeDecodeError`.
        address = ipaddress.ip_address(resolved.decode('ascii', 'replace'))
        blacklisted = (address.is_private or address.is_loopback
                       or address.is_link_local)
        if blacklisted:
            raise BlacklistedHost(host, address)
        result = yield self.agent.request(method, uri, headers, bodyProducer)
        defer.returnValue(result)


# Module-level agent: a reactor-backed `Agent` wrapped in a
# `RedirectAgent`, wrapped in turn in a `ContentDecoderAgent` that is
# configured with `GzipDecoder` for the 'gzip' content encoding.
default_agent = ContentDecoderAgent(RedirectAgent(Agent(reactor)),
[('gzip', GzipDecoder)])

Expand Down

0 comments on commit 4b53dd2

Please sign in to comment.