Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 31 additions & 40 deletions onlykey/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from sys import platform

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

DEVICE_IDS = [
(0x16C0, 0x0486), # OnlyKey
Expand Down Expand Up @@ -252,7 +251,6 @@ def send_message(self, payload=None, msg=None, slot_id=None, message_field=None)
raw_bytes += bytes([payload])
else:
raise Exception('`payload` must be either `str` or `list`, got `{}`'.format(type(payload)))

# Pad the ouput with 0s
while len(raw_bytes) < MAX_INPUT_REPORT_SIZE:
raw_bytes += bytes([0])
Expand All @@ -271,22 +269,19 @@ def send_large_message(self, payload=None, msg=None, slot_id=chr(101)):
for chunk in chunks:
# print chunk
# print [ord(c) for c in chunk]
current_payload = [255] # 255 means that it's not the last payload
current_payload = bytes([255]) # 255 means that it's not the last payload
# If it's less than the max size, set explicitely the size
if len(chunk) < 58:
current_payload = [len(chunk)]
current_payload = bytes([len(chunk)])

# Append the actual payload
if isinstance(chunk, list):
current_payload.extend(chunk)
current_payload += bytes(chunk)
else:
for c in chunk:
current_payload.append(ord(c))
current_payload += chunk

self.send_message(payload=current_payload, msg=msg)

return


def send_large_message2(self, payload=None, msg=None, slot_id=101):
"""Wrapper for sending large message (larger than 58 bytes) in batch in a transparent way."""
Expand All @@ -303,18 +298,16 @@ def send_large_message2(self, payload=None, msg=None, slot_id=101):
if len(chunk) < 57:
current_payload = [slot_id, len(chunk)]

current_payload = bytes(current_payload)

# Append the actual payload
if isinstance(chunk, list):
current_payload.extend(chunk)
current_payload += bytes(chunk)
else:
for c in chunk:
current_payload.append(c)
current_payload += chunk

self.send_message(payload=current_payload, msg=msg)

return



def send_large_message3(self, payload=None, msg=None, slot_id=101, key_type=1):
"""Wrapper for sending large message (larger than 58 bytes) in batch in a transparent way."""
Expand All @@ -324,35 +317,32 @@ def send_large_message3(self, payload=None, msg=None, slot_id=101, key_type=1):
# Split the payload in multiple chunks
chunks = [payload[x:x+MAX_LARGE_PAYLOAD_SIZE-1] for x in range(0, len(payload), 57)]
for chunk in chunks:
# print chunk
# print [ord(c) for c in chunk]
current_payload = [slot_id, key_type]
current_payload = bytes([slot_id, key_type])

# Append the actual payload
if isinstance(chunk, list):
current_payload.extend(chunk)
current_payload += bytes(chunk)
else:
for c in chunk:
current_payload.append(ord(c))
current_payload += chunk

self.send_message(payload=current_payload, msg=msg)

return
self.send_message(payload=current_payload, msg=msg)

def read_bytes(self, n=64, to_str=False, timeout_ms=100) -> bytes:
def read_bytes(self, n=64, to_str=False, timeout_ms=100):
"""Read n bytes and return an array of uint8 (int)."""
out = self._hid.read(n, timeout=timeout_ms)
log.debug('read="%s"', out.decode())
if to_str:
# Returns the bytes a string if requested
return out.decode("utf-8")
return out.hex()

# Returns the raw list
return out

def read_string(self, timeout_ms=100):
"""Read an ASCII string."""
return self.read_bytes(MAX_INPUT_REPORT_SIZE, timeout_ms=timeout_ms).decode("ascii")
return self.read_chunk(timeout_ms=timeout_ms).decode("ascii")

def read_chunk(self, timeout_ms=100):
return self.read_bytes(MAX_INPUT_REPORT_SIZE, timeout_ms=timeout_ms)

def getlabels(self):
"""Fetch the list of `Slot` from the OnlyKey.
Expand All @@ -377,13 +367,14 @@ def getkeylabels(self):
No need to read messages.
"""
self.send_message(msg=Message.OKGETLABELS, slot_id=107)
time.sleep(0)
slots = []
for _ in range(33):
data = self.read_string().split('|')
slot_number = ord(data[0])
data = self.read_chunk()
bef, _, aft = data.partition(b"|")
slot_number = bef[0]
if 25 <= slot_number <= 57:
slots.append(Slot(slot_number, label=data[1]))

return slots

def displaykeylabels(self):
Expand Down Expand Up @@ -568,14 +559,14 @@ def getpub(self):
ok_pubkey1 = ''
while ok_pubkey1 == '':
time.sleep(0.5)
ok_pubkey1= self.read_bytes(64, to_str=True)
ok_pubkey1= self.read_bytes(64)

print()
print('received=', repr(ok_pubkey1))

print('Trying to read the public RSA N part 2...')
for _ in range(10):
ok_pubkey2 = self.read_bytes(64, to_str=True)
ok_pubkey2 = self.read_bytes(64)
if len(ok_pubkey2) == 64:
break

Expand All @@ -584,7 +575,7 @@ def getpub(self):

print('Trying to read the public RSA N part 3...')
for _ in range(10):
ok_pubkey3 = self.read_bytes(64, to_str=True)
ok_pubkey3 = self.read_bytes(64)
if len(ok_pubkey3) == 64:
break

Expand All @@ -594,7 +585,7 @@ def getpub(self):

print('Trying to read the public RSA N part 4...')
for _ in range(10):
ok_pubkey4 = self.read_bytes(64, to_str=True)
ok_pubkey4 = self.read_bytes(64)
if len(ok_pubkey4) == 64:
break

Expand All @@ -604,7 +595,7 @@ def getpub(self):

print('Trying to read the public RSA N part 5...')
for _ in range(10):
ok_pubkey5 = self.read_bytes(64, to_str=True)
ok_pubkey5 = self.read_bytes(64)
if len(ok_pubkey5) == 64:
break

Expand All @@ -614,7 +605,7 @@ def getpub(self):

print('Trying to read the public RSA N part 6...')
for _ in range(10):
ok_pubkey6 = self.read_bytes(64, to_str=True)
ok_pubkey6 = self.read_bytes(64)
if len(ok_pubkey6) == 64:
break

Expand All @@ -624,7 +615,7 @@ def getpub(self):

print('Trying to read the public RSA N part 7...')
for _ in range(10):
ok_pubkey7 = self.read_bytes(64, to_str=True)
ok_pubkey7 = self.read_bytes(64)
if len(ok_pubkey7) == 64:
break

Expand All @@ -634,7 +625,7 @@ def getpub(self):
print('Trying to read the public RSA N part 8...')

for _ in range(10):
ok_pubkey8 = self.read_bytes(64, to_str=True)
ok_pubkey8 = self.read_bytes(64)
if len(ok_pubkey8) == 64:
break

Expand All @@ -647,7 +638,7 @@ def getpub(self):

print('Received Public Key generated by OnlyKey')
ok_pubkey = ok_pubkey1 + ok_pubkey2 + ok_pubkey3 + ok_pubkey4 + ok_pubkey5 + ok_pubkey6 + ok_pubkey7 + ok_pubkey8
print('Public N='), repr(ok_pubkey)
print('Public N=', repr(ok_pubkey))
print()

print('Key Size =', len(ok_pubkey))
Expand Down
10 changes: 6 additions & 4 deletions tests/PGP_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import pgpy
from pgpy import PGPKey
from pgpy.constants import PubKeyAlgorithm, KeyFlags, HashAlgorithm, SymmetricKeyAlgorithm, CompressionAlgorithm, ok
from pgpy.constants import PubKeyAlgorithm, KeyFlags, HashAlgorithm, SymmetricKeyAlgorithm, CompressionAlgorithm
from pgpy.packet.fields import RSAPub,MPI,RSAPriv
from pgpy.packet.packets import PubKeyV4,PrivKeyV4

Expand All @@ -18,6 +18,8 @@

from onlykey import OnlyKey, Message

ok = OnlyKey()

def custRSAPub(n,e):
res = RSAPriv()
res.n = MPI(n)
Expand Down Expand Up @@ -63,8 +65,8 @@ def makekey():
# key_expires=timedelta(days=365))
#p = n[:(len(n)/2)]
#q = n[(len(n)/2):]
n = n.encode("HEX")
N = long(n, 16)
n = n.hex()
N = int(n, 16)
#p = p.encode("HEX")
#p = long(p, 16)
#q = q.encode("HEX")
Expand Down Expand Up @@ -138,7 +140,7 @@ def makekey():
print('Type or paste the text message, press return to go to new line, and then press Ctrl+D or Ctrl+Z (Windows only)')
print()
msg_blob = sys.stdin.read()
message_from_blob = priv_key.sign2(msg_blob)
message_from_blob = priv_key.sign(msg_blob)
print('Encoded Signed Message =')
print('-----BEGIN PGP SIGNED MESSAGE-----')
print('Hash: SHA256')
Expand Down
6 changes: 3 additions & 3 deletions tests/ecdh_curve25519.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@


message = 'Secret Message'
counter = "\x00\x00\x00\x01"
counter = b"\x00\x00\x00\x01"
shared_secret = curve.calculateAgreement(alice_private_key, bob_public_key)
h = hashlib.sha256()
h.update(counter.encode())
h.update(counter)
h.update(shared_secret)
h.update(message.encode())
d = h.digest()
Expand Down Expand Up @@ -159,7 +159,7 @@ def get_button(ibyte):
ok_shared_secret = ''
while ok_shared_secret == '':
time.sleep(0.5)
ok_shared_secret = ok.read_bytes(len(shared_secret), to_str=True)
ok_shared_secret = ok.read_bytes(64, to_str=True)

print('OnlyKey Shared Secret =', repr(ok_shared_secret))

Expand Down
20 changes: 10 additions & 10 deletions tests/rsa_decrypt_1024.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def pack_long(n):
it seems to be want you wanted? it's 64 bytes.
"""
h = '%x' % n
s = ('0'*(len(h) % 2) + h).decode('hex')
s = bytes.fromhex('0'*(len(h) % 2) + h)
return s

def bin2hex(binStr):
Expand All @@ -61,15 +61,15 @@ def bin2hex(binStr):
def hex2bin(hexStr):
return binascii.unhexlify(hexStr)

hexPrivKey = bin2hex(binPrivKey)
hexPubKey = bin2hex(binPubKey)
hexPrivKey = binPrivKey.hex()
hexPubKey = binPubKey.hex()

# p and q are long ints that are no more than 1/2 the size of pubkey
# I need to convert these into a single byte array put p in the first
# half byte[0] of the byte array and q in the second half byte[(type*128) / 2]
# send the byte array to OnlyKey splitting into 56 bytes per packet
q_and_p = pack_long(q) + pack_long(p)
public_n = pack_long(n)
q_and_p = q.to_bytes(64, "little") + p.to_bytes(64, "little")
public_n = n.to_bytes(128, "little")
#
ok.send_large_message3(msg=Message.OKSETPRIV, slot_id=1, key_type=(1+32), payload=q_and_p)

Expand All @@ -90,10 +90,10 @@ def hex2bin(hexStr):
print()

print('Trying to read the public RSA N part 1...')
ok.send_message(msg=Message.OKGETPUBKEY, payload=chr(1)) #, payload=[1, 1])
ok.send_message(msg=Message.OKGETPUBKEY, payload=bytes([1,1])) #, payload=[1, 1])
time.sleep(1.5)
for _ in xrange(10):
ok_pubkey1 = ok.read_bytes(64, to_str=True, timeout_ms=1000)
for _ in range(10):
ok_pubkey1 = ok.read_bytes(64, timeout_ms=1000)
if len(ok_pubkey1) == 64:
break
time.sleep(1)
Expand All @@ -103,8 +103,8 @@ def hex2bin(hexStr):
print('received=', repr(ok_pubkey1))

print('Trying to read the public RSA N part 2...')
for _ in xrange(10):
ok_pubkey2 = ok.read_bytes(64, to_str=True, timeout_ms=1000)
for _ in range(10):
ok_pubkey2 = ok.read_bytes(64, timeout_ms=1000)
if len(ok_pubkey2) == 64:
break
time.sleep(1)
Expand Down
2 changes: 1 addition & 1 deletion tests/rsa_decrypt_3072.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def pack_long(n):
it seems to be want you wanted? it's 64 bytes.
"""
h = '%x' % n
s = ('0'*(len(h) % 2) + h).decode('hex')
s = bytes.fromhex('0'*(len(h) % 2) + h)
return s

def bin2hex(binStr):
Expand Down
8 changes: 4 additions & 4 deletions tests/ssh_auth_rsa_1024.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,16 @@ def bytes2int(str):
print('local messege to sign=', repr(h.hexdigest()))
verifier = PKCS1_v1_5.new(key)
if verifier.verify(h, signature):
print "The local signature is authentic."
print("The local signature is authentic.")
else:
print "The local signature is not authentic."
print("The local signature is not authentic.")

print('OnlyKey messege to sign=', repr(test_payload2))
verifier = PKCS1_v1_5.new(key)
if verifier.verify(h, ok_signature):
print "The OnlyKey signature is authentic."
print("The OnlyKey signature is authentic.")
else:
print "The OnlyKey signature is not authentic."
print("The OnlyKey signature is not authentic.")



Expand Down