Skip to content
This repository was archived by the owner on Jan 13, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 64 additions & 12 deletions iota/crypto/pycurl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from typing import List, MutableSequence, Optional, Sequence

from iota.exceptions import with_context

__all__ = [
'Curl',
'HASH_LENGTH',
Expand Down Expand Up @@ -59,16 +61,35 @@ def reset(self):
"""
self._state = [0] * STATE_LENGTH # type: List[int]

def absorb(self, trits):
# type: (Sequence[int], Optional[int]) -> None
def absorb(self, trits, offset=0, length=None):
# type: (Sequence[int], Optional[int], Optional[int]) -> None
"""
Absorb trits into the sponge.

:param trits:
Sequence of trits to absorb.

:param offset:
Starting offset in ``trits``.

:param length:
Number of trits to absorb. Defaults to ``len(trits)``.
"""
length = len(trits)
offset = 0
pad = ((len(trits) % HASH_LENGTH) or HASH_LENGTH)
trits += [0] * (HASH_LENGTH - pad)

if length is None:
length = len(trits)

if length < 1:
raise with_context(
exc = ValueError('Invalid length passed to ``absorb``.'),
context = {
'trits': trits,
'offset': offset,
'length': length,
},
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


# Copy trits from ``trits`` into internal state, one hash at a
# time, transforming internal state in between hashes.
Expand All @@ -92,14 +113,20 @@ def absorb(self, trits):
# Move on to the next hash.
offset += HASH_LENGTH

def squeeze(self, trits):
# type: (MutableSequence[int]) -> None
def squeeze(self, trits, offset=0, length=HASH_LENGTH):
# type: (MutableSequence[int], Optional[int], Optional[int]) -> None
"""
Squeeze trits from the sponge.

:param trits:
Sequence that the squeezed trits will be copied to.
Note: this object will be modified!

:param offset:
Starting offset in ``trits``.

:param length:
Number of trits to squeeze, default to ``HASH_LENGTH``
"""
#
# Squeeze is kind of like the opposite of absorb; it copies trits
Expand All @@ -110,14 +137,39 @@ def squeeze(self, trits):
# can simplify the implementation somewhat.
#

# Ensure that ``trits`` can hold at least one hash worth of trits.
trits.extend([0] * max(0, HASH_LENGTH - len(trits)))
# Ensure length can be mod by HASH_LENGTH
if length % HASH_LENGTH != 0:
raise with_context(
exc = ValueError('Invalid length passed to ``sequeeze`.'),
context = {
'trits': trits,
'offset': offset,
'length': length,
})

# Copy exactly one hash.
trits[0:HASH_LENGTH] = self._state[0:HASH_LENGTH]
# Ensure that ``trits`` can hold at least one hash worth of trits.
trits.extend([0] * max(0, length - len(trits)))

# Check trits with offset can handle hash length
if len(trits) - offset < HASH_LENGTH:
raise with_context(
exc = ValueError('Invalid offset passed to ``squeeze``.'),
context = {
'trits': trits,
'offset': offset,
'length': length
},
)

while length >= HASH_LENGTH:
# Copy exactly one hash.
trits[offset:offset + HASH_LENGTH] = self._state[0:HASH_LENGTH]

# One hash worth of trits copied; now transform.
self._transform()

# One hash worth of trits copied; now transform.
self._transform()
offset += HASH_LENGTH
length -= HASH_LENGTH

def _transform(self):
# type: () -> None
Expand Down
165 changes: 165 additions & 0 deletions test/crypto/pycurl_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# coding=utf-8
from __future__ import absolute_import, division, print_function, \
unicode_literals

from unittest import TestCase

from iota import TryteString
from iota.crypto import Curl


class TestCurl(TestCase):
"""
This is the test case for CURL-P hash function used in IOTA
"""
def test_correct_first(self):
"""Test the inp tryte string will get the correct output"""
inp = (
'EMIDYNHBWMBCXVDEFOFWINXTERALUKYYPPHKP9JJ'
'FGJEIUY9MUDVNFZHMMWZUYUSWAIOWEVTHNWMHANBH'
)

trits = TryteString(inp).as_trits()

curl = Curl()
curl.absorb(trits)
trits_out = []
curl.squeeze(trits_out)

trits_out = TryteString.from_trits(trits_out)

self.assertEqual(
trits_out,
'AQBOPUMJMGVHFOXSMUAGZNACKUTISDPBSILMRAGIG'
'RXXS9JJTLIKZUW9BCJWKSTFBDSBLNVEEGVGAMSSM')

def test_input_length_greater_than_243(self):
"""Test input trytes length greater than hash length should work"""
inp = (
'G9JYBOMPUXHYHKSNRNMMSSZCSHOFYOYNZRSZMAAYWDYEIMVVOGKPJB'
'VBM9TDPULSFUNMTVXRKFIDOHUXXVYDLFSZYZTWQYTE9SPYYWYTXJYQ'
'9IFGYOLZXWZBKWZN9QOOTBQMWMUBLEWUEEASRHRTNIQWJQNDWRYLCA'
)

trits = TryteString(inp).as_trits()

curl = Curl()
curl.absorb(trits)
trits_out = []
curl.squeeze(trits_out)

trits_out = TryteString.from_trits(trits_out)

self.assertEqual(
trits_out,
'RWCBOLRFANOAYQWXXTFQJYQFAUTEEBSZWTIRSSDR'
'EYGCNFRLHQVDZXYXSJKCQFQLJMMRHYAZKRRLQZDKR')

def test_input_without_offset(self):
"""Test input without offset should work"""
inp = (
'G9JYBOMPUXHYHKSNRNMMSSZCSHOFYOYNZRSZMAAYWDYEIMVVOGKPJB'
'VBM9TDPULSFUNMTVXRKFIDOHUXXVYDLFSZYZTWQYTE9SPYYWYTXJYQ'
'9IFGYOLZXWZBKWZN9QOOTBQMWMUBLEWUEEASRHRTNIQWJQNDWRYLCA'
)

trits = TryteString(inp).as_trits()

curl = Curl()
curl.absorb(trits, 0, length=486)
curl.absorb(trits, 0, length=243)
trits_out = []
curl.squeeze(trits_out)

trits_out = TryteString.from_trits(trits_out)

self.assertEqual(
trits_out,
'OTYHXEXJLCSMEY9LYCC9ASJXMORTLAYQEHRS9DAH'
'9NR9DXLXYDGOVOBEL9LWRITLWPHPYPZDKXVPAPKUA')

def test_input_with_offset(self):
"""Test input with offset should work"""
inp = (
'G9JYBOMPUXHYHKSNRNMMSSZCSHOFYOYNZRSZMAAYWDYEIMVVOGKPJB'
'VBM9TDPULSFUNMTVXRKFIDOHUXXVYDLFSZYZTWQYTE9SPYYWYTXJYQ'
'9IFGYOLZXWZBKWZN9QOOTBQMWMUBLEWUEEASRHRTNIQWJQNDWRYLCA'
)

trits = TryteString(inp).as_trits()

curl = Curl()
curl.absorb(trits, 243, length=486)
curl.absorb(trits, 0, length=243)
trits_out = []
curl.squeeze(trits_out)

trits_out = TryteString.from_trits(trits_out)

self.assertEqual(
trits_out,
'ZWNF9YOCAKC9CXQFYZDKXSSAZOCAZLEVEB9OZDJQG'
'WEULHUDY9RAWAT9GIUXTTUSYJEGNGQDVJCGTQLN9')

def test_squeeze_with_offset(self):
"""Test squeeze with offset, this only used in ISS
GitHub IRI ISS: https://github.com/iotaledger/iri/blob/dev/src/main/java/com/iota/iri/hash/ISS.java#L83
"""
inp = (
'CDLFODMOGMQAWXDURDXTUAOO9BFESHYGZLBUWIIHPTLNZCUNHZAAXSUPUIBW'
'IRLOVKCVWJSWEKRJQZUVRDZGZRNANUNCSGANCJWVHMZMVNJVUAZNFZKDAIVV'
'LSMIM9SVGUHYECTGGIXTAMXXO9FIXUMQFZCGRQWAOWJPBTXNNQIRSTZEEAJV'
'FSXWTHWBQJCWQNYYMHSPCYRA99ITVILYJPMFGOGOUOZUVABK9HMGABSORCVD'
'FNGLMPJ9NFKBWCZMFPIWEAGRWPRNLLG9VYUUVLCTEWKGWQIRIJKERZWC9LVR'
'XJEXNHBNUGEGGLMWGERKYFB9YEZCLXLKKMCGLRKQOGASDOUDYEDJLMV9BHPG'
'GCXQIUVUOFFXKEIIINLVWLRYHHLKXPLSTWKIKNEJWEDFQQFXQVEHGRCIJC9T'
'GVQNPPKGCFGPJNWSCPQZDDSIGAVZEIVYJDVPUOCTEMKTZFGXNGPQCOIBD9MX'
'YTHJTX'
)


d = [0] * 243
trits = TryteString(inp).as_trits()
curl = Curl()

for i in range(6):
curl.reset()
curl.absorb(trits, i * 243, (i + 1) * 243)
curl.squeeze(trits, i * 243)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test looks very specific. Is it mimicking a real-world use case?

Unit tests also serve as documentation; if you add some comments explaining what real-world situation this test is simulating, it will help users to better understand what they can accomplish with the library.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only use in ISS and ISSInplace, add the source code link in docstring.


curl.reset()
curl.absorb(trits)
curl.squeeze(d)

trits_out = TryteString.from_trits(d)

self.assertEqual(
trits_out,
'TAWDGNSEAD9ZRGBBVRVEKQYYVDOKHYQ9KEIYJKFT'
'BQEYZDWZVMRFJQQGTMPHBZOGPIJCCVWLZVDKLAQVI')

def test_squeeze_with_486_length_should_work(self):
"""
Test squeeze with 486 length should work as well, no one use this
in real situation
"""
inp = (
'EMIDYNHBWMBCXVDEFOFWINXTERALUKYYPPHKP9JJ'
'FGJEIUY9MUDVNFZHMMWZUYUSWAIOWEVTHNWMHANBH'
)

trits = TryteString(inp).as_trits()

curl = Curl()
curl.absorb(trits)
trits_out = []
curl.squeeze(trits_out, length=486)

trits_out = TryteString.from_trits(trits_out)

self.assertEqual(
trits_out,
'AQBOPUMJMGVHFOXSMUAGZNACKUTISDPBSILMRAGIG'
'RXXS9JJTLIKZUW9BCJWKSTFBDSBLNVEEGVGAMSSMQ'
'GSJWCCFQRHWKTSMVPWWCEGOMCNWFYWDZBEDBLXIFB'
'HOTCKUMCANLSXXTNKSYNBMOSDDEYFTDOYIKDRJM')