Skip to content

Commit

Permalink
Add internal locking to block_log #1698
Browse files Browse the repository at this point in the history
  • Loading branch information
mvandeberg committed Oct 24, 2017
1 parent ce0976e commit 2d62c7c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 9 deletions.
62 changes: 53 additions & 9 deletions libraries/chain/block_log.cpp
Expand Up @@ -2,11 +2,16 @@
#include <fstream>
#include <fc/io/raw.hpp>

#include <boost/thread/mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

#define LOG_READ (std::ios::in | std::ios::binary)
#define LOG_WRITE (std::ios::out | std::ios::binary | std::ios::app)

namespace steem { namespace chain {

typedef boost::interprocess::scoped_lock< boost::mutex > scoped_lock;

namespace detail {
class block_log_impl {
public:
Expand All @@ -19,6 +24,9 @@ namespace steem { namespace chain {
bool block_write;
bool index_write;

boost::mutex read_mtx;
boost::mutex write_mtx;

inline void check_block_read()
{
try
Expand Down Expand Up @@ -186,6 +194,9 @@ namespace steem { namespace chain {
{
try
{
scoped_lock w_lock( my->write_mtx );
scoped_lock r_lock( my->read_mtx );

my->check_block_write();
my->check_index_write();

Expand All @@ -207,11 +218,23 @@ namespace steem { namespace chain {

void block_log::flush()
{
scoped_lock w_lock( my->write_mtx );
scoped_lock r_lock( my->read_mtx );

my->block_stream.flush();
my->index_stream.flush();
}

std::pair< signed_block, uint64_t > block_log::read_block( uint64_t pos )const
{
scoped_lock w_lock( my->write_mtx );
scoped_lock r_lock( my->read_mtx );
w_lock.release();

return read_block_helper( pos );
}

std::pair< signed_block, uint64_t > block_log::read_block_helper( uint64_t pos )const
{
try
{
Expand All @@ -230,19 +253,32 @@ namespace steem { namespace chain {
{
try
{
optional< signed_block > b;
uint64_t pos = get_block_pos( block_num );
if( pos != npos )
{
b = read_block( pos ).first;
FC_ASSERT( b->block_num() == block_num , "Wrong block was read from block log.", ( "returned", b->block_num() )( "expected", block_num ));
}
return b;
scoped_lock w_lock( my->write_mtx );
scoped_lock r_lock( my->read_mtx );
w_lock.release();

optional< signed_block > b;
uint64_t pos = get_block_pos_helper( block_num );
if( pos != npos )
{
b = read_block_helper( pos ).first;
FC_ASSERT( b->block_num() == block_num , "Wrong block was read from block log.", ( "returned", b->block_num() )( "expected", block_num ));
}
return b;
}
FC_LOG_AND_RETHROW()
}

uint64_t block_log::get_block_pos( uint32_t block_num ) const
{
scoped_lock w_lock( my->write_mtx );
scoped_lock r_lock( my->read_mtx );
w_lock.release();

return get_block_pos_helper( block_num );
}

uint64_t block_log::get_block_pos_helper( uint32_t block_num ) const
{
try
{
Expand All @@ -262,18 +298,26 @@ namespace steem { namespace chain {
{
try
{
scoped_lock w_lock( my->write_mtx );
scoped_lock r_lock( my->read_mtx );
w_lock.release();

my->check_block_read();

uint64_t pos;
my->block_stream.seekg( -sizeof(pos), std::ios::end );
my->block_stream.read( (char*)&pos, sizeof(pos) );
return read_block( pos ).first;
return read_block_helper( pos ).first;
}
FC_LOG_AND_RETHROW()
}

const optional< signed_block >& block_log::head()const
{
scoped_lock w_lock( my->write_mtx );
scoped_lock r_lock( my->read_mtx );
w_lock.release();

return my->head;
}

Expand Down
3 changes: 3 additions & 0 deletions libraries/chain/include/steem/chain/block_log.hpp
Expand Up @@ -59,6 +59,9 @@ namespace steem { namespace chain {
private:
void construct_index();

std::pair< signed_block, uint64_t > read_block_helper( uint64_t file_pos )const;
uint64_t get_block_pos_helper( uint32_t block_num ) const;

std::unique_ptr<detail::block_log_impl> my;
};

Expand Down

0 comments on commit 2d62c7c

Please sign in to comment.