This repository was archived by the owner on Jan 13, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 125
Expand file tree
/
Copy pathpykerl.py
More file actions
140 lines (103 loc) · 3.72 KB
/
pykerl.py
File metadata and controls
140 lines (103 loc) · 3.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# coding=utf-8
from __future__ import absolute_import, division, print_function, \
unicode_literals
from sha3 import keccak_384
from six import PY2
from typing import MutableSequence, Optional
from iota.crypto.kerl import conv
from iota.exceptions import with_context
# Names exported by ``from <this module> import *``.
__all__ = [
'Kerl',
]
# Hash length expressed in bytes.  48 bytes == 384 bits, which presumably
# corresponds to the digest size of ``keccak_384`` -- TODO confirm.  The
# constant is not referenced by the code visible in this module.
BYTE_HASH_LENGTH = 48
# Hash length expressed in trits.  ``Kerl.absorb`` and ``Kerl.squeeze``
# pad their buffers to a multiple of this value and process them in
# chunks of this size.
TRIT_HASH_LENGTH = 243
class Kerl(object):
    """
    Sponge that absorbs and squeezes trits on top of Keccak-384.

    Trit buffers are processed in chunks of ``TRIT_HASH_LENGTH`` trits.
    Each chunk is converted to/from bytes with the ``conv`` helpers
    before it is fed to (or after it is read from) the underlying
    ``keccak_384`` object stored in :py:attr:`k`.
    """
    # Underlying Keccak instance; replaced by :py:meth:`reset`.
    k = None  # type: keccak_384

    def __init__(self):
        self.reset()

    def absorb(self, trits, offset=0, length=None):
        # type: (MutableSequence[int], int, Optional[int]) -> None
        """
        Absorb trits into the sponge from a buffer.

        :param trits:
            Buffer that contains the trits to absorb.
            The buffer itself is never modified; zero-padding and the
            zeroing of each full chunk's last trit are applied to an
            internal copy.

        :param offset:
            Starting offset in ``trits``.

        :param length:
            Number of trits to absorb.  Defaults to the length of
            ``trits`` after zero-padding it to a multiple of
            :py:data:`TRIT_HASH_LENGTH`.

            Note: the value is compared against the running offset,
            so when ``offset`` is non-zero it effectively acts as the
            end index in the buffer rather than as a count.

        :raise ValueError:
            If the effective length is less than 1.  The exception is
            passed through ``with_context`` with the call arguments
            attached.
        """
        # Work on a private copy so that neither the padding nor the
        # zeroing of a chunk's last trit leaks into the caller's
        # buffer (``+=`` on a list would extend it in place).
        padded = list(trits)

        # Pad the copy if necessary, so that it can be divided evenly
        # into hashes.
        remainder = len(padded) % TRIT_HASH_LENGTH
        if remainder:
            padded.extend([0] * (TRIT_HASH_LENGTH - remainder))

        if length is None:
            length = len(padded)

        if length < 1:
            raise with_context(
                exc=ValueError('Invalid length passed to ``absorb``.'),

                context={
                    'trits': padded,
                    'offset': offset,
                    'length': length,
                },
            )

        while offset < length:
            stop = min(offset + TRIT_HASH_LENGTH, length)

            # If we're copying over a full chunk, zero last trit.
            if stop - offset == TRIT_HASH_LENGTH:
                padded[stop - 1] = 0

            signed_nums = conv.convertToBytes(padded[offset:stop])

            # Convert signed bytes into their equivalent unsigned
            # representation, in order to use Python's built-in bytes
            # type.
            unsigned_bytes = bytearray(
                conv.convert_sign(b) for b in signed_nums
            )

            self.k.update(unsigned_bytes)

            offset += TRIT_HASH_LENGTH

    def squeeze(self, trits, offset=0, length=None):
        # type: (MutableSequence[int], int, Optional[int]) -> None
        """
        Squeeze trits from the sponge into a buffer.

        :param trits:
            Buffer that will hold the squeezed trits.

            IMPORTANT: ``trits`` is modified in place.  It is first
            zero-padded to a multiple of :py:data:`TRIT_HASH_LENGTH`,
            and it is extended further if it is too small to hold
            the squeezed trits.

        :param offset:
            Starting offset in ``trits``.

        :param length:
            Number of trits to squeeze from the sponge.

            If not specified, defaults to the (padded) length of
            ``trits``, or to :py:data:`TRIT_HASH_LENGTH` (i.e.,
            exactly 1 hash) when ``trits`` is empty.

            Note: the value is compared against the running offset,
            so when ``offset`` is non-zero it effectively acts as the
            end index in the buffer rather than as a count.

        :raise ValueError:
            If the effective length is less than 1.  The exception is
            passed through ``with_context`` with the call arguments
            attached.
        """
        # Pad the output buffer if necessary, so that it can be divided
        # evenly into hashes.  This intentionally mutates ``trits``.
        pad = ((len(trits) % TRIT_HASH_LENGTH) or TRIT_HASH_LENGTH)
        trits += [0] * (TRIT_HASH_LENGTH - pad)

        if length is None:
            # Fill the whole buffer; an empty buffer receives one hash.
            # Note that this is different than ``absorb``.
            length = len(trits) or TRIT_HASH_LENGTH

        if length < 1:
            raise with_context(
                exc=ValueError('Invalid length passed to ``squeeze``.'),

                context={
                    'trits': trits,
                    'offset': offset,
                    'length': length,
                },
            )

        while offset < length:
            unsigned_hash = self.k.digest()

            if PY2:
                # ``digest()`` yields a ``str`` on Python 2; turn it
                # into a list of integer byte values.
                unsigned_hash = map(ord, unsigned_hash)  # type: ignore

            signed_hash = [conv.convert_sign(b) for b in unsigned_hash]

            trits_from_hash = conv.convertToTrits(signed_hash)
            # The last trit of every squeezed hash is forced to zero.
            trits_from_hash[TRIT_HASH_LENGTH - 1] = 0

            stop = min(TRIT_HASH_LENGTH, length - offset)
            trits[offset:offset + stop] = trits_from_hash[0:stop]

            # Feed the bitwise complement of the digest into a fresh
            # Keccak instance, so that the next iteration produces a
            # new hash.
            flipped_bytes = bytearray(
                conv.convert_sign(~b) for b in unsigned_hash
            )

            # Reset internal state before feeding back in.
            self.reset()
            self.k.update(flipped_bytes)

            offset += TRIT_HASH_LENGTH

    def reset(self):
        """
        Discard the sponge's state by installing a new ``keccak_384``
        instance.
        """
        self.k = keccak_384()