Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction lifetimes. #612

Merged
merged 11 commits into from
May 13, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ internal static class ExceptionManager
{ typeof(TokenExpiredException), "TokenExpiredError" },
{ typeof(ConnectionReadTimeoutException), "ConnectionReadTimeoutError"},
{ typeof(InvalidBookmarkException), "InvalidBookmarkError"},
{ typeof(TransactionClosedException), "ClientError"},

{ typeof(NotSupportedException), "NotSupportedException" },

Expand All @@ -58,11 +59,12 @@ internal static ProtocolResponse GenerateExceptionResponse(Exception ex)
{
string outerExceptionMessage = ex.Message;
string exceptionMessage = (ex.InnerException != null) ? ex.InnerException.Message : ex.Message;

var type = TypeMap.GetValueOrDefault(ex.GetType());


//if (ex is Neo4jException || ex is NotSupportedException)
if(type is not null)
if (type is not null)
{
ProtocolException newError = ProtocolObjectFactory.CreateObject<ProtocolException>();
newError.ExceptionObj = ex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ public override async Task Process(Controller controller)
{
var sessionContainer = (NewSession)ObjManager.GetObject(data.sessionId);

await sessionContainer.Session.ReadTransactionAsync(async tx =>
await sessionContainer.Session.ExecuteReadAsync(async tx =>
{
sessionContainer.SetupRetryAbleState(NewSession.SessionState.RetryAbleNothing);

TransactionId = controller.TransactionManager.AddTransaction(new TransactionWrapper(tx, async cursor =>
TransactionId = controller.TransactionManager.AddTransaction(new TransactionWrapper(tx as IAsyncTransaction, async cursor =>
{
var result = ProtocolObjectFactory.CreateObject<Result>();
await result.PopulateRecords(cursor).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ public override async Task Process(Controller controller)
{
var sessionContainer = (NewSession)ObjManager.GetObject(data.sessionId);

await sessionContainer.Session.WriteTransactionAsync(async tx =>
await sessionContainer.Session.ExecuteWriteAsync(async tx =>
{
sessionContainer.SetupRetryAbleState(NewSession.SessionState.RetryAbleNothing);

TransactionId = controller.TransactionManager.AddTransaction(new TransactionWrapper(tx, async cursor =>
TransactionId = controller.TransactionManager.AddTransaction(new TransactionWrapper(tx as IAsyncTransaction, async cursor =>
{
var result = ProtocolObjectFactory.CreateObject<Result>();
await result.PopulateRecords(cursor).ConfigureAwait(false);
Expand Down Expand Up @@ -59,7 +59,7 @@ public override string Respond()
{
var sessionContainer = (NewSession)ObjManager.GetObject(data.sessionId);

if(sessionContainer.RetryState == NewSession.SessionState.RetryAbleNothing)
if (sessionContainer.RetryState == NewSession.SessionState.RetryAbleNothing)
throw new ArgumentException("Should never hit this code with a RetryAbleNothing");

else if(sessionContainer.RetryState == NewSession.SessionState.RetryAbleNegative)
Expand Down
2 changes: 1 addition & 1 deletion Neo4j.Driver/Neo4j.Driver.Tests/TransactionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public async void ShouldThrowExceptionIfPreviousTxFailed()
await tx.MarkToClose();

var error = await ExceptionAsync(() => tx.RunAsync("ttt"));
error.Should().BeOfType<ClientException>();
error.Should().BeOfType<TransactionClosedException>();
}

[Fact]
Expand Down
4 changes: 4 additions & 0 deletions Neo4j.Driver/Neo4j.Driver/IAsyncQueryRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public interface IAsyncQueryRunner : IAsyncDisposable, IDisposable
/// </summary>
/// <param name="query">A Cypher query.</param>
/// <returns>A task of a stream of result values and associated metadata.</returns>
/// <exception cref="TransactionClosedException">>Thrown when used in a transaction that has previously been closed.</exception>
Task<IResultCursor> RunAsync(string query);

/// <summary>
Expand All @@ -49,6 +50,7 @@ public interface IAsyncQueryRunner : IAsyncDisposable, IDisposable
/// <param name="query">A Cypher query.</param>
/// <param name="parameters">A parameter dictionary which is made of prop.Name=prop.Value pairs would be created.</param>
/// <returns>A task of a stream of result values and associated metadata.</returns>
/// <exception cref="TransactionClosedException">>Thrown when used in a transaction that has previously been closed.</exception>
Task<IResultCursor> RunAsync(string query, object parameters);

/// <summary>
Expand All @@ -64,6 +66,7 @@ public interface IAsyncQueryRunner : IAsyncDisposable, IDisposable
/// <param name="query">A Cypher query.</param>
/// <param name="parameters">Input parameters for the query.</param>
/// <returns>A task of a stream of result values and associated metadata.</returns>
/// <exception cref="TransactionClosedException">>Thrown when used in a transaction that has previously been closed.</exception>
Task<IResultCursor> RunAsync(string query, IDictionary<string, object> parameters);

/// <summary>
Expand All @@ -73,6 +76,7 @@ public interface IAsyncQueryRunner : IAsyncDisposable, IDisposable
/// </summary>
/// <param name="query">A Cypher query, <see cref="Query"/>.</param>
/// <returns>A task of a stream of result values and associated metadata.</returns>
/// <exception cref="TransactionClosedException">>Thrown when used in a transaction that has previously been closed.</exception>
Task<IResultCursor> RunAsync(Query query);
}
}
76 changes: 44 additions & 32 deletions Neo4j.Driver/Neo4j.Driver/IAsyncSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,72 +65,84 @@ public interface IAsyncSession : IAsyncQueryRunner
Task<IAsyncTransaction> BeginTransactionAsync(Action<TransactionConfigBuilder> action);

/// <summary>
/// Asynchronously execute given unit of work in a <see cref="AccessMode.Read"/> transaction.
/// Asynchronously execute given unit of work in a <see cref="AccessMode.Read"/> transaction with a specific <see cref="TransactionConfig"/>.
/// </summary>
/// <typeparam name="T">The return type of the given unit of work.</typeparam>
/// <param name="work">The <see cref="Func{ITransactionAsync, T}"/> to be applied to a new read transaction.</param>
/// <param name="action">Given a <see cref="TransactionConfigBuilder"/>, defines how to set the configurations for the new transaction.
/// This configuration overrides server side default transaction configurations. See <see cref="TransactionConfig"/></param>
/// <returns>A task of a result as returned by the given unit of work.</returns>
Task<T> ReadTransactionAsync<T>(Func<IAsyncTransaction, Task<T>> work);
[Obsolete("Deprecated, Use ExecuteReadAsync. Will be removed in 6.0.")]
Task<T> ReadTransactionAsync<T>(Func<IAsyncTransaction, Task<T>> work, Action<TransactionConfigBuilder> action = null);

/// <summary>
/// Asynchronously execute given unit of work in a <see cref="AccessMode.Read"/> transaction.
/// Asynchronously execute given unit of work in a <see cref="AccessMode.Read"/> transaction with a specific <see cref="TransactionConfig"/>.
/// </summary>
/// <param name="work">The <see cref="Func{ITransactionAsync, Task}"/> to be applied to a new read transaction.</param>
/// <param name="action">Given a <see cref="TransactionConfigBuilder"/>, defines how to set the configurations for the new transaction.
/// This configuration overrides server side default transaction configurations. See <see cref="TransactionConfig"/></param>
/// <returns>A task representing the completion of the transactional read operation enclosing the given unit of work.</returns>
Task ReadTransactionAsync(Func<IAsyncTransaction, Task> work);
[Obsolete("Deprecated, Use ExecuteReadAsync. Will be removed in 6.0.")]
Task ReadTransactionAsync(Func<IAsyncTransaction, Task> work, Action<TransactionConfigBuilder> action = null);

/// <summary>
/// Asynchronously execute given unit of work in a <see cref="AccessMode.Read"/> transaction with a specific <see cref="TransactionConfig"/>.
/// Asynchronously execute given unit of work in a <see cref="AccessMode.Write"/> transaction with a specific <see cref="TransactionConfig"/>.
/// </summary>
/// <typeparam name="T">The return type of the given unit of work.</typeparam>
/// <param name="work">The <see cref="Func{ITransactionAsync, T}"/> to be applied to a new read transaction.</param>
/// <param name="work">The <see cref="Func{ITransactionAsync, T}"/> to be applied to a new write transaction.</param>
/// <param name="action">Given a <see cref="TransactionConfigBuilder"/>, defines how to set the configurations for the new transaction.
/// This configuration overrides server side default transaction configurations. See <see cref="TransactionConfig"/></param>
/// <returns>A task of a result as returned by the given unit of work.</returns>
Task<T> ReadTransactionAsync<T>(Func<IAsyncTransaction, Task<T>> work, Action<TransactionConfigBuilder> action);
[Obsolete("Deprecated, Use ExecuteWriteAsync. Will be removed in 6.0.")]
Task<T> WriteTransactionAsync<T>(Func<IAsyncTransaction, Task<T>> work, Action<TransactionConfigBuilder> action = null);

/// <summary>
/// Asynchronously execute given unit of work in a <see cref="AccessMode.Read"/> transaction with a specific <see cref="TransactionConfig"/>.
/// Asynchronously execute given unit of work in a <see cref="AccessMode.Write"/> transaction with a specific <see cref="TransactionConfig"/>.
/// </summary>
/// <param name="work">The <see cref="Func{ITransactionAsync, Task}"/> to be applied to a new read transaction.</param>
/// <param name="work">The <see cref="Func{ITransactionAsync, Task}"/> to be applied to a new write transaction.</param>
/// <param name="action">Given a <see cref="TransactionConfigBuilder"/>, defines how to set the configurations for the new transaction.
/// This configuration overrides server side default transaction configurations. See <see cref="TransactionConfig"/></param>
/// <returns>A task representing the completion of the transactional read operation enclosing the given unit of work.</returns>
Task ReadTransactionAsync(Func<IAsyncTransaction, Task> work, Action<TransactionConfigBuilder> action);
/// <returns>A task representing the completion of the transactional write operation enclosing the given unit of work.</returns>
[Obsolete("Deprecated, Use ExecuteWriteAsync. Will be removed in 6.0.")]
Task WriteTransactionAsync(Func<IAsyncTransaction, Task> work, Action<TransactionConfigBuilder> action = null);

/// <summary>
/// Asynchronously execute given unit of work in a <see cref="AccessMode.Write"/> transaction.
/// Asynchronously execute given unit of work as a transaction with a specific <see cref="TransactionConfig"/>.
/// </summary>
/// <typeparam name="T">The return type of the given unit of work.</typeparam>
/// <param name="work">The <see cref="Func{ITransactionAsync, T}"/> to be applied to a new write transaction.</param>
/// <returns>A task of a result as returned by the given unit of work.</returns>
Task<T> WriteTransactionAsync<T>(Func<IAsyncTransaction, Task<T>> work);
/// <typeparam name="TResult"></typeparam>
/// <param name="work">The <see cref="Func{IAsyncQueryRunner, Task}"/> to be applied to a new read transaction.</param>
/// <param name="action">Given a <see cref="TransactionConfigBuilder"/>, defines how to set the configurations for the new transaction.
/// This configuration overrides server side default transaction configurations. See <see cref="TransactionConfig"/></param>
/// <returns>A task that represents the asynchronous execution operation.</returns>
Task<TResult> ExecuteReadAsync<TResult>(Func<IAsyncQueryRunner, Task<TResult>> work, Action<TransactionConfigBuilder> action = null);

/// <summary>
/// Asynchronously execute given unit of work in a <see cref="AccessMode.Write"/> transaction.
/// Asynchronously execute given unit of work as a transaction with a specific <see cref="TransactionConfig"/>.
/// </summary>
/// <param name="work">The <see cref="Func{ITransactionAsync, Task}"/> to be applied to a new write transaction.</param>
/// <returns>A task representing the completion of the transactional write operation enclosing the given unit of work.</returns>
Task WriteTransactionAsync(Func<IAsyncTransaction, Task> work);
/// <param name="work">The <see cref="Func{IAsyncQueryRunner, Task}"/> to be applied to a new read transaction.</param>
/// <param name="action">Given a <see cref="TransactionConfigBuilder"/>, defines how to set the configurations for the new transaction.
/// This configuration overrides server side default transaction configurations. See <see cref="TransactionConfig"/></param>
/// <returns>A task that represents the asynchronous execution operation.</returns>
Task ExecuteReadAsync(Func<IAsyncQueryRunner, Task> work, Action<TransactionConfigBuilder> action = null);

/// <summary>
/// Asynchronously execute given unit of work in a <see cref="AccessMode.Write"/> transaction with a specific <see cref="TransactionConfig"/>.
/// Asynchronously execute given unit of work as a transaction with a specific <see cref="TransactionConfig"/>.
/// </summary>
/// <typeparam name="T">The return type of the given unit of work.</typeparam>
/// <param name="work">The <see cref="Func{ITransactionAsync, T}"/> to be applied to a new write transaction.</param>
/// <typeparam name="TResult"></typeparam>
/// <param name="work">The <see cref="Func{IAsyncQueryRunner, Task}"/> to be applied to a new write transaction.</param>
/// <param name="action">Given a <see cref="TransactionConfigBuilder"/>, defines how to set the configurations for the new transaction.
/// This configuration overrides server side default transaction configurations. See <see cref="TransactionConfig"/></param>
/// <returns>A task of a result as returned by the given unit of work.</returns>
Task<T> WriteTransactionAsync<T>(Func<IAsyncTransaction, Task<T>> work, Action<TransactionConfigBuilder> action);
/// <returns>A task that represents the asynchronous execution operation.</returns>
Task<TResult> ExecuteWriteAsync<TResult>(Func<IAsyncQueryRunner, Task<TResult>> work, Action<TransactionConfigBuilder> action = null);

/// <summary>
/// Asynchronously execute given unit of work in a <see cref="AccessMode.Write"/> transaction with a specific <see cref="TransactionConfig"/>.
/// Asynchronously execute given unit of work as a transaction with a specific <see cref="TransactionConfig"/>.
/// </summary>
/// <param name="work">The <see cref="Func{ITransactionAsync, Task}"/> to be applied to a new write transaction.</param>
/// <param name="work">The <see cref="Func{IAsyncQueryRunner, Task}"/> to be applied to a new write transaction.</param>
/// <param name="action">Given a <see cref="TransactionConfigBuilder"/>, defines how to set the configurations for the new transaction.
/// This configuration overrides server side default transaction configurations. See <see cref="TransactionConfig"/></param>
/// <returns>A task representing the completion of the transactional write operation enclosing the given unit of work.</returns>
Task WriteTransactionAsync(Func<IAsyncTransaction, Task> work, Action<TransactionConfigBuilder> action);
/// <returns>A task that represents the asynchronous execution operation.</returns>
Task ExecuteWriteAsync(Func<IAsyncQueryRunner, Task> work, Action<TransactionConfigBuilder> action = null);

/// <summary>
/// Close all resources used in this Session. If any transaction is left open in this session without commit or rollback,
Expand Down Expand Up @@ -164,7 +176,7 @@ public interface IAsyncSession : IAsyncQueryRunner
/// <param name="query">A Cypher query.</param>
/// <param name="action">Given a <see cref="TransactionConfigBuilder"/>, defines how to set the configurations for the new transaction.</param>
/// <returns>A task of a stream of result values and associated metadata.</returns>
Task<IResultCursor> RunAsync(string query, Action<TransactionConfigBuilder> action);
Task<IResultCursor> RunAsync(string query, Action<TransactionConfigBuilder> action = null);

/// <summary>
///
Expand All @@ -181,7 +193,7 @@ public interface IAsyncSession : IAsyncQueryRunner
/// <param name="action">Given a <see cref="TransactionConfigBuilder"/>, defines how to set the configurations for the new transaction.</param>
/// <returns>A task of a stream of result values and associated metadata.</returns>
Task<IResultCursor> RunAsync(string query, IDictionary<string, object> parameters,
Action<TransactionConfigBuilder> action);
Action<TransactionConfigBuilder> action = null);

/// <summary>
///
Expand All @@ -192,7 +204,7 @@ public interface IAsyncSession : IAsyncQueryRunner
/// <param name="action">Given a <see cref="TransactionConfigBuilder"/>, defines how to set the configurations for the new transaction.
/// </param>
/// <returns>A task of a stream of result values and associated metadata.</returns>
Task<IResultCursor> RunAsync(Query query, Action<TransactionConfigBuilder> action);
Task<IResultCursor> RunAsync(Query query, Action<TransactionConfigBuilder> action = null);

/// <summary>
/// Gets the session configurations back
Expand Down
2 changes: 2 additions & 0 deletions Neo4j.Driver/Neo4j.Driver/IAsyncTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ public interface IAsyncTransaction : IAsyncQueryRunner
/// Asynchronously commit this transaction.
/// </summary>
/// <returns>A task of transaction commit.</returns>
/// <exception cref="TransactionClosedException">Thrown when the transaction has previously been closed.</exception>
Task CommitAsync();

/// <summary>
/// Asynchronously roll back this transaction.
/// </summary>
/// <returns>A task of transaction rollback.</returns>
/// <exception cref="TransactionClosedException">>Thrown when the transaction has previously been closed.</exception>
Task RollbackAsync();

/// <summary>
Expand Down
8 changes: 3 additions & 5 deletions Neo4j.Driver/Neo4j.Driver/Internal/AsyncQueryRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Neo4j.Driver.Internal.Logging;
using Neo4j.Driver;

namespace Neo4j.Driver.Internal
{
Expand Down Expand Up @@ -71,16 +69,16 @@ protected virtual void Dispose(bool disposing)

public async ValueTask DisposeAsync()
{
await DisposeAsyncCore();
await DisposeAsyncCore().ConfigureAwait(false);

Dispose(disposing: false);
GC.SuppressFinalize(this);
}

protected virtual async ValueTask DisposeAsyncCore()
protected virtual ValueTask DisposeAsyncCore()
{
//Nothing to dispose of in this class. Methods required for derived classes and correct dispose pattern
await Task.CompletedTask;
return new ValueTask(Task.CompletedTask);
}
}

Expand Down