Skip to content

Commit

Permalink
Changed logic of multiaggregate persistence read.
Browse files Browse the repository at this point in the history
Previous logic is based on aggregate index, and it is absolutely
not useful, it is logic to use only the global position to perform
a multi partition read.
  • Loading branch information
alkampfergit committed Jan 4, 2024
1 parent d894cd8 commit 7923bef
Show file tree
Hide file tree
Showing 15 changed files with 109 additions and 90 deletions.
4 changes: 3 additions & 1 deletion releasenotes.md
@@ -1,4 +1,6 @@
## vNext
## 0.17.1

- Changed logic for MultiPartitionRead because it used index not global position and it is not correct.

## 0.17.0

Expand Down
28 changes: 14 additions & 14 deletions src/NStore.BaseSqlPersistence/AbstractSqlPersistence.cs
Expand Up @@ -398,16 +398,16 @@ protected async Task EnsureTable(CancellationToken cancellationToken)

protected async Task ScanRange(
IEnumerable<string> partitionIdsList,
long lowerIndexInclusive,
long upperIndexInclusive,
long lowerPositionInclusive,
long upperPositionInclusive,
bool descending,
ISubscription subscription,
CancellationToken cancellationToken)
{
var sql = Options.GetRangeMultiplePartitionSelectChunksSql(
partitionIdsList,
lowerIndexInclusive: lowerIndexInclusive,
upperIndexInclusive: upperIndexInclusive,
lowerIndexInclusive: lowerPositionInclusive,
upperIndexInclusive: upperPositionInclusive,
descending: descending);

using (var context = await Options.GetContextAsync(cancellationToken).ConfigureAwait(false))
Expand All @@ -420,28 +420,28 @@ protected async Task EnsureTable(CancellationToken cancellationToken)
context.AddParam(command, $"@p{i++}", id);
}

if (lowerIndexInclusive > 0 && lowerIndexInclusive != Int64.MaxValue)
if (lowerPositionInclusive > 0 && lowerPositionInclusive != Int64.MaxValue)
{
context.AddParam(command, "@lowerIndexInclusive", lowerIndexInclusive);
context.AddParam(command, "@lowerIndexInclusive", lowerPositionInclusive);
}

if (upperIndexInclusive > 0 && upperIndexInclusive != Int64.MaxValue)
if (upperPositionInclusive > 0 && upperPositionInclusive != Int64.MaxValue)
{
context.AddParam(command, "@upperIndexInclusive", upperIndexInclusive);
context.AddParam(command, "@upperIndexInclusive", upperPositionInclusive);
}

await PushToSubscriber(command, descending ? upperIndexInclusive : lowerIndexInclusive,
await PushToSubscriber(command, descending ? upperPositionInclusive : lowerPositionInclusive,
subscription, false, cancellationToken)
.ConfigureAwait(false);
}
}
}

public async Task ReadForwardMultiplePartitionsAsync(
public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync(
IEnumerable<string> partitionIdsList,
long fromLowerIndexInclusive,
long fromLowerPositionInclusive,
ISubscription subscription,
long toUpperIndexInclusive,
long toUpperPositionInclusive,
CancellationToken cancellationToken)
{
if (partitionIdsList is null)
Expand All @@ -456,8 +456,8 @@ protected async Task EnsureTable(CancellationToken cancellationToken)

await ScanRange(
partitionIdsList: partitionIdsList,
lowerIndexInclusive: fromLowerIndexInclusive,
upperIndexInclusive: toUpperIndexInclusive,
lowerPositionInclusive: fromLowerPositionInclusive,
upperPositionInclusive: toUpperPositionInclusive,

@descending: false,
subscription: subscription,
Expand Down
20 changes: 19 additions & 1 deletion src/NStore.Core/InMemory/InMemoryPartition.cs
Expand Up @@ -45,7 +45,25 @@ public InMemoryPartition(string partitionId, INetworkSimulator networkSimulator,
_lockSlim.ExitReadLock();
await PushToSubscriber(fromLowerIndexInclusive, subscription, result, cancellationToken).ConfigureAwait(false);
}


public async Task ReadForwardByPosition(
long fromLowerPositionInclusive,
ISubscription subscription,
long toUpperPositionInclusive,
int limit,
CancellationToken cancellationToken)
{
_lockSlim.EnterReadLock();

var result = Chunks
.Where(x => x.Position >= fromLowerPositionInclusive && x.Position <= toUpperPositionInclusive)
.Take(limit)
.ToArray();

_lockSlim.ExitReadLock();
await PushToSubscriber(fromLowerPositionInclusive, subscription, result, cancellationToken).ConfigureAwait(false);
}

public Task ReadBackward(
long fromUpperIndexInclusive,
ISubscription subscription,
Expand Down
8 changes: 4 additions & 4 deletions src/NStore.Core/InMemory/InMemoryPersistence.cs
Expand Up @@ -83,11 +83,11 @@ CancellationToken cancellationToken
);
}

public async Task ReadForwardMultiplePartitionsAsync(
public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync(
IEnumerable<string> partitionIdsList,
long fromLowerIndexInclusive,
long fromLowerPositionInclusive,
ISubscription subscription,
long toUpperIndexInclusive,
long toUpperPositionInclusive,
CancellationToken cancellationToken)
{
if (partitionIdsList is null)
Expand All @@ -107,7 +107,7 @@ CancellationToken cancellationToken

foreach (var partition in result)
{
await partition.ReadForward(fromLowerIndexInclusive, subscription, toUpperIndexInclusive, Int32.MaxValue, cancellationToken);
await partition.ReadForwardByPosition(fromLowerPositionInclusive, subscription, toUpperPositionInclusive, Int32.MaxValue, cancellationToken);
}
}

Expand Down
16 changes: 7 additions & 9 deletions src/NStore.Core/Persistence/IMultiPartitionPersistenceReader.cs
@@ -1,8 +1,6 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace NStore.Core.Persistence
{
Expand All @@ -18,9 +16,9 @@ public interface IMultiPartitionPersistenceReader
/// This is the same as <see cref="IPersistence.ReadForwardAsync"/> but for multiple partitions.
/// </summary>
/// <param name="partitionIdsList">List of all partition id I want to read.</param>
/// <param name="fromLowerIndexInclusive"></param>
/// <param name="fromLowerPositionInclusive">Lower global position to perform a read (included)</param>
/// <param name="subscription"></param>
/// <param name="toUpperIndexInclusive"></param>
/// <param name="toUpperPositionInclusive">Upper global position to perform a read (included)</param>
/// <param name="cancellationToken"></param>
/// <remarks><para>
/// We have ZERO guarantee on the order of partition list, we can only assume that
Expand All @@ -42,11 +40,11 @@ public interface IMultiPartitionPersistenceReader
/// return a different order.
/// </para>
/// </remarks>
Task ReadForwardMultiplePartitionsAsync(
Task ReadForwardMultiplePartitionsByGlobalPositionAsync(
IEnumerable<string> partitionIdsList,
long fromLowerIndexInclusive,
long fromLowerPositionInclusive,
ISubscription subscription,
long toUpperIndexInclusive,
long toUpperPositionInclusive,
CancellationToken cancellationToken
);
}
Expand Down
12 changes: 6 additions & 6 deletions src/NStore.Core/Persistence/LogDecorator.cs
Expand Up @@ -33,16 +33,16 @@ public LogDecorator(IPersistence persistence, INStoreLoggerFactory inStoreLogger
_logger.LogDebug("End ReadPartitionForward(Partition {PartitionId}, from: {from})", partitionId, fromLowerIndexInclusive);
}

public async Task ReadForwardMultiplePartitionsAsync(
public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync(
IEnumerable<string> partitionIdsList,
long fromLowerIndexInclusive,
long fromLowerPositionInclusive,
ISubscription subscription,
long toUpperIndexInclusive,
long toUpperPositionInclusive,
CancellationToken cancellationToken)
{
_logger.LogDebug("Start ReadForwardMultiplePartitionsAsync(Partition {PartitionId}, from: {from})", string.Join(",", partitionIdsList), fromLowerIndexInclusive);
await _persistence.ReadForwardMultiplePartitionsAsync(partitionIdsList, fromLowerIndexInclusive, subscription, toUpperIndexInclusive, cancellationToken).ConfigureAwait(false);
_logger.LogDebug("End ReadForwardMultiplePartitionsAsync(Partition {PartitionId}, from: {from})", string.Join(",", partitionIdsList), fromLowerIndexInclusive);
_logger.LogDebug("Start ReadForwardMultiplePartitionsByGlobalPositionAsync(Partition {PartitionId}, from: {from})", string.Join(",", partitionIdsList), fromLowerPositionInclusive);
await _persistence.ReadForwardMultiplePartitionsByGlobalPositionAsync(partitionIdsList, fromLowerPositionInclusive, subscription, toUpperPositionInclusive, cancellationToken).ConfigureAwait(false);
_logger.LogDebug("End ReadForwardMultiplePartitionsByGlobalPositionAsync(Partition {PartitionId}, from: {from})", string.Join(",", partitionIdsList), fromLowerPositionInclusive);
}

public async Task ReadBackwardAsync(
Expand Down
6 changes: 3 additions & 3 deletions src/NStore.Core/Persistence/NullPersistence.cs
Expand Up @@ -61,11 +61,11 @@ CancellationToken cancellationToken
return Task.CompletedTask;
}

public Task ReadForwardMultiplePartitionsAsync(
public Task ReadForwardMultiplePartitionsByGlobalPositionAsync(
IEnumerable<string> partitionIdsList,
long fromLowerIndexInclusive,
long fromLowerPositionInclusive,
ISubscription subscription,
long toUpperIndexInclusive,
long toUpperPositionInclusive,
CancellationToken cancellationToken)
{
return Task.CompletedTask;
Expand Down
20 changes: 10 additions & 10 deletions src/NStore.Core/Persistence/PersistenceExtensions.cs
Expand Up @@ -26,17 +26,17 @@ ISubscription subscription
);
}

public static Task ReadForwardMultiplePartitionsAsync(
public static Task ReadForwardMultiplePartitionsByGlobalPositionAsync(
this IMultiPartitionPersistenceReader persistence,
IEnumerable<string> partitionIdsList,
ISubscription subscription
)
{
return persistence.ReadForwardMultiplePartitionsAsync(
return persistence.ReadForwardMultiplePartitionsByGlobalPositionAsync(
partitionIdsList: partitionIdsList,
fromLowerIndexInclusive: 0,
fromLowerPositionInclusive: 0,
subscription: subscription,
toUpperIndexInclusive: long.MaxValue,
toUpperPositionInclusive: long.MaxValue,
cancellationToken: CancellationToken.None
);
}
Expand Down Expand Up @@ -65,11 +65,11 @@ ISubscription subscription
ISubscription subscription
)
{
return persistence.ReadForwardMultiplePartitionsAsync(
return persistence.ReadForwardMultiplePartitionsByGlobalPositionAsync(
partitionIdsList: partitionIdsList,
fromLowerIndexInclusive: fromLowerIndexInclusive,
fromLowerPositionInclusive: fromLowerIndexInclusive,
subscription: subscription,
toUpperIndexInclusive: long.MaxValue,
toUpperPositionInclusive: long.MaxValue,
cancellationToken: CancellationToken.None
);
}
Expand Down Expand Up @@ -100,11 +100,11 @@ long toUpperIndexInclusive
long toUpperIndexInclusive
)
{
return persistence.ReadForwardMultiplePartitionsAsync(
return persistence.ReadForwardMultiplePartitionsByGlobalPositionAsync(
partitionIdsList,
fromLowerIndexInclusive: fromLowerIndexInclusive,
fromLowerPositionInclusive: fromLowerIndexInclusive,
subscription: subscription,
toUpperIndexInclusive: toUpperIndexInclusive,
toUpperPositionInclusive: toUpperIndexInclusive,
cancellationToken: CancellationToken.None
);
}
Expand Down
12 changes: 6 additions & 6 deletions src/NStore.Core/Persistence/ProfileDecorator.cs
Expand Up @@ -58,11 +58,11 @@ public ProfileDecorator(IPersistence persistence)
)).ConfigureAwait(false);
}

public async Task ReadForwardMultiplePartitionsAsync(
public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync(
IEnumerable<string> partitionIdsList,
long fromLowerIndexInclusive,
long fromLowerPositionInclusive,
ISubscription subscription,
long toUpperIndexInclusive,
long toUpperPositionInclusive,
CancellationToken cancellationToken)
{
var counter = new SubscriptionWrapper(subscription)
Expand All @@ -71,11 +71,11 @@ public ProfileDecorator(IPersistence persistence)
};

await ReadForwardCounter.CaptureAsync(() =>
_persistence.ReadForwardMultiplePartitionsAsync(
_persistence.ReadForwardMultiplePartitionsByGlobalPositionAsync(
partitionIdsList,
fromLowerIndexInclusive,
fromLowerPositionInclusive,
counter,
toUpperIndexInclusive,
toUpperPositionInclusive,
cancellationToken
)).ConfigureAwait(false);
}
Expand Down
14 changes: 7 additions & 7 deletions src/NStore.Persistence.LiteDB/LiteDBPersistence.cs
Expand Up @@ -51,21 +51,21 @@ await PublishAsync(chunks, fromLowerIndexInclusive, subscription, false, cancell
.ConfigureAwait(false);
}

public async Task ReadForwardMultiplePartitionsAsync(
public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync(
IEnumerable<string> partitionIdsList,
long fromLowerIndexInclusive,
long fromLowerPositionInclusive,
ISubscription subscription,
long toUpperIndexInclusive,
long toUpperPositionInclusive,
CancellationToken cancellationToken)
{
var chunks = _streams.Query()
.Where(x => partitionIdsList.Contains(x.PartitionId)
&& x.Index >= fromLowerIndexInclusive
&& x.Index <= toUpperIndexInclusive)
.OrderBy(x => x.Index)
&& x.Position >= fromLowerPositionInclusive
&& x.Position <= toUpperPositionInclusive)
.OrderBy(x => x.Position)
.ToList();

await PublishAsync(chunks, fromLowerIndexInclusive, subscription, false, cancellationToken)
await PublishAsync(chunks, fromLowerPositionInclusive, subscription, false, cancellationToken)
.ConfigureAwait(false);
}

Expand Down
14 changes: 7 additions & 7 deletions src/NStore.Persistence.Mongo/MongoPersistence.cs
Expand Up @@ -143,24 +143,24 @@ CancellationToken cancellationToken
cancellationToken).ConfigureAwait(false);
}

public async Task ReadForwardMultiplePartitionsAsync(
public async Task ReadForwardMultiplePartitionsByGlobalPositionAsync(
IEnumerable<string> partitionIdsList,
long fromLowerIndexInclusive,
long fromLowerPositionInclusive,
ISubscription subscription,
long toUpperIndexInclusive,
long toUpperPositionInclusive,
CancellationToken cancellationToken)
{
var filter = Builders<TChunk>.Filter.And(
Builders<TChunk>.Filter.In(x => x.PartitionId, partitionIdsList),
Builders<TChunk>.Filter.Gte(x => x.Index, fromLowerIndexInclusive),
Builders<TChunk>.Filter.Lte(x => x.Index, toUpperIndexInclusive)
Builders<TChunk>.Filter.Gte(x => x.Position, fromLowerPositionInclusive),
Builders<TChunk>.Filter.Lte(x => x.Position, toUpperPositionInclusive)
);

var sort = Builders<TChunk>.Sort.Ascending(x => x.Index);
var sort = Builders<TChunk>.Sort.Ascending(x => x.Position);
var options = new FindOptions<TChunk>() { Sort = sort };

await PushToSubscriber(
fromLowerIndexInclusive,
fromLowerPositionInclusive,
subscription,
options,
filter,
Expand Down
6 changes: 3 additions & 3 deletions src/NStore.Persistence.MsSql/MsSqlPersistenceOptions.cs
Expand Up @@ -221,15 +221,15 @@ ORDER BY

if (lowerIndexInclusive > 0 && lowerIndexInclusive != Int64.MinValue)
{
sb.Append("AND [Index] >= @lowerIndexInclusive ");
sb.Append("AND [Position] >= @lowerIndexInclusive ");
}

if (upperIndexInclusive > 0 && upperIndexInclusive != Int64.MaxValue)
{
sb.Append("AND [Index] <= @upperIndexInclusive ");
sb.Append("AND [Position] <= @upperIndexInclusive ");
}

sb.Append(@descending ? "ORDER BY [Index] DESC" : "ORDER BY [Index]");
sb.Append(@descending ? "ORDER BY [Position] DESC" : "ORDER BY [Position]");

return sb.ToString();
}
Expand Down
6 changes: 3 additions & 3 deletions src/NStore.Persistence.Sqlite/SqlitePersistenceOptions.cs
Expand Up @@ -182,15 +182,15 @@ [Position] DESC

if (lowerIndexInclusive > 0 && lowerIndexInclusive != Int64.MinValue)
{
sb.Append("AND [Index] >= @lowerIndexInclusive ");
sb.Append("AND [Position] >= @lowerIndexInclusive ");
}

if (upperIndexInclusive > 0 && upperIndexInclusive != Int64.MaxValue)
{
sb.Append("AND [Index] <= @upperIndexInclusive ");
sb.Append("AND [Position] <= @upperIndexInclusive ");
}

sb.Append(descending ? "ORDER BY [Index] DESC" : "ORDER BY [Index]");
sb.Append(descending ? "ORDER BY [Position] DESC" : "ORDER BY [Position]");

return sb.ToString();
}
Expand Down

0 comments on commit 7923bef

Please sign in to comment.