Skip to content

Commit

Permalink
move to using DeferredPool
Browse files Browse the repository at this point in the history
  • Loading branch information
jobevers committed Jan 5, 2017
1 parent 2d0d0a9 commit dee2331
Showing 1 changed file with 33 additions and 42 deletions.
75 changes: 33 additions & 42 deletions scripts/query_available_blobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import logging
import os
import random
import shutil
import sys
import tempfile

import appdirs
from twisted.internet import defer
Expand All @@ -30,6 +32,7 @@
from lbrynet.core import StreamDescriptor as sd

import pool
import track


log = logging.getLogger()
Expand All @@ -45,30 +48,36 @@ def main(args=None):

log_support.configure_console()

db_dir = appdirs.user_data_dir('LBRY')
#lbrycrd = appdirs.user_data_dir('lbrycrd')
# make a fresh dir or else we will include blobs that we've
# already downloaded but might not otherwise be available.
db_dir = tempfile.mkdtemp()
blob_dir = os.path.join(db_dir, 'blobfiles')
os.makedirs(blob_dir)
storage = Wallet.InMemoryStorage()

wallet = Wallet.LBRYumWallet(storage)#, wallet_conf=os.path.join(lbrycrd, 'lbrycrd.conf'))
session = Session.Session(
0,
db_dir=db_dir,
lbryid=utils.generate_id(),
blob_dir=os.path.join(db_dir, 'blobfiles'),
blob_dir=blob_dir,
dht_node_port=4444,
known_dht_nodes=conf.settings.known_dht_nodes,
peer_port=3333,
use_upnp=False,
wallet=wallet
)
d = session.setup()
d.addErrback(logAndStop)
d.addCallback(lambda _: Tracker.load(session))
d.addErrback(logAndStop)
d.addCallback(processTracker, args.limit, args.download)
d.addErrback(logAndStop)
d.addCallback(lambda _: reactor.stop())
reactor.run()
try:
d = session.setup()
d.addErrback(logAndStop)
d.addCallback(lambda _: Tracker.load(session))
d.addErrback(logAndStop)
d.addCallback(processTracker, args.limit, args.download)
d.addErrback(logAndStop)
d.addCallback(lambda _: reactor.stop())
reactor.run()
finally:
shutil.rmtree(db_dir)


def processTracker(tracker, limit, download):
Expand All @@ -87,16 +96,6 @@ def logAndRaise(err):
return err


def timeout(n):
def wrapper(fn):
def wrapped(*args, **kwargs):
d = fn(*args, **kwargs)
reactor.callLater(n, d.cancel)
return d
return wrapped
return wrapper


class Tracker(object):
def __init__(self, session, blob_tracker, wallet):
self.session = session
Expand Down Expand Up @@ -125,16 +124,14 @@ def processNameClaims(self, limit=None, download=False):
if download:
d.addCallback(lambda _: self._downloadAllBlobs())
d.addCallback(lambda _: self._filterNames('sd_blob'))
d.addCallback(lambda _: print(self.stats))
return d

def _setNames(self, names):
self.names = [Name(n) for n in names]

def _getSdHashes(self):
return defer.DeferredList(
[n.setSdHash(self.wallet) for n in self.names],
fireOnOneErrback=True
)
return pool.DeferredPool((n.setSdHash(self.wallet) for n in self.names), 10)

def _filterNames(self, attr):
self.names = [n for n in self.names if getattr(n, attr)]
Expand All @@ -148,15 +145,15 @@ def attempts_counter(self):
return collections.Counter([n.availability_attempts for n in self.names])

def _checkAvailability(self):
return defer.DeferredList(
[n.check_availability(self.blob_tracker) for n in self.names],
fireOnOneErrback=True
return pool.DeferredPool(
(n.check_availability(self.blob_tracker) for n in self.names),
10
)

def _downloadAllBlobs(self):
return defer.DeferredList(
[n.download_sd_blob(self.session) for n in self.names],
fireOnOneErrback=True
return pool.DeferredPool(
(n.download_sd_blob(self.session) for n in self.names),
10
)


Expand Down Expand Up @@ -190,36 +187,30 @@ def _check_availability(self, blob_tracker):
d.addCallback(lambda b: self._setAvailable(b[self.sd_hash]))
return d

@defer.inlineCallbacks
def check_availability(self, blob_tracker):
if not self.is_available and self.availability_attempts < self.MAX_ATTEMPTS:
while not self.is_available and self.availability_attempts < self.MAX_ATTEMPTS:
self.availability_attempts += 1
log.info('Attempt %s to find %s', self.availability_attempts, self.name)
return self._check_availability(blob_tracker)
else:
return defer.succeed(True)
yield self._check_availability(blob_tracker)

def _setAvailable(self, peer_count):
self.is_available = peer_count > 0

def download_sd_blob(self, session):
print('Trying to get sd_blob for {} using {}'.format(self.name, self.sd_hash))
d = download_sd_blob_with_timeout(session, self.sd_hash, session.payment_rate_manager)
d = track.download_sd_blob_with_timeout(session, self.sd_hash, session.payment_rate_manager)
d.addCallback(sd.BlobStreamDescriptorReader)
d.addCallback(self._setSdBlob)
# swallow errors from the timeout
d.addErrback(lambda err: err.trap(defer.CancelledError))
d.addErrback(lambda err: err.trap(defer.TimeoutError))
return d

def _setSdBlob(self, blob):
print('{} has a blob'.format(self.name))
self.sd_blob = blob


@timeout(60)
def download_sd_blob_with_timeout(session, sd_hash, payment_rate_manager):
return sd.download_sd_blob(session, sd_hash, payment_rate_manager)


def getNameClaims(trie):
for x in trie:
if 'txid' in x:
Expand Down

0 comments on commit dee2331

Please sign in to comment.