From 4fe27585789dbf575bfb9997365d4f3ad457a03d Mon Sep 17 00:00:00 2001 From: Richard Conway Date: Wed, 5 Jul 2017 07:34:47 +0100 Subject: [PATCH] 61 plain non nullable types (#68) * added new row indexer for parquet data frame * updated all tests and code to use DateTimeOffset * added logical JSON type * added new dataset handling of rows through pivoting * Update PlainValuesReader.cs * built more single responsibility around ParquetReader type to ensure efficient deallocation of resources using IDisposable * updated reader to look at nulls * added branches to set type IList as either nullable or non-nullable and done this against the required attribute on the column header * moved BigDecimal to own file --- src/Parquet/File/Values/BigDecimal.cs | 31 ++++++++ src/Parquet/File/Values/PlainValuesReader.cs | 72 ++++-------------- src/Parquet/ParquetColumn.cs | 79 +++++++++++++++----- 3 files changed, 106 insertions(+), 76 deletions(-) create mode 100644 src/Parquet/File/Values/BigDecimal.cs diff --git a/src/Parquet/File/Values/BigDecimal.cs b/src/Parquet/File/Values/BigDecimal.cs new file mode 100644 index 0000000000..c10ff0a4b6 --- /dev/null +++ b/src/Parquet/File/Values/BigDecimal.cs @@ -0,0 +1,31 @@ +using System.Numerics; + +namespace Parquet.File.Values +{ + public struct BigDecimal + { + public decimal Integer { get; set; } + public int Scale { get; set; } + public int Precision { get; set; } + + public BigDecimal(BigInteger integer, int scale, int precision) : this() + { + Integer = (decimal) integer; + Scale = scale; + Precision = precision; + while (Scale > 0) + { + Integer /= 10; + Scale -= 1; + } + Scale = scale; + } + + public static explicit operator decimal(BigDecimal bd) + { + return bd.Integer; + } + + // TODO: Add to byte array for writer + } +} \ No newline at end of file diff --git a/src/Parquet/File/Values/PlainValuesReader.cs b/src/Parquet/File/Values/PlainValuesReader.cs index 93bd973559..bc5fe9e9b1 100644 --- a/src/Parquet/File/Values/PlainValuesReader.cs +++ b/src/Parquet/File/Values/PlainValuesReader.cs @@ -9,6 +9,7 @@ using TType = Parquet.Thrift.Type; using System.Runtime.CompilerServices; using System.Numerics; +using System.Reflection; namespace Parquet.File.Values { @@ -65,7 +66,6 @@ private static void ReadPlainBoolean(byte[] data, IList destination, long maxVal int ibit = 0; int ibyte = 0; byte b = data[0]; - var destinationTyped = (List)destination; for(int ires = 0; ires < maxValues; ires++) { @@ -76,29 +76,27 @@ private static void ReadPlainBoolean(byte[] data, IList destination, long maxVal } bool set = ((b >> ibit++) & 1) == 1; - destinationTyped.Add(set); + destination.Add(set); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] private static void ReadInt32(byte[] data, SchemaElement schema, IList destination) { - if(schema.Converted_type == ConvertedType.DATE) + if (schema.Converted_type == ConvertedType.DATE) { - List destinationTyped = (List)destination; for (int i = 0; i < data.Length; i += 4) { int iv = BitConverter.ToInt32(data, i); - destinationTyped.Add(new DateTimeOffset(iv.FromUnixTime(), TimeSpan.Zero)); + destination.Add(new DateTimeOffset(iv.FromUnixTime(), TimeSpan.Zero)); } } else { - List destinationTyped = (List)destination; for (int i = 0; i < data.Length; i += 4) { int iv = BitConverter.ToInt32(data, i); - destinationTyped.Add(iv); + destination.Add(iv); } } } @@ -106,40 +104,36 @@ private static void ReadInt32(byte[] data, SchemaElement schema, IList destinati [MethodImpl(MethodImplOptions.AggressiveInlining)] private static void ReadFloat(byte[] data, SchemaElement schema, IList destination) { - List destinationTyped = (List)destination; for (int i = 0; i < data.Length; i += 4) { float iv = BitConverter.ToSingle(data, i); - destinationTyped.Add(iv); + destination.Add(iv); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] private static void ReadLong(byte[] data, SchemaElement schema, IList destination) { - List destinationTyped = (List)destination; for (int i = 0; i < data.Length; i += 8) { long lv = BitConverter.ToInt64(data, i); - destinationTyped.Add(lv); + destination.Add(lv); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] private static void ReadDouble(byte[] data, SchemaElement schema, IList destination) { - List destinationTyped = (List)destination; for (int i = 0; i < data.Length; i += 8) { double lv = BitConverter.ToDouble(data, i); - destinationTyped.Add(lv); + destination.Add(lv); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] private static void ReadFixedLenByteArray(byte[] data, SchemaElement schema, IList destination) { - List destinationTyped = (List) destination; for (int i = 0; i < data.Length; i += schema.Type_length) { if (schema.Converted_type != ConvertedType.DECIMAL) continue; @@ -149,21 +143,15 @@ private static void ReadFixedLenByteArray(byte[] data, SchemaElement schema, ILi var bigInt = new BigDecimal(new BigInteger(dataNew.Reverse().ToArray()), schema.Scale, schema.Precision); decimal dc = (decimal) bigInt; - destinationTyped.Add(dc); + destination.Add(dc); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] private static void ReadInt96(byte[] data, SchemaElement schema, IList destination) { -#if !SPARK_TYPES - List destinationTyped = (List)destination; -#else - List destinationTyped = (List)destination; -#endif - //todo: this is a sample how to read int96, not tested this yet - // todo: need to work this out because Spark is not encoding per spec - working with the Spark encoding instead + #if !SPARK_TYPES //var r96 = new List(data.Length / 12); #else @@ -188,7 +176,7 @@ private static void ReadInt96(byte[] data, SchemaElement schema, IList destinati double millis = (double) nanosToInt64 / 1000000D; bi = bi.AddMilliseconds(millis); #endif - destinationTyped.Add(new DateTimeOffset(bi)); + destination.Add(new DateTimeOffset(bi)); } } @@ -203,19 +191,17 @@ private void ReadByteArray(byte[] data, SchemaElement schemaElement, IList desti schemaElement.Converted_type == ConvertedType.UTF8 || schemaElement.Converted_type == ConvertedType.JSON || _options.TreatByteArrayAsString) { - List destinationTyped = (List)destination; for (int i = 0; i < data.Length;) { int length = BitConverter.ToInt32(data, i); i += 4; //fast-forward to data string s = UTF8.GetString(data, i, length); i += length; //fast-forward to the next element - destinationTyped.Add(s); + destination.Add(s); } } else { - List destinationTyped = (List)destination; for (int i = 0; i < data.Length;) { int length = BitConverter.ToInt32(data, i); @@ -223,42 +209,10 @@ private void ReadByteArray(byte[] data, SchemaElement schemaElement, IList desti byte[] ar = new byte[length]; Array.Copy(data, i, ar, 0, length); i += length; //fast-forward to the next element - destinationTyped.Add(ar); + destination.Add(ar); } } } - } - - - struct BigDecimal - { - public decimal Integer { get; set; } - public int Scale { get; set; } - public int Precision { get; set; } - - public BigDecimal(BigInteger integer, int scale, int precision) : this() - { - Integer = (decimal) integer; - Scale = scale; - Precision = precision; - while (Scale > 0) - { - Integer /= 10; - Scale -= 1; - } - Scale = scale; - } - - public static explicit operator decimal(BigDecimal bd) - { - return bd.Integer; - } - - // TODO: Add to byte array for writer - - - - } } diff --git a/src/Parquet/ParquetColumn.cs b/src/Parquet/ParquetColumn.cs index 95bc92bc89..757d9d3e6d 100644 --- a/src/Parquet/ParquetColumn.cs +++ b/src/Parquet/ParquetColumn.cs @@ -189,28 +189,58 @@ internal static IList CreateValuesList(SchemaElement schema, out Type systemType switch(schema.Type) { case TType.BOOLEAN: - systemType = typeof(bool?); - return new List(); + if (schema.Repetition_type == FieldRepetitionType.OPTIONAL) + { + systemType = typeof(bool?); + return new List(); + } + systemType = typeof(bool); + return new List(); case TType.INT32: if(schema.Converted_type == ConvertedType.DATE) { - systemType = typeof(DateTimeOffset?); - return new List(); + if (schema.Repetition_type == FieldRepetitionType.OPTIONAL) + { + systemType = typeof(DateTimeOffset?); + return new List(); + } + systemType = typeof(DateTimeOffset); + return new List(); } else { - systemType = typeof(int?); - return new List(); + if (schema.Repetition_type == FieldRepetitionType.OPTIONAL) + { + systemType = typeof(int?); + return new List(); + } + systemType = typeof(int); + return new List(); } case TType.FLOAT: - systemType = typeof(float?); - return new List(); + if (schema.Repetition_type == FieldRepetitionType.OPTIONAL) + { + systemType = typeof(float?); + return new List(); + } + systemType = typeof(float); + return new List(); case TType.INT64: - systemType = typeof(long?); - return new List(); + if (schema.Repetition_type == FieldRepetitionType.OPTIONAL) + { + systemType = typeof(long?); + return new List(); + } + systemType = typeof(long); + return new List(); case TType.DOUBLE: - systemType = typeof(double?); - return new List(); + if (schema.Repetition_type == FieldRepetitionType.OPTIONAL) + { + systemType = typeof(double?); + return new List(); + } + systemType = typeof(double); + return new List(); case TType.INT96: #if !SPARK_TYPES systemType = typeof(DateTimeOffset?); @@ -227,20 +257,35 @@ internal static IList CreateValuesList(SchemaElement schema, out Type systemType } else { - systemType = typeof(bool?); - return new List(); + if (schema.Repetition_type == FieldRepetitionType.OPTIONAL) + { + systemType = typeof(bool?); + return new List(); + } + systemType = typeof(bool); + return new List(); } case TType.FIXED_LEN_BYTE_ARRAY: // TODO: Converted type should work differently shouldn't inline in this way if (schema.Converted_type == ConvertedType.DECIMAL) { + if (schema.Repetition_type == FieldRepetitionType.OPTIONAL) + { + systemType = typeof(decimal?); + return new List(); + } systemType = typeof(decimal); - return new List(); + return new List(); } else { - systemType = typeof(byte?[]); - return new List(); + if (schema.Repetition_type == FieldRepetitionType.OPTIONAL) + { + systemType = typeof(byte?[]); + return new List(); + } + systemType = typeof(byte[]); + return new List(); } default: throw new NotImplementedException($"type {schema.Type} not implemented");