Skip to content

Commit

Permalink
Sub SubEntry publish (#57)
Browse files Browse the repository at this point in the history
Implement SubEntry publish. 
Add new API send(id, messages, compressionType)
Support None and GZIP compression
Possibility to add Custom Compress Codec
  • Loading branch information
Gsantomaggio committed Jan 21, 2022
1 parent 574c161 commit 3325caf
Show file tree
Hide file tree
Showing 11 changed files with 765 additions and 52 deletions.
60 changes: 50 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ var config = new StreamSystemConfig
### Tls
```csharp
var config = new StreamSystemConfig
{
UserName = "guest",
Password = "guest",
VirtualHost = "/",
Ssl = new SslOption()
{
Enabled = true
},
var config = new StreamSystemConfig
{
UserName = "guest",
Password = "guest",
VirtualHost = "/",
Ssl = new SslOption()
{
Enabled = true
},
```

### Load Balancer
Expand All @@ -146,7 +146,47 @@ var config = new StreamSystemConfig
```

### Publish Messages
// TODO

Standard publish:
```csharp
var producer = await system.CreateProducer(
new ProducerConfig
{
Reference = Guid.NewGuid().ToString(),
Stream = stream,
ConfirmHandler = conf =>
{
// messages confirmed
},
});

var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
await producer.Send(i, message);

```

Sub Entries Batching:
A sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames, but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency.

`CompressionType.None` and `CompressionType.GZip` codecs compression are built-in.

```csharp
var subEntryMessages = List<Messages>();
for (var i = 1; i <= 500; i++)
{
var message = new Message(Encoding.UTF8.GetBytes($"SubBatchMessage_{i}"));
subEntryMessages.Add(message);
}

await producer.Send(1, subEntryMessages, CompressionType.Gzip);
messages.Clear();
```

Note:
`CompressionType.Lz4`,`CompressionType.Snappy`,`CompressionType.Zstd`
are not provided by default.<br>
See the section: "Implement a Custom Compression Codec" for more details.


### Consume Messages
// TODO
Expand Down
8 changes: 3 additions & 5 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class Client : IClient
private uint correlationId = 0; // allow for some pre-amble

private byte nextPublisherId = 0;

private Connection connection;

private readonly IDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>
Expand Down Expand Up @@ -156,9 +156,9 @@ private Client(ClientParameters parameters)

private async Task OnConnectionClosed(string reason)
{
await ConnectionClosed?.Invoke(reason)!;
await ConnectionClosed?.Invoke(reason)!;
}

public static async Task<Client> Create(ClientParameters parameters)
{
var client = new Client(parameters);
Expand Down Expand Up @@ -428,9 +428,7 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
}
}




public async Task<CloseResponse> Close(string reason)
{
if (closeResponse != null)
Expand Down
216 changes: 216 additions & 0 deletions RabbitMQ.Stream.Client/Compression.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading;

namespace RabbitMQ.Stream.Client
{
// Compression Type for sub-entry publish
public enum CompressionType : byte
{
// builtin compression
None = 0,
Gzip = 1,

// Not implemented by default.
// It is possible to add custom codec with StreamCompressionCodecs
Snappy = 2,
Lz4 = 3,
Zstd = 4,
}

// Interface for Compress/unCompress the messages
// used by SubEntryPublish to publish the messages
public interface ICompressionCodec
{
void Compress(List<Message> messages);
public int Write(Span<byte> span);
public int CompressedSize { get; }
public int UnCompressedSize { get; }

public int MessagesCount { get; }

public CompressionType CompressionType { get; }

ReadOnlySequence<byte> UnCompress(ReadOnlySequence<byte> source, uint dataLen, uint unCompressedDataSize);
}

public class NoneCompressionCodec : ICompressionCodec
{
private List<Message> rMessages;

public void Compress(List<Message> messages)
{
rMessages = messages;
UnCompressedSize = messages.Sum(msg => 4 + msg.Size);
// since the buffer is not compressed CompressedSize is ==UnCompressedSize
CompressedSize = UnCompressedSize;
}

public int Write(Span<byte> span)
{
var offset = 0;
foreach (var msg in rMessages)
{
offset += WireFormatting.WriteUInt32(span.Slice(offset), (uint) msg.Size);
offset += msg.Write(span.Slice(offset));
}

return offset;
}

public int CompressedSize { get; private set; }
public int UnCompressedSize { get; private set; }
public int MessagesCount => rMessages.Count;
public CompressionType CompressionType => CompressionType.None;

public ReadOnlySequence<byte> UnCompress(ReadOnlySequence<byte> source, uint dataLen,
uint unCompressedDataSize)
{
return source;
}
}

public class GzipCompressionCodec : ICompressionCodec
{
private ReadOnlySequence<byte> compressedReadOnlySequence;

public void Compress(List<Message> messages)
{
MessagesCount = messages.Count;
UnCompressedSize = messages.Sum(msg => 4 + msg.Size);
var span = new Span<byte>(new byte[UnCompressedSize]);
var offset = 0;
foreach (var msg in messages)
{
offset += WireFormatting.WriteUInt32(span.Slice(offset), (uint) msg.Size);
offset += msg.Write(span.Slice(offset));
}

using var compressedMemory = new MemoryStream();
using (var gZipStream =
new GZipStream(compressedMemory, CompressionMode.Compress))
{
gZipStream.Write(span);
}
compressedReadOnlySequence = new ReadOnlySequence<byte>(compressedMemory.ToArray());
}

public int Write(Span<byte> span)
{
return WireFormatting.Write(span, compressedReadOnlySequence);
}

public int CompressedSize => (int) compressedReadOnlySequence.Length;
public int UnCompressedSize { get; private set; }
public int MessagesCount { get; private set; }
public CompressionType CompressionType => CompressionType.Gzip;


public ReadOnlySequence<byte> UnCompress(ReadOnlySequence<byte> source, uint dataLen, uint unCompressedDataSize)
{
using var sourceMemoryStream = new MemoryStream(source.ToArray(), 0, (int) dataLen);
using var resultMemoryStream = new MemoryStream((int) unCompressedDataSize);
using (var gZipStream =
new GZipStream(sourceMemoryStream, CompressionMode.Decompress))
{
gZipStream.CopyTo(resultMemoryStream);
}
return new ReadOnlySequence<byte>(resultMemoryStream.ToArray());
}
}

/// <summary>
/// StreamCompressionCodecs: Global class to register/unregister the compress codec
/// GZip and None compression are provided by default
/// </summary>
public static class StreamCompressionCodecs
{
private static readonly Dictionary<CompressionType, Type> AvailableCompressCodecs =
new()
{
{CompressionType.Gzip, typeof(GzipCompressionCodec)},
{CompressionType.None, typeof(NoneCompressionCodec)},
};

public static void RegisterCodec<T>(CompressionType compressionType) where T : ICompressionCodec, new()
{
if (AvailableCompressCodecs.ContainsKey(compressionType))
{
throw new CodecAlreadyExistException($"codec for {compressionType} already exist.");
}

AvailableCompressCodecs.Add(compressionType, typeof(T));
}

public static void UnRegisterCodec(CompressionType compressionType)
{
if (AvailableCompressCodecs.ContainsKey(compressionType))
{
AvailableCompressCodecs.Remove(compressionType);
}
}

public static ICompressionCodec GetCompressionCodec(CompressionType compressionType)
{
if (!AvailableCompressCodecs.ContainsKey(compressionType))
{
throw new CodecNotFoundException($"codec for {compressionType} not found");
}

return (ICompressionCodec) Activator.CreateInstance(AvailableCompressCodecs[compressionType]);
}
}

public static class CompressionHelper
{
public static ICompressionCodec Compress(List<Message> messages, CompressionType compressionType)
{
if (messages.Count > ushort.MaxValue)
{
throw new OutOfBoundsException($"List out of limits: 0-{ushort.MaxValue}");
}

var codec
= StreamCompressionCodecs.GetCompressionCodec(compressionType);
codec.Compress(messages);
return codec;
}

public static ReadOnlySequence<byte> UnCompress(CompressionType compressionType,
ReadOnlySequence<byte> source, uint dataLen,
uint unCompressedDataSize)
{
var codec = StreamCompressionCodecs.GetCompressionCodec(compressionType);
return codec.UnCompress(source, dataLen, unCompressedDataSize);
}
}


public class OutOfBoundsException : Exception
{
public OutOfBoundsException(string s) :
base(s)
{
}
}

public class CodecNotFoundException : Exception
{
public CodecNotFoundException(string s) :
base(s)
{
}
}

public class CodecAlreadyExistException : Exception
{
public CodecAlreadyExistException(string s) :
base(s)
{
}
}
}
Loading

0 comments on commit 3325caf

Please sign in to comment.