diff --git a/mysql-test/suite/rocksdb/r/rocksdb_locks.result b/mysql-test/suite/rocksdb/r/rocksdb_locks.result new file mode 100644 index 000000000000..6ff1afc1a4b8 --- /dev/null +++ b/mysql-test/suite/rocksdb/r/rocksdb_locks.result @@ -0,0 +1,20 @@ +create table t1 (pk int not null primary key) engine=rocksdb; +insert into t1 values (1),(2),(3); +set autocommit=0; +begin; +select * from t1 where pk=1 for update; +pk +1 +### Connection con1 +set @@rocksdb_lock_wait_timeout=500; +set autocommit=0; +begin; +set session debug= "+d,myrocks_simulate_lock_timeout1"; +select * from t1 where pk=1 for update;; +### Connection default +rollback; +### Connection con1 +ERROR HY000: Lock wait timeout exceeded; try restarting transaction +set session debug= "-d,myrocks_simulate_lock_timeout1"; +set autocommit=1; +drop table t1; diff --git a/mysql-test/suite/rocksdb/r/select_for_update.result b/mysql-test/suite/rocksdb/r/select_for_update.result index f5954489d91b..2890941a1b93 100644 --- a/mysql-test/suite/rocksdb/r/select_for_update.result +++ b/mysql-test/suite/rocksdb/r/select_for_update.result @@ -1,5 +1,5 @@ DROP TABLE IF EXISTS t1; -CREATE TABLE t1 (a , b ) ENGINE= ; +CREATE TABLE t1 (a INT, b CHAR(8), pk INT AUTO_INCREMENT PRIMARY KEY) ENGINE=rocksdb; INSERT INTO t1 (a,b) VALUES (1,'a'),(2,'b'),(3,'a'); connect con1,localhost,root,,; BEGIN; diff --git a/mysql-test/suite/rocksdb/r/select_lock_in_share_mode.result b/mysql-test/suite/rocksdb/r/select_lock_in_share_mode.result index 3d668d721aee..b073b8871154 100644 --- a/mysql-test/suite/rocksdb/r/select_lock_in_share_mode.result +++ b/mysql-test/suite/rocksdb/r/select_lock_in_share_mode.result @@ -1,5 +1,5 @@ DROP TABLE IF EXISTS t1; -CREATE TABLE t1 (a , b ) ENGINE= ; +CREATE TABLE t1 (a INT, b CHAR(8), pk INT AUTO_INCREMENT PRIMARY KEY) ENGINE=rocksdb; INSERT INTO t1 (a,b) VALUES (1,'a'),(2,'b'),(3,'a'); connect con1,localhost,root,,; BEGIN; diff --git a/mysql-test/suite/rocksdb/t/disabled.def b/mysql-test/suite/rocksdb/t/disabled.def index 7884f6be012f..4406b989b2b9 100644 --- a/mysql-test/suite/rocksdb/t/disabled.def +++ b/mysql-test/suite/rocksdb/t/disabled.def @@ -1,8 +1,6 @@ type_blob_indexes : MDEV-4097 (Indexes on text/blob fields are not allowed) type_text_indexes : MDEV-4097 (Indexes on text/blob fields are not allowed) cons_snapshot_serializable : Consistent read does not work on serializable -select_for_update : Not supported? -select_lock_in_share_mode : Not supported? level_read_uncommitted : Not supported? level_serializable: Not supported? level_repeatable_read: Can't seen own changes diff --git a/mysql-test/suite/rocksdb/t/rocksdb_locks.test b/mysql-test/suite/rocksdb/t/rocksdb_locks.test new file mode 100644 index 000000000000..7bef98b3c8a7 --- /dev/null +++ b/mysql-test/suite/rocksdb/t/rocksdb_locks.test @@ -0,0 +1,48 @@ +# +# MyRocks-specific tests for locking +# +--source include/have_debug.inc + +create table t1 (pk int not null primary key) engine=rocksdb; + +insert into t1 values (1),(2),(3); + +set autocommit=0; +begin; +select * from t1 where pk=1 for update; + +--connect (con1,localhost,root,,) +--connection con1 +--echo ### Connection con1 +let $ID= `select connection_id()`; +set @@rocksdb_lock_wait_timeout=500; +set autocommit=0; +begin; +set session debug= "+d,myrocks_simulate_lock_timeout1"; +--send select * from t1 where pk=1 for update; + +--connection default +--echo ### Connection default + +let $wait_condition= select 1 from INFORMATION_SCHEMA.PROCESSLIST + where ID = $ID and STATE = "Waiting for row lock"; +--source include/wait_condition.inc +## Waiting for row lock +## select connection_id(); +## select state='Waiting for row lock' from information_schema.processlist where id=2; + +rollback; + +--connection con1 +--echo ### Connection con1 + +--error ER_LOCK_WAIT_TIMEOUT +--reap +set session debug= "-d,myrocks_simulate_lock_timeout1"; + +--disconnect con1 +--connection default + +set autocommit=1; +drop table t1; + diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 4673e4c60895..dcfc96aa0d6d 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -1015,7 +1015,8 @@ class Rdb_transaction max_row_locks= max_row_locks_arg; } - Row_lock *get_lock(const uchar* key, size_t keylen, bool *timed_out) + Row_lock *get_lock(const uchar* key, size_t keylen, bool is_write_lock, + bool *timed_out) { Row_lock *lock; if (trx_locks.elements() > max_row_locks) @@ -1023,7 +1024,8 @@ class Rdb_transaction *timed_out= false; return NULL; } - if (!(lock= row_locks.get_lock(pins, key, keylen, timeout_sec))) + if (!(lock= row_locks.get_lock(pins, key, keylen, timeout_sec, + is_write_lock))) { *timed_out= true; return NULL; @@ -2381,6 +2383,8 @@ int ha_rocksdb::open(const char *name, int mode, uint test_if_locked) "Attempt to open a table that is not present in RocksDB-SE data dictionary"); DBUG_RETURN(HA_ERR_INTERNAL_ERROR); } + lock_rows= false; + lock_for_write= false; key_descr= tbl_def->key_descr; pk_descr= key_descr[table->s->primary_key]; @@ -3557,8 +3561,11 @@ int ha_rocksdb::get_row_by_rowid(uchar *buf, const char *rowid, uint rowid_size) bool timed_out; if (lock_rows) { - if (!(lock= trx->get_lock((const uchar*)rowid, rowid_size, &timed_out))) + if (!(lock= trx->get_lock((const uchar*)rowid, rowid_size, lock_for_write, + &timed_out))) + { DBUG_RETURN(return_lock_failure(timed_out)); + } } rocksdb::ReadOptions options; @@ -3849,7 +3856,7 @@ int ha_rocksdb::write_row(uchar *buf) { DBUG_ENTER("ha_rocksdb::write_row"); DBUG_ASSERT(buf == table->record[0]); - DBUG_ASSERT(lock_rows); + DBUG_ASSERT(lock_rows && lock_for_write); io_perf_start(); @@ -4006,7 +4013,7 @@ int ha_rocksdb::update_write_row(const uchar *old_data, */ bool timed_out; Row_lock *lock= NULL; // init to shut up the compiler - if (!(lock= trx->get_lock(new_key_packed_tuple, new_packed_size, + if (!(lock= trx->get_lock(new_key_packed_tuple, new_packed_size, true, &timed_out))) { /* @@ -4314,7 +4321,8 @@ int ha_rocksdb::rnd_next_with_direction(uchar *buf, bool move_forward) */ Row_lock *lock; bool timed_out; - if (!(lock= trx->get_lock((const uchar*)key.data(), key.size(), &timed_out))) + if (!(lock= trx->get_lock((const uchar*)key.data(), key.size(), + lock_for_write, &timed_out))) { io_perf_end_and_record(); DBUG_RETURN(return_lock_failure(timed_out)); @@ -4598,7 +4606,7 @@ void ha_rocksdb::calc_updated_indexes() int ha_rocksdb::update_row(const uchar *old_data, uchar *new_data) { DBUG_ENTER("ha_rocksdb::update_row"); - DBUG_ASSERT(lock_rows); + DBUG_ASSERT(lock_rows && lock_for_write); /* old_data points to record we're updating. It is the same as the record we've just read (for multi-table UPDATE, too, because SQL layer will make @@ -4626,6 +4634,26 @@ THR_LOCK_DATA **ha_rocksdb::store_lock(THD *thd, enum thr_lock_type lock_type) { DBUG_ENTER("ha_rocksdb::store_lock"); + + /* First, make a decision about MyRocks's internal locking */ + if (lock_type >= TL_WRITE_ALLOW_WRITE) + { + lock_rows= true; + lock_for_write=true; + } + else if (lock_type == TL_READ_WITH_SHARED_LOCKS) + { + lock_rows= true; + lock_for_write= false; + } + else + { + lock_rows= false; + lock_for_write= false; + } + + /* Then, tell the SQL layer what kind of locking it should use: */ + if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) { /* @@ -4687,7 +4715,6 @@ int ha_rocksdb::external_lock(THD *thd, int lock_type) trx->snapshot= NULL; } } - lock_rows= FALSE; } else { @@ -4709,8 +4736,12 @@ int ha_rocksdb::external_lock(THD *thd, int lock_type) /* SQL layer signals us to take a write lock. It does so when starting DML statement. We should put locks on the rows we're reading. + + Note: sometimes, external_lock() can be called without a prior + ::store_lock call. That's why we need to set lock_* members here, too. */ - lock_rows= TRUE; + lock_rows= true; + lock_for_write= true; } trans_register_ha(thd, FALSE, rocksdb_hton); diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index 465b5d84ece3..e1aee047b102 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -209,6 +209,12 @@ class ha_rocksdb: public handler /* If TRUE, reads should place locks on rows */ bool lock_rows; + /* + When lock_rows==true, specifies what kind of locks should be placed + (true means write locks, false means read locks) + */ + bool lock_for_write; + /* Lock that we've placed on the row we've just read. */ Row_lock *last_row_lock; diff --git a/storage/rocksdb/rdb_locks.cc b/storage/rocksdb/rdb_locks.cc index 720904326f68..75808f846986 100644 --- a/storage/rocksdb/rdb_locks.cc +++ b/storage/rocksdb/rdb_locks.cc @@ -24,22 +24,129 @@ #include "rdb_locks.h" + +typedef struct +{ + /* + How many times the owner has acquired the lock + (0 means it's an invalid object and there is really no lock + */ + uint count; + + /* Data identifying the owner thread. */ + void *owner_info; + + bool is_valid() { return count!=0; } +} LOCK_OWNER_INFO; + + +/* + A row lock that one gets from LockTable. + + @note + - the structure is stored in LF_HASH, which will copy a part of + structure with memcpy(). See LockTable::init(). + - Use of offsetof() to convert between Row_lock_impl and its + write_handle/read_handle is another the structure is a POD. +*/ + +class Row_lock_impl +{ + friend class Row_lock; +public: + /* Read and write handles */ + Row_lock write_handle; + Row_lock read_handle; + + Row_lock* get_lock_handle(bool is_write_lock) + { + return is_write_lock? &write_handle : &read_handle; + } + + /* + MAX_READ_LOCKS is an internal parameter, but test_rwlocks.cc also defines + and uses it. + */ + enum { MAX_READ_LOCKS=10 }; + + char *rowkey; /* The key this lock is for */ + int len; /* length of the rowkey */ + + /* TRUE - this row_lock is being deleted */ + bool deleted; + + /* + How many are waiting for the lock (for whatever reason, some may want + a read lock and some may want a write lock) + */ + int waiters; + + /* Write lock, if any */ + LOCK_OWNER_INFO write_lock; + + /* + Read locks, if any. All the locks are at the beginning of the array, the + first element with .count==0 marks the end of the valid data. + */ + LOCK_OWNER_INFO read_locks[MAX_READ_LOCKS]; + + inline bool have_write_lock() { return write_lock.count != 0; } + inline bool have_read_locks() { return read_locks[0].count != 0; } + + bool have_one_read_lock(void *pins) + { + return (read_locks[0].owner_info == pins) && + (read_locks[1].count==0); + } + + /* + One must hold this mutex + - when marking lock as busy or free + - when adding/removing himself from waiters + the mutex is also associated with the condition when waiting for the lock. + */ + mysql_mutex_t mutex; + + /* + Use this condition to wait for the "row lock is available for locking" + condition. That is, those who change the condition so that some locks that + were not possible before become possible, will do mysql_cond_broadcast() + so that all waiters can check if they can put their desired locks. + */ + mysql_cond_t cond; +}; + + +Row_lock_impl* Row_lock::get_impl(bool *is_write_arg) +{ + *is_write_arg= is_write_handle; + char *ptr; + + if (is_write_handle) + ptr= ((char*)this) - offsetof(Row_lock_impl, write_handle); + else + ptr= ((char*)this) - offsetof(Row_lock_impl, read_handle); + + return (Row_lock_impl*)ptr; +} + + static uchar* get_row_lock_hash_key(const uchar *entry, size_t* key_len, my_bool) { - Row_lock *rlock= (Row_lock*)entry; + Row_lock_impl *rlock= (Row_lock_impl*)entry; *key_len= rlock->len; return (uchar*) rlock->rowkey; } /** - Row_lock constructor + Row_lock_impl constructor It is called from lf_hash and takes a pointer to an LF_SLIST instance. - Row_lock is located at arg+sizeof(LF_SLIST) + Row_lock_impl is located at arg+sizeof(LF_SLIST) */ static void rowlock_init(uchar *arg) { - Row_lock *rc= (Row_lock*)(arg+LF_HASH_OVERHEAD); + Row_lock_impl *rc= (Row_lock_impl*)(arg+LF_HASH_OVERHEAD); DBUG_ENTER("rowlock_init"); memset(rc, 0, sizeof(*rc)); @@ -53,14 +160,14 @@ static void rowlock_init(uchar *arg) /** - Row_lock destructor + Row_lock_impl destructor It is called from lf_hash and takes a pointer to an LF_SLIST instance. - Row_lock is located at arg+sizeof(LF_SLIST) + Row_lock_impl is located at arg+sizeof(LF_SLIST) */ static void rowlock_destroy(uchar *arg) { - Row_lock *rc= (Row_lock*)(arg+LF_HASH_OVERHEAD); + Row_lock_impl *rc= (Row_lock_impl*)(arg+LF_HASH_OVERHEAD); DBUG_ENTER("rowlock_destroy"); mysql_mutex_destroy(&rc->mutex); @@ -78,7 +185,7 @@ static void rowlock_destroy(uchar *arg) void LockTable::init(lf_key_comparison_func_t key_cmp_func, lf_hashfunc_t hashfunc) { - lf_hash_init(&lf_hash, sizeof(Row_lock), LF_HASH_UNIQUE, 0 /* key offset */, + lf_hash_init(&lf_hash, sizeof(Row_lock_impl), LF_HASH_UNIQUE, 0 /* key offset */, 0 /*key_len*/, get_row_lock_hash_key /*get_hash_key*/, NULL /*charset*/); @@ -88,7 +195,7 @@ void LockTable::init(lf_key_comparison_func_t key_cmp_func, lf_hash.key_comparator= key_cmp_func; lf_hash.hashfunc= hashfunc; - lf_hash.element_size= offsetof(Row_lock, mutex); + lf_hash.element_size= offsetof(Row_lock_impl, mutex); } @@ -103,38 +210,47 @@ void LockTable::cleanup() /* - Get a lock for given row + Get a lock for given row. The lock is either a read lock or a write lock @param pins Pins for this thread as returned by LockTable::get_pins(). @param key Row key @param keylen Length of the row key, in bytes. @param timeout_sec Wait at most this many seconds. + @param write_lock TRUE <=> Get a write (exclusive) lock + FALSE<=> Get a read (shared) lock + + @detail @return pointer Pointer to the obtained lock NULL Failed to acquire the lock (timeout or out-of-memory error). - @note The code is based on wt_thd_will_wait_for() in mysys/waiting_threads.c */ Row_lock* LockTable::get_lock(LF_PINS *pins, const uchar* key, size_t keylen, - int timeout_sec) + int timeout_sec, bool is_write_lock) { - Row_lock *found_lock; + Row_lock_impl *found_lock; void *ptr; bool inserted= false; + bool locked= false; + uchar *key_copy= NULL; retry: while (!(ptr= lf_hash_search(&lf_hash, pins, key, keylen))) { - Row_lock new_lock; + Row_lock_impl new_lock; + new_lock.write_handle.is_write_handle= 1; + new_lock.read_handle.is_write_handle= 0; new_lock.deleted= FALSE; new_lock.waiters= 0; - new_lock.busy= 0; + new_lock.write_lock.count= 0; + memset(&new_lock.write_lock, 0, sizeof(new_lock.write_lock)); + memset(&new_lock.read_locks, 0, sizeof(new_lock.read_locks)); if (!key_copy && !(key_copy= (uchar*)my_malloc(keylen, MYF(0)))) return NULL; @@ -145,7 +261,7 @@ Row_lock* LockTable::get_lock(LF_PINS *pins, const uchar* key, size_t keylen, int res= lf_hash_insert(&lf_hash, pins, &new_lock); if (res == -1) - goto return_null; /* out of memory */ + goto err; /* out of memory */ inserted= !res; if (inserted) @@ -170,9 +286,9 @@ Row_lock* LockTable::get_lock(LF_PINS *pins, const uchar* key, size_t keylen, } if (ptr == MY_ERRPTR) - goto return_null; /* Out of memory */ + goto err; /* Out of memory */ - found_lock= (Row_lock*)ptr; + found_lock= (Row_lock_impl*)ptr; mysql_mutex_lock(&found_lock->mutex); if (found_lock->deleted) @@ -183,123 +299,279 @@ Row_lock* LockTable::get_lock(LF_PINS *pins, const uchar* key, size_t keylen, goto retry; } - /* We're holding Row_lock's mutex, which prevents anybody from deleting it */ + /* We're holding Row_lock_impl's mutex, which prevents anybody from deleting it */ lf_hash_search_unpin(pins); - if (!found_lock->busy) + /* This will release the mutex: */ + locked= do_locking_action(pins, found_lock, timeout_sec, is_write_lock); + +err: + if (key_copy) + my_free(key_copy); + + return locked? found_lock->get_lock_handle(is_write_lock) : NULL; +} + + +/* + Given a lock's Row_lock structure, acquire the lock. + + Before actually doing that, check that this is not a "spurious wake up". + + @detail + The caller guarantees that found_lock is an existing Row_lock_impl, in + particular, we own found_lock->mutex which prevents others from modifying + it. + + @return + true - Got the lock + false - Didn't get the lock +*/ + +bool LockTable::do_locking_action(LF_PINS *pins, Row_lock_impl *found_lock, + int timeout_sec, bool is_write_lock) +{ +#ifndef STANDALONE_UNITTEST + bool enter_cond_done= false; + PSI_stage_info old_stage; + THD *thd; +#endif + bool retval= true; + +restart: + if (is_write_lock) { - /* We got the Row_lock. Do nothing. */ - found_lock->busy= 1; - found_lock->owner_data= pins; - mysql_mutex_unlock(&found_lock->mutex); + if (found_lock->have_write_lock() && + found_lock->write_lock.owner_info == pins) + { + /* We already have this lock, so just increment the count */ + found_lock->write_lock.count++; + retval= true; + goto func_exit; + } + + /* + We can get a write lock if + 1. there are no other write locks (we've already handled the case + when the present write lock is ours), and + 2. there are no other read locks, except maybe our own lock + */ + if (!found_lock->have_write_lock() && // (1) + (!found_lock->have_read_locks() || // (2) + found_lock->have_one_read_lock(pins))) // (2) + { + found_lock->write_lock.owner_info= pins; + found_lock->write_lock.count++; + retval= true; + goto func_exit; + } + else + { + goto wait_and_retry; + } } else { - if (found_lock->owner_data == pins) + /* + We can get a read lock if + - there are no write locks (except maybe our lock) + */ + if (found_lock->have_write_lock() && + found_lock->write_lock.owner_info != pins) { - /* We already own this lock */ - found_lock->busy++; - mysql_mutex_unlock(&found_lock->mutex); + goto wait_and_retry; } - else + + /* Find, or insert our lock */ + int i; + for (i= 0; i < Row_lock_impl::MAX_READ_LOCKS; i++) { - /* The found row_lock is not ours. Wait for it. */ - found_lock->waiters++; - int res= 0; + if (!found_lock->read_locks[i].is_valid() || + found_lock->read_locks[i].owner_info == pins) + { + break; + } + } + if (i == Row_lock_impl::MAX_READ_LOCKS) + { + /* Too many read locks. */ + retval= false; + goto func_exit; + } + found_lock->read_locks[i].owner_info= pins; + found_lock->read_locks[i].count++; + retval= true; + goto func_exit; + } - struct timespec wait_timeout; - set_timespec(wait_timeout, timeout_sec); +wait_and_retry: + { + found_lock->waiters++; + int res= 0; + + struct timespec wait_timeout; + set_timespec(wait_timeout, timeout_sec); #ifndef STANDALONE_UNITTEST - THD *thd= current_thd; - PSI_stage_info old_stage; - thd_enter_cond(thd, &found_lock->cond, &found_lock->mutex, - &stage_waiting_on_row_lock, &old_stage); + thd= current_thd; + thd_enter_cond(thd, &found_lock->cond, &found_lock->mutex, + &stage_waiting_on_row_lock, &old_stage); + enter_cond_done= true; #endif - while (found_lock->busy) - { - res= mysql_cond_timedwait(&found_lock->cond, &found_lock->mutex, - &wait_timeout); - bool killed= false; + bool killed= false; + do + { + res= mysql_cond_timedwait(&found_lock->cond, &found_lock->mutex, + &wait_timeout); + + DBUG_EXECUTE_IF("myrocks_simulate_lock_timeout1", + {res= ETIMEDOUT;}); #ifndef STANDALONE_UNITTEST - killed= thd_killed(thd); + killed= thd_killed(thd); #endif - if (res == ETIMEDOUT || killed) - { - if (found_lock->busy) - { - // We own the mutex still - found_lock->waiters--; // we're not waiting anymore - mysql_mutex_unlock(&found_lock->mutex); - goto return_null; - } - else - break; - } - if (res!=0) - fprintf(stderr, "wait failed: %d\n", res); - } + } while (!killed && res == EINTR); + + if (res || killed) + { + if (res != ETIMEDOUT) + fprintf(stderr, "wait failed: %d\n", res); - /* - Ok, now we own the mutex again, and the lock is released. Take it. - */ - DBUG_ASSERT(!found_lock->busy); - found_lock->busy= 1; - found_lock->owner_data= pins; found_lock->waiters--; // we're not waiting anymore + + retval= false; + goto func_exit; + } + } + /* Ok, wait succeeded */ + found_lock->waiters--; // we're not waiting anymore + goto restart; + +func_exit: + { + bool free_lock= (!found_lock->have_read_locks() && + !found_lock->have_write_lock()); + char *rowkey= found_lock->rowkey; + + if (free_lock) + found_lock->deleted= true; + #ifndef STANDALONE_UNITTEST - thd_exit_cond(thd, &old_stage); + if (enter_cond_done) + thd_exit_cond(current_thd, &old_stage); + else + mysql_mutex_unlock(&found_lock->mutex); #else mysql_mutex_unlock(&found_lock->mutex); #endif + + if (free_lock) + { + int res __attribute__((unused)); + res= lf_hash_delete(&lf_hash, pins, found_lock->rowkey, found_lock->len); + DBUG_ASSERT(res == 0); + my_free(rowkey); } } - if (key_copy) - my_free(key_copy); - return found_lock; - -return_null: - if (key_copy) - my_free(key_copy); - return NULL; + return retval; } /* - Release the previously obtained lock - @param pins This thread pins - @param own_lock Previously obtained lock + @brief Release a previously obtained lock + + @param pins This thread pins + @param own_lock Previously obtained lock + @param is_write_lock Whether this was a write lock or a read lock. + + @detail + Release a lock. + If nobody is holding/waiting, we will also delete the Row_lock entry. + If somebody is waiting, we will signal them. + */ -void LockTable::release_lock(LF_PINS *pins, Row_lock *own_lock) +void LockTable::release_lock(LF_PINS *pins, Row_lock *own_lock_handle) { + bool is_write_lock; + Row_lock_impl *own_lock= own_lock_handle->get_impl(&is_write_lock); /* Acquire the mutex to prevent anybody from getting into the queue */ mysql_mutex_lock(&own_lock->mutex); - DBUG_ASSERT(own_lock->owner_data == pins); + if (is_write_lock) + { + DBUG_ASSERT(own_lock->write_lock.owner_info == pins && + own_lock->write_lock.count > 0); + + if (--own_lock->write_lock.count) + { + /* + We've released the lock once. We've acquired it more than once though, + so we still keep it. + */ + mysql_mutex_unlock(&own_lock->mutex); + return; + } - if (--own_lock->busy) + /* Fall through to either signaling the waiters or deleting the lock*/ + } + else { + /* Releasing a read lock. */ + int i; + for (i=0; i < Row_lock_impl::MAX_READ_LOCKS; i++) + { + if (own_lock->read_locks[i].owner_info == pins) + break; // this is our lock + } + + DBUG_ASSERT(i < Row_lock_impl::MAX_READ_LOCKS && + own_lock->read_locks[i].count != 0); + + if (--own_lock->read_locks[i].count) + { + /* Released our lock, but it was acquired more than once. */ + mysql_mutex_unlock(&own_lock->mutex); + return; + } + /* - We've released the lock once. We've acquired it more than once though, - so we still keep it. + Removing our lock may leave a gap in the array of read locks. + Move the last lock to remove the gap. */ - mysql_mutex_unlock(&own_lock->mutex); - return; + int j; + for (j= i+1; j < Row_lock_impl::MAX_READ_LOCKS; j++) + { + if (!own_lock->read_locks[j].is_valid()) + break; + } + + if (j != i+1) + { + own_lock->read_locks[i]= own_lock->read_locks[j-1]; + own_lock->read_locks[j-1].count= 0; + } + + if (own_lock->have_read_locks() || own_lock->have_write_lock()) + { + // One less read lock. No difference. + mysql_mutex_unlock(&own_lock->mutex); + return; + } + + /* Fall through to either signaling the waiters or deleting the lock*/ } if (own_lock->waiters) { - /* - Somebody is waiting for this lock (they can't stop as we're holding the - mutex). They are now responsible for disposing of the lock. - */ - mysql_cond_signal(&own_lock->cond); + mysql_cond_broadcast(&own_lock->cond); mysql_mutex_unlock(&own_lock->mutex); + return; } - else + + /* Nobody is waiting */ + if (!own_lock->have_read_locks() && !own_lock->have_write_lock()) { - /* Nobody's waiting. Release the lock */ + /* this will call mysql_mutex_unlock() */ char *rowkey= own_lock->rowkey; own_lock->deleted= true; mysql_mutex_unlock(&own_lock->mutex); @@ -308,4 +580,10 @@ void LockTable::release_lock(LF_PINS *pins, Row_lock *own_lock) DBUG_ASSERT(res == 0); my_free(rowkey); } + else + { + /* Nobody is waiting, but we still have a lock */ + mysql_mutex_unlock(&own_lock->mutex); + } } + diff --git a/storage/rocksdb/rdb_locks.h b/storage/rocksdb/rdb_locks.h index 48892242b334..c723ddce2801 100644 --- a/storage/rocksdb/rdb_locks.h +++ b/storage/rocksdb/rdb_locks.h @@ -21,60 +21,55 @@ #include "my_sys.h" #include "lf.h" +class Row_lock_impl; + + /* - A row lock that one gets from LockTable. + A Row Lock Handle. + + @brief + A handle is either a read lock handle, or a write lock handle. + + @detail + Internally, there is one Row_lock_impl object for a given key value. + The object stores info about all read/write locks that were obtained + for this key value. - note: the structure is stored in LF_HASH, which will copy a part of - structure with memcpy(). See LockTable::init(). + When the user calls LockTable::release_lock(), we need to know whether + this he is releasing a read lock, or a write lock (he may have both at the + same time). + + In order to achieve that, Row_lock_impl has a read Row_lock and a write + Row_lock. LockTable can check the type of lock, and then + Row_lock::get_impl() uses offsetof() pointer arithmetic to get the + underlying Row_lock. */ + class Row_lock { public: - char *rowkey; /* The key this lock is for */ - int len; /* length of the rowkey */ - - /* TRUE - this row_lock is being deleted */ - bool deleted; - - /* How many are waiting for the lock */ - int waiters; - - /* - busy==0 - means free - busy>=1 - means the lock is occupied, the number tells how many rows the - lock was acquired - */ - int busy; - - /* - Some opaque data that identifies the lock owner. This is needed so we can - tell if this is the lock owner requesting the lock the second time, or - somebody else. - */ - void *owner_data; - - /* - One must hold this mutex - - when marking lock as busy or free - - when adding/removing himself from waiters - the mutex is also associated with the condition when waiting for the lock. - */ - mysql_mutex_t mutex; - mysql_cond_t cond; + char is_write_handle; + + Row_lock_impl *get_impl(bool *is_write_handle); }; /* - A table of locks. It is backed by a lock-free hash. + @brief + A table of row locks. It is backed by a lock-free hash. - INTERNALS - - Locks are exclusive. - - If a thread has an element in the hashtable, it has a lock. + @detail + There are two types of locks: + - Read lock (compatible with other read locks) + - Write lock (not compatible with either read or write locks). + + Locks are recursive: the same thread can hold both read and write lock on the + same row. It can also acquire multiple read or write locks. Each get_lock() + call must have a matching release_lock() call. */ class LockTable { -public: LF_HASH lf_hash; public: @@ -82,13 +77,18 @@ class LockTable lf_hashfunc_t hashfunc); void cleanup(); - /* - Before using the LockTable, each thread should get its own "pins". - */ + /* Before using the LockTable, each thread should get its own "pins". */ LF_PINS* get_pins() { return lf_hash_get_pins(&lf_hash); } void put_pins(LF_PINS *pins) { return lf_hash_put_pins(pins); } Row_lock* get_lock(LF_PINS *pins, const uchar* key, size_t keylen, - int timeout_sec); + int timeout_sec, bool is_write_lock); + void release_lock(LF_PINS *pins, Row_lock *own_lock); + +private: + bool do_locking_action(LF_PINS *pins, Row_lock_impl *found_lock, + int timeout_sec, bool is_write_lock); }; + + diff --git a/storage/rocksdb/unittest/CMakeLists.txt b/storage/rocksdb/unittest/CMakeLists.txt index c5d4e9d5971c..13d601f5addb 100644 --- a/storage/rocksdb/unittest/CMakeLists.txt +++ b/storage/rocksdb/unittest/CMakeLists.txt @@ -7,4 +7,9 @@ ADD_DEFINITIONS(-DSTANDALONE_UNITTEST) ADD_EXECUTABLE(test_rowlocks test_rowlocks.cc ../rdb_locks.cc ../rdb_locks.h) + +ADD_EXECUTABLE(test_rwlocks + test_rwlocks.cc ../rdb_locks.cc ../rdb_locks.h) + #MY_ADD_TEST(test_rowlocks) + diff --git a/storage/rocksdb/unittest/test_rowlocks.cc b/storage/rocksdb/unittest/test_rowlocks.cc index a2c43b379f18..fd2fd5680da3 100644 --- a/storage/rocksdb/unittest/test_rowlocks.cc +++ b/storage/rocksdb/unittest/test_rowlocks.cc @@ -6,7 +6,11 @@ #include "../rdb_locks.h" #include "thr_template.cc" - +/***************************************************************************** + * + * This is a basic test for row locks. It only uses Write locks. + * + *****************************************************************************/ /* This will hold one lock table that we're testing */ LockTable *lock_table; @@ -52,7 +56,7 @@ ulong int_hashfunc(const char *key, size_t key_len) pthread_handler_t locktable_test1(void *arg) { LF_PINS *pins; - pins= lf_hash_get_pins(&lock_table->lf_hash); + pins= lock_table->get_pins(); /* In a loop, get a couple of locks */ int loop; @@ -78,9 +82,11 @@ pthread_handler_t locktable_test1(void *arg) Row_lock *lock1; Row_lock *lock2; - lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec); + lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec, + true); DBUG_ASSERT(lock1); - lock2= lock_table->get_lock(pins, (uchar*)&val2, sizeof(int), timeout_sec); + lock2= lock_table->get_lock(pins, (uchar*)&val2, sizeof(int), timeout_sec, + true); if (!prevent_deadlocks && !lock2) { diff --git a/storage/rocksdb/unittest/test_rwlocks.cc b/storage/rocksdb/unittest/test_rwlocks.cc new file mode 100644 index 000000000000..d82421d3416f --- /dev/null +++ b/storage/rocksdb/unittest/test_rwlocks.cc @@ -0,0 +1,395 @@ +/* + TODO: MP AB Copyrights +*/ +#include +#include +#include "../rdb_locks.h" + +#include "thr_template.cc" + +/***************************************************************************** + * + * This is a basic test for Read/Write Row Locks. + * + *****************************************************************************/ + + +///////////////////////////////////////////////////////////////////////////// +// Utility data structures +///////////////////////////////////////////////////////////////////////////// +/* This will hold one lock table that we're testing */ +LockTable *lock_table; + +const int N_ACCTS= 100; +int bank_accounts[N_ACCTS]; +int total_money; + +bool prevent_deadlocks= true; + +int n_deadlocks= 0; + +int timeout_sec; + +int compare_int_keys(const uchar *s, size_t slen, + const uchar *t, size_t tlen) +{ + DBUG_ASSERT(slen==sizeof(int)); + DBUG_ASSERT(tlen==sizeof(int)); + int sval; + int tval; + memcpy(&sval, s, sizeof(int)); + memcpy(&tval, t, sizeof(int)); + if (sval < tval) + return -1; + else if (sval > tval) + return 1; + else + return 0; +} + +/* Not really a hash function */ +ulong int_hashfunc(const char *key, size_t key_len) +{ + DBUG_ASSERT(key_len == sizeof(int)); + int keyval; + memcpy(&keyval, key, sizeof(int)); + return keyval; +} + +///////////////////////////////////////////////////////////////////////////// +// Real tests +///////////////////////////////////////////////////////////////////////////// + +LF_PINS *thread1_pins; + +/* + Really basic tests with one thread. +*/ +void basic_test() +{ + fprintf(stderr, "Basic test (in one thread) starting\n"); + thread1_pins= lock_table->get_pins(); + LF_PINS *pins= thread1_pins; + + /* Two write locks */ + Row_lock *lock1; + Row_lock *lock2; + + int val1; + + fprintf(stderr, " Test recursive acquisition of a read lock\n"); + val1=1; + + lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec, + false); + lock2= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec, + false); + DBUG_ASSERT(lock1 && lock2); + lock_table->release_lock(pins, lock1); + lock_table->release_lock(pins, lock2); + + fprintf(stderr, " Test recursive acquisition of a write lock\n"); + val1=2; + + lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec, + true); + lock2= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec, + true); + DBUG_ASSERT(lock1 && lock2); + lock_table->release_lock(pins, lock1); + lock_table->release_lock(pins, lock2); + + fprintf(stderr, " Acquire a read lock, then a write lock\n"); + val1=2; + + lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec, + false); + lock2= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec, + true); + DBUG_ASSERT(lock1 && lock2); + lock_table->release_lock(pins, lock1); + lock_table->release_lock(pins, lock2); + + fprintf(stderr, " Acquire a write lock, then a read lock\n"); + val1=2; + + lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec, + true); + lock2= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec, + false); + DBUG_ASSERT(lock1 && lock2); + lock_table->release_lock(pins, lock1); + lock_table->release_lock(pins, lock2); + + fprintf(stderr, " Test what happens when one gets MAX_READ_LOCKS locks\n"); + const int MAX_READ_LOCKS= 10; + + /* Use different LF_PINS* to pretend that we are from different threads */ + LF_PINS *extra_pins[MAX_READ_LOCKS]; + Row_lock *thread_locks[MAX_READ_LOCKS]; + int i; + for (i= 0; i < MAX_READ_LOCKS; i++) + extra_pins[i]= lock_table->get_pins(); + + for (int count=0; count<2; count++) + { + fprintf(stderr, " .. get MAX_READ_LOCKS read locks\n"); + /* Get max possible number of read locks */ + for (i= 0; i < MAX_READ_LOCKS; i++) + { + thread_locks[i]= + lock_table->get_lock(extra_pins[i], (uchar*)&val1, sizeof(int), + timeout_sec, false); + DBUG_ASSERT(thread_locks[i]); + } + + fprintf(stderr, " .. try getting another read lock\n"); + lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec, + false); + DBUG_ASSERT(!lock1); + fprintf(stderr, " .. now, release one lock and try getting it again\n"); + lock_table->release_lock(extra_pins[0], thread_locks[0]); + lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec, + false); + DBUG_ASSERT(lock1); + + for (i= 1; i < MAX_READ_LOCKS; i++) + lock_table->release_lock(extra_pins[i], thread_locks[i]); + + lock_table->release_lock(pins, lock1); + } + + for (i= 0; i < MAX_READ_LOCKS; i++) + lock_table->put_pins(extra_pins[i]); + + fprintf(stderr, "Basic test finished\n"); +} + +/* + A two-threads test +*/ +typedef void (*callback_func_t)(); + +callback_func_t thread2_work= NULL; +mysql_mutex_t thread2_has_work_mutex; +mysql_cond_t thread2_has_work_cond; + +bool thread2_exit= false; +bool thread2_done= false; +mysql_mutex_t thread2_done_mutex; +mysql_cond_t thread2_done_cond; + +void do_in_thread2(callback_func_t func); +pthread_handler_t thread2_func(void *arg); + +// Secondary thread locals +Row_lock *otherlock; +LF_PINS *thread2_pins; + +// Second thread funcs +void init_second_thread() +{ + fprintf(stderr, "Initializing second thread\n"); + thread2_pins= lock_table->get_pins(); +} + + +void get_a_read_lock() +{ + const int val1= 1; + otherlock= lock_table->get_lock(thread2_pins, (uchar*)&val1, sizeof(int), 10, + false); + DBUG_ASSERT(otherlock); +} + +void release_a_read_lock() +{ + lock_table->release_lock(thread2_pins, otherlock); +} + +void get_a_write_lock() +{ + const int val1= 1; + otherlock= lock_table->get_lock(thread2_pins, (uchar*)&val1, sizeof(int), 10, + true); + DBUG_ASSERT(otherlock); +} + +void release_a_write_lock() +{ + lock_table->release_lock(thread2_pins, otherlock); +} + +void do_nothing() +{ +} + +/* + Test number two: test read-write locks using two threads +*/ +void test2() +{ + int res; + pthread_attr_t thr_attr; + pthread_t thread2; + LF_PINS *pins= thread1_pins; + + fprintf(stderr, "Test#2 (two threads, slow) starting\n"); + mysql_mutex_init(0, &thread2_has_work_mutex, 0); + mysql_cond_init (0, &thread2_has_work_cond, 0); + mysql_mutex_init(0, &thread2_done_mutex, 0); + mysql_cond_init (0, &thread2_done_cond, 0); + + pthread_attr_init(&thr_attr); + pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED); + res= pthread_create(&thread2, &thr_attr, thread2_func, NULL /*arg*/); + DBUG_ASSERT(!res); + + fprintf(stderr, " Test that two threads can acquire a read-lock\n"); + const int val1= 1; + Row_lock *lock1; + Row_lock *lock2; + lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec, + false); + + do_in_thread2(init_second_thread); + do_in_thread2(get_a_read_lock); + do_in_thread2(release_a_read_lock); + lock_table->release_lock(pins, lock1); + + fprintf(stderr, " Test that a write-lock can not be acquired by two threads\n"); + do_in_thread2(get_a_write_lock); + lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), 3 /* seconds timeout */, + true); + DBUG_ASSERT(lock1==NULL); + fprintf(stderr, " Test that a write lock prevents acquisition of a read lock\n"); + lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), 3 /* seconds timeout */, + false); + DBUG_ASSERT(lock1==NULL); + do_in_thread2(release_a_write_lock); + + fprintf(stderr, " Take read locks by both threads, then try to get a write lock also\n"); + do_in_thread2(get_a_read_lock); + lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), 3 /* seconds timeout */, + false); + DBUG_ASSERT(lock1); + lock2= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), 3 /* seconds timeout */, + true); + DBUG_ASSERT(lock2==NULL); + do_in_thread2(release_a_read_lock); + lock_table->release_lock(pins, lock1); + + /* Finish */ + thread2_exit= true; + do_in_thread2(do_nothing); + pthread_attr_destroy(&thr_attr); + fprintf(stderr, "Test#2 finished\n"); +} + +void do_in_thread2(callback_func_t func) +{ + // Tell thread2 it has work to do + mysql_mutex_lock(&thread2_has_work_mutex); + thread2_work= func; + thread2_done= false; + mysql_cond_signal(&thread2_has_work_cond); + mysql_mutex_unlock(&thread2_has_work_mutex); + + // Wait for work to be finished + mysql_mutex_lock(&thread2_done_mutex); + while (!thread2_done) + { + int res; + do { + res= mysql_cond_wait(&thread2_done_cond, &thread2_done_mutex); + } while (res == EINTR); + DBUG_ASSERT(!res); + } + thread2_work= NULL; // ?? + mysql_mutex_unlock(&thread2_done_mutex); +} + +pthread_handler_t thread2_func(void *arg) +{ + while (1) + { + mysql_mutex_lock(&thread2_has_work_mutex); + while (!thread2_work) + { + int res; + do { + res= mysql_cond_wait(&thread2_has_work_cond, &thread2_has_work_mutex); + } while (res == EINTR); + DBUG_ASSERT(!res); + } + mysql_mutex_unlock(&thread2_has_work_mutex); + + thread2_work(); + + // Report that work is finished + mysql_mutex_lock(&thread2_done_mutex); + thread2_work= NULL; + thread2_done= true; + mysql_cond_signal(&thread2_done_cond); + mysql_mutex_unlock(&thread2_done_mutex); + + if (thread2_exit) + break; + } + return 0; +} + +void init_shared_data() +{ + total_money= 0; + for (int i=0; i < N_ACCTS;i++) + { + bank_accounts[i]= 1000; + total_money += bank_accounts[i]; + } +} + +void check_shared_data(const char *name) +{ + int money_after= 0; + for (int i=0; i < N_ACCTS; i++) + money_after += bank_accounts[i]; + if (money_after == total_money) + fprintf(stderr, "# validation %s ok\n", name); + else + fprintf(stderr, "# validation %s failed: expected %d found %d\n", name, + total_money, money_after); +} + +void do_tests() +{ + fprintf(stderr, "# lf_hash based lock table tests\n"); + + /* Global initialization */ + lock_table= new LockTable; + lock_table->init(compare_int_keys, int_hashfunc); + + init_shared_data(); + + basic_test(); + + test2(); + + prevent_deadlocks= true; + timeout_sec= 10*1000; + + //locktable_test1(NULL); +#if 0 + test_concurrently("locktable_test1", locktable_test1, 2 /*THREADS*/, 10 /*CYCLES*/); + check_shared_data("1"); + + prevent_deadlocks= false; + timeout_sec= 2; + test_concurrently("locktable_test1", locktable_test1, 2 /*THREADS*/, 10 /*CYCLES*/); + check_shared_data("2"); + fprintf(stderr, "# n_deadlocks=%d\n", n_deadlocks); +#endif + lock_table->cleanup(); + + fprintf(stderr, "# tests end\n"); +}