diff --git a/README.md b/README.md index f456ac3d..26ba7f6d 100644 --- a/README.md +++ b/README.md @@ -121,7 +121,20 @@ the information about the Safe using: ``` > refresh ``` +## Ledger module +Ledger module is an optional feature of safe-cli to sign transactions with the help of [ledgereth](https://github.com/mikeshultz/ledger-eth-lib) library based on [ledgerblue](https://github.com/LedgerHQ/blue-loader-python). +To enable, safe-cli must be installed as follows: +``` +pip install safe-cli[ledger] +``` +When running on Linux, make sure the following rules have been added to `/etc/udev/rules.d/`: +```commandline +SUBSYSTEMS=="usb", ATTRS{idVendor}=="2c97", ATTRS{idProduct}=="0000", MODE="0660", TAG+="uaccess", TAG+="udev-acl" OWNER="" +SUBSYSTEMS=="usb", ATTRS{idVendor}=="2c97", ATTRS{idProduct}=="0001", MODE="0660", TAG+="uaccess", TAG+="udev-acl" OWNER="" +SUBSYSTEMS=="usb", ATTRS{idVendor}=="2c97", ATTRS{idProduct}=="0004", MODE="0660", TAG+="uaccess", TAG+="udev-acl" OWNER="" +``` +**NOTE**: before signing anything ensure that the data showing on your ledger is the same as the safe-cli data. ## Creating a new Safe Use `safe-creator --owners --threshold --salt-nonce `. diff --git a/requirements.txt b/requirements.txt index 9f7496c2..0a816c78 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,10 @@ art==6.1 colorama==0.4.6 +ledgereth==0.9.0 packaging>=23.1 prompt_toolkit==3.0.39 pygments==2.16.1 requests==2.31.0 -safe-eth-py==6.0.0b2 +safe-eth-py==6.0.0b5 tabulate==0.9.0 web3==6.10.0 diff --git a/safe_cli/operators/__init__.py b/safe_cli/operators/__init__.py index 64d6296f..da9edc1b 100644 --- a/safe_cli/operators/__init__.py +++ b/safe_cli/operators/__init__.py @@ -1,4 +1,5 @@ # flake8: noqa F401 from .enums import SafeOperatorMode -from .safe_operator import SafeOperator, SafeServiceNotAvailable +from .exceptions import SafeServiceNotAvailable +from .safe_operator import SafeOperator from .safe_tx_service_operator import SafeTxServiceOperator diff --git a/safe_cli/operators/exceptions.py b/safe_cli/operators/exceptions.py new file mode 100644 index 00000000..083bb318 --- /dev/null +++ b/safe_cli/operators/exceptions.py @@ -0,0 +1,86 @@ +class SafeOperatorException(Exception): + pass + + +class ExistingOwnerException(SafeOperatorException): + pass + + +class NonExistingOwnerException(SafeOperatorException): + pass + + +class HashAlreadyApproved(SafeOperatorException): + pass + + +class ThresholdLimitException(SafeOperatorException): + pass + + +class SameFallbackHandlerException(SafeOperatorException): + pass + + +class InvalidFallbackHandlerException(SafeOperatorException): + pass + + +class FallbackHandlerNotSupportedException(SafeOperatorException): + pass + + +class SameGuardException(SafeOperatorException): + pass + + +class InvalidGuardException(SafeOperatorException): + pass + + +class GuardNotSupportedException(SafeOperatorException): + pass + + +class SameMasterCopyException(SafeOperatorException): + pass + + +class SafeAlreadyUpdatedException(SafeOperatorException): + pass + + +class UpdateAddressesNotValid(SafeOperatorException): + pass + + +class SenderRequiredException(SafeOperatorException): + pass + + +class AccountNotLoadedException(SafeOperatorException): + pass + + +class NotEnoughSignatures(SafeOperatorException): + pass + + +class InvalidMasterCopyException(SafeOperatorException): + pass + + +class NotEnoughEtherToSend(SafeOperatorException): + pass + + +class NotEnoughTokenToSend(SafeOperatorException): + pass + + +class SafeServiceNotAvailable(SafeOperatorException): + pass + + +class HardwareWalletException(SafeOperatorException): + pass diff --git a/safe_cli/operators/hw_accounts/__init__.py b/safe_cli/operators/hw_accounts/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/safe_cli/operators/hw_accounts/exceptions.py b/safe_cli/operators/hw_accounts/exceptions.py new file mode 100644 index 00000000..f74947f0 --- /dev/null +++ b/safe_cli/operators/hw_accounts/exceptions.py @@ -0,0 +1,31 @@ +import functools + +from ledgereth.exceptions import ( + LedgerAppNotOpened, + LedgerCancel, + LedgerLocked, + LedgerNotFound, +) + +from safe_cli.operators.exceptions import HardwareWalletException + + +def raise_as_hw_account_exception(function): + @functools.wraps(function) + def wrapper(*args, **kwargs): + try: + return function(*args, **kwargs) + except LedgerNotFound as e: + raise HardwareWalletException(e.message) + except LedgerLocked as e: + raise HardwareWalletException(e.message) + except LedgerAppNotOpened as e: + raise HardwareWalletException(e.message) + except LedgerCancel as e: + raise HardwareWalletException(e.message) + except BaseException as e: + if "Error while writing" in e.args: + raise HardwareWalletException("Ledger error writting, restart safe-cli") + raise e + + return wrapper diff --git a/safe_cli/operators/hw_accounts/ledger_manager.py b/safe_cli/operators/hw_accounts/ledger_manager.py new file mode 100644 index 00000000..7eb4e446 --- /dev/null +++ b/safe_cli/operators/hw_accounts/ledger_manager.py @@ -0,0 +1,129 @@ +from typing import List, Optional, Set, Tuple + +from eth_typing import ChecksumAddress +from ledgereth import sign_typed_data_draft +from ledgereth.accounts import LedgerAccount, get_account_by_path +from ledgereth.comms import init_dongle +from ledgereth.exceptions import LedgerNotFound +from prompt_toolkit import HTML, print_formatted_text + +from gnosis.eth.eip712 import eip712_encode +from gnosis.safe import SafeTx +from gnosis.safe.signatures import signature_to_bytes + +from safe_cli.operators.hw_accounts.exceptions import raise_as_hw_account_exception + + +class LedgerManager: + def __init__(self): + self.dongle = None + self.accounts: Set[LedgerAccount] = set() + self.connect() + + def connect(self) -> bool: + """ + Connect with ledger + :return: True if connection was successful or False in other case + """ + try: + self.dongle = init_dongle(self.dongle) + return True + except LedgerNotFound: + return False + + @property + @raise_as_hw_account_exception + def connected(self) -> bool: + """ + :return: True if ledger is connected or False in other case + """ + return self.connect() + + @raise_as_hw_account_exception + def get_accounts( + self, legacy_account: Optional[bool] = False, number_accounts: Optional[int] = 5 + ) -> List[Tuple[ChecksumAddress, str]]: + """ + Request to ledger device the first n accounts + + :param legacy_account: + :param number_accounts: number of accounts requested to ledger + :return: a list of tuples with address and derivation path + """ + accounts = [] + for i in range(number_accounts): + if legacy_account: + path_string = f"44'/60'/0'/{i}" + else: + path_string = f"44'/60'/{i}'/0/0" + + account = get_account_by_path(path_string, self.dongle) + accounts.append((account.address, account.path)) + return accounts + + @raise_as_hw_account_exception + def add_account(self, derivation_path: str): + """ + Add an account to ledger manager set + + :param derivation_path: + :return: + """ + account = get_account_by_path(derivation_path, self.dongle) + self.accounts.add(LedgerAccount(account.path, account.address)) + + def delete_accounts(self, addresses: List[ChecksumAddress]) -> Set: + """ + Remove ledger accounts from address + + :param accounts: + :return: list with the delete accounts + """ + accounts_to_remove = set() + for address in addresses: + for account in self.accounts: + if account.address == address: + accounts_to_remove.add(account) + self.accounts = self.accounts.difference(accounts_to_remove) + return accounts_to_remove + + @raise_as_hw_account_exception + def sign_eip712(self, safe_tx: SafeTx, accounts: List[LedgerAccount]) -> SafeTx: + """ + Call ledger ethereum app method to sign eip712 hashes with a ledger account + + :param domain_hash: + :param message_hash: + :param account: ledger account + :return: bytes of signature + """ + encode_hash = eip712_encode(safe_tx.eip712_structured_data) + domain_hash = encode_hash[1] + message_hash = encode_hash[2] + for account in accounts: + print_formatted_text( + HTML( + "Make sure in your ledger before signing that domain_hash and message_hash are both correct" + ) + ) + print_formatted_text(HTML(f"Domain_hash: {domain_hash.hex()}")) + print_formatted_text(HTML(f"Message_hash: {message_hash.hex()}")) + signed = sign_typed_data_draft( + domain_hash, message_hash, account.path, self.dongle + ) + + signature = signature_to_bytes(signed.v, signed.r, signed.s) + # TODO should be refactored on safe_eth_py function insert_signature_sorted + # Insert signature sorted + if account.address not in safe_tx.signers: + new_owners = safe_tx.signers + [account.address] + new_owner_pos = sorted(new_owners, key=lambda x: int(x, 16)).index( + account.address + ) + safe_tx.signatures = ( + safe_tx.signatures[: 65 * new_owner_pos] + + signature + + safe_tx.signatures[65 * new_owner_pos :] + ) + + return safe_tx diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py index bbdccd53..0153a225 100644 --- a/safe_cli/operators/safe_operator.py +++ b/safe_cli/operators/safe_operator.py @@ -34,12 +34,32 @@ from gnosis.safe.multi_send import MultiSend, MultiSendOperation, MultiSendTx from safe_cli.ethereum_hd_wallet import get_account_from_words +from safe_cli.operators.exceptions import ( + AccountNotLoadedException, + ExistingOwnerException, + FallbackHandlerNotSupportedException, + GuardNotSupportedException, + HashAlreadyApproved, + InvalidFallbackHandlerException, + InvalidGuardException, + InvalidMasterCopyException, + NonExistingOwnerException, + NotEnoughEtherToSend, + NotEnoughSignatures, + SafeAlreadyUpdatedException, + SameFallbackHandlerException, + SameGuardException, + SameMasterCopyException, + SenderRequiredException, + ThresholdLimitException, + UpdateAddressesNotValid, +) from safe_cli.safe_addresses import ( get_default_fallback_handler_address, get_safe_contract_address, get_safe_l2_contract_address, ) -from safe_cli.utils import get_erc_20_list, yes_or_no_question +from safe_cli.utils import choose_option_question, get_erc_20_list, yes_or_no_question @dataclasses.dataclass @@ -63,90 +83,6 @@ def __str__(self): ) -class SafeOperatorException(Exception): - pass - - -class ExistingOwnerException(SafeOperatorException): - pass - - -class NonExistingOwnerException(SafeOperatorException): - pass - - -class HashAlreadyApproved(SafeOperatorException): - pass - - -class ThresholdLimitException(SafeOperatorException): - pass - - -class SameFallbackHandlerException(SafeOperatorException): - pass - - -class InvalidFallbackHandlerException(SafeOperatorException): - pass - - -class FallbackHandlerNotSupportedException(SafeOperatorException): - pass - - -class SameGuardException(SafeOperatorException): - pass - - -class InvalidGuardException(SafeOperatorException): - pass - - -class GuardNotSupportedException(SafeOperatorException): - pass - - -class SameMasterCopyException(SafeOperatorException): - pass - - -class SafeAlreadyUpdatedException(SafeOperatorException): - pass - - -class UpdateAddressesNotValid(SafeOperatorException): - pass - - -class SenderRequiredException(SafeOperatorException): - pass - - -class AccountNotLoadedException(SafeOperatorException): - pass - - -class NotEnoughSignatures(SafeOperatorException): - pass - - -class InvalidMasterCopyException(SafeOperatorException): - pass - - -class NotEnoughEtherToSend(SafeOperatorException): - pass - - -class NotEnoughTokenToSend(SafeOperatorException): - pass - - -class SafeServiceNotAvailable(SafeOperatorException): - pass - - def require_tx_service(f): @wraps(f) def decorated(self, *args, **kwargs): @@ -181,6 +117,19 @@ def decorated(self, *args, **kwargs): return decorated +def load_ledger_manager(): + """ + Load ledgerManager if dependencies are installed + :return: LedgerManager or None + """ + try: + from safe_cli.operators.hw_accounts.ledger_manager import LedgerManager + + return LedgerManager() + except (ModuleNotFoundError, IOError): + return None + + class SafeOperator: address: ChecksumAddress node_url: str @@ -228,6 +177,7 @@ def __init__(self, address: ChecksumAddress, node_url: str): self.require_all_signatures = ( True # Require all signatures to be present to send a tx ) + self.ledger_manager = load_ledger_manager() @cached_property def last_default_fallback_handler_address(self) -> ChecksumAddress: @@ -325,6 +275,36 @@ def load_cli_owners(self, keys: List[str]): except ValueError: print_formatted_text(HTML(f"Cannot load key={key}")) + def load_ledger_cli_owners(self, legacy_account: bool = False): + if not self.ledger_manager: + return None + + ledger_accounts = self.ledger_manager.get_accounts( + legacy_account=legacy_account + ) + if len(ledger_accounts) == 0: + return None + + for option, ledger_account in enumerate(ledger_accounts): + address, _ = ledger_account + print_formatted_text(HTML(f"{option} - {address} ")) + + option = choose_option_question( + "Select the owner address", len(ledger_accounts) - 1 + ) + if option is None: + return None + address, derivation_path = ledger_accounts[option] + self.ledger_manager.add_account(derivation_path) + balance = self.ethereum_client.get_balance(address) + print_formatted_text( + HTML( + f"Loaded account {address} " + f'with balance={Web3.from_wei(balance, "ether")} ether' + f"Ledger account cannot be defined as sender" + ) + ) + def unload_cli_owners(self, owners: List[str]): accounts_to_remove: Set[Account] = set() for owner in owners: @@ -335,6 +315,12 @@ def unload_cli_owners(self, owners: List[str]): accounts_to_remove.add(account) break self.accounts = self.accounts.difference(accounts_to_remove) + # Check if there are ledger owners + if self.ledger_manager and len(accounts_to_remove) < len(owners): + accounts_to_remove = ( + accounts_to_remove | self.ledger_manager.delete_accounts(owners) + ) + if accounts_to_remove: print_formatted_text( HTML("Accounts have been deleted") @@ -343,10 +329,15 @@ def unload_cli_owners(self, owners: List[str]): print_formatted_text(HTML("No account was deleted")) def show_cli_owners(self): - if not self.accounts: + accounts = ( + self.accounts | self.ledger_manager.accounts + if self.ledger_manager + else self.accounts + ) + if not accounts: print_formatted_text(HTML("No accounts loaded")) else: - for account in self.accounts: + for account in accounts: print_formatted_text( HTML( f"Account {account.address} loaded" @@ -677,6 +668,28 @@ def print_info(self): ) ) + if not self.ledger_manager: + print_formatted_text( + HTML( + "Ledger=" + "Disabled Optional ledger library is not installed, run pip install safe-cli[ledger] " + ) + ) + elif self.ledger_manager.connected: + print_formatted_text( + HTML( + "Ledger=" + "Connected" + ) + ) + else: + print_formatted_text( + HTML( + "Ledger=" + "disconnected" + ) + ) + if not self.is_version_updated(): print_formatted_text( HTML( @@ -870,6 +883,15 @@ def sign_transaction(self, safe_tx: SafeTx) -> SafeTx: threshold -= 1 if threshold == 0: break + # If still pending required signatures continue with ledger owners + selected_ledger_accounts = [] + if threshold > 0 and self.ledger_manager: + for ledger_account in self.ledger_manager.accounts: + if ledger_account.address in permitted_signers: + selected_ledger_accounts.append(ledger_account) + threshold -= 1 + if threshold == 0: + break if self.require_all_signatures and threshold > 0: raise NotEnoughSignatures(threshold) @@ -877,6 +899,10 @@ def sign_transaction(self, safe_tx: SafeTx) -> SafeTx: for selected_account in selected_accounts: safe_tx.sign(selected_account.key) + # Sign with ledger + if len(selected_ledger_accounts) > 0: + safe_tx = self.ledger_manager.sign_eip712(safe_tx, selected_ledger_accounts) + return safe_tx @require_tx_service diff --git a/safe_cli/operators/safe_tx_service_operator.py b/safe_cli/operators/safe_tx_service_operator.py index 7465251b..f493f58d 100644 --- a/safe_cli/operators/safe_tx_service_operator.py +++ b/safe_cli/operators/safe_tx_service_operator.py @@ -13,12 +13,9 @@ from safe_cli.utils import yes_or_no_question -from .safe_operator import ( - AccountNotLoadedException, - NonExistingOwnerException, - SafeOperator, - SafeServiceNotAvailable, -) +from . import SafeServiceNotAvailable +from .exceptions import AccountNotLoadedException, NonExistingOwnerException +from .safe_operator import SafeOperator class SafeTxServiceOperator(SafeOperator): @@ -102,6 +99,16 @@ def submit_signatures(self, safe_tx_hash: bytes) -> bool: for account in self.accounts: if account.address in owners: safe_tx.sign(account.key) + # Check if there are ledger signers + if self.ledger_manager: + selected_ledger_accounts = [] + for ledger_account in self.ledger_manager.accounts: + if ledger_account.address in owners: + selected_ledger_accounts.append(ledger_account) + if len(selected_ledger_accounts) > 0: + safe_tx = self.ledger_manager.sign_eip712( + safe_tx, selected_ledger_accounts + ) if safe_tx.signers: self.safe_tx_service.post_signatures(safe_tx_hash, safe_tx.signatures) diff --git a/safe_cli/prompt_parser.py b/safe_cli/prompt_parser.py index 7bd190ce..a16190a6 100644 --- a/safe_cli/prompt_parser.py +++ b/safe_cli/prompt_parser.py @@ -10,10 +10,12 @@ check_hex_str, check_keccak256_hash, ) -from .operators.safe_operator import ( +from .operators import SafeServiceNotAvailable +from .operators.exceptions import ( AccountNotLoadedException, ExistingOwnerException, FallbackHandlerNotSupportedException, + HardwareWalletException, HashAlreadyApproved, InvalidMasterCopyException, NonExistingOwnerException, @@ -21,13 +23,12 @@ NotEnoughSignatures, NotEnoughTokenToSend, SafeAlreadyUpdatedException, - SafeOperator, - SafeServiceNotAvailable, SameFallbackHandlerException, SameMasterCopyException, SenderRequiredException, ThresholdLimitException, ) +from .operators.safe_operator import SafeOperator def safe_exception(function): @@ -116,6 +117,10 @@ def wrapper(*args, **kwargs): f"Service not available for network {e.args[0]}" ) ) + except HardwareWalletException as e: + print_formatted_text( + HTML(f"HwDevice exception: {e.args[0]}") + ) return wrapper @@ -152,6 +157,10 @@ def load_cli_owners_from_words(args): def load_cli_owners(args): safe_operator.load_cli_owners(args.keys) + @safe_exception + def load_ledger_cli_owners(args): + safe_operator.load_ledger_cli_owners(args.legacy_accounts) + @safe_exception def unload_cli_owners(args): safe_operator.unload_cli_owners(args.addresses) @@ -292,6 +301,14 @@ def remove_delegate(args): parser_load_cli_owners.add_argument("keys", type=str, nargs="+") parser_load_cli_owners.set_defaults(func=load_cli_owners) + parser_load_ledger_cli_owners = subparsers.add_parser("load_ledger_cli_owners") + parser_load_ledger_cli_owners.add_argument( + "--legacy-accounts", + action="store_true", + help="Enable search legacy accounts", + ) + parser_load_ledger_cli_owners.set_defaults(func=load_ledger_cli_owners) + parser_unload_cli_owners = subparsers.add_parser("unload_cli_owners") parser_unload_cli_owners.add_argument( "addresses", type=check_ethereum_address, nargs="+" diff --git a/safe_cli/safe_completer_constants.py b/safe_cli/safe_completer_constants.py index 7294db66..fc511e80 100644 --- a/safe_cli/safe_completer_constants.py +++ b/safe_cli/safe_completer_constants.py @@ -24,6 +24,7 @@ "history": "(read-only)", "info": "(read-only)", "load_cli_owners": " [...]", + "load_ledger_cli_owners": "[--legacy-accounts]", "load_cli_owners_from_words": " ... ", "refresh": "", "remove_delegate": "
", @@ -155,6 +156,9 @@ "Command load_cli_owners will try to load a new owner via " "<account-private-key>." ), + "load_ledger_cli_owners": HTML( + "Command load_ledger_cli_owners show a list of ledger addresses to choose between them " + ), "load_cli_owners_from_words": HTML( "Command load_cli_owners_from_words will try to load owners via" "seed_words. Only relevant accounts(owners) will be loaded" diff --git a/safe_cli/utils.py b/safe_cli/utils.py index b3e96f87..132861fa 100644 --- a/safe_cli/utils.py +++ b/safe_cli/utils.py @@ -1,4 +1,7 @@ import os +from typing import Optional + +from prompt_toolkit import HTML, print_formatted_text from gnosis.eth import EthereumClient @@ -41,3 +44,25 @@ def yes_or_no_question(question: str, default_no: bool = True) -> bool: return False else: return False if default_no else True + + +def choose_option_question( + question: str, number_options: int, default_option: int = 0 +) -> Optional[int]: + if "PYTEST_CURRENT_TEST" in os.environ: + return default_option # Ignore confirmations when running tests + choices = f" [0-{number_options}] default {default_option}:" + reply = str(get_input(question + choices)).lower().strip() or str(default_option) + try: + option = int(reply) + except ValueError: + print_formatted_text(HTML(" Option must be an integer ")) + return None + + if option not in range(0, number_options): + print_formatted_text( + HTML(f" {option} is not between [0-{number_options}}} ") + ) + return None + + return option diff --git a/setup.py b/setup.py index 35d814aa..5f67b776 100644 --- a/setup.py +++ b/setup.py @@ -24,9 +24,10 @@ "prompt_toolkit>=3", "pygments>=2", "requests>=2", - "safe-eth-py>=6", + "safe-eth-py==6.0.0b3", "tabulate>=0.8", ], + extras_require={"ledger": ["ledgereth==0.9.0"]}, packages=setuptools.find_packages(), entry_points={ "console_scripts": [ diff --git a/tests/test_ledger_manager.py b/tests/test_ledger_manager.py new file mode 100644 index 00000000..8b9a103d --- /dev/null +++ b/tests/test_ledger_manager.py @@ -0,0 +1,237 @@ +import unittest +from unittest import mock +from unittest.mock import MagicMock + +from eth_account import Account +from ledgerblue.Dongle import Dongle +from ledgereth.accounts import LedgerAccount +from ledgereth.exceptions import ( + LedgerAppNotOpened, + LedgerCancel, + LedgerLocked, + LedgerNotFound, +) + +from gnosis.eth.eip712 import eip712_encode +from gnosis.safe import SafeTx +from gnosis.safe.signatures import signature_split +from gnosis.safe.tests.safe_test_case import SafeTestCaseMixin + +from safe_cli.operators.exceptions import HardwareWalletException +from safe_cli.operators.hw_accounts.ledger_manager import LedgerManager + + +class TestLedgerManager(SafeTestCaseMixin, unittest.TestCase): + def test_setup_ledger_manager(self): + ledger_manager = LedgerManager() + self.assertIsNone(ledger_manager.dongle) + self.assertEqual(len(ledger_manager.accounts), 0) + self.assertEqual(ledger_manager.connected, False) + + @mock.patch("safe_cli.operators.hw_accounts.ledger_manager.init_dongle") + @mock.patch("safe_cli.operators.hw_accounts.ledger_manager.get_account_by_path") + def test_connected( + self, mock_get_account_by_path: MagicMock, mock_init_dongle: MagicMock + ): + ledger_manager = LedgerManager() + mock_init_dongle.side_effect = LedgerNotFound() + + self.assertEqual(ledger_manager.connected, False) + + mock_init_dongle.side_effect = None + mock_init_dongle.return_value = Dongle() + mock_get_account_by_path.side_effect = LedgerLocked() + + self.assertEqual(ledger_manager.connected, True) + + @mock.patch( + "safe_cli.operators.hw_accounts.ledger_manager.sign_typed_data_draft", + autospec=True, + ) + @mock.patch( + "safe_cli.operators.hw_accounts.ledger_manager.get_account_by_path", + autospec=True, + ) + def test_hw_device_exception(self, mock_ledger_fn: MagicMock, mock_sign: MagicMock): + ledger_manager = LedgerManager() + + derivation_path = "44'/60'/0'/0" + ledger_account = LedgerAccount(derivation_path, Account.create().address) + safe = self.deploy_test_safe( + owners=[Account.create().address], + threshold=1, + initial_funding_wei=self.w3.to_wei(0.1, "ether"), + ) + safe_tx = SafeTx( + self.ethereum_client, + safe.address, + Account.create().address, + 10, + b"", + 0, + 200000, + 200000, + self.gas_price, + None, + None, + safe_nonce=0, + ) + + mock_ledger_fn.side_effect = LedgerNotFound + mock_sign.side_effect = LedgerNotFound + with self.assertRaises(HardwareWalletException): + ledger_manager.get_accounts() + with self.assertRaises(HardwareWalletException): + ledger_manager.add_account(derivation_path) + with self.assertRaises(HardwareWalletException): + ledger_manager.sign_eip712(safe_tx, [ledger_account]) + + mock_ledger_fn.side_effect = LedgerLocked + mock_sign.side_effect = LedgerLocked + with self.assertRaises(HardwareWalletException): + ledger_manager.get_accounts() + with self.assertRaises(HardwareWalletException): + ledger_manager.add_account(derivation_path) + with self.assertRaises(HardwareWalletException): + ledger_manager.sign_eip712(safe_tx, [ledger_account]) + + mock_ledger_fn.side_effect = LedgerAppNotOpened + mock_sign.side_effect = LedgerAppNotOpened + with self.assertRaises(HardwareWalletException): + ledger_manager.get_accounts() + with self.assertRaises(HardwareWalletException): + ledger_manager.add_account(derivation_path) + with self.assertRaises(HardwareWalletException): + ledger_manager.sign_eip712(safe_tx, [ledger_account]) + + mock_ledger_fn.side_effect = LedgerCancel + mock_sign.side_effect = LedgerCancel + with self.assertRaises(HardwareWalletException): + ledger_manager.get_accounts() + with self.assertRaises(HardwareWalletException): + ledger_manager.add_account(derivation_path) + with self.assertRaises(HardwareWalletException): + ledger_manager.sign_eip712(safe_tx, [ledger_account]) + + @mock.patch( + "safe_cli.operators.hw_accounts.ledger_manager.get_account_by_path", + autospec=True, + ) + def test_get_accounts(self, mock_get_account_by_path: MagicMock): + ledger_manager = LedgerManager() + addresses = [Account.create().address, Account.create().address] + derivation_paths = ["44'/60'/0'/0", "44'/60'/0'/1"] + mock_get_account_by_path.side_effect = [ + LedgerAccount(derivation_paths[0], addresses[0]), + LedgerAccount(derivation_paths[1], addresses[1]), + ] + ledger_accounts = ledger_manager.get_accounts(number_accounts=2) + self.assertEqual(len(ledger_accounts), 2) + for ledger_account, expected_address, expected_derivation_path in zip( + ledger_accounts, addresses, derivation_paths + ): + ledger_address, ledger_path = ledger_account + self.assertEqual(expected_address, ledger_address) + self.assertEqual(expected_derivation_path, ledger_path) + + @mock.patch( + "safe_cli.operators.hw_accounts.ledger_manager.get_account_by_path", + autospec=True, + ) + def test_add_account(self, mock_get_account_by_path: MagicMock): + ledger_manager = LedgerManager() + derivation_path = "44'/60'/0'/0" + account_address = Account.create().address + mock_get_account_by_path.return_value = LedgerAccount( + derivation_path, account_address + ) + self.assertEqual(len(ledger_manager.accounts), 0) + + ledger_manager.add_account(derivation_path) + + self.assertEqual(len(ledger_manager.accounts), 1) + ledger_account = list(ledger_manager.accounts)[0] + self.assertEqual(ledger_account.address, account_address) + self.assertEqual(ledger_account.path, derivation_path) + + def test_delete_account(self): + ledger_manager = LedgerManager() + random_address = Account.create().address + random_address_2 = Account.create().address + self.assertEqual(len(ledger_manager.accounts), 0) + self.assertEqual(len(ledger_manager.delete_accounts([random_address])), 0) + ledger_manager.accounts.add(LedgerAccount("44'/60'/0'/0", random_address_2)) + self.assertEqual(len(ledger_manager.delete_accounts([random_address])), 0) + self.assertEqual(len(ledger_manager.accounts), 1) + self.assertEqual(len(ledger_manager.delete_accounts([])), 0) + ledger_manager.accounts.add(LedgerAccount("44'/60'/0'/1", random_address)) + self.assertEqual(len(ledger_manager.accounts), 2) + self.assertEqual(len(ledger_manager.delete_accounts([random_address])), 1) + self.assertEqual(len(ledger_manager.accounts), 1) + ledger_manager.accounts.add(LedgerAccount("44'/60'/0'/1", random_address)) + self.assertEqual(len(ledger_manager.accounts), 2) + self.assertEqual( + len(ledger_manager.delete_accounts([random_address, random_address_2])), 2 + ) + self.assertEqual(len(ledger_manager.accounts), 0) + + @mock.patch( + "safe_cli.operators.hw_accounts.ledger_manager.init_dongle", + autospec=True, + return_value=Dongle(), + ) + def test_sign_eip712(self, mock_init_dongle: MagicMock): + ledger_manager = LedgerManager() + owner = Account.create() + to = Account.create() + ledger_account = LedgerAccount("44'/60'/0'/0", owner.address) + safe = self.deploy_test_safe( + owners=[owner.address], + threshold=1, + initial_funding_wei=self.w3.to_wei(0.1, "ether"), + ) + safe_tx = SafeTx( + self.ethereum_client, + safe.address, + to.address, + 10, + b"", + 0, + 200000, + 200000, + self.gas_price, + None, + None, + safe_nonce=0, + ) + encode_hash = eip712_encode(safe_tx.eip712_structured_data) + expected_signature = safe_tx.sign(owner.key) + # We need to split to change the bytes signature order to v + r + s like ledger return signature + v, r, s = signature_split(expected_signature) + + ledger_return_signature = ( + v.to_bytes(1, byteorder="big") + + r.to_bytes(32, byteorder="big") + + s.to_bytes(32, byteorder="big") + ) + mock_init_dongle.return_value.exchange = MagicMock( + return_value=ledger_return_signature + ) + safe_tx = ledger_manager.sign_eip712(safe_tx, [ledger_account]) + self.assertEqual(safe_tx.signatures, expected_signature) + + # Check that dongle exchange is called with the expected payload + # https://github.com/LedgerHQ/app-ethereum/blob/master/doc/ethapp.adoc#sign-eth-eip-712 + command = "e00c0000" + "51" # command + payload length + payload = ( + "04" + "8000002c8000003c8000000000000000" + ) # number of derivations + 44'/60'/0'/0 + expected_exchange_payload = ( + bytes.fromhex(command) + + bytes.fromhex(payload) + + encode_hash[1] + + encode_hash[2] + ) + mock_init_dongle.return_value.exchange.assert_called_once_with( + expected_exchange_payload + ) diff --git a/tests/test_safe_operator.py b/tests/test_safe_operator.py index 2f60a57f..b0b795e5 100644 --- a/tests/test_safe_operator.py +++ b/tests/test_safe_operator.py @@ -3,13 +3,14 @@ from unittest.mock import MagicMock from eth_account import Account +from ledgereth.objects import LedgerAccount from web3 import Web3 from gnosis.eth import EthereumClient from gnosis.safe import Safe from gnosis.safe.multi_send import MultiSend -from safe_cli.operators.safe_operator import ( +from safe_cli.operators.exceptions import ( AccountNotLoadedException, ExistingOwnerException, FallbackHandlerNotSupportedException, @@ -21,12 +22,12 @@ NonExistingOwnerException, NotEnoughEtherToSend, NotEnoughSignatures, - SafeOperator, SameFallbackHandlerException, SameGuardException, SameMasterCopyException, SenderRequiredException, ) +from safe_cli.operators.safe_operator import SafeOperator from safe_cli.utils import get_erc_20_list from tests.utils import generate_transfers_erc20 @@ -63,9 +64,37 @@ def test_load_cli_owner(self, get_contract_mock: MagicMock): # Test unload cli owner safe_operator.default_sender = random_accounts[0] number_of_accounts = len(safe_operator.accounts) - safe_operator.unload_cli_owners(["aloha", random_accounts[0].address, "bye"]) + ledger_random_address = Account.create().address + safe_operator.ledger_manager.accounts.add( + LedgerAccount("44'/60'/0'/1", ledger_random_address) + ) + self.assertEqual(len(safe_operator.ledger_manager.accounts), 1) + safe_operator.unload_cli_owners( + ["aloha", random_accounts[0].address, "bye", ledger_random_address] + ) self.assertEqual(len(safe_operator.accounts), number_of_accounts - 1) self.assertFalse(safe_operator.default_sender) + self.assertEqual(len(safe_operator.ledger_manager.accounts), 0) + + @mock.patch("safe_cli.operators.hw_accounts.ledger_manager.get_account_by_path") + def test_load_ledger_cli_owner(self, mock_get_account_by_path: MagicMock): + owner_address = Account.create().address + safe_address = self.deploy_test_safe(owners=[owner_address]).address + safe_operator = SafeOperator(safe_address, self.ethereum_node_url) + safe_operator.ledger_manager.get_accounts = MagicMock(return_value=[]) + safe_operator.load_ledger_cli_owners() + self.assertEqual(len(safe_operator.ledger_manager.accounts), 0) + random_account = Account.create().address + other_random_account = Account.create().address + safe_operator.ledger_manager.get_accounts.return_value = [ + (random_account, "44'/60'/0'/0/0"), + (other_random_account, "44'/60'/0'/0/1"), + ] + mock_get_account_by_path.return_value = LedgerAccount( + "44'/60'/0'/0/0", random_account + ) + safe_operator.load_ledger_cli_owners() + self.assertEqual(len(safe_operator.ledger_manager.accounts), 1) def test_approve_hash(self): safe_address = self.deploy_test_safe( diff --git a/tests/test_safe_tx_service_operator.py b/tests/test_safe_tx_service_operator.py index d04bebc8..c9d48914 100644 --- a/tests/test_safe_tx_service_operator.py +++ b/tests/test_safe_tx_service_operator.py @@ -4,6 +4,7 @@ from eth_account import Account from hexbytes import HexBytes +from ledgereth.objects import LedgerAccount from web3 import Web3 from gnosis.eth import EthereumClient @@ -134,6 +135,24 @@ def test_submit_signatures( get_safe_transaction_mock.return_value = GetMultisigTxRequestMock(executed=True) self.assertFalse(safe_operator.submit_signatures(safe_tx_hash)) + # Test ledger signers + with mock.patch.object( + SafeTx, "signers", return_value=["signer"], new_callable=mock.PropertyMock + ) as mock_safe_tx: + safe_operator.ledger_manager.sign_eip712 = MagicMock( + return_value=mock_safe_tx + ) + get_safe_transaction_mock.return_value = GetMultisigTxRequestMock( + executed=False + ) + safe_operator.ledger_manager.accounts.add( + LedgerAccount("44'/60'/0'/0", Account.create().address) + ) + get_permitted_signers_mock.return_value = { + list(safe_operator.ledger_manager.accounts)[0].address + } + self.assertTrue(safe_operator.submit_signatures(safe_tx_hash)) + @mock.patch.object(TransactionServiceApi, "post_transaction", return_value=True) @mock.patch.object( SafeTx, "safe_version", return_value="1.4.1", new_callable=mock.PropertyMock diff --git a/tests/test_utils.py b/tests/test_utils.py index 729c1ee9..06e22e04 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,7 +3,7 @@ from unittest import mock from unittest.mock import MagicMock -from safe_cli.utils import yes_or_no_question +from safe_cli.utils import choose_option_question, yes_or_no_question class TestUtils(unittest.TestCase): @@ -34,6 +34,23 @@ def test_yes_or_no_question(self, input_mock: MagicMock): os.environ["PYTEST_CURRENT_TEST"] = pytest_current_test + @mock.patch("safe_cli.utils.get_input") + def test_choose_option_question(self, input_mock: MagicMock): + pytest_current_test = os.environ.pop("PYTEST_CURRENT_TEST") + + input_mock.return_value = "" + self.assertEqual(choose_option_question("", 1), 0) + input_mock.return_value = "" + self.assertEqual(choose_option_question("", 5, 4), 4) + input_mock.return_value = "m" + self.assertIsNone(choose_option_question("", 1)) + input_mock.return_value = "10" + self.assertIsNone(choose_option_question("", 1)) + input_mock.return_value = "1" + self.assertEqual(choose_option_question("", 2), 1) + + os.environ["PYTEST_CURRENT_TEST"] = pytest_current_test + if __name__ == "__main__": unittest.main()