Skip to content

Commit

Permalink
Merge pull request #85 from mukunku/v2.7.2-release
Browse files Browse the repository at this point in the history
V2.7.2 release
  • Loading branch information
mukunku committed Aug 31, 2023
2 parents be6acf2 + 854bb0d commit ab5f189
Show file tree
Hide file tree
Showing 26 changed files with 475 additions and 151 deletions.
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,23 @@ If you'd like to add any new features feel free to send a pull request.

Some key features:
* View parquet file metadata
* Run simple sql-like queries on parquet data
* Run simple sql queries on parquet data
* Open single or partitioned files

# Download
Releases can be found here: https://github.com/mukunku/ParquetViewer/releases

Visit the Wiki for details on how to use the utility: https://github.com/mukunku/ParquetViewer/wiki

# Analytics
Users can opt in to share anonymous usage data to help make the app better. [^1]

Check out the [ParquetViewer Analytics Dashboard](https://app.amplitude.com/analytics/share/7207c0b64c154e979afd7082980d6dd6) if you're interested!

[^1]: Full privacy policy here: https://github.com/mukunku/ParquetViewer/wiki/Privacy-Policy

# Technical Details
The latest version of this project was written in C# using Visual Studio 2022 v17.5.3 and .NET 7
The latest version of this project was written in C# using Microsoft Visual Studio Community 2022 v17.7.3 and .NET 7

# Acknowledgements
This utility would not be possible without: https://github.com/aloneguid/parquet-dotnet
4 changes: 2 additions & 2 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="Apache.Arrow" Version="12.0.0" />
<PackageVersion Include="Parquet.Net" Version="4.12.0" />
<PackageVersion Include="Apache.Arrow" Version="13.0.0" />
<PackageVersion Include="Parquet.Net" Version="4.16.2" />
<PackageVersion Include="Microsoft.CSharp" Version="4.7.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
<PackageVersion Include="RichardSzalay.MockHttp" Version="6.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
public class UnsupportedFieldException : Exception
{
public UnsupportedFieldException(string fieldName) : base(fieldName)
public UnsupportedFieldException(string message, Exception? ex = null) : base(message, ex)
{

}
Expand Down
103 changes: 82 additions & 21 deletions src/ParquetViewer.Engine/ParquetEngine.Processor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Parquet;
using Parquet.Meta;
using ParquetViewer.Engine.Exceptions;
using System.Collections;
using System.Data;
Expand Down Expand Up @@ -83,8 +84,8 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs
{
cancellationToken.ThrowIfCancellationRequested();

var field = ParquetSchemaTree.GetChildByName(column.ColumnName);
if (field.SchemaElement.LogicalType?.LIST is not null)
var field = ParquetSchemaTree.GetChild(column.ColumnName);
if (field.SchemaElement.LogicalType?.LIST is not null || field.SchemaElement.ConvertedType == Parquet.Meta.ConvertedType.LIST)
{
await ReadListField(dataTable, groupReader, rowBeginIndex, field, skipRecords,
readRecords, isFirstColumn, rowLookupCache, cancellationToken, progress);
Expand All @@ -104,7 +105,7 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs
}
}

private static async Task ReadPrimitiveField(DataTable dataTable, ParquetRowGroupReader groupReader, int rowBeginIndex, ParquetSchemaElement field,
private async Task ReadPrimitiveField(DataTable dataTable, ParquetRowGroupReader groupReader, int rowBeginIndex, ParquetSchemaElement field,
long skipRecords, long readRecords, bool isFirstColumn, Dictionary<int, DataRow> rowLookupCache, CancellationToken cancellationToken, IProgress<int>? progress)
{
int rowIndex = rowBeginIndex;
Expand Down Expand Up @@ -146,36 +147,89 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs
}
}

datarow[fieldIndex] = value ?? DBNull.Value;
datarow[fieldIndex] = FixDateTime(value, field) ?? DBNull.Value;

rowIndex++;
progress?.Report(1);
}
}

/// <summary>
/// This is a patch fix to handle malformed datetime fields. We assume TIMESTAMP fields are DateTime values.
/// Only columns that declare a TIMESTAMP logical type but no converted type are patched;
/// well-formed files set ConvertedType and are decoded correctly by the parquet library.
/// </summary>
/// <param name="value">Original value read from the parquet column (may be null)</param>
/// <param name="field">Schema element describing the column</param>
/// <returns>If the field is a malformed timestamp, a DateTime object will be returned. Otherwise the value will not be changed.</returns>
/// <exception cref="UnsupportedFieldException">Thrown when the timestamp column's CLR type is not a (nullable) long</exception>
private object? FixDateTime(object? value, ParquetSchemaElement field)
{
    if (!this.FixMalformedDateTime || value is null)
        return value;

    var timestampSchema = field.SchemaElement?.LogicalType?.TIMESTAMP;
    if (timestampSchema is not null && field.SchemaElement?.ConvertedType is null)
    {
        long castValue;
        if (field.DataField?.ClrType == typeof(long?))
        {
            castValue = ((long?)value).Value; //We know this isn't null from the null check above
        }
        else if (field.DataField?.ClrType == typeof(long))
        {
            castValue = (long)value;
        }
        else
        {
            throw new UnsupportedFieldException($"Field {field.Path} is not a valid timestamp field");
        }

        //Normalize the raw value to unix milliseconds based on the declared time unit
        int divideBy = 0;
        if (timestampSchema.Unit.NANOS != null)
            divideBy = 1000 * 1000;
        else if (timestampSchema.Unit.MICROS != null)
            divideBy = 1000;
        else if (timestampSchema.Unit.MILLIS != null)
            divideBy = 1;

        if (divideBy > 0)
            value = DateTimeOffset.FromUnixTimeMilliseconds(castValue / divideBy).DateTime;
        else //Unknown unit: assume unix seconds. BUGFIX: take .DateTime so the boxed value
             //matches the typeof(DateTime) column created in BuildDataTable; a raw
             //DateTimeOffset would throw an ArgumentException when assigned to the DataRow.
            value = DateTimeOffset.FromUnixTimeSeconds(castValue).DateTime;
    }

    return value;
}

private static async Task ReadListField(DataTable dataTable, ParquetRowGroupReader groupReader, int rowBeginIndex, ParquetSchemaElement field,
long skipRecords, long readRecords, bool isFirstColumn, Dictionary<int, DataRow> rowLookupCache, CancellationToken cancellationToken, IProgress<int>? progress)
{
var listField = field.GetChildByName("list");
var itemField = listField.GetChildByName("item");
var listField = field.GetChild("list");
ParquetSchemaElement itemField;
try
{
itemField = listField.GetChildOrSingle("item"); //Not all parquet files follow the same format so we're being lax with getting the child here
}
catch (Exception ex)
{
throw new UnsupportedFieldException($"Cannot load field '{field.Path}. Invalid List type.'", ex);
}

if (itemField.Children.Any())
throw new UnsupportedFieldException($"Cannot load field '{field.Path}'. Nested list types are not supported");

int rowIndex = rowBeginIndex;

int skippedRecords = 0;
var dataColumn = await groupReader.ReadColumnAsync(itemField.DataField, cancellationToken);
var dataColumn = await groupReader.ReadColumnAsync(itemField.DataField!, cancellationToken);

ArrayList? rowValue = null;
var fieldIndex = dataTable.Columns[field.Path].Ordinal;
var fieldIndex = dataTable.Columns[field.Path]!.Ordinal;
for (int i = 0; i < dataColumn.Data.Length; i++)
{
cancellationToken.ThrowIfCancellationRequested();

rowValue ??= new ArrayList();

bool IsEndOfRow() => (i + 1) == dataColumn.RepetitionLevels.Length
bool IsEndOfRow() => (i + 1) == dataColumn.RepetitionLevels!.Length
|| dataColumn.RepetitionLevels[i + 1] == 0; //0 means new list

//Skip rows
Expand Down Expand Up @@ -206,7 +260,7 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs
}
else
{
if (!rowLookupCache.TryGetValue(rowIndex, out datarow))
if (!rowLookupCache.TryGetValue(rowIndex, out datarow!))
{
datarow = dataTable.Rows[rowIndex];
rowLookupCache.TryAdd(rowIndex, datarow);
Expand All @@ -216,7 +270,7 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs
var lastItem = dataColumn.Data.GetValue(i) ?? DBNull.Value;
rowValue.Add(lastItem);

datarow[fieldIndex] = new ListValue(rowValue, itemField.DataField.ClrType);
datarow[fieldIndex] = new ListValue(rowValue, itemField.DataField!.ClrType);
rowValue = null;

rowIndex++;
Expand All @@ -236,20 +290,20 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs
private static async Task ReadMapField(DataTable dataTable, ParquetRowGroupReader groupReader, int rowBeginIndex, ParquetSchemaElement field,
long skipRecords, long readRecords, bool isFirstColumn, Dictionary<int, DataRow> rowLookupCache, CancellationToken cancellationToken, IProgress<int>? progress)
{
var keyValueField = field.GetChildByName("key_value");
var keyField = keyValueField.GetChildByName("key");
var valueField = keyValueField.GetChildByName("value");
var keyValueField = field.GetChild("key_value");
var keyField = keyValueField.GetChild("key");
var valueField = keyValueField.GetChild("value");

if (keyField.Children.Any() || valueField.Children.Any())
throw new UnsupportedFieldException($"Cannot load field '{field.Path}'. Nested map types are not supported");

int rowIndex = rowBeginIndex;

int skippedRecords = 0;
var keyDataColumn = await groupReader.ReadColumnAsync(keyField.DataField, cancellationToken);
var valueDataColumn = await groupReader.ReadColumnAsync(valueField.DataField, cancellationToken);
var keyDataColumn = await groupReader.ReadColumnAsync(keyField.DataField!, cancellationToken);
var valueDataColumn = await groupReader.ReadColumnAsync(valueField.DataField!, cancellationToken);

var fieldIndex = dataTable.Columns[field.Path].Ordinal;
var fieldIndex = dataTable.Columns[field.Path]!.Ordinal;
for (int i = 0; i < valueDataColumn.Data.Length; i++)
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -286,7 +340,7 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs

var key = keyDataColumn.Data.GetValue(i) ?? DBNull.Value;
var value = valueDataColumn.Data.GetValue(i) ?? DBNull.Value;
datarow[fieldIndex] = new MapValue(key, keyField.DataField.ClrType, value, valueField.DataField.ClrType);
datarow[fieldIndex] = new MapValue(key, keyField.DataField!.ClrType, value, valueField.DataField!.ClrType);

rowIndex++;
progress?.Report(1);
Expand All @@ -301,17 +355,24 @@ private DataTable BuildDataTable(List<string> fields)
DataTable dataTable = new();
foreach (var field in fields)
{
var schema = ParquetSchemaTree.GetChildByName(field);
var schema = ParquetSchemaTree.GetChild(field);

DataColumn newColumn;
if (schema.SchemaElement.ConvertedType == Parquet.Meta.ConvertedType.LIST)
if (schema.SchemaElement.ConvertedType == ConvertedType.LIST)
{
newColumn = new DataColumn(field, typeof(ListValue));
}
else if (schema.SchemaElement.ConvertedType == Parquet.Meta.ConvertedType.MAP)
else if (schema.SchemaElement.ConvertedType == ConvertedType.MAP)
{
newColumn = new DataColumn(field, typeof(MapValue));
}
else if (this.FixMalformedDateTime
&& schema.SchemaElement.LogicalType?.TIMESTAMP is not null
&& schema.SchemaElement?.ConvertedType is null)
{
//Fix for malformed datetime fields (#88)
newColumn = new DataColumn(field, typeof(DateTime));
}
else
{
var clrType = schema.DataField?.ClrType ?? throw new Exception($"{field} has no data field");
Expand Down
6 changes: 4 additions & 2 deletions src/ParquetViewer.Engine/ParquetEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public partial class ParquetEngine : IDisposable

public string OpenFileOrFolderPath { get; }

public bool FixMalformedDateTime { get; set; } = true;

private ParquetSchemaElement BuildParquetSchemaTree()
{
var thriftSchema = ThriftMetadata.Schema ?? throw new Exception("No thrift metadata was found");
Expand All @@ -37,10 +39,10 @@ private ParquetSchemaElement BuildParquetSchemaTree()

foreach (var dataField in Schema.GetDataFields())
{
var field = thriftSchemaTree.GetChildByName(dataField.Path.FirstPart ?? throw new Exception($"Field has no schema path: {dataField.Name}"));
var field = thriftSchemaTree.GetChild(dataField.Path.FirstPart ?? throw new Exception($"Field has no schema path: {dataField.Name}"));
for (var i = 1; i < dataField.Path.Length; i++)
{
field = field.GetChildByName(dataField.Path[i]);
field = field.GetChild(dataField.Path[i]);
}
field.DataField = dataField; //if it doesn't have a child it's a datafield (I hope)
}
Expand Down
17 changes: 16 additions & 1 deletion src/ParquetViewer.Engine/ParquetSchemaElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,22 @@ public ParquetSchemaElement(SchemaElement schemaElement)
this.SchemaElement = schemaElement;
}

public ParquetSchemaElement GetChildByName(string name) => _children.TryGetValue(name, out var result)
/// <summary>Gets the child schema element with the given name, or throws when no such child exists.</summary>
public ParquetSchemaElement GetChild(string name) => _children.TryGetValue(name, out var result)
? result : throw new Exception($"Field schema path not found: {Path}/{name}");

/// <summary>
/// Gets the child schema element with the given name. If no child matches but the element
/// has exactly one child, that single child is returned instead — not all parquet files
/// follow the same naming conventions, so this lookup is deliberately lax.
/// </summary>
public ParquetSchemaElement GetChildOrSingle(string name)
{
    if (_children.TryGetValue(name, out var child))
        return child;

    return _children.Count == 1
        ? _children.First().Value
        : throw new Exception($"Field schema path not found: {Path}/{name}");
}
}
}
1 change: 1 addition & 0 deletions src/ParquetViewer.Engine/ParquetViewer.Engine.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PlatformTarget>x64</PlatformTarget>
<Configurations>Debug;Release;Release_SelfContained</Configurations>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Parquet.Net" />
Expand Down
Binary file not shown.
9 changes: 9 additions & 0 deletions src/ParquetViewer.Tests/ParquetViewer.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@
<IsPackable>false</IsPackable>

<PlatformTarget>x64</PlatformTarget>

<Configurations>Debug;Release;Release_SelfContained</Configurations>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<LangVersion>default</LangVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release_SelfContained|AnyCPU'">
<LangVersion>default</LangVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<LangVersion>default</LangVersion>
</PropertyGroup>
Expand Down Expand Up @@ -59,6 +65,9 @@
<None Update="Data\LIST_TYPE_TEST1.parquet">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Data\MALFORMED_DATETIME_TEST1.parquet">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Data\MAP_TYPE_TEST1.parquet">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
Expand Down
Loading

0 comments on commit ab5f189

Please sign in to comment.