Sub batch publish #57

Merged: 14 commits, Jan 21, 2022

@Gsantomaggio (Member) commented Jan 11, 2022

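The current implementation supports:

  • GZIP compression
  • No compression (None)

// One list per compression type, so each sub-batch uses a single codec.
var messages = new List<Message>();
var messagesGzip = new List<Message>();
const ulong messagesCount = 1_000_000;
var start = DateTime.Now;
ulong pid = 0; // id passed to each Send call, incremented per sub-batch
for (ulong i = 1; i <= messagesCount; i++)
{
    var message = new Message(Encoding.UTF8.GetBytes($"NONE {i}"));
    messages.Add(message);
    var messageGzip = new Message(Encoding.UTF8.GetBytes($"GZIP {i}"));
    messagesGzip.Add(messageGzip);

    // Keep accumulating until both lists hold 5000 messages.
    if (i % 5000 != 0) continue;

    // Publish the uncompressed sub-batch, then reuse the list.
    await producer.Send(pid++, messages, CompressionType.None);
    messages.Clear();

    // Publish the GZIP-compressed sub-batch, then reuse the list.
    await producer.Send(pid++, messagesGzip, CompressionType.Gzip);
    messagesGzip.Clear();
}

// Two messages (NONE and GZIP) are published per loop iteration.
Console.WriteLine($"{messagesCount * 2} published in: {DateTime.Now - start}");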

Performance looks good (on my 2.8 GHz quad-core Intel Core i7 Mac):

2000000 published in: 00:00:01.3406040

Consumer usage does not change.

Note:
A sub-batch stores its message count as a ushort, so it cannot hold more than 65535 messages at a time.
If the list exceeds that limit, the publish/3 method raises an exception:

RabbitMQ.Stream.Client.OutOfBoundsException: List out of limits: 0-65535

See the tests:

  • unittests:MessagesListLenValidation
  • producertests:ProducerMessagesListLenValidation
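
Because of the 65535 limit noted above, a caller holding a larger list has to split it before publishing. The following is a minimal sketch of one way to do that, not part of this PR: `SubBatchSplitter` and `Split` are hypothetical names, and the helper only slices a list into chunks of at most `ushort.MaxValue` elements.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of RabbitMQ.Stream.Client.
public static class SubBatchSplitter
{
    // Upper bound implied by the ushort message counter described above.
    public const int MaxSubBatchSize = ushort.MaxValue; // 65535

    // Returns consecutive slices of `items`, each holding at most `maxSize` elements.
    public static IEnumerable<List<T>> Split<T>(IReadOnlyList<T> items, int maxSize = MaxSubBatchSize)
    {
        if (items == null) throw new ArgumentNullException(nameof(items));
        if (maxSize <= 0) throw new ArgumentOutOfRangeException(nameof(maxSize));
        return SplitIterator(items, maxSize);
    }

    private static IEnumerable<List<T>> SplitIterator<T>(IReadOnlyList<T> items, int maxSize)
    {
        for (var offset = 0; offset < items.Count; offset += maxSize)
        {
            // The last chunk may be shorter than maxSize.
            var count = Math.Min(maxSize, items.Count - offset);
            var chunk = new List<T>(count);
            for (var i = 0; i < count; i++)
            {
                chunk.Add(items[offset + i]);
            }

            yield return chunk;
        }
    }
}
```

Each chunk could then be passed to a call like `producer.Send(pid++, chunk, CompressionType.Gzip)` from the example above, so that no single sub-batch exceeds the limit.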

Add custom codec

You can implement the ICompressionCodec interface, with something like:

public class StreamLz4Codec : ICompressionCodec

Then register the codec for the specific compression type:

StreamCompressionCodecs.RegisterCodec<StreamLz4Codec>(CompressionType.Lz4);

See: #57 (comment)

@Gsantomaggio marked this pull request as draft January 11, 2022 17:14
@codecov-commenter commented Jan 11, 2022

Codecov Report

Merging #57 (3b24b31) into main (574c161) will increase coverage by 0.96%.
The diff coverage is 95.88%.

@@            Coverage Diff             @@
##             main      #57      +/-   ##
==========================================
+ Coverage   87.87%   88.83%   +0.96%     
==========================================
  Files          53       55       +2     
  Lines        2779     3153     +374     
  Branches      115      131      +16     
==========================================
+ Hits         2442     2801     +359     
- Misses        304      316      +12     
- Partials       33       36       +3     
| Impacted Files | Coverage Δ |
|---|---|
| RabbitMQ.Stream.Client/Producer.cs | 78.14% <66.66%> (-1.12%) ⬇️ |
| Tests/UnitTests.cs | 97.36% <93.54%> (-2.64%) ⬇️ |
| RabbitMQ.Stream.Client/Compression.cs | 94.89% <94.89%> (ø) |
| Tests/ProducerSystemTests.cs | 99.30% <98.43%> (-0.70%) ⬇️ |
| RabbitMQ.Stream.Client/Client.cs | 88.67% <100.00%> (ø) |
| RabbitMQ.Stream.Client/Deliver.cs | 96.52% <100.00%> (+2.87%) ⬆️ |
| RabbitMQ.Stream.Client/SubEntryPublish.cs | 100.00% <100.00%> (ø) |
| Tests/ConsumerSystemTests.cs | 100.00% <100.00%> (ø) |
| Tests/Utils.cs | 88.31% <100.00%> (ø) |

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 574c161...3b24b31.

@Gsantomaggio changed the title from "Work in progress sub batch publish" to "Sub batch publish" Jan 19, 2022
@Gsantomaggio marked this pull request as ready for review January 19, 2022 11:12
@Gsantomaggio (Member, Author) commented:

A complete Lz4 Codec using https://github.com/MiloszKrajewski/K4os.Compression.LZ4:

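using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using K4os.Compression.LZ4.Streams;
using RabbitMQ.Stream.Client;

public class StreamLz4Codec : ICompressionCodec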
{
    private ReadOnlySequence<byte> compressedReadOnlySequence;

    private static int WriteUInt32(Span<byte> span, uint value)
    {
        BinaryPrimitives.WriteUInt32BigEndian(span, value);
        return 4;
    }

    private static int Write(Span<byte> span, ReadOnlySequence<byte> msg)
    {
        msg.CopyTo(span);
        return (int) msg.Length;
    }

    public void Compress(List<Message> messages)
    {
        MessagesCount = messages.Count;
        UnCompressedSize = messages.Sum(msg => 4 + msg.Size);
        var messagesSource = new Span<byte>(new byte[UnCompressedSize]);
        var offset = 0;
        foreach (var msg in messages)
        {
            offset += WriteUInt32(messagesSource.Slice(offset), (uint) msg.Size);
            offset += msg.Write(messagesSource.Slice(offset));
        }

        using var source = new MemoryStream(messagesSource.ToArray());
        using var destination = new MemoryStream();
        var settings = new LZ4EncoderSettings
        {
            ChainBlocks = false
        };
        using (var target = LZ4Stream.Encode(destination, settings, false))
        {
            source.CopyTo(target);
        }
        compressedReadOnlySequence = new ReadOnlySequence<byte>(destination.ToArray());
    }

    public int Write(Span<byte> span)
    {
        return Write(span, compressedReadOnlySequence);
    }

    public int CompressedSize => (int) compressedReadOnlySequence.Length;

    public int UnCompressedSize { get; private set; }
    public int MessagesCount { get; private set; }

    public CompressionType CompressionType => CompressionType.Lz4;

    public ReadOnlySequence<byte> UnCompress(ReadOnlySequence<byte> source, uint dataLen, uint unCompressedDataSize)
    {
        using var target = new MemoryStream();
        using (var sourceDecode = LZ4Stream.Decode(new MemoryStream(source.ToArray())))
        {
            sourceDecode.CopyTo(target);
        }

        return new ReadOnlySequence<byte>(target.ToArray());
    }
}

Then register the codec:

StreamCompressionCodecs.RegisterCodec<StreamLz4Codec>(CompressionType.Lz4);
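
The `Compress` method in the codec above lays out each message as a 4-byte big-endian size followed by the message bytes, which is why `UnCompressedSize` adds 4 per message. The sketch below isolates that framing on plain byte arrays so it can be checked without a broker. `LengthPrefixedFraming`, `Frame` and `Unframe` are hypothetical names, and the `Unframe` side is an assumption for illustration only; the client's own parsing code is not shown in this thread.

```csharp
using System;
using System.Buffers.Binary;
using System.Collections.Generic;

// Hypothetical illustration of the layout built in Compress:
// [uint32 big-endian size][payload][uint32 big-endian size][payload]...
public static class LengthPrefixedFraming
{
    public static byte[] Frame(IReadOnlyList<byte[]> payloads)
    {
        // Same arithmetic as UnCompressedSize: 4 bytes of size prefix per payload.
        var total = 0;
        foreach (var p in payloads) total += 4 + p.Length;

        var buffer = new byte[total];
        var span = buffer.AsSpan();
        var offset = 0;
        foreach (var p in payloads)
        {
            BinaryPrimitives.WriteUInt32BigEndian(span.Slice(offset), (uint) p.Length);
            offset += 4;
            p.AsSpan().CopyTo(span.Slice(offset));
            offset += p.Length;
        }

        return buffer;
    }

    // Assumed inverse of Frame: reads size prefixes until the buffer is consumed.
    public static List<byte[]> Unframe(ReadOnlySpan<byte> data)
    {
        var result = new List<byte[]>();
        var offset = 0;
        while (offset < data.Length)
        {
            var size = (int) BinaryPrimitives.ReadUInt32BigEndian(data.Slice(offset));
            offset += 4;
            result.Add(data.Slice(offset, size).ToArray());
            offset += size;
        }

        return result;
    }
}
```

A custom codec only needs to compress and decompress this framed buffer as an opaque byte sequence, which is what the LZ4 streams do in the codec above.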
