Skip to content
This repository has been archived by the owner on Jul 12, 2021. It is now read-only.

Commit

Permalink
C++ core with working memory pool
Browse files Browse the repository at this point in the history
  • Loading branch information
genjix committed Apr 22, 2012
1 parent f3bcc7b commit ce2fafb
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 49 deletions.
4 changes: 4 additions & 0 deletions backends/libbitcoin/Makefile
@@ -1,5 +1,9 @@
CC = g++ -fPIC -Wall -ansi `pkg-config --cflags libbitcoin` -I/usr/include/python2.7

membuf:
$(CC) -c membuf.cpp -o membuf.o
$(CC) -shared -Wl,-soname,membuf.so membuf.o -lpython2.7 -lboost_python `pkg-config --libs libbitcoin` -lboost_thread -o membuf.so

default:
$(CC) -c history.cpp -o history.o
$(CC) -shared -Wl,-soname,_history.so history.o -lpython2.7 -lboost_python `pkg-config --libs libbitcoin` -lboost_thread -o history1/_history.so
Expand Down
91 changes: 56 additions & 35 deletions backends/libbitcoin/__init__.py
Expand Up @@ -4,7 +4,8 @@
import threading
import time

import history
import history1 as history
import membuf

class HistoryCache:

Expand All @@ -31,9 +32,10 @@ def clear(self, addresses):

class MonitorAddress:

def __init__(self, processor, cache):
def __init__(self, processor, cache, backend):
self.processor = processor
self.cache = cache
self.backend = backend
self.lock = threading.Lock()
# key is hash:index, value is address
self.monitor_output = {}
Expand All @@ -42,6 +44,8 @@ def __init__(self, processor, cache):
# affected
self.affected = {}

backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed)

def monitor(self, address, result):
for info in result:
if not info.has_key("raw_output_script"):
Expand All @@ -55,11 +59,24 @@ def monitor(self, address, result):
with self.lock:
self.monitor_address.add(address)

def tx_stored(self, tx_desc):
tx_hash, prevouts, addrs = tx_desc
def unpack(self, tx):
tx_hash = bitcoin.hash_transaction(tx)
previous_outputs = []
for input in tx.inputs:
prevout = input.previous_output
prevout = "%s:%s" % (prevout.hash, prevout.index)
previous_outputs.append(prevout)
addrs = []
for output_index, output in enumerate(tx.outputs):
address = bitcoin.payment_address()
if address.extract(output.output_script):
addrs.append((output_index, str(address)))
return tx_hash, previous_outputs, addrs

def tx_stored(self, tx):
affected_addrs = set()
for prevout_hash, prevout_index in prevouts:
prevout = "%s:%s" % (prevout_hash, prevout_index)
tx_hash, previous_outputs, addrs = self.unpack(tx)
for prevout in previous_outputs:
with self.lock:
if self.monitor_output.has_key(prevout):
affected_addrs.add(self.monitor_output[prevout])
Expand All @@ -73,7 +90,7 @@ def tx_stored(self, tx_desc):
self.notify(affected_addrs)

def tx_confirmed(self, tx_desc):
tx_hash, prevouts, addrs = tx_desc
tx_hash, previous_outputs, addrs = self.unpack(tx)
with self.lock:
affected_addrs = self.affected[tx_hash]
del self.affected[tx_hash]
Expand All @@ -86,8 +103,7 @@ def tx_confirmed(self, tx_desc):
if address in affected_addrs:
self.monitor_output[outpoint] = address
# delete spent outpoints
for prevout_hash, prevout_index in prevouts:
prevout = "%s:%s" % (prevout_hash, prevout_index)
for prevout in previous_outputs:
with self.lock:
if self.monitor_output.has_key(prevout):
del self.monitor_output[prevout]
Expand All @@ -96,14 +112,15 @@ def notify(self, affected_addrs):
templ_response = {"id": None,
"method": "blockchain.address.subscribe",
"params": []}
service = self.backend.mempool_service
chain = self.backend.blockchain
txpool = self.backend.transaction_pool
membuf = self.backend.pool_buffer
memory_buff = self.backend.memory_buffer
for address in affected_addrs:
response = templ_response.copy()
response["params"].append(address)
history.payment_history(chain, txpool, membuf, address,
bind(self.send_notify, _1, response))
history.payment_history(service, chain, txpool, memory_buff,
address, bind(self.send_notify, _1, _2, response))

def mempool_n(self, result):
assert result is not None
Expand All @@ -118,13 +135,16 @@ def mempool_n(self, result):
last_id = last_info["block_hash"]
return last_id

def send_notify(self, result, response):
def send_notify(self, ec, result, response):
if ec:
print "Error: Monitor.send_notify()", ec
return
response["params"].append(self.mempool_n(result))
self.processor.push_response(response)

class Backend:

def __init__(self, monitor):
def __init__(self):
# Create 3 thread-pools each with 1 thread
self.network_service = bitcoin.async_service(1)
self.disk_service = bitcoin.async_service(1)
Expand All @@ -150,9 +170,10 @@ def __init__(self, monitor):
self.poller, self.transaction_pool)
self.session.start(self.handle_start)

self.pool_buffer = \
history.MemoryPoolBuffer(self.transaction_pool,
self.blockchain, monitor)
self.memory_buffer = \
membuf.memory_buffer(self.mempool_service.internal_ptr,
self.blockchain.internal_ptr,
self.transaction_pool.internal_ptr)

def handle_start(self, ec):
if ec:
Expand Down Expand Up @@ -183,7 +204,7 @@ def recv_tx(self, ec, tx, node):
print "Error with new transaction:", ec
return
tx_hash = bitcoin.hash_transaction(tx)
self.pool_buffer.recv_tx(tx, bind(self.store_tx, _1, tx_hash))
self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash))
# Re-subscribe to new transactions from node
node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))

Expand Down Expand Up @@ -247,16 +268,17 @@ def __init__(self, backend, processor):

def get(self, request):
address = str(request["params"][0])
service = self.backend.mempool_service
chain = self.backend.blockchain
txpool = self.backend.transaction_pool
membuf = self.backend.pool_buffer
history.payment_history(chain, txpool, membuf, address,
bind(self.respond, _1, request))
memory_buff = self.backend.memory_buffer
history.payment_history(service, chain, txpool, memory_buff,
address, bind(self.respond, _1, _2, request))

def respond(self, result, request):
if result is None:
def respond(self, ec, result, request):
if ec:
response = {"id": request["id"], "result": None,
"error": {"message": "Error", "code": -4}}
"error": {"message": str(ec), "code": -4}}
else:
response = {"id": request["id"], "result": result, "error": None}
self.processor.push_response(response)
Expand All @@ -269,20 +291,20 @@ def __init__(self, backend, processor, cache, monitor):
self.cache = cache
self.monitor = monitor

self.backend.pool_buffer.cheat = self

def subscribe(self, request):
address = str(request["params"][0])
service = self.backend.mempool_service
chain = self.backend.blockchain
txpool = self.backend.transaction_pool
membuf = self.backend.pool_buffer
history.payment_history(chain, txpool, membuf, address,
bind(self.construct, _1, request))
memory_buff = self.backend.memory_buffer
history.payment_history(service, chain, txpool, memory_buff,
address, bind(self.construct, _1, _2, request))

def construct(self, result, request):
if result is None:
def construct(self, ec, result, request):
if ec:
response = {"id": request["id"], "result": None,
"error": {"message": "Error", "code": -4}}
"error": {"message": str(ec), "code": -4}}
self.processor.push_response(response)
return
last_id = self.monitor.mempool_n(result)
response = {"id": request["id"], "result": last_id, "error": None}
Expand All @@ -306,9 +328,8 @@ class BlockchainProcessor(Processor):
def __init__(self, config):
Processor.__init__(self)
cache = HistoryCache()
monitor = MonitorAddress(self, cache)
self.backend = Backend(monitor)
monitor.backend = self.backend
self.backend = Backend()
monitor = MonitorAddress(self, cache, self.backend)
self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
self.address_get_history = AddressGetHistory(self.backend, self)
self.address_subscribe = \
Expand Down
8 changes: 6 additions & 2 deletions backends/libbitcoin/h1.py
Expand Up @@ -6,13 +6,17 @@ def blockchain_started(ec, chain):

def finish(ec, result):
print "Finish:", ec
print result
for line in result:
for k, v in line.iteritems():
begin = k + ":"
print begin, " " * (12 - len(begin)), v
print

a = bitcoin.async_service(1)
chain = bitcoin.bdb_blockchain(a, "/home/genjix/libbitcoin/database",
blockchain_started)
txpool = bitcoin.transaction_pool(a, chain)
address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D"
address = "1FpES68UNcxnXeoaFciqvUSGiKGZ33gbfQ"
history.payment_history(a, chain, txpool, address, finish)
raw_input()

23 changes: 15 additions & 8 deletions backends/libbitcoin/history.cpp
Expand Up @@ -7,7 +7,7 @@ using namespace libbitcoin;
#include <boost/python.hpp>
namespace python = boost::python;

#include "/home/genjix/python-bitcoin/src/primitive.h"
#include "memory_buffer.hpp"

namespace ph = std::placeholders;

Expand Down Expand Up @@ -68,8 +68,8 @@ class query_history
: public std::enable_shared_from_this<query_history>
{
public:
query_history(async_service& service,
blockchain_ptr chain, transaction_pool_ptr txpool)
query_history(async_service& service, blockchain_ptr chain,
transaction_pool_ptr txpool, memory_buffer_ptr membuf)
: strand_(service.get_service()), chain_(chain),
txpool_(txpool), stopped_(false)
{
Expand Down Expand Up @@ -158,7 +158,7 @@ class query_history
BITCOIN_ASSERT(entry->loaded_input && entry->loaded_output);
// value of the input is simply the inverse of
// the corresponding output
entry->loaded_input->value = -entry->loaded_output->value;
entry->loaded_input->value = entry->loaded_output->value;
// Unspent outputs have a raw_output_script field
// Blank this field as it isn't used
entry->loaded_output->raw_output_script.clear();
Expand Down Expand Up @@ -351,7 +351,8 @@ void write_info(std::string& json, info_unit_ptr info)
<< "\"tx_hash\": \"" << pretty_hex(info->tx_hash) << "\","
<< "\"block_hash\": \"" << pretty_hex(info->block_hash) << "\","
<< "\"index\": " << info->index << ","
<< "\"value\": " << info->value << ","
// x for received, and -x for sent amounts
<< "\"value\": " << (info->is_input ? "-" : "") << info->value << ","
<< "\"height\": " << info->height << ","
<< "\"timestamp\": " << info->timestamp << ","
<< "\"is_input\": " << info->is_input << ",";
Expand All @@ -378,18 +379,24 @@ void keep_query_alive_proxy(const std::error_code& ec,
auto entry = *it;
BITCOIN_ASSERT(entry->loaded_output);
write_info(json, entry->loaded_output);
if (entry->input_exists)
{
BITCOIN_ASSERT(entry->loaded_input);
json += ",";
write_info(json, entry->loaded_input);
}
}
json += "]";
pyfunction<const std::error_code&, const std::string&> f(handle_finish);
f(ec, json);
}

void payment_history(async_service_ptr service, blockchain_ptr chain,
transaction_pool_ptr txpool, const std::string& address,
python::object handle_finish)
transaction_pool_ptr txpool, memory_buffer_ptr membuf,
const std::string& address, python::object handle_finish)
{
query_history_ptr history =
std::make_shared<query_history>(*service, chain, txpool);
std::make_shared<query_history>(*service, chain, txpool, membuf);
history->start(address,
std::bind(keep_query_alive_proxy, ph::_1, ph::_2,
handle_finish, history));
Expand Down
6 changes: 3 additions & 3 deletions backends/libbitcoin/history1/__init__.py
Expand Up @@ -5,8 +5,8 @@
def wrap_finish(handle_finish, ec, result_json):
handle_finish(ec, json.loads(result_json))

def payment_history(service, chain, txpool, address, finish):
def payment_history(service, chain, txpool, membuf, address, finish):
_history.payment_history(service.internal_ptr, chain.internal_ptr,
txpool.internal_ptr, address,
bind(wrap_finish, finish, _1, _2))
txpool.internal_ptr, membuf.internal_ptr,
address, bind(wrap_finish, finish, _1, _2))

40 changes: 40 additions & 0 deletions backends/libbitcoin/membuf.cpp
@@ -0,0 +1,40 @@
#include <boost/python.hpp>
namespace python = boost::python;

#include "memory_buffer.hpp"

struct memory_buffer_wrapper
{
memory_buffer_wrapper(async_service_ptr service, blockchain_ptr chain,
transaction_pool_ptr txpool)
{
membuf = std::make_shared<memory_buffer>(service, chain, txpool);
}

void set_handles(python::object handle_tx_stored,
python::object handle_tx_confirmed)
{
membuf->set_handles(handle_tx_stored, handle_tx_confirmed);
}

void receive(const message::transaction& tx,
python::object handle_receive)
{
membuf->receive(tx, handle_receive);
}

memory_buffer_ptr membuf;
};

BOOST_PYTHON_MODULE(membuf)
{
using namespace boost::python;
class_<memory_buffer_wrapper>("memory_buffer", init<
async_service_ptr, blockchain_ptr, transaction_pool_ptr>())
.def("receive", &memory_buffer_wrapper::receive)
.def("set_handles", &memory_buffer_wrapper::set_handles)
.def_readonly("internal_ptr", &memory_buffer_wrapper::membuf)
;
class_<memory_buffer_ptr>("internal_memory_buffer", no_init);
}

0 comments on commit ce2fafb

Please sign in to comment.