Skip to content

Commit

Permalink
Merge pull request #3716
Browse files Browse the repository at this point in the history
  • Loading branch information
luigi1111 committed Jun 27, 2018
2 parents 31f47d7 + 0e4c7d0 commit a844844
Show file tree
Hide file tree
Showing 32 changed files with 757 additions and 490 deletions.
8 changes: 4 additions & 4 deletions src/blockchain_utilities/blockchain_import.cpp
Expand Up @@ -164,7 +164,7 @@ int pop_blocks(cryptonote::core& core, int num_blocks)
return num_blocks; return num_blocks;
} }


int check_flush(cryptonote::core &core, std::list<block_complete_entry> &blocks, bool force) int check_flush(cryptonote::core &core, std::vector<block_complete_entry> &blocks, bool force)
{ {
if (blocks.empty()) if (blocks.empty())
return 0; return 0;
Expand All @@ -176,7 +176,7 @@ int check_flush(cryptonote::core &core, std::list<block_complete_entry> &blocks,
if (!force && new_height % HASH_OF_HASHES_STEP) if (!force && new_height % HASH_OF_HASHES_STEP)
return 0; return 0;


std::list<crypto::hash> hashes; std::vector<crypto::hash> hashes;
for (const auto &b: blocks) for (const auto &b: blocks)
{ {
cryptonote::block block; cryptonote::block block;
Expand Down Expand Up @@ -312,7 +312,7 @@ int import_from_file(cryptonote::core& core, const std::string& import_file_path
MINFO("Reading blockchain from bootstrap file..."); MINFO("Reading blockchain from bootstrap file...");
std::cout << ENDL; std::cout << ENDL;


std::list<block_complete_entry> blocks; std::vector<block_complete_entry> blocks;


// Skip to start_height before we start adding. // Skip to start_height before we start adding.
{ {
Expand Down Expand Up @@ -437,7 +437,7 @@ int import_from_file(cryptonote::core& core, const std::string& import_file_path
{ {
cryptonote::blobdata block; cryptonote::blobdata block;
cryptonote::block_to_blob(bp.block, block); cryptonote::block_to_blob(bp.block, block);
std::list<cryptonote::blobdata> txs; std::vector<cryptonote::blobdata> txs;
for (const auto &tx: bp.txs) for (const auto &tx: bp.txs)
{ {
txs.push_back(cryptonote::blobdata()); txs.push_back(cryptonote::blobdata());
Expand Down
37 changes: 26 additions & 11 deletions src/common/threadpool.cpp
Expand Up @@ -36,16 +36,17 @@
#include "common/util.h" #include "common/util.h"


static __thread int depth = 0; static __thread int depth = 0;
static __thread bool is_leaf = false;


namespace tools namespace tools
{ {
threadpool::threadpool(unsigned int max_threads) : running(true), active(0) { threadpool::threadpool(unsigned int max_threads) : running(true), active(0) {
boost::thread::attributes attrs; boost::thread::attributes attrs;
attrs.set_stack_size(THREAD_STACK_SIZE); attrs.set_stack_size(THREAD_STACK_SIZE);
max = max_threads ? max_threads : tools::get_max_concurrency(); max = max_threads ? max_threads : tools::get_max_concurrency();
unsigned int i = max; size_t i = max ? max - 1 : 0;
while(i--) { while(i--) {
threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this))); threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this, false)));
} }
} }


Expand All @@ -60,20 +61,25 @@ threadpool::~threadpool() {
} }
} }


void threadpool::submit(waiter *obj, std::function<void()> f) { void threadpool::submit(waiter *obj, std::function<void()> f, bool leaf) {
entry e = {obj, f}; CHECK_AND_ASSERT_THROW_MES(!is_leaf, "A leaf routine is using a thread pool");
boost::unique_lock<boost::mutex> lock(mutex); boost::unique_lock<boost::mutex> lock(mutex);
if ((active == max && !queue.empty()) || depth > 0) { if (!leaf && ((active == max && !queue.empty()) || depth > 0)) {
// if all available threads are already running // if all available threads are already running
// and there's work waiting, just run in current thread // and there's work waiting, just run in current thread
lock.unlock(); lock.unlock();
++depth; ++depth;
is_leaf = leaf;
f(); f();
--depth; --depth;
is_leaf = false;
} else { } else {
if (obj) if (obj)
obj->inc(); obj->inc();
queue.push_back(e); if (leaf)
queue.push_front({obj, f, leaf});
else
queue.push_back({obj, f, leaf});
has_work.notify_one(); has_work.notify_one();
} }
} }
Expand All @@ -91,17 +97,20 @@ threadpool::waiter::~waiter()
} }
try try
{ {
wait(); wait(NULL);
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
/* ignored */ /* ignored */
} }
} }


void threadpool::waiter::wait() { void threadpool::waiter::wait(threadpool *tpool) {
if (tpool)
tpool->run(true);
boost::unique_lock<boost::mutex> lock(mt); boost::unique_lock<boost::mutex> lock(mt);
while(num) cv.wait(lock); while(num)
cv.wait(lock);
} }


void threadpool::waiter::inc() { void threadpool::waiter::inc() {
Expand All @@ -113,24 +122,30 @@ void threadpool::waiter::dec() {
const boost::unique_lock<boost::mutex> lock(mt); const boost::unique_lock<boost::mutex> lock(mt);
num--; num--;
if (!num) if (!num)
cv.notify_one(); cv.notify_all();
} }


void threadpool::run() { void threadpool::run(bool flush) {
boost::unique_lock<boost::mutex> lock(mutex); boost::unique_lock<boost::mutex> lock(mutex);
while (running) { while (running) {
entry e; entry e;
while(queue.empty() && running) while(queue.empty() && running)
{
if (flush)
return;
has_work.wait(lock); has_work.wait(lock);
}
if (!running) break; if (!running) break;


active++; active++;
e = queue.front(); e = queue.front();
queue.pop_front(); queue.pop_front();
lock.unlock(); lock.unlock();
++depth; ++depth;
is_leaf = e.leaf;
e.f(); e.f();
--depth; --depth;
is_leaf = false;


if (e.wo) if (e.wo)
e.wo->dec(); e.wo->dec();
Expand Down
7 changes: 4 additions & 3 deletions src/common/threadpool.h
Expand Up @@ -59,15 +59,15 @@ class threadpool
public: public:
void inc(); void inc();
void dec(); void dec();
void wait(); //! Wait for a set of tasks to finish. void wait(threadpool *tpool); //! Wait for a set of tasks to finish.
waiter() : num(0){} waiter() : num(0){}
~waiter(); ~waiter();
}; };


// Submit a task to the pool. The waiter pointer may be // Submit a task to the pool. The waiter pointer may be
// NULL if the caller doesn't care to wait for the // NULL if the caller doesn't care to wait for the
// task to finish. // task to finish.
void submit(waiter *waiter, std::function<void()> f); void submit(waiter *waiter, std::function<void()> f, bool leaf = false);


unsigned int get_max_concurrency() const; unsigned int get_max_concurrency() const;


Expand All @@ -78,6 +78,7 @@ class threadpool
typedef struct entry { typedef struct entry {
waiter *wo; waiter *wo;
std::function<void()> f; std::function<void()> f;
bool leaf;
} entry; } entry;
std::deque<entry> queue; std::deque<entry> queue;
boost::condition_variable has_work; boost::condition_variable has_work;
Expand All @@ -86,7 +87,7 @@ class threadpool
unsigned int active; unsigned int active;
unsigned int max; unsigned int max;
bool running; bool running;
void run(); void run(bool flush = false);
}; };


} }
2 changes: 1 addition & 1 deletion src/cryptonote_basic/connection_context.h
Expand Up @@ -52,7 +52,7 @@ namespace cryptonote
}; };


state m_state; state m_state;
std::list<crypto::hash> m_needed_objects; std::vector<crypto::hash> m_needed_objects;
std::unordered_set<crypto::hash> m_requested_objects; std::unordered_set<crypto::hash> m_requested_objects;
uint64_t m_remote_blockchain_height; uint64_t m_remote_blockchain_height;
uint64_t m_last_response_height; uint64_t m_last_response_height;
Expand Down

0 comments on commit a844844

Please sign in to comment.