Skip to content

Commit

Permalink
Issue facebook#44: SELECT ... LOCK IN SHARE MODE
Browse files Browse the repository at this point in the history
Summary:
Add support for read and read write locks in RocksDB's LockTable.
The implementation is more concerned with correctness than with
concurrency.

Differential Revision: https://reviews.facebook.net/D38265

fbshipit-source-id: 374184b65bd
  • Loading branch information
spetrunia authored and inikep committed Aug 30, 2021
1 parent 76f3400 commit 2c8a101
Show file tree
Hide file tree
Showing 12 changed files with 938 additions and 151 deletions.
20 changes: 20 additions & 0 deletions mysql-test/suite/rocksdb/r/rocksdb_locks.result
@@ -0,0 +1,20 @@
create table t1 (pk int not null primary key) engine=rocksdb;
insert into t1 values (1),(2),(3);
set autocommit=0;
begin;
select * from t1 where pk=1 for update;
pk
1
### Connection con1
set @@rocksdb_lock_wait_timeout=500;
set autocommit=0;
begin;
set session debug= "+d,myrocks_simulate_lock_timeout1";
select * from t1 where pk=1 for update;;
### Connection default
rollback;
### Connection con1
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
set session debug= "-d,myrocks_simulate_lock_timeout1";
set autocommit=1;
drop table t1;
2 changes: 1 addition & 1 deletion mysql-test/suite/rocksdb/r/select_for_update.result
@@ -1,5 +1,5 @@
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (a <INT_COLUMN>, b <CHAR_COLUMN>) ENGINE=<STORAGE_ENGINE> <CUSTOM_TABLE_OPTIONS>;
CREATE TABLE t1 (a INT, b CHAR(8), pk INT AUTO_INCREMENT PRIMARY KEY) ENGINE=rocksdb;
INSERT INTO t1 (a,b) VALUES (1,'a'),(2,'b'),(3,'a');
connect con1,localhost,root,,;
BEGIN;
Expand Down
@@ -1,5 +1,5 @@
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (a <INT_COLUMN>, b <CHAR_COLUMN>) ENGINE=<STORAGE_ENGINE> <CUSTOM_TABLE_OPTIONS>;
CREATE TABLE t1 (a INT, b CHAR(8), pk INT AUTO_INCREMENT PRIMARY KEY) ENGINE=rocksdb;
INSERT INTO t1 (a,b) VALUES (1,'a'),(2,'b'),(3,'a');
connect con1,localhost,root,,;
BEGIN;
Expand Down
2 changes: 0 additions & 2 deletions mysql-test/suite/rocksdb/t/disabled.def
@@ -1,8 +1,6 @@
type_blob_indexes : MDEV-4097 (Indexes on text/blob fields are not allowed)
type_text_indexes : MDEV-4097 (Indexes on text/blob fields are not allowed)
cons_snapshot_serializable : Consistent read does not work on serializable
select_for_update : Not supported?
select_lock_in_share_mode : Not supported?
level_read_uncommitted : Not supported?
level_serializable : Not supported?
level_repeatable_read: Can't see own changes
Expand Down
48 changes: 48 additions & 0 deletions mysql-test/suite/rocksdb/t/rocksdb_locks.test
@@ -0,0 +1,48 @@
#
# MyRocks-specific tests for locking
#
# Scenario: the default connection takes a row lock with SELECT ... FOR
# UPDATE; con1 then tries to lock the same row and, with the
# myrocks_simulate_lock_timeout1 debug injection enabled, fails with
# ER_LOCK_WAIT_TIMEOUT even though the holder eventually rolls back.
#
# Requires a debug build for the "set session debug" injection points.
--source include/have_debug.inc

create table t1 (pk int not null primary key) engine=rocksdb;

insert into t1 values (1),(2),(3);

# Default connection: open a transaction and take a write lock on pk=1.
set autocommit=0;
begin;
select * from t1 where pk=1 for update;

--connect (con1,localhost,root,,)
--connection con1
--echo ### Connection con1
# Remember con1's connection id so the default connection can watch for
# it in PROCESSLIST below.
let $ID= `select connection_id()`;
set @@rocksdb_lock_wait_timeout=500;
set autocommit=0;
begin;
# Debug injection: force the row-lock wait in this session to time out.
set session debug= "+d,myrocks_simulate_lock_timeout1";
# --send issues the statement without waiting for its result; it will
# block on the row lock held by the default connection.
--send select * from t1 where pk=1 for update;

--connection default
--echo ### Connection default

# Wait until con1 is actually blocked on the row lock before rolling
# back, so the timeout path (not an immediate grant) is exercised.
let $wait_condition= select 1 from INFORMATION_SCHEMA.PROCESSLIST
where ID = $ID and STATE = "Waiting for row lock";
--source include/wait_condition.inc
## Waiting for row lock
## select connection_id();
## select state='Waiting for row lock' from information_schema.processlist where id=2;

rollback;

--connection con1
--echo ### Connection con1

# Collect the result of the --send above; the injected timeout must win.
--error ER_LOCK_WAIT_TIMEOUT
--reap
set session debug= "-d,myrocks_simulate_lock_timeout1";

--disconnect con1
--connection default

set autocommit=1;
drop table t1;

49 changes: 40 additions & 9 deletions storage/rocksdb/ha_rocksdb.cc
Expand Up @@ -1015,15 +1015,17 @@ class Rdb_transaction
max_row_locks= max_row_locks_arg;
}

Row_lock *get_lock(const uchar* key, size_t keylen, bool *timed_out)
Row_lock *get_lock(const uchar* key, size_t keylen, bool is_write_lock,
bool *timed_out)
{
Row_lock *lock;
if (trx_locks.elements() > max_row_locks)
{
*timed_out= false;
return NULL;
}
if (!(lock= row_locks.get_lock(pins, key, keylen, timeout_sec)))
if (!(lock= row_locks.get_lock(pins, key, keylen, timeout_sec,
is_write_lock)))
{
*timed_out= true;
return NULL;
Expand Down Expand Up @@ -2381,6 +2383,8 @@ int ha_rocksdb::open(const char *name, int mode, uint test_if_locked)
"Attempt to open a table that is not present in RocksDB-SE data dictionary");
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}
lock_rows= false;
lock_for_write= false;

key_descr= tbl_def->key_descr;
pk_descr= key_descr[table->s->primary_key];
Expand Down Expand Up @@ -3557,8 +3561,11 @@ int ha_rocksdb::get_row_by_rowid(uchar *buf, const char *rowid, uint rowid_size)
bool timed_out;
if (lock_rows)
{
if (!(lock= trx->get_lock((const uchar*)rowid, rowid_size, &timed_out)))
if (!(lock= trx->get_lock((const uchar*)rowid, rowid_size, lock_for_write,
&timed_out)))
{
DBUG_RETURN(return_lock_failure(timed_out));
}
}

rocksdb::ReadOptions options;
Expand Down Expand Up @@ -3849,7 +3856,7 @@ int ha_rocksdb::write_row(uchar *buf)
{
DBUG_ENTER("ha_rocksdb::write_row");
DBUG_ASSERT(buf == table->record[0]);
DBUG_ASSERT(lock_rows);
DBUG_ASSERT(lock_rows && lock_for_write);

io_perf_start();

Expand Down Expand Up @@ -4006,7 +4013,7 @@ int ha_rocksdb::update_write_row(const uchar *old_data,
*/
bool timed_out;
Row_lock *lock= NULL; // init to shut up the compiler
if (!(lock= trx->get_lock(new_key_packed_tuple, new_packed_size,
if (!(lock= trx->get_lock(new_key_packed_tuple, new_packed_size, true,
&timed_out)))
{
/*
Expand Down Expand Up @@ -4314,7 +4321,8 @@ int ha_rocksdb::rnd_next_with_direction(uchar *buf, bool move_forward)
*/
Row_lock *lock;
bool timed_out;
if (!(lock= trx->get_lock((const uchar*)key.data(), key.size(), &timed_out)))
if (!(lock= trx->get_lock((const uchar*)key.data(), key.size(),
lock_for_write, &timed_out)))
{
io_perf_end_and_record();
DBUG_RETURN(return_lock_failure(timed_out));
Expand Down Expand Up @@ -4598,7 +4606,7 @@ void ha_rocksdb::calc_updated_indexes()
int ha_rocksdb::update_row(const uchar *old_data, uchar *new_data)
{
DBUG_ENTER("ha_rocksdb::update_row");
DBUG_ASSERT(lock_rows);
DBUG_ASSERT(lock_rows && lock_for_write);
/*
old_data points to record we're updating. It is the same as the record
we've just read (for multi-table UPDATE, too, because SQL layer will make
Expand Down Expand Up @@ -4626,6 +4634,26 @@ THR_LOCK_DATA **ha_rocksdb::store_lock(THD *thd,
enum thr_lock_type lock_type)
{
DBUG_ENTER("ha_rocksdb::store_lock");

/* First, make a decision about MyRocks's internal locking */
if (lock_type >= TL_WRITE_ALLOW_WRITE)
{
lock_rows= true;
lock_for_write=true;
}
else if (lock_type == TL_READ_WITH_SHARED_LOCKS)
{
lock_rows= true;
lock_for_write= false;
}
else
{
lock_rows= false;
lock_for_write= false;
}

/* Then, tell the SQL layer what kind of locking it should use: */

if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
{
/*
Expand Down Expand Up @@ -4687,7 +4715,6 @@ int ha_rocksdb::external_lock(THD *thd, int lock_type)
trx->snapshot= NULL;
}
}
lock_rows= FALSE;
}
else
{
Expand All @@ -4709,8 +4736,12 @@ int ha_rocksdb::external_lock(THD *thd, int lock_type)
/*
SQL layer signals us to take a write lock. It does so when starting DML
statement. We should put locks on the rows we're reading.
Note: sometimes, external_lock() can be called without a prior
::store_lock call. That's why we need to set lock_* members here, too.
*/
lock_rows= TRUE;
lock_rows= true;
lock_for_write= true;
}

trans_register_ha(thd, FALSE, rocksdb_hton);
Expand Down
6 changes: 6 additions & 0 deletions storage/rocksdb/ha_rocksdb.h
Expand Up @@ -209,6 +209,12 @@ class ha_rocksdb: public handler
/* If TRUE, reads should place locks on rows */
bool lock_rows;

/*
When lock_rows==true, specifies what kind of locks should be placed
(true means write locks, false means read locks)
*/
bool lock_for_write;

/* Lock that we've placed on the row we've just read. */
Row_lock *last_row_lock;

Expand Down

0 comments on commit 2c8a101

Please sign in to comment.