-
Notifications
You must be signed in to change notification settings - Fork 3k
/
coldcard.py
625 lines (501 loc) · 23.4 KB
/
coldcard.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
#
# Coldcard Electrum plugin main code.
#
#
import os, time, io
import traceback
from typing import TYPE_CHECKING, Optional
import struct
from electrum import bip32
from electrum.bip32 import BIP32Node, InvalidMasterKeyVersionBytes
from electrum.i18n import _
from electrum.plugin import Device, hook, runs_in_hwd_thread
from electrum.keystore import Hardware_KeyStore, KeyStoreWithMPK
from electrum.transaction import PartialTransaction
from electrum.wallet import Standard_Wallet, Multisig_Wallet, Abstract_Wallet
from electrum.util import bfh, bh2u, versiontuple, UserFacingException
from electrum.base_wizard import ScriptTypeNotSupported
from electrum.logging import get_logger
from ..hw_wallet import HW_PluginBase, HardwareClientBase
from ..hw_wallet.plugin import LibraryFoundButUnusable, only_hook_if_libraries_available
_logger = get_logger(__name__)
# Optional third-party dependencies (hidapi + ckcc-protocol). The plugin module
# must stay importable without them, so that the rest of Electrum can report
# "library missing" instead of crashing at import time.
try:
    import hid
    from ckcc.protocol import CCProtocolPacker, CCProtocolUnpacker
    from ckcc.protocol import CCProtoError, CCUserRefused, CCBusyError
    from ckcc.constants import (MAX_MSG_LEN, MAX_BLK_LEN, MSG_SIGNING_MAX_LENGTH, MAX_TXN_LEN,
                                AF_CLASSIC, AF_P2SH, AF_P2WPKH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH)
    from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID, CKCC_SIMULATOR_PATH

    requirements_ok = True

    class ElectrumColdcardDevice(ColdcardDevice):
        """ColdcardDevice variant whose MiTM signature check uses Electrum's EC code."""
        # avoid use of pycoin for MiTM message signature test

        def mitm_verify(self, sig, expect_xpub):
            """Check the device's signature over the session key.

            :param sig: 65-byte signature; only bytes 1..64 are handed to the verifier.
            :param expect_xpub: xpub whose public key must have produced the signature.
            :return: True when the signature verifies, False otherwise.
            """
            # verify a signature (65 bytes) over the session key, using the master bip32 node
            # - customized to use specific EC library of Electrum.
            pubkey = BIP32Node.from_xkey(expect_xpub).eckey
            # NOTE(review): Electrum's verify_message_hash appears to report a bad
            # signature by raising (returning None on success) rather than by its
            # return value -- confirm against electrum.ecc. Both conventions are
            # mapped onto an explicit bool here so the caller gets a real verdict.
            try:
                outcome = pubkey.verify_message_hash(sig[1:65], self.session_key)
            except Exception:
                return False
            return outcome is not False

except ImportError as e:
    # A plain "ckcc is not installed" is expected and stays quiet; anything else
    # (e.g. a broken install of one of the deps) is worth a traceback in the log.
    if not (isinstance(e, ModuleNotFoundError) and e.name == 'ckcc'):
        _logger.exception('error importing coldcard plugin deps')
    requirements_ok = False

    # Fallback USB ids: ColdcardPlugin.DEVICE_IDS reads these names when the
    # class body is executed, so they must exist even without ckcc.
    COINKITE_VID = 0xd13e
    CKCC_PID = 0xcc10

# Product id under which the simulator is listed (see ColdcardPlugin.detect_simulator).
CKCC_SIMULATED_PID = CKCC_PID ^ 0x55aa
class CKCCClient(HardwareClientBase):
    """Client object wrapping one opened Coldcard (real HID device or simulator)."""

    def __init__(self, plugin, handler, dev_path, *, is_simulator=False):
        """Open the device found at *dev_path*.

        :param plugin: owning plugin; its ``device`` attribute is copied onto the client.
        :param handler: UI handler stored for later use.
        :param dev_path: HID path, or the simulator path when *is_simulator* is true.
        :param is_simulator: open via the path directly instead of through ``hid``.
        """
        HardwareClientBase.__init__(self, plugin=plugin)
        self.device = plugin.device
        self.handler = handler

        # if we know what the (xfp, xpub) "should be" then track it here
        self._expected_device = None

        if is_simulator:
            self.dev = ElectrumColdcardDevice(dev_path, encrypt=True)
        else:
            # open the real HID device
            hd = hid.device(path=dev_path)
            hd.open_path(dev_path)

            self.dev = ElectrumColdcardDevice(dev=hd, encrypt=True)

        # NOTE: MiTM test is delayed until we have a hint as to what XPUB we
        # should expect. It's also kinda slow.

    def __repr__(self):
        return '<CKCCClient: xfp=%s label=%r>' % (xfp2str(self.dev.master_fingerprint),
                                                  self.label())

    @runs_in_hwd_thread
    def verify_connection(self, expected_xfp: int, expected_xpub=None):
        """Ensure the connected device is the expected one and run the MiTM check.

        A successful (xfp, xpub) pair is remembered, so repeating the call with
        the same arguments returns immediately.

        :param expected_xfp: master fingerprint (as int) the wallet was created with.
        :param expected_xpub: master xpub stored with the wallet; when None, the
            value reported by the device is used instead.
        :raises RuntimeError: if a different pair was verified earlier, or the
            device reports another fingerprint/xpub than expected.
        """
        ex = (expected_xfp, expected_xpub)

        if self._expected_device == ex:
            # all is as expected
            return

        if expected_xpub is None:
            expected_xpub = self.dev.master_xpub

        if ((self._expected_device is not None)
                or (self.dev.master_fingerprint != expected_xfp)
                or (self.dev.master_xpub != expected_xpub)):
            # probably indicating programing error, not hacking
            _logger.info(f"xpubs. reported by device: {self.dev.master_xpub}. "
                         f"stored in file: {expected_xpub}")
            raise RuntimeError("Expecting %s but that's not what's connected?!" %
                               xfp2str(expected_xfp))

        # check signature over session key
        # - mitm might have lied about xfp and xpub up to here
        # - important that we use value capture at wallet creation time, not some value
        #   we read over USB today
        self.dev.check_mitm(expected_xpub=expected_xpub)

        self._expected_device = ex

        if not getattr(self, 'ckcc_xpub', None):
            self.ckcc_xpub = expected_xpub

        _logger.info("Successfully verified against MiTM")

    def is_pairable(self):
        # can't do anything w/ devices that aren't setup (this code not normally reachable)
        return bool(self.dev.master_xpub)

    @runs_in_hwd_thread
    def close(self):
        """Close the underlying device and drop the reference to it."""
        # close the HID device (so can be reused)
        self.dev.close()
        self.dev = None

    def is_initialized(self):
        """Return True when the device reports a master xpub."""
        return bool(self.dev.master_xpub)

    def label(self):
        """Return a human-readable label for this device."""
        # 'label' of this Coldcard. Warning: gets saved into wallet file, which might
        # not be encrypted, so better for privacy if based on xpub/fingerprint rather than
        # USB serial number.
        if self.dev.is_simulator:
            lab = 'Coldcard Simulator ' + xfp2str(self.dev.master_fingerprint)
        elif not self.dev.master_fingerprint:
            # fallback; not expected
            lab = 'Coldcard #' + self.dev.serial
        else:
            lab = 'Coldcard ' + xfp2str(self.dev.master_fingerprint)

        return lab

    def manipulate_keystore_dict_during_wizard_setup(self, d: dict):
        """Record the device's master xpub in *d* under ``'ckcc_xpub'``.

        Nothing is stored when the device reports no master xpub.

        :raises UserFacingException: if the xpub's version bytes are rejected
            by ``BIP32Node.from_xkey``.
        """
        master_xpub = self.dev.master_xpub
        if master_xpub is not None:
            try:
                # parsed only for validation; the result itself is not needed
                BIP32Node.from_xkey(master_xpub)
            except InvalidMasterKeyVersionBytes:
                raise UserFacingException(
                    _('Invalid xpub magic. Make sure your {} device is set to the correct chain.').format(self.device) + ' ' +
                    _('You might have to unplug and plug it in again.')
                ) from None
            d['ckcc_xpub'] = master_xpub

    @runs_in_hwd_thread
    def has_usable_connection_with_device(self):
        """Return True if an end-to-end ping succeeds, False on any error."""
        # Do end-to-end ping test
        try:
            self.ping_check()
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt/SystemExit
            # must not be reported as "device unreachable".
            return False
        return True

    @runs_in_hwd_thread
    def get_xpub(self, bip32_path, xtype):
        """Fetch the xpub at *bip32_path* and re-encode it as *xtype*.

        :raises UserFacingException: if the returned xpub has version bytes
            rejected by ``BIP32Node.from_xkey``.
        """
        assert xtype in ColdcardPlugin.SUPPORTED_XTYPES
        _logger.info('Derive xtype = %r' % xtype)
        xpub = self.dev.send_recv(CCProtocolPacker.get_xpub(bip32_path), timeout=5000)
        # TODO handle timeout?
        # change type of xpub to the requested type
        try:
            node = BIP32Node.from_xkey(xpub)
        except InvalidMasterKeyVersionBytes:
            raise UserFacingException(_('Invalid xpub magic. Make sure your {} device is set to the correct chain.')
                                      .format(self.device)) from None
        if xtype != 'standard':
            xpub = node._replace(xtype=xtype).to_xpub()
        return xpub

    @runs_in_hwd_thread
    def ping_check(self):
        """Send a ping and require the identical payload back.

        :raises RuntimeError: if the exchange fails or the echo differs.
        """
        # check connection is working
        assert self.dev.session_key, 'not encrypted?'
        req = b'1234 Electrum Plugin 4321'      # free up to 59 bytes
        trouble = "Communication trouble with Coldcard"
        try:
            echo = self.dev.send_recv(CCProtocolPacker.ping(req))
        except Exception as exc:
            raise RuntimeError(trouble) from exc
        # Explicit comparison (not ``assert``) so the check survives ``python -O``.
        if echo != req:
            raise RuntimeError(trouble)

    @runs_in_hwd_thread
    def show_address(self, path, addr_fmt):
        # prompt user w/ address, also returns it immediately.
        return self.dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None)

    @runs_in_hwd_thread
    def show_p2sh_address(self, *args, **kws):
        # prompt user w/ p2sh address, also returns it immediately.
        return self.dev.send_recv(CCProtocolPacker.show_p2sh_address(*args, **kws), timeout=None)

    @runs_in_hwd_thread
    def get_version(self):
        # gives list of strings
        return self.dev.send_recv(CCProtocolPacker.version(), timeout=1000).split('\n')

    @runs_in_hwd_thread
    def sign_message_start(self, path, msg):
        # this starts the UX experience.
        self.dev.send_recv(CCProtocolPacker.sign_message(msg, path), timeout=None)

    @runs_in_hwd_thread
    def sign_message_poll(self):
        # poll device... if user has approved, will get tuple: (addr, sig) else None
        return self.dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None)

    @runs_in_hwd_thread
    def sign_transaction_start(self, raw_psbt: bytes, *, finalize: bool = False):
        """Upload *raw_psbt* and ask the device to start the signing UX.

        :raises ValueError: if the device answers the start request with
            anything other than None.
        """
        # Multiple steps to sign:
        # - upload binary
        # - start signing UX
        # - wait for coldcard to complete process, or have it refused.
        # - download resulting txn
        assert 20 <= len(raw_psbt) < MAX_TXN_LEN, 'PSBT is too big'
        dlen, chk = self.dev.upload_file(raw_psbt)

        resp = self.dev.send_recv(CCProtocolPacker.sign_transaction(dlen, chk, finalize=finalize),
                                  timeout=None)

        if resp is not None:
            raise ValueError(resp)

    @runs_in_hwd_thread
    def sign_transaction_poll(self):
        # poll device... if user has approved, will get tuple: (length, checksum) else None
        return self.dev.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None)

    @runs_in_hwd_thread
    def download_file(self, length, checksum, file_number=1):
        # get a file
        return self.dev.download_file(length, checksum, file_number=file_number)
class Coldcard_KeyStore(Hardware_KeyStore):
    """Keystore backed by a Coldcard: signing and address display happen on the device."""

    hw_type = 'coldcard'
    device = 'Coldcard'

    plugin: 'ColdcardPlugin'

    def __init__(self, d):
        """Build the keystore from its stored dict *d*."""
        Hardware_KeyStore.__init__(self, d)
        # True while a @wrap_busy method is running; see give_error().
        self.ux_busy = False

        # we need to know at least the fingerprint of the master xpub to verify against MiTM
        # - device reports these value during encryption setup process
        # - full xpub value now optional
        self.ckcc_xpub = d.get('ckcc_xpub', None)

    def dump(self):
        """Return the base-class dump extended with ``'ckcc_xpub'``."""
        # our additions to the stored data about keystore -- only during creation?
        d = Hardware_KeyStore.dump(self)
        d['ckcc_xpub'] = self.ckcc_xpub
        return d

    def get_xfp_int(self) -> int:
        """Return the root fingerprint converted to the integer form used with the device."""
        xfp = self.get_root_fingerprint()
        assert xfp is not None
        return xfp_int_from_xfp_bytes(bfh(xfp))

    def get_client(self, *args, **kwargs):
        """Return the paired client after verifying it against this keystore.

        The client (possibly None) comes from the base class; when present, its
        ``verify_connection`` is called with this keystore's fingerprint and
        stored xpub before it is returned.
        """
        # called when user tries to do something like view address, sign something.
        # - not called during probing/setup
        # - will fail if indicated device can't produce the xpub (at derivation) expected
        client = super().get_client(*args, **kwargs)  # type: Optional[CKCCClient]
        if client:
            xfp_int = self.get_xfp_int()
            client.verify_connection(xfp_int, self.ckcc_xpub)
        return client

    def give_error(self, message):
        """Log *message*, show it unless the device UX is busy, then always raise.

        :raises UserFacingException: unconditionally, carrying *message*.
        """
        self.logger.info(message)
        if not self.ux_busy:
            self.handler.show_error(message)
        else:
            self.ux_busy = False
        raise UserFacingException(message)

    def wrap_busy(func):
        """Decorator: set ``self.ux_busy`` for the duration of the wrapped method."""
        # decorator: function takes over the UX on the device.
        # Imported here so this block does not depend on a new file-level import.
        from functools import wraps

        @wraps(func)
        def wrapper(self, *args, **kwargs):
            try:
                self.ux_busy = True
                return func(self, *args, **kwargs)
            finally:
                # reset even when the wrapped method raises
                self.ux_busy = False
        return wrapper

    def decrypt_message(self, pubkey, message, password):
        raise UserFacingException(_('Encryption and decryption are currently not supported for {}').format(self.device))

    @wrap_busy
    def sign_message(self, sequence, message, password, *, script_type=None):
        """Sign *message* on the device with the key at ``<prefix>/<sequence>``.

        :param sequence: pair of ints appended to the derivation prefix.
        :param message: text to sign; must encode to 1..MSG_SIGNING_MAX_LENGTH
            ASCII bytes.
        :return: the raw signature bytes, or ``b''`` when the message is
            rejected locally or the device refuses/is busy/reports a protocol
            error.
        :raises UserFacingException: via give_error() for any other failure.
        """
        # Sign a message on device. Since we have big screen, of course we
        # have to show the message unambiguously there first!
        try:
            msg = message.encode('ascii', errors='strict')
            assert 1 <= len(msg) <= MSG_SIGNING_MAX_LENGTH
        except (UnicodeError, AssertionError):
            # there are other restrictions on message content,
            # but let the device enforce and report those
            self.handler.show_error('Only short (%d max) ASCII messages can be signed.'
                                    % MSG_SIGNING_MAX_LENGTH)
            return b''

        path = self.get_derivation_prefix() + ("/%d/%d" % sequence)
        try:
            cl = self.get_client()
            try:
                self.handler.show_message("Signing message (using %s)..." % path)

                cl.sign_message_start(path, msg)

                while 1:
                    # How to kill some time, without locking UI?
                    time.sleep(0.250)

                    resp = cl.sign_message_poll()
                    if resp is not None:
                        break

            finally:
                self.handler.finished()

            assert len(resp) == 2
            _addr, raw_sig = resp

            # already encoded in Bitcoin fashion, binary.
            assert 40 < len(raw_sig) <= 65

            return raw_sig

        except (CCUserRefused, CCBusyError) as exc:
            self.handler.show_error(str(exc))
        except CCProtoError as exc:
            # This path is message signing, so say so (the text used to be a
            # copy of the show-address error).
            self.logger.exception('Error signing message')
            self.handler.show_error('{}\n\n{}'.format(
                _('Error signing message') + ':', str(exc)))
        except Exception as e:
            self.give_error(e)

        # give empty bytes for error cases; it seems to clear the old signature box
        return b''

    @wrap_busy
    def sign_transaction(self, tx, password):
        """Have the device sign *tx* and merge the returned PSBT into it.

        Returns None in every case; *tx* is updated in place on success.
        A refusal or busy device is shown to the user and signing is skipped.

        :raises UserFacingException: via give_error() for any other failure
            during the device exchange.
        """
        # Upload PSBT for signing.
        # - we can also work offline (without paired device present)
        if tx.is_complete():
            return

        client = self.get_client()

        assert client.dev.master_fingerprint == self.get_xfp_int()

        raw_psbt = tx.serialize_as_bytes()

        try:
            try:
                self.handler.show_message("Authorize Transaction...")

                client.sign_transaction_start(raw_psbt)

                while 1:
                    # How to kill some time, without locking UI?
                    time.sleep(0.250)

                    resp = client.sign_transaction_poll()
                    if resp is not None:
                        break

                rlen, rsha = resp

                # download the resulting txn.
                raw_resp = client.download_file(rlen, rsha)

            finally:
                self.handler.finished()

        except (CCUserRefused, CCBusyError) as exc:
            self.logger.info(f'Did not sign: {exc}')
            self.handler.show_error(str(exc))
            return
        except BaseException as e:
            self.logger.exception('')
            self.give_error(e)
            return

        tx2 = PartialTransaction.from_raw_psbt(raw_resp)
        # apply partial signatures back into txn
        tx.combine_with_other_psbt(tx2)
        # caller's logic looks at tx now and if it's sufficiently signed,
        # will send it if that's the user's intent.

    @staticmethod
    def _encode_txin_type(txin_type):
        """Translate an Electrum txin type name into the ckcc ``AF_*`` constant.

        :raises KeyError: for a name that is not in the table.
        """
        # Map from Electrum code names to our code numbers.
        return {'standard': AF_CLASSIC, 'p2pkh': AF_CLASSIC,
                'p2sh': AF_P2SH,
                'p2wpkh-p2sh': AF_P2WPKH_P2SH,
                'p2wpkh': AF_P2WPKH,
                'p2wsh-p2sh': AF_P2WSH_P2SH,
                'p2wsh': AF_P2WSH,
                }[txin_type]

    @wrap_busy
    def show_address(self, sequence, txin_type):
        """Ask the device to display the single-sig address at *sequence*.

        Errors are reported through the handler rather than raised.
        """
        client = self.get_client()
        # [2:] drops the first two characters of the prefix
        # (presumably the leading "m/" -- confirm against get_derivation_prefix).
        address_path = self.get_derivation_prefix()[2:] + "/%d/%d" % sequence
        addr_fmt = self._encode_txin_type(txin_type)
        try:
            try:
                self.handler.show_message(_("Showing address ..."))
                client.show_address(address_path, addr_fmt)
                # we could double check address here
            finally:
                self.handler.finished()
        except CCProtoError as exc:
            self.logger.exception('Error showing address')
            self.handler.show_error('{}\n\n{}'.format(
                _('Error showing address') + ':', str(exc)))
        except BaseException as exc:
            self.logger.exception('')
            # pass text, like every other show_error call in this class
            self.handler.show_error(str(exc))

    @wrap_busy
    def show_p2sh_address(self, M, script, xfp_paths, txin_type):
        """Ask the device to display a multisig address.

        :param M: number of required signatures.
        :param script: script bytes forwarded to the device.
        :param xfp_paths: per-key lists of ``[xfp_int, *derivation]``.
        :param txin_type: Electrum txin type name, mapped via _encode_txin_type.

        Errors are reported through the handler rather than raised.
        """
        client = self.get_client()
        addr_fmt = self._encode_txin_type(txin_type)
        try:
            try:
                self.handler.show_message(_("Showing address ..."))
                client.show_p2sh_address(M, xfp_paths, script, addr_fmt=addr_fmt)
                # we could double check address here
            finally:
                self.handler.finished()
        except CCProtoError as exc:
            self.logger.exception('Error showing address')
            self.handler.show_error('{}.\n{}\n\n{}'.format(
                _('Error showing address'),
                _('Make sure you have imported the correct wallet description '
                  'file on the device for this multisig wallet.'),
                str(exc)))
        except BaseException as exc:
            self.logger.exception('')
            # pass text, like every other show_error call in this class
            self.handler.show_error(str(exc))
class ColdcardPlugin(HW_PluginBase):
    """Plugin object: device registration, client creation and wallet-level helpers."""

    keystore_class = Coldcard_KeyStore
    minimum_library = (0, 7, 7)

    # (vendor id, product id) pairs registered with the device manager.
    DEVICE_IDS = [
        (COINKITE_VID, CKCC_PID),
        (COINKITE_VID, CKCC_SIMULATED_PID)
    ]

    SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')

    def __init__(self, parent, config, name):
        """Initialise the plugin; devices are registered only if the libraries load."""
        HW_PluginBase.__init__(self, parent, config, name)

        self.libraries_available = self.check_libraries_available()
        if self.libraries_available:
            self.device_manager().register_devices(self.DEVICE_IDS, plugin=self)
            self.device_manager().register_enumerate_func(self.detect_simulator)

    def get_library_version(self):
        """Return the ckcc version string, or ``'unknown'`` if it has none.

        :raises LibraryFoundButUnusable: when ckcc imports but the module-level
            dependency import did not succeed.
        """
        import ckcc
        version = getattr(ckcc, '__version__', 'unknown')
        if not requirements_ok:
            raise LibraryFoundButUnusable(library_version=version)
        return version

    def detect_simulator(self):
        """Return a one-element Device list if the simulator path exists, else ``[]``."""
        # if there is a simulator running on this machine,
        # return details about it so it's offered as a pairing choice
        sim_path = CKCC_SIMULATOR_PATH
        if not os.path.exists(sim_path):
            return []
        simulator = Device(path=sim_path,
                           interface_number=-1,
                           id_=sim_path,
                           product_key=(COINKITE_VID, CKCC_SIMULATED_PID),
                           usage_page=0,
                           transport_ui_string='simulator')
        return [simulator]

    @runs_in_hwd_thread
    def create_client(self, device, handler):
        """Create a CKCCClient for *device*; log and return None on any failure."""
        # We are given a HID device, or at least some details about it.
        # Not sure why we aren't just given a HID library handle, but
        # the 'path' is unambiguous, so we'll use that.
        try:
            simulated = (device.product_key[1] == CKCC_SIMULATED_PID)
            return CKCCClient(self, handler, device.path, is_simulator=simulated)
        except Exception:
            self.logger.exception('late failure connecting to device?')
            return None

    def setup_device(self, device_info, wizard, purpose):
        """Return the client for the device described by *device_info*."""
        return self.scan_and_create_client_for_device(
            device_id=device_info.device.id_, wizard=wizard)

    def get_xpub(self, device_id, derivation, xtype, wizard):
        """Fetch the xpub at *derivation* from the device, encoded as *xtype*.

        :raises ScriptTypeNotSupported: if *xtype* is not in SUPPORTED_XTYPES.
        """
        # this seems to be part of the pairing process only, not during normal ops?
        # base_wizard:on_hw_derivation
        if xtype not in self.SUPPORTED_XTYPES:
            raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device))
        client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
        client.ping_check()
        return client.get_xpub(derivation, xtype)

    @runs_in_hwd_thread
    def get_client(self, keystore, force_pair=True, *,
                   devices=None, allow_user_interaction=True) -> Optional['CKCCClient']:
        """Return the client from the base class, ping-checked when not None."""
        # Acquire a connection to the hardware device (via USB)
        client = super().get_client(keystore, force_pair,
                                    devices=devices,
                                    allow_user_interaction=allow_user_interaction)
        if client is None:
            return client
        client.ping_check()
        return client

    @staticmethod
    def export_ms_wallet(wallet: Multisig_Wallet, fp, name):
        """Write the multisig wallet description text for *wallet* to *fp*.

        :param wallet: the multisig wallet to describe.
        :param fp: writable text stream.
        :param name: wallet name; at most its first 20 characters are written.
        """
        # Build the text file Coldcard needs to understand the multisig wallet
        # it is participating in. All involved Coldcards can share same file.
        assert isinstance(wallet, Multisig_Wallet)

        print('# Exported from Electrum', file=fp)
        print(f'Name: {name:.20s}', file=fp)
        print(f'Policy: {wallet.m} of {wallet.n}', file=fp)
        print(f'Format: {wallet.txin_type.upper()}', file=fp)

        cosigners = []
        keystores = wallet.get_keystores()
        for master_key, ks in zip(wallet.get_master_public_keys(), keystores):  # type: str, KeyStoreWithMPK
            fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[], only_der_suffix=False)
            cosigners.append((
                fp_bytes.hex().upper(),
                master_key,
                bip32.convert_bip32_intpath_to_strpath(der_full),
            ))

        # Before v3.2.1 derivation didn't matter too much to the Coldcard, since it
        # could use key path data from PSBT or USB request as needed. However,
        # derivation data is now required.

        print('', file=fp)

        assert len(cosigners) == wallet.n
        for fp_hex, master_key, der_prefix_str in cosigners:
            print(f'Derivation: {der_prefix_str}', file=fp)
            print(f'{fp_hex}: {master_key}\n', file=fp)

    def show_address(self, wallet, address, keystore: 'Coldcard_KeyStore' = None):
        """Display *address* on the device for a standard or multisig wallet.

        Any other wallet type gets an error dialog through the keystore's handler.
        """
        if keystore is None:
            keystore = wallet.get_keystore()
        if not self.show_address_helper(wallet, address, keystore):
            return

        txin_type = wallet.get_txin_type(address)

        # Exact type matches on purpose: subclasses fall through to the error.
        wallet_kind = type(wallet)

        # Standard_Wallet => not multisig, must be bip32
        if wallet_kind is Standard_Wallet:
            keystore.show_address(wallet.get_address_index(address), txin_type)
            return

        if wallet_kind is not Multisig_Wallet:
            keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device))
            return

        assert isinstance(wallet, Multisig_Wallet)  # only here for type-hints in IDE
        # More involved for P2SH/P2WSH addresses: need M, and all public keys, and their
        # derivation paths.  Must construct script, and track fingerprints+paths for
        # all those keys
        deriv_info = wallet.get_public_keys_with_deriv_info(address)
        pubkey_hexes = sorted(pk.hex() for pk in deriv_info)

        xfp_paths = []
        for hex_key in pubkey_hexes:
            ks, der_suffix = deriv_info[bytes.fromhex(hex_key)]
            fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix, only_der_suffix=False)
            xfp_paths.append([xfp_int_from_xfp_bytes(fp_bytes), *der_full])

        script = bfh(wallet.pubkeys_to_scriptcode(pubkey_hexes))
        keystore.show_p2sh_address(wallet.m, script, xfp_paths, txin_type)
def xfp_int_from_xfp_bytes(fp_bytes: bytes) -> int:
    """Interpret fingerprint bytes as an unsigned little-endian integer."""
    return int.from_bytes(fp_bytes, "little")
def xfp2str(xfp: int) -> str:
    """Render an integer fingerprint as 8 lowercase hex digits.

    The integer is packed as 4 little-endian bytes first, so the text shows the
    bytes in their stored order.
    """
    # Standardized way to show an xpub's fingerprint... it's a 4-byte string
    # and not really an integer. Used to show as '0x%08x' but that's wrong endian.
    packed = struct.pack('<I', xfp)
    return packed.hex().lower()
# EOF