Skip to content

Commit

Permalink
Refs #74 - Accept dummy hashes in submissions
Browse files Browse the repository at this point in the history
Currently follows the implementation suggested in
#74 (comment)
including automatic upgrading of hashes to MD5-PW.
  • Loading branch information
mxsasha committed Aug 7, 2018
1 parent b824987 commit cfe146c
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 48 deletions.
56 changes: 50 additions & 6 deletions irrd/rpsl/rpsl_objects.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from collections import OrderedDict
from typing import Set, List, Optional
from typing import Set, List, Optional, Union

import gnupg

from irrd.conf import get_setting
from irrd.conf import get_setting, PASSWORD_HASH_DUMMY_VALUE
from .config import PASSWORD_HASHERS
from .fields import (RPSLTextField, RPSLIPv4PrefixField, RPSLIPv4PrefixesField, RPSLIPv6PrefixField,
RPSLIPv6PrefixesField, RPSLIPv4AddressRangeField, RPSLASNumberField, RPSLASBlockField,
Expand Down Expand Up @@ -298,25 +298,69 @@ class RPSLMntner(RPSLObject):
("source", RPSLGenericNameField()),
])

def clean(self):
if not super().clean():
return False # pragma: no cover

auth_values = [auth[1] for auth in self._auth_lines(True)]
dummy_matches = [value == PASSWORD_HASH_DUMMY_VALUE for value in auth_values]
if any(dummy_matches) and not all(dummy_matches):
self.messages.error('Either all password auth hashes in a submitted mntner must be dummy objects, or none.')

def verify_auth(self, passwords: List[str], keycert_obj_pk: Optional[str]=None) -> bool:
"""
Verify whether one of a given list of passwords matches
any of the auth hashes in this object, or match the
keycert object PK.
"""
for auth in self.parsed_data.get("auth", "").splitlines():
for auth in self.parsed_data.get('auth', '').splitlines():
if keycert_obj_pk and auth.upper() == keycert_obj_pk:
return True
if " " not in auth:
continue
scheme, hash = auth.split(" ", 1)
scheme, hash = auth.split(' ', 1)
hasher = PASSWORD_HASHERS.get(scheme.upper())
if hasher:
for password in passwords:
if hasher.verify(password, hash):
return True
try:
if hasher.verify(password, hash):
return True
except ValueError:
pass
return False

def has_dummy_auth_value(self) -> bool:
"""
Check whether this object has dummy auth hashes.
If clean() has returned succesfully before, the answer from this method
means that either all or no hashes have dummy values.
"""
auth_values = [auth[1] for auth in self._auth_lines(password_hashes=True)]
return bool(auth_values) and all([value == PASSWORD_HASH_DUMMY_VALUE for value in auth_values])

def force_single_new_password(self, password) -> None:
"""
Overwrite all auth hashes with a single new hash for the provided password.
Retains other methods, i.e. PGPKEY.
"""
hash = 'MD5-PW ' + PASSWORD_HASHERS['MD5-PW'].hash(password)
auths = self._auth_lines(password_hashes=False)
auths.append(hash)
self._update_attribute_value("auth", auths)

def _auth_lines(self, password_hashes=True) -> List[Union[str, List[str]]]:
"""
Return a list of auth values in this object.
By default, returns all lines.
If password_hashes=False, returns only non-hash (i.e. PGPKEY) lines.
If password_hashes=True, returns a list of lists, each inner list containing
the hash method and the hash.
"""
lines = self.parsed_data.get("auth", "").splitlines()
if password_hashes is True:
return [auth.split(' ', 1) for auth in lines if ' ' in auth]
return [auth for auth in lines if ' ' not in auth]


class RPSLPeeringSet(RPSLObject):
fields = OrderedDict([
Expand Down
11 changes: 11 additions & 0 deletions irrd/rpsl/tests/test_rpsl_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from IPy import IP
from pytest import raises

from irrd.conf import PASSWORD_HASH_DUMMY_VALUE
from irrd.utils.rpsl_samples import (object_sample_mapping, SAMPLE_MALFORMED_EMPTY_LINE, SAMPLE_MALFORMED_ATTRIBUTE_NAME,
SAMPLE_UNKNOWN_CLASS, SAMPLE_MISSING_MANDATORY_ATTRIBUTE, SAMPLE_MALFORMED_SOURCE,
SAMPLE_MALFORMED_PK, SAMPLE_UNKNOWN_ATTRIBUTE, SAMPLE_INVALID_MULTIPLE_ATTRIBUTE,
Expand Down Expand Up @@ -292,11 +293,21 @@ def test_parse(self):
rpsl_text = object_sample_mapping[RPSLMntner().rpsl_object_class]
obj = rpsl_object_from_text(rpsl_text)
assert obj.__class__ == RPSLMntner

assert not obj.messages.errors()
assert obj.pk() == "AS760-MNT"
assert obj.parsed_data["mnt-by"] == ['AS760-MNT', 'ACONET-LIR-MNT', 'ACONET2-LIR-MNT']
assert obj.render_rpsl_text() == rpsl_text

def test_parse_invalid_partial_dummy_hash(self):
rpsl_text = object_sample_mapping[RPSLMntner().rpsl_object_class]
rpsl_text = rpsl_text.replace('LEuuhsBJNFV0Q', PASSWORD_HASH_DUMMY_VALUE)
obj = rpsl_object_from_text(rpsl_text)
assert obj.__class__ == RPSLMntner
assert obj.messages.errors() == [
'Either all password auth hashes in a submitted mntner must be dummy objects, or none.'
]

@pytest.mark.usefixtures("tmp_gpg_dir") # noqa: F811
def test_verify(self, tmp_gpg_dir):
rpsl_text = object_sample_mapping[RPSLMntner().rpsl_object_class]
Expand Down
22 changes: 14 additions & 8 deletions irrd/updates/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,26 +142,32 @@ def validate(self) -> bool:

def _check_auth(self) -> bool:
assert self.rpsl_obj_new
auth_error_message = self.auth_validator.check_auth(self.rpsl_obj_new, self.rpsl_obj_current)
if auth_error_message:
auth_result = self.auth_validator.check_auth(self.rpsl_obj_new, self.rpsl_obj_current)
self.info_messages += auth_result.info_messages

if not auth_result.is_valid():
self.status = UpdateRequestStatus.ERROR_AUTH
self.error_messages.append(auth_error_message)
self.error_messages += auth_result.error_messages
return False
return True

def _check_references_to_others(self) -> bool:
"""Check all references from this object to other objects."""
assert self.rpsl_obj_new
references_error_messages = self.reference_validator.check_references_to_others(self.rpsl_obj_new)
self.error_messages += references_error_messages
if references_error_messages and self.is_valid():
self.status = UpdateRequestStatus.ERROR_REFERENCE
return False
references_result = self.reference_validator.check_references_to_others(self.rpsl_obj_new)
self.info_messages += references_result.info_messages

if not references_result.is_valid():
self.error_messages += references_result.error_messages
if self.is_valid(): # Only update the status if this object was valid prior, so this is the first failure
self.status = UpdateRequestStatus.ERROR_REFERENCE
return False
return True

def _check_references_from_others(self) -> bool:
assert self.rpsl_obj_current
references = self.reference_validator.check_references_from_others(self.rpsl_obj_current)
# TODO: fold this into ReferenceValidator
for ref_object_class, ref_pk, ref_source in references:
self.error_messages.append(f'Object {self.rpsl_obj_current.pk()} to be deleted, but still referenced '
f'by {ref_object_class} {ref_pk}')
Expand Down
2 changes: 2 additions & 0 deletions irrd/updates/tests/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def test_parse_invalid_new_objects_pgp_key_does_not_exist(self, prepare_mocks):
[{'object_text': mntner_text}],
[{'object_text': mntner_text}],
[],
[],
])
mock_dh.execute_query = lambda query: next(query_responses)

Expand All @@ -248,6 +249,7 @@ def test_parse_invalid_new_objects_pgp_key_does_not_exist(self, prepare_mocks):
['sources', (['RIPE'],), {}], ['object_classes', (['mntner'],), {}], ['rpsl_pk', ('AS760-MNT',), {}],
['sources', (['RIPE'],), {}], ['object_classes', (['mntner'],), {}], ['rpsl_pks', (['AS760-MNT'],), {}],
['sources', (['RIPE'],), {}], ['object_classes', (['mntner'],), {}], ['rpsl_pks', (['AS760-MNT'],), {}],
['sources', (['RIPE'],), {}], ['object_classes', (['mntner'],), {}], ['rpsl_pks', (['AS760-MNT'],), {}]
]

assert mock_dh.mock_calls[0][0] == 'commit'
Expand Down
118 changes: 108 additions & 10 deletions irrd/updates/tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from unittest.mock import Mock

import pytest
from passlib.handlers.md5_crypt import md5_crypt
from pytest import raises

from irrd.conf import PASSWORD_HASH_DUMMY_VALUE
from irrd.utils.rpsl_samples import SAMPLE_INETNUM, SAMPLE_AS_SET, SAMPLE_PERSON, SAMPLE_MNTNER
from irrd.utils.test_utils import flatten_mock_calls
from ..parser import parse_update_requests
Expand Down Expand Up @@ -361,6 +363,103 @@ def test_check_auth_invalid_create_mntner_referencing_self_wrong_password(self,
['rpsl_pk', ('AS760-MNT',), {}],
]

def test_check_auth_invalid_create_mntner_referencing_self_with_dummy_passwords(self, prepare_mocks):
mock_dq, mock_dh = prepare_mocks

mock_dh.execute_query = lambda query: []

reference_validator = ReferenceValidator(mock_dh)
auth_validator = AuthValidator(mock_dh)

# Submit the mntner with dummy password values as would be returned by queries.
# This should not be allowed in new objects.
data = SAMPLE_MNTNER.replace('LEuuhsBJNFV0Q', PASSWORD_HASH_DUMMY_VALUE)
data = data.replace('$1$fgW84Y9r$kKEn9MUq8PChNKpQhO6BM.', PASSWORD_HASH_DUMMY_VALUE)
result_mntner = parse_update_requests(data + 'password: crypt-password',
mock_dh, auth_validator, reference_validator)[0]
auth_validator.pre_approve([result_mntner])

assert not result_mntner._check_auth()
assert result_mntner.error_messages == ['Authorisation failed for the auth methods on this mntner object.']
assert flatten_mock_calls(mock_dq) == [
['sources', (['RIPE'],), {}],
['object_classes', (['mntner'],), {}],
['rpsl_pk', ('AS760-MNT',), {}],
]

def test_check_auth_valid_update_mntner_submits_new_object_with_all_dummy_hash_values(self, prepare_mocks):
mock_dq, mock_dh = prepare_mocks

mock_dh.execute_query = lambda query: [{'object_text': SAMPLE_MNTNER}]

reference_validator = ReferenceValidator(mock_dh)
auth_validator = AuthValidator(mock_dh)

# Submit the mntner with dummy password values as would be returned by queries,
# but a password attribute that is valid for the current DB object.
data = SAMPLE_MNTNER.replace('LEuuhsBJNFV0Q', PASSWORD_HASH_DUMMY_VALUE)
data = data.replace('$1$fgW84Y9r$kKEn9MUq8PChNKpQhO6BM.', PASSWORD_HASH_DUMMY_VALUE)
result_mntner = parse_update_requests(data + 'password: crypt-password',
mock_dh, auth_validator, reference_validator)[0]
auth_validator.pre_approve([result_mntner])
assert result_mntner._check_auth()
assert not result_mntner.error_messages
auth_pgp, auth_hash = result_mntner.rpsl_obj_new.parsed_data['auth'].splitlines()
assert auth_pgp == 'PGPKey-80F238C6'
assert auth_hash.startswith('MD5-PW ')
assert md5_crypt.verify('crypt-password', auth_hash[7:])
assert auth_hash in result_mntner.rpsl_obj_new.render_rpsl_text()

assert flatten_mock_calls(mock_dq) == [
['sources', (['RIPE'],), {}],
['object_classes', (['mntner'],), {}],
['rpsl_pk', ('AS760-MNT',), {}],
['sources', (['RIPE'],), {}],
['object_classes', (['mntner'],), {}],
['rpsl_pks', (['AS760-MNT', 'ACONET-LIR-MNT', 'ACONET2-LIR-MNT'],), {}],
]

def test_check_auth_invalid_update_mntner_submits_new_object_with_mixed_dummy_hash_real_hash(self, prepare_mocks):
mock_dq, mock_dh = prepare_mocks

mock_dh.execute_query = lambda query: [{'object_text': SAMPLE_MNTNER}]

reference_validator = ReferenceValidator(mock_dh)
auth_validator = AuthValidator(mock_dh)

# Submit the mntner with dummy password values as would be returned by queries,
# but a password attribute that is valid for the current DB object.
data = SAMPLE_MNTNER.replace('LEuuhsBJNFV0Q', PASSWORD_HASH_DUMMY_VALUE)
result_mntner = parse_update_requests(data + 'password: md5-password',
mock_dh, auth_validator, reference_validator)[0]
auth_validator.pre_approve([result_mntner])
result_mntner._check_auth()
assert not result_mntner.is_valid()
assert result_mntner.error_messages == [
'Either all password auth hashes in a submitted mntner must be dummy objects, or none.',
]

def test_check_auth_invalid_update_mntner_submits_new_object_with_dummy_hash_multiple_passwords(self, prepare_mocks):
mock_dq, mock_dh = prepare_mocks

mock_dh.execute_query = lambda query: [{'object_text': SAMPLE_MNTNER}]

reference_validator = ReferenceValidator(mock_dh)
auth_validator = AuthValidator(mock_dh)

# Submit the mntner with dummy password values as would be returned by queries,
# but multiple password attributes, which means we wouldn't know which password to set.
data = SAMPLE_MNTNER.replace('LEuuhsBJNFV0Q', PASSWORD_HASH_DUMMY_VALUE)
data = data.replace('$1$fgW84Y9r$kKEn9MUq8PChNKpQhO6BM.', PASSWORD_HASH_DUMMY_VALUE)
result_mntner = parse_update_requests(data + 'password: md5-password\npassword: other-password',
mock_dh, auth_validator, reference_validator)[0]
auth_validator.pre_approve([result_mntner])
result_mntner._check_auth()
assert not result_mntner.is_valid()
assert result_mntner.error_messages == [
'Object submitted with dummy hash values, but multiple passwords submitted. Either submit full hashes, or a single password.'
]

def test_check_auth_invalid_update_mntner_wrong_password_current_db_object(self, prepare_mocks):
mock_dq, mock_dh = prepare_mocks

Expand All @@ -380,18 +479,17 @@ def test_check_auth_invalid_update_mntner_wrong_password_current_db_object(self,
'ACONET-LIR-MNT, ACONET2-LIR-MNT'
]
assert flatten_mock_calls(mock_dq) == [
['sources', (['RIPE'],), {}],
['object_classes', (['mntner'],), {}],
['rpsl_pk', ('AS760-MNT',), {}],
['sources', (['RIPE'],), {}],
['object_classes', (['mntner'],), {}],
['sources', (['RIPE'],), {}], ['object_classes', (['mntner'],), {}], ['rpsl_pk', ('AS760-MNT',), {}],
['sources', (['RIPE'],), {}], ['object_classes', (['mntner'],), {}],
['rpsl_pks', (['AS760-MNT', 'ACONET-LIR-MNT', 'ACONET2-LIR-MNT'],), {}],
['sources', (['RIPE'],), {}], ['object_classes', (['mntner'],), {}],
['rpsl_pks', (['AS760-MNT', 'ACONET-LIR-MNT', 'ACONET2-LIR-MNT'],), {}]
]

def test_check_auth_invalid_create_with_incorrect_password_referenced_mntner(self, prepare_mocks):
mock_dq, mock_dh = prepare_mocks

query_results = iter([[{'object_text': SAMPLE_INETNUM}], [{'object_text': SAMPLE_MNTNER}]])
query_results = iter([[{'object_text': SAMPLE_INETNUM}], [{'object_text': SAMPLE_MNTNER}], []])
mock_dh.execute_query = lambda query: next(query_results)

reference_validator = ReferenceValidator(mock_dh)
Expand All @@ -403,12 +501,12 @@ def test_check_auth_invalid_create_with_incorrect_password_referenced_mntner(sel
assert 'Authorisation for inetnum 80.16.151.184 - 80.16.151.191 failed' in result_inetnum.error_messages[0]
assert 'one of: INTERB-MNT' in result_inetnum.error_messages[0]
assert flatten_mock_calls(mock_dq) == [
['sources', (['RIPE'],), {}],
['object_classes', (['inetnum'],), {}],
['sources', (['RIPE'],), {}], ['object_classes', (['inetnum'],), {}],
['rpsl_pk', ('80.16.151.184 - 80.16.151.191',), {}],
['sources', (['RIPE'],), {}],
['object_classes', (['mntner'],), {}],
['rpsl_pks', (['INTERB-MNT'],), {}]
['object_classes', (['mntner'],), {}], ['rpsl_pks', (['INTERB-MNT'],), {}],
['sources', (['RIPE'],), {}],
['object_classes', (['mntner'],), {}], ['rpsl_pks', (['INTERB-MNT'],), {}]
]

def test_check_auth_invalid_update_with_incorrect_password_referenced_mntner(self, prepare_mocks):
Expand Down
Loading

0 comments on commit cfe146c

Please sign in to comment.