Skip to content

Commit

Permalink
Update documentation. Make exceptions serializable. Unseal OffsetMana…
Browse files Browse the repository at this point in the history
…ger. Update packages.
  • Loading branch information
tautvydasversockas committed Jun 15, 2022
1 parent 43645db commit 3a8c66e
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,12 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="3.1.0">
<PackageReference Include="coverlet.collector" Version="3.1.2">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<Using Include="FluentAssertions" />
<Using Include="Xunit" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Kafka.OffsetManagement\Kafka.OffsetManagement.csproj" />
</ItemGroup>
Expand Down
24 changes: 19 additions & 5 deletions Kafka.OffsetManagement.Tests/OffsetManagerTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
namespace Kafka.OffsetManagement.Tests;
using FluentAssertions;
using Xunit;

namespace Kafka.OffsetManagement.Tests;

public sealed class OffsetManagerTests
{
Expand All @@ -13,8 +16,8 @@ public sealed class OffsetManagerTests
[InlineData(new long[] { }, new long[] { }, null)]
public async Task Getting_commitable_offset(long[] offsets, long[] ackOffsets, long? expectedOffset)
{
// Arrange
using var sut = new OffsetManager(100);

var offsetAckIds = new Dictionary<long, AckId>();

foreach (var offset in offsets)
Expand All @@ -23,19 +26,22 @@ public async Task Getting_commitable_offset(long[] offsets, long[] ackOffsets, l
foreach (var offset in ackOffsets)
sut.Ack(offsetAckIds[offset]);

// Act
var commitOffset = sut.GetCommitOffset();

// Assert
commitOffset.Should().Be(expectedOffset);
}

[Fact]
public async Task Getting_out_of_order_offset_ack_id()
{
// Arrange
using var sut = new OffsetManager(100);
await sut.GetAckIdAsync(5);

// Act
KafkaOffsetManagementException? ex = null;

try
{
await sut.GetAckIdAsync(4);
Expand All @@ -45,28 +51,34 @@ public async Task Getting_out_of_order_offset_ack_id()
ex = e;
}

// Assert
ex.Should().NotBeNull();
ex?.ErrorCode.Should().Be(KafkaOffsetManagementErrorCode.OffsetOutOfOrder);
}

[Fact]
public void Marking_offset_as_acked()
{
// Arrange
using var sut = new OffsetManager(100);

sut.MarkAsAcked(5);

// Act
var commitOffset = sut.GetCommitOffset();

// Assert
commitOffset.Should().Be(6);
}

[Fact]
public async Task Marking_out_of_order_offset_as_acked()
{
// Arrange
using var sut = new OffsetManager(100);
await sut.GetAckIdAsync(5);

// Act
KafkaOffsetManagementException? ex = null;

try
{
sut.MarkAsAcked(4);
Expand All @@ -76,6 +88,8 @@ public async Task Marking_out_of_order_offset_as_acked()
ex = e;
}

// Assert
ex.Should().NotBeNull();
ex?.ErrorCode.Should().Be(KafkaOffsetManagementErrorCode.OffsetOutOfOrder);
}
}
9 changes: 9 additions & 0 deletions Kafka.OffsetManagement/AckId.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
namespace Kafka.OffsetManagement;

/// <summary>
/// Acknowledgement ID.
/// </summary>
public struct AckId : IEquatable<AckId>
{
/// <summary>
/// Acknowledgement ID value.
/// </summary>
public int Value { get; }

/// <summary>
/// Creates acknowledgement ID based on integer value.
/// </summary>
public AckId(int value)
{
Value = value;
Expand Down
2 changes: 1 addition & 1 deletion Kafka.OffsetManagement/Kafka.OffsetManagement.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net6.0</TargetFrameworks>
<PackageId>Kafka.OffsetManager</PackageId>
<Version>2.2.0</Version>
<Version>2.2.1</Version>
<Authors>Tautvydas Versockas</Authors>
<Description>Kafka.OffsetManager - thread safe Kafka offset manager for parallel data processing</Description>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
Expand Down
10 changes: 10 additions & 0 deletions Kafka.OffsetManagement/KafkaOffsetManagementErrorCode.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
namespace Kafka.OffsetManagement;

/// <summary>
/// Offset management error code.
/// </summary>
public enum KafkaOffsetManagementErrorCode
{
/// <summary>
/// No error.
/// </summary>
NoError = 0,

/// <summary>
/// Offset is out of order.
/// </summary>
OffsetOutOfOrder = 1
}
10 changes: 10 additions & 0 deletions Kafka.OffsetManagement/KafkaOffsetManagementException.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
namespace Kafka.OffsetManagement;

/// <summary>
/// Offset management exception.
/// </summary>
[Serializable]
public class KafkaOffsetManagementException : Exception
{
/// <summary>
/// Returns offset out of order exception.
/// </summary>
public static KafkaOffsetManagementException OffsetOutOfOrder(string message) =>
new(KafkaOffsetManagementErrorCode.OffsetOutOfOrder, message);

/// <summary>
/// Offset management error code.
/// </summary>
public KafkaOffsetManagementErrorCode ErrorCode { get; }

internal KafkaOffsetManagementException(KafkaOffsetManagementErrorCode errorCode, string message)
Expand Down
92 changes: 74 additions & 18 deletions Kafka.OffsetManagement/OffsetManager.cs
Original file line number Diff line number Diff line change
@@ -1,36 +1,51 @@
namespace Kafka.OffsetManagement;

public sealed class OffsetManager : IDisposable
/// <summary>
/// Manages out of order offsets.
/// </summary>
public class OffsetManager : IDisposable
{
/// <summary>
/// Interval between unsuccessfull reset tries.
/// </summary>
public TimeSpan ResetCheckInterval { get; set; } = TimeSpan.FromMilliseconds(500);

private readonly object _lock = new();
private readonly IntegerArrayLinkedList _unackedOffsets;
private readonly SemaphoreSlim _addSemaphore;

private long? _lastAddedOffset;
private long? _lastOffset;
private bool _disposed;

/// <summary>
/// Creates offset manager.
/// </summary>
/// <param name="maxOutstanding">Max number of unacknowledged offsets at the same time.</param>
public OffsetManager(int maxOutstanding)
{
_unackedOffsets = new IntegerArrayLinkedList(maxOutstanding);
_addSemaphore = new SemaphoreSlim(maxOutstanding, maxOutstanding);
}

/// <summary>
/// Returns offset acknowledgement ID
/// that can later be used to acknowledge the offset.
/// Waits if offset manager has maxOutstanding unacknowledged offsets.
/// </summary>
public async Task<AckId> GetAckIdAsync(long offset, CancellationToken token = default)
{
await _addSemaphore.WaitAsync(token);

lock (_lock)
{
if (offset <= _lastAddedOffset)
throw KafkaOffsetManagementException.OffsetOutOfOrder(
$"Offset {offset} must be greater than last added offset {_lastAddedOffset}.");

_lastAddedOffset = offset;
UpdateLastOffset(offset);
return _unackedOffsets.Add(offset);
}
}

/// <summary>
/// Acknowledges.
/// </summary>
public void Ack(AckId ackId)
{
lock (_lock)
Expand All @@ -41,45 +56,86 @@ public void Ack(AckId ackId)
_addSemaphore.Release();
}

/// <summary>
/// Marks offset as acknowledged. Can only be used in a sequential manner.
/// </summary>
public void MarkAsAcked(long offset)
{
lock (_lock)
{
if (offset <= _lastAddedOffset)
throw KafkaOffsetManagementException.OffsetOutOfOrder(
$"Offset {offset} must be greater than last added offset {_lastAddedOffset}.");

_lastAddedOffset = offset;
UpdateLastOffset(offset);
}
}

/// <summary>
/// Returns offset that can be safely commited.
/// Returns null if no offset can be commited safely.
/// </summary>
public long? GetCommitOffset()
{
lock (_lock)
{
return _unackedOffsets.First() ?? _lastAddedOffset + 1;
return _unackedOffsets.First() ?? _lastOffset + 1;
}
}

/// <summary>
/// Waits until there are no unacknowledged offsets
/// and resets current manager instance to the initial state.
/// </summary>
public async Task ResetAsync(CancellationToken token = default)
{
while (!token.IsCancellationRequested)
{
lock (_lock)
{
if (_unackedOffsets.First() is null)
{
_lastAddedOffset = null;
if (TryReset())
return;
}
}

await Task.Delay(ResetCheckInterval, token);
}
}

private void UpdateLastOffset(long offset)
{
if (offset <= _lastOffset)
throw KafkaOffsetManagementException.OffsetOutOfOrder(
$"Offset {offset} must be greater than last added offset {_lastOffset}.");

_lastOffset = offset;
}

private bool TryReset()
{
if (_unackedOffsets.First() is not null)
return false;

_lastOffset = null;
return true;
}

public void Dispose()
{
_addSemaphore.Dispose();
Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (_disposed)
return;

if (disposing)
{
_addSemaphore.Dispose();
}

_disposed = true;
}

~OffsetManager()
{
Dispose(false);
}
}
32 changes: 14 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,30 @@ PM> Install-Package Kafka.OffsetManager
using Kafka.OffsetManagement;

// `maxOutstanding` specifies the maximum number of offsets that
// can be kept unacknowledged.
// can be kept unacknowledged at the same time.
using var offsetManager = new OffsetManager(maxOutstanding: 10000);

// `GetAckIdAsync` returns an acknowledgement ID that later can be
// used to acknowledge successfully processed offsets.
// In case `OffsetManager` has `maxOutstanding` unacknowledged offsets,
// `GetAckIdAsync` waits and only returns when at least one of the
// unacknowledged offsets is acknowledged.
// Call `GetAckIdAsync` to get an acknowledgement ID that later can be
// used to acknowledge successfully processed message offsets.
var ackId = await offsetManager.GetAckIdAsync(offset: 5);

// Process messages in parallel.
// `Ack` acknowledges successfully processed offset.
// Process messages in parallel and call `Ack` to acknowledge
// successfully processed message offset.
offsetManager.Ack(ackId);

// `MarkAsAcked` marks successfully processed offset as acknowledged.
// `MarkAsAcked` can only be used to acknowledge offsets in sequential manner.
// Offset 5 can't be marked as acknowledged after marking offset 6 as acknowledged
// or getting acknowledgement ID for offset 6.
// Call `MarkAsAcked` to mark message offset as sucessfully processed.
// `MarkAsAcked` can only be used to acknowledge offsets in sequential manner -
// offset 5 can only be marked as acknowledged before marking offset 6 as acknowledged
// or getting acknowledgement (`GetAckIdAsync`) ID for offset 6.
// `MarkAsAcked` is usually used to mark message as acknowledged without
// further processing it.
offsetManager.MarkAsAcked(offset: 6);

// `GetCommitOffset` returns an offset that can be safely committed.
// Call `GetCommitOffset` to get the offset that can be safely committed.
// Safely commitable offset = last sequentialy processed offset + 1.
// If offsets 3, 4, 7 are acknowledged, `GetCommitOffset` will return 5.
// Only when offsets 5 and 6 are acknowledged `GetCommitOffset` will return 8.
// The offset returned not the one that was acknowledged but the one that can be safely committed.
// Safely commitable offset = last sequentialy processed offset + 1.
// `GetCommitOffset` is usually periodically called on a separate
// from message processing thread.
// `GetCommitOffset` is usually called from a separate from message processing thread.
var commitableOffset = offsetManager.GetCommitOffset();
```

Expand Down

0 comments on commit 3a8c66e

Please sign in to comment.