fix: opening & closing many itx should not increase time #3028

Merged
merged 3 commits into from
Jul 1, 2022
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.
@@ -88,16 +88,19 @@ mod interactive_tx {
let known_err = error.as_known().unwrap();

assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err.message.contains("Transaction is no longer valid. Last state"));
assert!(known_err
.message
.contains("A commit cannot be executed on a closed transaction."));

// Wait for cache eviction, no tx should be found.
time::sleep(time::Duration::from_secs(2)).await;
// Try again
let res = runner.commit_tx(tx_id).await?;
let error = res.err().unwrap();
let known_err = error.as_known().unwrap();

assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err.message.contains("Transaction not found."));
assert!(known_err
.message
.contains("A commit cannot be executed on a closed transaction."));

Ok(())
}
@@ -275,14 +278,47 @@ mod interactive_tx {
// Wait for tx to expire
time::sleep(time::Duration::from_millis(1500)).await;

// Expect the state of the tx to be expired.
// Status of the tx must be `Expired`
// Expect the state of the tx to be expired so the commit should fail.
let res = runner.commit_tx(tx_id.clone()).await?;
let error = res.err().unwrap();
let known_err = error.as_known().unwrap();

assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err.message.contains("Transaction is no longer valid"));
assert!(known_err
.message
.contains("A commit cannot be executed on a closed transaction."));

// Expect the state of the tx to be expired so the rollback should fail.
let res = runner.rollback_tx(tx_id.clone()).await?;
let error = res.err().unwrap();
let known_err = error.as_known().unwrap();

assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A rollback cannot be executed on a closed transaction."));

// Expect the state of the tx to be expired so the query should fail.
assert_error!(
runner,
r#"{ findManyTestModel { id } }"#,
2028,
"A query cannot be executed on a closed transaction."
);

runner
.batch(
vec![
"{ findManyTestModel { id } }".to_string(),
"{ findManyTestModel { id } }".to_string(),
],
false,
)
.await?
.assert_failure(
2028,
Some("A batch query cannot be executed on a closed transaction.".to_string()),
);

Ok(())
}
1 change: 1 addition & 0 deletions query-engine/core/Cargo.toml
@@ -50,6 +50,7 @@ metrics = "0.18"
metrics-util = "0.12.1"
metrics-exporter-prometheus = "0.10.0"
parking_lot = "0.12"
lru = "0.7.7"

[dev-dependencies]
expect-test = "1"
38 changes: 28 additions & 10 deletions query-engine/core/src/interactive_transactions/actor_manager.rs
@@ -1,4 +1,5 @@
use crate::{Operation, ResponseData};
use lru::LruCache;
use once_cell::sync::Lazy;
use schema::QuerySchemaRef;
use std::{collections::HashMap, sync::Arc};
@@ -13,16 +14,23 @@ use tokio::{

use super::{spawn_client_list_clear_actor, spawn_itx_actor, ITXClient, OpenTx, TransactionError, TxId};

pub static CACHE_EVICTION_SECS: Lazy<u64> = Lazy::new(|| match std::env::var("CLOSED_TX_CLEANUP") {
Ok(size) => size.parse().unwrap_or(300),
Err(_) => 300,
pub static CLOSED_TX_CACHE_SIZE: Lazy<usize> = Lazy::new(|| match std::env::var("CLOSED_TX_CACHE_SIZE") {
Ok(size) => size.parse().unwrap_or(100),
Err(_) => 100,
});

static CHANNEL_SIZE: usize = 100;

pub struct TransactionActorManager {
/// Map of active ITx clients
pub clients: Arc<RwLock<HashMap<TxId, ITXClient>>>,
/// Cache of closed transactions. We keep the last N closed transactions in memory to
/// return better error messages if operations are performed on closed transactions.
pub closed_txs: Arc<RwLock<LruCache<TxId, ()>>>,
/// Channel used to signal an ITx is closed and can be moved to the list of closed transactions.
send_done: Sender<TxId>,
/// Handle to the task in charge of clearing actors.
/// Used to abort the task when the TransactionActorManager is dropped.
bg_reader_clear: JoinHandle<()>,
}
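
The size of the closed-transaction cache comes from the `CLOSED_TX_CACHE_SIZE` environment variable, with a fallback of 100 when the variable is unset or not a valid number. A minimal standalone sketch of that fallback logic follows; the helper names `parse_cache_size` and `closed_tx_cache_size` are hypothetical and do not appear in the PR, which uses a `Lazy` static instead.

```rust
use std::env;

/// Fallback used when the variable is missing or unparsable (100 in the diff).
const DEFAULT_CLOSED_TX_CACHE_SIZE: usize = 100;

/// Hypothetical helper: turn an optional raw setting into a cache size,
/// falling back to the default when it is absent or not a valid `usize`.
fn parse_cache_size(raw: Option<&str>) -> usize {
    raw.and_then(|s| s.parse::<usize>().ok())
        .unwrap_or(DEFAULT_CLOSED_TX_CACHE_SIZE)
}

/// Hypothetical helper: read `CLOSED_TX_CACHE_SIZE` from the environment.
fn closed_tx_cache_size() -> usize {
    parse_cache_size(env::var("CLOSED_TX_CACHE_SIZE").ok().as_deref())
}

fn main() {
    println!("closed tx cache size: {}", closed_tx_cache_size());
}
```

Separating the parsing from the environment read keeps the fallback testable without mutating process-wide state.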

@@ -42,12 +50,14 @@ impl Default for TransactionActorManager {
impl TransactionActorManager {
pub fn new() -> Self {
let clients: Arc<RwLock<HashMap<TxId, ITXClient>>> = Arc::new(RwLock::new(HashMap::new()));
let closed_txs: Arc<RwLock<LruCache<TxId, ()>>> = Arc::new(RwLock::new(LruCache::new(*CLOSED_TX_CACHE_SIZE)));

let (send_done, rx) = channel::<TxId>(CHANNEL_SIZE);
let handle = spawn_client_list_clear_actor(clients.clone(), rx);
let handle = spawn_client_list_clear_actor(clients.clone(), closed_txs.clone(), rx);

Self {
clients,
closed_txs,
send_done,
bg_reader_clear: handle,
}
@@ -60,16 +70,20 @@ impl TransactionActorManager {
value,
timeout,
CHANNEL_SIZE,
*CACHE_EVICTION_SECS,
self.send_done.clone(),
);

self.clients.write().await.insert(tx_id, client);
}

async fn get_client(&self, tx_id: &TxId) -> crate::Result<ITXClient> {
async fn get_client(&self, tx_id: &TxId, from_operation: &str) -> crate::Result<ITXClient> {
if let Some(client) = self.clients.read().await.get(tx_id) {
Ok(client.clone())
} else if self.closed_txs.read().await.contains(tx_id) {
Err(TransactionError::Closed {
reason: format!("A {from_operation} cannot be executed on a closed transaction."),
}
.into())
} else {
Err(TransactionError::NotFound.into())
}
@@ -81,7 +95,8 @@ impl TransactionActorManager {
operation: Operation,
trace_id: Option<String>,
) -> crate::Result<ResponseData> {
let client = self.get_client(tx_id).await?;
let client = self.get_client(tx_id, "query").await?;

client.execute(operation, trace_id).await
}

@@ -91,19 +106,22 @@ impl TransactionActorManager {
operations: Vec<Operation>,
trace_id: Option<String>,
) -> crate::Result<Vec<crate::Result<ResponseData>>> {
let client = self.get_client(tx_id).await?;
let client = self.get_client(tx_id, "batch query").await?;

client.batch_execute(operations, trace_id).await
}

pub async fn commit_tx(&self, tx_id: &TxId) -> crate::Result<()> {
let client = self.get_client(tx_id).await?;
let client = self.get_client(tx_id, "commit").await?;
client.commit().await?;

Ok(())
}

pub async fn rollback_tx(&self, tx_id: &TxId) -> crate::Result<()> {
let client = self.get_client(tx_id).await?;
let client = self.get_client(tx_id, "rollback").await?;
client.rollback().await?;

Ok(())
}
}
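
The lookup order in `get_client` (active clients first, then the closed-transaction cache, then "not found") can be sketched on its own. This is a simplified model, not the PR's code: it is synchronous, `LookupError` and `Manager` are hypothetical stand-ins for `TransactionError` and `TransactionActorManager`, ids are plain strings, a `u32` stands in for an `ITXClient`, and a `HashSet` replaces the `LruCache`.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the two error cases the lookup can return.
#[derive(Debug, PartialEq)]
enum LookupError {
    Closed { reason: String },
    NotFound,
}

/// Simplified manager: string ids, and a `u32` placeholder instead of a client handle.
struct Manager {
    clients: HashMap<String, u32>,
    closed_txs: HashSet<String>,
}

impl Manager {
    /// Checks the active map first, then the closed set, and only then
    /// reports the transaction as unknown.
    fn get_client(&self, tx_id: &str, from_operation: &str) -> Result<u32, LookupError> {
        if let Some(client) = self.clients.get(tx_id) {
            Ok(*client)
        } else if self.closed_txs.contains(tx_id) {
            Err(LookupError::Closed {
                reason: format!("A {from_operation} cannot be executed on a closed transaction."),
            })
        } else {
            Err(LookupError::NotFound)
        }
    }
}

fn main() {
    let mut manager = Manager {
        clients: HashMap::new(),
        closed_txs: HashSet::new(),
    };
    manager.clients.insert("open".to_string(), 1);
    manager.closed_txs.insert("done".to_string());

    for (id, op) in [("open", "query"), ("done", "commit"), ("missing", "rollback")] {
        println!("{id}: {:?}", manager.get_client(id, op));
    }
}
```

Passing the operation name into the lookup is what lets each caller (`"query"`, `"batch query"`, `"commit"`, `"rollback"`) produce the operation-specific messages the tests assert on.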
43 changes: 8 additions & 35 deletions query-engine/core/src/interactive_transactions/actors.rs
@@ -69,24 +69,6 @@ impl ITXServer {
}
}

async fn process_eviction_state_msg(&mut self, op: TxOpRequest) {
let msg = match self.cached_tx {
CachedTx::Committed => TxOpResponse::Committed(Ok(())),
CachedTx::RolledBack => TxOpResponse::RolledBack(Ok(())),
CachedTx::Expired => TxOpResponse::Expired,
_ => {
error!("[{}] unexpected state {}", self.id.to_string(), self.cached_tx);
let _ = self.rollback(true).await;
let msg = "The transaction was in an unexpected state and rolledback".to_string();
let err = Err(TransactionError::Unknown { reason: msg }.into());
TxOpResponse::RolledBack(err)
}
};

// we ignore any errors when sending
let _ = op.respond_to.send(msg);
}

async fn execute_single(&mut self, operation: &Operation, trace_id: Option<String>) -> crate::Result<ResponseData> {
let conn = self.cached_tx.as_open()?;
execute_single_operation(
@@ -255,7 +237,6 @@ pub fn spawn_itx_actor(
value: OpenTx,
timeout: Duration,
channel_size: usize,
cache_eviction_secs: u64,
send_done: Sender<TxId>,
) -> ITXClient {
let (tx_to_server, rx_from_client) = channel::<TxOpRequest>(channel_size);
@@ -293,23 +274,9 @@ pub fn spawn_itx_actor(
}

trace!("[{}] completed with {}", server.id.to_string(), server.cached_tx);
let eviction_sleep = time::sleep(Duration::from_secs(cache_eviction_secs));
tokio::pin!(eviction_sleep);

loop {
tokio::select! {
_ = &mut eviction_sleep => {
break;
}
msg = server.receive.recv() => {
if let Some(op) = msg {
server.process_eviction_state_msg(op).await;
}
}
}
}

let _ = send_done.send(server.id.clone()).await;

trace!("[{}] has stopped with {}", server.id.to_string(), server.cached_tx);
}
.with_subscriber(dispatcher),
@@ -361,13 +328,19 @@ pub fn spawn_itx_actor(
*/
pub fn spawn_client_list_clear_actor(
clients: Arc<RwLock<HashMap<TxId, ITXClient>>>,
closed_txs: Arc<RwLock<lru::LruCache<TxId, ()>>>,
mut rx: Receiver<TxId>,
) -> JoinHandle<()> {
tokio::task::spawn(async move {
loop {
if let Some(id) = rx.recv().await {
trace!("removing {} from client list", id);
clients.write().await.remove(&id);

let mut clients_guard = clients.write().await;
clients_guard.remove(&id);
drop(clients_guard);

closed_txs.write().await.put(id, ());
}
}
})
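
The hand-off performed by this background actor (remove the id from the active clients, then remember it in a bounded cache) can be modelled with the standard library alone. The sketch below makes several substitutions that are assumptions, not the PR's implementation: `std::sync::mpsc` and a thread replace the tokio channel and task, no locks are used, and the hypothetical `ClosedTxs` type evicts in insertion (FIFO) order instead of using `lru::LruCache`. Because the PR inserts each id once and only checks membership with `contains`, FIFO eviction should behave much the same here.

```rust
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::mpsc;
use std::thread;

/// Hypothetical bounded set of closed ids: remembers at most `capacity` ids
/// and evicts the oldest one first.
struct ClosedTxs {
    capacity: usize,
    order: VecDeque<String>,
    ids: HashSet<String>,
}

impl ClosedTxs {
    fn new(capacity: usize) -> Self {
        Self { capacity, order: VecDeque::new(), ids: HashSet::new() }
    }

    fn put(&mut self, id: String) {
        // A zero-capacity cache stores nothing; duplicates are ignored.
        if self.capacity == 0 || self.ids.contains(&id) {
            return;
        }
        if self.order.len() == self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.ids.remove(&oldest);
            }
        }
        self.ids.insert(id.clone());
        self.order.push_back(id);
    }

    fn contains(&self, id: &str) -> bool {
        self.ids.contains(id)
    }
}

/// Drains "done" notifications: each id leaves the active map and enters the closed set.
fn run_clear_loop(
    mut clients: HashMap<String, u32>,
    mut closed: ClosedTxs,
    rx: mpsc::Receiver<String>,
) -> (HashMap<String, u32>, ClosedTxs) {
    // `recv` returns `Err` once every sender has been dropped, which ends the loop.
    while let Ok(id) = rx.recv() {
        clients.remove(&id);
        closed.put(id);
    }
    (clients, closed)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let clients: HashMap<String, u32> = (1..=3).map(|n| (format!("tx{n}"), n)).collect();
    let worker = thread::spawn(move || run_clear_loop(clients, ClosedTxs::new(2), rx));

    for id in ["tx1", "tx2", "tx3"] {
        tx.send(id.to_string()).unwrap();
    }
    drop(tx); // closing the channel lets the worker finish

    let (clients, closed) = worker.join().unwrap();
    println!("active: {}, remembers tx1: {}", clients.len(), closed.contains("tx1"));
}
```

With a capacity of 2, the third closed id pushes out the first, so memory stays bounded no matter how many transactions are opened and closed.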
19 changes: 9 additions & 10 deletions query-engine/core/src/interactive_transactions/mod.rs
@@ -24,18 +24,17 @@ pub use messages::*;
/// TransactionActorManager, it looks for the client in the hashmap and passes the operation to the client. The ITXClient sends a message to the
/// ITXServer and waits for a response. The ITXServer will then perform the operation and return the result. The ITXServer will perform one
/// operation at a time. All other operations will sit in the message queue waiting to be processed.
/// The ITXServer will handle all messages until it transitions state, e.g "rollback" or "commit".

/// After that the ITXServer will move into the cache eviction state. In this state, the connection is closed, and any messages it receives, it will
/// will reply with its last state. i.e committed, rollbacked or timeout. The eviction state is there so that if a prisma wants to
// perform an action on a iTx that has completed it will get a better message rather than the error message that this transaction doesn't exist
///
/// The ITXServer will handle all messages until:
/// - It transitions state, e.g. "rollback" or "commit"
/// - It exceeds its timeout, in which case the iTx is rolled back and the connection to the database is closed.

/// Once the eviction timeout is exceeded, the ITXServer will send a message to the Background Client list Actor to say that it is completed,
/// and the ITXServer will end. The Background Client list Actor removes the client from the list of clients that are active.
/// Once the ITXServer is done handling messages from the iTx Client, it sends a last message to the Background Client list Actor to say that it is completed and then shuts down.
/// The Background Client list Actor removes the client from the list of active clients and keeps in cache the iTx id of the closed transaction.

/// During the time the ITXServer is active there is a timer running and if that timeout is exceeded, the
/// transaction is rolledback and the connection to the database is closed. The ITXServer will then move into the eviction state.
///
/// We keep a list of closed transactions so that if any further messages are received for this iTx id,
/// the TransactionActorManager can reply with a helpful error message which explains that no operation can be performed on a closed transaction
/// rather than an error message stating that the transaction does not exist.

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct TxId(String);