Skip to content

Commit

Permalink
Rebase on upstream (2022-11-15) (#1)
Browse files Browse the repository at this point in the history
* Serialize time.Time as a timestamp (segmentio#321)

Serialize time.Time values as Parquet timestamps. The default unit is NANOS and can be changed using the timestamp() struct tag.

type timeColumn struct {
	t1 time.Time
	t2 time.Time `parquet:",timestamp(millisecond)"`
}

* Read decimal column (segmentio#406)

When reading a parquet file with a decimal column, the logical type
information was not loaded; this behavior was not implemented. `decimalType` is
more complex than the other types because a parquet decimal can be
backed by multiple different physical types.

This PR loads logical type information for `DECIMAL` fields.

Closes segmentio#365

* Update decimal string format (segmentio#407)

parquet-cli prints the string format of decimals as
`DECIMAL(precision,scale)`. Update parquet-go's string format to match.

* Fix panic when reading file with no row groups (segmentio#408)

* Change go tag from `parquet` to `segmentio`

Co-authored-by: Larry Marburger <larry.marburger@segment.com>
Co-authored-by: Todd Wang <toddw@neeva.co>
  • Loading branch information
3 people committed Nov 15, 2022
1 parent 082e785 commit ddd7cd1
Show file tree
Hide file tree
Showing 19 changed files with 336 additions and 16 deletions.
4 changes: 4 additions & 0 deletions buffer_go18_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func TestGenericBuffer(t *testing.T) {
testGenericBuffer[stringColumn](t)
testGenericBuffer[indexedStringColumn](t)
testGenericBuffer[uuidColumn](t)
testGenericBuffer[timeColumn](t)
testGenericBuffer[timeInMillisColumn](t)
testGenericBuffer[mapColumn](t)
testGenericBuffer[decimalColumn](t)
testGenericBuffer[addressBook](t)
Expand Down Expand Up @@ -108,6 +110,8 @@ func BenchmarkGenericBuffer(b *testing.B) {
benchmarkGenericBuffer[stringColumn](b)
benchmarkGenericBuffer[indexedStringColumn](b)
benchmarkGenericBuffer[uuidColumn](b)
benchmarkGenericBuffer[timeColumn](b)
benchmarkGenericBuffer[timeInMillisColumn](b)
benchmarkGenericBuffer[mapColumn](b)
benchmarkGenericBuffer[decimalColumn](b)
benchmarkGenericBuffer[contact](b)
Expand Down
23 changes: 21 additions & 2 deletions column.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,27 @@ func schemaElementTypeOf(s *format.SchemaElement) Type {
case lt.Enum != nil:
return (*enumType)(lt.Enum)
case lt.Decimal != nil:
// TODO:
// return (*decimalType)(lt.Decimal)
// A parquet decimal can be one of several different physical types.
if t := s.Type; t != nil {
var typ Type
switch kind := Kind(*s.Type); kind {
case Int32:
typ = Int32Type
case Int64:
typ = Int64Type
case FixedLenByteArray:
if s.TypeLength == nil {
panic("DECIMAL using FIXED_LEN_BYTE_ARRAY must specify a length")
}
typ = FixedLenByteArrayType(int(*s.TypeLength))
default:
panic("DECIMAL must be of type INT32, INT64, or FIXED_LEN_BYTE_ARRAY but got " + kind.String())
}
return &decimalType{
decimal: *lt.Decimal,
Type: typ,
}
}
case lt.Date != nil:
return (*dateType)(lt.Date)
case lt.Time != nil:
Expand Down
43 changes: 43 additions & 0 deletions column_buffer_go18.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package parquet
import (
"math/bits"
"reflect"
"time"
"unsafe"

"github.com/segmentio/parquet-go/deprecated"
Expand All @@ -30,6 +31,8 @@ func writeRowsFuncOf(t reflect.Type, schema *Schema, path columnPath) writeRowsF
switch t {
case reflect.TypeOf(deprecated.Int96{}):
return writeRowsFuncOfRequired(t, schema, path)
case reflect.TypeOf(time.Time{}):
return writeRowsFuncOfTime(t, schema, path)
}

switch t.Kind() {
Expand Down Expand Up @@ -393,3 +396,43 @@ func writeRowsFuncOfMap(t reflect.Type, schema *Schema, path columnPath) writeRo
return nil
}
}

// writeRowsFuncOfTime returns a writeRowsFunc that converts time.Time values
// into int64 timestamps before delegating to the generic int64 write path.
// The timestamp unit (millis, micros, or nanos) comes from the column's
// TIMESTAMP logical type; when the column declares no timestamp logical type
// the unit defaults to nanoseconds.
func writeRowsFuncOfTime(_ reflect.Type, schema *Schema, path columnPath) writeRowsFunc {
	t := reflect.TypeOf(int64(0))
	elemSize := t.Size() // Type.Size already returns uintptr; no conversion needed.
	writeRows := writeRowsFuncOf(t, schema, path)

	col, _ := schema.Lookup(path...)
	unit := Nanosecond.TimeUnit()
	if lt := col.Node.Type().LogicalType(); lt != nil && lt.Timestamp != nil {
		unit = lt.Timestamp.Unit
	}

	return func(columns []ColumnBuffer, rows sparse.Array, levels columnLevels) error {
		if rows.Len() == 0 {
			// Forward empty writes so repetition/definition levels are still
			// handled by the underlying column writer.
			return writeRows(columns, rows, levels)
		}

		times := rows.TimeArray()
		for i := 0; i < times.Len(); i++ {
			t := times.Index(i)
			var val int64
			switch {
			case unit.Millis != nil:
				val = t.UnixMilli()
			case unit.Micros != nil:
				val = t.UnixMicro()
			default:
				val = t.UnixNano()
			}

			// Wrap the converted value in a one-element array so it can be
			// passed to the int64 write function.
			a := makeArray(unsafecast.PointerOfValue(reflect.ValueOf(val)), 1, elemSize)
			if err := writeRows(columns, a, levels); err != nil {
				return err
			}
		}

		return nil
	}
}
89 changes: 89 additions & 0 deletions convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package parquet_test
import (
"reflect"
"testing"
"time"

"github.com/segmentio/parquet-go"
)
Expand Down Expand Up @@ -309,3 +310,91 @@ func TestConvert(t *testing.T) {

func newInt64(i int64) *int64 { return &i }
func newString(s string) *string { return &s }

// TestConvertTimestamp verifies conversions between the millisecond,
// microsecond, and nanosecond timestamp logical types, as well as to and
// from plain INT64 columns.
func TestConvertTimestamp(t *testing.T) {
	now := time.Unix(42, 0)
	ms := now.UnixMilli()
	us := now.UnixMicro()
	ns := now.UnixNano()

	msType := parquet.Timestamp(parquet.Millisecond).Type()
	usType := parquet.Timestamp(parquet.Microsecond).Type()
	nsType := parquet.Timestamp(parquet.Nanosecond).Type()

	msVal := parquet.ValueOf(ms)
	usVal := parquet.ValueOf(us)
	nsVal := parquet.ValueOf(ns)

	// Sanity-check that ValueOf round-trips each raw int64 timestamp.
	for _, check := range []struct {
		value    parquet.Value
		expected int64
	}{
		{msVal, ms},
		{usVal, us},
		{nsVal, ns},
	} {
		if got := check.value.Int64(); got != check.expected {
			t.Errorf("converted value mismatch:\nwant = %+v\ngot = %+v", check.expected, got)
		}
	}

	timestampConversionTests := []struct {
		scenario  string
		fromType  parquet.Type
		fromValue parquet.Value
		toType    parquet.Type
		expected  int64
	}{
		{scenario: "micros to nanos", fromType: usType, fromValue: usVal, toType: nsType, expected: ns},
		{scenario: "millis to nanos", fromType: msType, fromValue: msVal, toType: nsType, expected: ns},
		{scenario: "nanos to micros", fromType: nsType, fromValue: nsVal, toType: usType, expected: us},
		{scenario: "nanos to nanos", fromType: nsType, fromValue: nsVal, toType: nsType, expected: ns},
		{scenario: "int64 to nanos", fromType: parquet.Int64Type, fromValue: nsVal, toType: nsType, expected: ns},
		{scenario: "int64 to int64", fromType: parquet.Int64Type, fromValue: nsVal, toType: parquet.Int64Type, expected: ns},
	}

	for _, test := range timestampConversionTests {
		test := test
		t.Run(test.scenario, func(t *testing.T) {
			a, err := test.toType.ConvertValue(test.fromValue, test.fromType)
			if err != nil {
				t.Fatal(err)
			}
			if got := a.Int64(); got != test.expected {
				t.Errorf("converted value mismatch:\nwant = %+v\ngot = %+v", test.expected, got)
			}
		})
	}
}
4 changes: 4 additions & 0 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ func OpenFile(r io.ReaderAt, size int64, options ...FileOption) (*File, error) {
// this case the page index is not cached within the file, programs are expected
// to make use of independently from the parquet package.
func (f *File) ReadPageIndex() ([]format.ColumnIndex, []format.OffsetIndex, error) {
if len(f.metadata.RowGroups) == 0 {
return nil, nil, nil
}

columnIndexOffset := f.metadata.RowGroups[0].Columns[0].ColumnIndexOffset
offsetIndexOffset := f.metadata.RowGroups[0].Columns[0].OffsetIndexOffset
columnIndexLength := int64(0)
Expand Down
3 changes: 2 additions & 1 deletion format/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ type DecimalType struct {
}

// String returns the decimal logical type in parquet-cli's string format,
// DECIMAL(precision,scale).
// Matching parquet-cli's decimal string format: https://github.com/apache/parquet-mr/blob/d057b39d93014fe40f5067ee4a33621e65c91552/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java#L249-L265
func (t *DecimalType) String() string {
	return fmt.Sprintf("DECIMAL(%d,%d)", t.Precision, t.Scale)
}

// Time units for logical types.
Expand Down
13 changes: 13 additions & 0 deletions internal/quick/quick.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand"
"reflect"
"strings"
"time"
)

var DefaultConfig = Config{
Expand Down Expand Up @@ -70,6 +71,18 @@ func (c *Config) Check(f interface{}) error {
type MakeValueFunc func(reflect.Value, *rand.Rand)

func MakeValueFuncOf(t reflect.Type) MakeValueFunc {
switch t {
case reflect.TypeOf(time.Time{}):
return func(v reflect.Value, r *rand.Rand) {
// TODO: This is a hack to support the matching of times in a precision
// other than nanosecond by generating times rounded to the second. A
// better solution would be to update columns types to add a compare
// function.
sec := r.Int63n(2524608000) // 2050-01-01
v.Set(reflect.ValueOf(time.Unix(sec, 0).UTC()))
}
}

switch t.Kind() {
case reflect.Bool:
return func(v reflect.Value, r *rand.Rand) {
Expand Down
18 changes: 18 additions & 0 deletions parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,24 @@ func (row uuidColumn) generate(prng *rand.Rand) uuidColumn {
return row
}

type timeColumn struct {
Value time.Time
}

func (row timeColumn) generate(prng *rand.Rand) timeColumn {
t := time.Unix(0, prng.Int63()).UTC()
return timeColumn{Value: t}
}

type timeInMillisColumn struct {
Value time.Time `parquet:",timestamp(millisecond)"`
}

func (row timeInMillisColumn) generate(prng *rand.Rand) timeInMillisColumn {
t := time.Unix(0, prng.Int63()).UTC()
return timeInMillisColumn{Value: t}
}

type decimalColumn struct {
Value int64 `parquet:",decimal(0:3)"`
}
Expand Down
4 changes: 2 additions & 2 deletions print_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ func TestPrintSchema(t *testing.T) {
{
node: parquet.Group{"cost": parquet.Decimal(0, 9, parquet.Int32Type)},
print: `message Test {
required int32 cost (DECIMAL(0,9));
required int32 cost (DECIMAL(9,0));
}`,
},

{
node: parquet.Group{"cost": parquet.Decimal(0, 18, parquet.Int64Type)},
print: `message Test {
required int64 cost (DECIMAL(0,18));
required int64 cost (DECIMAL(18,0));
}`,
},

Expand Down
4 changes: 4 additions & 0 deletions reader_go18_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func TestGenericReader(t *testing.T) {
testGenericReader[stringColumn](t)
testGenericReader[indexedStringColumn](t)
testGenericReader[uuidColumn](t)
testGenericReader[timeColumn](t)
testGenericReader[timeInMillisColumn](t)
testGenericReader[mapColumn](t)
testGenericReader[decimalColumn](t)
testGenericReader[addressBook](t)
Expand Down Expand Up @@ -98,6 +100,8 @@ func BenchmarkGenericReader(b *testing.B) {
benchmarkGenericReader[stringColumn](b)
benchmarkGenericReader[indexedStringColumn](b)
benchmarkGenericReader[uuidColumn](b)
benchmarkGenericReader[timeColumn](b)
benchmarkGenericReader[timeInMillisColumn](b)
benchmarkGenericReader[mapColumn](b)
benchmarkGenericReader[decimalColumn](b)
benchmarkGenericReader[contact](b)
Expand Down
10 changes: 10 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ var readerTests = []struct {
model: uuidColumn{},
},

{
scenario: "time.Time",
model: timeColumn{},
},

{
scenario: "time.Time in ms",
model: timeInMillisColumn{},
},

{
scenario: "DECIMAL",
model: decimalColumn{},
Expand Down
6 changes: 4 additions & 2 deletions row.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,13 +499,15 @@ func deconstructFuncOfLeaf(columnIndex int16, node Node) (int16, deconstructFunc
if columnIndex > MaxColumnIndex {
panic("row cannot be deconstructed because it has more than 127 columns")
}
kind := node.Type().Kind()
typ := node.Type()
kind := typ.Kind()
lt := typ.LogicalType()
valueColumnIndex := ^columnIndex
return columnIndex + 1, func(row Row, levels levels, value reflect.Value) Row {
v := Value{}

if value.IsValid() {
v = makeValue(kind, value)
v = makeValue(kind, lt, value)
}

v.repetitionLevel = levels.repetitionLevel
Expand Down
14 changes: 13 additions & 1 deletion schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/google/uuid"
"github.com/segmentio/parquet-go/compress"
Expand Down Expand Up @@ -496,6 +497,8 @@ func nodeOf(t reflect.Type, tag []string) Node {
return Leaf(Int96Type)
case reflect.TypeOf(uuid.UUID{}):
return UUID()
case reflect.TypeOf(time.Time{}):
return Timestamp(Nanosecond)
}

var n Node
Expand Down Expand Up @@ -831,7 +834,16 @@ func makeNodeOf(t reflect.Type, name string, tag []string) Node {
}
setNode(Timestamp(timeUnit))
default:
throwInvalidTag(t, name, option)
switch t {
case reflect.TypeOf(time.Time{}):
timeUnit, err := parseTimestampArgs(args)
if err != nil {
throwInvalidTag(t, name, option)
}
setNode(Timestamp(timeUnit))
default:
throwInvalidTag(t, name, option)
}
}
default:
throwUnknownTag(t, name, option)
Expand Down
Loading

0 comments on commit ddd7cd1

Please sign in to comment.