Skip to content
Permalink
Browse files

Recycle NpgsqlTransaction

Since PostgreSQL only supports on transaction on a given connection,
we reycle the NpgsqlTransaction object.

Closes #2416
  • Loading branch information...
roji committed Apr 10, 2019
1 parent be1d89c commit 7b8e117049f381c7130192ffd7fb4f608076d524
@@ -531,7 +531,8 @@ public new NpgsqlTransaction BeginTransaction(IsolationLevel level)
if (connector.InTransaction)
throw new InvalidOperationException("A transaction is already in progress; nested/concurrent transactions aren't supported.");

return new NpgsqlTransaction(this, level);
connector.Transaction.Init(level);
return connector.Transaction;
}
}

@@ -101,21 +101,11 @@ sealed partial class NpgsqlConnector : IDisposable
internal TransactionStatus TransactionStatus { get; set; }

/// <summary>
/// The transaction currently in progress, if any.
/// A transaction object for this connector. Since only one transaction can be in progress at any given time,
/// this instance is recycled. To check whether a transaction is currently in progress on this connector,
/// see <see cref="TransactionStatus"/>.
/// </summary>
/// <remarks>
/// <para>
/// Note that this doesn't mean a transaction request has actually been sent to the backend - for
/// efficiency we defer sending the request to the first query after BeginTransaction is called.
/// See <see cref="TransactionStatus"/> for the actual transaction status.
/// </para>
/// <para>
/// Also, the user can initiate a transaction in SQL (i.e. BEGIN), in which case there will be no
/// NpgsqlTransaction instance. As a result, never check <see cref="Transaction"/> to know whether
/// a transaction is in progress, check <see cref="TransactionStatus"/> instead.
/// </para>
/// </remarks>
internal NpgsqlTransaction? Transaction { get; set; }
internal NpgsqlTransaction Transaction { get; }

/// <summary>
/// The NpgsqlConnection that (currently) owns this connector. Null if the connector isn't
@@ -271,6 +261,8 @@ internal NpgsqlConnector(NpgsqlConnection connection)
ConnectionString = connectionString;
PostgresParameters = new Dictionary<string, string>();
SqlParser = new SqlQueryParser(UseConformantStrings);
Transaction = new NpgsqlTransaction(this);

CancelLock = new object();

_isKeepAliveEnabled = Settings.KeepAlive > 0;
@@ -1146,32 +1138,25 @@ internal bool InTransaction
/// </summary>
void ProcessNewTransactionStatus(TransactionStatus newStatus)
{
if (newStatus == TransactionStatus) { return; }
if (newStatus == TransactionStatus)
return;

switch (newStatus) {
case TransactionStatus.Idle:
ClearTransaction();
break;
case TransactionStatus.InTransactionBlock:
case TransactionStatus.InFailedTransactionBlock:
break;
TransactionStatus = newStatus;
return;
case TransactionStatus.Pending:
throw new Exception("Invalid TransactionStatus (should be frontend-only)");
default:
throw new InvalidOperationException($"Internal Npgsql bug: unexpected value {newStatus} of enum {nameof(TransactionStatus)}. Please file a bug.");
}
TransactionStatus = newStatus;
}

void ClearTransaction()
{
if (TransactionStatus == TransactionStatus.Idle) { return; }
// We may not have an NpgsqlTransaction for the transaction (i.e. user executed BEGIN)
if (Transaction != null)
{
Transaction.Clear();
Transaction = null;
}
Transaction.DisposeImmediately();
TransactionStatus = TransactionStatus.Idle;
}

@@ -1514,6 +1499,7 @@ internal void Reset()
case TransactionStatus.InTransactionBlock:
case TransactionStatus.InFailedTransactionBlock:
Rollback(false);
ClearTransaction();
break;
default:
throw new InvalidOperationException($"Internal Npgsql bug: unexpected value {TransactionStatus} of enum {nameof(TransactionStatus)}. Please file a bug.");
@@ -20,25 +20,31 @@ public sealed class NpgsqlTransaction : DbTransaction
/// Specifies the <see cref="NpgsqlConnection"/> object associated with the transaction.
/// </summary>
/// <value>The <see cref="NpgsqlConnection"/> object associated with the transaction.</value>
public new NpgsqlConnection Connection { get; internal set; }
public new NpgsqlConnection? Connection
{
get
{
CheckReady();
return _connector.Connection;
}
}

// Note that with ambient transactions, it's possible for a transaction to be pending after its connection
// is already closed. So we capture the connector and perform everything directly on it.
NpgsqlConnector _connector;
readonly NpgsqlConnector _connector;

/// <summary>
/// Specifies the completion state of the transaction.
/// Specifies the <see cref="NpgsqlConnection"/> object associated with the transaction.
/// </summary>
/// <value>The completion state of the transaction.</value>
public bool IsCompleted => _connector == null;
/// <value>The <see cref="NpgsqlConnection"/> object associated with the transaction.</value>
protected override DbConnection? DbConnection => Connection;

/// <summary>
/// Specifies the <see cref="NpgsqlConnection"/> object associated with the transaction.
/// If true, the transaction has been committed/rolled back, but not disposed.
/// </summary>
/// <value>The <see cref="NpgsqlConnection"/> object associated with the transaction.</value>
protected override DbConnection DbConnection => Connection;
internal bool IsCompleted => _connector.TransactionStatus == TransactionStatus.Idle;

bool _isDisposed;
internal bool IsDisposed;

/// <summary>
/// Specifies the <see cref="System.Data.IsolationLevel">IsolationLevel</see> for this transaction.
@@ -53,55 +59,54 @@ public override IsolationLevel IsolationLevel
return _isolationLevel;
}
}
readonly IsolationLevel _isolationLevel;
IsolationLevel _isolationLevel;

static readonly NpgsqlLogger Log = NpgsqlLogManager.GetCurrentClassLogger();

const IsolationLevel DefaultIsolationLevel = IsolationLevel.ReadCommitted;

#endregion

#region Constructors
#region Initialization

internal NpgsqlTransaction(NpgsqlConnection conn, IsolationLevel isolationLevel = DefaultIsolationLevel)
internal NpgsqlTransaction(NpgsqlConnector connector)
=> _connector = connector;

internal void Init(IsolationLevel isolationLevel = DefaultIsolationLevel)
{
Debug.Assert(conn != null);
Debug.Assert(isolationLevel != IsolationLevel.Chaos);

Connection = conn;
_connector = Connection.CheckReadyAndGetConnector();

if (!_connector.DatabaseInfo.SupportsTransactions)
return;

Log.Debug($"Beginning transaction with isolation level {isolationLevel}", _connector.Id);
_connector.Transaction = this;
_connector.TransactionStatus = TransactionStatus.Pending;

switch (isolationLevel) {
case IsolationLevel.RepeatableRead:
case IsolationLevel.Snapshot:
_connector.PrependInternalMessage(PregeneratedMessages.BeginTransRepeatableRead, 2);
break;
case IsolationLevel.Serializable:
_connector.PrependInternalMessage(PregeneratedMessages.BeginTransSerializable, 2);
break;
case IsolationLevel.ReadUncommitted:
// PG doesn't really support ReadUncommitted, it's the same as ReadCommitted. But we still
// send as if.
_connector.PrependInternalMessage(PregeneratedMessages.BeginTransReadUncommitted, 2);
break;
case IsolationLevel.ReadCommitted:
_connector.PrependInternalMessage(PregeneratedMessages.BeginTransReadCommitted, 2);
break;
case IsolationLevel.Unspecified:
isolationLevel = DefaultIsolationLevel;
goto case DefaultIsolationLevel;
default:
throw new NotSupportedException("Isolation level not supported: " + isolationLevel);
switch (isolationLevel)
{
case IsolationLevel.RepeatableRead:
case IsolationLevel.Snapshot:
_connector.PrependInternalMessage(PregeneratedMessages.BeginTransRepeatableRead, 2);
break;
case IsolationLevel.Serializable:
_connector.PrependInternalMessage(PregeneratedMessages.BeginTransSerializable, 2);
break;
case IsolationLevel.ReadUncommitted:
// PG doesn't really support ReadUncommitted, it's the same as ReadCommitted. But we still
// send as if.
_connector.PrependInternalMessage(PregeneratedMessages.BeginTransReadUncommitted, 2);
break;
case IsolationLevel.ReadCommitted:
_connector.PrependInternalMessage(PregeneratedMessages.BeginTransReadCommitted, 2);
break;
case IsolationLevel.Unspecified:
isolationLevel = DefaultIsolationLevel;
goto case DefaultIsolationLevel;
default:
throw new NotSupportedException("Isolation level not supported: " + isolationLevel);
}

_connector.TransactionStatus = TransactionStatus.Pending;
_isolationLevel = isolationLevel;
IsDisposed = false;
}

#endregion
@@ -124,7 +129,6 @@ async Task Commit(bool async)
{
Log.Debug("Committing transaction", _connector.Id);
await _connector.ExecuteInternalCommand(PregeneratedMessages.CommitTransaction, async);
Clear();
}
}

@@ -155,7 +159,6 @@ async Task Rollback(bool async)
if (!_connector.DatabaseInfo.SupportsTransactions)
return;
await _connector.Rollback(async);
Clear();
}

/// <summary>
@@ -291,50 +294,36 @@ public Task ReleaseAsync(string name, CancellationToken cancellationToken = defa
/// </summary>
protected override void Dispose(bool disposing)
{
if (_isDisposed) { return; }
if (IsDisposed)
return;

if (disposing && !IsCompleted)
{
_connector.CloseOngoingOperations();
Rollback();
}

Clear();

base.Dispose(disposing);
_isDisposed = true;
IsDisposed = true;
}

#pragma warning disable CS8625
internal void Clear()
{
_connector = null;
Connection = null;
}
#pragma warning enable CS8625
/// <summary>
/// Disposes the transaction, without rolling back. Used only in special circumstances, e.g. when
/// the connection is broken.
/// </summary>
internal void DisposeImmediately() => IsDisposed = true;

#endregion

#region Checks

void CheckReady()
{
CheckDisposed();
CheckCompleted();
}

void CheckCompleted()
{
if (IsDisposed)
throw new ObjectDisposedException(typeof(NpgsqlTransaction).Name);
if (IsCompleted)
throw new InvalidOperationException("This NpgsqlTransaction has completed; it is no longer usable.");
}

void CheckDisposed()
{
if (_isDisposed)
throw new ObjectDisposedException(typeof(NpgsqlTransaction).Name);
}

#endregion
}
}
@@ -19,6 +19,9 @@ public void Commit()
conn.ExecuteNonQuery("INSERT INTO data (name) VALUES ('X')", tx: tx);
tx.Commit();
Assert.That(conn.ExecuteScalar("SELECT COUNT(*) FROM data"), Is.EqualTo(1));
Assert.That(() => tx.Connection, Throws.Exception.TypeOf<InvalidOperationException>());
tx.Dispose();
Assert.That(() => tx.Connection, Throws.Exception.TypeOf<ObjectDisposedException>());
}
}

@@ -32,6 +35,9 @@ public async Task CommitAsync()
conn.ExecuteNonQuery("INSERT INTO data (name) VALUES ('X')", tx: tx);
await tx.CommitAsync();
Assert.That(conn.ExecuteScalar("SELECT COUNT(*) FROM data"), Is.EqualTo(1));
Assert.That(() => tx.Connection, Throws.Exception.TypeOf<InvalidOperationException>());
tx.Dispose();
Assert.That(() => tx.Connection, Throws.Exception.TypeOf<ObjectDisposedException>());
}
}

@@ -50,6 +56,9 @@ public void Rollback([Values(PrepareOrNot.NotPrepared, PrepareOrNot.Prepared)] P
tx.Rollback();
Assert.That(tx.IsCompleted);
Assert.That(conn.ExecuteScalar("SELECT COUNT(*) FROM data"), Is.EqualTo(0));
Assert.That(() => tx.Connection, Throws.Exception.TypeOf<InvalidOperationException>());
tx.Dispose();
Assert.That(() => tx.Connection, Throws.Exception.TypeOf<ObjectDisposedException>());
}
}

@@ -68,6 +77,9 @@ public async Task RollbackAsync([Values(PrepareOrNot.NotPrepared, PrepareOrNot.P
await tx.RollbackAsync();
Assert.That(tx.IsCompleted);
Assert.That(conn.ExecuteScalar("SELECT COUNT(*) FROM data"), Is.EqualTo(0));
Assert.That(() => tx.Connection, Throws.Exception.TypeOf<InvalidOperationException>());
tx.Dispose();
Assert.That(() => tx.Connection, Throws.Exception.TypeOf<ObjectDisposedException>());
}
}

@@ -80,7 +92,6 @@ public void RollbackOnDispose()
var tx = conn.BeginTransaction();
conn.ExecuteNonQuery("INSERT INTO data (name) VALUES ('X')", tx: tx);
tx.Dispose();
Assert.That(tx.IsCompleted);
Assert.That(conn.ExecuteScalar("SELECT COUNT(*) FROM data"), Is.EqualTo(0));
}
}
@@ -100,8 +111,8 @@ public void RollbackOnClose()
tx = conn2.BeginTransaction();
conn2.ExecuteNonQuery($"INSERT INTO {tableName} (name) VALUES ('X')", tx);
}
Assert.That(tx.IsCompleted);
Assert.That(conn1.ExecuteScalar($"SELECT COUNT(*) FROM {tableName}"), Is.EqualTo(0));
Assert.That(() => tx.Connection, Throws.Exception.TypeOf<ObjectDisposedException>());
conn1.ExecuteNonQuery($"DROP TABLE {tableName}");
}
}

0 comments on commit 7b8e117

Please sign in to comment.
You can’t perform that action at this time.