61 plain non-nullable types (tensorflow#68)
* added a new row indexer for the Parquet data frame

* updated all tests and code to use DateTimeOffset

* added a logical JSON type

* added new dataset row handling via pivoting

* updated PlainValuesReader.cs

* built more single responsibility around the ParquetReader type so that resources are deallocated efficiently via IDisposable

* updated the reader to handle nulls

* added branches that create the IList as either a nullable or a non-nullable typed list, keyed off the required (repetition type) attribute on the column header

* moved BigDecimal to its own file
azurecoder authored and Ivan Gavryliuk committed Jul 5, 2017
1 parent 1e8407c commit 4fe2758
Showing 3 changed files with 106 additions and 76 deletions.
31 changes: 31 additions & 0 deletions src/Parquet/File/Values/BigDecimal.cs
@@ -0,0 +1,31 @@
using System.Numerics;

namespace Parquet.File.Values
{
   public struct BigDecimal
   {
      public decimal Integer { get; set; }
      public int Scale { get; set; }
      public int Precision { get; set; }

      public BigDecimal(BigInteger integer, int scale, int precision) : this()
      {
         Integer = (decimal) integer;
         Scale = scale;
         Precision = precision;
         while (Scale > 0)
         {
            Integer /= 10;
            Scale -= 1;
         }
         Scale = scale;
      }

      public static explicit operator decimal(BigDecimal bd)
      {
         return bd.Integer;
      }

      // TODO: Add to byte array for writer
   }
}
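
As a rough illustration of how the reader uses this type (hypothetical values; assumes System.Linq and System.Numerics are in scope), a fixed-length DECIMAL buffer arrives big-endian, is reversed for BigInteger's little-endian constructor, and is then scaled down:

   // Hypothetical big-endian unscaled value 12345 with scale 2 and precision 5.
   byte[] raw = { 0x00, 0x00, 0x30, 0x39 };
   var unscaled = new BigInteger(raw.Reverse().ToArray()); // BigInteger expects little-endian bytes
   var bd = new BigDecimal(unscaled, 2, 5);
   decimal value = (decimal) bd;                           // 123.45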
72 changes: 13 additions & 59 deletions src/Parquet/File/Values/PlainValuesReader.cs
@@ -9,6 +9,7 @@
using TType = Parquet.Thrift.Type;
using System.Runtime.CompilerServices;
using System.Numerics;
using System.Reflection;

namespace Parquet.File.Values
{
@@ -65,7 +66,6 @@ private static void ReadPlainBoolean(byte[] data, IList destination, long maxVal
int ibit = 0;
int ibyte = 0;
byte b = data[0];
var destinationTyped = (List<bool?>)destination;

for(int ires = 0; ires < maxValues; ires++)
{
@@ -76,70 +76,64 @@
}

bool set = ((b >> ibit++) & 1) == 1;
destinationTyped.Add(set);
destination.Add(set);
}
}
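
For reference, PLAIN booleans are bit-packed least-significant-bit first, so one byte carries up to eight values. A hedged illustration with a made-up byte:

   // Hypothetical packed byte 0b0000_0101 with maxValues = 3 decodes to true, false, true.
   byte b = 0b0000_0101;
   var values = new List<bool>();
   for (int ibit = 0; ibit < 3; ibit++)
   {
      values.Add(((b >> ibit) & 1) == 1);
   }
   // values: [ true, false, true ]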

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void ReadInt32(byte[] data, SchemaElement schema, IList destination)
{
if(schema.Converted_type == ConvertedType.DATE)
if (schema.Converted_type == ConvertedType.DATE)
{
List<DateTimeOffset?> destinationTyped = (List<DateTimeOffset?>)destination;
for (int i = 0; i < data.Length; i += 4)
{
int iv = BitConverter.ToInt32(data, i);
destinationTyped.Add(new DateTimeOffset(iv.FromUnixTime(), TimeSpan.Zero));
destination.Add(new DateTimeOffset(iv.FromUnixTime(), TimeSpan.Zero));
}
}
else
{
List<int?> destinationTyped = (List<int?>)destination;
for (int i = 0; i < data.Length; i += 4)
{
int iv = BitConverter.ToInt32(data, i);
destinationTyped.Add(iv);
destination.Add(iv);
}
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void ReadFloat(byte[] data, SchemaElement schema, IList destination)
{
List<float?> destinationTyped = (List<float?>)destination;
for (int i = 0; i < data.Length; i += 4)
{
float iv = BitConverter.ToSingle(data, i);
destinationTyped.Add(iv);
destination.Add(iv);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void ReadLong(byte[] data, SchemaElement schema, IList destination)
{
List<long?> destinationTyped = (List<long?>)destination;
for (int i = 0; i < data.Length; i += 8)
{
long lv = BitConverter.ToInt64(data, i);
destinationTyped.Add(lv);
destination.Add(lv);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void ReadDouble(byte[] data, SchemaElement schema, IList destination)
{
List<double?> destinationTyped = (List<double?>)destination;
for (int i = 0; i < data.Length; i += 8)
{
double lv = BitConverter.ToDouble(data, i);
destinationTyped.Add(lv);
destination.Add(lv);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void ReadFixedLenByteArray(byte[] data, SchemaElement schema, IList destination)
{
List<decimal?> destinationTyped = (List<decimal?>) destination;
for (int i = 0; i < data.Length; i += schema.Type_length)
{
if (schema.Converted_type != ConvertedType.DECIMAL) continue;
@@ -149,21 +143,15 @@ private static void ReadFixedLenByteArray(byte[] data, SchemaElement schema, ILi
var bigInt = new BigDecimal(new BigInteger(dataNew.Reverse().ToArray()), schema.Scale, schema.Precision);

decimal dc = (decimal) bigInt;
destinationTyped.Add(dc);
destination.Add(dc);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void ReadInt96(byte[] data, SchemaElement schema, IList destination)
{
#if !SPARK_TYPES
List<BigInteger?> destinationTyped = (List<BigInteger?>)destination;
#else
List<DateTimeOffset?> destinationTyped = (List<DateTimeOffset?>)destination;
#endif

//todo: this is a sample how to read int96, not tested this yet
// todo: need to work this out because Spark is not encoding per spec - working with the Spark encoding instead

#if !SPARK_TYPES
//var r96 = new List<BigInteger>(data.Length / 12);
#else
@@ -188,7 +176,7 @@
double millis = (double) nanosToInt64 / 1000000D;
bi = bi.AddMilliseconds(millis);
#endif
destinationTyped.Add(new DateTimeOffset(bi));
destination.Add(new DateTimeOffset(bi));

}
}
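
The Spark encoding this reader targets is assumed here to follow the common Impala/Spark convention (not spelled out in the diff): each INT96 is 8 little-endian bytes of nanoseconds-of-day followed by a 4-byte Julian day. A minimal standalone sketch of that decoding, not the committed code:

   // Hedged sketch: decode one 12-byte INT96 timestamp under the assumed Spark/Impala layout.
   static DateTimeOffset DecodeInt96(byte[] value)
   {
      long nanosOfDay = BitConverter.ToInt64(value, 0);   // assumption: first 8 bytes = nanoseconds since midnight
      int julianDay = BitConverter.ToInt32(value, 8);     // assumption: last 4 bytes = Julian day number

      // Julian day 2440588 is the Unix epoch (1970-01-01).
      DateTimeOffset date = new DateTimeOffset(1970, 1, 1, 0, 0, 0, TimeSpan.Zero)
         .AddDays(julianDay - 2440588);
      return date.AddTicks(nanosOfDay / 100);             // 100 ns per tick
   }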
@@ -203,62 +191,28 @@ private void ReadByteArray(byte[] data, SchemaElement schemaElement, IList desti
schemaElement.Converted_type == ConvertedType.UTF8 || schemaElement.Converted_type == ConvertedType.JSON ||
_options.TreatByteArrayAsString)
{
List<string> destinationTyped = (List<string>)destination;
for (int i = 0; i < data.Length;)
{
int length = BitConverter.ToInt32(data, i);
i += 4; //fast-forward to data
string s = UTF8.GetString(data, i, length);
i += length; //fast-forward to the next element
destinationTyped.Add(s);
destination.Add(s);
}
}
else
{
List<byte[]> destinationTyped = (List<byte[]>)destination;
for (int i = 0; i < data.Length;)
{
int length = BitConverter.ToInt32(data, i);
i += 4; //fast-forward to data
byte[] ar = new byte[length];
Array.Copy(data, i, ar, 0, length);
i += length; //fast-forward to the next element
destinationTyped.Add(ar);
destination.Add(ar);
}
}
}
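
Both branches above walk the same layout: a 4-byte little-endian length prefix followed by the raw bytes. A hedged illustration with a made-up buffer (assumes System.Text is in scope):

   // Hypothetical page data holding one BYTE_ARRAY value: length 3, then "foo".
   byte[] page = { 3, 0, 0, 0, (byte)'f', (byte)'o', (byte)'o' };
   int length = BitConverter.ToInt32(page, 0);           // 3
   string s = Encoding.UTF8.GetString(page, 4, length);  // "foo"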

}


struct BigDecimal
{
public decimal Integer { get; set; }
public int Scale { get; set; }
public int Precision { get; set; }

public BigDecimal(BigInteger integer, int scale, int precision) : this()
{
Integer = (decimal) integer;
Scale = scale;
Precision = precision;
while (Scale > 0)
{
Integer /= 10;
Scale -= 1;
}
Scale = scale;
}

public static explicit operator decimal(BigDecimal bd)
{
return bd.Integer;
}

// TODO: Add to byte array for writer




}
}
79 changes: 62 additions & 17 deletions src/Parquet/ParquetColumn.cs
@@ -189,28 +189,58 @@ internal static IList CreateValuesList(SchemaElement schema, out Type systemType
switch(schema.Type)
{
case TType.BOOLEAN:
systemType = typeof(bool?);
return new List<bool?>();
if (schema.Repetition_type == FieldRepetitionType.OPTIONAL)
{
systemType = typeof(bool?);
return new List<bool?>();
}
systemType = typeof(bool);
return new List<bool>();
case TType.INT32:
if(schema.Converted_type == ConvertedType.DATE)
{
systemType = typeof(DateTimeOffset?);
return new List<DateTimeOffset?>();
if (schema.Repetition_type == FieldRepetitionType.OPTIONAL)
{
systemType = typeof(DateTimeOffset?);
return new List<DateTimeOffset?>();
}
systemType = typeof(DateTimeOffset);
return new List<DateTimeOffset>();
}
else
{
systemType = typeof(int?);
return new List<int?>();
if (schema.Repetition_type == FieldRepetitionType.OPTIONAL)
{
systemType = typeof(int?);
return new List<int?>();
}
systemType = typeof(int);
return new List<int>();
}
case TType.FLOAT:
systemType = typeof(float?);
return new List<float?>();
if (schema.Repetition_type == FieldRepetitionType.OPTIONAL)
{
systemType = typeof(float?);
return new List<float?>();
}
systemType = typeof(float);
return new List<float>();
case TType.INT64:
systemType = typeof(long?);
return new List<long?>();
if (schema.Repetition_type == FieldRepetitionType.OPTIONAL)
{
systemType = typeof(long?);
return new List<long?>();
}
systemType = typeof(long);
return new List<long>();
case TType.DOUBLE:
systemType = typeof(double?);
return new List<double?>();
if (schema.Repetition_type == FieldRepetitionType.OPTIONAL)
{
systemType = typeof(double?);
return new List<double?>();
}
systemType = typeof(double);
return new List<double>();
case TType.INT96:
#if !SPARK_TYPES
systemType = typeof(DateTimeOffset?);
@@ -227,20 +257,35 @@ internal static IList CreateValuesList(SchemaElement schema, out Type systemType
}
else
{
systemType = typeof(bool?);
return new List<bool?>();
if (schema.Repetition_type == FieldRepetitionType.OPTIONAL)
{
systemType = typeof(bool?);
return new List<bool?>();
}
systemType = typeof(bool);
return new List<bool>();
}
case TType.FIXED_LEN_BYTE_ARRAY:
// TODO: Converted type should work differently shouldn't inline in this way
if (schema.Converted_type == ConvertedType.DECIMAL)
{
if (schema.Repetition_type == FieldRepetitionType.OPTIONAL)
{
systemType = typeof(decimal?);
return new List<decimal?>();
}
systemType = typeof(decimal);
return new List<decimal?>();
return new List<decimal>();
}
else
{
systemType = typeof(byte?[]);
return new List<byte?[]>();
if (schema.Repetition_type == FieldRepetitionType.OPTIONAL)
{
systemType = typeof(byte?[]);
return new List<byte?[]>();
}
systemType = typeof(byte[]);
return new List<byte[]>();
}
default:
throw new NotImplementedException($"type {schema.Type} not implemented");
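
Each primitive branch in CreateValuesList above repeats the same check on Repetition_type. A condensed sketch of that pattern as a hypothetical generic helper, not part of this commit:

   // Hypothetical helper: OPTIONAL columns get a nullable element type, REQUIRED ones a plain type.
   private static IList CreateList<T>(SchemaElement schema, out Type systemType) where T : struct
   {
      if (schema.Repetition_type == FieldRepetitionType.OPTIONAL)
      {
         systemType = typeof(T?);
         return new List<T?>();
      }
      systemType = typeof(T);
      return new List<T>();
   }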
