Skip to content

Commit

Permalink
clean implementation of daemon threads
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasV committed Mar 13, 2015
1 parent 58f9ab3 commit 72688a5
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 111 deletions.
41 changes: 8 additions & 33 deletions lib/blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,74 +19,53 @@

import threading, time, Queue, os, sys, shutil
from util import user_dir, print_error
import util
from bitcoin import *


class Blockchain(threading.Thread):
class Blockchain(util.DaemonThread):

def __init__(self, config, network):
threading.Thread.__init__(self)
self.daemon = True
util.DaemonThread.__init__(self)
self.config = config
self.network = network
self.lock = threading.Lock()
self.local_height = 0
self.running = False
self.headers_url = 'http://headers.electrum.org/blockchain_headers'
self.set_local_height()
self.queue = Queue.Queue()


def height(self):
return self.local_height


def stop(self):
with self.lock: self.running = False


def is_running(self):
with self.lock: return self.running


def run(self):
self.init_headers_file()
self.set_local_height()
print_error( "blocks:", self.local_height )

with self.lock:
self.running = True

while self.is_running():

try:
result = self.queue.get()
result = self.queue.get(timeout=0.1)
except Queue.Empty:
continue

if not result: continue

if not result:
continue
i, header = result
if not header: continue

if not header:
continue
height = header.get('block_height')

if height <= self.local_height:
continue

if height > self.local_height + 50:
if not self.get_and_verify_chunks(i, header, height):
continue

if height > self.local_height:
# get missing parts from interface (until it connects to my chain)
chain = self.get_chain( i, header )

# skip that server if the result is not consistent
if not chain:
print_error('e')
continue

# verify the chain
if self.verify_chain( chain ):
print_error("height:", height, i.server)
Expand All @@ -96,13 +75,9 @@ def run(self):
print_error("error", i.server)
# todo: dismiss that server
continue


self.network.new_blockchain_height(height, i)




def verify_chain(self, chain):

first_header = chain[0]
Expand Down
27 changes: 6 additions & 21 deletions lib/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,17 @@ def get_daemon(config, start_daemon=True):



class ClientThread(threading.Thread):
class ClientThread(util.DaemonThread):

def __init__(self, server, s):
threading.Thread.__init__(self)
util.DaemonThread.__init__(self)
self.server = server
self.daemon = True
self.client_pipe = util.SocketPipe(s)
self.response_queue = Queue.Queue()
self.server.add_client(self)

def reading_thread(self):
while self.running:
while self.is_running():
try:
request = self.client_pipe.get()
except util.timeout:
Expand All @@ -91,9 +90,8 @@ def reading_thread(self):
self.server.send_request(self, request)

def run(self):
self.running = True
threading.Thread(target=self.reading_thread).start()
while self.running:
while self.is_running():
try:
response = self.response_queue.get(timeout=0.1)
except Queue.Empty:
Expand All @@ -109,11 +107,10 @@ def run(self):



class NetworkServer(threading.Thread):
class NetworkServer(util.DaemonThread):

def __init__(self, config):
threading.Thread.__init__(self)
self.daemon = True
util.DaemonThread.__init__(self)
self.debug = False
self.config = config
self.network = Network(config)
Expand All @@ -128,18 +125,6 @@ def __init__(self, config):
self.request_id = 0
self.requests = {}

def is_running(self):
with self.lock:
return self.running

def stop(self):
with self.lock:
self.running = False

def start(self):
self.running = True
threading.Thread.start(self)

def add_client(self, client):
for key in ['status','banner','updated','servers','interfaces']:
value = self.network.get_status_value(key)
Expand Down
18 changes: 3 additions & 15 deletions lib/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,18 @@ def serialize_server(host, port, protocol):
return str(':'.join([host, port, protocol]))


class Network(threading.Thread):
class Network(util.DaemonThread):

def __init__(self, config=None):
if config is None:
config = {} # Do not use mutables as default values!
threading.Thread.__init__(self)
self.daemon = True
util.DaemonThread.__init__(self)
self.config = SimpleConfig(config) if type(config) == type({}) else config
self.lock = threading.Lock()
self.num_server = 8 if not self.config.get('oneserver') else 0
self.blockchain = Blockchain(self.config, self)
self.interfaces = {}
self.queue = Queue.Queue()
self.running = False
# Server for addresses and transactions
self.default_server = self.config.get('server')
# Sanitize default server
Expand Down Expand Up @@ -270,10 +268,9 @@ def start(self, response_queue):
self.response_queue = response_queue
self.start_interfaces()
t = threading.Thread(target=self.process_requests_thread)
t.daemon = True
t.start()
self.blockchain.start()
threading.Thread.start(self)
util.DaemonThread.start(self)

def set_proxy(self, proxy):
self.proxy = proxy
Expand Down Expand Up @@ -540,15 +537,6 @@ def on_address(self, i, r):
self.addresses[addr] = result
self.response_queue.put(r)

def stop(self):
self.print_error("stopping network")
with self.lock:
self.running = False

def is_running(self):
with self.lock:
return self.running

def get_header(self, tx_height):
return self.blockchain.read_header(tx_height)

Expand Down
11 changes: 2 additions & 9 deletions lib/network_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@



class NetworkProxy(threading.Thread):
class NetworkProxy(util.DaemonThread):

def __init__(self, socket, config=None):

if config is None:
config = {} # Do not use mutables as default arguments!
threading.Thread.__init__(self)
util.DaemonThread.__init__(self)
self.config = SimpleConfig(config) if type(config) == type({}) else config
self.message_id = 0
self.unanswered_requests = {}
Expand All @@ -48,8 +48,6 @@ def __init__(self, socket, config=None):
self.lock = threading.Lock()
self.pending_transactions_for_notifications = []
self.callbacks = {}
self.running = True
self.daemon = True

if socket:
self.pipe = util.SocketPipe(socket)
Expand All @@ -70,8 +68,6 @@ def __init__(self, socket, config=None):
self.server_height = 0
self.interfaces = []

def is_running(self):
return self.running

def run(self):
while self.is_running():
Expand Down Expand Up @@ -213,9 +209,6 @@ def get_parameters(self):
def set_parameters(self, *args):
return self.synchronous_get([('network.set_parameters', args)])[0]

def stop(self):
self.running = False

def stop_daemon(self):
return self.send([('daemon.stop',[])], None)

Expand Down
17 changes: 3 additions & 14 deletions lib/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,22 @@
import Queue

import bitcoin
import util
from util import print_error
from transaction import Transaction


class WalletSynchronizer(threading.Thread):
class WalletSynchronizer(util.DaemonThread):

def __init__(self, wallet, network):
threading.Thread.__init__(self)
self.daemon = True
util.DaemonThread.__init__(self)
self.wallet = wallet
self.network = network
self.was_updated = True
self.running = False
self.lock = threading.Lock()
self.queue = Queue.Queue()
self.address_queue = Queue.Queue()

def stop(self):
with self.lock:
self.running = False

def is_running(self):
with self.lock:
return self.running

def add(self, address):
self.address_queue.put(address)

Expand All @@ -57,8 +48,6 @@ def subscribe_to_addresses(self, addresses):
self.network.send(messages, self.queue.put)

def run(self):
with self.lock:
self.running = True
while self.is_running():
while not self.network.is_connected():
time.sleep(0.1)
Expand Down
25 changes: 25 additions & 0 deletions lib/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime
import urlparse
import urllib
import threading

class NotEnoughFunds(Exception): pass

Expand All @@ -20,6 +21,30 @@ def default(self, obj):
return super(MyEncoder, self).default(obj)


class DaemonThread(threading.Thread):
""" daemon thread that terminates cleanly """

def __init__(self):
threading.Thread.__init__(self)
self.parent_thread = threading.currentThread()
self.running = False
self.running_lock = threading.Lock()

def start(self):
with self.running_lock:
self.running = True
return threading.Thread.start(self)

def is_running(self):
with self.running_lock:
return self.running and self.parent_thread.is_alive()

def stop(self):
with self.running_lock:
self.running = False



is_verbose = False
def set_verbosity(b):
global is_verbose
Expand Down
Loading

0 comments on commit 72688a5

Please sign in to comment.