diff --git a/iota/crypto/pycurl.py b/iota/crypto/pycurl.py index ce2337e6..3ed0ee23 100644 --- a/iota/crypto/pycurl.py +++ b/iota/crypto/pycurl.py @@ -4,6 +4,8 @@ from typing import List, MutableSequence, Optional, Sequence +from iota.exceptions import with_context + __all__ = [ 'Curl', 'HASH_LENGTH', @@ -59,16 +61,35 @@ def reset(self): """ self._state = [0] * STATE_LENGTH # type: List[int] - def absorb(self, trits): - # type: (Sequence[int], Optional[int]) -> None + def absorb(self, trits, offset=0, length=None): + # type: (Sequence[int], Optional[int], Optional[int]) -> None """ Absorb trits into the sponge. :param trits: Sequence of trits to absorb. + + :param offset: + Starting offset in ``trits``. + + :param length: + Number of trits to absorb. Defaults to ``len(trits)``. """ - length = len(trits) - offset = 0 + pad = ((len(trits) % HASH_LENGTH) or HASH_LENGTH) + trits += [0] * (HASH_LENGTH - pad) + + if length is None: + length = len(trits) + + if length < 1: + raise with_context( + exc = ValueError('Invalid length passed to ``absorb``.'), + context = { + 'trits': trits, + 'offset': offset, + 'length': length, + }, + ) # Copy trits from ``trits`` into internal state, one hash at a # time, transforming internal state in between hashes. @@ -92,14 +113,20 @@ def absorb(self, trits): # Move on to the next hash. offset += HASH_LENGTH - def squeeze(self, trits): - # type: (MutableSequence[int]) -> None + def squeeze(self, trits, offset=0, length=HASH_LENGTH): + # type: (MutableSequence[int], Optional[int], Optional[int]) -> None """ Squeeze trits from the sponge. :param trits: Sequence that the squeezed trits will be copied to. Note: this object will be modified! + + :param offset: + Starting offset in ``trits``. + + :param length: + Number of trits to squeeze, default to ``HASH_LENGTH`` """ # # Squeeze is kind of like the opposite of absorb; it copies trits @@ -110,14 +137,39 @@ def squeeze(self, trits): # can simplify the implementation somewhat. # - # Ensure that ``trits`` can hold at least one hash worth of trits. - trits.extend([0] * max(0, HASH_LENGTH - len(trits))) + # Ensure length can be mod by HASH_LENGTH + if length % HASH_LENGTH != 0: + raise with_context( + exc = ValueError('Invalid length passed to ``sequeeze`.'), + context = { + 'trits': trits, + 'offset': offset, + 'length': length, + }) - # Copy exactly one hash. - trits[0:HASH_LENGTH] = self._state[0:HASH_LENGTH] + # Ensure that ``trits`` can hold at least one hash worth of trits. + trits.extend([0] * max(0, length - len(trits))) + + # Check trits with offset can handle hash length + if len(trits) - offset < HASH_LENGTH: + raise with_context( + exc = ValueError('Invalid offset passed to ``squeeze``.'), + context = { + 'trits': trits, + 'offset': offset, + 'length': length + }, + ) + + while length >= HASH_LENGTH: + # Copy exactly one hash. + trits[offset:offset + HASH_LENGTH] = self._state[0:HASH_LENGTH] + + # One hash worth of trits copied; now transform. + self._transform() - # One hash worth of trits copied; now transform. - self._transform() + offset += HASH_LENGTH + length -= HASH_LENGTH def _transform(self): # type: () -> None diff --git a/test/crypto/pycurl_test.py b/test/crypto/pycurl_test.py new file mode 100644 index 00000000..df3c5b04 --- /dev/null +++ b/test/crypto/pycurl_test.py @@ -0,0 +1,165 @@ +# coding=utf-8 +from __future__ import absolute_import, division, print_function, \ + unicode_literals + +from unittest import TestCase + +from iota import TryteString +from iota.crypto import Curl + + +class TestCurl(TestCase): + """ + This is the test case for CURL-P hash function used in IOTA + """ + def test_correct_first(self): + """Test the inp tryte string will get the correct output""" + inp = ( + 'EMIDYNHBWMBCXVDEFOFWINXTERALUKYYPPHKP9JJ' + 'FGJEIUY9MUDVNFZHMMWZUYUSWAIOWEVTHNWMHANBH' + ) + + trits = TryteString(inp).as_trits() + + curl = Curl() + curl.absorb(trits) + trits_out = [] + curl.squeeze(trits_out) + + trits_out = TryteString.from_trits(trits_out) + + self.assertEqual( + trits_out, + 'AQBOPUMJMGVHFOXSMUAGZNACKUTISDPBSILMRAGIG' + 'RXXS9JJTLIKZUW9BCJWKSTFBDSBLNVEEGVGAMSSM') + + def test_input_length_greater_than_243(self): + """Test input trytes length greater than hash length should work""" + inp = ( + 'G9JYBOMPUXHYHKSNRNMMSSZCSHOFYOYNZRSZMAAYWDYEIMVVOGKPJB' + 'VBM9TDPULSFUNMTVXRKFIDOHUXXVYDLFSZYZTWQYTE9SPYYWYTXJYQ' + '9IFGYOLZXWZBKWZN9QOOTBQMWMUBLEWUEEASRHRTNIQWJQNDWRYLCA' + ) + + trits = TryteString(inp).as_trits() + + curl = Curl() + curl.absorb(trits) + trits_out = [] + curl.squeeze(trits_out) + + trits_out = TryteString.from_trits(trits_out) + + self.assertEqual( + trits_out, + 'RWCBOLRFANOAYQWXXTFQJYQFAUTEEBSZWTIRSSDR' + 'EYGCNFRLHQVDZXYXSJKCQFQLJMMRHYAZKRRLQZDKR') + + def test_input_without_offset(self): + """Test input without offset should work""" + inp = ( + 'G9JYBOMPUXHYHKSNRNMMSSZCSHOFYOYNZRSZMAAYWDYEIMVVOGKPJB' + 'VBM9TDPULSFUNMTVXRKFIDOHUXXVYDLFSZYZTWQYTE9SPYYWYTXJYQ' + '9IFGYOLZXWZBKWZN9QOOTBQMWMUBLEWUEEASRHRTNIQWJQNDWRYLCA' + ) + + trits = TryteString(inp).as_trits() + + curl = Curl() + curl.absorb(trits, 0, length=486) + curl.absorb(trits, 0, length=243) + trits_out = [] + curl.squeeze(trits_out) + + trits_out = TryteString.from_trits(trits_out) + + self.assertEqual( + trits_out, + 'OTYHXEXJLCSMEY9LYCC9ASJXMORTLAYQEHRS9DAH' + '9NR9DXLXYDGOVOBEL9LWRITLWPHPYPZDKXVPAPKUA') + + def test_input_with_offset(self): + """Test input with offset should work""" + inp = ( + 'G9JYBOMPUXHYHKSNRNMMSSZCSHOFYOYNZRSZMAAYWDYEIMVVOGKPJB' + 'VBM9TDPULSFUNMTVXRKFIDOHUXXVYDLFSZYZTWQYTE9SPYYWYTXJYQ' + '9IFGYOLZXWZBKWZN9QOOTBQMWMUBLEWUEEASRHRTNIQWJQNDWRYLCA' + ) + + trits = TryteString(inp).as_trits() + + curl = Curl() + curl.absorb(trits, 243, length=486) + curl.absorb(trits, 0, length=243) + trits_out = [] + curl.squeeze(trits_out) + + trits_out = TryteString.from_trits(trits_out) + + self.assertEqual( + trits_out, + 'ZWNF9YOCAKC9CXQFYZDKXSSAZOCAZLEVEB9OZDJQG' + 'WEULHUDY9RAWAT9GIUXTTUSYJEGNGQDVJCGTQLN9') + + def test_squeeze_with_offset(self): + """Test squeeze with offset, this only used in ISS + GitHub IRI ISS: https://github.com/iotaledger/iri/blob/dev/src/main/java/com/iota/iri/hash/ISS.java#L83 + """ + inp = ( + 'CDLFODMOGMQAWXDURDXTUAOO9BFESHYGZLBUWIIHPTLNZCUNHZAAXSUPUIBW' + 'IRLOVKCVWJSWEKRJQZUVRDZGZRNANUNCSGANCJWVHMZMVNJVUAZNFZKDAIVV' + 'LSMIM9SVGUHYECTGGIXTAMXXO9FIXUMQFZCGRQWAOWJPBTXNNQIRSTZEEAJV' + 'FSXWTHWBQJCWQNYYMHSPCYRA99ITVILYJPMFGOGOUOZUVABK9HMGABSORCVD' + 'FNGLMPJ9NFKBWCZMFPIWEAGRWPRNLLG9VYUUVLCTEWKGWQIRIJKERZWC9LVR' + 'XJEXNHBNUGEGGLMWGERKYFB9YEZCLXLKKMCGLRKQOGASDOUDYEDJLMV9BHPG' + 'GCXQIUVUOFFXKEIIINLVWLRYHHLKXPLSTWKIKNEJWEDFQQFXQVEHGRCIJC9T' + 'GVQNPPKGCFGPJNWSCPQZDDSIGAVZEIVYJDVPUOCTEMKTZFGXNGPQCOIBD9MX' + 'YTHJTX' + ) + + + d = [0] * 243 + trits = TryteString(inp).as_trits() + curl = Curl() + + for i in range(6): + curl.reset() + curl.absorb(trits, i * 243, (i + 1) * 243) + curl.squeeze(trits, i * 243) + + curl.reset() + curl.absorb(trits) + curl.squeeze(d) + + trits_out = TryteString.from_trits(d) + + self.assertEqual( + trits_out, + 'TAWDGNSEAD9ZRGBBVRVEKQYYVDOKHYQ9KEIYJKFT' + 'BQEYZDWZVMRFJQQGTMPHBZOGPIJCCVWLZVDKLAQVI') + + def test_squeeze_with_486_length_should_work(self): + """ + Test squeeze with 486 length should work as well, no one use this + in real situation + """ + inp = ( + 'EMIDYNHBWMBCXVDEFOFWINXTERALUKYYPPHKP9JJ' + 'FGJEIUY9MUDVNFZHMMWZUYUSWAIOWEVTHNWMHANBH' + ) + + trits = TryteString(inp).as_trits() + + curl = Curl() + curl.absorb(trits) + trits_out = [] + curl.squeeze(trits_out, length=486) + + trits_out = TryteString.from_trits(trits_out) + + self.assertEqual( + trits_out, + 'AQBOPUMJMGVHFOXSMUAGZNACKUTISDPBSILMRAGIG' + 'RXXS9JJTLIKZUW9BCJWKSTFBDSBLNVEEGVGAMSSMQ' + 'GSJWCCFQRHWKTSMVPWWCEGOMCNWFYWDZBEDBLXIFB' + 'HOTCKUMCANLSXXTNKSYNBMOSDDEYFTDOYIKDRJM')