Skip to content

Commit

Permalink
transaction: use AtomicU8 for status (#429)
Browse files Browse the repository at this point in the history
* use AtomicU8 for status

Signed-off-by: Ping Yu <yuping@pingcap.com>

* fix check

Signed-off-by: Ping Yu <yuping@pingcap.com>

* skip exchange on equal

Signed-off-by: Ping Yu <yuping@pingcap.com>

---------

Signed-off-by: Ping Yu <yuping@pingcap.com>
  • Loading branch information
pingyu committed Nov 1, 2023
1 parent 788d6e2 commit a7885be
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/transaction/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub async fn resolve_locks(
.await?;
clean_regions
.entry(lock.lock_version)
.or_insert_with(HashSet::new)
.or_default()
.insert(cleaned_region);
}
Ok(live_locks)
Expand Down
2 changes: 1 addition & 1 deletion src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::Value;
///
/// See the [Transaction](struct@crate::Transaction) docs for more information on the methods.
#[derive(new)]
pub struct Snapshot<Cod: Codec = ApiV1TxnCodec, PdC: PdClient = PdRpcClient<Cod>> {
pub struct Snapshot<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod> = PdRpcClient<Cod>> {
transaction: Transaction<Cod, PdC>,
phantom: PhantomData<Cod>,
}
Expand Down
137 changes: 89 additions & 48 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

use std::iter;
use std::marker::PhantomData;
use std::sync::atomic;
use std::sync::atomic::AtomicU8;
use std::sync::Arc;
use std::time::Instant;

Expand All @@ -10,7 +12,6 @@ use fail::fail_point;
use futures::prelude::*;
use log::debug;
use log::warn;
use tokio::sync::RwLock;
use tokio::time::Duration;

use crate::backoff::Backoff;
Expand Down Expand Up @@ -76,8 +77,8 @@ use crate::Value;
/// txn.commit().await.unwrap();
/// # });
/// ```
pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient = PdRpcClient<Cod>> {
status: Arc<RwLock<TransactionStatus>>,
pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod> = PdRpcClient<Cod>> {
status: Arc<AtomicU8>,
timestamp: Timestamp,
buffer: Buffer,
rpc: Arc<PdC>,
Expand All @@ -99,7 +100,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
TransactionStatus::Active
};
Transaction {
status: Arc::new(RwLock::new(status)),
status: Arc::new(AtomicU8::new(status as u8)),
timestamp,
buffer: Buffer::new(options.is_pessimistic()),
rpc,
Expand Down Expand Up @@ -632,15 +633,16 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
/// ```
pub async fn commit(&mut self) -> Result<Option<Timestamp>> {
debug!("commiting transaction");
{
let mut status = self.status.write().await;
if !matches!(
*status,
TransactionStatus::StartedCommit | TransactionStatus::Active
) {
return Err(Error::OperationAfterCommitError);
}
*status = TransactionStatus::StartedCommit;
if !self.transit_status(
|status| {
matches!(
status,
TransactionStatus::StartedCommit | TransactionStatus::Active
)
},
TransactionStatus::StartedCommit,
) {
return Err(Error::OperationAfterCommitError);
}

let primary_key = self.buffer.get_primary_key();
Expand All @@ -665,8 +667,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
.await;

if res.is_ok() {
let mut status = self.status.write().await;
*status = TransactionStatus::Committed;
self.set_status(TransactionStatus::Committed);
}
res
}
Expand All @@ -689,21 +690,18 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
/// ```
pub async fn rollback(&mut self) -> Result<()> {
debug!("rolling back transaction");
{
let status = self.status.read().await;
if !matches!(
*status,
TransactionStatus::StartedRollback
| TransactionStatus::Active
| TransactionStatus::StartedCommit
) {
return Err(Error::OperationAfterCommitError);
}
}

{
let mut status = self.status.write().await;
*status = TransactionStatus::StartedRollback;
if !self.transit_status(
|status| {
matches!(
status,
TransactionStatus::StartedRollback
| TransactionStatus::Active
| TransactionStatus::StartedCommit
)
},
TransactionStatus::StartedRollback,
) {
return Err(Error::OperationAfterCommitError);
}

let primary_key = self.buffer.get_primary_key();
Expand All @@ -721,8 +719,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
.await;

if res.is_ok() {
let mut status = self.status.write().await;
*status = TransactionStatus::Rolledback;
self.set_status(TransactionStatus::Rolledback);
}
res
}
Expand Down Expand Up @@ -906,8 +903,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {

/// Checks if the transaction can perform arbitrary operations.
async fn check_allow_operation(&self) -> Result<()> {
let status = self.status.read().await;
match *status {
match self.get_status() {
TransactionStatus::ReadOnly | TransactionStatus::Active => Ok(()),
TransactionStatus::Committed
| TransactionStatus::Rolledback
Expand Down Expand Up @@ -946,9 +942,9 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
loop {
tokio::time::sleep(heartbeat_interval).await;
{
let status = status.read().await;
let status: TransactionStatus = status.load(atomic::Ordering::Acquire).into();
if matches!(
*status,
status,
TransactionStatus::Rolledback
| TransactionStatus::Committed
| TransactionStatus::Dropped
Expand Down Expand Up @@ -977,16 +973,45 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
}
});
}

fn get_status(&self) -> TransactionStatus {
self.status.load(atomic::Ordering::Acquire).into()
}

fn set_status(&self, status: TransactionStatus) {
self.status.store(status as u8, atomic::Ordering::Release);
}

fn transit_status<F>(&self, check_status: F, next: TransactionStatus) -> bool
where
F: Fn(TransactionStatus) -> bool,
{
let mut current = self.get_status();
while check_status(current) {
if current == next {
return true;
}
match self.status.compare_exchange_weak(
current as u8,
next as u8,
atomic::Ordering::AcqRel,
atomic::Ordering::Acquire,
) {
Ok(_) => return true,
Err(x) => current = x.into(),
}
}
false
}
}

impl<Cod: Codec, PdC: PdClient> Drop for Transaction<Cod, PdC> {
impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Drop for Transaction<Cod, PdC> {
fn drop(&mut self) {
debug!("dropping transaction");
if std::thread::panicking() {
return;
}
let mut status = futures::executor::block_on(self.status.write());
if *status == TransactionStatus::Active {
if self.get_status() == TransactionStatus::Active {
match self.options.check_level {
CheckLevel::Panic => {
panic!("Dropping an active transaction. Consider commit or rollback it.")
Expand All @@ -998,7 +1023,7 @@ impl<Cod: Codec, PdC: PdClient> Drop for Transaction<Cod, PdC> {
CheckLevel::None => {}
}
}
*status = TransactionStatus::Dropped;
self.set_status(TransactionStatus::Dropped);
}
}

Expand Down Expand Up @@ -1432,22 +1457,38 @@ impl<PdC: PdClient> Committer<PdC> {
}
}

#[derive(PartialEq, Eq)]
#[derive(PartialEq, Eq, Clone, Copy)]
#[repr(u8)]
enum TransactionStatus {
/// The transaction is read-only [`Snapshot`](super::Snapshot), no need to commit or rollback or panic on drop.
ReadOnly,
ReadOnly = 0,
/// The transaction have not been committed or rolled back.
Active,
Active = 1,
/// The transaction has committed.
Committed,
Committed = 2,
/// The transaction has tried to commit. Only `commit` is allowed.
StartedCommit,
StartedCommit = 3,
/// The transaction has rolled back.
Rolledback,
Rolledback = 4,
/// The transaction has tried to rollback. Only `rollback` is allowed.
StartedRollback,
StartedRollback = 5,
/// The transaction has been dropped.
Dropped,
Dropped = 6,
}

impl From<u8> for TransactionStatus {
fn from(num: u8) -> Self {
match num {
0 => TransactionStatus::ReadOnly,
1 => TransactionStatus::Active,
2 => TransactionStatus::Committed,
3 => TransactionStatus::StartedCommit,
4 => TransactionStatus::Rolledback,
5 => TransactionStatus::StartedRollback,
6 => TransactionStatus::Dropped,
_ => panic!("Unknown transaction status {}", num),
}
}
}

#[cfg(test)]
Expand Down

0 comments on commit a7885be

Please sign in to comment.