diff --git a/releasenotes.md b/releasenotes.md index 2a2e80a7..d78edf7d 100644 --- a/releasenotes.md +++ b/releasenotes.md @@ -1,4 +1,6 @@ -## vNext +## 0.17.1 + +- Changed logic for MultiPartitionRead because it used index not global position and it is not correct. ## 0.17.0 diff --git a/src/NStore.BaseSqlPersistence/AbstractSqlPersistence.cs b/src/NStore.BaseSqlPersistence/AbstractSqlPersistence.cs index 60402ff4..8baaf505 100644 --- a/src/NStore.BaseSqlPersistence/AbstractSqlPersistence.cs +++ b/src/NStore.BaseSqlPersistence/AbstractSqlPersistence.cs @@ -398,16 +398,16 @@ protected async Task EnsureTable(CancellationToken cancellationToken) protected async Task ScanRange( IEnumerable partitionIdsList, - long lowerIndexInclusive, - long upperIndexInclusive, + long lowerPositionInclusive, + long upperPositionInclusive, bool descending, ISubscription subscription, CancellationToken cancellationToken) { var sql = Options.GetRangeMultiplePartitionSelectChunksSql( partitionIdsList, - lowerIndexInclusive: lowerIndexInclusive, - upperIndexInclusive: upperIndexInclusive, + lowerIndexInclusive: lowerPositionInclusive, + upperIndexInclusive: upperPositionInclusive, descending: descending); using (var context = await Options.GetContextAsync(cancellationToken).ConfigureAwait(false)) @@ -420,28 +420,28 @@ protected async Task EnsureTable(CancellationToken cancellationToken) context.AddParam(command, $"@p{i++}", id); } - if (lowerIndexInclusive > 0 && lowerIndexInclusive != Int64.MaxValue) + if (lowerPositionInclusive > 0 && lowerPositionInclusive != Int64.MaxValue) { - context.AddParam(command, "@lowerIndexInclusive", lowerIndexInclusive); + context.AddParam(command, "@lowerIndexInclusive", lowerPositionInclusive); } - if (upperIndexInclusive > 0 && upperIndexInclusive != Int64.MaxValue) + if (upperPositionInclusive > 0 && upperPositionInclusive != Int64.MaxValue) { - context.AddParam(command, "@upperIndexInclusive", upperIndexInclusive); + context.AddParam(command, "@upperIndexInclusive", upperPositionInclusive); } - await PushToSubscriber(command, descending ? upperIndexInclusive : lowerIndexInclusive, + await PushToSubscriber(command, descending ? upperPositionInclusive : lowerPositionInclusive, subscription, false, cancellationToken) .ConfigureAwait(false); } } } - public async Task ReadForwardMultiplePartitionsAsync( + public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync( IEnumerable partitionIdsList, - long fromLowerIndexInclusive, + long fromLowerPositionInclusive, ISubscription subscription, - long toUpperIndexInclusive, + long toUpperPositionInclusive, CancellationToken cancellationToken) { if (partitionIdsList is null) @@ -456,8 +456,8 @@ protected async Task EnsureTable(CancellationToken cancellationToken) await ScanRange( partitionIdsList: partitionIdsList, - lowerIndexInclusive: fromLowerIndexInclusive, - upperIndexInclusive: toUpperIndexInclusive, + lowerPositionInclusive: fromLowerPositionInclusive, + upperPositionInclusive: toUpperPositionInclusive, @descending: false, subscription: subscription, diff --git a/src/NStore.Core/InMemory/InMemoryPartition.cs b/src/NStore.Core/InMemory/InMemoryPartition.cs index aa8b3f32..2df7abc7 100644 --- a/src/NStore.Core/InMemory/InMemoryPartition.cs +++ b/src/NStore.Core/InMemory/InMemoryPartition.cs @@ -45,7 +45,25 @@ public InMemoryPartition(string partitionId, INetworkSimulator networkSimulator, _lockSlim.ExitReadLock(); await PushToSubscriber(fromLowerIndexInclusive, subscription, result, cancellationToken).ConfigureAwait(false); } - + + public async Task ReadForwardByPosition( + long fromLowerPositionInclusive, + ISubscription subscription, + long toUpperPositionInclusive, + int limit, + CancellationToken cancellationToken) + { + _lockSlim.EnterReadLock(); + + var result = Chunks + .Where(x => x.Position >= fromLowerPositionInclusive && x.Position <= toUpperPositionInclusive) + .Take(limit) + .ToArray(); + + _lockSlim.ExitReadLock(); + await PushToSubscriber(fromLowerPositionInclusive, subscription, result, cancellationToken).ConfigureAwait(false); + } + public Task ReadBackward( long fromUpperIndexInclusive, ISubscription subscription, diff --git a/src/NStore.Core/InMemory/InMemoryPersistence.cs b/src/NStore.Core/InMemory/InMemoryPersistence.cs index 6cb06b7e..156a39f7 100644 --- a/src/NStore.Core/InMemory/InMemoryPersistence.cs +++ b/src/NStore.Core/InMemory/InMemoryPersistence.cs @@ -83,11 +83,11 @@ CancellationToken cancellationToken ); } - public async Task ReadForwardMultiplePartitionsAsync( + public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync( IEnumerable partitionIdsList, - long fromLowerIndexInclusive, + long fromLowerPositionInclusive, ISubscription subscription, - long toUpperIndexInclusive, + long toUpperPositionInclusive, CancellationToken cancellationToken) { if (partitionIdsList is null) @@ -107,7 +107,7 @@ CancellationToken cancellationToken foreach (var partition in result) { - await partition.ReadForward(fromLowerIndexInclusive, subscription, toUpperIndexInclusive, Int32.MaxValue, cancellationToken); + await partition.ReadForwardByPosition(fromLowerPositionInclusive, subscription, toUpperPositionInclusive, Int32.MaxValue, cancellationToken); } } diff --git a/src/NStore.Core/Persistence/IMultiPartitionPersistenceReader.cs b/src/NStore.Core/Persistence/IMultiPartitionPersistenceReader.cs index eb9c9ed2..aae5408a 100644 --- a/src/NStore.Core/Persistence/IMultiPartitionPersistenceReader.cs +++ b/src/NStore.Core/Persistence/IMultiPartitionPersistenceReader.cs @@ -1,8 +1,6 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; +using System.Collections.Generic; using System.Threading; +using System.Threading.Tasks; namespace NStore.Core.Persistence { @@ -18,9 +16,9 @@ public interface IMultiPartitionPersistenceReader /// This is the same as but for multiple partitions. /// /// List of all partition id I want to read. - /// + /// Lower global position to perform a read (included) /// - /// + /// Upper global position to perform a read (included) /// /// /// We have ZERO guarantee on the order of partition list, we can only assume that @@ -42,11 +40,11 @@ public interface IMultiPartitionPersistenceReader /// return a different order. /// /// - Task ReadForwardMultiplePartitionsAsync( + Task ReadForwardMultiplePartitionsByGlobalPositionAsync( IEnumerable partitionIdsList, - long fromLowerIndexInclusive, + long fromLowerPositionInclusive, ISubscription subscription, - long toUpperIndexInclusive, + long toUpperPositionInclusive, CancellationToken cancellationToken ); } diff --git a/src/NStore.Core/Persistence/LogDecorator.cs b/src/NStore.Core/Persistence/LogDecorator.cs index 1bc54e98..b89ee7df 100644 --- a/src/NStore.Core/Persistence/LogDecorator.cs +++ b/src/NStore.Core/Persistence/LogDecorator.cs @@ -33,16 +33,16 @@ public LogDecorator(IPersistence persistence, INStoreLoggerFactory inStoreLogger _logger.LogDebug("End ReadPartitionForward(Partition {PartitionId}, from: {from})", partitionId, fromLowerIndexInclusive); } - public async Task ReadForwardMultiplePartitionsAsync( + public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync( IEnumerable partitionIdsList, - long fromLowerIndexInclusive, + long fromLowerPositionInclusive, ISubscription subscription, - long toUpperIndexInclusive, + long toUpperPositionInclusive, CancellationToken cancellationToken) { - _logger.LogDebug("Start ReadForwardMultiplePartitionsAsync(Partition {PartitionId}, from: {from})", string.Join(",", partitionIdsList), fromLowerIndexInclusive); - await _persistence.ReadForwardMultiplePartitionsAsync(partitionIdsList, fromLowerIndexInclusive, subscription, toUpperIndexInclusive, cancellationToken).ConfigureAwait(false); - _logger.LogDebug("End ReadForwardMultiplePartitionsAsync(Partition {PartitionId}, from: {from})", string.Join(",", partitionIdsList), fromLowerIndexInclusive); + _logger.LogDebug("Start ReadForwardMultiplePartitionsByGlobalPositionAsync(Partition {PartitionId}, from: {from})", string.Join(",", partitionIdsList), fromLowerPositionInclusive); + await _persistence.ReadForwardMultiplePartitionsByGlobalPositionAsync(partitionIdsList, fromLowerPositionInclusive, subscription, toUpperPositionInclusive, cancellationToken).ConfigureAwait(false); + _logger.LogDebug("End ReadForwardMultiplePartitionsByGlobalPositionAsync(Partition {PartitionId}, from: {from})", string.Join(",", partitionIdsList), fromLowerPositionInclusive); } public async Task ReadBackwardAsync( diff --git a/src/NStore.Core/Persistence/NullPersistence.cs b/src/NStore.Core/Persistence/NullPersistence.cs index 55431080..9a48e939 100644 --- a/src/NStore.Core/Persistence/NullPersistence.cs +++ b/src/NStore.Core/Persistence/NullPersistence.cs @@ -61,11 +61,11 @@ CancellationToken cancellationToken return Task.CompletedTask; } - public Task ReadForwardMultiplePartitionsAsync( + public Task ReadForwardMultiplePartitionsByGlobalPositionAsync( IEnumerable partitionIdsList, - long fromLowerIndexInclusive, + long fromLowerPositionInclusive, ISubscription subscription, - long toUpperIndexInclusive, + long toUpperPositionInclusive, CancellationToken cancellationToken) { return Task.CompletedTask; diff --git a/src/NStore.Core/Persistence/PersistenceExtensions.cs b/src/NStore.Core/Persistence/PersistenceExtensions.cs index 9beefd01..34f07dcc 100644 --- a/src/NStore.Core/Persistence/PersistenceExtensions.cs +++ b/src/NStore.Core/Persistence/PersistenceExtensions.cs @@ -26,17 +26,17 @@ ISubscription subscription ); } - public static Task ReadForwardMultiplePartitionsAsync( + public static Task ReadForwardMultiplePartitionsByGlobalPositionAsync( this IMultiPartitionPersistenceReader persistence, IEnumerable partitionIdsList, ISubscription subscription ) { - return persistence.ReadForwardMultiplePartitionsAsync( + return persistence.ReadForwardMultiplePartitionsByGlobalPositionAsync( partitionIdsList: partitionIdsList, - fromLowerIndexInclusive: 0, + fromLowerPositionInclusive: 0, subscription: subscription, - toUpperIndexInclusive: long.MaxValue, + toUpperPositionInclusive: long.MaxValue, cancellationToken: CancellationToken.None ); } @@ -65,11 +65,11 @@ ISubscription subscription ISubscription subscription ) { - return persistence.ReadForwardMultiplePartitionsAsync( + return persistence.ReadForwardMultiplePartitionsByGlobalPositionAsync( partitionIdsList: partitionIdsList, - fromLowerIndexInclusive: fromLowerIndexInclusive, + fromLowerPositionInclusive: fromLowerIndexInclusive, subscription: subscription, - toUpperIndexInclusive: long.MaxValue, + toUpperPositionInclusive: long.MaxValue, cancellationToken: CancellationToken.None ); } @@ -100,11 +100,11 @@ long toUpperIndexInclusive long toUpperIndexInclusive ) { - return persistence.ReadForwardMultiplePartitionsAsync( + return persistence.ReadForwardMultiplePartitionsByGlobalPositionAsync( partitionIdsList, - fromLowerIndexInclusive: fromLowerIndexInclusive, + fromLowerPositionInclusive: fromLowerIndexInclusive, subscription: subscription, - toUpperIndexInclusive: toUpperIndexInclusive, + toUpperPositionInclusive: toUpperIndexInclusive, cancellationToken: CancellationToken.None ); } diff --git a/src/NStore.Core/Persistence/ProfileDecorator.cs b/src/NStore.Core/Persistence/ProfileDecorator.cs index 9aae375c..9a468428 100644 --- a/src/NStore.Core/Persistence/ProfileDecorator.cs +++ b/src/NStore.Core/Persistence/ProfileDecorator.cs @@ -58,11 +58,11 @@ public ProfileDecorator(IPersistence persistence) )).ConfigureAwait(false); } - public async Task ReadForwardMultiplePartitionsAsync( + public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync( IEnumerable partitionIdsList, - long fromLowerIndexInclusive, + long fromLowerPositionInclusive, ISubscription subscription, - long toUpperIndexInclusive, + long toUpperPositionInclusive, CancellationToken cancellationToken) { var counter = new SubscriptionWrapper(subscription) @@ -71,11 +71,11 @@ public ProfileDecorator(IPersistence persistence) }; await ReadForwardCounter.CaptureAsync(() => - _persistence.ReadForwardMultiplePartitionsAsync( + _persistence.ReadForwardMultiplePartitionsByGlobalPositionAsync( partitionIdsList, - fromLowerIndexInclusive, + fromLowerPositionInclusive, counter, - toUpperIndexInclusive, + toUpperPositionInclusive, cancellationToken )).ConfigureAwait(false); } diff --git a/src/NStore.Persistence.LiteDB/LiteDBPersistence.cs b/src/NStore.Persistence.LiteDB/LiteDBPersistence.cs index b4171b00..ec7a4cab 100644 --- a/src/NStore.Persistence.LiteDB/LiteDBPersistence.cs +++ b/src/NStore.Persistence.LiteDB/LiteDBPersistence.cs @@ -51,21 +51,21 @@ await PublishAsync(chunks, fromLowerIndexInclusive, subscription, false, cancell .ConfigureAwait(false); } - public async Task ReadForwardMultiplePartitionsAsync( + public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync( IEnumerable partitionIdsList, - long fromLowerIndexInclusive, + long fromLowerPositionInclusive, ISubscription subscription, - long toUpperIndexInclusive, + long toUpperPositionInclusive, CancellationToken cancellationToken) { var chunks = _streams.Query() .Where(x => partitionIdsList.Contains(x.PartitionId) - && x.Index >= fromLowerIndexInclusive - && x.Index <= toUpperIndexInclusive) - .OrderBy(x => x.Index) + && x.Position >= fromLowerPositionInclusive + && x.Position <= toUpperPositionInclusive) + .OrderBy(x => x.Position) .ToList(); - await PublishAsync(chunks, fromLowerIndexInclusive, subscription, false, cancellationToken) + await PublishAsync(chunks, fromLowerPositionInclusive, subscription, false, cancellationToken) .ConfigureAwait(false); } diff --git a/src/NStore.Persistence.Mongo/MongoPersistence.cs b/src/NStore.Persistence.Mongo/MongoPersistence.cs index 5f58d8dc..3b765d0e 100644 --- a/src/NStore.Persistence.Mongo/MongoPersistence.cs +++ b/src/NStore.Persistence.Mongo/MongoPersistence.cs @@ -143,24 +143,24 @@ CancellationToken cancellationToken cancellationToken).ConfigureAwait(false); } - public async Task ReadForwardMultiplePartitionsAsync( + public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync( IEnumerable partitionIdsList, - long fromLowerIndexInclusive, + long fromLowerPositionInclusive, ISubscription subscription, - long toUpperIndexInclusive, + long toUpperPositionInclusive, CancellationToken cancellationToken) { var filter = Builders.Filter.And( Builders.Filter.In(x => x.PartitionId, partitionIdsList), - Builders.Filter.Gte(x => x.Index, fromLowerIndexInclusive), - Builders.Filter.Lte(x => x.Index, toUpperIndexInclusive) + Builders.Filter.Gte(x => x.Position, fromLowerPositionInclusive), + Builders.Filter.Lte(x => x.Position, toUpperPositionInclusive) ); - var sort = Builders.Sort.Ascending(x => x.Index); + var sort = Builders.Sort.Ascending(x => x.Position); var options = new FindOptions() { Sort = sort }; await PushToSubscriber( - fromLowerIndexInclusive, + fromLowerPositionInclusive, subscription, options, filter, diff --git a/src/NStore.Persistence.MsSql/MsSqlPersistenceOptions.cs b/src/NStore.Persistence.MsSql/MsSqlPersistenceOptions.cs index bf35a7f9..4628d008 100644 --- a/src/NStore.Persistence.MsSql/MsSqlPersistenceOptions.cs +++ b/src/NStore.Persistence.MsSql/MsSqlPersistenceOptions.cs @@ -221,15 +221,15 @@ ORDER BY if (lowerIndexInclusive > 0 && lowerIndexInclusive != Int64.MinValue) { - sb.Append("AND [Index] >= @lowerIndexInclusive "); + sb.Append("AND [Position] >= @lowerIndexInclusive "); } if (upperIndexInclusive > 0 && upperIndexInclusive != Int64.MaxValue) { - sb.Append("AND [Index] <= @upperIndexInclusive "); + sb.Append("AND [Position] <= @upperIndexInclusive "); } - sb.Append(@descending ? "ORDER BY [Index] DESC" : "ORDER BY [Index]"); + sb.Append(@descending ? "ORDER BY [Position] DESC" : "ORDER BY [Position]"); return sb.ToString(); } diff --git a/src/NStore.Persistence.Sqlite/SqlitePersistenceOptions.cs b/src/NStore.Persistence.Sqlite/SqlitePersistenceOptions.cs index 65df86a4..1f9cf437 100644 --- a/src/NStore.Persistence.Sqlite/SqlitePersistenceOptions.cs +++ b/src/NStore.Persistence.Sqlite/SqlitePersistenceOptions.cs @@ -182,15 +182,15 @@ [Position] DESC if (lowerIndexInclusive > 0 && lowerIndexInclusive != Int64.MinValue) { - sb.Append("AND [Index] >= @lowerIndexInclusive "); + sb.Append("AND [Position] >= @lowerIndexInclusive "); } if (upperIndexInclusive > 0 && upperIndexInclusive != Int64.MaxValue) { - sb.Append("AND [Index] <= @upperIndexInclusive "); + sb.Append("AND [Position] <= @upperIndexInclusive "); } - sb.Append(descending ? "ORDER BY [Index] DESC" : "ORDER BY [Index]"); + sb.Append(descending ? "ORDER BY [Position] DESC" : "ORDER BY [Position]"); return sb.ToString(); } diff --git a/src/NStore.Persistence.Tests/PersistenceFixture.cs b/src/NStore.Persistence.Tests/PersistenceFixture.cs index 946e70ca..42992005 100644 --- a/src/NStore.Persistence.Tests/PersistenceFixture.cs +++ b/src/NStore.Persistence.Tests/PersistenceFixture.cs @@ -1033,7 +1033,7 @@ public MultiPartitionBasicRead() public async Task read_multiple_partition() { var tape = new Recorder(); - await Store.ReadForwardMultiplePartitionsAsync(new[] { "mbpra", "mbprb" }, 1, tape, Int32.MaxValue, CancellationToken.None); + await Store.ReadForwardMultiplePartitionsByGlobalPositionAsync(new[] { "mbpra", "mbprb" }, 1, tape, Int32.MaxValue, CancellationToken.None); AssertForBasicRead(tape, 5); } @@ -1044,7 +1044,7 @@ public async Task read_no_partitions() var tape = new Recorder(); //Read empty list - await Store.ReadForwardMultiplePartitionsAsync(Array.Empty(), 1, tape, Int32.MaxValue, CancellationToken.None); + await Store.ReadForwardMultiplePartitionsByGlobalPositionAsync(Array.Empty(), 1, tape, Int32.MaxValue, CancellationToken.None); Assert.Empty(tape.Chunks); } @@ -1053,7 +1053,7 @@ public async Task read_no_partitions() public async Task read_with_extensions() { var tape = new Recorder(); - await Store.ReadForwardMultiplePartitionsAsync(new[] { "mbpra", "mbprb" }, tape); + await Store.ReadForwardMultiplePartitionsByGlobalPositionAsync(new[] { "mbpra", "mbprb" }, tape); AssertForBasicRead(tape, 5); } @@ -1062,7 +1062,7 @@ public async Task read_with_extensions() public async Task read_multiple_partition_can_read_single_partition() { var tape = new Recorder(); - await Store.ReadForwardMultiplePartitionsAsync(new[] { "mbpra" }, 1, tape, Int32.MaxValue, CancellationToken.None); + await Store.ReadForwardMultiplePartitionsByGlobalPositionAsync(new[] { "mbpra" }, 1, tape, Int32.MaxValue, CancellationToken.None); Assert.Equal(3, tape.Chunks.Count()); //we could not assume ordering, but clearly b1, is less than b2. @@ -1085,26 +1085,27 @@ public async Task read_multiple_partition_can_read_single_partition() public async Task read_limit_version() { var tape = new Recorder(); - await Store.ReadForwardMultiplePartitionsAsync(new[] { "mbpra", "mbprb" }, 1, tape, 2, CancellationToken.None); + await Store.ReadForwardMultiplePartitionsByGlobalPositionAsync(new[] { "mbpra", "mbprb" }, 1, tape, 4, CancellationToken.None); AssertForBasicRead(tape, 4); } [Fact] - public async Task read_not_from_first_version() + public async Task read_single_checkpoint() { var tape = new Recorder(); - await Store.ReadForwardMultiplePartitionsAsync(new[] { "mbpra", "mbprb" }, 2, tape, 2, CancellationToken.None); + await Store.ReadForwardMultiplePartitionsByGlobalPositionAsync(new[] { "mbpra", "mbprb" }, 2, tape, 2, CancellationToken.None); - AssertForBasicRead(tape, 2); + AssertForBasicRead(tape, 1); } [Fact] public async Task read_non_exiting_partition() { var tape = new Recorder(); - await Store.ReadForwardMultiplePartitionsAsync(new[] { "mbpra", "does-not-exists" }, 2, tape, 2, CancellationToken.None); + await Store.ReadForwardMultiplePartitionsByGlobalPositionAsync(new[] { "mbpra", "does-not-exists" }, 1, tape, 2, CancellationToken.None); + //read only single chunk of mbpra because the second chunk is from mbprb AssertForBasicRead(tape, 1); } @@ -1112,7 +1113,7 @@ public async Task read_non_exiting_partition() public async Task read_multiple_partition_can_read_no_partition() { var tape = new Recorder(); - await Store.ReadForwardMultiplePartitionsAsync(Array.Empty(), 1, tape, Int32.MaxValue, CancellationToken.None); + await Store.ReadForwardMultiplePartitionsByGlobalPositionAsync(Array.Empty(), 1, tape, Int32.MaxValue, CancellationToken.None); Assert.Empty(tape.Chunks); } diff --git a/src/NStore.Tpl/PersistenceBatchAppendDecorator.cs b/src/NStore.Tpl/PersistenceBatchAppendDecorator.cs index 9713a36d..0ca0cb85 100644 --- a/src/NStore.Tpl/PersistenceBatchAppendDecorator.cs +++ b/src/NStore.Tpl/PersistenceBatchAppendDecorator.cs @@ -60,18 +60,18 @@ public PersistenceBatchAppendDecorator(IPersistence persistence, int batchSize, toUpperIndexInclusive, limit, cancellationToken); } - public Task ReadForwardMultiplePartitionsAsync( + public Task ReadForwardMultiplePartitionsByGlobalPositionAsync( IEnumerable partitionIdsList, - long fromLowerIndexInclusive, + long fromLowerPositionInclusive, ISubscription subscription, - long toUpperIndexInclusive, + long toUpperPositionInclusive, CancellationToken cancellationToken) { - return _persistence.ReadForwardMultiplePartitionsAsync( + return _persistence.ReadForwardMultiplePartitionsByGlobalPositionAsync( partitionIdsList, - fromLowerIndexInclusive, + fromLowerPositionInclusive, subscription, - toUpperIndexInclusive, + toUpperPositionInclusive, cancellationToken); }