Skip to content

Commit

Permalink
add table for tracking hash announcements
Browse files Browse the repository at this point in the history
  • Loading branch information
jobevers committed Jan 23, 2017
1 parent 7696f30 commit 9b84b70
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
17 changes: 15 additions & 2 deletions lbrynet/core/BlobManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def add_blob_to_upload_history(self, blob_hash, host, rate):

def _immediate_announce(self, blob_hashes):
if self.hash_announcer:
return self.hash_announcer.immediate_announce(blob_hashes)
return self.hash_announcer.immediate_announce(blob_hashes, self)


# TODO: Having different managers for different blobs breaks the
Expand Down Expand Up @@ -140,6 +140,13 @@ def hashes_to_announce(self):
next_announce_time = time.time() + self.hash_reannounce_time
return self._get_blobs_to_announce(next_announce_time)

@rerun_if_locked
def on_hash_announced(self, blob_hash):
self.db_conn.runQuery(
"insert into announcement (blob_hash, announce_time) values (?, ?)",
(blob_hash, time.time())
)

def creator_finished(self, blob_creator):
log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
assert blob_creator.blob_hash is not None
Expand Down Expand Up @@ -258,6 +265,12 @@ def create_tables(transaction):
" rate float, " +
" ts integer)")

# Table to track when blobs where announced to the dht
# - intended mostly for debugging
transaction.execute("create table if not exists announcement (" +
" blob_hash text, " +
" announce_time real)")

return self.db_conn.runInteraction(create_tables)

@rerun_if_locked
Expand Down Expand Up @@ -427,7 +440,7 @@ def delete_blobs(self, blob_hashes):

def immediate_announce_all_blobs(self):
if self.hash_announcer:
return self.hash_announcer.immediate_announce(self.blobs.iterkeys())
return self.hash_announcer.immediate_announce(self.blobs.iterkeys(), self)

def _manage(self):
from twisted.internet import reactor
Expand Down
9 changes: 5 additions & 4 deletions lbrynet/core/server/DHTHashAnnouncer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ def stop(self):
def add_supplier(self, supplier):
self.suppliers.append(supplier)

def immediate_announce(self, blob_hashes):
def immediate_announce(self, blob_hashes, supplier):
if self.peer_port is not None:
return self._announce_hashes(blob_hashes)
return self._announce_hashes(blob_hashes, supplier)
else:
return defer.succeed(False)

Expand All @@ -47,12 +47,12 @@ def _announce_available_hashes(self):
ds = []
for supplier in self.suppliers:
d = supplier.hashes_to_announce()
d.addCallback(self._announce_hashes)
d.addCallback(self._announce_hashes, supplier)
ds.append(d)
dl = defer.DeferredList(ds)
return dl

def _announce_hashes(self, hashes):
def _announce_hashes(self, hashes, supplier):
if not hashes:
return
log.debug('Announcing %s hashes', len(hashes))
Expand All @@ -71,6 +71,7 @@ def announce():
h, announce_deferred = self.hash_queue.popleft()
log.debug('Announcing blob %s to dht', h)
d = self.dht_node.announceHaveBlob(binascii.unhexlify(h), self.peer_port)
d.addCallback(lambda _: supplier.on_hash_announced(h))
d.chainDeferred(announce_deferred)
d.addBoth(lambda _: reactor.callLater(0, announce))
else:
Expand Down

0 comments on commit 9b84b70

Please sign in to comment.