From 4593439f17880963fc40d279c55523ef48369d7b Mon Sep 17 00:00:00 2001 From: James O'Beirne Date: Fri, 14 Jan 2022 09:57:25 -0500 Subject: [PATCH 1/4] Consolidate to one file, fix build --- .github/workflows/run-tests.yml | 2 +- Dockerfile | 6 + Makefile | 17 +- bin/compile | 66 -- coldcore | 344 ++---- requirements-dev.txt | 4 + src/coldcore/__init__.py | 0 src/coldcore/crypto.py | 67 -- src/coldcore/main.py | 1403 ----------------------- src/coldcore/thirdparty/__init__.py | 0 src/coldcore/thirdparty/bitcoin_rpc.py | 301 ----- src/coldcore/thirdparty/clii.py | 269 ----- src/coldcore/thirdparty/py.typed | 0 src/coldcore/ui.py | 1245 -------------------- src/requirements-dev.txt | 4 - src/setup.py | 20 - {src/coldcore => test}/test_coldcard.py | 4 +- {src/coldcore => test}/test_crypto.py | 4 +- 18 files changed, 138 insertions(+), 3618 deletions(-) create mode 100644 Dockerfile delete mode 100755 bin/compile create mode 100644 requirements-dev.txt delete mode 100644 src/coldcore/__init__.py delete mode 100644 src/coldcore/crypto.py delete mode 100755 src/coldcore/main.py delete mode 100644 src/coldcore/thirdparty/__init__.py delete mode 100644 src/coldcore/thirdparty/bitcoin_rpc.py delete mode 100644 src/coldcore/thirdparty/clii.py delete mode 100644 src/coldcore/thirdparty/py.typed delete mode 100644 src/coldcore/ui.py delete mode 100644 src/requirements-dev.txt delete mode 100644 src/setup.py rename {src/coldcore => test}/test_coldcard.py (98%) rename {src/coldcore => test}/test_crypto.py (92%) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 933d179..0ab799d 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -23,7 +23,7 @@ jobs: - name: Install test dependencies run: | python -m pip install --upgrade pip - pip install -r src/requirements-dev.txt + pip install -r requirements-dev.txt - name: black run: black --check diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..03d1f00 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,6 @@ +# Dockerfile for running tests/lints +FROM docker.io/library/python:latest + +WORKDIR /src +COPY ./requirements-dev.txt . +RUN pip install -r requirements-dev.txt diff --git a/Makefile b/Makefile index 75073e5..d5f5546 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,15 @@ +install: + install -m755 coldcore /usr/local/bin/coldcore -coldcore: - ./bin/compile +docker-build: + docker build --tag coldcore/test . -test: - pytest src/coldcore +test: docker-build + docker run -v ./coldcore:/src/coldcore.py -e PYTHONPATH=/src -v ./:/src:ro coldcore/test pytest -vv --color=yes test/ -.PHONY: test +lint: docker-build + docker run --rm -v ./:/src:ro coldcore/test flake8 coldcore + docker run --rm -v ./:/src:ro coldcore/test black --check coldcore + docker run --rm -v ./:/src:ro coldcore/test mypy coldcore + +.PHONY: docker-build lint test diff --git a/bin/compile b/bin/compile deleted file mode 100755 index fa5181f..0000000 --- a/bin/compile +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 -""" -Used to compile the `src/` files into a single executable Python script -that's still decently auditable. -""" - -from pathlib import Path -import re - - -def render_file(): - src = Path("src/coldcore") - cc = (src / "main.py").read_bytes().splitlines() - newlines = [] - - for line in cc: - third_match = re.search(b"^from \.thirdparty\.(?P\\S+) import", line) - match = re.search(b"^from \.(?P\\S+) import", line) - - sep = f"# {'-' * 78}\n".encode() - - def inlined_from_delimiters(name): - begin = f"# --- inlined from src/coldcore/{name} " - end = f"# --- end inlined from src/coldcore/{name} " - begin = f"{begin}{'-' * (80 - len(begin))}" - end = f"{end}{'-' * (80 - len(end))}" - return (begin, end) - - if third_match: - name = third_match.group(1).decode() + ".py" - contents = Path(src / "thirdparty" / name).read_bytes().splitlines() - delim = inlined_from_delimiters("thirdparty/" + name) - newlines.extend( - [ - b"\n", - delim[0].encode(), - sep, - *contents, - b"\n\n" + delim[1].encode(), - sep, - b"\n", - ] - ) - elif match: - name = match.group(1).decode() + ".py" - contents = Path(src / name).read_bytes().splitlines() - delim = inlined_from_delimiters(name) - newlines.extend( - [ - b"\n", - delim[0].encode(), - sep, - *contents, - b"\n\n" + delim[1].encode(), - sep, - b"\n", - ] - ) - else: - newlines.append(line) - - Path("coldcore").write_bytes(b"\n".join(newlines)) - - -if __name__ == "__main__": - render_file() diff --git a/coldcore b/coldcore index f65c8aa..82bcade 100755 --- a/coldcore +++ b/coldcore @@ -21,72 +21,51 @@ """ +import argparse import logging import re import typing as t import sys import base64 import datetime +import inspect +import functools import subprocess +import hashlib import time +import io import textwrap import json -import io -import os import platform -from pathlib import Path -from typing import Optional as Op +import urllib.parse as urlparse +import socket +import http.client +import os +import http.client as httplib +import curses +import contextlib +import traceback +import threading +import decimal +import string from dataclasses import dataclass, field -from configparser import ConfigParser -from decimal import Decimal - -# fmt: off -# We have to keep these imports to one line because of how ./bin/compile works. - - -# --- inlined from src/coldcore/thirdparty/clii.py ----------------------------- -# ------------------------------------------------------------------------------ - -""" -clii - -The easiest damned argparse wrapper there ever was. - - -Copyright 2020 James O'Beirne - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies -of the Software, and to permit persons to whom the Software is furnished to do -so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. +from pathlib import Path +from collections import namedtuple +from curses.textpad import Textbox -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -""" -import sys -import argparse -import functools -import inspect -import typing as t -import os -import logging from textwrap import dedent +from decimal import Decimal +from typing import IO, Optional as Op +from configparser import ConfigParser + +# --- command line parsing ------------------------------------------------------------ +# ------------------------------------------------------------------------------------- -logger = logging.getLogger("clii") +clii_logger = logging.getLogger("clii") if os.environ.get("CLII_DEBUG"): - logger.setLevel(logging.DEBUG) - logger.addHandler(logging.StreamHandler()) + clii_logger.setLevel(logging.DEBUG) + clii_logger.addHandler(logging.StreamHandler()) class Arg: @@ -176,7 +155,7 @@ class Arg: kwargs["action"] = "store_false" if self.default else "store_true" kwargs.pop("type", "") - logger.debug(f"Attaching argument: {self.names} -> {kwargs}") + clii_logger.debug(f"Attaching argument: {self.names} -> {kwargs}") parser.add_argument(*self.names, **kwargs) # type: ignore def update_name(self, name: str): @@ -269,7 +248,7 @@ class App: fnc.__name__.replace("_", "-"), description="\n".join(doclines), ) - logger.debug("Added subparser: %s", sub) + clii_logger.debug("Added subparser: %s", sub) for arg in Arg.from_func(fnc): arg.add_to_parser(sub) @@ -286,7 +265,7 @@ class App: def parse_for_run(self) -> t.Tuple[t.Callable, t.Tuple[t.List, t.Dict]]: self.args = self.parser.parse_args() args = vars(self.args) - logger.debug("Parsed args: %s", args) + clii_logger.debug("Parsed args: %s", args) fnc = args.pop("func", None) if not fnc: @@ -318,15 +297,8 @@ class App: return fnc(*func_args, **func_kwargs) -# --- end inlined from src/coldcore/thirdparty/clii.py ------------------------- -# ------------------------------------------------------------------------------ - - - - - -# --- inlined from src/coldcore/thirdparty/bitcoin_rpc.py ---------------------- -# ------------------------------------------------------------------------------ +# --- Bitcoin RPC --------------------------------------------------------------------- +# ------------------------------------------------------------------------------------- # Copyright (C) 2007 Jan-Klaas Kollhof # Copyright (C) 2011-2018 The python-bitcoinlib developers @@ -341,28 +313,11 @@ class App: # propagated, or distributed except according to the terms contained in the # LICENSE file. -import logging -import os -import base64 -import http.client as httplib -import json -import platform -import urllib.parse as urlparse -import socket -import re -import time -import http.client -import typing as t -from typing import IO, Optional as Op -from decimal import Decimal - DEFAULT_USER_AGENT = "AuthServiceProxy/0.1" DEFAULT_HTTP_TIMEOUT = 30 - -logger = logging.getLogger("rpc") -logger.setLevel(logging.DEBUG) -# logger.addHandler(logging.StreamHandler()) +rpc_logger = logging.getLogger("rpc") +rpc_logger.setLevel(logging.DEBUG) class JSONRPCError(Exception): @@ -441,13 +396,13 @@ class BitcoinRPC(object): authpair = self._get_bitcoind_cookie_authpair( conf, btc_conf_file, net_name ) - logger.debug("pulling authpair from cookie despite intaking URL") + rpc_logger.debug("pulling authpair from cookie despite intaking URL") if wallet_name: service_url = service_url.rstrip("/") service_url += f"/wallet/{wallet_name}" - logger.info(f"Connecting to bitcoind: {service_url}") + rpc_logger.info(f"Connecting to bitcoind: {service_url}") self.url = service_url # Credential redacted @@ -455,7 +410,7 @@ class BitcoinRPC(object): self._parsed_url = urlparse.urlparse(service_url) self.host = self._parsed_url.hostname - logger.info(f"Initializing RPC client at {self.public_url}") + rpc_logger.info(f"Initializing RPC client at {self.public_url}") # XXX keep for debugging, but don't ship: # logger.info(f"[REMOVE THIS] USING AUTHPAIR {authpair}") @@ -500,12 +455,12 @@ class BitcoinRPC(object): try: with open(cookie_file, "r") as fd: authpair = fd.read() - logger.debug("read authpair from cookie") + rpc_logger.debug("read authpair from cookie") except (IOError, FileNotFoundError) as err: - logger.debug("couldn't read authpair from cookie", exc_info=True) + rpc_logger.debug("couldn't read authpair from cookie", exc_info=True) if "rpcpassword" in conf: authpair = "%s:%s" % (conf["rpcuser"], conf["rpcpassword"]) - logger.debug("read authpair from conf") + rpc_logger.debug("read authpair from conf") else: raise ValueError( "Cookie file unusable (%s) and rpcpassword not specified " @@ -539,7 +494,7 @@ class BitcoinRPC(object): } ) - logger.debug(f"[{self.public_url}] calling %s%s", service_name, args) + rpc_logger.debug(f"[{self.public_url}] calling %s%s", service_name, args) headers = { "Host": self._parsed_url.hostname, @@ -553,12 +508,13 @@ class BitcoinRPC(object): path = self._parsed_url.path tries = 5 backoff = 0.3 + conn = None while tries: try: conn = self._getconn(timeout=kwargs['timeout']) conn.request("POST", path, postdata, headers) except (BlockingIOError, http.client.CannotSendRequest, socket.gaierror): - logger.exception( + rpc_logger.exception( f"hit request error: {path}, {postdata}, {self._parsed_url}" ) tries -= 1 @@ -569,6 +525,7 @@ class BitcoinRPC(object): else: break + assert conn response = self._get_response(conn) err = response.get("error") if err is not None: @@ -595,7 +552,7 @@ class BitcoinRPC(object): rdata = http_response.read().decode("utf8") try: loaded = json.loads(rdata, parse_float=Decimal) - logger.debug(f"[{self.public_url}] -> {loaded}") + rpc_logger.debug(f"[{self.public_url}] -> {loaded}") return loaded except Exception: raise JSONRPCError( @@ -631,22 +588,11 @@ class BitcoinRPC(object): return _call_wrapper -# --- end inlined from src/coldcore/thirdparty/bitcoin_rpc.py ------------------ -# ------------------------------------------------------------------------------ - - - - - -# --- inlined from src/coldcore/crypto.py -------------------------------------- -# ------------------------------------------------------------------------------ - -""" -Basic encoding/cryptographic operations, mostly relating to xpub parsing. -""" -import hashlib - -# Much of this file was derived from code in buidl-python +# --- crypto -------------------------------------------------------------------------- +# Basic encoding/cryptographic operations, mostly relating to xpub parsing. +# ------------------------------------------------------------------------------------- +# +# Much of this section was derived from code in buidl-python # (https://github.com/buidl-bitcoin/buidl-python). @@ -710,40 +656,10 @@ def hash256(s): return hashlib.sha256(hashlib.sha256(s).digest()).digest() -# --- end inlined from src/coldcore/crypto.py ---------------------------------- -# ------------------------------------------------------------------------------ - - - - - -# --- inlined from src/coldcore/ui.py ------------------------------------------ -# ------------------------------------------------------------------------------ - -import curses -import contextlib -import typing as t -import logging -import textwrap -import time -import subprocess -import sys -import traceback -import socket -import threading -import platform -import base64 -import datetime -import os -import json -import decimal -import string -from dataclasses import dataclass -from pathlib import Path -from collections import namedtuple -from curses.textpad import Textbox +# --- Curses UI ----------------------------------------------------------------------- +# ------------------------------------------------------------------------------------- -logger = logging.getLogger("ui") +ui_logger = logging.getLogger("ui") class DecimalEncoder(json.JSONEncoder): @@ -899,11 +815,10 @@ F = OutputFormatter() class Scene: - def __init__(self, scr, conf, wconfs, controller): + def __init__(self, scr, conf, wconfs): self.scr = scr self.config = conf self.wallet_configs = wconfs - self.controller = controller def draw(self, k: int) -> t.Tuple[int, Action]: pass @@ -914,7 +829,7 @@ class MenuItem(namedtuple("MenuItem", "idx,title,action")): return (self.idx, self.title, mchoice == self) -def run_setup(config, controller) -> t.Tuple[t.Any, t.Any]: +def run_setup(config) -> t.Tuple[t.Any, t.Any]: curses.endwin() os.system("cls" if os.name == "nt" else "clear") @@ -947,7 +862,7 @@ def run_setup(config, controller) -> t.Tuple[t.Any, t.Any]: p(title) blank("searching for Bitcoin Core...") - rpc = controller.discover_rpc(config) + rpc = discover_rpc(config) if not rpc: warn("couldn't detect Bitcoin Core - make sure it's running locally, or") warn("use `coldcore --rpc `") @@ -966,20 +881,21 @@ def run_setup(config, controller) -> t.Tuple[t.Any, t.Any]: delay() pre = "you can encrypt your config file with" - if controller.has_gpg(): + if _get_gpg_command(): prompt = "do you want to use GPG to encrypt your coldcore config? [y/N] " if inp(prompt) == "y": use_gpg = True - if controller.has_pass(): + # TODO not Windows compatible + if _get_stdout("which pass")[0] == 0: info(f"{pre} pass by prefixing your path with 'pass:'") p() delay() - defaultpath = controller.suggested_config_path(use_gpg) + defaultpath = get_path_for_new_config(use_gpg) where = inp(f"where should I store your config? [{defaultpath}] ") where = where or defaultpath - config = controller.create_config(where, rpc.url) + config = create_config(where, rpc.url) else: if config.loaded_from.endswith(".gpg"): use_gpg = True @@ -1044,7 +960,7 @@ def run_setup(config, controller) -> t.Tuple[t.Any, t.Any]: pubfile = pubfilepath try: - wallet = controller.parse_cc_public(pubfile.read_text(), rpc) + wallet = CCWallet.from_io(io.StringIO(pubfile.read_text()), rpc) except Exception as e: p() if "key 'tpub" in str(e): @@ -1087,7 +1003,7 @@ def run_setup(config, controller) -> t.Tuple[t.Any, t.Any]: p() section("wallet setup in Core") - controller.rpc_wallet_create(rpc, wallet) + rpc_wallet_create(rpc, wallet) done(f"created wallet {yellow(wallet.name)} in Core as watch-only") rpcw = config.rpc(wallet) @@ -1165,7 +1081,7 @@ def run_setup(config, controller) -> t.Tuple[t.Any, t.Any]: got_utxo = None while not got_utxo: spin("waiting for transaction") - utxos = controller.get_utxos(rpcw) + utxos = get_utxos(rpcw) matching = [u for u in utxos.values() if u.address == receive_addr1] if matching: got_utxo = matching[0] @@ -1191,8 +1107,7 @@ def run_setup(config, controller) -> t.Tuple[t.Any, t.Any]: # Send 90% of the value over. # TODO this is only for testing and is potentially dangerous send_amt = str(round(((got_utxo.amount * 9) / 10), 8)) - prepared_tx = controller.prepare_send( - config, + prepared_tx = _prepare_send( rpcw, sendtoaddr, send_amt, @@ -1230,12 +1145,12 @@ def run_setup(config, controller) -> t.Tuple[t.Any, t.Any]: # TODO clean this up psbt_hex = base64.b64encode(Path(signed_filename).read_bytes()).decode() - txhex = controller.psbt_to_tx_hex(rpcw, Path(signed_filename)) + txhex = _psbt_to_tx_hex(rpcw, Path(signed_filename)) p() p() done("cool! got the signed PSBT") - if not controller.confirm_broadcast(rpcw, txhex, psbt_hex): + if not confirm_broadcast(rpcw, txhex, psbt_hex): warn("aborting - doublespend the inputs immediately") return finish(config, wallet) @@ -1246,7 +1161,7 @@ def run_setup(config, controller) -> t.Tuple[t.Any, t.Any]: inmempool = False while not inmempool: spin("waiting to see the transaction in the mempool") - utxos = controller.get_utxos(rpcw) + utxos = get_utxos(rpcw) matching = [u for u in utxos.values() if u.address == sendtoaddr] if matching: got_utxo = matching[0] @@ -1283,14 +1198,14 @@ def _run_scantxoutset(rpcw, args, result): try: result["result"] = rpcw.scantxoutset(*args) except socket.timeout: - logger.debug("socket timed out during txoutsetscan (this is expected)") + ui_logger.debug("socket timed out during txoutsetscan (this is expected)") def _run_rescan(rpcw, begin_height: int): try: rpcw.rescanblockchain(begin_height) except socket.timeout: - logger.debug("socket timed out during rescan (this is expected)") + ui_logger.debug("socket timed out during rescan (this is expected)") # Curses is weird and ENTER isn't always ENTER. @@ -1439,7 +1354,7 @@ class DashboardScene(Scene): t1 = threading.Thread( target=_get_utxo_lines, - args=(wrpc, self.controller, self.utxos), + args=(wrpc, self.utxos), ) t1.start() self.threads.append(t1) @@ -1463,12 +1378,10 @@ class DashboardScene(Scene): try: return self._draw(k) except Exception: - logger.exception("Dashboard curses barfed") + ui_logger.exception("Dashboard curses barfed") self.stop_threads() raise - return (ord("q"), GoHome) - def _draw(self, k: int) -> t.Tuple[int, Action]: scr = self.scr self.height, self.width = scr.getmaxyx() @@ -1505,7 +1418,7 @@ class DashboardScene(Scene): rpcw = self.config.rpc(wall) self.new_addrs.append(rpcw.getnewaddress()) except Exception: - logger.info("call to getnewadddress failed", exc_info=True) + ui_logger.info("call to getnewadddress failed", exc_info=True) utxo_addrs = {u.address for u in self.utxos.values()} # Strip out used addresses. @@ -1658,7 +1571,7 @@ class DashboardScene(Scene): rpcw = self.config.rpc(wall) rpcw.setlabel(u.address, new_label) except Exception: - logger.info("failed to set label", exc_info=True) + ui_logger.info("failed to set label", exc_info=True) self.flash_msg = "failed to set label" else: self.flash_msg = f"set label to '{new_label}'" @@ -1761,7 +1674,7 @@ def to_clipboard(s: str) -> bool: if plat == "Linux": if sh("which xclip", capture_output=True) != 0: - logger.info("xclip not found, cannot copy to clipboard") + ui_logger.info("xclip not found, cannot copy to clipboard") return False cmd = "xclip -selection clipboard" sh(f"printf '{s}' | {cmd}") @@ -1793,15 +1706,16 @@ blocks_lock = threading.Lock() def _get_new_blocks(rpc, blocks): last_saw = None + saw = None while True: try: saw = rpc.getbestblockhash() except Exception: - logger.info("getbestblockhash call failed", exc_info=True) + ui_logger.info("getbestblockhash call failed", exc_info=True) rpc_conn_lost.set() - if saw != last_saw: + if saw and saw != last_saw: stats = rpc.getblockstats(saw) with blocks_lock: blocks.append( @@ -1823,19 +1737,22 @@ def _get_new_blocks(rpc, blocks): return -def _get_utxo_lines(rpcw, controller, utxos): +def _get_utxo_lines(rpcw, utxos): """ Poll constantly for new UTXOs. """ + new_utxos = None + while True: try: - new_utxos = controller.get_utxos(rpcw) + new_utxos = get_utxos(rpcw) except Exception: - logger.info("listunspents call failed", exc_info=True) + ui_logger.info("listunspents call failed", exc_info=True) - with utxos_lock: - utxos.clear() - utxos.update(new_utxos) + if new_utxos: + with utxos_lock: + utxos.clear() + utxos.update(new_utxos) time.sleep(1) @@ -1856,7 +1773,7 @@ class _TermOpts: TermOpts = _TermOpts() -def draw_menu(scr, config, wallet_configs, controller, action=None): +def draw_menu(scr, config, wallet_configs, action=None): wallet_configs = wallet_configs or [] # Clear and refresh the screen for a blank canvas scr.clear() @@ -1875,8 +1792,8 @@ def draw_menu(scr, config, wallet_configs, controller, action=None): if curses.COLORS >= 256: TermOpts.has_256color = True - home = HomeScene(scr, config, wallet_configs, controller) - dashboard = DashboardScene(scr, config, wallet_configs, controller) + home = HomeScene(scr, config, wallet_configs) + dashboard = DashboardScene(scr, config, wallet_configs) action = action or GoHome k = 0 @@ -1912,12 +1829,12 @@ def draw_menu(scr, config, wallet_configs, controller, action=None): if action == GoHome: (k, action) = home.draw(k) elif action == GoSetup: - config, wallet = run_setup(config, controller) + config, wallet = run_setup(config) # Reinitialize the scenes if config and wallet: wallet_configs.append(wallet) - home = HomeScene(scr, config, wallet_configs, controller) - dashboard = DashboardScene(scr, config, wallet_configs, controller) + home = HomeScene(scr, config, wallet_configs) + dashboard = DashboardScene(scr, config, wallet_configs) k = -1 action = GoHome elif action == GoDashboard: @@ -1947,10 +1864,10 @@ def attrs(scr, *attrs): scr.attroff(a) -def start_ui(config, wallet_configs, controller, action=None): +def start_ui(config, wallet_configs, action=None): formatter = OutputFormatter() try: - curses.wrapper(draw_menu, config, wallet_configs, controller, action) + curses.wrapper(draw_menu, config, wallet_configs, action) os.system("cls" if os.name == "nt" else "clear") except curses.error: print() @@ -1958,7 +1875,7 @@ def start_ui(config, wallet_configs, controller, action=None): print() sys.exit(1) except socket.timeout: - logger.exception("RPC connection timed out") + ui_logger.exception("RPC connection timed out") print() formatter.warn("Unable to connect to Bitcoin Core RPC - are you sure ") formatter.warn("it is running and the RPC URL you gave is correct?") @@ -1967,12 +1884,8 @@ def start_ui(config, wallet_configs, controller, action=None): sys.exit(1) -# --- end inlined from src/coldcore/ui.py -------------------------------------- -# ------------------------------------------------------------------------------ - - - -# fmt: on +# --- main/CLI ------------------------------------------------------------------------ +# ------------------------------------------------------------------------------------- __VERSION__ = "0.2.0-beta" @@ -2059,7 +1972,7 @@ def decodepsbt(fname: str, format: str = "json"): Args: format: either json or hex """ - (config, (wall, *_)) = _get_config_required() + (config, *_) = _get_config_required() rpc = config.rpc() b = Path(fname).read_bytes() hexval = base64.b64encode(b).decode() @@ -2076,7 +1989,7 @@ def setup(): one doesn't already exist) and populates a watch-only wallet in Core. """ config, walls = _get_config(require_wallets=False) - start_ui(config, walls, WizardController(), GoSetup) + start_ui(config, walls, GoSetup) @cli.cmd @@ -2162,7 +2075,7 @@ def prepare_send(to_address: str, amount: str, spend_from: str = ""): rpcw = config.rpc(wall) spend_from_list = spend_from.split(",") if spend_from else None - return _prepare_send(config, rpcw, to_address, amount, spend_from_list) + return _prepare_send(rpcw, to_address, amount, spend_from_list) @cli.cmd @@ -2209,7 +2122,7 @@ def newaddr(num: int = 1, clip: ClipArg = False): # type: ignore @cli.cmd def ui(): config, walls = _get_config(require_wallets=False) - start_ui(config, walls, WizardController()) + start_ui(config, walls) @cli.main @@ -2529,43 +2442,6 @@ class UTXO: return (self.txid, self.vout) -class WizardController: - """Used to proxy logic into the terminal UI.""" - - def create_config(self, p: str, url: str) -> Op["GlobalConfig"]: - return create_config(p, url) - - def parse_cc_public(self, contents: str, rpc: BitcoinRPC) -> CCWallet: - return CCWallet.from_io(io.StringIO(contents), rpc) - - def rpc_wallet_create(self, *args, **kwargs): - return rpc_wallet_create(*args, **kwargs) - - def discover_rpc(self, *args, **kwargs) -> Op[BitcoinRPC]: - return discover_rpc(*args, **kwargs) - - def has_gpg(self) -> bool: - return bool(_get_gpg_command()) - - def has_pass(self) -> bool: - return _get_stdout("which pass")[0] == 0 - - def suggested_config_path(self, use_gpg: bool = False) -> str: - return get_path_for_new_config(use_gpg) - - def get_utxos(self, rpcw): - return get_utxos(rpcw) - - def prepare_send(self, *args, **kwargs) -> str: - return _prepare_send(*args, **kwargs) - - def psbt_to_tx_hex(self, *args, **kwargs) -> str: - return _psbt_to_tx_hex(*args, **kwargs) - - def confirm_broadcast(self, *args, **kwargs) -> bool: - return confirm_broadcast(*args, **kwargs) - - @dataclass class GlobalConfig: """Coldcore-specific configuration.""" @@ -2576,7 +2452,6 @@ class GlobalConfig: default_wallet: Op[str] = None stdout: t.IO = sys.stdout stderr: t.IO = sys.stderr - wizard_controller: WizardController = WizardController() def rpc(self, wallet: Op[Wallet] = None, **kwargs) -> BitcoinRPC: wall_rpc = wallet.bitcoind_json_url if wallet else None @@ -2786,7 +2661,7 @@ def _get_rpc_inner( def _get_stdout(*args, **kwargs) -> t.Tuple[int, bytes]: - """Return (returncode, stdout as bytes).""" + """Return (returncode, stdout).""" kwargs["shell"] = True kwargs["capture_output"] = True result = subprocess.run(*args, **kwargs) @@ -2818,12 +2693,11 @@ def get_utxos(rpcw: BitcoinRPC) -> t.Dict[UtxoId, "UTXO"]: def _prepare_send( - config: GlobalConfig, rpcw: BitcoinRPC, to_address: str, amount: str, spend_from: Op[t.List[str]], -): +) -> str: vins = [] if spend_from: @@ -2851,7 +2725,7 @@ def _prepare_send( # error code: -5 indicates bad address; handle that. if e.error.get("code") == -5: # type: ignore F.warn(f"Bad address specified: {e}") - return False + return "" raise nowstr = datetime.datetime.now().strftime("%Y%m%d-%H%M") @@ -2963,6 +2837,8 @@ def confirm_broadcast(rpcw: BitcoinRPC, hex_val: str, psbt_hex: str) -> bool: addrs = ",".join(spk["addresses"]) elif 'address' in spk: addrs = spk["address"] + else: + raise RuntimeError(f"unexpected decoderawtransaction format:\n{info}") outs.append((addrs, out["value"])) @@ -3008,7 +2884,7 @@ def confirm_broadcast(rpcw: BitcoinRPC, hex_val: str, psbt_hex: str) -> bool: class Pass: """Access to pass, the password store.""" - def write(cls, path: str, content: str) -> bool: + def write(self, path: str, content: str) -> bool: """Return True if write successful.""" # TODO maybe detect whether or not we're overwriting and warn F.alert(f"Requesting to write to pass: {path}") @@ -3326,4 +3202,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..8124946 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,4 @@ +mypy +pytest +flake8 +black==21.12b0 diff --git a/src/coldcore/__init__.py b/src/coldcore/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/coldcore/crypto.py b/src/coldcore/crypto.py deleted file mode 100644 index 0133b98..0000000 --- a/src/coldcore/crypto.py +++ /dev/null @@ -1,67 +0,0 @@ -""" -Basic encoding/cryptographic operations, mostly relating to xpub parsing. -""" -import hashlib - -# Much of this file was derived from code in buidl-python -# (https://github.com/buidl-bitcoin/buidl-python). - - -BASE58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" - -MAINNET_XPUB = bytes.fromhex("0488b21e") -MAINNET_YPUB = bytes.fromhex("049d7cb2") -MAINNET_ZPUB = bytes.fromhex("04b24746") -TESTNET_XPUB = bytes.fromhex("043587cf") -TESTNET_YPUB = bytes.fromhex("044a5262") -TESTNET_ZPUB = bytes.fromhex("045f1cf6") - - -def raw_decode_base58(s): - num = 0 - # see how many leading 0's we are starting with - prefix = b"" - for c in s: - if num == 0 and c == "1": - prefix += b"\x00" - else: - num = 58 * num + BASE58_ALPHABET.index(c) - # put everything into base64 - byte_array = [] - while num > 0: - byte_array.insert(0, num & 255) - num >>= 8 - combined = prefix + bytes(byte_array) - checksum = combined[-4:] - if hash256(combined[:-4])[:4] != checksum: - raise RuntimeError("bad address: {} {}".format(checksum, hash256(combined)[:4])) - return combined[:-4] - - -def xpub_to_fp(xpub: str) -> str: - raw = raw_decode_base58(xpub) - - if len(raw) != 78: - raise ValueError("Not a proper extended key") - - version = raw[:4] - - if version not in ( - TESTNET_XPUB, - TESTNET_YPUB, - TESTNET_ZPUB, - MAINNET_XPUB, - MAINNET_YPUB, - MAINNET_ZPUB, - ): - raise ValueError(f"not an xprv, yprv or zprv: {version}") - - return hash160(raw[-33:])[:4].hex() - - -def hash160(s): - return hashlib.new("ripemd160", hashlib.sha256(s).digest()).digest() - - -def hash256(s): - return hashlib.sha256(hashlib.sha256(s).digest()).digest() diff --git a/src/coldcore/main.py b/src/coldcore/main.py deleted file mode 100755 index 41dda58..0000000 --- a/src/coldcore/main.py +++ /dev/null @@ -1,1403 +0,0 @@ -#!/usr/bin/env python3 -""" - _ _ - ___ ___| |_| |___ ___ ___ ___ -| _| . | | . | _| . | _| -_| -|___|___|_|___|___|___|_| |___| - - a trust minimized Bitcoin wallet interface - - -# TODO - -- [ ] add wallet name -- [ ] add version birthday to new config -- [ ] allow manual coin selection when sending -- [ ] address labeling -- [ ] implement scrolling in the curses balance panel -- [ ] implement --json, --csv -- [ ] implement command-on-monitor -- [ ] multisig workflow - -""" - -import logging -import re -import typing as t -import sys -import base64 -import datetime -import subprocess -import time -import textwrap -import json -import io -import os -import platform -from pathlib import Path -from typing import Optional as Op -from dataclasses import dataclass, field -from configparser import ConfigParser -from decimal import Decimal - -# fmt: off -# We have to keep these imports to one line because of how ./bin/compile works. -from .thirdparty.clii import App, Arg -from .thirdparty.bitcoin_rpc import BitcoinRPC, JSONRPCError -from .crypto import xpub_to_fp -from .ui import start_ui, yellow, bold, green, red, GoSetup, OutputFormatter, DecimalEncoder, to_clipboard # noqa -# fmt: on - -__VERSION__ = "0.2.0-beta" - -root_logger = logging.getLogger() -logger = logging.getLogger("main") - -MAINNET = "mainnet" -TESTNET = "testnet3" - -F = OutputFormatter() - -cli = App() -cli.add_arg("--verbose", "-v", action="store_true", default=False) -cli.add_arg( - "--config", - "-c", - action="store", - default=None, - help=( - "Path to config file. Can be a `pass:Path/To/Config` or " - "a filename ending in .gpg." - ), -) -cli.add_arg("--debug", "-d", action="store_true", default=False) -cli.add_arg( - "--testnet", - action="store_true", - default=False, - help="Try to connect on the testnet network initially instead of mainnet.", -) -cli.add_arg( - "--version", - action="version", - version=f"coldcore {__VERSION__}", -) -cli.add_arg( - "--wallet", - "-w", - action="store", - default=None, - help="The specific wallet to open.", -) -cli.add_arg( - "--rpc", - "-r", - action="store", - default=None, - help="The Bitcoin Core RPC interface URL to use, e.g. 'http://user:pass@host:8332'", -) - -PASS_PREFIX = "pass:" - - -def setup_logging() -> Op[Path]: - """ - Configure logging; only log when --debug is enabled to prevent unintentional - data leaks. - - Returns a path to the logfile if one is being used. - """ - # TODO base this on config? - log_path = "coldcore.log" - formatter = logging.Formatter("%(asctime)s [%(name)s] %(levelname)s - %(message)s") - log_filehandler = logging.FileHandler(log_path) - log_filehandler.setLevel(logging.DEBUG) - log_filehandler.setFormatter(formatter) - - if cli.args.debug: - root_logger.setLevel(logging.DEBUG) - root_logger.addHandler(log_filehandler) - logger.setLevel(logging.DEBUG) - return Path(log_path) - - return None - - -# --- CLI commands ------------------------------------------------------------ -# ----------------------------------------------------------------------------- - - -@cli.cmd -def decodepsbt(fname: str, format: str = "json"): - """ - Args: - format: either json or hex - """ - (config, (wall, *_)) = _get_config_required() - rpc = config.rpc() - b = Path(fname).read_bytes() - hexval = base64.b64encode(b).decode() - if format == "hex": - print(hexval) - else: - print(json.dumps(rpc.decodepsbt(hexval), cls=DecimalEncoder)) - - -@cli.cmd -def setup(): - """ - Run initial setup for a wallet. This creates the local configuration file (if - one doesn't already exist) and populates a watch-only wallet in Core. - """ - config, walls = _get_config(require_wallets=False) - start_ui(config, walls, WizardController(), GoSetup) - - -@cli.cmd -def watch(): - """Watch activity related to your wallets.""" - (config, (wall, *_)) = _get_config_required() - rpcw = config.rpc(wall) - - utxos = get_utxos(rpcw) - F.task(f"Watching wallet {config.wallet_name}") - - while True: - new_utxos = get_utxos(rpcw) - - spent_addrs = utxos.keys() - new_utxos.keys() - new_addrs = new_utxos.keys() - utxos.keys() - - for addr in spent_addrs: - u = utxos[addr] - F.info(f"Saw spend: {u.address} ({u.amount})") - - for addr in new_addrs: - u = new_utxos[addr] - F.info(f"Got new UTXO: {u.address} ({u.amount})") - - was_zeroconf = [new_utxos[k] for k, v in utxos.items() if v.num_confs == 0] - finally_confed = [utxo for utxo in was_zeroconf if utxo.num_confs > 0] - - for u in finally_confed: - F.info(f"UTXO confirmed! {u.address} ({u.amount})") - - utxos = new_utxos - time.sleep(0.1) - - -@cli.cmd -def balance(format: str = "plain"): - """ - Check your wallet balances. - - Args: - format: can be plain, json, csv, or raw (for listunspent output) - """ - (config, (wall, *_)) = _get_config_required() - rpcw = config.rpc(wall) - result = rpcw.listunspent(0) - - if format == "raw": - print(json.dumps(result, cls=DecimalEncoder, indent=2)) - return - - utxos = UTXO.from_listunspent(result) # includes unconfirmed - sorted_utxos = sorted(utxos, key=lambda u: -u.num_confs) - - if format == "json": - print( - json.dumps([u.__dict__ for u in sorted_utxos], cls=DecimalEncoder, indent=2) - ) - return - - for utxo in sorted_utxos: - if format == "plain": - print(f"{utxo.address:<40} {utxo.num_confs:>10} {utxo.amount}") - elif format == "csv": - print(f"{utxo.address},{utxo.num_confs},{utxo.amount}") - - if format == "plain": - amt = sum(u.amount for u in utxos) - print(bold(f"total: {len(utxos)} ({amt} BTC)")) - - -@cli.cmd -def prepare_send(to_address: str, amount: str, spend_from: str = ""): - """ - Prepare a sending PSBT. - - Args: - to_address: which address to send to - amount: amount to send in BTC - spend_from: comma-separated addresses to pull unspents from as inputs - """ - (config, (wall, *_)) = _get_config_required() - rpcw = config.rpc(wall) - spend_from_list = spend_from.split(",") if spend_from else None - - return _prepare_send(config, rpcw, to_address, amount, spend_from_list) - - -@cli.cmd -def broadcast(signed_psbt_path: Path): - """Broadcast a signed PSBT.""" - (config, (wall, *_)) = _get_config_required() - rpcw = config.rpc(wall) - hex_val = _psbt_to_tx_hex(rpcw, signed_psbt_path) - psbt_hex = base64.b64encode(Path(signed_psbt_path).read_bytes()).decode() - - assert hex_val - - if not confirm_broadcast(rpcw, hex_val, psbt_hex): - F.warn("Aborting transaction! Doublespend the inputs!") - return - - got_hex = rpcw.sendrawtransaction(hex_val) - F.done(f"tx sent: {got_hex}") - print(got_hex) - - -ClipArg = Arg(("-c", "--to-clipboard")) - - -@cli.cmd -def newaddr(num: int = 1, clip: ClipArg = False): # type: ignore - """ - Args: - num: the number of new addresses to generate - clip: if passed, copy the latest new address to the clipboard - """ - (config, (wall, *_)) = _get_config_required() - rpcw = config.rpc(wall) - addr = "" - - for _ in range(num): - addr = rpcw.getnewaddress() - print(addr) - - if clip and addr: - to_clipboard(addr) - - -@cli.cmd -def ui(): - config, walls = _get_config(require_wallets=False) - start_ui(config, walls, WizardController()) - - -@cli.main -def cli_main(): - """ - A trust-minimized wallet script. - - You can think of this program as a small shim between your - air-gapped hardware wallet and Bitcoin Core. - """ - ui() - - -# --- Configuration classes --------------------------------------------------- -# ----------------------------------------------------------------------------- - - -@dataclass -class Wallet: - """ - In-memory representation of a single BIP32 HD wallet. Often but not necessarily - backed by a hardware wallet. - """ - - name: str - fingerprint: str - deriv_path: str - xpub: str - - # The name of the watch-only wallet stored in Bitcoin Core. - bitcoind_name: str - - # TODO at some point we'll support non-WPKH descriptors. - descriptors: t.List["WpkhDescriptor"] = field(default_factory=list) - - earliest_block: Op[int] = None - bitcoind_json_url: Op[str] = None - - # If given, this was loaded from an external storage mechanism (e.g. pass, gpg). - # Respect this when translating back to INI. - loaded_from: Op[str] = None - - @property - def descriptor_base(self): - return f"wpkh([{self.fingerprint}{self.deriv_path}]{self.xpub})" - - @property - def net_name(self): - if self.xpub.startswith("tpub"): - return TESTNET - elif self.xpub.startswith("xpub"): - return MAINNET - else: - raise ValueError("unhandled xpub prefix") - - def scantxoutset_args(self) -> t.Tuple[str, t.List[str]]: - return ("start", [d.with_checksum for d in self.descriptors]) - - def importmulti_args(self) -> t.Tuple: - args = [ - { - "desc": d.with_checksum, - "internal": d.is_change, - # TODO be more decisive about this gap limit. Right now it's sort of - # arbitrary. - "range": [0, 3000], - "timestamp": "now", - "keypool": True, - "watchonly": True, - } - for d in self.descriptors - ] - - return (args,) - - @property - def as_ini_dict(self) -> t.Dict: - if self.loaded_from: - # TODO it's incumbent upon the user to maintain this themmselves? - return {"load_from": self.loaded_from} - - checksums = {} - for d in self.descriptors: - checksums.update(d.change_to_checksum) - - return { - "fingerprint": self.fingerprint, - "deriv_path": self.deriv_path, - "xpub": self.xpub, - "bitcoind_name": self.bitcoind_name, - "bitcoind_json_url": self.bitcoind_json_url or "", - "earliest_block": str(self.earliest_block or ""), - "checksum_map": json.dumps(checksums), - } - - @classmethod - def from_ini(cls, name: str, rpc: BitcoinRPC, conf: ConfigParser) -> "Wallet": - this_conf = conf[name] - load_from = this_conf.get("load_from") - - if load_from: - content: Op[str] = "" - if _is_pass_path(load_from): - passpath = load_from.split(PASS_PREFIX, 1)[-1] - content = Pass().read(passpath, action=f"Requesting wallet {name})") - elif load_from.endswith(".gpg"): - content = GPG().read(load_from) # type: ignore - else: - raise ValueError(f"from directive unrecognized: {load_from}") - - if not content: - raise ValueError(f"failed to retrieve config from {load_from}") - - conf2 = ConfigParser() - try: - conf2.read_string(content) - except Exception: - msg = f"Failed to read config for wallet {name} ({load_from})" - logger.exception(msg) - F.warn(msg) - sys.exit(1) - - this_conf = conf2[name] - - fp = this_conf["fingerprint"] - deriv_path = this_conf["deriv_path"] - bitcoind_name = this_conf["bitcoind_name"] - xpub = this_conf["xpub"] - checksum_map = json.loads(this_conf["checksum_map"]) - url = this_conf.get("bitcoind_json_url") - earliest_block = ( - int(this_conf.get("earliest_block") or 0) or None - ) # type: ignore - - if set(checksum_map.keys()) != {"1", "0"}: - raise ValueError(f"unexpected checksum map contents: {checksum_map}") - - descs = [ - WpkhDescriptor.from_conf( - fp, - deriv_path, - xpub, - is_change=is_change, - checksum=checksum_map["1" if is_change else "0"], - ) - for is_change in [False, True] - ] - - return cls( - name, - fp, - deriv_path, - xpub, - bitcoind_name, - descs, - earliest_block, - url, - loaded_from=load_from, - ) - - -class CCWallet(Wallet): - """ - A wallet whose private key lives on a Coldcard device. - """ - - @classmethod - def from_io( - cls, inp: t.IO, rpc: BitcoinRPC, - name: Op[str] = None, - earliest_block: Op[int] = None - ) -> "CCWallet": - """ - Instantiate a CCWallet from the public output generated by the - coldcard. - """ - content = inp.read() - as_lines = content.splitlines() - xpub_prefix = "xpub" - - if re.search(r" => tpub", content): - xpub_prefix = "tpub" - - masterpubkey = "" - for idx, line in enumerate(as_lines): - if "'master' extended public key" in line: - masterpubkey = as_lines[idx + 2].strip() - - if not masterpubkey.startswith(xpub_prefix): - raise ValueError("file format unexpected: master key") - - # We don't do anything with the masterpubkey other than compute a - # fingerprint based on it. - fp = xpub_to_fp(masterpubkey).lower() - del masterpubkey - - m = re.search(r"master key fingerprint: (?P[a-zA-Z0-9]+)", content) - - # Optionally verify the master key fingerprint with a second source. - if m: - fp2 = m.groupdict()["fp"].lower() - - if fp2 != fp: - raise ValueError(f"fingerprints don't match: {fp} vs. {fp2}") - - m2 = re.search( - f"m/84'(?P\\S+) => {xpub_prefix}(?P[a-zA-Z0-9]+)", - content, - ) - - if not m2: - raise ValueError("couldn't find xpub path") - - deriv_path = "/84h" - suffix = m2.groupdict()["deriv_suffix"] - - if not re.fullmatch(r"(/\d+'?)+", suffix): - raise ValueError(f"derivation path not expected: {suffix}") - - deriv_path += suffix.replace("'", "h") - - if not re.search(deriv_path.replace("h", "'") + f" => {xpub_prefix}", content): - raise ValueError(f"inferred derivation path appears invalid: {deriv_path}") - - xpub: str = xpub_prefix + m2.groupdict()["xpub"] - - def desc_to_checksum(desc: WpkhDescriptor) -> str: - try: - # TODO this isn't available in 0.18 - return rpc.getdescriptorinfo(desc.base)["checksum"] - except JSONRPCError: - F.warn("Please upgrade Bitcoin Core to a version greater than 0.19") - raise - - descs = [] - for is_change in [False, True]: - desc = WpkhDescriptor.from_conf( - fp, deriv_path, xpub, is_change=is_change, checksum="" - ) - desc.checksum = desc_to_checksum(desc) - descs.append(desc) - - name = name or f'coldcard-{fp}' - - return cls( - name, - fp, - deriv_path, - xpub, - f"coldcard-{fp.lower()}", - descriptors=descs, - earliest_block=earliest_block, - ) - - -@dataclass -class WpkhDescriptor: - # The descriptor without the checksum. - base: str - checksum: str - # Does this descriptor correspond to a change wallet? - is_change: bool - - @property - def with_checksum(self): - return f"{self.base}#{self.checksum}" - - @property - def change_to_checksum(self): - key = "1" if self.is_change else "0" - return {key: self.checksum} - - @classmethod - def from_conf( - cls, - fingerprint: str, - deriv_path: str, - xpub: str, - is_change: bool, - checksum: str, - ) -> "WpkhDescriptor": - change = 1 if is_change else 0 - base = f"wpkh([{fingerprint.lower()}{deriv_path}]{xpub}/{change}/*)" - return cls(base, checksum, is_change) - - -# Type identifying a UTXO: (txid, vout) -UtxoId = t.Tuple[str, int] - - -@dataclass -class UTXO: - address: str - amount: Decimal - num_confs: int - txid: str - vout: int - label: str - - @classmethod - def from_listunspent(cls, rpc_outs: t.List[t.Dict]) -> t.List["UTXO"]: - return [ - cls( - out["address"], - out["amount"], - out["confirmations"], - out["txid"], - out["vout"], - out.get("label", ""), - ) - for out in rpc_outs - ] - - @property - def id(self) -> UtxoId: - """Return a unique identifier for the coin.""" - return (self.txid, self.vout) - - -class WizardController: - """Used to proxy logic into the terminal UI.""" - - def create_config(self, p: str, url: str) -> Op["GlobalConfig"]: - return create_config(p, url) - - def parse_cc_public(self, contents: str, rpc: BitcoinRPC) -> CCWallet: - return CCWallet.from_io(io.StringIO(contents), rpc) - - def rpc_wallet_create(self, *args, **kwargs): - return rpc_wallet_create(*args, **kwargs) - - def discover_rpc(self, *args, **kwargs) -> Op[BitcoinRPC]: - return discover_rpc(*args, **kwargs) - - def has_gpg(self) -> bool: - return bool(_get_gpg_command()) - - def has_pass(self) -> bool: - return _get_stdout("which pass")[0] == 0 - - def suggested_config_path(self, use_gpg: bool = False) -> str: - return get_path_for_new_config(use_gpg) - - def get_utxos(self, rpcw): - return get_utxos(rpcw) - - def prepare_send(self, *args, **kwargs) -> str: - return _prepare_send(*args, **kwargs) - - def psbt_to_tx_hex(self, *args, **kwargs) -> str: - return _psbt_to_tx_hex(*args, **kwargs) - - def confirm_broadcast(self, *args, **kwargs) -> bool: - return confirm_broadcast(*args, **kwargs) - - -@dataclass -class GlobalConfig: - """Coldcore-specific configuration.""" - - loaded_from: str - raw_config: ConfigParser - bitcoind_json_url: Op[str] = None - default_wallet: Op[str] = None - stdout: t.IO = sys.stdout - stderr: t.IO = sys.stderr - wizard_controller: WizardController = WizardController() - - def rpc(self, wallet: Op[Wallet] = None, **kwargs) -> BitcoinRPC: - wall_rpc = wallet.bitcoind_json_url if wallet else None - - return get_rpc( - # The ordering of RPC preference is important here. - cli.args.rpc or wall_rpc or self.bitcoind_json_url, - wallet, - **kwargs, - ) - - def exit(self, code): - # To be overridden in unittests. - sys.exit(code) - - @classmethod - def from_ini( - cls, loaded_from: str, conf: ConfigParser - ) -> t.Tuple["GlobalConfig", t.List[Wallet]]: - sect = conf["default"] - c = cls( - loaded_from, - conf, - sect.get("bitcoind_json_url"), - sect.get("default_wallet"), - ) - wallets = [] - - for key in conf.sections(): - if key == "default": - continue - - net_name = "mainnet" - WalletClass = CCWallet # FIXME when we support more wallets - - if conf[key].get("xpub", "").startswith("tpub"): - net_name = TESTNET - # TODO RPC for one wallet config may not necessarily be available, - # but anothers' might work. Ensure this doesn't crash. - rpc = c.rpc(net_name=net_name) - - try: - wallets.append(WalletClass.from_ini(key, rpc, conf)) - except Exception: - # TODO flash a warning in the UI that we couldn't read the config - msg = f"Unable to read config section '{key}'" - logger.exception(msg) - F.warn(msg) - - return (c, wallets) - - @classmethod - def write_blank(cls, outfile: t.IO, bitcoind_json_url: Op[str] = ""): - """Write a blank configuration file.""" - outfile.write(_get_blank_conf(bitcoind_json_url)) - p = Path(outfile.name) - - # Ensure that the created file is only readable by the owner. - if p.exists(): - if platform.system() == "Windows": - F.warn("Before continuing, please ensure the configuration file") - F.warn("is only readable by your Windows user account.") - else: - _sh(f"chmod 600 {p}") - - def add_new_wallet(self, w: Wallet): - logger.info("Adding new wallet to config: %s", w.as_ini_dict) - self.raw_config[w.name] = w.as_ini_dict - - def write(self): - """Save the contents of this config to an INI file on disk.""" - if _is_pass_path(self.loaded_from): - to_path = self.loaded_from.split(PASS_PREFIX)[-1] - passobj = Pass() - content = io.StringIO() - self.raw_config.write(content) - content.seek(0) - passobj.write(to_path, content.read()) - - elif self.loaded_from.endswith(".gpg"): - gpg = GPG() - content = io.StringIO() - self.raw_config.write(content) - content.seek(0) - gpg.write(self.loaded_from, content.read()) - - else: - with open(self.loaded_from, "w") as f: - self.raw_config.write(f) - - logger.info(f"Wrote configuration to {self.loaded_from}") - - -def _get_blank_conf(bitcoind_json_url: Op[str] = "") -> str: - return textwrap.dedent( - f""" - [default] - - # If blank, this will default to something like - # http://localhost:8332 - # You can specify non-localhosts like - # http://your_rpc_user:rpcpassword@some_host:8332/ - bitcoind_json_url = {bitcoind_json_url or ''} - - # This corresponds to one of the wallet sections listed below, - # and will be used for commands where a single wallet is required - # but unspecified. - default_wallet = - """ - ) - - -# --- Bitcoin RPC utilities --------------------------------------------------- -# ----------------------------------------------------------------------------- - - -def discover_rpc( - config: Op[GlobalConfig] = None, url: Op[str] = None -) -> Op[BitcoinRPC]: - """Return an RPC connection to Bitcoin if possible.""" - service_url = None - - if cli.args.rpc: - service_url = cli.args.rpc - elif config: - service_url = config.bitcoind_json_url - elif url: - service_url = url - - for i in (MAINNET, TESTNET): - try: - logger.info(f"trying RPC for {i} at {service_url}") - rpc = get_rpc(service_url, net_name=i) - rpc.help() - logger.info(f"found RPC connection at {rpc.url}") - except Exception: - logger.debug("couldn't connect to Core RPC", exc_info=True) - else: - return rpc - return None - - -def _is_already_loaded_err(e: JSONRPCError) -> bool: - msg = str(e).lower() - return ( - ('already loaded' in msg) or - ('duplicate -wallet filename' in msg) or - ('database already exists' in msg)) - - -def get_rpc( - url: Op[str] = None, - wallet: Op[Wallet] = None, - **kwargs, -) -> BitcoinRPC: - """ - Get a connection to some Bitcoin JSON RPC server. Handles connection caching. - - If connecting to a wallet, ensure the wallet is loaded. - """ - if not hasattr(get_rpc, "_rpc_cache"): - setattr(get_rpc, "_rpc_cache", {}) - cache = get_rpc._rpc_cache # type: ignore - - wallet_name = wallet.bitcoind_name if wallet else "" - - # XXX str(kwargs) is sort of a hack, but it encompasses net_name. Maybe think of a - # better way to do this. - cache_key = (wallet_name, url, str(kwargs)) - - if cache_key in cache: - return cache[cache_key] - - if not wallet: - got = _get_rpc_inner(url, **kwargs) - cache[cache_key] = got - else: - plain_rpc = _get_rpc_inner(url, net_name=wallet.net_name, **kwargs) - try: - # We have to ensure the wallet is loaded before accessing its - # RPC. - plain_rpc.loadwallet(wallet.bitcoind_name) - except JSONRPCError as e: - # Wallet already loaded. - if not _is_already_loaded_err(e): - raise - cache[cache_key] = _get_rpc_inner( - url, net_name=wallet.net_name, wallet_name=wallet.bitcoind_name, **kwargs - ) - - return cache[cache_key] - - -def _get_rpc_inner( - url: Op[str] = None, timeout: int = (60 * 5), **kwargs -) -> BitcoinRPC: - return BitcoinRPC( - url, - timeout=timeout, - debug_stream=(sys.stderr if cli.args.debug else None), - **kwargs, - ) - - -# --- Wallet/transaction utilities -------------------------------------------- -# ----------------------------------------------------------------------------- - - -def _get_stdout(*args, **kwargs) -> t.Tuple[int, bytes]: - """Return (returncode, stdout as bytes).""" - kwargs["shell"] = True - kwargs["capture_output"] = True - result = subprocess.run(*args, **kwargs) - logger.info(f"sh: {args[0]} -> {result.returncode}") - return (result.returncode, result.stdout) - - -def _sh(*args, **kwargs) -> subprocess.CompletedProcess: - kwargs.setdefault("shell", True) - result = subprocess.run(*args, **kwargs) - logger.info(f"sh: {args[0]} -> {result.returncode}") - return result - - -def rpc_wallet_create(rpc: BitcoinRPC, wall: Wallet): - try: - rpc.createwallet(wall.bitcoind_name, True) - except JSONRPCError as e: - if not _is_already_loaded_err(e): - # Wallet already exists; ok. - raise - - -def get_utxos(rpcw: BitcoinRPC) -> t.Dict[UtxoId, "UTXO"]: - return { - u.id: u - for u in UTXO.from_listunspent(rpcw.listunspent(0)) # includes unconfirmed - } - - -def _prepare_send( - config: GlobalConfig, - rpcw: BitcoinRPC, - to_address: str, - amount: str, - spend_from: Op[t.List[str]], -): - vins = [] - - if spend_from: - utxos = UTXO.from_listunspent(rpcw.listunspent(0)) - addrs = {u.address for u in utxos} - unknown_addrs = set(spend_from) - addrs - - for addr in unknown_addrs: - # TODO should fail? - F.warn(f"WARNING: address '{addr}' not in wallet") - - for u in utxos: - if u.address in spend_from: - vins.append({"txid": u.txid, "vout": u.vout}) - - try: - result = rpcw.walletcreatefundedpsbt( - vins, # inputs for txn (manual coin control) - [{to_address: amount}], - 0, # locktime - {"includeWatching": True}, # options; 'feeRate'? - True, # bip32derivs - include BIP32 derivation paths for pubkeys if known - ) - except Exception as e: - # error code: -5 indicates bad address; handle that. - if e.error.get("code") == -5: # type: ignore - F.warn(f"Bad address specified: {e}") - return False - raise - - nowstr = datetime.datetime.now().strftime("%Y%m%d-%H%M") - filename = f"unsigned-{nowstr}.psbt" - Path(filename).write_bytes(base64.b64decode(result["psbt"])) - info = rpcw.decodepsbt(result["psbt"]) - num_inputs = len(info["inputs"]) - num_outputs = len(info["outputs"]) - - # Get info from psbt - psbtinfo = info - tx = info["tx"] - outs = tx["vout"] - total_in_amt = 0.0 - total_out_amt = 0.0 - change = 0.0 - - # Add up total input amount - for i in psbtinfo["inputs"]: - # TODO does this mean we only support segwit transactions? - wit = i["witness_utxo"] - amt = float(wit["amount"]) - total_in_amt = total_in_amt + amt - - # Add up total output amount - for o in outs: - addr = o['scriptPubKey']['addresses'][0] - try: - addr_info = rpcw.getaddressinfo(addr) - except Exception: - # TODO handle this - raise - amt = float(o["value"]) - total_out_amt = total_out_amt + amt - yours = addr_info["ismine"] or addr_info["iswatchonly"] - if yours: - change = change + amt - - fee = result["fee"] - perc = (fee / Decimal(amount)) * 100 - - F.info(f"total output amount: {total_out_amt} BTC") - F.info(f"{num_inputs} inputs, {num_outputs} outputs") - F.info(f"network fee: {result['fee']} BTC ({perc:.2f}% of amount)") - F.info(f"change back: {change} BTC") - F.info("outputs:") - - # Display outputs - for o in outs: - addr = o['scriptPubKey']['addresses'][0] - try: - addr_info = rpcw.getaddressinfo(addr) - except Exception: - # TODO handle this - raise - display_amt = bold(green(f"{o['value']} BTC")) - yours = addr_info["ismine"] or addr_info["iswatchonly"] - yours_str = " (your address)" if yours else "" - F.blank(f" -> {bold(addr)} ({display_amt}){yours_str}") - - F.p() - F.done(f"wrote PSBT to {filename} - sign with coldcard") - - return filename - - -def _psbt_to_tx_hex(rpcw: BitcoinRPC, psbt_path: Path) -> str: - content: bytes = psbt_path.read_bytes().strip() - - # Handle signed TX as raw binary. - if content[0:5] == b"psbt\xff": - to_ascii = base64.b64encode(content).decode() - # TODO handle errors - # a KeyError here means maybe we were trying to broadcast an unsigned psbt - return rpcw.finalizepsbt(to_ascii)["hex"] - - # Handle signed TX as base64. - elif content[0:6] == b"cHNidP": - # TODO handle errors - # a KeyError here means maybe we were trying to broadcast an unsigned psbt - return rpcw.finalizepsbt(content.decode())["hex"] - - # Handle signed TX as hex. - elif _can_decode_transaction(rpcw, content.decode()): - return content.decode() - - raise ValueError("unrecognized signed PSBT format") - - -def _can_decode_transaction(rpc: BitcoinRPC, tx_hex: str) -> bool: - try: - got = rpc.decoderawtransaction(tx_hex) - assert got["txid"] - except Exception: - return False - return True - - -def confirm_broadcast(rpcw: BitcoinRPC, hex_val: str, psbt_hex: str) -> bool: - """Display information about the transaction to be performed and confirm.""" - info = rpcw.decoderawtransaction(hex_val) - psbtinfo = rpcw.decodepsbt(psbt_hex) - outs: t.List[t.Tuple[str, Decimal]] = [] - - for out in info["vout"]: - spk = out["scriptPubKey"] - if 'addresses' in spk: - # XXX unsure if this is necessary, but might be for older versions of Core. - addrs = ",".join(spk["addresses"]) - elif 'address' in spk: - addrs = spk["address"] - - outs.append((addrs, out["value"])) - - F.alert("About to send a transaction:\n") - - for i in psbtinfo["inputs"]: - # TODO does this mean we only support segwit transactions? - wit = i["witness_utxo"] - amt = wit["amount"] - address = wit["scriptPubKey"]["address"] - - amt = bold(red(f"{amt} BTC")) - F.blank(f" <- {address} ({amt})") - - F.p() - - for o in outs: - try: - addr_info = rpcw.getaddressinfo(o[0]) - except Exception: - print(f"Couldn't get info for output: {o}") - # TODO handle this - raise - - amt = bold(green(f"{o[1]} BTC")) - yours = addr_info["ismine"] or addr_info["iswatchonly"] - yours_str = " (your address)" if yours else "" - F.blank(f" -> {bold(o[0])} ({amt}){yours_str}") - - print() - - inp = input(f" {yellow('?')} look okay? [y/N]: ").strip().lower() - - if inp != "y": - return False - return True - - -# --- Config management and storage utilities --------------------------------- -# ----------------------------------------------------------------------------- - - -class Pass: - """Access to pass, the password store.""" - - def write(cls, path: str, content: str) -> bool: - """Return True if write successful.""" - # TODO maybe detect whether or not we're overwriting and warn - F.alert(f"Requesting to write to pass: {path}") - logger.info(f"Writing to pass: {path}") - proc = subprocess.Popen( - f"pass insert -m -f {path}", - shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - proc.communicate(content.encode()) - return proc.returncode == 0 - - def read(self, path: str, action: str = "Requesting to read") -> Op[str]: - """Return None if path doesn't exist.""" - F.alert(f"{action} from pass: {path}") - logger.info(f"Reading from pass: {path}") - retcode, conf_str = _get_stdout(f"pass show {path}") - if retcode != 0: - return None - return conf_str.decode().strip() - - -class GPG: - """Access to GPG.""" - - def __init__(self): - self.gpg_path: Op[str] = _get_gpg_command() - - def write(self, path: str, content: str) -> bool: - """Return True if write successful.""" - logger.info(f"Writing to GPG: {path}") - gpg_key = find_gpg_default_key() - gpg_mode = f"-e -r {gpg_key}" - - if not gpg_key: - F.info( - "No default-key present; encrypting to GPG using a passphrase", - ) - F.info( - "(to use a default key, set envvar COLDCORE_GPG_KEY " - "or default-key in gpg.conf)" - ) - gpg_mode = "-c" - - with open(path, "w") as f: - proc = subprocess.Popen( - f"{self.gpg_path} {gpg_mode}", - shell=True, - stdout=f, - stdin=subprocess.PIPE, - ) - proc.communicate(content.encode()) - - return proc.returncode == 0 - - def read(self, path: str) -> Op[str]: - p = Path(path) - if not p.exists(): - logger.warning(f"tried to read from GPG path {p} that doesn't exist") - return None - - logger.info(f"Reading from GPG: {path}") - (retcode, content) = _get_stdout(f"{self.gpg_path} -d {p}") - - if retcode == 0: - return content.decode().strip() - - logger.warning(f"failed to read GPG path {p}, returncode: {retcode}") - return None - - -def find_gpg_default_key() -> Op[str]: - """Get the GPG default-key to encrypt with.""" - gpg_conf_path = Path.home() / ".gnupg" / "gpg.conf" - gpg_conf_lines = [] - key = os.environ.get("COLDCORE_GPG_KEY") - if key: - return key - - try: - gpg_conf_lines = gpg_conf_path.read_text().splitlines() - except FileNotFoundError: - pass - - default_key_line = None - try: - [default_key_line] = [ - line for line in gpg_conf_lines if line.startswith("default-key ") - ] - except ValueError: - pass - - if not default_key_line: - logger.info( - f"Must set `default-key` in {gpg_conf_path} or " - "use COLDCORE_GPG_KEY envvar, otherwise don't know " - "what to encrypt with.", - ) - return None - - return default_key_line.split("default-key ")[-1] - - -CONFIG_DIR = Path.home() / ".config" / "coldcore" - - -# TODO move config backend to prefix system - - -def _get_gpg_command() -> Op[str]: - """Find the version, if any, of GPG installed.""" - if _get_stdout("which gpg2")[0] == 0: - return "gpg2" - elif _get_stdout("which gpg")[0] == 0: - return "gpg" - return None - - -def get_path_for_new_config(use_gpg=False) -> str: - """Returns the suggested path for a new configuration.""" - # FIXME: prefix backends - gpg = _get_gpg_command() - if gpg and use_gpg: - return str(CONFIG_DIR / "config.ini.gpg") - return str(CONFIG_DIR / "config.ini") - - -def find_default_config() -> Op[str]: - """ - Find an existing default configuration file. We do this - (vs. get_path_for_new_config) because a user may have created a configuration file - and then installed GPG. - """ - # Prefer GPG configs - for ext in (".gpg", ""): - path = CONFIG_DIR / ("config.ini" + ext) - if path.exists(): - return str(path) - return None - - -def _is_pass_path(p: Op[str]) -> bool: - return bool(p) and p.startswith(PASS_PREFIX) # type: ignore - - -def create_config(conf_path, bitcoind_json_url: str) -> Op[GlobalConfig]: - """ - Write a new global config file out using some storage backend. - """ - if not CONFIG_DIR.exists(): - CONFIG_DIR.mkdir(mode=0o700, parents=True, exist_ok=True) - - confp = ConfigParser() - - def confirm_overwrite() -> bool: - if Path(conf_path).exists(): - prompt = ( - f" ? Are you sure you want to overwrite " - f"the existing file at {conf_path}? [y/N] " - ) - return input(prompt).lower() == "y" - return True - - # Optionally, create the configuration in `pass`. - if _is_pass_path(conf_path): - passobj = Pass() - passpath = conf_path.split(PASS_PREFIX, 1)[-1] - msg = f"Creating blank configuration at {yellow(conf_path)}" - logger.info(msg) - F.info(msg) - contents = _get_blank_conf(bitcoind_json_url) - # config doesn't exist, so insert it - if not passobj.write(passpath, contents): - print(f"Failed to write new configuration to {conf_path}") - return None - - confp.read_string(contents) - - # Or within GPG - elif conf_path.endswith(".gpg"): - gpg = GPG() - if not confirm_overwrite(): - return None - msg = f"Creating blank configuration at {conf_path}" - logger.info(msg) - F.info(msg) - contents = _get_blank_conf(bitcoind_json_url) - # config doesn't exist, so insert it - if not gpg.write(conf_path, contents): - print(f"Failed to write new configuration to {conf_path}") - return None - confp.read_string(contents) - - # Or just write it to some file path. - else: - logger.info(f"Creating blank configuration at {conf_path}") - if not confirm_overwrite(): - return None - - F.warn("WARNING: creating an unencrypted configuration file.") - F.warn("Please consider installing GPG and/or pass to support config file ") - F.warn("encryption. If someone gains access to your xpubs, they can ") - F.warn("see all of your addresses.") - - with open(conf_path, "w") as f: - GlobalConfig.write_blank(f, bitcoind_json_url) - - confp.read(conf_path) - - return GlobalConfig.from_ini(conf_path, confp)[0] - - -def _get_config_required(*args, **kwargs) -> t.Tuple[GlobalConfig, t.List[Wallet]]: - ret = _get_config(*args, **kwargs) - if not ret[0]: - F.warn("Please ensure this file is readable or run `coldcore` -> setup") - sys.exit(1) - - return ret # type: ignore - - -def _get_config( - wallet_names: Op[t.List[str]] = None, - bitcoind_json_url: str = "", - require_wallets: bool = True, -) -> t.Tuple[Op[GlobalConfig], Op[t.List[Wallet]]]: - """ - Load in coldcore config from some source. - - Return the config and a list of loaded wallets. The config's default_wallet will - be the first item in the list. - """ - confp = ConfigParser() - conf_path = cli.args.config or os.environ.get( - "COLDCORE_CONFIG", find_default_config() - ) - none = (None, None) - - if not conf_path: - return none - - def fail(): - F.warn(f"Failed to read config from {conf_path}") - - # Optionally, read the configuration from `pass`. - if _is_pass_path(conf_path): - passobj = Pass() - passpath = conf_path.split(PASS_PREFIX, 1)[-1] - contents = passobj.read(passpath, action="Requesting to load configuration INI") - - if not contents: - fail() - return none - - confp.read_string(contents) - - # Or read from GPG - elif conf_path.endswith(".gpg"): - gpg = GPG() - F.alert(f"Reading configuration from {conf_path} with GPG") - contents = gpg.read(conf_path) - - if not contents: - fail() - return none - - confp.read_string(contents) - - # Or just read it from some file path. - else: - if not Path(conf_path).exists(): - fail() - return none - confp.read(conf_path) - - (conf, wallet_confs) = GlobalConfig.from_ini(conf_path, confp) - - logger.debug("loaded with config: %s", conf) - logger.debug("loaded with wallets: %s", wallet_confs) - - unrecog_wallets = set(wallet_names or []) - set(w.name for w in wallet_confs) - if unrecog_wallets: - F.warn("Unrecognized wallet names: {', '.join(unrecog_wallets)}") - conf.exit(1) - - if wallet_names: - wallet_confs = [w for w in wallet_confs if w.name in wallet_names] - - default_wallet = cli.args.wallet or conf.default_wallet - - # Return the default wallet first. - wallet_confs = sorted( - wallet_confs, key=lambda w: w.name == default_wallet, reverse=True - ) - - if require_wallets and not wallet_confs: - F.warn("At least one wallet config is required but none were found.") - F.warn("Try running `coldcore setup --help` to set up a wallet") - sys.exit(1) - - return (conf, wallet_confs) - - -def main(): - cli.parse_for_run() - log_path = setup_logging() - cli.run() - - if log_path: - F.warn( - f"WARNING: remove logfiles at {log_path} to prevent leaking sensitive data", - ) - - -if __name__ == "__main__": - main() diff --git a/src/coldcore/thirdparty/__init__.py b/src/coldcore/thirdparty/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/coldcore/thirdparty/bitcoin_rpc.py b/src/coldcore/thirdparty/bitcoin_rpc.py deleted file mode 100644 index 803a87f..0000000 --- a/src/coldcore/thirdparty/bitcoin_rpc.py +++ /dev/null @@ -1,301 +0,0 @@ -# Copyright (C) 2007 Jan-Klaas Kollhof -# Copyright (C) 2011-2018 The python-bitcoinlib developers -# Copyright (C) 2020 James O'Beirne -# -# This section is part of python-bitcoinlib. -# -# It is subject to the license terms in the LICENSE file found in the top-level -# directory of the python-bitcoinlib distribution. -# -# No part of python-bitcoinlib, including this section, may be copied, modified, -# propagated, or distributed except according to the terms contained in the -# LICENSE file. - -import logging -import os -import base64 -import http.client as httplib -import json -import platform -import urllib.parse as urlparse -import socket -import re -import time -import http.client -import typing as t -from typing import IO, Optional as Op -from decimal import Decimal - -DEFAULT_USER_AGENT = "AuthServiceProxy/0.1" -DEFAULT_HTTP_TIMEOUT = 30 - - -logger = logging.getLogger("rpc") -logger.setLevel(logging.DEBUG) -# logger.addHandler(logging.StreamHandler()) - - -class JSONRPCError(Exception): - """JSON-RPC protocol error base class - Subclasses of this class also exist for specific types of errors; the set - of all subclasses is by no means complete. - """ - - def __init__(self, rpc_error): - super(JSONRPCError, self).__init__( - "msg: %r code: %r" % (rpc_error["message"], rpc_error["code"]) - ) - self.error = rpc_error - - -class BitcoinRPC(object): - """Base JSON-RPC proxy class. Contains only private methods; do not use - directly.""" - - def __init__( - self, - service_url=None, - service_port=None, - btc_conf_file=None, - net_name=None, - timeout=DEFAULT_HTTP_TIMEOUT, - debug_stream: Op[IO] = None, - wallet_name=None, - ): - - self.debug_stream = debug_stream - authpair = None - net_name = net_name or "mainnet" - self.timeout = timeout - self.net_name = net_name - - # Figure out the path to the bitcoin.conf file - if btc_conf_file is None: - if platform.system() == "Darwin": - btc_conf_file = os.path.expanduser( - "~/Library/Application Support/Bitcoin/" - ) - elif platform.system() == "Windows": - btc_conf_file = os.path.join(os.environ["APPDATA"], "Bitcoin") - else: - btc_conf_file = os.path.expanduser("~/.bitcoin") - btc_conf_file = os.path.join(btc_conf_file, "bitcoin.conf") - - if not service_url: - # Bitcoin Core accepts empty rpcuser, not specified in btc_conf_file - conf = self._get_bitcoind_conf_from_filesystem(btc_conf_file) - if service_port is None: - service_port = { - "mainnet": 8332, - }.get(net_name, 18332) - - conf["rpcport"] = int(conf.get("rpcport", service_port)) # type: ignore - conf["rpchost"] = conf.get("rpcconnect", "localhost") - - service_url = f"http://{conf['rpchost']}:{conf['rpcport']}" - - authpair = self._get_bitcoind_cookie_authpair(conf, btc_conf_file, net_name) - else: - url = urlparse.urlparse(service_url) - authpair = "%s:%s" % (url.username or "", url.password or "") - - # Do our best to autodetect testnet. - if url.port: - self.net_name = net_name = ( - "testnet3" if url.port == 18332 else "mainnet" - ) - - # Try and pull in auth information from the filesystem if it's missing. - if authpair == ":": - conf = self._get_bitcoind_conf_from_filesystem(btc_conf_file) - authpair = self._get_bitcoind_cookie_authpair( - conf, btc_conf_file, net_name - ) - logger.debug("pulling authpair from cookie despite intaking URL") - - if wallet_name: - service_url = service_url.rstrip("/") - service_url += f"/wallet/{wallet_name}" - - logger.info(f"Connecting to bitcoind: {service_url}") - self.url = service_url - - # Credential redacted - self.public_url = re.sub(r":[^/]+@", ":***@", self.url, 1) - self._parsed_url = urlparse.urlparse(service_url) - self.host = self._parsed_url.hostname - - logger.info(f"Initializing RPC client at {self.public_url}") - # XXX keep for debugging, but don't ship: - # logger.info(f"[REMOVE THIS] USING AUTHPAIR {authpair}") - - if self._parsed_url.scheme not in ("http",): - raise ValueError("Unsupported URL scheme %r" % self._parsed_url.scheme) - - self.__id_count = 0 - - self.__auth_header = None - if authpair: - self.__auth_header = b"Basic " + base64.b64encode(authpair.encode("utf8")) - - def _get_bitcoind_conf_from_filesystem(self, btc_conf_file: str) -> t.Dict: - conf = {"rpcuser": ""} - - # Extract contents of bitcoin.conf to build service_url - try: - with open(btc_conf_file, "r") as fd: - for line in fd.readlines(): - if "#" in line: - line = line[: line.index("#")] - if "=" not in line: - continue - k, v = line.split("=", 1) - conf[k.strip()] = v.strip() - - # Treat a missing bitcoin.conf as though it were empty - except FileNotFoundError: - pass - - return conf - - def _get_bitcoind_cookie_authpair( - self, conf: dict, btc_conf_file: str, net_name: str - ) -> t.Optional[str]: - """Get an authpair from the cookie or configuration files.""" - authpair = "" - cookie_dir = conf.get("datadir", os.path.dirname(btc_conf_file)) - if net_name != "mainnet": - cookie_dir = os.path.join(cookie_dir, net_name) - cookie_file = os.path.join(cookie_dir, ".cookie") - try: - with open(cookie_file, "r") as fd: - authpair = fd.read() - logger.debug("read authpair from cookie") - except (IOError, FileNotFoundError) as err: - logger.debug("couldn't read authpair from cookie", exc_info=True) - if "rpcpassword" in conf: - authpair = "%s:%s" % (conf["rpcuser"], conf["rpcpassword"]) - logger.debug("read authpair from conf") - else: - raise ValueError( - "Cookie file unusable (%s) and rpcpassword not specified " - "in the configuration file: %r" % (err, btc_conf_file) - ) - - return authpair - - @property - def port(self) -> int: - if self._parsed_url.port is None: - return httplib.HTTP_PORT - else: - return self._parsed_url.port - - def _getconn(self, timeout=None): - return httplib.HTTPConnection( - self._parsed_url.hostname, port=self.port, timeout=timeout, - ) - - def _call(self, service_name, *args, **kwargs): - self.__id_count += 1 - kwargs.setdefault('timeout', self.timeout) - - postdata = json.dumps( - { - "version": "1.1", - "method": service_name, - "params": args, - "id": self.__id_count, - } - ) - - logger.debug(f"[{self.public_url}] calling %s%s", service_name, args) - - headers = { - "Host": self._parsed_url.hostname, - "User-Agent": DEFAULT_USER_AGENT, - "Content-type": "application/json", - } - - if self.__auth_header is not None: - headers["Authorization"] = self.__auth_header - - path = self._parsed_url.path - tries = 5 - backoff = 0.3 - while tries: - try: - conn = self._getconn(timeout=kwargs['timeout']) - conn.request("POST", path, postdata, headers) - except (BlockingIOError, http.client.CannotSendRequest, socket.gaierror): - logger.exception( - f"hit request error: {path}, {postdata}, {self._parsed_url}" - ) - tries -= 1 - if not tries: - raise - time.sleep(backoff) - backoff *= 2 - else: - break - - response = self._get_response(conn) - err = response.get("error") - if err is not None: - if isinstance(err, dict): - raise JSONRPCError( - { - "code": err.get("code", -345), - "message": err.get("message", "error message not specified"), - } - ) - raise JSONRPCError({"code": -344, "message": str(err)}) - elif "result" not in response: - raise JSONRPCError({"code": -343, "message": "missing JSON-RPC result"}) - else: - return response["result"] - - def _get_response(self, conn): - http_response = conn.getresponse() - if http_response is None: - raise JSONRPCError( - {"code": -342, "message": "missing HTTP response from server"} - ) - - rdata = http_response.read().decode("utf8") - try: - loaded = json.loads(rdata, parse_float=Decimal) - logger.debug(f"[{self.public_url}] -> {loaded}") - return loaded - except Exception: - raise JSONRPCError( - { - "code": -342, - "message": ( - "non-JSON HTTP response with '%i %s' from server: '%.20s%s'" - % ( - http_response.status, - http_response.reason, - rdata, - "..." if len(rdata) > 20 else "", - ) - ), - } - ) - - def __getattr__(self, name): - if name.startswith("__") and name.endswith("__"): - # Prevent RPC calls for non-existing python internal attribute - # access. If someone tries to get an internal attribute - # of RawProxy instance, and the instance does not have this - # attribute, we do not want the bogus RPC call to happen. - raise AttributeError - - # Create a callable to do the actual call - def _call_wrapper(*args, **kwargs): - return self._call(name, *args, **kwargs) - - # Make debuggers show rather than > - _call_wrapper.__name__ = name - return _call_wrapper diff --git a/src/coldcore/thirdparty/clii.py b/src/coldcore/thirdparty/clii.py deleted file mode 100644 index 16b0c6a..0000000 --- a/src/coldcore/thirdparty/clii.py +++ /dev/null @@ -1,269 +0,0 @@ -""" -clii - -The easiest damned argparse wrapper there ever was. - - -Copyright 2020 James O'Beirne - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies -of the Software, and to permit persons to whom the Software is furnished to do -so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -""" -import sys -import argparse -import functools -import inspect -import typing as t -import os -import logging -from textwrap import dedent - - -logger = logging.getLogger("clii") -if os.environ.get("CLII_DEBUG"): - logger.setLevel(logging.DEBUG) - logger.addHandler(logging.StreamHandler()) - - -class Arg: - def __init__( - self, - name_or_flags: t.Union[str, t.Sequence[str]], - type: object = str, - help: str = "", - default: object = inspect.Parameter.empty, - is_kwarg: bool = False, - is_vararg: bool = False, - dest: t.Optional[str] = None, - ): - names: t.List[str] = ( - [name_or_flags] if isinstance(name_or_flags, str) else list(name_or_flags) - ) - - # Store original parameter name unmangled (e.g. no '-' for '_' sub). - self.dest = dest or names[0] - - if is_kwarg: - names = [n.replace("_", "-") for n in names] - - self.name = names[0] - self.all_names = list(names) - self.type = type - self.default = default - self.is_kwarg = is_kwarg - self.is_vararg = is_vararg - self.help = help - - @classmethod - def from_parameter(cls, param: inspect.Parameter, help: str = "") -> "Arg": - type = param.annotation - arg = None - - def is_kwarg(p): - return p.default != inspect.Parameter.empty - - if isinstance(type, cls): - # User already specified an Arg, just use that. - arg = type - arg.is_kwarg = is_kwarg(param) - arg.default = param.default - arg.update_name(param.name) - arg.dest = param.name - return arg - return cls( - param.name, - type=param.annotation, - default=param.default, - help=help, - is_kwarg=is_kwarg(param), - is_vararg=(param.kind == inspect.Parameter.VAR_POSITIONAL), - dest=param.name, - ) - - @classmethod - def from_func(cls, func: t.Callable) -> t.Sequence["Arg"]: - # Ignore `**kwargs`; it can't be sensibly interpreted into flags - params = [ - p for p in _get_func_params(func) if p.kind != inspect.Parameter.VAR_KEYWORD - ] - - helps_from_doc = _get_helps_from_func(func, [p.name for p in params]) - - return tuple( - cls.from_parameter(param, helps_from_doc.get(param.name, "")) - for param in _get_func_params(func) - if - # Ignore `**kwargs`; it can't be sensibly interpreted into flags - param.kind != inspect.Parameter.VAR_KEYWORD - ) - - def add_to_parser(self, parser: argparse.ArgumentParser): - kwargs = dict(default=self.default, type=self.type, help=self.arg_help) - - if self.is_kwarg: - kwargs["dest"] = self.dest - elif self.is_vararg: - kwargs["nargs"] = "*" - kwargs.pop("default", "") - if kwargs.get("type") == inspect.Parameter.empty: - kwargs.pop("type") - - if self.type == bool or any(self.default is i for i in [True, False]): - kwargs["action"] = "store_false" if self.default else "store_true" - kwargs.pop("type", "") - - logger.debug(f"Attaching argument: {self.names} -> {kwargs}") - parser.add_argument(*self.names, **kwargs) # type: ignore - - def update_name(self, name: str): - if name not in self.all_names: - self.all_names.insert(0, name) - else: - assert self.all_names[0] == name - - self.name = name - - @property - def names(self) -> t.Tuple[str, ...]: - if not self.is_kwarg: - return (self.name,) - - assert all(i.startswith("-") for i in self.all_names[1:]) - assert self.name == self.all_names[0] - return (f"--{self.name}",) + tuple(self.all_names[1:]) - - @property - def arg_help(self) -> str: - out = self.help or "" - if self.default is not inspect.Parameter.empty: - if out: - out += ". " - out += f"default: {self.default}" - return out - - -def _get_func_params(func) -> t.List[inspect.Parameter]: - return list(inspect.signature(func).parameters.values()) - - -def _get_helps_from_func(func, param_names) -> t.Dict[str, str]: - if not func.__doc__: - return {} - - helps_from_doc = {} - - for line in dedent(func.__doc__).splitlines(): - for p in param_names: - patt = f" {p}:" - - if patt in line: - helps_from_doc[p] = line.split(patt)[-1].strip() - - return helps_from_doc - - -class App: - def __init__(self, *args, **kwargs): - self.parser = argparse.ArgumentParser(*args, **kwargs) - self.subparsers = None - self.args = argparse.Namespace() - - def add_arg(self, *args, **kwargs): - self.parser.add_argument(*args, **kwargs) - return self.parser - - add_argument = add_arg - - def main(self, fnc): - self.parser.set_defaults(func=fnc) - - for arg in Arg.from_func(fnc): - arg.add_to_parser(self.parser) - - if not self.parser.description: - self.parser.description = fnc.__doc__ - - @functools.wraps(fnc) - def wrapper(*args, **kwargs): - return fnc(*args, **kwargs) - - return wrapper - - def cmd(self, fnc) -> t.Callable: - if not self.subparsers: - self.subparsers = self.parser.add_subparsers() - - desc = fnc.__doc__ or "" - doclines = [] - - for line in desc.splitlines(): - if line.strip().lower() in ["args:", "kwargs;"]: - break - doclines.append(line) - - sub = self.subparsers.add_parser( - fnc.__name__.replace("_", "-"), - description="\n".join(doclines), - ) - logger.debug("Added subparser: %s", sub) - - for arg in Arg.from_func(fnc): - arg.add_to_parser(sub) - logger.debug(" Adding argument: %s", arg) - - sub.set_defaults(func=fnc) - - @functools.wraps(fnc) - def wrapper(*args, **kwargs): - return fnc(*args, **kwargs) - - return wrapper - - def parse_for_run(self) -> t.Tuple[t.Callable, t.Tuple[t.List, t.Dict]]: - self.args = self.parser.parse_args() - args = vars(self.args) - logger.debug("Parsed args: %s", args) - fnc = args.pop("func", None) - - if not fnc: - self.parser.print_help() - sys.exit(1) - - func_args = [] - func_kwargs = {} - building_kwargs = False - - # Only pull in those parameters which `fnc` accepts, since the - # global parser may have supplied more. - for p in _get_func_params(fnc): - if p.kind == inspect.Parameter.KEYWORD_ONLY: - building_kwargs = True - - if building_kwargs: - func_kwargs[p.name] = args[p.name] - elif p.kind == inspect.Parameter.VAR_POSITIONAL: - func_args.extend(args[p.name]) - else: - func_args.append(args[p.name]) - - return (fnc, (func_args, func_kwargs)) - - def run(self): - (fnc, (func_args, func_kwargs)) = self.parse_for_run() - - return fnc(*func_args, **func_kwargs) diff --git a/src/coldcore/thirdparty/py.typed b/src/coldcore/thirdparty/py.typed deleted file mode 100644 index e69de29..0000000 diff --git a/src/coldcore/ui.py b/src/coldcore/ui.py deleted file mode 100644 index ddc8599..0000000 --- a/src/coldcore/ui.py +++ /dev/null @@ -1,1245 +0,0 @@ -import curses -import contextlib -import typing as t -import logging -import textwrap -import time -import subprocess -import sys -import traceback -import socket -import threading -import platform -import base64 -import datetime -import os -import json -import decimal -import string -from dataclasses import dataclass -from pathlib import Path -from collections import namedtuple -from curses.textpad import Textbox - -logger = logging.getLogger("ui") - - -class DecimalEncoder(json.JSONEncoder): - def default(self, o): - if isinstance(o, decimal.Decimal): - return str(o) - return super(DecimalEncoder, self).default(o) - - -colr = curses.color_pair -_use_color_no_tty = True - - -def use_color(): - if sys.stdout.isatty(): - return True - if _use_color_no_tty: - return True - return False - - -def open_file_browser(): - plat = platform.system() - - if plat == "Linux": - cmd = "xdg-open ." - elif plat == "Darwin": - cmd = "open ." - elif plat == "Windows": - cmd = "explorer ." - - subprocess.Popen( - cmd, - shell=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - - -def esc(*codes: t.Union[int, str]) -> str: - """Produces an ANSI escape code from a list of integers - :rtype: text_type - """ - return t_("\x1b[{}m").format(t_(";").join(t_(str(c)) for c in codes)) - - -def t_(b: t.Union[bytes, t.Any]) -> str: - """ensure text type""" - if isinstance(b, bytes): - return b.decode() - return b - - -def conn_line(msg: str) -> str: - return green(bold(" ○ ")) + msg - - -# 8 bit Color -############################################################################### -# -# TODO this color stuff was taken from some Github page; track it down and credit -# the authos. - - -def make_color(start, end: str) -> t.Callable[[str], str]: - def color_func(s: str) -> str: - if not use_color(): - return s - - # render - return start + t_(s) + end - - return color_func - - -FG_END = esc(39) -red = make_color(esc(31), FG_END) -green = make_color(esc(32), FG_END) -yellow = make_color(esc(33), FG_END) -blue = make_color(esc(34), FG_END) -cyan = make_color(esc(36), FG_END) -bold = make_color(esc(1), esc(22)) - - -class Action: - pass - - -class Spinner: - def __init__(self): - self.i = -1 - - def spin(self) -> str: - self.i += 1 - return ["◰", "◳", "◲", "◱"][self.i % 4] - - -class OutputFormatter: - def __init__(self): - self.spinner = Spinner() - - def p(self, msg: str = "", clear=False, **kwargs): - if clear: - msg = f"\r{msg}" - else: - msg += "\n" - print(msg, flush=True, file=sys.stderr, end="", **kwargs) - - def task(self, s: str, **kwargs): - self.p(bold(" □ ") + s, **kwargs) - - def blank(self, s: str, **kwargs): - self.p(" " + s, **kwargs) - - def done(self, s: str, **kwargs): - self.p(green(bold(" ✔ ")) + s, **kwargs) - - def alert(self, s: str, **kwargs): - self.p(f" {yellow('!')} " + s, **kwargs) - - def info(self, s: str, **kwargs): - self.p(" -- " + s, **kwargs) - - def inp(self, s: str) -> str: - got = input(yellow(" ? ") + s).strip() - self.p() - return got - - def warn(self, s: str, **kwargs): - self.p(red(bold(" ! ")) + s, **kwargs) - - def spin(self, s: str): - self.p(f" {self.spinner.spin()} {s} ", clear=True) - - def section(self, s: str): - self.p() - self.p(f" {bold('#')} {bold(s)}") - self.p(f" {'-' * len(s)}") - self.p() - - def finish_setup(self, config=None, wallet=None) -> t.Tuple[int, Action]: - self.p() - time.sleep(1) - self.blank(" enjoy your wallet, and remember...") - time.sleep(1.5) - print(textwrap.indent(neversell, " ")) - self.p() - input(" press [enter] to return home ") - return (config, wallet) - - -F = OutputFormatter() - - -class Scene: - def __init__(self, scr, conf, wconfs, controller): - self.scr = scr - self.config = conf - self.wallet_configs = wconfs - self.controller = controller - - def draw(self, k: int) -> t.Tuple[int, Action]: - pass - - -class MenuItem(namedtuple("MenuItem", "idx,title,action")): - def args(self, mchoice): - return (self.idx, self.title, mchoice == self) - - -def run_setup(config, controller) -> t.Tuple[t.Any, t.Any]: - curses.endwin() - os.system("cls" if os.name == "nt" else "clear") - - formatter = OutputFormatter() - p = formatter.p - section = formatter.section - inp = formatter.inp - blank = formatter.blank - warn = formatter.warn - info = formatter.info - done = formatter.done - task = formatter.task - spin = formatter.spin - finish = formatter.finish_setup - - title = cyan( - r""" - . - .o8 - .oooo.o .ooooo. .o888oo oooo oooo oo.ooooo. - d88( "8 d88' `88b 888 `888 `888 888' `88b - `"Y88b. 888ooo888 888 888 888 888 888 - o. )88b 888 .o 888 . 888 888 888 888 - 8""888P' `Y8bod8P' "888" `V88V"V8P' 888bod8P' - 888 - o888o - -""" - ) - p(title) - - blank("searching for Bitcoin Core...") - rpc = controller.discover_rpc(config) - if not rpc: - warn("couldn't detect Bitcoin Core - make sure it's running locally, or") - warn("use `coldcore --rpc `") - sys.exit(1) - - hoststr = yellow(f"{rpc.host}:{rpc.port}") - p(conn_line(f"connected to Bitcoin Core at {hoststr}")) - p() - - def delay(t: float = 1.0): - time.sleep(t) - - use_gpg = False - if not config: - section("coldcore config file setup") - delay() - pre = "you can encrypt your config file with" - - if controller.has_gpg(): - prompt = "do you want to use GPG to encrypt your coldcore config? [y/N] " - if inp(prompt) == "y": - use_gpg = True - - if controller.has_pass(): - info(f"{pre} pass by prefixing your path with 'pass:'") - p() - delay() - - defaultpath = controller.suggested_config_path(use_gpg) - where = inp(f"where should I store your config? [{defaultpath}] ") - where = where or defaultpath - config = controller.create_config(where, rpc.url) - else: - if config.loaded_from.endswith(".gpg"): - use_gpg = True - done(f"loaded config from {yellow(config.loaded_from)}") - - if not config: - warn("Couldn't obtain config. Exiting.") - sys.exit(1) - - p() - - section("Coldcard hardware setup") - - inp( - "have you set up your Coldcard " - "(https://coldcardwallet.com/docs/quick)? [press enter] " - ) - - blank("checking Bitcoin Core sync progres...") - chaininfo = {"verificationprogress": 0} - while chaininfo["verificationprogress"] < 0.999: - try: - chaininfo = rpc.getblockchaininfo() - except Exception: - pass - prog = "%.2f" % (chaininfo["verificationprogress"] * 100) - info(f"initial block download progress: {prog}%", clear=True) - - height = f"(height: {yellow(str(chaininfo['blocks']))})" - done(f"chain sync completed {height} ", clear=True) - delay() - p() - p() - - section("xpub import from Coldcard") - delay() - - blank("now we're going to import your wallet's public information") - blank("on your coldcard, go to Advanced > MicroSD > Dump Summary") - blank("(see: https://coldcardwallet.com/docs/microsd#dump-summary-file)") - p() - delay() - warn("this is not key material, but it can be used to track your addresses") - p() - delay() - cwd = os.getcwd() - task(f"place this file in this directory ({cwd})") - delay() - p() - - pubfilepath = Path("./public.txt") - if not pubfilepath.exists(): - prompt = "would you like me to open a file explorer for you here? [Y/n] " - if inp(prompt).lower() in ["y", ""]: - open_file_browser() - - pubfile = None - while not pubfile: - spin("waiting for public.txt") - time.sleep(0.1) - if pubfilepath.exists(): - pubfile = pubfilepath - - try: - wallet = controller.parse_cc_public(pubfile.read_text(), rpc) - except Exception as e: - p() - if "key 'tpub" in str(e): - warn("it looks like you're using a testnet config with a mainnet rpc.") - warn("rerun this with `coldcore --rpc setup`") - sys.exit(1) - if "key 'xpub" in str(e): - warn("it looks like you're using a mainnet config with a testnet rpc.") - warn("rerun this with `coldcore --rpc setup`") - sys.exit(1) - warn("error parsing public.txt contents") - warn("check your public.txt file and try this again, or file a bug:") - warn(" github.com/jamesob/coldcore/issues") - p() - traceback.print_exc() - sys.exit(1) - - p() - done("parsed xpub as ") - blank(f" {yellow(wallet.descriptor_base)}") - p() - - walletname = inp(f"name of this wallet? [{wallet.name}] ") - if walletname: - wallet.name = walletname - - # Ensure we save the RPC connection we initialized with. - wallet.bitcoind_json_url = rpc.url - config.add_new_wallet(wallet) - - if use_gpg or config.loaded_from.startswith("pass:"): - info( - "writing wallet to encrypted config; GPG may prompt you " - "for your password [press enter] " - ) - input() - - config.write() - done(f"wrote config to {config.loaded_from}") - p() - - section("wallet setup in Core") - controller.rpc_wallet_create(rpc, wallet) - done(f"created wallet {yellow(wallet.name)} in Core as watch-only") - - rpcw = config.rpc(wallet) - rpcw.importmulti(*wallet.importmulti_args()) - done("imported descriptors 0/* and 1/* (change)") - - scan_result = {} # type: ignore - scan_thread = threading.Thread( - target=_run_scantxoutset, - args=(config.rpc(wallet), wallet.scantxoutset_args(), scan_result), - ) - scan_thread.start() - - p() - section("scanning the chain for balance and history") - while scan_thread.is_alive(): - spin("scanning the UTXO set for your balance (few minutes) ") - time.sleep(0.2) - - p() - done("scan of UTXO set complete!") - - # TODO this will fail if we timed out - unspents = scan_result["result"]["unspents"] - bal = sum([i["amount"] for i in unspents]) - bal_str = yellow(bold(f"{bal} BTC")) - bal_count = yellow(bold(f"{len(unspents)} UTXOs")) - blank(f"found an existing balance of {yellow(bal_str)} across {yellow(bal_count)}") - - if unspents: - rescan_begin_height = min([i["height"] for i in unspents]) - p() - blank( - f"beginning chain rescan from height {bold(str(rescan_begin_height))} " - f"(minutes to hours)" - ) - blank(" this allows us to find transactions associated with your coins\n") - rescan_thread = threading.Thread( - target=_run_rescan, - args=(config.rpc(wallet), rescan_begin_height), - daemon=True, - ) - rescan_thread.start() - - time.sleep(2) - - scan_info = rpcw.getwalletinfo()["scanning"] - while scan_info: - spin(f"scan progress: {scan_info['progress'] * 100:.2f}% ") - time.sleep(0.5) - scan_info = rpcw.getwalletinfo()["scanning"] - - name = yellow(wallet.name) - p() - done(f"scan complete. wallet {name} ready to use.") - info(f"Hint: check out your UTXOs with `coldcore -w {wallet.name} balance`") - - p() - - got = inp("do you want to perform some test transactions? [Y/n] ").lower() - - if got not in ["y", ""]: - return finish(config, wallet) - - section("test transactions") - - receive_addr1 = rpcw.getnewaddress() - task("send a tiny amount (we're talking like ~0.000001 BTC) to") - p() - blank(f" {yellow(receive_addr1)}") - p() - blank("(obviously, this is an address you own)") - p() - - got_utxo = None - while not got_utxo: - spin("waiting for transaction") - utxos = controller.get_utxos(rpcw) - matching = [u for u in utxos.values() if u.address == receive_addr1] - if matching: - got_utxo = matching[0] - time.sleep(1) - - p() - done( - f"received amount of {green(str(got_utxo.amount))} " - f"(txid {got_utxo.txid[:8]})" - ) - p() - - info("great - now let's test your ability to send") - info( - "we're going to send 90% of the value of the last UTXO over " - "to a new address:" - ) - sendtoaddr = rpcw.getnewaddress() - p() - blank(f" {yellow(sendtoaddr)}") - p() - - # Send 90% of the value over. - # TODO this is only for testing and is potentially dangerous - send_amt = str(round(((got_utxo.amount * 9) / 10), 8)) - prepared_tx = controller.prepare_send( - config, - rpcw, - sendtoaddr, - send_amt, - [got_utxo.address], - ) - - info( - "I've prepared a transaction for you to sign in a " - f"file called '{prepared_tx}'" - ) - p() - - task("transfer this file to your Coldcard and sign it") - p() - warn( - "as always, remember to verify all transaction details on the Coldcard " - "display" - ) - warn( - "the Coldcard should say something like " - "'Consolidating ... within wallet' when signing" - ) - p() - - prompt = "would you like me to open a file explorer for you here? [Y/n] " - if inp(prompt).lower() in ["y", ""]: - open_file_browser() - - # TODO: coldcard specific? - signed_filename = prepared_tx.replace(".psbt", "-signed.psbt") - - while not Path(signed_filename).exists(): - spin(f"waiting for the signed file ({signed_filename})") - time.sleep(0.5) - - # TODO clean this up - psbt_hex = base64.b64encode(Path(signed_filename).read_bytes()).decode() - txhex = controller.psbt_to_tx_hex(rpcw, Path(signed_filename)) - p() - p() - done("cool! got the signed PSBT") - - if not controller.confirm_broadcast(rpcw, txhex, psbt_hex): - warn("aborting - doublespend the inputs immediately") - return finish(config, wallet) - - rpcw.sendrawtransaction(txhex) - done("transaction broadcast!") - p() - - inmempool = False - while not inmempool: - spin("waiting to see the transaction in the mempool") - utxos = controller.get_utxos(rpcw) - matching = [u for u in utxos.values() if u.address == sendtoaddr] - if matching: - got_utxo = matching[0] - - if got_utxo: - inmempool = True - - p() - done(f"saw tx {got_utxo.txid}") - p() - - section("done") - done(bold(f"your wallet {yellow(wallet.name)} is good to go")) - p() - p() - - return finish(config, wallet) - - -neversell = r""" - $$\ $$\ - $$ |$$ | -$$$$$$$\ $$$$$$\ $$\ $$\ $$$$$$\ $$$$$$\ $$$$$$$\ $$$$$$\ $$ |$$ | -$$ __$$\ $$ __$$\\$$\ $$ |$$ __$$\ $$ __$$\ $$ _____|$$ __$$\ $$ |$$ | -$$ | $$ |$$$$$$$$ |\$$\$$ / $$$$$$$$ |$$ | \__| \$$$$$$\ $$$$$$$$ |$$ |$$ | -$$ | $$ |$$ ____| \$$$ / $$ ____|$$ | \____$$\ $$ ____|$$ |$$ | -$$ | $$ |\$$$$$$$\ \$ / \$$$$$$$\ $$ | $$$$$$$ |\$$$$$$$\ $$ |$$ | -\__| \__| \_______| \_/ \_______|\__| \_______/ \_______|\__|\__| - -""" - - -def _run_scantxoutset(rpcw, args, result): - try: - result["result"] = rpcw.scantxoutset(*args) - except socket.timeout: - logger.debug("socket timed out during txoutsetscan (this is expected)") - - -def _run_rescan(rpcw, begin_height: int): - try: - rpcw.rescanblockchain(begin_height) - except socket.timeout: - logger.debug("socket timed out during rescan (this is expected)") - - -# Curses is weird and ENTER isn't always ENTER. -ENTER_KEYS = [curses.KEY_ENTER, 10, 13] - - -class HomeScene(Scene): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - self.dashboard_item = MenuItem(0, "dashboard", GoDashboard) - self.setup_item = MenuItem(1, "set up wallet", GoSetup) - # self.send_item = MenuItem(2, "send", GoHome) - # self.recieve_item = MenuItem(3, "receive", GoHome) - - self.mitems = [ - self.setup_item, - # self.send_item, - # self.recieve_item, - ] - if self.wallet_configs: - self.mitems.insert(0, self.dashboard_item) - - self.midx = 0 - self.mchoice = self.setup_item - - def draw(self, k: int) -> t.Tuple[int, Action]: - scr = self.scr - curses.noecho() - height, width = scr.getmaxyx() - wconfigs = self.wallet_configs - - if k in [ord("q")]: - return (-1, Quit) - elif k in ENTER_KEYS: - return (-1, self.mchoice.action) - - if k in [curses.KEY_DOWN, ord("j")] and self.midx < (len(self.mitems) - 1): - self.midx += 1 - elif k in [curses.KEY_UP, ord("k")] and self.midx > 0: - self.midx -= 1 - - self.mchoice = self.mitems[self.midx] - - # Declaration of strings - - title: str = """ -░█████╗░░█████╗░██╗░░░░░██████╗░░█████╗░░█████╗░██████╗░███████╗ -██╔══██╗██╔══██╗██║░░░░░██╔══██╗██╔══██╗██╔══██╗██╔══██╗██╔════╝ -██║░░╚═╝██║░░██║██║░░░░░██║░░██║██║░░╚═╝██║░░██║██████╔╝█████╗░░ -██║░░██╗██║░░██║██║░░░░░██║░░██║██║░░██╗██║░░██║██╔══██╗██╔══╝░░ -╚█████╔╝╚█████╔╝███████╗██████╔╝╚█████╔╝╚█████╔╝██║░░██║███████╗ -░╚════╝░░╚════╝░╚══════╝╚═════╝░░╚════╝░░╚════╝░╚═╝░░╚═╝╚══════╝ - """ - - titlelines = [i for i in title.splitlines() if i.strip()] - title_len = len(titlelines[2]) - subtitle = "your monetary glue" - - # Centering calculations - start_x_title = int((width // 2) - (title_len // 2) - title_len % 2) - title_height = len(title.splitlines()) + 1 - start_y = height // 4 - - if wconfigs: - # TODO this will run off the end of the screen quickly - keystr = f"Wallets: {', '.join([w.name for w in wconfigs])}".format(k)[ - : width - 1 - ] - start_x_keystr = int((width // 2) - (len(keystr) // 2) - len(keystr) % 2) - scr.addstr(start_y + title_height + 4, start_x_keystr, keystr[:width]) - - start_x_subtitle = int((width // 2) - (len(subtitle) // 2) - len(subtitle) % 2) - - with attrs(scr, colr(2), curses.A_BOLD): - for i, line in enumerate(titlelines): - scr.addstr(start_y + i, start_x_title, line) - - scr.addstr(start_y + title_height, start_x_subtitle, subtitle[:width]) - scr.addstr( - start_y + title_height + 2, start_x_title, ("/ " * (title_len // 2))[:width] - ) - - def menu_option(idx: int, text: str, selected=False): - half = width // 2 - - start_str = f'{"":<6}{text:>20}{"":<6}' - if selected: - start_str = " -> " + start_str[4:] - scr.addstr(start_y + title_height + 8 + idx, half, start_str[:width]) - - if self.wallet_configs: - menu_option(*self.dashboard_item.args(self.mchoice)) - # TODO - # menu_option(*self.send_item.args(self.mchoice)) - # menu_option(*self.recieve_item.args(self.mchoice)) - - menu_option(*self.setup_item.args(self.mchoice)) - - scr.move(0, 0) - - # Refresh the screen - scr.refresh() - - k = scr.getch() - # Wait for next input - return (k, GoHome) - - -def _s(window, y, x, msg, attr=0): - """A width-safe version of addstr.""" - (_, width) = window.getmaxyx() - if not attr: - window.addstr(y, x, msg[:width]) - else: - window.addstr(y, x, msg[:width], attr) - - -class DashboardScene(Scene): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.utxos = {} - self.threads = [] - self.threads_started = False - self.new_addrs = [] - self.blocks = [] - - self.conn_status = None - self.loop_count = 0 - self.cursorposx = 0 - self.flash_msg = "" - self.selected_utxos = set() - - # Y cursor positions within each window. - self.wincursoridx = { - "utxos": 0, - "addrs": 0, - } - - def start_threads(self): - if self.threads_started: - return - - wall = self.wallet_configs[0] - wrpc = self.config.rpc(wall, timeout=2) - - t1 = threading.Thread( - target=_get_utxo_lines, - args=(wrpc, self.controller, self.utxos), - ) - t1.start() - self.threads.append(t1) - - t2 = threading.Thread( - target=_get_new_blocks, - args=(self.config.rpc(timeout=2), self.blocks), - ) - t2.start() - self.threads.append(t2) - - self.threads_started = True - self.rpc = self.config.rpc() - - def stop_threads(self): - stop_threads_event.set() - for thread in self.threads: - thread.join() - - def draw(self, k: int) -> t.Tuple[int, Action]: - try: - return self._draw(k) - except Exception: - logger.exception("Dashboard curses barfed") - self.stop_threads() - raise - - return (ord("q"), GoHome) - - def _draw(self, k: int) -> t.Tuple[int, Action]: - scr = self.scr - self.height, self.width = scr.getmaxyx() - wall = self.wallet_configs[0] - - substartx = 3 - substarty = 2 - top_panel_height = int(self.height * 0.7) - - balwidth = max(int(self.width * 0.6) - 4, 66) - addrwidth = max(int(self.width * 0.4) - 2, 26) - chainwidth = max(self.width - 6, 92) - chainwin_height = int(self.height * 0.25) - - self.balance_win = scr.derwin(top_panel_height, balwidth, substarty, substartx) - self.address_win = scr.derwin( - top_panel_height, addrwidth, substarty, substartx + balwidth + 1 - ) - self.chain_win = scr.derwin( - chainwin_height, chainwidth, substarty + top_panel_height, substartx - ) - - LIMIT_NEW_ADDRS = 10 - - if k != -1: - self.flash_msg = "" - - if rpc_conn_lost.is_set(): - self.flash_msg = "connection to Bitcoin Core lost" - - if k == ord("n"): - if len(self.new_addrs) < LIMIT_NEW_ADDRS: - try: - rpcw = self.config.rpc(wall) - self.new_addrs.append(rpcw.getnewaddress()) - except Exception: - logger.info("call to getnewadddress failed", exc_info=True) - - utxo_addrs = {u.address for u in self.utxos.values()} - # Strip out used addresses. - self.new_addrs = [a for a in self.new_addrs if a not in utxo_addrs] - - with utxos_lock: - utxos = dict(self.utxos) - - max_balance_utxo_lines = self.balance_win.getmaxyx()[0] - 6 - total_balance_lines = min(len(utxos), max_balance_utxo_lines) - - # The (window name, y pos) of the user's selection cursor. - if k in [ord("h"), curses.KEY_LEFT, ord("a")] and self.cursorposx > 0: - self.cursorposx -= 1 - elif k in [ord("l"), curses.KEY_RIGHT, ord("d")] and self.cursorposx < 1: - self.cursorposx += 1 - - cur_win_title = [ - "utxos", - "addrs", - ][self.cursorposx] - last_wincursoridx = self.wincursoridx[cur_win_title] - - downkeys = [ord("j"), curses.KEY_DOWN, ord("s")] - upkeys = [ord("k"), curses.KEY_UP, ord("w")] - - if cur_win_title == "utxos": - if k in downkeys and last_wincursoridx < (total_balance_lines - 1): - self.wincursoridx["utxos"] += 1 - elif k in upkeys and last_wincursoridx > 0: - self.wincursoridx["utxos"] -= 1 - - elif cur_win_title == "addrs": - if k in downkeys and last_wincursoridx < (len(self.new_addrs) - 1): - self.wincursoridx["addrs"] += 1 - elif k in upkeys and last_wincursoridx > 0: - self.wincursoridx["addrs"] -= 1 - - # Bring cursor to new address if created - if k == ord("n"): - cur_win_title = "addrs" - self.cursorposx = 1 # the index for the addresses window - self.wincursoridx[cur_win_title] = len(self.new_addrs) - 1 - - wincursoridx = self.wincursoridx[cur_win_title] - - try: - self.start_threads() - except ConnectionRefusedError: - curses.endwin() - F.warn("Unable to connect to Bitcoin Core RPC") - F.warn("Ensure Core is running or use `coldcore --rpc `") - sys.exit(1) - - # --- Paint the balances window - - border_attrs = [curses.A_BOLD] if cur_win_title == "utxos" else [] - title_attrs = [curses.A_STANDOUT] if cur_win_title == "utxos" else [] - with attrs(self.balance_win, *border_attrs): - self.balance_win.border() - with attrs(self.balance_win, *title_attrs): - _s(self.balance_win, 0, 2, " UTXOs ") - - _s( - self.balance_win, - 2, - 2, - f"{'address':<48}{'confs':>10}{'BTC':>12}", - ) - - starty = 2 - startx = 2 - - _s(self.balance_win, starty, startx, "") - starty += 1 - - if max_balance_utxo_lines < len(utxos): - _s( - self.balance_win, - starty, - startx, - "-- too many UTXOs to fit --", - curses.A_BOLD, - ) - starty += 1 - - sorted_utxos = sorted(utxos.values(), key=lambda u: -u.num_confs)[ - -max_balance_utxo_lines: - ] - total_bal = f"{sum([u.amount for u in sorted_utxos])}" - coin_idx = 0 - y_idx = 0 - - def sanitize_label(label: str): - return "".join(i for i in label if i in string.printable).strip() - - bal_line_width = 70 - - for u in sorted_utxos: - attrslist = [] - - if u.num_confs < 6: - attrslist.extend([colr(6), curses.A_BOLD]) - - enter_label = False - - if cur_win_title == "utxos" and wincursoridx == coin_idx: - attrslist.append(curses.A_REVERSE) - - if k in (ENTER_KEYS + [ord(" ")]): - # Enter pressed; toggle this address for spending - self.selected_utxos ^= {u.id} - elif k == ord("L"): - enter_label = True - - addr_str = u.address - if u.id in self.selected_utxos: - attrslist.append(colr(4)) - addr_str = f"✔ {addr_str}" - - line = f"{addr_str:<48}{u.num_confs:>10}{u.amount:>12}" - - with attrs(self.balance_win, *attrslist): - _s(self.balance_win, starty + y_idx, startx, line) - - coin_idx += 1 - y_idx += 1 - - if u.label: - label = sanitize_label(u.label) - if len(label) > (balwidth - 4): - label = label[: (balwidth - 7)] + "..." - label += " " * (bal_line_width - len(label) - 4) - _s(self.balance_win, starty + y_idx, startx, f" └─ {label}") - y_idx += 1 - - if enter_label: - _s(self.balance_win, starty + y_idx, startx + 1, "Enter label") - labelwin = self.balance_win.derwin( - 1, balwidth - startx - 5, starty + y_idx + 1, startx + 1 - ) - _s(labelwin, 0, 0, " └─ ") - - tb = Textbox(labelwin) - scr.refresh() - tb.edit() - new_label = sanitize_label(tb.gather()) - - try: - rpcw = self.config.rpc(wall) - rpcw.setlabel(u.address, new_label) - except Exception: - logger.info("failed to set label", exc_info=True) - self.flash_msg = "failed to set label" - else: - self.flash_msg = f"set label to '{new_label}'" - # Redraw with label - return (-1, GoDashboard) - - if sorted_utxos: - _s( - self.balance_win, - starty + y_idx + 1, - startx, - f"{' ':<54}{total_bal:>16}", - curses.A_BOLD, - ) - - # --- Paint the addresses window - - border_attrs = [curses.A_BOLD] if cur_win_title == "addrs" else [] - title_attrs = [curses.A_STANDOUT] if cur_win_title == "addrs" else [] - with attrs(self.address_win, *border_attrs): - self.address_win.border() - with attrs(self.address_win, *title_attrs): - _s(self.address_win, 0, 2, " unused addresses ") - - _s(self.address_win, 2, 2, "press 'n' to get new address") - - for i, addr in enumerate(self.new_addrs): - attrslist = [] - is_highlighted = cur_win_title == "addrs" and wincursoridx == i - - if is_highlighted: - attrslist.append(curses.A_REVERSE) - - with attrs(self.address_win, *attrslist): - _s(self.address_win, 3 + i, 2, addr) - - if is_highlighted and k in ENTER_KEYS: - to_clipboard(addr) - self.flash_msg = f"copied address '{addr}' to clipboard" - - # --- Paint the chain history window - - self.chain_win.box() - _s(self.chain_win, 0, 2, " chain status ") - - max_history = chainwin_height - 5 - - if not self.conn_status or self.loop_count % 20 == 0: - try: - rpc = self.config.rpc() - netinfo = self.rpc.getnetworkinfo() - except Exception: - self.conn_status = "! couldn't connect to Bitcoin Core" - else: - ver = netinfo["subversion"].strip("/") - self.conn_status = ( - f"✔ connected to version {ver} at {rpc.host}:{rpc.port}" - ) - - status_attrs = [curses.A_BOLD, colr(2)] if self.conn_status[0] == "!" else [] - with attrs(self.chain_win, *status_attrs): - _s(self.chain_win, 2, 3, self.conn_status) - - with blocks_lock: - for i, b in enumerate(self.blocks[-max_history:]): - blockstr = ( - f"{b.time_saw} | block {b.height} (...{b.hash[-8:]}) - " - f"{b.median_fee} sat/B - " - f"{b.txs} txs - " - f"subsidy: {b.subsidy / 100_000_000}" - ) - _s(self.chain_win, 4 + i, 3, blockstr[:chainwidth]) - - if self.flash_msg: - with attrs(scr, colr(3)): - msg = f" (!) {self.flash_msg}" - msg = msg + (" " * (self.width - len(msg) - 1)) - scr.addstr(0, 0, msg) - - scr.refresh() - - # scr.move(self.width, self.height) - - scr.timeout(400) - next_k = scr.getch() - self.loop_count += 1 - - if next_k == ord("q"): - self.stop_threads() - - return (next_k, GoDashboard) - - -def to_clipboard(s: str) -> bool: - """Put s into the system clipboard.""" - plat = platform.system() - - def sh(cmd, **kwargs) -> int: - return subprocess.run(cmd, shell=True, **kwargs).returncode - - if plat == "Linux": - if sh("which xclip", capture_output=True) != 0: - logger.info("xclip not found, cannot copy to clipboard") - return False - cmd = "xclip -selection clipboard" - sh(f"printf '{s}' | {cmd}") - elif plat == "Darwin": - cmd = "pbcopy" - sh(f"printf '{s}' | {cmd}") - elif plat == "Windows": - cmd = "clip" - sh(f"echo {s} | {cmd}") - - return True - - -@dataclass -class Block: - hash: str - height: int - time_saw: datetime.datetime - median_fee: float - subsidy: float - txs: int - - -rpc_conn_lost = threading.Event() -stop_threads_event = threading.Event() -utxos_lock = threading.Lock() -blocks_lock = threading.Lock() - - -def _get_new_blocks(rpc, blocks): - last_saw = None - - while True: - try: - saw = rpc.getbestblockhash() - except Exception: - logger.info("getbestblockhash call failed", exc_info=True) - rpc_conn_lost.set() - - if saw != last_saw: - stats = rpc.getblockstats(saw) - with blocks_lock: - blocks.append( - Block( - saw, - stats["height"], - datetime.datetime.now(), - stats["feerate_percentiles"][2], - stats["subsidy"], - stats["txs"], - ) - ) - last_saw = saw - - rpc_conn_lost.clear() - time.sleep(1) - - if stop_threads_event.is_set(): - return - - -def _get_utxo_lines(rpcw, controller, utxos): - """ - Poll constantly for new UTXOs. - """ - while True: - try: - new_utxos = controller.get_utxos(rpcw) - except Exception: - logger.info("listunspents call failed", exc_info=True) - - with utxos_lock: - utxos.clear() - utxos.update(new_utxos) - - time.sleep(1) - - if stop_threads_event.is_set(): - return - - -GoHome = Action() -GoSetup = Action() -GoDashboard = Action() -Quit = Action() - - -class _TermOpts: - has_256color = False - - -TermOpts = _TermOpts() - - -def draw_menu(scr, config, wallet_configs, controller, action=None): - wallet_configs = wallet_configs or [] - # Clear and refresh the screen for a blank canvas - scr.clear() - scr.refresh() - scr.scrollok(True) - - curses.start_color() - curses.use_default_colors() - curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK) - curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK) - curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_WHITE) - curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK) - curses.init_pair(5, curses.COLOR_GREEN, curses.COLOR_BLACK) - curses.init_pair(6, curses.COLOR_YELLOW, curses.COLOR_BLACK) - - if curses.COLORS >= 256: - TermOpts.has_256color = True - - home = HomeScene(scr, config, wallet_configs, controller) - dashboard = DashboardScene(scr, config, wallet_configs, controller) - - action = action or GoHome - k = 0 - - while action != Quit: - # Initialization - scr.clear() - height, width = scr.getmaxyx() - - try: - kstr = curses.keyname(k).decode() - except ValueError: - kstr = "???" - - statusbarstr = f"press 'q' to exit | never sell | last keypress: {kstr} ({k})" - if k == -1: - statusbarstr += " | waiting" - - try: - # Render status bar - with attrs(scr, colr(3)): - try: - scr.addstr(height - 1, 0, statusbarstr[:width]) - scr.addstr( - height - 1, - len(statusbarstr), - (" " * (width - len(statusbarstr) - 1))[:width], - ) - # TODO better status bar - except Exception: - pass - - if action == GoHome: - (k, action) = home.draw(k) - elif action == GoSetup: - config, wallet = run_setup(config, controller) - # Reinitialize the scenes - if config and wallet: - wallet_configs.append(wallet) - home = HomeScene(scr, config, wallet_configs, controller) - dashboard = DashboardScene(scr, config, wallet_configs, controller) - k = -1 - action = GoHome - elif action == GoDashboard: - (k, action) = dashboard.draw(k) - except curses.error: - scr = curses.initscr() - scr.clear() - scr.timeout(400) - scr.refresh() - scr.addstr(1, 1, "! terminal too small.") - scr.addstr(2, 1, "! resize to make larger") - scr.addstr(3, 1, "! or press 'q' to exit.") - scr.refresh() - time.sleep(0.3) - k = scr.getch() - - if k == ord("q") or action == Quit: - break - - -@contextlib.contextmanager -def attrs(scr, *attrs): - for a in attrs: - scr.attron(a) - yield - for a in attrs: - scr.attroff(a) - - -def start_ui(config, wallet_configs, controller, action=None): - formatter = OutputFormatter() - try: - curses.wrapper(draw_menu, config, wallet_configs, controller, action) - os.system("cls" if os.name == "nt" else "clear") - except curses.error: - print() - formatter.warn("The UI crashed! Terminal might be too small, try resizing.") - print() - sys.exit(1) - except socket.timeout: - logger.exception("RPC connection timed out") - print() - formatter.warn("Unable to connect to Bitcoin Core RPC - are you sure ") - formatter.warn("it is running and the RPC URL you gave is correct?") - formatter.alert("See `--rpc` in `coldcore --help`") - print() - sys.exit(1) diff --git a/src/requirements-dev.txt b/src/requirements-dev.txt deleted file mode 100644 index 98e7c91..0000000 --- a/src/requirements-dev.txt +++ /dev/null @@ -1,4 +0,0 @@ -mypy==0.782 -pytest==6.2.0 -flake8==3.8.3 -black==20.8b1 diff --git a/src/setup.py b/src/setup.py deleted file mode 100644 index bc9ee74..0000000 --- a/src/setup.py +++ /dev/null @@ -1,20 +0,0 @@ -from setuptools import setup -import os - -here = os.path.abspath(os.path.dirname(__file__)) - -setup( - name="coldcore", - version="0.0.1", - description="A small shim to connect Coldcards to Bitcoin Core", - author="jamesob", - author_email="hijamesob@pm.me", - include_package_data=True, - zip_safe=False, - packages=["coldcore"], - entry_points={ - "console_scripts": [ - "coldcore = coldcore.main:main", - ], - }, -) diff --git a/src/coldcore/test_coldcard.py b/test/test_coldcard.py similarity index 98% rename from src/coldcore/test_coldcard.py rename to test/test_coldcard.py index 4011eb5..f08fd3f 100644 --- a/src/coldcore/test_coldcard.py +++ b/test/test_coldcard.py @@ -1,6 +1,6 @@ import io -from .main import CCWallet, WpkhDescriptor +from coldcore import CCWallet, WpkhDescriptor def test_parse_public(): @@ -29,6 +29,7 @@ def getdescriptorinfo(*args, **kwargs): is_change=True, ), ], + "name": "coldcard-3d88d0cf", "earliest_block": None, } @@ -77,6 +78,7 @@ def getdescriptorinfo(*args, **kwargs): ], "earliest_block": None, "bitcoind_json_url": None, + "name": "coldcard-f0ccde95", } diff --git a/src/coldcore/test_crypto.py b/test/test_crypto.py similarity index 92% rename from src/coldcore/test_crypto.py rename to test/test_crypto.py index d0cdae3..64b2a2f 100644 --- a/src/coldcore/test_crypto.py +++ b/test/test_crypto.py @@ -1,4 +1,4 @@ -from . import crypto +from coldcore import xpub_to_fp def test_xpub_to_fp(): @@ -23,4 +23,4 @@ def test_xpub_to_fp(): ] for (xpub, fp) in pairs: - assert crypto.xpub_to_fp(xpub) == fp + assert xpub_to_fp(xpub) == fp From 83ebeae592ea6262000e867d45b67f9459498c67 Mon Sep 17 00:00:00 2001 From: James O'Beirne Date: Fri, 14 Jan 2022 11:06:50 -0500 Subject: [PATCH 2/4] black: reformatting --- coldcore | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/coldcore b/coldcore index 82bcade..a83226b 100755 --- a/coldcore +++ b/coldcore @@ -478,12 +478,14 @@ class BitcoinRPC(object): def _getconn(self, timeout=None): return httplib.HTTPConnection( - self._parsed_url.hostname, port=self.port, timeout=timeout, + self._parsed_url.hostname, + port=self.port, + timeout=timeout, ) def _call(self, service_name, *args, **kwargs): self.__id_count += 1 - kwargs.setdefault('timeout', self.timeout) + kwargs.setdefault("timeout", self.timeout) postdata = json.dumps( { @@ -511,7 +513,7 @@ class BitcoinRPC(object): conn = None while tries: try: - conn = self._getconn(timeout=kwargs['timeout']) + conn = self._getconn(timeout=kwargs["timeout"]) conn.request("POST", path, postdata, headers) except (BlockingIOError, http.client.CannotSendRequest, socket.gaierror): rpc_logger.exception( @@ -2291,9 +2293,11 @@ class CCWallet(Wallet): @classmethod def from_io( - cls, inp: t.IO, rpc: BitcoinRPC, - name: Op[str] = None, - earliest_block: Op[int] = None + cls, + inp: t.IO, + rpc: BitcoinRPC, + name: Op[str] = None, + earliest_block: Op[int] = None, ) -> "CCWallet": """ Instantiate a CCWallet from the public output generated by the @@ -2365,7 +2369,7 @@ class CCWallet(Wallet): desc.checksum = desc_to_checksum(desc) descs.append(desc) - name = name or f'coldcard-{fp}' + name = name or f"coldcard-{fp}" return cls( name, @@ -2597,9 +2601,10 @@ def discover_rpc( def _is_already_loaded_err(e: JSONRPCError) -> bool: msg = str(e).lower() return ( - ('already loaded' in msg) or - ('duplicate -wallet filename' in msg) or - ('database already exists' in msg)) + ("already loaded" in msg) + or ("duplicate -wallet filename" in msg) + or ("database already exists" in msg) + ) def get_rpc( @@ -2752,7 +2757,7 @@ def _prepare_send( # Add up total output amount for o in outs: - addr = o['scriptPubKey']['addresses'][0] + addr = o["scriptPubKey"]["addresses"][0] try: addr_info = rpcw.getaddressinfo(addr) except Exception: @@ -2775,7 +2780,7 @@ def _prepare_send( # Display outputs for o in outs: - addr = o['scriptPubKey']['addresses'][0] + addr = o["scriptPubKey"]["addresses"][0] try: addr_info = rpcw.getaddressinfo(addr) except Exception: @@ -2832,10 +2837,10 @@ def confirm_broadcast(rpcw: BitcoinRPC, hex_val: str, psbt_hex: str) -> bool: for out in info["vout"]: spk = out["scriptPubKey"] - if 'addresses' in spk: + if "addresses" in spk: # XXX unsure if this is necessary, but might be for older versions of Core. addrs = ",".join(spk["addresses"]) - elif 'address' in spk: + elif "address" in spk: addrs = spk["address"] else: raise RuntimeError(f"unexpected decoderawtransaction format:\n{info}") From fa7a7f1b711ed3e24d4b1b74c715b665ec23fb03 Mon Sep 17 00:00:00 2001 From: James O'Beirne Date: Fri, 14 Jan 2022 11:14:06 -0500 Subject: [PATCH 3/4] CI: fix for single file --- .github/workflows/run-tests.yml | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 0ab799d..6bee126 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -24,26 +24,22 @@ jobs: run: | python -m pip install --upgrade pip pip install -r requirements-dev.txt + cp coldcore coldcore.py + echo "PYTHONPATH=$(pwd)" >> $GITHUB_ENV - name: black - run: black --check + run: black --check coldcore - name: flake8 run: | - flake8 src --count --show-source --statistics --exclude 'test_*' + flake8 coldcore --count --show-source --statistics # When linting the tests, ignore long line errors - flake8 src --count --show-source --statistics --ignore E501 + flake8 test/ --count --show-source --statistics --ignore E501 - name: pytest run: | - pytest -vv src + pytest -vv test/ - name: mypy run: | - mypy src/coldcore - - - name: check compiled script - run: | - python ./bin/compile - git diff --exit-code - ./coldcore --help + mypy coldcore From 73a322744e56efade6b7e731b1a7d5e9d418c459 Mon Sep 17 00:00:00 2001 From: James O'Beirne Date: Fri, 14 Jan 2022 11:26:04 -0500 Subject: [PATCH 4/4] README: update for single file --- README.md | 39 ++++++++------------------------------- 1 file changed, 8 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index ffa2cc2..0896494 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,13 @@ by doing a few test transactions in testnet. set `coldcard --rpc ` if desired. +### Development + +Run tests and linting locally with `make test` and `make lint`. It's advisable to do +this before filing a PR, otherwise CI will likely fail due to [`black` formatting +requirements](https://github.com/psf/black). + + ## Design ### Zero install process @@ -143,37 +150,7 @@ supported, but I'm happy to add others that fit the criteria of ### Auditing -The final script, `coldcore`, is dumbly compiled from the contents of `src/coldcore/` -for convenience during development (per `./bin/compile`). - -If you want to read through, I recommend starting with the `src/coldcore` tree. - -``` -. -├── bin -│   ├── compile # generates final `coldcore` script -│   └── sign_release -├── coldcore -├── sigs # signatures for verification -│   └── coldcore-0.1.0-alpha.asc -└── src - ├── coldcore - │   ├── crypto.py # a few basic cryptographic utilities - │   ├── __init__.py - │   ├── main.py # most logic is here; wallet ops, CLI, models - │   ├── test_coldcard.py - │   ├── test_crypto.py - │   ├── thirdparty - │   │   ├── bitcoin_rpc.py # taken from python-bitcoinlib - │   │   ├── clii.py # taken from jamesob/clii - │   │   ├── __init__.py - │   │   └── py.typed - │   └── ui.py # presentation logic, curses - ├── requirements-dev.txt - └── setup.py # for development use only -``` - - +All source is contained in `coldcore`. ## Status