Skip to content

Commit

Permalink
Add support to execute transactions from ledger (#325)
Browse files Browse the repository at this point in the history
  • Loading branch information
moisses89 committed Dec 15, 2023
1 parent 4e6d1dd commit dd99966
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 20 deletions.
14 changes: 14 additions & 0 deletions safe_cli/operators/hw_wallets/hw_wallet.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import re
from abc import ABC, abstractmethod

from eth_typing import HexStr
from web3.types import TxParams

from .constants import BIP32_ETH_PATTERN, BIP32_LEGACY_LEDGER_PATTERN
from .exceptions import InvalidDerivationPath

Expand Down Expand Up @@ -44,6 +47,17 @@ def sign_typed_hash(self, domain_hash: bytes, message_hash: bytes) -> bytes:
:return: signature bytes
"""

@abstractmethod
def get_signed_raw_transaction(
self, tx_parameters: TxParams, chain_id: int
) -> HexStr:
"""
:param chain_id:
:param tx_parameters:
:return: raw transaction signed
"""

def __str__(self):
return f"{self.__class__.__name__} device with address {self.address}"

Expand Down
78 changes: 75 additions & 3 deletions safe_cli/operators/hw_wallets/hw_wallet_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
from typing import Dict, List, Optional, Set, Tuple

from eth_typing import ChecksumAddress
from hexbytes import HexBytes
from prompt_toolkit import HTML, print_formatted_text
from web3.types import TxParams, Wei

from gnosis.eth import TxSpeed
from gnosis.eth.eip712 import eip712_encode
from gnosis.safe import SafeTx

Expand All @@ -25,6 +28,7 @@ class HwWalletManager:
def __init__(self):
self.wallets: Set[HwWallet] = set()
self.supported_hw_wallet_types: Dict[str, HwWallet] = {}
self.sender: Optional[HwWallet] = None
try:
from .ledger_wallet import LedgerWallet

Expand Down Expand Up @@ -82,9 +86,19 @@ def add_account(

hw_wallet = self.get_hw_wallet(hw_wallet_type)

address = hw_wallet(derivation_path).address
self.wallets.add(hw_wallet(derivation_path))
return address
wallet = hw_wallet(derivation_path)
self.wallets.add(wallet)
return wallet.address

def set_sender(self, hw_wallet_type: HwWalletType, derivation_path: str):
"""
Set a harware wallet as a sender to enable execute transaction from it.
:param hw_wallet_type:
:param derivation_path:
:return:
"""
hw_wallet = self.get_hw_wallet(hw_wallet_type)
self.sender = hw_wallet(derivation_path)

def delete_accounts(self, addresses: List[ChecksumAddress]) -> Set:
"""
Expand All @@ -97,6 +111,8 @@ def delete_accounts(self, addresses: List[ChecksumAddress]) -> Set:
for address in addresses:
for account in self.wallets:
if account.address == address:
if self.sender and self.sender.address == address:
self.sender = None
accounts_to_remove.add(account)
self.wallets = self.wallets.difference(accounts_to_remove)
return accounts_to_remove
Expand Down Expand Up @@ -136,3 +152,59 @@ def sign_eip712(self, safe_tx: SafeTx, wallets: List[HwWallet]) -> SafeTx:
)

return safe_tx

def execute_safe_tx(
self,
safe_tx: SafeTx,
tx_gas: Optional[int] = None,
tx_gas_price: Optional[int] = None,
tx_nonce: Optional[int] = None,
eip1559_speed: Optional[TxSpeed] = None,
) -> Tuple[HexBytes, TxParams]:
"""
Send multisig tx to the Safe
:param safe_tx: Safe transaction to sign
:param tx_gas: Gas for the external tx. If not, `(safe_tx_gas + base_gas) * 2` will be used
:param tx_gas_price: Gas price of the external tx. If not, `gas_price` will be used
:param tx_nonce: Force nonce for `tx_sender`
:param eip1559_speed: If provided, use EIP1559 transaction
:return: Tuple(tx_hash, tx)
"""

if eip1559_speed and safe_tx.ethereum_client.is_eip1559_supported():
tx_parameters = safe_tx.ethereum_client.set_eip1559_fees(
{
"from": self.sender.address,
},
tx_speed=eip1559_speed,
)
else:
tx_parameters = {
"from": self.sender.address,
"gasPrice": tx_gas_price or safe_tx.w3.eth.gas_price,
}

if tx_gas:
tx_parameters["gas"] = tx_gas

if tx_nonce is not None:
tx_parameters["nonce"] = tx_nonce
else:
tx_parameters["nonce"] = safe_tx.ethereum_client.get_nonce_for_account(
self.sender.address, block_identifier="latest"
)
safe_tx.tx = safe_tx.w3_tx.build_transaction(tx_parameters)
safe_tx.tx["gas"] = Wei(
tx_gas or (max(safe_tx.tx["gas"] + 75000, safe_tx.recommended_gas()))
)
signed_raw_transaction = self.sender.get_signed_raw_transaction(
safe_tx.tx, safe_tx.ethereum_client.get_chain_id()
) # sign with ledger
safe_tx.tx_hash = safe_tx.ethereum_client.w3.eth.send_raw_transaction(
signed_raw_transaction
)
# Set signatures empty after executing the tx. `Nonce` is increased even if it fails,
# so signatures are not valid anymore
safe_tx.signatures = b""
return safe_tx.tx_hash, safe_tx.tx
29 changes: 27 additions & 2 deletions safe_cli/operators/hw_wallets/ledger_wallet.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from typing import Optional

from eth_typing import ChecksumAddress
from eth_typing import ChecksumAddress, HexStr
from ledgerblue.Dongle import Dongle
from ledgereth import sign_typed_data_draft
from ledgereth import create_transaction, sign_typed_data_draft
from ledgereth.accounts import get_account_by_path
from ledgereth.comms import init_dongle
from web3.types import TxParams

from gnosis.safe.signatures import signature_to_bytes

Expand Down Expand Up @@ -49,3 +50,27 @@ def sign_typed_hash(self, domain_hash: bytes, message_hash: bytes) -> bytes:
)

return signature_to_bytes(signed.v, signed.r, signed.s)

@raise_ledger_exception_as_hw_wallet_exception
def get_signed_raw_transaction(
self, tx_parameters: TxParams, chain_id: int
) -> HexStr:
"""
:param chain_id:
:param tx_parameters:
:return: raw transaction signed
"""
signed_transaction = create_transaction(
destination=tx_parameters["to"],
amount=tx_parameters["value"],
gas=tx_parameters["gas"],
nonce=tx_parameters["nonce"],
data=tx_parameters.get("data"),
max_priority_fee_per_gas=tx_parameters.get("maxPriorityFeePerGas"),
max_fee_per_gas=tx_parameters.get("maxPriorityFeePerGas"),
chain_id=chain_id,
sender_path=self.derivation_path,
dongle=self.dongle,
)
return signed_transaction.raw_transaction()
12 changes: 11 additions & 1 deletion safe_cli/operators/hw_wallets/trezor_wallet.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from functools import lru_cache

from eth_typing import ChecksumAddress
from eth_typing import ChecksumAddress, HexStr
from trezorlib import tools
from trezorlib.client import TrezorClient, get_default_client
from trezorlib.ethereum import get_address, sign_typed_data_hash
from trezorlib.ui import ClickUI
from web3.types import TxParams

from .hw_wallet import HwWallet
from .trezor_exceptions import raise_trezor_exception_as_hw_wallet_exception
Expand Down Expand Up @@ -49,3 +50,12 @@ def sign_typed_hash(self, domain_hash: bytes, message_hash: bytes) -> bytes:
self.client, n=address_n, domain_hash=domain_hash, message_hash=message_hash
)
return signed.signature

def get_signed_raw_transaction(self, tx_parameters: TxParams) -> HexStr:
"""
:param chain_id:
:param tx_parameters:
:return: raw transaction signed
"""
raise NotImplementedError
47 changes: 38 additions & 9 deletions safe_cli/operators/safe_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def require_default_sender(f):

@wraps(f)
def decorated(self, *args, **kwargs):
if not self.default_sender:
if not self.default_sender and not self.hw_wallet_manager.sender:
raise SenderRequiredException()
else:
return f(self, *args, **kwargs)
Expand Down Expand Up @@ -260,7 +260,11 @@ def load_cli_owners(self, keys: List[str]):
f'with balance={Web3.from_wei(balance, "ether")} ether'
)
)
if not self.default_sender and balance > 0:
if (
not self.default_sender
and not self.hw_wallet_manager.sender
and balance > 0
):
print_formatted_text(
HTML(
f"Set account <b>{account.address}</b> as default sender of txs"
Expand Down Expand Up @@ -288,17 +292,27 @@ def load_hw_wallet(
if option is None:
return None
_, derivation_path = ledger_accounts[option]

address = self.hw_wallet_manager.add_account(hw_wallet_type, derivation_path)
balance = self.ethereum_client.get_balance(address)

print_formatted_text(
HTML(
f"Loaded account <b>{address}</b> "
f'with balance={Web3.from_wei(balance, "ether")} ether.\n'
f"Ledger account cannot be defined as sender"
f'with balance={Web3.from_wei(balance, "ether")} ether.'
)
)

if (
not self.default_sender
and not self.hw_wallet_manager.sender
and hw_wallet_type == HwWalletType.LEDGER
and balance > 0
):
self.hw_wallet_manager.set_sender(hw_wallet_type, derivation_path)
print_formatted_text(HTML(f"HwDevice {address} added as sender"))
else:
print_formatted_text(HTML(f"HwDevice {address} wasn't added as sender"))

def load_ledger_cli_owners(
self, derivation_path: str = None, legacy_account: bool = False
):
Expand Down Expand Up @@ -350,6 +364,13 @@ def show_cli_owners(self):
f"</ansigreen>"
)
)
elif self.hw_wallet_manager.sender:
print_formatted_text(
HTML(
f"<ansigreen><b>HwDevice sender:</b> {self.hw_wallet_manager.sender}"
f"</ansigreen>"
)
)
else:
print_formatted_text(
HTML("<ansigreen>Not default sender set </ansigreen>")
Expand Down Expand Up @@ -838,12 +859,20 @@ def prepare_and_execute_safe_transaction(
@require_default_sender # Throws Exception if default sender not found
def execute_safe_transaction(self, safe_tx: SafeTx):
try:
call_result = safe_tx.call(self.default_sender.address)
if self.default_sender:
call_result = safe_tx.call(self.default_sender.address)
else:
call_result = safe_tx.call(self.hw_wallet_manager.sender.address)
print_formatted_text(HTML(f"Result: <ansigreen>{call_result}</ansigreen>"))
if yes_or_no_question("Do you want to execute tx " + str(safe_tx)):
tx_hash, tx = safe_tx.execute(
self.default_sender.key, eip1559_speed=TxSpeed.NORMAL
)
if self.default_sender:
tx_hash, tx = safe_tx.execute(
self.default_sender.key, eip1559_speed=TxSpeed.NORMAL
)
else:
tx_hash, tx = self.hw_wallet_manager.execute_safe_tx(
safe_tx, eip1559_speed=TxSpeed.NORMAL
)
self.executed_transactions.append(tx_hash.hex())
print_formatted_text(
HTML(
Expand Down
81 changes: 81 additions & 0 deletions tests/test_hw_wallet_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
from unittest.mock import MagicMock

from eth_account import Account
from hexbytes import HexBytes
from ledgerblue.Dongle import Dongle
from ledgereth import SignedTransaction

from gnosis.safe import SafeTx
from gnosis.safe.tests.safe_test_case import SafeTestCaseMixin

from safe_cli.operators.hw_wallets.hw_wallet_manager import (
Expand Down Expand Up @@ -133,3 +136,81 @@ def test_delete_account(
2,
)
self.assertEqual(len(hw_wallet_manager.wallets), 0)

@mock.patch(
"safe_cli.operators.hw_wallets.ledger_wallet.create_transaction",
autospec=True,
return_value=Dongle(),
)
@mock.patch(
"safe_cli.operators.hw_wallets.ledger_wallet.LedgerWallet.get_address",
autospec=True,
)
@mock.patch(
"safe_cli.operators.hw_wallets.ledger_wallet.init_dongle",
autospec=True,
return_value=Dongle(),
)
def test_execute(
self,
mock_init_dongle: MagicMock,
mock_get_address: MagicMock,
mock_ledger_create_transaction: MagicMock,
):
owner = self.ethereum_test_account
to = Account.create()
derivation_path = "44'/60'/0'/0"
hw_wallet_manager = HwWalletManager()
mock_get_address.return_value = owner.address
hw_wallet_manager.add_account(HwWalletType.LEDGER, derivation_path)
hw_wallet_manager.set_sender(HwWalletType.LEDGER, derivation_path)
self.assertEqual(hw_wallet_manager.sender.address, owner.address)
safe = self.deploy_test_safe(
owners=[owner.address],
threshold=1,
initial_funding_wei=self.w3.to_wei(0.1, "ether"),
)
safe_tx = SafeTx(
self.ethereum_client,
safe.address,
to.address,
10,
b"",
0,
200000,
200000,
self.gas_price,
None,
None,
safe_nonce=0,
)
safe_tx.sign(owner.key)
tx_parameters = {
"from": owner.address,
"gasPrice": safe_tx.w3.eth.gas_price,
"nonce": 0,
"gas": safe_tx.recommended_gas(),
}
safe_tx.tx = safe_tx.w3_tx.build_transaction(tx_parameters)
signed_fields = safe_tx.w3.eth.account.sign_transaction(
safe_tx.tx, private_key=owner.key
)

mocked_signed_transaction_response = SignedTransaction(
nonce=0,
gas_price=safe_tx.tx["gasPrice"],
gas_limit=safe_tx.tx["gas"],
destination=HexBytes(safe_tx.tx["to"]),
amount=safe_tx.tx["value"],
data=HexBytes(safe_tx.tx["data"]),
v=signed_fields.v,
r=signed_fields.r,
s=signed_fields.s,
)

mock_ledger_create_transaction.return_value = mocked_signed_transaction_response
tx_hash, tx = hw_wallet_manager.execute_safe_tx(safe_tx)
self.assertEqual(tx["data"], safe_tx.tx["data"])
self.assertIsNotNone(
self.ethereum_client.w3.eth.get_transaction_receipt(tx_hash)
)
Loading

0 comments on commit dd99966

Please sign in to comment.