V2.7.2 release #85

Merged: 26 commits, Aug 31, 2023

Commits
d9fb100  better error handling (mukunku, Jun 5, 2023)
9993f4e  better list exception handling based on error logs (mukunku, Jun 7, 2023)
3c557c3  don't log invalid query exceptions (mukunku, Jun 7, 2023)
87532f5  bump assembly version (mukunku, Jun 7, 2023)
3db1357  better io exception handling (mukunku, Jun 11, 2023)
51704a5  exception cleanup (mukunku, Jun 12, 2023)
7ea551c  fix list type check (mukunku, Aug 12, 2023)
0389fe0  add column metadata to rowgroup metadata (mukunku, Aug 12, 2023)
5aa945a  loosen list schema validation (mukunku, Aug 12, 2023)
e1f60c8  update parquet.net library (mukunku, Aug 12, 2023)
0804119  remove statistics and encoding stats for now unless someone needs them (mukunku, Aug 12, 2023)
c379c99  intercept byte[] fields and render them as strings (mukunku, Aug 17, 2023)
d2ff2d5  bump assembly version to 2.7.2.1 (mukunku, Aug 17, 2023)
40be835  some minor cleanup (mukunku, Aug 17, 2023)
330bc83  change default columns size mode to all cells (mukunku, Aug 17, 2023)
1faa99d  add copy raw button for thrift metadata and remove rowgroup details a… (mukunku, Aug 17, 2023)
5c4d7e2  update parquet.net package (mukunku, Aug 17, 2023)
29cb2e7  some cleanup (mukunku, Aug 17, 2023)
b3b2806  Update README.md (mukunku, Aug 18, 2023)
080226b  Update README.md (mukunku, Aug 18, 2023)
436992b  add fix for malformed datetime (mukunku, Aug 30, 2023)
51cc9fe  update packages and assembly version (mukunku, Aug 30, 2023)
2dd4a51  start tracking selfcontained executable usage (mukunku, Aug 30, 2023)
ca4b5ab  fix unit test (mukunku, Aug 30, 2023)
abfad6a  fix the test for realz this time (mukunku, Aug 30, 2023)
854bb0d  Add "SC" suffix to version number in about box for self contained dep… (mukunku, Aug 30, 2023)
11 changes: 9 additions & 2 deletions README.md
@@ -10,16 +10,23 @@ If you'd like to add any new features feel free to send a pull request.

Some key features:
* View parquet file metadata
* Run simple sql-like queries on parquet data
* Run simple sql queries on parquet data
* Open single or partitioned files

# Download
Releases can be found here: https://github.com/mukunku/ParquetViewer/releases

Visit the Wiki for details on how to use the utility: https://github.com/mukunku/ParquetViewer/wiki

# Analytics
Users can opt-in to share anonymous usage data to help make the app better. [^1]

Checkout the [ParquetViewer Analytics Dashboard](https://app.amplitude.com/analytics/share/7207c0b64c154e979afd7082980d6dd6) if you're interested!

[^1]: Full privacy policy here: https://github.com/mukunku/ParquetViewer/wiki/Privacy-Policy

# Technical Details
The latest version of this project was written in C# using Visual Studio 2022 v17.5.3 and .NET 7
The latest version of this project was written in C# using Microsoft Visual Studio Community 2022 v17.7.3 and .NET 7

# Acknowledgements
This utility would not be possible without: https://github.com/aloneguid/parquet-dotnet
4 changes: 2 additions & 2 deletions src/Directory.Packages.props
@@ -3,8 +3,8 @@
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="Apache.Arrow" Version="12.0.0" />
<PackageVersion Include="Parquet.Net" Version="4.12.0" />
<PackageVersion Include="Apache.Arrow" Version="13.0.0" />
<PackageVersion Include="Parquet.Net" Version="4.16.2" />
<PackageVersion Include="Microsoft.CSharp" Version="4.7.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
<PackageVersion Include="RichardSzalay.MockHttp" Version="6.0.0" />
@@ -2,7 +2,7 @@
{
public class UnsupportedFieldException : Exception
{
public UnsupportedFieldException(string fieldName) : base(fieldName)
public UnsupportedFieldException(string message, Exception? ex = null) : base(message, ex)
{

}
103 changes: 82 additions & 21 deletions src/ParquetViewer.Engine/ParquetEngine.Processor.cs
@@ -1,4 +1,5 @@
using Parquet;
using Parquet.Meta;
using ParquetViewer.Engine.Exceptions;
using System.Collections;
using System.Data;
@@ -83,8 +84,8 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs
{
cancellationToken.ThrowIfCancellationRequested();

var field = ParquetSchemaTree.GetChildByName(column.ColumnName);
if (field.SchemaElement.LogicalType?.LIST is not null)
var field = ParquetSchemaTree.GetChild(column.ColumnName);
if (field.SchemaElement.LogicalType?.LIST is not null || field.SchemaElement.ConvertedType == Parquet.Meta.ConvertedType.LIST)
{
await ReadListField(dataTable, groupReader, rowBeginIndex, field, skipRecords,
readRecords, isFirstColumn, rowLookupCache, cancellationToken, progress);
@@ -104,7 +105,7 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs
}
}

private static async Task ReadPrimitiveField(DataTable dataTable, ParquetRowGroupReader groupReader, int rowBeginIndex, ParquetSchemaElement field,
private async Task ReadPrimitiveField(DataTable dataTable, ParquetRowGroupReader groupReader, int rowBeginIndex, ParquetSchemaElement field,
long skipRecords, long readRecords, bool isFirstColumn, Dictionary<int, DataRow> rowLookupCache, CancellationToken cancellationToken, IProgress<int>? progress)
{
int rowIndex = rowBeginIndex;
@@ -146,36 +147,89 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs
}
}

datarow[fieldIndex] = value ?? DBNull.Value;
datarow[fieldIndex] = FixDateTime(value, field) ?? DBNull.Value;

rowIndex++;
progress?.Report(1);
}
}

Inline review thread:

Contributor: Strange that you need to work with such a patch... Was it already here before?

mukunku (Owner, Author), Aug 30, 2023: I shared some details on your issue ticket. I believe the timestamp field is malformed, which is why it is shown as an epoch value instead of a datetime.

I added a patch so we can still open such fields in the app for now. These kinds of inconsistencies tend to get resolved over time, so I'm hoping that will be the case here. I also added a unit test to detect this.

We used to handle timestamp fields directly in the app, but once the parquet-dotnet library added support for handling DateTime fields internally we removed that logic. If the metadata is malformed, of course that library doesn't surface the value as a DateTime, so I added the old logic back for now.

Contributor: Good point. I'm wondering which package isn't writing the metadata correctly. The file is made with pandas, but that is just a wrapper around pyarrow, the Python implementation of Apache Arrow, a package from the creators of Apache Parquet. So strange... or it's on the parquet-dotnet side. I'll see if I need to open another bug somewhere else. Thanks.

/// <summary>
/// This is a patch fix to handle malformed datetime fields. We assume TIMESTAMP fields are DateTime values.
/// </summary>
/// <param name="value">Original value</param>
/// <param name="field">Schema element</param>
/// <returns>If the field is a timestamp, a DateTime object will be returned. Otherwise the value will not be changed.</returns>
private object? FixDateTime(object value, ParquetSchemaElement field)
{
if (!this.FixMalformedDateTime || value is null)
return value;

var timestampSchema = field.SchemaElement?.LogicalType?.TIMESTAMP;
if (timestampSchema is not null && field.SchemaElement?.ConvertedType is null)
{
long castValue;
if (field.DataField?.ClrType == typeof(long?))
{
castValue = ((long?)value).Value; //We know this isn't null from the null check above
}
else if (field.DataField?.ClrType == typeof(long))
{
castValue = (long)value;
}
else
{
throw new UnsupportedFieldException($"Field {field.Path} is not a valid timestamp field");
}

int divideBy = 0;
if (timestampSchema.Unit.NANOS != null)
divideBy = 1000 * 1000;
else if (timestampSchema.Unit.MICROS != null)
divideBy = 1000;
else if (timestampSchema.Unit.MILLIS != null)
divideBy = 1;

if (divideBy > 0)
value = DateTimeOffset.FromUnixTimeMilliseconds(castValue / divideBy).DateTime;
else //Not sure if this 'else' is correct but adding just in case
value = DateTimeOffset.FromUnixTimeSeconds(castValue);
}

return value;
}
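
For reference, a minimal standalone sketch of the conversion this patch performs, assuming a raw epoch long and the divisor implied by the TIMESTAMP unit (illustrative names, not code from this PR):

using System;

// Illustrative sketch, not part of this PR: the epoch-to-DateTime conversion that
// FixDateTime performs, in isolation. The divisor comes from the TIMESTAMP unit:
// NANOS => 1_000_000, MICROS => 1_000, MILLIS => 1.
static class MalformedTimestampSketch
{
    public static DateTime EpochToDateTime(long rawEpochValue, long unitsPerMillisecond)
        => DateTimeOffset.FromUnixTimeMilliseconds(rawEpochValue / unitsPerMillisecond).DateTime;
}

// Example: EpochToDateTime(1_693_440_000_000_000, 1_000_000) yields 2023-08-31 00:00:00 (UTC clock time).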

private static async Task ReadListField(DataTable dataTable, ParquetRowGroupReader groupReader, int rowBeginIndex, ParquetSchemaElement field,
long skipRecords, long readRecords, bool isFirstColumn, Dictionary<int, DataRow> rowLookupCache, CancellationToken cancellationToken, IProgress<int>? progress)
{
var listField = field.GetChildByName("list");
var itemField = listField.GetChildByName("item");
var listField = field.GetChild("list");
ParquetSchemaElement itemField;
try
{
itemField = listField.GetChildOrSingle("item"); //Not all parquet files follow the same format so we're being lax with getting the child here
}
catch (Exception ex)
{
throw new UnsupportedFieldException($"Cannot load field '{field.Path}. Invalid List type.'", ex);
}

if (itemField.Children.Any())
throw new UnsupportedFieldException($"Cannot load field '{field.Path}'. Nested list types are not supported");

int rowIndex = rowBeginIndex;

int skippedRecords = 0;
var dataColumn = await groupReader.ReadColumnAsync(itemField.DataField, cancellationToken);
var dataColumn = await groupReader.ReadColumnAsync(itemField.DataField!, cancellationToken);

ArrayList? rowValue = null;
var fieldIndex = dataTable.Columns[field.Path].Ordinal;
var fieldIndex = dataTable.Columns[field.Path]!.Ordinal;
for (int i = 0; i < dataColumn.Data.Length; i++)
{
cancellationToken.ThrowIfCancellationRequested();

rowValue ??= new ArrayList();

bool IsEndOfRow() => (i + 1) == dataColumn.RepetitionLevels.Length
bool IsEndOfRow() => (i + 1) == dataColumn.RepetitionLevels!.Length
|| dataColumn.RepetitionLevels[i + 1] == 0; //0 means new list

//Skip rows
@@ -206,7 +260,7 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs
}
else
{
if (!rowLookupCache.TryGetValue(rowIndex, out datarow))
if (!rowLookupCache.TryGetValue(rowIndex, out datarow!))
{
datarow = dataTable.Rows[rowIndex];
rowLookupCache.TryAdd(rowIndex, datarow);
@@ -216,7 +270,7 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs
var lastItem = dataColumn.Data.GetValue(i) ?? DBNull.Value;
rowValue.Add(lastItem);

datarow[fieldIndex] = new ListValue(rowValue, itemField.DataField.ClrType);
datarow[fieldIndex] = new ListValue(rowValue, itemField.DataField!.ClrType);
rowValue = null;

rowIndex++;
@@ -236,20 +290,20 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs
private static async Task ReadMapField(DataTable dataTable, ParquetRowGroupReader groupReader, int rowBeginIndex, ParquetSchemaElement field,
long skipRecords, long readRecords, bool isFirstColumn, Dictionary<int, DataRow> rowLookupCache, CancellationToken cancellationToken, IProgress<int>? progress)
{
var keyValueField = field.GetChildByName("key_value");
var keyField = keyValueField.GetChildByName("key");
var valueField = keyValueField.GetChildByName("value");
var keyValueField = field.GetChild("key_value");
var keyField = keyValueField.GetChild("key");
var valueField = keyValueField.GetChild("value");

if (keyField.Children.Any() || valueField.Children.Any())
throw new UnsupportedFieldException($"Cannot load field '{field.Path}'. Nested map types are not supported");

int rowIndex = rowBeginIndex;

int skippedRecords = 0;
var keyDataColumn = await groupReader.ReadColumnAsync(keyField.DataField, cancellationToken);
var valueDataColumn = await groupReader.ReadColumnAsync(valueField.DataField, cancellationToken);
var keyDataColumn = await groupReader.ReadColumnAsync(keyField.DataField!, cancellationToken);
var valueDataColumn = await groupReader.ReadColumnAsync(valueField.DataField!, cancellationToken);

var fieldIndex = dataTable.Columns[field.Path].Ordinal;
var fieldIndex = dataTable.Columns[field.Path]!.Ordinal;
for (int i = 0; i < valueDataColumn.Data.Length; i++)
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -286,7 +340,7 @@ public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offs

var key = keyDataColumn.Data.GetValue(i) ?? DBNull.Value;
var value = valueDataColumn.Data.GetValue(i) ?? DBNull.Value;
datarow[fieldIndex] = new MapValue(key, keyField.DataField.ClrType, value, valueField.DataField.ClrType);
datarow[fieldIndex] = new MapValue(key, keyField.DataField!.ClrType, value, valueField.DataField!.ClrType);

rowIndex++;
progress?.Report(1);
@@ -301,17 +355,24 @@ private DataTable BuildDataTable(List<string> fields)
DataTable dataTable = new();
foreach (var field in fields)
{
var schema = ParquetSchemaTree.GetChildByName(field);
var schema = ParquetSchemaTree.GetChild(field);

DataColumn newColumn;
if (schema.SchemaElement.ConvertedType == Parquet.Meta.ConvertedType.LIST)
if (schema.SchemaElement.ConvertedType == ConvertedType.LIST)
{
newColumn = new DataColumn(field, typeof(ListValue));
}
else if (schema.SchemaElement.ConvertedType == Parquet.Meta.ConvertedType.MAP)
else if (schema.SchemaElement.ConvertedType == ConvertedType.MAP)
{
newColumn = new DataColumn(field, typeof(MapValue));
}
else if (this.FixMalformedDateTime
&& schema.SchemaElement.LogicalType?.TIMESTAMP is not null
&& schema.SchemaElement?.ConvertedType is null)
{
//Fix for malformed datetime fields (#88)
newColumn = new DataColumn(field, typeof(DateTime));
}
else
{
var clrType = schema.DataField?.ClrType ?? throw new Exception($"{field} has no data field");
6 changes: 4 additions & 2 deletions src/ParquetViewer.Engine/ParquetEngine.cs
@@ -29,6 +29,8 @@ public partial class ParquetEngine : IDisposable

public string OpenFileOrFolderPath { get; }

public bool FixMalformedDateTime { get; set; } = true;

private ParquetSchemaElement BuildParquetSchemaTree()
{
var thriftSchema = ThriftMetadata.Schema ?? throw new Exception("No thrift metadata was found");
@@ -37,10 +39,10 @@ private ParquetSchemaElement BuildParquetSchemaTree()

foreach (var dataField in Schema.GetDataFields())
{
var field = thriftSchemaTree.GetChildByName(dataField.Path.FirstPart ?? throw new Exception($"Field has no schema path: {dataField.Name}"));
var field = thriftSchemaTree.GetChild(dataField.Path.FirstPart ?? throw new Exception($"Field has no schema path: {dataField.Name}"));
for (var i = 1; i < dataField.Path.Length; i++)
{
field = field.GetChildByName(dataField.Path[i]);
field = field.GetChild(dataField.Path[i]);
}
field.DataField = dataField; //if it doesn't have a child it's a datafield (I hope)
}
17 changes: 16 additions & 1 deletion src/ParquetViewer.Engine/ParquetSchemaElement.cs
@@ -25,7 +25,22 @@ public ParquetSchemaElement(SchemaElement schemaElement)
this.SchemaElement = schemaElement;
}

public ParquetSchemaElement GetChildByName(string name) => _children.TryGetValue(name, out var result)
public ParquetSchemaElement GetChild(string name) => _children.TryGetValue(name, out var result)
? result : throw new Exception($"Field schema path not found: {Path}/{name}");

public ParquetSchemaElement GetChildOrSingle(string name)
{
if (_children.TryGetValue(name, out var result))
{
return result;
}

if (_children.Count == 1)
{
return _children.First().Value;
}

throw new Exception($"Field schema path not found: {Path}/{name}");
}
}
}
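
A brief usage sketch of the new fallback (illustrative fragment, not from this PR; assumes field is the ParquetSchemaElement for a LIST column): GetChildOrSingle lets list columns resolve even when the writer named the inner element something other than "item", as long as the list node has exactly one child.

// Illustrative fragment: resolving a list's inner element when the writer
// used a different name (e.g. "element") for the single child under "list".
var listField = field.GetChild("list");
var itemField = listField.GetChildOrSingle("item"); // exact name match first, otherwise the lone child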
1 change: 1 addition & 0 deletions src/ParquetViewer.Engine/ParquetViewer.Engine.csproj
@@ -5,6 +5,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PlatformTarget>x64</PlatformTarget>
<Configurations>Debug;Release;Release_SelfContained</Configurations>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Parquet.Net" />
Binary file not shown.
9 changes: 9 additions & 0 deletions src/ParquetViewer.Tests/ParquetViewer.Tests.csproj
@@ -8,12 +8,18 @@
<IsPackable>false</IsPackable>

<PlatformTarget>x64</PlatformTarget>

<Configurations>Debug;Release;Release_SelfContained</Configurations>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<LangVersion>default</LangVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release_SelfContained|AnyCPU'">
<LangVersion>default</LangVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<LangVersion>default</LangVersion>
</PropertyGroup>
@@ -59,6 +65,9 @@
<None Update="Data\LIST_TYPE_TEST1.parquet">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Data\MALFORMED_DATETIME_TEST1.parquet">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Data\MAP_TYPE_TEST1.parquet">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>