Skip to content

Commit

Permalink
Revert "broadcast shares in serial", strongly suspected of causing a …
Browse files Browse the repository at this point in the history
…memory leak

This reverts commit 6f1a456.

Conflicts:

	p2pool/main.py
	p2pool/p2p.py
	p2pool/util/p2protocol.py
  • Loading branch information
forrestv committed Jan 27, 2013
1 parent 0e325a4 commit 19bf4ea
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 28 deletions.
5 changes: 2 additions & 3 deletions p2pool/node.py
Expand Up @@ -76,7 +76,6 @@ def handle_bestblock(self, header, peer):
raise p2p.PeerMisbehavingError('received block header fails PoW test') raise p2p.PeerMisbehavingError('received block header fails PoW test')
self.node.handle_header(header) self.node.handle_header(header)


@defer.inlineCallbacks
def broadcast_share(self, share_hash): def broadcast_share(self, share_hash):
shares = [] shares = []
for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))): for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))):
Expand All @@ -85,8 +84,8 @@ def broadcast_share(self, share_hash):
self.shared_share_hashes.add(share.hash) self.shared_share_hashes.add(share.hash)
shares.append(share) shares.append(share)


for peer in list(self.peers.itervalues()): for peer in self.peers.itervalues():
yield peer.sendShares([share for share in shares if share.peer_addr != peer.addr], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash]) peer.sendShares([share for share in shares if share.peer_addr != peer.addr], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash])


def start(self): def start(self):
p2p.Node.start(self) p2p.Node.start(self)
Expand Down
15 changes: 4 additions & 11 deletions p2pool/p2p.py
Expand Up @@ -19,10 +19,10 @@ class PeerMisbehavingError(Exception):


def fragment(f, **kwargs): def fragment(f, **kwargs):
try: try:
return f(**kwargs) f(**kwargs)
except p2protocol.TooLong: except p2protocol.TooLong:
fragment(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems())) fragment(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems()))
return fragment(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems())) fragment(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems()))


class Protocol(p2protocol.Protocol): class Protocol(p2protocol.Protocol):
max_remembered_txs_size = 2500000 max_remembered_txs_size = 2500000
Expand All @@ -36,8 +36,6 @@ def __init__(self, node, incoming):
self.connected2 = False self.connected2 = False


def connectionMade(self): def connectionMade(self):
p2protocol.Protocol.connectionMade(self)

self.factory.proto_made_connection(self) self.factory.proto_made_connection(self)


self.connection_lost_event = variable.Event() self.connection_lost_event = variable.Event()
Expand Down Expand Up @@ -260,9 +258,6 @@ def handle_shares(self, shares):
self.node.handle_shares([p2pool_data.load_share(share, self.node.net, self.addr) for share in shares if share['type'] >= 9], self) self.node.handle_shares([p2pool_data.load_share(share, self.node.net, self.addr) for share in shares if share['type'] >= 9], self)


def sendShares(self, shares, tracker, known_txs, include_txs_with=[]): def sendShares(self, shares, tracker, known_txs, include_txs_with=[]):
if not shares:
return defer.succeed(None)

if self.other_version >= 8: if self.other_version >= 8:
tx_hashes = set() tx_hashes = set()
for share in shares: for share in shares:
Expand All @@ -280,14 +275,12 @@ def sendShares(self, shares, tracker, known_txs, include_txs_with=[]):


fragment(self.send_remember_tx, tx_hashes=[x for x in hashes_to_send if x in self.remote_tx_hashes], txs=[known_txs[x] for x in hashes_to_send if x not in self.remote_tx_hashes]) fragment(self.send_remember_tx, tx_hashes=[x for x in hashes_to_send if x in self.remote_tx_hashes], txs=[known_txs[x] for x in hashes_to_send if x not in self.remote_tx_hashes])


res = fragment(self.send_shares, shares=[share.as_share() for share in shares]) fragment(self.send_shares, shares=[share.as_share() for share in shares])


if self.other_version >= 8: if self.other_version >= 8:
res = self.send_forget_tx(tx_hashes=hashes_to_send) self.send_forget_tx(tx_hashes=hashes_to_send)


self.remote_remembered_txs_size -= sum(100 + bitcoin_data.tx_type.packed_size(known_txs[x]) for x in hashes_to_send) self.remote_remembered_txs_size -= sum(100 + bitcoin_data.tx_type.packed_size(known_txs[x]) for x in hashes_to_send)

return res




message_sharereq = pack.ComposedType([ message_sharereq = pack.ComposedType([
Expand Down
14 changes: 0 additions & 14 deletions p2pool/util/p2protocol.py
Expand Up @@ -19,21 +19,8 @@ def __init__(self, message_prefix, max_payload_length, traffic_happened=variable
self._message_prefix = message_prefix self._message_prefix = message_prefix
self._max_payload_length = max_payload_length self._max_payload_length = max_payload_length
self.dataReceived2 = datachunker.DataChunker(self.dataReceiver()) self.dataReceived2 = datachunker.DataChunker(self.dataReceiver())
self.paused_var = variable.Variable(False)
self.traffic_happened = traffic_happened self.traffic_happened = traffic_happened


def connectionMade(self):
self.transport.registerProducer(self, True)

def pauseProducing(self):
self.paused_var.set(True)

def resumeProducing(self):
self.paused_var.set(False)

def stopProducing(self):
pass

def dataReceived(self, data): def dataReceived(self, data):
self.traffic_happened.happened('p2p/in', len(data)) self.traffic_happened.happened('p2p/in', len(data))
self.dataReceived2(data) self.dataReceived2(data)
Expand Down Expand Up @@ -96,7 +83,6 @@ def sendPacket(self, command, payload2):
data = self._message_prefix + struct.pack('<12sI', command, len(payload)) + hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] + payload data = self._message_prefix + struct.pack('<12sI', command, len(payload)) + hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] + payload
self.traffic_happened.happened('p2p/out', len(data)) self.traffic_happened.happened('p2p/out', len(data))
self.transport.write(data) self.transport.write(data)
return self.paused_var.get_when_satisfies(lambda paused: not paused)


def __getattr__(self, attr): def __getattr__(self, attr):
prefix = 'send_' prefix = 'send_'
Expand Down

0 comments on commit 19bf4ea

Please sign in to comment.