Skip to content

Commit

Permalink
Introduce stream write options
Browse files Browse the repository at this point in the history
Option to disable change tracking
  • Loading branch information
yevhen committed Jul 13, 2018
1 parent 08f59f6 commit c17d0f4
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 18 deletions.
32 changes: 25 additions & 7 deletions Source/Streamstone.Tests/Scenarios/Tracking_entity_changes.cs
Expand Up @@ -29,6 +29,29 @@ public void SetUp()
stream = Stream.ProvisionAsync(partition).GetAwaiter().GetResult();
}

[Test]
public void When_disabled()
{
var entity = new TestEntity(EntityRowKey, "*");

var insert = Include.Insert(entity);
var replace = Include.Replace(entity);

EventData[] events =
{
CreateEvent(insert),
CreateEvent(replace)
};

var options = new StreamWriteOptions {TrackChanges = false};

Assert.ThrowsAsync<StorageException>(() => Stream.WriteAsync(stream, options, events),
"Should fail since there conflicting operations");

var stored = RetrieveTestEntity(entity.RowKey);
Assert.That(stored, Is.Null, "Should not insert entity due to ETG failure");
}

[Test]
public void When_normal_flow()
{
Expand Down Expand Up @@ -613,12 +636,6 @@ TEntity RetrieveEntity<TEntity>(string rowKey)
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, rowKey));

return table.ExecuteQuery<TEntity>(filter).SingleOrDefault();
//return table.CreateQuery<TEntity>()
// .Where(x =>
// x.PartitionKey == partition.PartitionKey
// && x.RowKey == rowKey)
// .ToList()
// .SingleOrDefault();
}

static EventData CreateEvent(params Include[] includes)
Expand All @@ -631,10 +648,11 @@ public class TestEntity : TableEntity
public TestEntity()
{}

public TestEntity(string rowKey)
public TestEntity(string rowKey, string etag = null)
{
RowKey = rowKey;
Data = DateTime.UtcNow.ToString();
ETag = etag;
}

public string Data { get; set; }
Expand Down
79 changes: 74 additions & 5 deletions Source/Streamstone/Stream.Api.cs
Expand Up @@ -90,15 +90,47 @@ static Task<Stream> ProvisionAsync(Stream stream)
/// <exception cref="ConcurrencyConflictException">
/// If write operation has conflicts
/// </exception>
public static Task<StreamWriteResult> WriteAsync(Stream stream, params EventData[] events)
public static Task<StreamWriteResult> WriteAsync(Stream stream, params EventData[] events) =>
WriteAsync(stream, StreamWriteOptions.Default, events);

/// <summary>
/// Initiates an asynchronous operation that writes the given array of events to a stream using specified stream header.
/// </summary>
/// <param name="stream">The stream header.</param>
/// <param name="options">The stream write options.</param>
/// <param name="events">The events to write.</param>
/// <returns>
/// The promise, that wil eventually return the result of the stream write operation
/// containing updated stream header or will fail with exception
/// </returns>
/// <exception cref="ArgumentNullException">
/// If <paramref name="stream"/> is <c>null</c>
/// </exception>
/// <exception cref="ArgumentNullException">
/// If <paramref name="events"/> is <c>null</c>
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// If <paramref name="events"/> array is empty
/// </exception>
/// <exception cref="DuplicateEventException">
/// If event with the given id already exists in a storage
/// </exception>
/// <exception cref="IncludedOperationConflictException">
/// If included entity operation has conflicts
/// </exception>
/// <exception cref="ConcurrencyConflictException">
/// If write operation has conflicts
/// </exception>
public static Task<StreamWriteResult> WriteAsync(Stream stream, StreamWriteOptions options, params EventData[] events)
{
Requires.NotNull(stream, nameof(stream));
Requires.NotNull(events, nameof(events));
Requires.NotNull(stream, nameof(stream));
Requires.NotNull(options, nameof(options));
Requires.NotNull(events, nameof(events));

if (events.Length == 0)
throw new ArgumentOutOfRangeException("events", "Events have 0 items");

return new WriteOperation(stream, events).ExecuteAsync();
return new WriteOperation(stream, options, events).ExecuteAsync();
}

/// <summary>
Expand Down Expand Up @@ -133,6 +165,43 @@ public static Task<StreamWriteResult> WriteAsync(Stream stream, params EventData
/// If write operation has conflicts
/// </exception>
public static async Task<StreamWriteResult> WriteAsync(Partition partition, int expectedVersion, params EventData[] events)
{
return await WriteAsync(partition, StreamWriteOptions.Default, expectedVersion, events);
}

/// <summary>
/// Initiates an asynchronous operation that writes the given array of events to a partition using specified expected version.
/// </summary>
/// <remarks>For new stream specify expected version as 0</remarks>
/// <param name="partition">The partition.</param>
/// <param name="options">The stream write options.</param>
/// <param name="expectedVersion">The expected version of the stream.</param>
/// <param name="events">The events to write.</param>
/// <returns>
/// The result of the stream write operation containing updated stream header
/// </returns>
/// <exception cref="ArgumentNullException">
/// If <paramref name="partition"/> is <c>null</c>
/// </exception>
/// <exception cref="ArgumentNullException">
/// If <paramref name="events"/> is <c>null</c>
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// If <paramref name="expectedVersion"/> is less than 0
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// If <paramref name="events"/> array is empty
/// </exception>
/// <exception cref="DuplicateEventException">
/// If event with the given id already exists in a storage
/// </exception>
/// <exception cref="IncludedOperationConflictException">
/// If included entity operation has conflicts
/// </exception>
/// <exception cref="ConcurrencyConflictException">
/// If write operation has conflicts
/// </exception>
public static async Task<StreamWriteResult> WriteAsync(Partition partition, StreamWriteOptions options, int expectedVersion, params EventData[] events)
{
Requires.NotNull(partition, nameof(partition));
Requires.GreaterThanOrEqualToZero(expectedVersion, nameof(expectedVersion));
Expand All @@ -144,7 +213,7 @@ public static async Task<StreamWriteResult> WriteAsync(Partition partition, int
if (stream.Version != expectedVersion)
throw ConcurrencyConflictException.StreamChangedOrExists(partition);

return await WriteAsync(stream, events);
return await WriteAsync(stream, options, events);
}

/// <summary>
Expand Down
21 changes: 15 additions & 6 deletions Source/Streamstone/Stream.Operations.cs
Expand Up @@ -3,7 +3,6 @@
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;

Expand Down Expand Up @@ -73,12 +72,14 @@ class WriteOperation
const int MaxOperationsPerChunk = 99;

readonly Stream stream;
readonly StreamWriteOptions options;
readonly CloudTable table;
readonly IEnumerable<RecordedEvent> events;

public WriteOperation(Stream stream, IEnumerable<EventData> events)
public WriteOperation(Stream stream, StreamWriteOptions options, IEnumerable<EventData> events)
{
this.stream = stream;
this.options = options;
this.events = stream.Record(events);
table = stream.Partition.Table;
}
Expand All @@ -89,7 +90,7 @@ public async Task<StreamWriteResult> ExecuteAsync()

foreach (var chunk in Chunks())
{
var batch = chunk.ToBatch(current);
var batch = chunk.ToBatch(current, options);

try
{
Expand Down Expand Up @@ -162,11 +163,11 @@ bool CanAccomodate(RecordedEvent @event)

public bool IsEmpty => events.Count == 0;

public Batch ToBatch(Stream stream)
public Batch ToBatch(Stream stream, StreamWriteOptions options)
{
var entity = stream.Entity();
entity.Version += events.Count;
return new Batch(entity, events);
return new Batch(entity, events, options);
}
}

Expand All @@ -177,12 +178,14 @@ class Batch

readonly StreamEntity stream;
readonly List<RecordedEvent> events;
readonly StreamWriteOptions options;
readonly Partition partition;

internal Batch(StreamEntity stream, List<RecordedEvent> events)
internal Batch(StreamEntity stream, List<RecordedEvent> events, StreamWriteOptions options)
{
this.stream = stream;
this.events = events;
this.options = options;
partition = stream.Partition;
}

Expand All @@ -201,6 +204,12 @@ internal TableBatchOperation Prepare()

void WriteIncludes()
{
if (!options.TrackChanges)
{
operations.AddRange(events.SelectMany(x => x.IncludedOperations));
return;
}

var tracker = new EntityChangeTracker();

foreach (var @event in events)
Expand Down
21 changes: 21 additions & 0 deletions Source/Streamstone/StreamWriteOptions.cs
@@ -0,0 +1,21 @@
namespace Streamstone
{
/// <summary>
/// Represent set of possible options which can be used with stream write operation
/// </summary>
public class StreamWriteOptions
{
internal static readonly StreamWriteOptions Default = new StreamWriteOptions();

/// <summary>
/// Signals whether built-in entity change tracking should be used for includes.
/// If set to <c>true</c> all included operations will be chained
/// via built-in change tracker. Default is <c>true</c>
/// </summary>
/// <remarks>
/// Entity change tracking is required during stream replays
/// but could be disabled during normal writes.
/// </remarks>
public bool TrackChanges { get; set; } = true;
}
}

0 comments on commit c17d0f4

Please sign in to comment.