Skip to content

Commit

Permalink
Add webhook block private setting
Browse files Browse the repository at this point in the history
  • Loading branch information
alexhermida committed Oct 14, 2018
1 parent 8b130f0 commit e37eb65
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 43 deletions.
1 change: 1 addition & 0 deletions settings/common.py
Expand Up @@ -547,6 +547,7 @@

CELERY_ENABLED = False
WEBHOOKS_ENABLED = False
WEBHOOKS_BLOCK_PRIVATE_ADDRESS = False


# If is True /front/sitemap.xml show a valid sitemap of taiga-front client
Expand Down
4 changes: 2 additions & 2 deletions taiga/base/utils/urls.py
Expand Up @@ -50,7 +50,7 @@ def reverse(viewname, *args, **kwargs):
return get_absolute_url(django_reverse(viewname, *args, **kwargs))


class HostnameValueError(Exception):
class HostnameException(Exception):
pass


Expand All @@ -65,7 +65,7 @@ def validate_private_url(url):
try:
socket_args, *others = socket.getaddrinfo(host, port)
except Exception:
raise HostnameValueError(_("Host access error"))
raise HostnameException(_("Host access error"))

destination_address = socket_args[4][0]
try:
Expand Down
47 changes: 28 additions & 19 deletions taiga/webhooks/tasks.py
Expand Up @@ -21,6 +21,8 @@
import requests
from requests.exceptions import RequestException

from django.conf import settings

from taiga.base.api.renderers import UnicodeJSONRenderer
from taiga.base.utils import json, urls
from taiga.base.utils.db import get_typename_for_model_instance
Expand Down Expand Up @@ -64,6 +66,15 @@ def _generate_signature(data, key):
return mac.hexdigest()


def _remove_leftover_webhooklogs(webhook_id):
# Only the last ten webhook logs traces are required
# so remove the leftover
ids = (WebhookLog.objects.filter(webhook_id=webhook_id)
.order_by("-id")
.values_list('id', flat=True)[10:])
WebhookLog.objects.filter(id__in=ids).delete()


def _send_request(webhook_id, url, key, data):
serialized_data = UnicodeJSONRenderer().render(data)
signature = _generate_signature(serialized_data, key)
Expand All @@ -73,19 +84,22 @@ def _send_request(webhook_id, url, key, data):
"Content-Type": "application/json"
}

try:
urls.validate_destination_address(url)
except urls.IpAddresValueError as e:
# Error validating url
webhook_log = WebhookLog.objects.create(webhook_id=webhook_id, url=url,
status=0,
request_data=data,
request_headers=dict(),
response_data="error-in-request: {}".format(
str(e)),
response_headers={},
duration=0)
return webhook_log
if settings.WEBHOOKS_BLOCK_PRIVATE_ADDRESS:
try:
urls.validate_private_url(url)
except (urls.IpAddresValueError, urls.HostnameException) as e:
# Error validating url
webhook_log = WebhookLog.objects.create(webhook_id=webhook_id, url=url,
status=0,
request_data=data,
request_headers=dict(),
response_data="error-in-request: {}".format(
str(e)),
response_headers={},
duration=0)
return webhook_log
finally:
_remove_leftover_webhooklogs(webhook_id)

request = requests.Request('POST', url, data=serialized_data, headers=headers)
prepared_request = request.prepare()
Expand Down Expand Up @@ -114,12 +128,7 @@ def _send_request(webhook_id, url, key, data):
response_headers=dict(response.headers),
duration=response.elapsed.total_seconds())
finally:
# Only the last ten webhook logs traces are required
# so remove the leftover
ids = (WebhookLog.objects.filter(webhook_id=webhook_id)
.order_by("-id")
.values_list('id', flat=True)[10:])
WebhookLog.objects.filter(id__in=ids).delete()
_remove_leftover_webhooklogs(webhook_id)

return webhook_log

Expand Down
20 changes: 11 additions & 9 deletions taiga/webhooks/validators.py
Expand Up @@ -16,13 +16,14 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import ipaddress
from urllib.parse import urlparse

from django.conf import settings
from django.utils.translation import ugettext as _

from taiga.base.api import validators
from urllib.parse import urlparse

from taiga.base.exceptions import ValidationError

from .models import Webhook


Expand All @@ -31,11 +32,12 @@ class Meta:
model = Webhook

def validate_url(self, attrs, source):
host = urlparse(attrs[source]).hostname
try:
ipa = ipaddress.ip_address(host)
except ValueError:
if settings.WEBHOOKS_BLOCK_PRIVATE_ADDRESS:
host = urlparse(attrs[source]).hostname
try:
ipa = ipaddress.ip_address(host)
except ValueError:
return attrs
if ipa.is_private:
raise ValidationError(_("Not allowed IP Address"))
return attrs
if ipa.is_private:
raise ValidationError(_("Not allowed IP Address"))
return attrs
2 changes: 1 addition & 1 deletion tests/integration/test_webhooks.py
Expand Up @@ -55,7 +55,7 @@ def test_webhook_action_test_transform_to_json(client, data):
response.elapsed.total_seconds.return_value = 100

with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response), \
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
client.login(data.project_owner)
response = client.json.post(url)
assert response.status_code == 200
Expand Down
20 changes: 10 additions & 10 deletions tests/integration/test_webhooks_signals.py
Expand Up @@ -45,25 +45,25 @@ def test_new_object_with_one_webhook_signal(settings):

for obj in objects:
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
services.take_snapshot(obj, user=obj.owner, comment="test")
assert session_send_mock.call_count == 1

for obj in objects:
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
services.take_snapshot(obj, user=obj.owner)
assert session_send_mock.call_count == 0

for obj in objects:
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
services.take_snapshot(obj, user=obj.owner, comment="test")
assert session_send_mock.call_count == 1

for obj in objects:
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
services.take_snapshot(obj, user=obj.owner, comment="test", delete=True)
assert session_send_mock.call_count == 1

Expand All @@ -86,25 +86,25 @@ def test_new_object_with_two_webhook_signals(settings):

for obj in objects:
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
services.take_snapshot(obj, user=obj.owner, comment="test")
assert session_send_mock.call_count == 2

for obj in objects:
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
services.take_snapshot(obj, user=obj.owner, comment="test")
assert session_send_mock.call_count == 2

for obj in objects:
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
services.take_snapshot(obj, user=obj.owner)
assert session_send_mock.call_count == 0

for obj in objects:
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
services.take_snapshot(obj, user=obj.owner, comment="test", delete=True)
assert session_send_mock.call_count == 2

Expand All @@ -126,12 +126,12 @@ def test_send_request_one_webhook_signal(settings):

for obj in objects:
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
services.take_snapshot(obj, user=obj.owner, comment="test")
assert session_send_mock.call_count == 1

for obj in objects:
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
services.take_snapshot(obj, user=obj.owner, comment="test", delete=True)
assert session_send_mock.call_count == 1
14 changes: 12 additions & 2 deletions tests/unit/test_utils.py
Expand Up @@ -23,7 +23,7 @@
import re

from taiga.base.utils.urls import get_absolute_url, is_absolute_url, build_url, \
validate_private_url, IpAddresValueError
validate_private_url, IpAddresValueError, HostnameException
from taiga.base.utils.db import save_in_bulk, update_in_bulk, to_tsquery

pytestmark = pytest.mark.django_db
Expand Down Expand Up @@ -103,6 +103,8 @@ def test_update_in_bulk_with_a_callback():
('""', "'\"\"':*"),
('"""', "'\"\"':* & '\"':*"),
]


def test_to_tsquery():
for (input, expected) in TS_QUERY_TRANSFORMATIONS:
expected = re.sub("([0-9])", r"'\1':*", expected)
Expand All @@ -121,13 +123,21 @@ def test_to_tsquery():
"http://[::ffff:c0a8:164]/",
"scp://192.168.1.100/",
"http://www.192.168.1.100.xip.io/",
"http://test.local/",
])
def test_validate_bad_destination_address(url):
with pytest.raises(IpAddresValueError):
validate_private_url(url)


@pytest.mark.parametrize("url", [
"http://test.local/",
"http://test.test/",
])
def test_validate_invalid_destination_address(url):
with pytest.raises(HostnameException):
validate_private_url(url)


@pytest.mark.parametrize("url", [
"http://192.167.0.12",
"http://11.0.0.1",
Expand Down

0 comments on commit e37eb65

Please sign in to comment.