Permalink
Browse files

initial for multi_miner

  • Loading branch information...
1 parent 557945e commit f1ce310af8e7bf508dc4983e45d477b9986355ff @m0mchil committed Sep 25, 2012
Showing with 129 additions and 106 deletions.
  1. +19 −36 BitcoinMiner.py
  2. +31 −31 HttpTransport.py
  3. +44 −19 Servers.py
  4. +19 −18 StratumTransport.py
  5. +16 −2 poclbm.py
View
@@ -7,7 +7,6 @@
from threading import Thread
from time import sleep, time
from util import *
-import log
import pyopencl as cl
ADL_PRESENT = False
@@ -25,56 +24,34 @@
class BitcoinMiner():
- def __init__(self, device, options, version, transport):
+ def __init__(self, device, options):
self.output_size = 0x100
self.options = options
- self.version = version
+
(self.defines, self.rate_divisor, self.hashspace) = if_else(self.options.vectors, ('-DVECTORS', 500, 0x7FFFFFFF), ('', 1000, 0xFFFFFFFF))
self.defines += (' -DOUTPUT_SIZE=' + str(self.output_size))
self.defines += (' -DOUTPUT_MASK=' + str(self.output_size - 1))
self.device = device
- self.options.rate = if_else(self.options.verbose, max(self.options.rate, 60), max(self.options.rate, 0.1))
- self.options.askrate = max(self.options.askrate, 1)
- self.options.askrate = min(self.options.askrate, 10)
self.options.frames = max(self.options.frames, 3)
- self.update_time = False
+ self.update_time_counter = 1
self.share_count = [0, 0]
self.work_queue = Queue()
- self.transport = transport(self)
- log.verbose = self.options.verbose
- log.quiet = self.options.quiet
+
+ self.update = True
if ADL_PRESENT:
self.adapterIndex = self.get_adapter_info()[self.options.device].iAdapterIndex
def start(self):
self.should_stop = False
Thread(target=self.mining_thread).start()
- self.transport.loop()
def stop(self, message = None):
if message: print '\n%s' % message
- self.transport.stop()
self.should_stop = True
- def say_status(self, rate, estimated_rate):
- rate = Decimal(rate) / 1000
- estimated_rate = Decimal(estimated_rate) / 1000
- total_shares = self.share_count[1] + self.share_count[0]
- total_shares_estimator = max(total_shares, total_shares, 1)
- say_quiet('[%.03f MH/s (~%d MH/s)] [Rej: %d/%d (%.02f%%)]', (rate, round(estimated_rate), self.share_count[0], total_shares, float(self.share_count[0]) * 100 / total_shares_estimator))
-
- def diff1_found(self, hash, target):
- if self.options.verbose and target < 0xFFFF0000L:
- say_line('checking %s <= %s', (hash, target))
-
- def share_found(self, hash, accepted, is_block):
- self.share_count[if_else(accepted, 1, 0)] += 1
- if self.options.verbose or is_block:
- say_line('%s%s, %s', (if_else(is_block, 'block ', ''), hash, if_else(accepted, 'accepted', '_rejected_')))
-
def mining_thread(self):
self.load_kernel()
frame = 1.0 / self.options.frames
@@ -141,8 +118,8 @@ def mining_thread(self):
t = now - last_rated
if t > self.options.rate:
- rate = int((threads_run / t) / self.rate_divisor)
-
+ self.rate = int((threads_run / t) / self.rate_divisor)
+ self.rate = Decimal(self.rate) / 1000
if accept_hist:
LAH = accept_hist.pop()
if LAH[1] != self.share_count[1]:
@@ -151,9 +128,10 @@ def mining_thread(self):
while (accept_hist[0][0] < now - self.options.estimate):
accept_hist.pop(0)
new_accept = self.share_count[1] - accept_hist[0][1]
- estimated_rate = Decimal(new_accept) * (work.targetQ) / min(int(now - start_time), self.options.estimate) / 1000
+ self.estimated_rate = Decimal(new_accept) * (work.targetQ) / min(int(now - start_time), self.options.estimate) / 1000
+ self.estimated_rate = Decimal(self.estimated_rate) / 1000
- self.say_status(rate, estimated_rate)
+ self.servers.status_updated()
last_rated = now; threads_run = 0
queue.finish()
@@ -170,13 +148,14 @@ def mining_thread(self):
result.job_id = work.job_id
result.extranonce2 = work.extranonce2
result.server = work.server
- self.transport.put(result)
+ result.miner = self
+ self.servers.put(result)
output.fill(0)
cl.enqueue_write_buffer(queue, output_buffer, output)
- if not self.update_time:
- if nonces_left < (self.transport.timeout + 1) * global_threads * self.options.frames:
- self.transport.update = True
+ if not self.servers.update_time:
+ if nonces_left < 3 * global_threads * self.options.frames:
+ self.update = True
nonces_left += 0xFFFFFFFFFFFF
elif 0xFFFFFFFFFFF < nonces_left < 0xFFFFFFFFFFFF:
say_line('warning: job finished, miner is idle')
@@ -186,6 +165,10 @@ def mining_thread(self):
state2 = partial(state, work.merkle_end, work.time, work.difficulty, f)
calculateF(state, work.merkle_end, work.time, work.difficulty, f, state2)
last_n_time = now
+ self.update_time_counter += 1
+ if self.update_time_counter >= self.servers.max_update_time:
+ self.update = True
+ self.update_time_counter = 1
def load_kernel(self):
self.context = cl.Context([self.device], None, None)
View
@@ -23,7 +23,6 @@ def __init__(self, servers, server):
self.connection = self.lp_connection = None
self.long_poll_timeout = 3600
- self.long_poll_max_askrate = 60 - self.servers.timeout
self.max_redirects = 3
self.postdata = {'method': 'getwork', 'id': 'json'}
@@ -47,11 +46,11 @@ def loop(self):
try:
with self.servers.lock:
- update = self.servers.update = (self.servers.update or (time() - self.servers.last_work) > if_else(self.long_poll_active, self.long_poll_max_askrate, self.config.askrate))
- if update:
- work = self.getwork()
- if self.servers.update:
- self.queue_work(work)
+ miner = self.servers.updatable_miner()
+ while miner:
+ work = self.getwork()
+ self.queue_work(work, miner)
+ miner = self.servers.updatable_miner()
while not self.result_queue.empty():
result = self.result_queue.get(False)
@@ -63,15 +62,15 @@ def loop(self):
traceback.print_exc()
break
- def ensure_connected(self, connection, proto, host, timeout):
+ def ensure_connected(self, connection, proto, host):
if connection != None and connection.sock != None:
return connection, False
if proto == 'https': connector = httplib.HTTPSConnection
else: connector = httplib.HTTPConnection
if not self.config.proxy:
- return connector(host, strict=True, timeout=timeout), True
+ return connector(host, strict=True), True
host, port = host.split(':')
@@ -83,7 +82,6 @@ def ensure_connected(self, connection, proto, host, timeout):
connection = connector(host, strict=True)
connection.sock = socks.socksocket()
- #connection.sock.settimeout(timeout)
proxy_type = socks.PROXY_TYPE_SOCKS5
if proxy_proto == 'http':
@@ -104,18 +102,7 @@ def request(self, connection, url, headers, data=None, timeout=0):
try:
if data: connection.request('POST', url, data, headers)
else: connection.request('GET', url, headers=headers)
- if timeout:
- start = time()
- connection.sock.settimeout(min(timeout, 5))
- response = None
- while not response:
- if self.should_stop or time() - start > timeout: return
- try:
- response = connection.getresponse()
- except socket.timeout:
- pass
- else:
- response = connection.getresponse()
+ response = self.timeout_response(connection, timeout)
if response.status == httplib.UNAUTHORIZED:
say_line('Wrong username or password for %s', self.servers.server_name())
raise NotAuthorized()
@@ -125,10 +112,10 @@ def request(self, connection, url, headers, data=None, timeout=0):
url = response.getheader('Location', '')
if r == 0 or url == '': raise HTTPException('Too much or bad redirects')
connection.request('GET', url, headers=headers)
- response = connection.getresponse();
+ response = self.timeout_response(connection, timeout)
r -= 1
self.long_poll_url = response.getheader('X-Long-Polling', '')
- self.servers.miner.update_time = bool(response.getheader('X-Roll-NTime', ''))
+ self.servers.update_time = bool(response.getheader('X-Roll-NTime', ''))
hostList = response.getheader('X-Host-List', '')
self.stratum_header = response.getheader('x-stratum', '')
if (not self.config.nsf) and hostList: self.servers.add_servers(loads(hostList))
@@ -142,9 +129,24 @@ def request(self, connection, url, headers, data=None, timeout=0):
connection.close()
connection = None
+ def timeout_response(self, connection, timeout):
+ if timeout:
+ start = time()
+ connection.sock.settimeout(5)
+ response = None
+ while not response:
+ if self.should_stop or time() - start > timeout: return
+ try:
+ response = connection.getresponse()
+ except socket.timeout:
+ pass
+ return response
+ else:
+ return connection.getresponse()
+
def getwork(self, data=None):
- try:
- self.connection = self.ensure_connected(self.connection, self.proto, self.host, self.servers.timeout)[0]
+ try:
+ self.connection = self.ensure_connected(self.connection, self.proto, self.host)[0]
self.postdata['params'] = if_else(data, [data], [])
(self.connection, result) = self.request(self.connection, '/', self.headers, dumps(self.postdata))
@@ -158,7 +160,7 @@ def send_internal(self, result, nonce):
data = ''.join([result.header.encode('hex'), pack('III', long(result.time), long(result.difficulty), long(nonce)).encode('hex'), '000000800000000000000000000000000000000000000000000000000000000000000000000000000000000080020000'])
accepted = self.getwork(data)
if accepted != None:
- self.servers.report(nonce, accepted)
+ self.servers.report(result.miner, nonce, accepted)
def long_poll_thread(self):
last_host = None
@@ -179,7 +181,7 @@ def long_poll_thread(self):
if url == '': url = '/'
try:
if host != last_host: self.close_lp_connection()
- self.lp_connection, changed = self.ensure_connected(self.lp_connection, proto, host, self.long_poll_timeout)
+ self.lp_connection, changed = self.ensure_connected(self.lp_connection, proto, host)
if changed:
say_line("LP connected to %s", self.servers.server_name())
last_host = host
@@ -215,14 +217,12 @@ def close_lp_connection(self):
self.lp_connection.close()
self.lp_connection = None
- def queue_work(self, work):
+ def queue_work(self, work, miner=None):
if work:
if not 'target' in work:
work['target'] = '0000000000000000000000000000000000000000000000000000ffff00000000'
- self.servers.queue_work(self, work['data'], work['target'])
- else:
- self.servers.queue_work(self, work)
+ self.servers.queue_work(self, work['data'], work['target'], miner)
def detect_stratum(self):
work = self.getwork()
View
@@ -7,13 +7,13 @@
import log
class Servers(object):
- def __init__(self, miner):
+ def __init__(self, config):
self.lock = RLock()
- self.miner = miner
- self.config = miner.options
- self.update = True
+ self.miners = []
+ self.config = config
self.last_work = 0
- self.timeout = 5
+ self.update_time = True
+ self.max_update_time = config.max_update_time
self.backup_server_index = 1
self.errors = 0
@@ -23,7 +23,7 @@ def __init__(self, miner):
self.save_server = None
self.server_map = {}
- self.user_agent = 'poclbm/' + miner.version
+ self.user_agent = 'poclbm/' + config.version
self.difficulty = 0
self.true_target = None
@@ -71,6 +71,16 @@ def parse_server(self, server, mailAsUser=True):
return (proto, user, pwd, host, name)
+ def add_miner(self, miner):
+ self.miners.append(miner)
+ miner.servers = self
+
+ def updatable_miner(self):
+ for miner in self.miners:
+ if miner.update:
+ miner.update = False
+ return miner
+
def loop(self):
self.should_stop = False
if not self.servers:
@@ -148,6 +158,9 @@ def decode(self, server, block_header, target, job_id = None, extranonce2 = None
job.server = server
calculateF(job.state, job.merkle_end, job.time, job.difficulty, job.f, job.state2)
+
+ if job.difficulty != self.difficulty:
+ self.set_difficulty(job.difficulty)
return job
@@ -158,30 +171,39 @@ def set_difficulty(self, difficulty):
true_target = ''.join(list(chunks(true_target, 2))[::-1])
self.true_target = np.array(unpack('IIIIIIII', true_target.decode('hex')), dtype=np.uint32)
- def process(self, work):
- if work:
- if work.difficulty != self.difficulty:
- self.set_difficulty(work.difficulty)
-
def send(self, result, send_callback):
- for i in xrange(self.miner.output_size):
+ for i in xrange(result.miner.output_size):
if result.nonce[i]:
h = hash(result.state, result.merkle_end, result.time, result.difficulty, result.nonce[i])
if h[7] != 0:
say_line('Verification failed, check hardware!')
- #self.miner.stop()
else:
- self.miner.diff1_found(bytereverse(h[6]), result.target[6])
+ self.diff1_found(bytereverse(h[6]), result.target[6])
if belowOrEquals(h[:7], result.target[:7]):
is_block = belowOrEquals(h[:7], self.true_target[:7])
hash6 = pack('I', long(h[6])).encode('hex')
hash5 = pack('I', long(h[5])).encode('hex')
self.sent[result.nonce[i]] = (is_block, hash6, hash5)
return send_callback(result, result.nonce[i])
- def report(self, nonce, accepted):
+ def diff1_found(self, hash, target):
+ if self.config.verbose and target < 0xFFFF0000L:
+ say_line('checking %s <= %s', (hash, target))
+
+ def status_updated(self):
+ rate = sum([m.rate for m in self.miners])
+ estimated_rate = sum([m.estimated_rate for m in self.miners])
+ rejected_shares = sum([m.share_count[0] for m in self.miners])
+ total_shares = rejected_shares + sum([m.share_count[1] for m in self.miners])
+ total_shares_estimator = max(total_shares, 1)
+ say_quiet('[%.03f MH/s (~%d MH/s)] [Rej: %d/%d (%.02f%%)]', (rate, round(estimated_rate), rejected_shares, total_shares, float(rejected_shares) * 100 / total_shares_estimator))
+
+ def report(self, miner, nonce, accepted):
is_block, hash6, hash5 = self.sent[nonce]
- self.miner.share_found(if_else(is_block, hash6 + hash5, hash6), accepted, is_block)
+ miner.share_count[if_else(accepted, 1, 0)] += 1
+ hash = if_else(is_block, hash6 + hash5, hash6)
+ if self.config.verbose or is_block:
+ say_line('%s%s, %s', (if_else(is_block, 'block ', ''), hash, if_else(accepted, 'accepted', '_rejected_')))
del self.sent[nonce]
def set_server_by_index(self, server_index):
@@ -202,11 +224,14 @@ def add_servers(self, hosts):
server = (server[0], server[1], server[2], ''.join([host['host'], ':', str(host['port'])]), server[4])
self.servers.insert(self.backup_server_index, server)
- def queue_work(self, server, block_header, target = None, job_id = None, extranonce2 = None):
+ def queue_work(self, server, block_header, target = None, job_id = None, extranonce2 = None, miner=None):
work = self.decode(server, block_header, target, job_id, extranonce2)
- self.process(work)
with self.lock:
- self.miner.work_queue.put(work)
+ if not miner:
+ miner = self.miners[0]
+ for i in xrange(1, len(self.miners)):
+ self.miners[i].update = True
+ miner.work_queue.put(work)
if work:
self.update = False; self.last_work = time()
if self.last_block != work.header[25:29]:
Oops, something went wrong.

0 comments on commit f1ce310

Please sign in to comment.