Skip to content

Commit

Permalink
const fix for storage/rocksdb (first half) (percona#478) (percona#478)
Browse files Browse the repository at this point in the history
Summary:
First harf of the const fix for all of the files under storage/rocksdb folder.

This pull request includes the changes from the previous two requests that are canceled.
Closes facebook/mysql-5.6#478

Differential Revision: D4341822

Pulled By: gunnarku
  • Loading branch information
Sidi Fu authored and inikep committed Jan 28, 2022
1 parent d96b987 commit 7a7c378
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 99 deletions.
9 changes: 5 additions & 4 deletions storage/rocksdb/rdb_mutex_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Rdb_cond_var::~Rdb_cond_var() {
mysql_cond_destroy(&m_cond);
}

Status Rdb_cond_var::Wait(std::shared_ptr<TransactionDBMutex> mutex_arg) {
Status Rdb_cond_var::Wait(const std::shared_ptr<TransactionDBMutex> mutex_arg) {
return WaitFor(mutex_arg, BIG_TIMEOUT);
}

Expand All @@ -65,7 +65,7 @@ Status Rdb_cond_var::Wait(std::shared_ptr<TransactionDBMutex> mutex_arg) {
*/

Status
Rdb_cond_var::WaitFor(std::shared_ptr<TransactionDBMutex> mutex_arg,
Rdb_cond_var::WaitFor(const std::shared_ptr<TransactionDBMutex> mutex_arg,
int64_t timeout_micros)
{
auto *mutex_obj= reinterpret_cast<Rdb_mutex*>(mutex_arg.get());
Expand Down Expand Up @@ -200,7 +200,7 @@ Status Rdb_mutex::TryLockFor(int64_t timeout_time MY_ATTRIBUTE((__unused__)))


#ifndef STANDALONE_UNITTEST
void Rdb_mutex::set_unlock_action(PSI_stage_info *old_stage_arg)
void Rdb_mutex::set_unlock_action(const PSI_stage_info* const old_stage_arg)
{
DBUG_ASSERT(old_stage_arg != nullptr);

Expand All @@ -217,7 +217,8 @@ void Rdb_mutex::UnLock() {
#ifndef STANDALONE_UNITTEST
if (m_old_stage_info.count(current_thd) > 0)
{
std::shared_ptr<PSI_stage_info> old_stage = m_old_stage_info[current_thd];
const std::shared_ptr<PSI_stage_info> old_stage =
m_old_stage_info[current_thd];
m_old_stage_info.erase(current_thd);
/* The following will call mysql_mutex_unlock */
THD_EXIT_COND(current_thd, old_stage.get());
Expand Down
6 changes: 3 additions & 3 deletions storage/rocksdb/rdb_mutex_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Rdb_mutex: public rocksdb::TransactionDBMutex {
friend class Rdb_cond_var;

#ifndef STANDALONE_UNITTEST
void set_unlock_action(PSI_stage_info *old_stage_arg);
void set_unlock_action(const PSI_stage_info* const old_stage_arg);
std::unordered_map<THD*, std::shared_ptr<PSI_stage_info>> m_old_stage_info;
#endif
};
Expand All @@ -87,7 +87,7 @@ class Rdb_cond_var: public rocksdb::TransactionDBCondVar {
// Returns non-OK if TransactionDB should stop waiting and fail the operation.
// May return OK spuriously even if not notified.
virtual rocksdb::Status
Wait(std::shared_ptr<rocksdb::TransactionDBMutex> mutex) override;
Wait(const std::shared_ptr<rocksdb::TransactionDBMutex> mutex) override;

// Block current thread until condition variable is notifiesd by a call to
// Notify() or NotifyAll(), or if the timeout is reached.
Expand All @@ -102,7 +102,7 @@ class Rdb_cond_var: public rocksdb::TransactionDBCondVar {
// fail the operation.
// May return OK spuriously even if not notified.
virtual rocksdb::Status
WaitFor(std::shared_ptr<rocksdb::TransactionDBMutex> mutex,
WaitFor(const std::shared_ptr<rocksdb::TransactionDBMutex> mutex,
int64_t timeout_time) override;

// If any threads are waiting on *this, unblock at least one of the
Expand Down
12 changes: 6 additions & 6 deletions storage/rocksdb/rdb_perf_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ std::string rdb_pc_stat_types[]=
idx++; \
} while (0)

static void harvest_diffs(Rdb_atomic_perf_counters *counters)
static void harvest_diffs(Rdb_atomic_perf_counters * const counters)
{
// (C) These should be in the same order as the PC enum
size_t idx= 0;
Expand Down Expand Up @@ -151,7 +151,7 @@ static void harvest_diffs(Rdb_atomic_perf_counters *counters)

static Rdb_atomic_perf_counters rdb_global_perf_counters;

void rdb_get_global_perf_counters(Rdb_perf_counters *counters)
void rdb_get_global_perf_counters(Rdb_perf_counters* const counters)
{
DBUG_ASSERT(counters != nullptr);

Expand All @@ -165,9 +165,9 @@ void Rdb_perf_counters::load(const Rdb_atomic_perf_counters &atomic_counters)
}
}

bool Rdb_io_perf::start(uint32_t perf_context_level)
bool Rdb_io_perf::start(const uint32_t perf_context_level)
{
rocksdb::PerfLevel perf_level=
const rocksdb::PerfLevel perf_level=
static_cast<rocksdb::PerfLevel>(perf_context_level);

if (rocksdb::GetPerfLevel() != perf_level)
Expand All @@ -185,9 +185,9 @@ bool Rdb_io_perf::start(uint32_t perf_context_level)
return true;
}

void Rdb_io_perf::end_and_record(uint32_t perf_context_level)
void Rdb_io_perf::end_and_record(const uint32_t perf_context_level)
{
rocksdb::PerfLevel perf_level=
const rocksdb::PerfLevel perf_level=
static_cast<rocksdb::PerfLevel>(perf_context_level);

if (perf_level == rocksdb::kDisable)
Expand Down
10 changes: 5 additions & 5 deletions storage/rocksdb/rdb_perf_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ class Rdb_io_perf
Rdb_io_perf(const Rdb_io_perf&) = delete;
Rdb_io_perf& operator=(const Rdb_io_perf&) = delete;

void init(Rdb_atomic_perf_counters *atomic_counters,
my_io_perf_atomic_t *shared_io_perf_read,
ha_statistics *stats)
void init(Rdb_atomic_perf_counters* const atomic_counters,
my_io_perf_atomic_t* const shared_io_perf_read,
ha_statistics* const stats)
{
DBUG_ASSERT(atomic_counters != nullptr);
DBUG_ASSERT(shared_io_perf_read != nullptr);
Expand All @@ -129,8 +129,8 @@ class Rdb_io_perf
m_stats= stats;
}

bool start(uint32_t perf_context_level);
void end_and_record(uint32_t perf_context_level);
bool start(const uint32_t perf_context_level);
void end_and_record(const uint32_t perf_context_level);

explicit Rdb_io_perf() : m_atomic_counters(nullptr),
m_shared_io_perf_read(nullptr),
Expand Down
51 changes: 26 additions & 25 deletions storage/rocksdb/rdb_sst_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@

namespace myrocks {

Rdb_sst_file::Rdb_sst_file(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf,
Rdb_sst_file::Rdb_sst_file(rocksdb::DB* const db,
rocksdb::ColumnFamilyHandle* const cf,
const rocksdb::DBOptions& db_options,
const std::string& name, bool tracing) :
const std::string& name, const bool tracing) :
m_db(db),
m_cf(cf),
m_db_options(db_options),
Expand Down Expand Up @@ -78,8 +79,8 @@ rocksdb::Status Rdb_sst_file::open()
// Create an sst file writer with the current options and comparator
const rocksdb::Comparator* comparator= m_cf->GetComparator();

rocksdb::EnvOptions env_options(m_db_options);
rocksdb::Options options(m_db_options, cf_descr.options);
const rocksdb::EnvOptions env_options(m_db_options);
const rocksdb::Options options(m_db_options, cf_descr.options);

m_sst_file_writer=
new rocksdb::SstFileWriter(env_options, options, comparator, m_cf);
Expand Down Expand Up @@ -136,7 +137,7 @@ rocksdb::Status Rdb_sst_file::commit()
DBUG_ASSERT(m_sst_file_writer != nullptr);

rocksdb::Status s;
rocksdb::ExternalSstFileInfo fileinfo;
rocksdb::ExternalSstFileInfo fileinfo; ///Finish may should be modified

// Close out the sst file
s= m_sst_file_writer->Finish(&fileinfo);
Expand Down Expand Up @@ -185,11 +186,11 @@ rocksdb::Status Rdb_sst_file::commit()
return s;
}

Rdb_sst_info::Rdb_sst_info(rocksdb::DB* db, const std::string& tablename,
Rdb_sst_info::Rdb_sst_info(rocksdb::DB* const db, const std::string& tablename,
const std::string& indexname,
rocksdb::ColumnFamilyHandle* cf,
rocksdb::ColumnFamilyHandle* const cf,
const rocksdb::DBOptions& db_options,
bool tracing) :
const bool& tracing) :
m_db(db),
m_cf(cf),
m_db_options(db_options),
Expand Down Expand Up @@ -224,7 +225,7 @@ Rdb_sst_info::Rdb_sst_info(rocksdb::DB* db, const std::string& tablename,
}

rocksdb::ColumnFamilyDescriptor cf_descr;
rocksdb::Status s= m_cf->GetDescriptor(&cf_descr);
const rocksdb::Status s= m_cf->GetDescriptor(&cf_descr);
if (!s.ok())
{
// Default size if we can't get the cf's target size
Expand All @@ -250,13 +251,13 @@ int Rdb_sst_info::open_new_sst_file()
DBUG_ASSERT(m_sst_file == nullptr);

// Create the new sst file's name
std::string name= m_prefix + std::to_string(m_sst_count++) + m_suffix;
const std::string name= m_prefix + std::to_string(m_sst_count++) + m_suffix;

// Create the new sst file object
m_sst_file= new Rdb_sst_file(m_db, m_cf, m_db_options, name, m_tracing);

// Open the sst file
rocksdb::Status s= m_sst_file->open();
const rocksdb::Status s= m_sst_file->open();
if (!s.ok())
{
set_error_msg(s.ToString());
Expand Down Expand Up @@ -286,14 +287,14 @@ void Rdb_sst_info::close_curr_sst_file()

{
// Add this finished sst file to the queue (while holding mutex)
std::lock_guard<std::mutex> guard(m_mutex);
const std::lock_guard<std::mutex> guard(m_mutex);
m_queue.push(m_sst_file);
}

// Notify the background thread that there is a new entry in the queue
m_cond.notify_one();
#else
rocksdb::Status s= m_sst_file->commit();
const rocksdb::Status s= m_sst_file->commit();
if (!s.ok())
{
set_error_msg(s.ToString());
Expand Down Expand Up @@ -338,7 +339,7 @@ int Rdb_sst_info::put(const rocksdb::Slice& key,
DBUG_ASSERT(m_sst_file != nullptr);

// Add the key/value to the current sst file
rocksdb::Status s= m_sst_file->put(key, value);
const rocksdb::Status s= m_sst_file->put(key, value);
if (!s.ok())
{
set_error_msg(s.ToString());
Expand Down Expand Up @@ -387,7 +388,7 @@ void Rdb_sst_info::set_error_msg(const std::string& msg)
// Both the foreground and background threads can set the error message
// so lock the mutex to protect it. We only want the first error that
// we encounter.
std::lock_guard<std::mutex> guard(m_mutex);
const std::lock_guard<std::mutex> guard(m_mutex);
#endif
my_printf_error(ER_UNKNOWN_ERROR, "bulk load error: %s", MYF(0), msg.c_str());
if (m_error_msg.empty())
Expand All @@ -405,7 +406,7 @@ void Rdb_sst_info::thread_fcn(void* object)

void Rdb_sst_info::run_thread()
{
std::unique_lock<std::mutex> lk(m_mutex);
const std::unique_lock<std::mutex> lk(m_mutex);

do
{
Expand All @@ -415,14 +416,14 @@ void Rdb_sst_info::run_thread()
// Inner loop pulls off all Rdb_sst_file entries and processes them
while (!m_queue.empty())
{
Rdb_sst_file* sst_file= m_queue.front();
const Rdb_sst_file* const sst_file= m_queue.front();
m_queue.pop();

// Release the lock - we don't want to hold it while committing the file
lk.unlock();

// Close out the sst file and add it to the database
rocksdb::Status s= sst_file->commit();
const rocksdb::Status s= sst_file->commit();
if (!s.ok())
{
set_error_msg(s.ToString());
Expand All @@ -442,10 +443,10 @@ void Rdb_sst_info::run_thread()
}
#endif

void Rdb_sst_info::init(rocksdb::DB* db)
void Rdb_sst_info::init(const rocksdb::DB* const db)
{
std::string path= db->GetName() + FN_DIRSEP;
struct st_my_dir* dir_info= my_dir(path.c_str(), MYF(MY_DONT_SORT));
const std::string path= db->GetName() + FN_DIRSEP;
struct st_my_dir* const dir_info= my_dir(path.c_str(), MYF(MY_DONT_SORT));

// Access the directory
if (dir_info == nullptr)
Expand All @@ -457,16 +458,16 @@ void Rdb_sst_info::init(rocksdb::DB* db)
}

// Scan through the files in the directory
struct fileinfo* file_info= dir_info->dir_entry;
const struct fileinfo* file_info= dir_info->dir_entry;
for (uint ii= 0; ii < dir_info->number_off_files; ii++, file_info++)
{
// find any files ending with m_suffix ...
std::string name= file_info->name;
size_t pos= name.find(m_suffix);
const std::string name= file_info->name;
const size_t pos= name.find(m_suffix);
if (pos != std::string::npos && name.size() - pos == m_suffix.size())
{
// ... and remove them
std::string fullname= path + name;
const std::string fullname= path + name;
my_delete(fullname.c_str(), MYF(0));
}
}
Expand Down
58 changes: 30 additions & 28 deletions storage/rocksdb/rdb_sst_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,20 @@ class Rdb_sst_file {
Rdb_sst_file(const Rdb_sst_file& p)= delete;
Rdb_sst_file& operator=(const Rdb_sst_file& p)= delete;

rocksdb::DB* m_db;
rocksdb::ColumnFamilyHandle* m_cf;
const rocksdb::DBOptions& m_db_options;
rocksdb::SstFileWriter* m_sst_file_writer;
std::string m_name;
bool m_tracing;
rocksdb::DB* const m_db;
rocksdb::ColumnFamilyHandle* const m_cf;
const rocksdb::DBOptions& m_db_options;
rocksdb::SstFileWriter* m_sst_file_writer;
const std::string m_name;
const bool m_tracing;

std::string generateKey(const std::string& key);

public:
Rdb_sst_file(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf,
Rdb_sst_file(rocksdb::DB* const db,
rocksdb::ColumnFamilyHandle* const cf,
const rocksdb::DBOptions& db_options, const std::string& name,
bool tracing);
const bool tracing);
~Rdb_sst_file();

rocksdb::Status open();
Expand All @@ -62,24 +63,24 @@ class Rdb_sst_info {
Rdb_sst_info(const Rdb_sst_info& p)= delete;
Rdb_sst_info& operator=(const Rdb_sst_info& p)= delete;

rocksdb::DB* m_db;
rocksdb::ColumnFamilyHandle* m_cf;
const rocksdb::DBOptions& m_db_options;
uint64_t m_curr_size;
uint64_t m_max_size;
uint m_sst_count;
std::string m_error_msg;
std::string m_prefix;
static std::string m_suffix;
rocksdb::DB* const m_db;
rocksdb::ColumnFamilyHandle* const m_cf;
const rocksdb::DBOptions& m_db_options;
uint64_t m_curr_size;
uint64_t m_max_size;
uint m_sst_count;
std::string m_error_msg;
std::string m_prefix;
static std::string m_suffix;
#if defined(RDB_SST_INFO_USE_THREAD)
std::queue<Rdb_sst_file*> m_queue;
std::mutex m_mutex;
std::condition_variable m_cond;
std::thread* m_thread;
bool m_finished;
std::queue<Rdb_sst_file*> m_queue;
std::mutex m_mutex;
std::condition_variable m_cond;
std::thread* m_thread;
bool m_finished;
#endif
Rdb_sst_file* m_sst_file;
bool m_tracing;
Rdb_sst_file* m_sst_file;
const bool m_tracing;

int open_new_sst_file();
void close_curr_sst_file();
Expand All @@ -92,17 +93,18 @@ class Rdb_sst_info {
#endif

public:
Rdb_sst_info(rocksdb::DB* db, const std::string& tablename,
const std::string& indexname, rocksdb::ColumnFamilyHandle* cf,
const rocksdb::DBOptions& db_options, bool tracing);
Rdb_sst_info(rocksdb::DB* const db, const std::string& tablename,
const std::string& indexname,
rocksdb::ColumnFamilyHandle* const cf,
const rocksdb::DBOptions& db_options, const bool &tracing);
~Rdb_sst_info();

int put(const rocksdb::Slice& key, const rocksdb::Slice& value);
int commit();

const std::string& error_message() const { return m_error_msg; }

static void init(rocksdb::DB* db);
static void init(const rocksdb::DB* const db);
};

} // namespace myrocks
Loading

0 comments on commit 7a7c378

Please sign in to comment.