Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

675 reduce chain mem #679

Merged
merged 9 commits into from
Dec 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions libraries/chain/block_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@ namespace steemit { namespace chain {

inline void check_index_read()
{
try
{
if( index_write )
{
index_stream.close();
index_stream.open( index_file.generic_string().c_str(), LOG_READ );
index_write = false;
}
}
FC_LOG_AND_RETHROW()
}

inline void check_index_write()
Expand Down Expand Up @@ -132,21 +136,18 @@ namespace steemit { namespace chain {
if( block_pos < index_pos )
{
ilog( "block_pos < index_pos, close and reopen index_stream" );
my->index_stream.close();
fc::remove_all( my->index_file );
my->index_stream.open( my->index_file.generic_string().c_str(), LOG_WRITE );
my->index_write = true;
construct_index( 0 );
construct_index();
}
else if( block_pos > index_pos )
{
construct_index( index_pos );
ilog( "Index is incomplete" );
construct_index();
}
}
else
{
ilog( "Index is empty, rebuild it" );
construct_index( 0 );
ilog( "Index is empty" );
construct_index();
}
}
else if( index_size )
Expand Down Expand Up @@ -202,19 +203,23 @@ namespace steemit { namespace chain {

optional< signed_block > block_log::read_block_by_num( uint32_t block_num )const
{
try
{
optional< signed_block > b;
uint64_t pos = get_block_pos( block_num );
if( ~pos )
if( pos != npos )
b = read_block( pos ).first;
return b;
}
FC_LOG_AND_RETHROW()
}

uint64_t block_log::get_block_pos( uint32_t block_num ) const
{
my->check_index_read();

if( !( my->head && block_num <= protocol::block_header::num_from_id( my->head_id ) ) )
return ~0;
if( !( my->head.valid() && block_num <= protocol::block_header::num_from_id( my->head_id ) && block_num > 0 ) )
return npos;
my->index_stream.seekg( sizeof( uint64_t ) * ( block_num - 1 ) );
uint64_t pos;
my->index_stream.read( (char*)&pos, sizeof( pos ) );
Expand All @@ -236,23 +241,29 @@ namespace steemit { namespace chain {
return my->head;
}

void block_log::construct_index( uint64_t start_pos )
void block_log::construct_index()
{
ilog( "Reconstructing Block Log Index..." );
my->index_stream.close();
fc::remove_all( my->index_file );
my->index_stream.open( my->index_file.generic_string().c_str(), LOG_WRITE );
my->index_write = true;

uint64_t pos = 0;
uint64_t end_pos;
my->check_block_read();
my->check_index_write();

my->block_stream.seekg( -sizeof( uint64_t), std::ios::end );
my->block_stream.read( (char*)&end_pos, sizeof( end_pos ) );
signed_block tmp;

while( start_pos < end_pos )
my->block_stream.seekg( pos );

while( pos < end_pos )
{
my->block_stream.seekg( start_pos );
fc::raw::unpack( my->block_stream, tmp );
start_pos = uint64_t(my->block_stream.tellg()) + 8;
my->index_stream.write( (char*)&start_pos, sizeof( start_pos ) );
my->block_stream.read( (char*)&pos, sizeof( pos ) );
my->index_stream.write( (char*)&pos, sizeof( pos ) );
}
}

} }
64 changes: 31 additions & 33 deletions libraries/chain/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,6 @@ void database::open( const fc::path& data_dir, const fc::path& shared_mem_dir, u

auto log_head = _block_log.head();

// block_log.head must be in block stats
// If it is not, print warning and exit
if( log_head && head_block_num() )
FC_ASSERT( get< block_stats_object >( log_head->block_num() - 1 ).block_id == log_head->id(),
"Head block of log file is not included in current chain state. log_head: ${log_head}", ("log_head", log_head) );

// Rewind all undo state. This should return us to the state at the last irreversible block.
with_write_lock( [&]()
{
Expand All @@ -129,7 +123,13 @@ void database::open( const fc::path& data_dir, const fc::path& shared_mem_dir, u
});

if( head_block_num() )
_fork_db.start_block( *fetch_block_by_number( head_block_num() ) );
{
auto head_block = _block_log.read_block_by_num( head_block_num() );
// This assertion should be caught and a reindex should occur
FC_ASSERT( head_block.valid() && head_block->id() == head_block_id(), "Chain state does not match block log. Please reindex blockchain." );

_fork_db.start_block( *head_block );
}
}

init_hardforks();
Expand Down Expand Up @@ -161,7 +161,8 @@ void database::reindex( const fc::path& data_dir, const fc::path& shared_mem_dir
skip_witness_schedule_check |
skip_authority_check |
skip_validate | /// no need to validate operations
skip_validate_invariants;
skip_validate_invariants |
skip_block_log;

with_write_lock( [&]()
{
Expand Down Expand Up @@ -224,7 +225,7 @@ void database::close(bool rewind)

bool database::is_known_block( const block_id_type& id )const
{
return _fork_db.is_known_block(id) || find< block_stats_object, by_block_id >( id );
return fetch_block_by_id( id ).valid();
}

/**
Expand All @@ -238,17 +239,28 @@ bool database::is_known_transaction( const transaction_id_type& id )const
return trx_idx.find( id ) != trx_idx.end();
}

block_id_type database::get_block_id_for_num( uint32_t block_num )const
block_id_type database::get_block_id_for_num( uint32_t block_num )const
{
return get< block_stats_object >( block_num - 1 ).block_id;
try
{
auto b = _block_log.read_block_by_num( block_num );
if( b.valid() )
return b->id();

auto results = _fork_db.fetch_block_by_number( block_num );
FC_ASSERT( results.size() == 1 );
return results[0]->data.id();

}
FC_CAPTURE_AND_RETHROW( (block_num) )
}

optional<signed_block> database::fetch_block_by_id( const block_id_type& id )const
{
auto b = _fork_db.fetch_block( id );
if( !b )
{
auto tmp = fetch_block_by_number( protocol::block_header::num_from_id( id ) );
auto tmp = _block_log.read_block_by_num( protocol::block_header::num_from_id( id ) );

if( tmp && tmp->id() == id )
return tmp;
Expand All @@ -264,14 +276,12 @@ optional<signed_block> database::fetch_block_by_number( uint32_t block_num )cons
{
optional< signed_block > b;

const auto* stats = find< block_stats_object >( block_num - 1 );

if( !stats )
return b;
auto results = _fork_db.fetch_block_by_number( block_num );
if( results.size() == 1 )
b = results[0]->data;
else
b = _block_log.read_block_by_num( block_num );

signed_block block;
fc::raw::unpack( stats->packed_block, block );
b = block;
return b;
}

Expand Down Expand Up @@ -2670,7 +2680,6 @@ void database::initialize_indexes()
add_core_index< escrow_index >(*this);
add_core_index< savings_withdraw_index >(*this);
add_core_index< decline_voting_rights_request_index >(*this);
add_core_index< block_stats_index >(*this);

_plugin_index_signal();
}
Expand Down Expand Up @@ -3038,18 +3047,6 @@ void database::_apply_block( const signed_block& next_block )
++_current_trx_in_block;
}

create< block_stats_object >( [&]( block_stats_object& bso )
{
assert( bso.block_num() == next_block_num ); // Probably can be taken out. Sanity check
bso.block_id = next_block_id;
fc::raw::pack( bso.packed_block, next_block );
/*
auto size = fc::raw::pack_size( next_block );
bso.packed_block.resize( size );
fc::datastream<char*> ds( bso.packed_block.data(), size );
fc::raw::pack( ds, next_block );*/
});

update_global_dynamic_data(next_block);
update_signing_witness(signing_witness, next_block);

Expand Down Expand Up @@ -3524,9 +3521,9 @@ void database::update_last_irreversible_block()

commit( dpo.last_irreversible_block_num );

// output to block log based on new last irreverisible block num
if( !( get_node_properties().skip_flags & skip_block_log ) )
{
// output to block log based on new last irreverisible block num
const auto& tmp_head = _block_log.head();
uint64_t log_head_num = 0;

Expand All @@ -3540,6 +3537,7 @@ void database::update_last_irreversible_block()
_block_log.append( *fetch_block_by_number( log_head_num + 1 ) );
log_head_num++;
}

_block_log.flush();
}
}
Expand Down
4 changes: 4 additions & 0 deletions libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ item_ptr fork_database::fetch_block(const block_id_type& id)const

vector<item_ptr> fork_database::fetch_block_by_number(uint32_t num)const
{
try
{
vector<item_ptr> result;
auto itr = _index.get<block_num>().find(num);
while( itr != _index.get<block_num>().end() )
Expand All @@ -171,6 +173,8 @@ vector<item_ptr> fork_database::fetch_block_by_number(uint32_t num)const
++itr;
}
return result;
}
FC_LOG_AND_RETHROW()
}

pair<fork_database::branch_type,fork_database::branch_type>
Expand Down
8 changes: 7 additions & 1 deletion libraries/chain/include/steemit/chain/block_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,18 @@ namespace steemit { namespace chain {
void flush();
std::pair< signed_block, uint64_t > read_block( uint64_t file_pos )const;
optional< signed_block > read_block_by_num( uint32_t block_num )const;

/**
* Return offset of block in file, or block_log::npos if it does not exist.
*/
uint64_t get_block_pos( uint32_t block_num ) const;
signed_block read_head()const;
const optional< signed_block >& head()const;

static const uint64_t npos = std::numeric_limits<uint64_t>::max();

private:
void construct_index( uint64_t start_pos );
void construct_index();

std::unique_ptr<detail::block_log_impl> my;
};
Expand Down
34 changes: 0 additions & 34 deletions libraries/chain/include/steemit/chain/block_summary_object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,41 +38,7 @@ namespace steemit { namespace chain {
allocator< block_summary_object >
> block_summary_index;

class block_stats_object : public object< block_stats_object_type, block_stats_object >
{
public:
template< typename Constructor, typename Allocator >
block_stats_object( Constructor&& c, allocator< Allocator > a )
:packed_block( a )
{
c( *this );
}

id_type id;
block_id_type block_id;
uint64_t pos = -1;
bip::vector< char, allocator< char > > packed_block;

uint64_t block_num()const { return id._id + 1; }
};

struct by_block_id;

typedef multi_index_container<
block_stats_object,
indexed_by<
ordered_unique< tag< by_id >,
member< block_stats_object, block_stats_id_type, &block_stats_object::id > >,
ordered_unique< tag< by_block_id >,
member< block_stats_object, block_id_type, &block_stats_object::block_id > >
>,
allocator< block_stats_object >
> block_stats_index;

} } // steemit::chain

FC_REFLECT( steemit::chain::block_summary_object, (id)(block_id) )
CHAINBASE_SET_INDEX_TYPE( steemit::chain::block_summary_object, steemit::chain::block_summary_index )

FC_REFLECT( steemit::chain::block_stats_object, (id)(block_id)(pos)(packed_block) )
CHAINBASE_SET_INDEX_TYPE( steemit::chain::block_stats_object, steemit::chain::block_stats_index )
2 changes: 1 addition & 1 deletion libraries/chain/steem_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1490,7 +1490,7 @@ void pow2_evaluator::do_apply( const pow2_operation& o )
{
const auto& work = o.work.get< equihash_pow >();
FC_ASSERT( work.prev_block == db.head_block_id(), "Equihash pow op not for last block" );
auto recent_block_num = db.get< block_stats_object, by_block_id >( work.input.prev_block ).block_num();
auto recent_block_num = protocol::block_header::num_from_id( work.input.prev_block );
FC_ASSERT( recent_block_num > dgp.last_irreversible_block_num,
"Equihash pow done for block older than last irreversible block num" );
FC_ASSERT( work.pow_summary < target_pow, "Insufficient work difficulty. Work: ${w}, Target: ${t}", ("w",work.pow_summary)("t", target_pow) );
Expand Down
18 changes: 15 additions & 3 deletions libraries/plugins/debug_node/debug_node_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,25 @@ uint32_t debug_node_api_impl::debug_push_blocks( const std::string& src_filename
for( uint32_t i=0; i<count; i++ )
{
//fc::optional< steemit::chain::signed_block > block = log.read_block( log.get_block_pos( first_block + i ) );
auto result = log.read_block( log.get_block_pos( first_block + i ) );

if( result.second == ~0 )
uint64_t block_pos = log.get_block_pos( first_block + i );
if( block_pos == steemit::chain::block_log::npos )
{
wlog( "Block database ${fn} only contained ${i} of ${n} requested blocks", ("i", i)("n", count)("fn", src_filename) );
return i;
}

decltype( log.read_block(0) ) result;

try
{
result = log.read_block( block_pos );
}
catch( const fc::exception& e )
{
elog( "Could not read block ${i} of ${n}", ("i", i)("n", count) );
continue;
}

try
{
db->push_block( result.first, skip_flags );
Expand Down
4 changes: 3 additions & 1 deletion tests/tests/block_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ BOOST_AUTO_TEST_CASE( generate_empty_blocks )
uint32_t cutoff_height = db.get_dynamic_global_properties().last_irreversible_block_num;
if( cutoff_height >= 200 )
{
cutoff_block = *(db.fetch_block_by_number( cutoff_height ));
auto block = db.fetch_block_by_number( cutoff_height );
BOOST_REQUIRE( block.valid() );
cutoff_block = *block;
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/operation_time_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2558,7 +2558,7 @@ BOOST_AUTO_TEST_CASE( sbd_stability )
auto& gpo = db.get_dynamic_global_properties();
BOOST_REQUIRE( gpo.sbd_print_rate >= last_print_rate );
last_print_rate = gpo.sbd_print_rate;
db_plugin->debug_generate_blocks( debug_key, 1, ~0 );
db_plugin->debug_generate_blocks( debug_key, 1, database::skip_witness_signature );
validate_database();
}

Expand Down