Skip to content

Commit

Permalink
buffer refactor - changed blocksizes to array
Browse files Browse the repository at this point in the history
  • Loading branch information
Jack Dermody committed Apr 8, 2024
1 parent 05b8786 commit c142419
Show file tree
Hide file tree
Showing 19 changed files with 402 additions and 484 deletions.
2 changes: 1 addition & 1 deletion BrightData.Parquet/ExtensionMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static async Task WriteAsParquet(this IDataTable dataTable, Stream output
writer.CustomMetadata = dataTable.MetaData.AllKeys.ToDictionary(x => x, x => dataTable.MetaData.Get(x)?.ToString() ?? "");

// write each block
for (uint i = 0; i < firstColumn.BlockCount; i++) {
for (uint i = 0, len = (uint)firstColumn.BlockSizes.Length; i < len; i++) {
using var blockWriter = writer.CreateRowGroup();
var columnIndex = 0;
foreach (var column in columns) {
Expand Down
4 changes: 2 additions & 2 deletions BrightData.UnitTests/BufferTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public async Task StringBuffer()
var stringBuffer = _streamProvider.CreateCompositeBuffer(2, 0, 128);
stringBuffer.Append(data);
var index = 0;
for (uint i = 0; i < stringBuffer.BlockCount; i++) {
for (uint i = 0, len = (uint)stringBuffer.BlockSizes.Length; i < len; i++) {
var block = await stringBuffer.GetTypedBlock(i);
foreach (var item in block.ToArray())
item.Should().Be(data[index++]);
Expand Down Expand Up @@ -157,7 +157,7 @@ public async Task IntBuffer()
await foreach (var item in intBuffer)
item.Should().Be(++index);
index = 0;
for (uint i = 0; i < intBuffer.BlockCount; i++) {
for (uint i = 0, len = (uint)intBuffer.BlockSizes.Length; i < len; i++) {
var block = await intBuffer.GetTypedBlock(i);
foreach(var item in block.ToArray())
item.Should().Be(++index);
Expand Down
36 changes: 24 additions & 12 deletions BrightData/BrightData.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1007,12 +1007,6 @@
<param name="from"></param>
<param name="converter"></param>
</member>
<member name="T:BrightData.Buffer.ReadOnly.Helper.BufferConcatenator`1">
<summary>
Concatenates multiple buffers into one single buffer
</summary>
<typeparam name="T"></typeparam>
</member>
<member name="T:BrightData.Buffer.ReadOnly.Helper.MappedReadOnlyBuffer`2">
<summary>
Adapts a buffer with a block mapper function
Expand Down Expand Up @@ -2508,6 +2502,29 @@
<param name="type"></param>
<returns></returns>
</member>
<member name="M:BrightData.ExtensionMethods.AllIndices(BrightData.IReadOnlyBuffer)">
<summary>
Returns the block index and relative block index (index within each block) of each item in the buffer
</summary>
<param name="buffer"></param>
<returns></returns>
</member>
<member name="M:BrightData.ExtensionMethods.GetIndices(BrightData.IReadOnlyBuffer,System.UInt32[])">
<summary>
Returns all indices in the buffer, including their block index and relative block index (index within each block)
</summary>
<param name="buffer"></param>
<param name="rowIndices">Row indices to select (or all if none specified)</param>
<returns></returns>
</member>
<member name="M:BrightData.ExtensionMethods.GetIndices(BrightData.IReadOnlyBuffer,System.Collections.Generic.IEnumerable{System.UInt32})">
<summary>
Returns all indices in the buffer, including their block index and relative block index (index within each block)
</summary>
<param name="buffer"></param>
<param name="rowIndices">Row indices to select (or all if none specified)</param>
<returns></returns>
</member>
<member name="M:BrightData.ExtensionMethods.ToType(System.TypeCode)">
<summary>
Converts a type code to a type
Expand Down Expand Up @@ -7047,16 +7064,11 @@
Read only buffer - composed of multiple blocks of a fixed size
</summary>
</member>
<member name="P:BrightData.IReadOnlyBuffer.BlockSize">
<member name="P:BrightData.IReadOnlyBuffer.BlockSizes">
<summary>
Size of each block in the buffer
</summary>
</member>
<member name="P:BrightData.IReadOnlyBuffer.BlockCount">
<summary>
Number of blocks in the buffer
</summary>
</member>
<member name="P:BrightData.IReadOnlyBuffer.DataType">
<summary>
The type of data within the buffer
Expand Down
37 changes: 30 additions & 7 deletions BrightData/Buffer/Composite/CompositeBufferBase.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -53,6 +54,25 @@ internal abstract class CompositeBufferBase<T, BT> : TypedBufferBase<T>, ICompos
public uint? DistinctItems => (uint?)_distinct?.Count;
public Type DataType => typeof(T);

/// <summary>
/// Size of each block in the buffer (one array entry per block)
/// </summary>
public uint[] BlockSizes
{
    get
    {
        var sizes = new uint[BlockCount];
        var next = 0;

        // every in-memory block is reported at the buffer's block size
        if (_inMemoryBlocks is not null) {
            foreach (var _ in _inMemoryBlocks)
                sizes[next++] = BlockSize;
        }

        // blocks that have been written to file are also reported at the buffer's block size
        for (uint fileBlock = 0; fileBlock < _blocksInFile; fileBlock++)
            sizes[next++] = BlockSize;

        // the block currently being filled reports its own size
        if (_currBlock is not null)
            sizes[next] = _currBlock.Size;

        return sizes;
    }
}

public async Task ForEachBlock(BlockCallback<T> callback, INotifyOperationProgress? notify = null, string? message = null, CancellationToken ct = default)
{
var guid = Guid.NewGuid();
Expand Down Expand Up @@ -221,10 +241,12 @@ public async Task WriteTo(Stream stream)
writer.Write(BlockCount);
writer.Flush();

// create space for the block positions
var headerPosition = stream.Position;
var blockPositions = new (long Offset, uint Size)[BlockCount];
stream.Seek((sizeof(long) + sizeof(uint)) * BlockCount, SeekOrigin.Current);
var blockPositions = new (long Offset, uint ByteSize, uint Count)[BlockCount];
stream.Seek((sizeof(long) + sizeof(uint) + sizeof(uint)) * BlockCount, SeekOrigin.Current);
stream.SetLength(stream.Position);

var index = 0;
var dataBlock = stream.AsDataBlock();
var pos = (uint)stream.Position;
Expand All @@ -234,7 +256,7 @@ public async Task WriteTo(Stream stream)
{
foreach (var block in _inMemoryBlocks) {
var blockSize = await block.WriteTo(dataBlock);
blockPositions[index++] = (pos, blockSize);
blockPositions[index++] = (pos, blockSize, block.Size);
pos += blockSize;
}
}
Expand All @@ -249,21 +271,22 @@ public async Task WriteTo(Stream stream)
var block = _blockFactory(blockData.ToArray(), true);
var blockSize = await block.WriteTo(dataBlock);
offset += size;
blockPositions[index++] = (pos, blockSize);
blockPositions[index++] = (pos, blockSize, block.Size);
pos += blockSize;
}
}

// write current block
if (_currBlock is not null) {
var blockSize = await _currBlock.WriteTo(dataBlock);
blockPositions[index] = (pos, blockSize);
blockPositions[index] = (pos, blockSize, _currBlock.Size);
}

stream.Seek(headerPosition, SeekOrigin.Begin);
foreach (var (startPos, size) in blockPositions) {
foreach (var (startPos, byteSize, count) in blockPositions) {
writer.Write(startPos);
writer.Write(size);
writer.Write(byteSize);
writer.Write(count);
}

// move the stream position to the end position
Expand Down
6 changes: 2 additions & 4 deletions BrightData/Buffer/Operations/IndexedCopyOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ internal class IndexedCopyOperation<T>(IReadOnlyBuffer<T> from, IAppendToBuffer<
{
public async Task Execute(INotifyOperationProgress? notify = null, string? msg = null, CancellationToken ct = default)
{
var blockSize = from.BlockSize;
var blocks = indices.Select(x => (Index: x, BlockIndex: x / blockSize))
var blocks = from.GetIndices(indices)
.GroupBy(x => x.BlockIndex)
.OrderBy(x => x.Key)
;
foreach (var block in blocks) {
var blockMemory = await from.GetTypedBlock(block.Key);
var baseOffset = block.Key * blockSize;
CopyIndices(blockMemory, block.Select(x => (int)(x.Index - baseOffset)));
CopyIndices(blockMemory, block.Select(x => (int)x.RelativeBlockIndex));
}
return;

Expand Down
2 changes: 1 addition & 1 deletion BrightData/Buffer/Operations/ManyToManyCopy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public ManyToManyCopy(IReadOnlyList<IReadOnlyBuffer> from, IReadOnlyList<IAppend
if (_from.Any(x => x.Size != size))
throw new ArgumentException("Expected all input buffers to have the same size", nameof(from));
_size = (uint)_from.Count;
_blockSize = (uint)_from.Average(x => x.BlockSize);
_blockSize = (uint)_from.Average(x => x.BlockSizes.Average(y => y));
_to = to;
}

Expand Down
2 changes: 1 addition & 1 deletion BrightData/Buffer/Operations/ManyToOneMutation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public ManyToOneMutation(IEnumerable<IReadOnlyBuffer<FT>> from, IAppendToBuffer<
_size = _from.First().Size;
if (_from.Any(x => x.Size != _size))
throw new ArgumentException("Expected all input buffers to have the same size", nameof(from));
_blockSize = (uint)_from.Average(x => x.BlockSize);
_blockSize = (uint)_from.Average(x => x.BlockSizes.Average(y => y));
_mutator = mutator;
_to = to;
}
Expand Down
16 changes: 16 additions & 0 deletions BrightData/Buffer/ReadOnly/BlockReaderReadOnlyBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ public async Task ForEachBlock(BlockCallback<T> callback, INotifyOperationProgre
notify?.OnCompleteOperation(guid, ct.IsCancellationRequested);
}

/// <summary>
/// Size of each block in the buffer (one array entry per block).
/// Every block except the last is reported at <c>BlockSize</c>; the last block's size is
/// derived from the bytes that remain between its start and the end of the data.
/// </summary>
public uint[] BlockSizes
{
    get
    {
        // An empty buffer has no last block. Without this guard "BlockCount - 1" would
        // wrap around (or go negative) and the writes below would index out of range.
        if (BlockCount == 0)
            return Array.Empty<uint>();

        var ret = new uint[BlockCount];
        var lastBlockIndex = BlockCount - 1;
        for (uint i = 0; i < lastBlockIndex; i++)
            ret[i] = BlockSize;

        // byte range of the last block, clamped so that it does not run past the end of the data
        var start = _offset + lastBlockIndex * BlockSize * _sizeOfT;
        var end = Math.Min(start + BlockSize * _sizeOfT, _byteSize + _offset);

        // convert the remaining byte length back into an item count
        ret[lastBlockIndex] = (end - start) / _sizeOfT;
        return ret;
    }
}

public override async Task<ReadOnlyMemory<T>> GetTypedBlock(uint blockIndex)
{
if (blockIndex >= BlockCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ abstract class ReadOnlyConverterBase<FT, TT>(IReadOnlyBuffer<FT> from) : TypedBu
protected abstract TT Convert(in FT from);

public uint Size => from.Size;
public uint BlockSize => from.BlockSize;
public uint BlockCount => from.BlockCount;
public uint[] BlockSizes => from.BlockSizes;
public Type DataType => typeof(TT);
public override async IAsyncEnumerable<object> EnumerateAll()
{
Expand Down
120 changes: 60 additions & 60 deletions BrightData/Buffer/ReadOnly/Helper/BufferConcatenator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,70 +11,70 @@ namespace BrightData.Buffer.ReadOnly.Helper
/// Concatenates multiple buffers into one single buffer
/// </summary>
/// <typeparam name="T"></typeparam>
internal class BufferConcatenator<T> : TypedBufferBase<T>, IReadOnlyBufferWithMetaData<T> where T : notnull
{
readonly IReadOnlyBufferWithMetaData<T>[] _buffers;
//internal class BufferConcatenator<T> : TypedBufferBase<T>, IReadOnlyBufferWithMetaData<T> where T : notnull
//{
// readonly IReadOnlyBufferWithMetaData<T>[] _buffers;

public BufferConcatenator(params IReadOnlyBufferWithMetaData<T>[] buffers)
{
_buffers = buffers;
var first = buffers.First();
var size = first.Size;
var blockCount = first.BlockCount;
MetaData = first.MetaData;
foreach (var buffer in buffers.Skip(1))
{
if (first.BlockSize != buffer.BlockSize)
throw new ArgumentException("All buffer block sizes must be the same");
size += buffer.Size;
blockCount += buffer.BlockCount;
}
Size = size;
BlockSize = first.BlockSize;
BlockCount = blockCount;
DataType = typeof(T);
}
// public BufferConcatenator(params IReadOnlyBufferWithMetaData<T>[] buffers)
// {
// _buffers = buffers;
// var first = buffers.First();
// var size = first.Size;
// var blockCount = first.BlockCount;
// MetaData = first.MetaData;
// foreach (var buffer in buffers.Skip(1))
// {
// if (first.BlockSize != buffer.BlockSize)
// throw new ArgumentException("All buffer block sizes must be the same");
// size += buffer.Size;
// blockCount += buffer.BlockCount;
// }
// Size = size;
// BlockSize = first.BlockSize;
// BlockCount = blockCount;
// DataType = typeof(T);
// }

public uint Size { get; }
public uint BlockSize { get; }
public uint BlockCount { get; }
public Type DataType { get; }
public MetaData MetaData { get; }
// public uint Size { get; }
// public uint BlockSize { get; }
// public uint BlockCount { get; }
// public Type DataType { get; }
// public MetaData MetaData { get; }

public override async IAsyncEnumerable<object> EnumerateAll()
{
foreach (var buffer in _buffers)
{
await foreach (var item in buffer.EnumerateAll())
yield return item;
}
}
// public override async IAsyncEnumerable<object> EnumerateAll()
// {
// foreach (var buffer in _buffers)
// {
// await foreach (var item in buffer.EnumerateAll())
// yield return item;
// }
// }

public async Task ForEachBlock(BlockCallback<T> callback, INotifyOperationProgress? notify = null, string? message = null, CancellationToken ct = default)
{
foreach (var buffer in _buffers)
await buffer.ForEachBlock(callback, notify, message, ct);
}
// public async Task ForEachBlock(BlockCallback<T> callback, INotifyOperationProgress? notify = null, string? message = null, CancellationToken ct = default)
// {
// foreach (var buffer in _buffers)
// await buffer.ForEachBlock(callback, notify, message, ct);
// }

public override Task<ReadOnlyMemory<T>> GetTypedBlock(uint blockIndex)
{
uint curr = 0;
foreach (var buffer in _buffers)
{
if (blockIndex < curr + buffer.BlockCount)
return buffer.GetTypedBlock(curr - blockIndex);
curr += buffer.BlockCount;
}
throw new Exception("Block not found");
}
// public override Task<ReadOnlyMemory<T>> GetTypedBlock(uint blockIndex)
// {
// uint curr = 0;
// foreach (var buffer in _buffers)
// {
// if (blockIndex < curr + buffer.BlockCount)
// return buffer.GetTypedBlock(curr - blockIndex);
// curr += buffer.BlockCount;
// }
// throw new Exception("Block not found");
// }

public override async IAsyncEnumerable<T> EnumerateAllTyped()
{
foreach (var buffer in _buffers)
{
await foreach (var item in buffer.EnumerateAllTyped())
yield return item;
}
}
}
// public override async IAsyncEnumerable<T> EnumerateAllTyped()
// {
// foreach (var buffer in _buffers)
// {
// await foreach (var item in buffer.EnumerateAllTyped())
// yield return item;
// }
// }
//}
}
4 changes: 2 additions & 2 deletions BrightData/Buffer/ReadOnly/Helper/MappedReadOnlyBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ internal class MappedReadOnlyBuffer<IT, T>(IReadOnlyBufferWithMetaData<IT> index
where IT : notnull
where T : notnull
{
public uint BlockSize => index.BlockSize;
public uint BlockCount => index.BlockCount;
public uint[] BlockSizes => index.BlockSizes;
public uint BlockCount { get; } = (uint)index.BlockSizes.Length;
public Type DataType => typeof(T);
public uint Size => index.Size;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ internal class ReadOnlyBufferMetaDataWrapper<T>(IReadOnlyBuffer<T> buffer, MetaD
{
public uint Size => buffer.Size;

public uint BlockSize => buffer.BlockSize;

public uint BlockCount => buffer.BlockCount;
public uint[] BlockSizes => buffer.BlockSizes;

public Type DataType => buffer.DataType;

Expand Down

0 comments on commit c142419

Please sign in to comment.