Skip to content

Commit

Permalink
data table refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Jack Dermody committed Apr 13, 2024
1 parent c142419 commit 7a9a8e6
Show file tree
Hide file tree
Showing 31 changed files with 543 additions and 134 deletions.
73 changes: 73 additions & 0 deletions BrightData.Parquet/BufferAdaptors/ParquetBufferAdaptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using BrightData.Types;
using Parquet;
using Parquet.Data;

namespace BrightData.Parquet.BufferAdaptors
{
/// <summary>
/// Read only buffer over a single column of a Parquet file, in which each row group is exposed as one block.
/// </summary>
/// <typeparam name="T">Element type exposed by the buffer</typeparam>
/// <param name="rowGroupProvider">Supplies the row group sizes and the column data of each row group</param>
/// <param name="columnIndex">Index of the column that is passed to the provider on every read</param>
/// <param name="metaData">Meta data that is exposed via <see cref="MetaData"/></param>
internal class ParquetBufferAdaptor<T>(RowGroupReaderProvider rowGroupProvider, int columnIndex, MetaData metaData) : IReadOnlyBufferWithMetaData<T>
    where T : notnull
{
    /// <summary>Size as reported by the row group provider</summary>
    public uint Size => rowGroupProvider.Size;

    /// <summary>Size of each block - these are the row group sizes reported by the provider</summary>
    public uint[] BlockSizes => rowGroupProvider.RowGroupSizes;

    /// <summary>Type of each item in the buffer</summary>
    public Type DataType => typeof(T);

    /// <summary>Meta data of the column</summary>
    public MetaData MetaData { get; } = metaData;

    /// <summary>
    /// Enumerates every item in the column (as objects), one row group after another
    /// </summary>
    public async IAsyncEnumerable<object> EnumerateAll()
    {
        for (uint i = 0; i < rowGroupProvider.RowGroupCount; i++)
        {
            var column = await rowGroupProvider.GetColumn(i, columnIndex);
            foreach (var item in GetArray(column))
                yield return item;
        }
    }

    /// <summary>
    /// Reads a single block as an untyped array
    /// </summary>
    /// <param name="blockIndex">Row group index</param>
    public async Task<Array> GetBlock(uint blockIndex)
    {
        var column = await rowGroupProvider.GetColumn(blockIndex, columnIndex);
        return GetArray(column);
    }

    /// <summary>
    /// Invokes the callback once per block, in row group order
    /// </summary>
    /// <param name="callback">Receives the items of each block</param>
    /// <param name="notify">Optional progress notifications</param>
    /// <param name="message">Optional message passed to <paramref name="notify"/> when the operation starts</param>
    /// <param name="ct">When cancelled no further blocks are read; the completion notification reports the cancellation</param>
    public async Task ForEachBlock(BlockCallback<T> callback, INotifyOperationProgress? notify = null, string? message = null, CancellationToken ct = default)
    {
        var guid = Guid.NewGuid();
        notify?.OnStartOperation(guid, message);

        // stop reading row groups as soon as cancellation has been requested
        for (uint i = 0; i < rowGroupProvider.RowGroupCount && !ct.IsCancellationRequested; i++)
        {
            var column = await rowGroupProvider.GetColumn(i, columnIndex);
            var data = GetArray(column);
            callback(data);

            // block i has been processed at this point, so (i + 1) blocks are complete
            notify?.OnOperationProgress(guid, (float)(i + 1) / rowGroupProvider.RowGroupCount);
        }
        notify?.OnCompleteOperation(guid, ct.IsCancellationRequested);
    }

    /// <summary>
    /// Reads a single block as typed memory
    /// </summary>
    /// <param name="blockIndex">Row group index</param>
    public async Task<ReadOnlyMemory<T>> GetTypedBlock(uint blockIndex)
    {
        var column = await rowGroupProvider.GetColumn(blockIndex, columnIndex);
        var data = GetArray(column);
        return data;
    }

    /// <summary>
    /// Enumerates every item in the column, one row group after another
    /// </summary>
    public IAsyncEnumerable<T> EnumerateAllTyped() => EnumerateTyped();

    /// <summary>
    /// Creates an enumerator over every item in the column
    /// </summary>
    /// <param name="ct">Checked before each row group is read; enumeration throws <see cref="OperationCanceledException"/> once it has been cancelled</param>
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken ct = default) => EnumerateTyped().GetAsyncEnumerator(ct);

    /// <summary>
    /// Shared iterator - the attribute lets the token given to GetAsyncEnumerator reach the loop
    /// </summary>
    async IAsyncEnumerable<T> EnumerateTyped([System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default)
    {
        for (uint i = 0; i < rowGroupProvider.RowGroupCount; i++)
        {
            ct.ThrowIfCancellationRequested();
            var column = await rowGroupProvider.GetColumn(i, columnIndex);
            foreach (var item in GetArray(column))
                yield return item;
        }
    }

    /// <summary>
    /// Converts the data of a Parquet column into a typed array - the default is a direct cast, derived classes override this to map or replace values
    /// </summary>
    /// <param name="column">Column that was read from a row group</param>
    protected virtual T[] GetArray(DataColumn column) => (T[])column.Data;
}
}
25 changes: 25 additions & 0 deletions BrightData.Parquet/BufferAdaptors/ParquetMappedBufferAdaptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using BrightData.Types;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Parquet.Data;

namespace BrightData.Parquet.BufferAdaptors
{
/// <summary>
/// Parquet column buffer whose stored items (of type DT) are converted one by one into the exposed type (T)
/// </summary>
/// <typeparam name="DT">Type of the items as stored in the Parquet column</typeparam>
/// <typeparam name="T">Type of the items that the buffer exposes</typeparam>
/// <param name="rowGroupProvider">Passed to the base adaptor</param>
/// <param name="columnIndex">Passed to the base adaptor</param>
/// <param name="metaData">Passed to the base adaptor</param>
/// <param name="mappingFunction">Converts a stored item into an exposed item</param>
internal class ParquetMappedBufferAdaptor<DT, T>(RowGroupReaderProvider rowGroupProvider, int columnIndex, MetaData metaData, Func<DT, T> mappingFunction) : ParquetBufferAdaptor<T>(rowGroupProvider, columnIndex, metaData)
    where T : notnull
{
    /// <summary>
    /// Casts the column data to an array of DT and maps every element, in order, into a new array
    /// </summary>
    protected override T[] GetArray(DataColumn column)
    {
        var source = (DT[])column.Data;
        var mapped = new T[source.Length];

        for (var i = 0; i < source.Length; i++)
            mapped[i] = mappingFunction(source[i]);
        return mapped;
    }
}
}
25 changes: 25 additions & 0 deletions BrightData.Parquet/BufferAdaptors/ParquetNullableBufferAdaptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using BrightData.Types;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Parquet.Data;

namespace BrightData.Parquet.BufferAdaptors
{
/// <summary>
/// Parquet column buffer for a nullable value type column - null items are replaced with a fixed default value
/// </summary>
/// <typeparam name="T">Value type of the column</typeparam>
/// <param name="rowGroupProvider">Passed to the base adaptor</param>
/// <param name="columnIndex">Passed to the base adaptor</param>
/// <param name="metaData">Passed to the base adaptor</param>
/// <param name="defaultValue">Value that is written in place of each null</param>
internal class ParquetNullableBufferAdaptor<T>(RowGroupReaderProvider rowGroupProvider, int columnIndex, MetaData metaData, T defaultValue) : ParquetBufferAdaptor<T>(rowGroupProvider, columnIndex, metaData)
    where T : struct
{
    /// <summary>
    /// Casts the column data to an array of nullable T and copies it into a new array, substituting the default for nulls
    /// </summary>
    protected override T[] GetArray(DataColumn column)
    {
        var source = (T?[])column.Data;
        var result = new T[source.Length];

        for (var i = 0; i < source.Length; i++)
            result[i] = source[i].GetValueOrDefault(defaultValue);
        return result;
    }
}
}
24 changes: 24 additions & 0 deletions BrightData.Parquet/BufferAdaptors/ParquetNullableStringAdaptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using BrightData.Types;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Parquet.Data;

namespace BrightData.Parquet.BufferAdaptors
{
/// <summary>
/// Parquet column buffer for a nullable string column - null strings are replaced with a fixed default value
/// </summary>
/// <param name="rowGroupProvider">Passed to the base adaptor</param>
/// <param name="columnIndex">Passed to the base adaptor</param>
/// <param name="metaData">Passed to the base adaptor</param>
/// <param name="defaultValue">String that is written in place of each null</param>
internal class ParquetNullableStringAdaptor(RowGroupReaderProvider rowGroupProvider, int columnIndex, MetaData metaData, string defaultValue) : ParquetBufferAdaptor<string>(rowGroupProvider, columnIndex, metaData)
{
    /// <summary>
    /// Casts the column data to an array of nullable strings and copies it into a new array, substituting the default for nulls
    /// </summary>
    protected override string[] GetArray(DataColumn column)
    {
        var source = (string?[])column.Data;
        var result = new string[source.Length];

        for (var i = 0; i < source.Length; i++)
            result[i] = source[i] is { } text ? text : defaultValue;
        return result;
    }
}
}
10 changes: 10 additions & 0 deletions BrightData.Parquet/ExtensionMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,15 @@ static ICompositeBuffer CreateColumn(ParquetRowGroupReader reader, DataField fie
columnMetaData.Set(key, value);
return builder.CreateColumn(field.ClrType.GetBrightDataType(), columnMetaData);
}

/// <summary>
/// Opens the file at <paramref name="path"/> for reading and loads it as a Parquet backed data table
/// </summary>
/// <param name="context">Bright data context</param>
/// <param name="path">Path of the Parquet file</param>
/// <returns>Data table that reads from the opened file</returns>
public static async Task<IDataTable> LoadParquetDataTableFromFile(this BrightDataContext context, string path)
{
    // On success the stream is left open because the table is built on a reader created over it.
    // NOTE(review): whether disposing the table also closes this stream depends on how the Parquet reader was asked to treat the stream - confirm
    var stream = File.OpenRead(path);
    try
    {
        return await LoadParquetDataTableFromStream(context, stream);
    }
    catch
    {
        // no table was created, so nothing else holds a reference that could release the file handle
        await stream.DisposeAsync();
        throw;
    }
}

/// <summary>
/// Loads a Parquet backed data table from a stream
/// </summary>
/// <param name="context">Bright data context</param>
/// <param name="stream">Stream that contains the Parquet data</param>
/// <returns>The adaptor created over the stream, exposed as a data table</returns>
public static async Task<IDataTable> LoadParquetDataTableFromStream(this BrightDataContext context, Stream stream)
    => await ParquetDataTableAdaptor.Create(context, stream);
}
}
30 changes: 0 additions & 30 deletions BrightData.Parquet/ParquetBufferAdaptor.cs

This file was deleted.

135 changes: 117 additions & 18 deletions BrightData.Parquet/ParquetDataTableAdaptor.cs
Original file line number Diff line number Diff line change
@@ -1,33 +1,113 @@
using BrightData.DataTable.Rows;
using BrightData.Converter;
using BrightData.DataTable.Rows;
using BrightData.Parquet.BufferAdaptors;
using BrightData.Types;
using Parquet;
using Parquet.Schema;

namespace BrightData.Parquet
{
internal class ParquetDataTableAdaptor : IDataTable
{
readonly ParquetReader _reader;
uint? _rowCount = null;
readonly ParquetReader _reader;
readonly RowGroupReaderProvider _rowGroupReaderProvider;
readonly IReadOnlyBufferWithMetaData[] _columns;
readonly Lazy<IReadOnlyBuffer<object>[]> _genericColumns;
uint? _rowCount = null;

private ParquetDataTableAdaptor(BrightDataContext context, ParquetReader reader)
{
Context = context;
_reader = reader;
ColumnMetaData = reader.Schema.DataFields.Select(x => {
var dataFields = reader.Schema.DataFields;
ColumnMetaData = dataFields.Select(x => {
var m = new MetaData();
m.SetName(x.Name);
return m;
}).ToArray();
_rowGroupReaderProvider = new(reader);
_columns = dataFields.Select(GetColumn).ToArray();

// create lazily loaded column to object columns
_genericColumns = new(GetColumnsAsObjectBuffers);
}

/// <summary>
/// Builds one object buffer per column, in column order (this is the factory of the lazily created generic columns)
/// </summary>
IReadOnlyBuffer<object>[] GetColumnsAsObjectBuffers()
{
    var buffers = new IReadOnlyBuffer<object>[ColumnCount];
    uint columnIndex = 0;
    while (columnIndex < ColumnCount)
    {
        buffers[columnIndex] = GetColumn(columnIndex).ToObjectBuffer();
        ++columnIndex;
    }
    return buffers;
}

/// <summary>
/// Creates the buffer adaptor for one Parquet data field.
/// Nullable fields get an adaptor that replaces nulls with a fixed value: empty binary data for byte arrays, an empty string for strings,
/// false for booleans and the type's MinValue for dates and numbers.
/// </summary>
/// <param name="dataField">Parquet field that describes the column</param>
/// <param name="columnIndex">Index of the column (also used to look up the column's meta data)</param>
/// <returns>Buffer that reads the column from the row group provider</returns>
/// <exception cref="ArgumentOutOfRangeException">The CLR type of the field is not one of the handled types</exception>
IReadOnlyBufferWithMetaData GetColumn(DataField dataField, int columnIndex)
{
    var metaData = ColumnMetaData[columnIndex];
    var dataType = dataField.ClrType;
    var isNullable = dataField.IsNullable;

    // byte arrays have no type code of their own, so they are handled before the switch and exposed as binary data
    if (dataType == typeof(byte[])) {
        return isNullable
            ? (IReadOnlyBufferWithMetaData)new ParquetMappedBufferAdaptor<byte[]?, BinaryData>(_rowGroupReaderProvider, columnIndex, metaData, x => x is null ? new BinaryData() : new BinaryData(x))
            : new ParquetMappedBufferAdaptor<byte[], BinaryData>(_rowGroupReaderProvider, columnIndex, metaData, x => new BinaryData(x))
        ;
    }

    var typeCode = Type.GetTypeCode(dataType);
    return typeCode switch {
        TypeCode.Boolean => Create<bool>(false),
        TypeCode.DateTime => Create<DateTime>(DateTime.MinValue),
        TypeCode.Decimal => Create<decimal>(decimal.MinValue),
        TypeCode.Double => Create<double>(double.MinValue),
        TypeCode.Single => Create<float>(float.MinValue),
        TypeCode.Byte => Create<byte>(byte.MinValue),

        TypeCode.String when isNullable => new ParquetNullableStringAdaptor(_rowGroupReaderProvider, columnIndex, metaData, string.Empty),
        TypeCode.String => new ParquetBufferAdaptor<string>(_rowGroupReaderProvider, columnIndex, metaData),

        TypeCode.Int16 => Create<short>(short.MinValue),
        TypeCode.Int32 => Create<int>(int.MinValue),
        TypeCode.Int64 => Create<long>(long.MinValue),
        TypeCode.UInt16 => Create<ushort>(ushort.MinValue),
        TypeCode.UInt32 => Create<uint>(uint.MinValue),
        TypeCode.UInt64 => Create<ulong>(ulong.MinValue),

        // the single string constructor would treat the text as the parameter name rather than the message
        _ => throw new ArgumentOutOfRangeException(nameof(dataField), dataType, $"{typeCode} could not be converted")
    };

    // Creates the adaptor for a value type column: nulls become valueForNull when the field is nullable, otherwise the data is used as is
    IReadOnlyBufferWithMetaData Create<T>(T valueForNull) where T : struct => isNullable
        ? new ParquetNullableBufferAdaptor<T>(_rowGroupReaderProvider, columnIndex, metaData, valueForNull)
        : new ParquetBufferAdaptor<T>(_rowGroupReaderProvider, columnIndex, metaData);
}

public async Task<ParquetDataTableAdaptor> Create(BrightDataContext context, Stream stream)
public static async Task<ParquetDataTableAdaptor> Create(BrightDataContext context, Stream stream)
{
return new(context, await ParquetReader.CreateAsync(stream));
}

/// <summary>
/// Disposes the Parquet reader and then the row group reader provider that was created from it
/// </summary>
public void Dispose()
{
// NOTE(review): the provider is built on top of the reader but is disposed after it - confirm that this order is safe
_reader.Dispose();
_rowGroupReaderProvider.Dispose();
}

public MetaData MetaData
Expand Down Expand Up @@ -67,31 +147,50 @@ public void PersistMetaData()
throw new NotImplementedException();
}

public IReadOnlyBufferWithMetaData GetColumn(uint index)
{
throw new NotImplementedException();
}
public IReadOnlyBufferWithMetaData GetColumn(uint index) => _columns[index];

public IReadOnlyBufferWithMetaData<T> GetColumn<T>(uint index) where T : notnull
{
throw new NotImplementedException();
var typeofT = typeof(T);
var reader = _columns[index];
var dataType = reader.DataType;

if(dataType == typeofT)
return (IReadOnlyBufferWithMetaData<T>)reader;

//if (typeofT == typeof(object)) {
// var ret = new ReadOnlyBufferMetaDataWrapper<object>(GenericTypeMapping.ToObjectConverter(reader), _columnMetaData[index]);
// return (IReadOnlyBufferWithMetaData<T>)ret;
//}

//if (typeofT == typeof(string)) {
// var ret = new ReadOnlyBufferMetaDataWrapper<string>(GenericTypeMapping.ToStringConverter(reader), _columnMetaData[index]);
// return (IReadOnlyBufferWithMetaData<T>)ret;
//}

//if (dataType.GetBrightDataType().IsNumeric() && typeofT.GetBrightDataType().IsNumeric()) {
// var converter = StaticConverters.GetConverter(dataType, typeof(T));
// return new ReadOnlyBufferMetaDataWrapper<T>((IReadOnlyBuffer<T>)GenericTypeMapping.TypeConverter(typeof(T), reader, converter), _columnMetaData[index]);
//}

throw new NotImplementedException($"Not able to create a column of type {typeof(T)} from {dataType}");
}

public Task<T> Get<T>(uint columnIndex, uint rowIndex) where T : notnull
{
throw new NotImplementedException();
var column = GetColumn<T>(columnIndex);
return column.GetItem(rowIndex);
}

public Task<T[]> Get<T>(uint columnIndex, params uint[] rowIndices) where T : notnull
{
throw new NotImplementedException();
}

public Task<GenericTableRow[]> GetRows(params uint[] rowIndices)
{
throw new NotImplementedException();
var column = GetColumn<T>(columnIndex);
return column.GetItems(rowIndices);
}

public GenericTableRow this[uint index] => throw new NotImplementedException();
public Task<GenericTableRow[]> GetRows(params uint[] rowIndices) => BrightData.ExtensionMethods.GetRows(this, rowIndices);
public Task<GenericTableRow> this[uint index] => this.GetRow(index);
public IAsyncEnumerable<GenericTableRow> EnumerateRows() => BrightData.ExtensionMethods.EnumerateRows(this);
IReadOnlyBuffer<object>[] IHaveGenericColumns.GenericColumns => _genericColumns.Value;
}
}

0 comments on commit 7a9a8e6

Please sign in to comment.