Skip to content

Commit

Permalink
Fix #74 - Accept dummy hashes in submissions
Browse files Browse the repository at this point in the history
Follows the implementation suggested in
#74 (comment)
including automatic upgrading of hashes to MD5-PW.
  • Loading branch information
mxsasha committed Aug 9, 2018
1 parent b824987 commit 305daa2
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 86 deletions.
55 changes: 49 additions & 6 deletions irrd/rpsl/rpsl_objects.py
@@ -1,9 +1,9 @@
from collections import OrderedDict
from typing import Set, List, Optional
from typing import Set, List, Optional, Union

import gnupg

from irrd.conf import get_setting
from irrd.conf import get_setting, PASSWORD_HASH_DUMMY_VALUE
from .config import PASSWORD_HASHERS
from .fields import (RPSLTextField, RPSLIPv4PrefixField, RPSLIPv4PrefixesField, RPSLIPv6PrefixField,
RPSLIPv6PrefixesField, RPSLIPv4AddressRangeField, RPSLASNumberField, RPSLASBlockField,
Expand Down Expand Up @@ -298,25 +298,68 @@ class RPSLMntner(RPSLObject):
("source", RPSLGenericNameField()),
])

def clean(self):
"""Check whether either all hash values are dummy hashes, or none."""
if not super().clean():
return False # pragma: no cover

dummy_matches = [auth[1] == PASSWORD_HASH_DUMMY_VALUE for auth in self._auth_lines(True)]
if any(dummy_matches) and not all(dummy_matches):
self.messages.error('Either all password auth hashes in a submitted mntner must be dummy objects, or none.')

def verify_auth(self, passwords: List[str], keycert_obj_pk: Optional[str]=None) -> bool:
"""
Verify whether one of a given list of passwords matches
any of the auth hashes in this object, or match the
keycert object PK.
"""
for auth in self.parsed_data.get("auth", "").splitlines():
for auth in self.parsed_data.get('auth', '').splitlines():
if keycert_obj_pk and auth.upper() == keycert_obj_pk:
return True
if " " not in auth:
continue
scheme, hash = auth.split(" ", 1)
scheme, hash = auth.split(' ', 1)
hasher = PASSWORD_HASHERS.get(scheme.upper())
if hasher:
for password in passwords:
if hasher.verify(password, hash):
return True
try:
if hasher.verify(password, hash):
return True
except ValueError:
pass
return False

def has_dummy_auth_value(self) -> bool:
"""
Check whether this object has dummy auth hashes.
If clean() has returned successfully before, the answer from this method
means that either all or no hashes have dummy values.
"""
auth_values = [auth[1] for auth in self._auth_lines(password_hashes=True)]
return bool(auth_values) and all([value == PASSWORD_HASH_DUMMY_VALUE for value in auth_values])

def force_single_new_password(self, password) -> None:
"""
Overwrite all auth hashes with a single new hash for the provided password.
Retains other methods, i.e. PGPKEY.
"""
hash = 'MD5-PW ' + PASSWORD_HASHERS['MD5-PW'].hash(password)
auths = self._auth_lines(password_hashes=False)
auths.append(hash)
self._update_attribute_value("auth", auths)

def _auth_lines(self, password_hashes=True) -> List[Union[str, List[str]]]:
"""
Return a list of auth values in this object.
If password_hashes=False, returns only non-hash (i.e. PGPKEY) lines.
If password_hashes=True, returns a list of lists, each inner list containing
the hash method and the hash.
"""
lines = self.parsed_data.get("auth", "").splitlines()
if password_hashes is True:
return [auth.split(' ', 1) for auth in lines if ' ' in auth]
return [auth for auth in lines if ' ' not in auth]


class RPSLPeeringSet(RPSLObject):
fields = OrderedDict([
Expand Down
11 changes: 11 additions & 0 deletions irrd/rpsl/tests/test_rpsl_objects.py
Expand Up @@ -2,6 +2,7 @@
from IPy import IP
from pytest import raises

from irrd.conf import PASSWORD_HASH_DUMMY_VALUE
from irrd.utils.rpsl_samples import (object_sample_mapping, SAMPLE_MALFORMED_EMPTY_LINE, SAMPLE_MALFORMED_ATTRIBUTE_NAME,
SAMPLE_UNKNOWN_CLASS, SAMPLE_MISSING_MANDATORY_ATTRIBUTE, SAMPLE_MALFORMED_SOURCE,
SAMPLE_MALFORMED_PK, SAMPLE_UNKNOWN_ATTRIBUTE, SAMPLE_INVALID_MULTIPLE_ATTRIBUTE,
Expand Down Expand Up @@ -292,11 +293,21 @@ def test_parse(self):
rpsl_text = object_sample_mapping[RPSLMntner().rpsl_object_class]
obj = rpsl_object_from_text(rpsl_text)
assert obj.__class__ == RPSLMntner

assert not obj.messages.errors()
assert obj.pk() == "AS760-MNT"
assert obj.parsed_data["mnt-by"] == ['AS760-MNT', 'ACONET-LIR-MNT', 'ACONET2-LIR-MNT']
assert obj.render_rpsl_text() == rpsl_text

def test_parse_invalid_partial_dummy_hash(self):
rpsl_text = object_sample_mapping[RPSLMntner().rpsl_object_class]
rpsl_text = rpsl_text.replace('LEuuhsBJNFV0Q', PASSWORD_HASH_DUMMY_VALUE)
obj = rpsl_object_from_text(rpsl_text)
assert obj.__class__ == RPSLMntner
assert obj.messages.errors() == [
'Either all password auth hashes in a submitted mntner must be dummy objects, or none.'
]

@pytest.mark.usefixtures("tmp_gpg_dir") # noqa: F811
def test_verify(self, tmp_gpg_dir):
rpsl_text = object_sample_mapping[RPSLMntner().rpsl_object_class]
Expand Down
55 changes: 26 additions & 29 deletions irrd/updates/parser.py
Expand Up @@ -131,44 +131,41 @@ def validate(self) -> bool:
auth_valid = self._check_auth()
if not auth_valid:
return False
# For deletions, only references to the deleted object matter, as
# they now become invalid. For other operations, only the validity
# of references from this object to others matter.
if self.request_type == UpdateRequestType.DELETE:
references_valid = self._check_references_from_others()
else:
references_valid = self._check_references_to_others()
references_valid = self._check_references()
return references_valid

def _check_auth(self) -> bool:
assert self.rpsl_obj_new
auth_error_message = self.auth_validator.check_auth(self.rpsl_obj_new, self.rpsl_obj_current)
if auth_error_message:
self.status = UpdateRequestStatus.ERROR_AUTH
self.error_messages.append(auth_error_message)
return False
return True
auth_result = self.auth_validator.check_auth(self.rpsl_obj_new, self.rpsl_obj_current)
self.info_messages += auth_result.info_messages

def _check_references_to_others(self) -> bool:
"""Check all references from this object to other objects."""
assert self.rpsl_obj_new
references_error_messages = self.reference_validator.check_references_to_others(self.rpsl_obj_new)
self.error_messages += references_error_messages
if references_error_messages and self.is_valid():
self.status = UpdateRequestStatus.ERROR_REFERENCE
if not auth_result.is_valid():
self.status = UpdateRequestStatus.ERROR_AUTH
self.error_messages += auth_result.error_messages
return False
return True

def _check_references_from_others(self) -> bool:
assert self.rpsl_obj_current
references = self.reference_validator.check_references_from_others(self.rpsl_obj_current)
for ref_object_class, ref_pk, ref_source in references:
self.error_messages.append(f'Object {self.rpsl_obj_current.pk()} to be deleted, but still referenced '
f'by {ref_object_class} {ref_pk}')
def _check_references(self) -> bool:
"""
Check all references from this object to or from other objects.
if self.error_messages and self.is_valid():
self.status = UpdateRequestStatus.ERROR_REFERENCE
return False
For deletions, only references to the deleted object matter, as
they now become invalid. For other operations, only the validity
of references from the new object to others matter.
"""
if self.request_type == UpdateRequestType.DELETE and self.rpsl_obj_current is not None:
assert self.rpsl_obj_new
references_result = self.reference_validator.check_references_from_others(self.rpsl_obj_new)
else:
assert self.rpsl_obj_new
references_result = self.reference_validator.check_references_to_others(self.rpsl_obj_new)
self.info_messages += references_result.info_messages

if not references_result.is_valid():
self.error_messages += references_result.error_messages
if self.is_valid(): # Only update the status if this object was valid prior, so this is the first failure
self.status = UpdateRequestStatus.ERROR_REFERENCE
return False
return True


Expand Down
2 changes: 2 additions & 0 deletions irrd/updates/tests/test_handler.py
Expand Up @@ -236,6 +236,7 @@ def test_parse_invalid_new_objects_pgp_key_does_not_exist(self, prepare_mocks):
[{'object_text': mntner_text}],
[{'object_text': mntner_text}],
[],
[],
])
mock_dh.execute_query = lambda query: next(query_responses)

Expand All @@ -248,6 +249,7 @@ def test_parse_invalid_new_objects_pgp_key_does_not_exist(self, prepare_mocks):
['sources', (['RIPE'],), {}], ['object_classes', (['mntner'],), {}], ['rpsl_pk', ('AS760-MNT',), {}],
['sources', (['RIPE'],), {}], ['object_classes', (['mntner'],), {}], ['rpsl_pks', (['AS760-MNT'],), {}],
['sources', (['RIPE'],), {}], ['object_classes', (['mntner'],), {}], ['rpsl_pks', (['AS760-MNT'],), {}],
['sources', (['RIPE'],), {}], ['object_classes', (['mntner'],), {}], ['rpsl_pks', (['AS760-MNT'],), {}]
]

assert mock_dh.mock_calls[0][0] == 'commit'
Expand Down

0 comments on commit 305daa2

Please sign in to comment.