Permalink
Browse files

implemented stratum reconnect and add_peers

storing server entries as objects
  • Loading branch information...
1 parent 2fb93af commit c890423910766733efea829861391d3c3165526e @m0mchil committed Oct 30, 2012
Showing with 104 additions and 97 deletions.
  1. +10 −10 GetworkSource.py
  2. +4 −3 Source.py
  3. +25 −18 StratumSource.py
  4. +65 −66 Switch.py
View
@@ -20,15 +20,15 @@ class NotAuthorized(Exception): pass
class RPCError(Exception): pass
class GetworkSource(Source):
- def __init__(self, switch, server):
- super(GetworkSource, self).__init__(switch, server)
-
+ def __init__(self, switch):
+ super(GetworkSource, self).__init__(switch)
+
self.connection = self.lp_connection = None
self.long_poll_timeout = 3600
self.max_redirects = 3
self.postdata = {'method': 'getwork', 'id': 'json'}
- self.headers = {"User-Agent": self.switch.user_agent, "Authorization": 'Basic ' + b64encode('%s:%s' % (self.user, self.pwd)), "X-Mining-Extensions": 'hostlist midstate rollntime'}
+ self.headers = {"User-Agent": self.switch.user_agent, "Authorization": 'Basic ' + b64encode('%s:%s' % (self.server().user, self.server().pwd)), "X-Mining-Extensions": 'hostlist midstate rollntime'}
self.long_poll_url = ''
self.long_poll_active = False
@@ -107,7 +107,7 @@ def request(self, connection, url, headers, data=None, timeout=0):
if not response:
return None
if response.status == httplib.UNAUTHORIZED:
- say_line('Wrong username or password for %s', self.switch.server_name())
+ say_line('Wrong username or password for %s', self.server().name)
self.authorization_failed = True
raise NotAuthorized()
r = self.max_redirects
@@ -151,7 +151,7 @@ def timeout_response(self, connection, timeout):
def getwork(self, data=None):
try:
- self.connection = self.ensure_connected(self.connection, self.proto, self.host)[0]
+ self.connection = self.ensure_connected(self.connection, self.server().proto, self.server().host)[0]
self.postdata['params'] = if_else(data, [data], [])
(self.connection, result) = self.request(self.connection, '/', self.headers, dumps(self.postdata))
@@ -178,8 +178,8 @@ def long_poll_thread(self):
url = self.long_poll_url
if url != '':
- proto = self.proto
- host = self.host
+ proto = self.server().proto
+ host = self.server().host
parsedUrl = urlsplit(url)
if parsedUrl.scheme != '':
proto = parsedUrl.scheme
@@ -191,7 +191,7 @@ def long_poll_thread(self):
if host != last_host: self.close_lp_connection()
self.lp_connection, changed = self.ensure_connected(self.lp_connection, proto, host)
if changed:
- say_line("LP connected to %s", self.switch.server_name())
+ say_line("LP connected to %s", self.server().name)
last_host = host
self.long_poll_active = True
@@ -252,4 +252,4 @@ def detect_stratum(self):
return False
say_line('no response to getwork, using as stratum')
- return self.host
+ return self.server().host
View
@@ -3,13 +3,14 @@
class Source(object):
- def __init__(self, switch, server):
+ def __init__(self, switch):
self.switch = switch
- self.server = server
- self.proto, self.user, self.pwd, self.host, self.name = server[:5]
self.result_queue = Queue()
self.options = switch.options
+ def server(self):
+ return self.switch.server()
+
def loop(self):
self.should_stop = False
self.last_failback = time()
View
@@ -4,7 +4,7 @@
from json import dumps, loads
from log import say_exception, say_line
from struct import pack
-from threading import Thread, Lock
+from threading import Thread, Lock, Timer
from time import sleep, time
from util import chunks, Object
import asynchat
@@ -49,8 +49,8 @@ def detect_stratum_proxy(host):
class StratumSource(Source):
- def __init__(self, switch, server):
- super(StratumSource, self).__init__(switch, server)
+ def __init__(self, switch):
+ super(StratumSource, self).__init__(switch)
self.handler = None
self.socket = None
self.channel_map = {}
@@ -86,8 +86,7 @@ def loop(self):
if not self.handler:
try:
#socket = ssl.wrap_socket(socket)
- host = self.server[3]
- address, port = host.split(':', 1)
+ address, port = self.server().host.split(':', 1)
if not self.options.proxy:
@@ -208,16 +207,21 @@ def handle_message(self, message):
elif message['method'] == 'mining.set_difficulty':
say_line("Setting new difficulty: %s", message['params'][0])
self.server_difficulty = BASE_DIFFICULTY / message['params'][0]
-
+
#client.reconnect
elif message['method'] == 'client.reconnect':
- (hostname, port) = message['params'][:2]
- server = self.switch[self.server_index]
- say_line(server[4] + " asked us to reconnect to %s:%d", (hostname, port))
- server[3] = hostname + ':' + str(port)
- self.server = server
- self.switch[self.server_index] = server
- self.handler.close()
+ address, port = self.server().host.split(':', 1)
+ (new_address, new_port, timeout) = message['params'][:3]
+ if new_address: address = new_address
+ if new_port != None: port = new_port
+ say_line("%s asked us to reconnect to %s:%d in %d seconds", (self.server().name, address, port, timeout))
+ self.server().host = address + ':' + str(port)
+ Timer(timeout, self.reconnect).start()
+
+ #client.add_peers
+ elif message['method'] == 'client.add_peers':
+ hosts = [{'host': host[0], 'port': host[1]} for host in message['params'][0]]
+ self.switch.add_servers(hosts)
#responses to server API requests
elif 'result' in message:
@@ -244,13 +248,17 @@ def handle_message(self, message):
self.last_submits_cleanup = now
#response to mining.authorize
- elif message['id'] == self.server[1]:
+ elif message['id'] == self.server().user:
if not message['result']:
- say_line('authorization failed with %s:%s@%s', (self.server[1:4]))
+ say_line('authorization failed with %s:%s@%s', (self.server().user, self.server().pwd, self.server().host))
self.authorized = False
else:
self.authorized = True
+ def reconnect(self):
+ say_line("%s reconnecting to %s", (self.server().name, self.server().host))
+ self.handler.close()
+
def subscribe(self):
self.send_message({'id': 's', 'method': 'mining.subscribe', 'params': []})
for i in xrange(10):
@@ -259,7 +267,7 @@ def subscribe(self):
return self.subscribed
def authorize(self):
- self.send_message({'id': self.user, 'method': 'mining.authorize', 'params': [self.user, self.pwd]})
+ self.send_message({'id': self.server().user, 'method': 'mining.authorize', 'params': [self.server().user, self.server().pwd]})
for i in xrange(10):
sleep(1)
if self.authorized != None: break
@@ -269,13 +277,12 @@ def send_internal(self, result, nonce):
job_id = result.job_id
if not job_id in self.jobs:
return True
- user = self.server[1]
extranonce2 = result.extranonce2
ntime = pack('I', long(result.time)).encode('hex')
hex_nonce = pack('I', long(nonce)).encode('hex')
id_ = job_id + hex_nonce
self.submits[id_] = (result.miner, nonce, time())
- return self.send_message({'params': [user, job_id, extranonce2, ntime, hex_nonce], 'id': id_, 'method': u'mining.submit'})
+ return self.send_message({'params': [self.server().user, job_id, extranonce2, ntime, hex_nonce], 'id': id_, 'method': u'mining.submit'})
def send_message(self, message):
data = dumps(message) + '\n'
Oops, something went wrong.

0 comments on commit c890423

Please sign in to comment.