Skip to content

Commit

Permalink
a lot more stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
eukreign committed Mar 27, 2018
1 parent 3dfb449 commit f698141
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 1,578 deletions.
14 changes: 3 additions & 11 deletions lbrynet/wallet/account.py
Expand Up @@ -13,10 +13,6 @@ def get_key_chain_from_xpub(xpub):
return key, chain


def derive_key(parent_key, chain, sequence):
return CKD_pub(parent_key, chain, sequence)[0]


class AddressSequence:

def __init__(self, derived_keys, gap, age_checker, pub_key, chain_key):
Expand All @@ -31,7 +27,7 @@ def __init__(self, derived_keys, gap, age_checker, pub_key, chain_key):
]

def generate_next_address(self):
new_key, _ = derive_key(self.pub_key, self.chain_key, len(self.derived_keys))
new_key, _ = CKD_pub(self.pub_key, self.chain_key, len(self.derived_keys))
address = public_key_to_address(new_key)
self.derived_keys.append(new_key.encode('hex'))
self.addresses.append(address)
Expand Down Expand Up @@ -59,11 +55,11 @@ def __init__(self, data, receiving_gap, change_gap, age_checker):
master_key, master_chain = get_key_chain_from_xpub(data['xpub'])
self.receiving = AddressSequence(
data.get('receiving', []), receiving_gap, age_checker,
*derive_key(master_key, master_chain, 0)
*CKD_pub(master_key, master_chain, 0)
)
self.change = AddressSequence(
data.get('change', []), change_gap, age_checker,
*derive_key(master_key, master_chain, 1)
*CKD_pub(master_key, master_chain, 1)
)
self.is_old = age_checker

Expand All @@ -74,10 +70,6 @@ def as_dict(self):
'xpub': self.xpub
}

def ensure_enough_addresses(self):
return self.receiving.ensure_enough_addresses() + \
self.change.ensure_enough_addresses()

@property
def sequences(self):
return self.receiving, self.change
71 changes: 59 additions & 12 deletions lbrynet/wallet/blockchain.py
@@ -1,16 +1,63 @@
import os
import logging
import hashlib

from twisted.internet import threads, defer

from lbryum.util import hex_to_int, int_to_hex, rev_hex
from lbryum.hashing import hash_encode, Hash, PoWHash
from .stream import StreamController
from .stream import StreamController, execute_serially
from .constants import blockchain_params, HEADER_SIZE

log = logging.getLogger(__name__)


class Transaction:

def __init__(self, tx_hash, raw, height):
self.hash = tx_hash
self.raw = raw
self.height = height


class BlockchainTransactions:

def __init__(self, history):
self.addresses = {}
self.transactions = {}
for address, transactions in history.items():
self.addresses[address] = []
for txid, raw, height in transactions:
tx = Transaction(txid, raw, height)
self.addresses[address].append(tx)
self.transactions[txid] = tx

def has_address(self, address):
return address in self.addresses

def get_transaction(self, tx_hash, *args):
return self.transactions.get(tx_hash, *args)

def get_transactions(self, address, *args):
return self.addresses.get(address, *args)

def get_status(self, address):
hashes = [
'{}:{}:'.format(tx.hash, tx.height)
for tx in self.get_transactions(address, [])
]
if hashes:
return hashlib.sha256(''.join(hashes)).digest().encode('hex')

def has_transaction(self, tx_hash):
return tx_hash in self.transactions

def add_transaction(self, address, transaction):
self.transactions.setdefault(transaction.hash, transaction)
self.addresses.setdefault(address, [])
self.addresses[address].append(transaction)


class BlockchainHeaders:

def __init__(self, path, chain='lbrycrd_main'):
Expand All @@ -24,39 +71,39 @@ def __init__(self, path, chain='lbrycrd_main'):
self.on_changed = self._on_change_controller.stream

self._size = None
self._write_lock = defer.DeferredLock()

if not os.path.exists(path):
with open(path, 'wb'):
pass

@property
def height(self):
return len(self) - 1

def sync_read_length(self):
return os.path.getsize(self.path) / HEADER_SIZE

def __len__(self):
if self._size is None:
self._size = self.sync_read_length()
return self._size

def sync_read_header(self, height):
if 0 <= height < len(self):
with open(self.path, 'rb') as f:
f.seek(height * HEADER_SIZE)
return f.read(HEADER_SIZE)

def __len__(self):
if self._size is None:
self._size = self.sync_read_length()
return self._size

def __getitem__(self, height):
assert not isinstance(height, slice),\
"Slicing of header chain has not been implemented yet."
header = self.sync_read_header(height)
return self._deserialize(height, header)

@execute_serially
@defer.inlineCallbacks
def connect(self, start, headers):
yield self._write_lock.acquire()
try:
yield threads.deferToThread(self._sync_connect, start, headers)
finally:
self._write_lock.release()
yield threads.deferToThread(self._sync_connect, start, headers)

def _sync_connect(self, start, headers):
previous_header = None
Expand Down
129 changes: 97 additions & 32 deletions lbrynet/wallet/manager.py
@@ -1,34 +1,30 @@
import os
import logging
from operator import itemgetter

from twisted.internet import defer

import lbryschema

from .protocol import Network
from .blockchain import BlockchainHeaders
from .blockchain import BlockchainHeaders, Transaction
from .wallet import Wallet
from .stream import execute_serially

log = logging.getLogger(__name__)


def chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i+n]


class WalletManager:

def __init__(self, storage, config):
self.storage = storage
self.config = config
lbryschema.BLOCKCHAIN_NAME = config['chain']
self.headers = BlockchainHeaders(self.headers_path, config['chain'])
self.wallet = Wallet(self.wallet_path)
self.wallet = Wallet(self.wallet_path, self.headers)
self.network = Network(config)
self.network.on_header.listen(self.process_header)
self.network.on_transaction.listen(self.process_transaction)
self._downloading_headers = False
self.network.on_status.listen(self.process_status)

@property
def headers_path(self):
Expand All @@ -41,48 +37,117 @@ def headers_path(self):
def wallet_path(self):
return os.path.join(self.config['wallet_path'], 'wallets', 'default_wallet')

def get_least_used_receiving_address(self, max_transactions=1000):
return self._get_least_used_address(
self.wallet.receiving_addresses,
self.wallet.default_account.receiving,
max_transactions
)

def get_least_used_change_address(self, max_transactions=100):
return self._get_least_used_address(
self.wallet.change_addresses,
self.wallet.default_account.change,
max_transactions
)

def _get_least_used_address(self, addresses, sequence, max_transactions):
transaction_counts = []
for address in addresses:
transactions = self.wallet.history.get_transactions(address, [])
tx_count = len(transactions)
if tx_count == 0:
return address
elif tx_count >= max_transactions:
continue
else:
transaction_counts.append((address, tx_count))

if transaction_counts:
transaction_counts.sort(key=itemgetter(1))
return transaction_counts[0]

address = sequence.generate_next_address()
self.subscribe_history(address)
return address

@defer.inlineCallbacks
def start(self):
self.wallet.load()
self.network.start()
yield self.network.on_connected.first
yield self.download_headers()
yield self.network.headers_subscribe()
yield self.download_transactions()
yield self.update_headers()
yield self.network.subscribe_headers()
yield self.update_wallet()

def stop(self):
return self.network.stop()

@execute_serially
@defer.inlineCallbacks
def download_headers(self):
self._downloading_headers = True
def update_headers(self):
while True:
sought_height = len(self.headers)
headers = yield self.network.block_headers(sought_height)
log.info("received {} headers starting at {} height".format(headers['count'], sought_height))
height_sought = len(self.headers)
headers = yield self.network.get_headers(height_sought)
log.info("received {} headers starting at {} height".format(headers['count'], height_sought))
if headers['count'] <= 0:
break
yield self.headers.connect(sought_height, headers['hex'].decode('hex'))
self._downloading_headers = False
yield self.headers.connect(height_sought, headers['hex'].decode('hex'))

@defer.inlineCallbacks
def process_header(self, header):
if self._downloading_headers:
def process_header(self, response):
header = response[0]
if self.update_headers.is_running:
return
if header['block_height'] == len(self.headers):
if header['height'] == len(self.headers):
# New header from network directly connects after the last local header.
yield self.headers.connect(len(self.headers), header['hex'].decode('hex'))
elif header['block_height'] > len(self.headers):
elif header['height'] > len(self.headers):
# New header is several heights ahead of local, do download instead.
yield self.download_headers()
yield self.update_headers()

@execute_serially
@defer.inlineCallbacks
def download_transactions(self):
for addresses in chunks(self.wallet.addresses, 500):
self.network.rpc([
('blockchain.address.subscribe', [address])
for address in addresses
def update_wallet(self):

if not self.wallet.exists:
self.wallet.create()

# Before subscribing, download history for any addresses that don't have any,
# this avoids situation where we're getting status updates to addresses we know
# need to update anyways. Continue to get history and create more addresses until
# all missing addresses are created and history for them is fully restored.
self.wallet.ensure_enough_addresses()
addresses = list(self.wallet.addresses_without_history)
while addresses:
yield defer.gatherResults([
self.update_history(a) for a in addresses
])
addresses = self.wallet.ensure_enough_addresses()

# By this point all of the addresses should be restored and we
# can now subscribe all of them to receive updates.
yield defer.gatherResults([
self.subscribe_history(address)
for address in self.wallet.addresses
])

def process_transaction(self, tx):
pass
@defer.inlineCallbacks
def update_history(self, address):
history = yield self.network.get_history(address)
for hash in map(itemgetter('tx_hash'), history):
transaction = self.wallet.history.get_transaction(hash)
if not transaction:
raw = yield self.network.get_transaction(hash)
transaction = Transaction(hash, raw, None)
self.wallet.history.add_transaction(address, transaction)

@defer.inlineCallbacks
def subscribe_history(self, address):
status = yield self.network.subscribe_address(address)
if status != self.wallet.history.get_status(address):
self.update_history(address)

def process_status(self, response):
address, status = response
if status != self.wallet.history.get_status(address):
self.update_history(address)

0 comments on commit f698141

Please sign in to comment.