GH-33466: [Go][Parquet] Add support for Dictionary arrays to pqarrow
zeroshade committed Feb 24, 2023
1 parent 1c483e4 commit d8a91d0
Showing 33 changed files with 2,534 additions and 105 deletions.
3 changes: 3 additions & 0 deletions go/arrow/array/binarybuilder.go
@@ -220,6 +220,9 @@ func (b *BinaryBuilder) ReserveData(n int) {
// additional memory will be allocated. If n is smaller, the allocated memory may be reduced.
func (b *BinaryBuilder) Resize(n int) {
	b.offsets.resize((n + 1) * b.offsetByteWidth)
	if (n * b.offsetByteWidth) < b.offsets.Len() {
		b.offsets.SetLength(n * b.offsetByteWidth)
	}
	b.builder.resize(n, b.init)
}

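To make the effect concrete, here is a minimal standalone sketch (assuming the github.com/apache/arrow/go/v12 module path for this vintage of the library): shrinking a BinaryBuilder with Resize now also truncates the offsets buffer through the new SetLength, rather than leaving stale offset entries past the new length.

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

func main() {
	bldr := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
	defer bldr.Release()

	bldr.AppendValues([][]byte{[]byte("a"), []byte("bb"), []byte("ccc")}, nil)

	// shrink to 2 elements: the offsets buffer is truncated to
	// (n+1)*offsetByteWidth bytes instead of retaining stale offsets
	bldr.Resize(2)

	arr := bldr.NewBinaryArray()
	defer arr.Release()
	fmt.Println(arr.Len()) // expected: 2
}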
10 changes: 10 additions & 0 deletions go/arrow/array/bufferbuilder.go
@@ -32,6 +32,7 @@ type bufBuilder interface {
	Bytes() []byte
	resize(int)
	Advance(int)
	SetLength(int)
	Append([]byte)
	Reset()
	Finish() *memory.Buffer
@@ -96,6 +97,15 @@ func (b *bufferBuilder) resize(elements int) {
	}
}

func (b *bufferBuilder) SetLength(length int) {
	if length > b.length {
		b.Advance(length)
		return
	}

	b.length = length
}

// Advance increases the buffer by length and initializes the skipped bytes to zero.
func (b *bufferBuilder) Advance(length int) {
	if b.capacity < b.length+length {
64 changes: 60 additions & 4 deletions go/arrow/array/dictionary.go
@@ -213,13 +213,15 @@ func (d *Dictionary) Release() {
func (d *Dictionary) setData(data *Data) {
	d.array.setData(data)

+	dictType := data.dtype.(*arrow.DictionaryType)
	if data.dictionary == nil {
-		panic("arrow/array: no dictionary set in Data for Dictionary array")
+		if data.length > 0 {
+			panic("arrow/array: no dictionary set in Data for Dictionary array")
+		}
+	} else {
+		debug.Assert(arrow.TypeEqual(dictType.ValueType, data.dictionary.DataType()), "mismatched dictionary value types")
	}

-	dictType := data.dtype.(*arrow.DictionaryType)
-	debug.Assert(arrow.TypeEqual(dictType.ValueType, data.dictionary.DataType()), "mismatched dictionary value types")
-
	indexData := NewData(dictType.IndexType, data.length, data.buffers, data.childData, data.nulls, data.offset)
	defer indexData.Release()
	d.indices = MakeFromData(indexData)
@@ -883,6 +885,60 @@ func (b *dictionaryBuilder) AppendArray(arr arrow.Array) error {
	return nil
}

func (b *dictionaryBuilder) AppendIndices(indices []int, valid []bool) {
	b.length += len(indices)
	switch idxbldr := b.idxBuilder.Builder.(type) {
	case *Int8Builder:
		vals := make([]int8, len(indices))
		for i, v := range indices {
			vals[i] = int8(v)
		}
		idxbldr.AppendValues(vals, valid)
	case *Int16Builder:
		vals := make([]int16, len(indices))
		for i, v := range indices {
			vals[i] = int16(v)
		}
		idxbldr.AppendValues(vals, valid)
	case *Int32Builder:
		vals := make([]int32, len(indices))
		for i, v := range indices {
			vals[i] = int32(v)
		}
		idxbldr.AppendValues(vals, valid)
	case *Int64Builder:
		vals := make([]int64, len(indices))
		for i, v := range indices {
			vals[i] = int64(v)
		}
		idxbldr.AppendValues(vals, valid)
	case *Uint8Builder:
		vals := make([]uint8, len(indices))
		for i, v := range indices {
			vals[i] = uint8(v)
		}
		idxbldr.AppendValues(vals, valid)
	case *Uint16Builder:
		vals := make([]uint16, len(indices))
		for i, v := range indices {
			vals[i] = uint16(v)
		}
		idxbldr.AppendValues(vals, valid)
	case *Uint32Builder:
		vals := make([]uint32, len(indices))
		for i, v := range indices {
			vals[i] = uint32(v)
		}
		idxbldr.AppendValues(vals, valid)
	case *Uint64Builder:
		vals := make([]uint64, len(indices))
		for i, v := range indices {
			vals[i] = uint64(v)
		}
		idxbldr.AppendValues(vals, valid)
	}
}

type NullDictionaryBuilder struct {
	dictionaryBuilder
}
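The new AppendIndices fast path skips the hash-table lookup entirely: the caller supplies raw indices and is responsible for making the dictionary values line up. A hedged usage sketch (same assumed module path, and assuming the pre-existing InsertStringDictValues helper for seeding the dictionary):

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

func main() {
	mem := memory.DefaultAllocator
	dt := &arrow.DictionaryType{
		IndexType: arrow.PrimitiveTypes.Int32,
		ValueType: arrow.BinaryTypes.String,
	}

	bldr := array.NewDictionaryBuilder(mem, dt).(*array.BinaryDictionaryBuilder)
	defer bldr.Release()

	// seed the dictionary values first...
	sb := array.NewStringBuilder(mem)
	sb.AppendValues([]string{"hello", "world"}, nil)
	values := sb.NewStringArray()
	sb.Release()
	defer values.Release()
	if err := bldr.InsertStringDictValues(values); err != nil {
		panic(err)
	}

	// ...then append raw indices into them; no hashing happens here,
	// the switch in AppendIndices just casts to the index width
	bldr.AppendIndices([]int{0, 1, 1, 0}, nil)

	arr := bldr.NewDictionaryArray()
	defer arr.Release()
	fmt.Println(arr) // dictionary-encoded: hello, world, world, hello
}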
43 changes: 43 additions & 0 deletions go/arrow/array/table.go
@@ -128,6 +128,49 @@ func NewTable(schema *arrow.Schema, cols []arrow.Column, rows int64) *simpleTable {
	return &tbl
}

// NewTableFromSlice is a convenience function to create a table from a slice
// of slices of arrow.Array.
//
// Like other NewTable functions, this can panic if:
//   - len(schema.Fields) != len(data)
//   - the total length of each column's array slice (ie: the number of rows
//     in the column) isn't the same for all columns.
func NewTableFromSlice(schema *arrow.Schema, data [][]arrow.Array) *simpleTable {
	if len(data) != len(schema.Fields()) {
		panic("array/table: mismatch in number of columns and data for creating a table")
	}

	cols := make([]arrow.Column, len(schema.Fields()))
	for i, arrs := range data {
		field := schema.Field(i)
		chunked := arrow.NewChunked(field.Type, arrs)
		cols[i] = *arrow.NewColumn(field, chunked)
		chunked.Release()
	}

	tbl := simpleTable{
		refCount: 1,
		schema:   schema,
		cols:     cols,
		rows:     int64(cols[0].Len()),
	}

	defer func() {
		if r := recover(); r != nil {
			// if validate panics, release the columns so that we
			// don't leak them, then propagate the panic
			for _, c := range cols {
				c.Release()
			}
			panic(r)
		}
	}()
	// validate the table and its constituents.
	tbl.validate()

	return &tbl
}

// NewTableFromRecords returns a new basic, non-lazy in-memory table.
//
// NewTableFromRecords panics if the records and schema are inconsistent.
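A hedged sketch of how NewTableFromSlice might be called (assumed module path as above). Each element of data holds one column's chunks, and every column's chunks must add up to the same row count:

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

func main() {
	mem := memory.DefaultAllocator
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "x", Type: arrow.PrimitiveTypes.Int64},
	}, nil)

	ib := array.NewInt64Builder(mem)
	defer ib.Release()

	ib.AppendValues([]int64{1, 2}, nil)
	chunk1 := ib.NewInt64Array()
	defer chunk1.Release()

	ib.AppendValues([]int64{3}, nil)
	chunk2 := ib.NewInt64Array()
	defer chunk2.Release()

	// one outer entry per column; each entry is that column's chunks
	tbl := array.NewTableFromSlice(schema, [][]arrow.Array{{chunk1, chunk2}})
	defer tbl.Release()
	fmt.Println(tbl.NumRows()) // expected: 3
}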
7 changes: 6 additions & 1 deletion go/arrow/compute/internal/kernels/vector_hash.go
@@ -459,12 +459,17 @@ func hashExec(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
	return impl.Flush(out)
}

-func uniqueFinalize(ctx *exec.KernelCtx, _ []*exec.ArraySpan) ([]*exec.ArraySpan, error) {
+func uniqueFinalize(ctx *exec.KernelCtx, results []*exec.ArraySpan) ([]*exec.ArraySpan, error) {
	impl, ok := ctx.State.(HashState)
	if !ok {
		return nil, fmt.Errorf("%w: HashState in invalid state", arrow.ErrInvalid)
	}

+	for _, r := range results {
+		// release any pre-allocation we did
+		r.Release()
+	}
+
	uniques, err := impl.GetDictionary()
	if err != nil {
		return nil, err
15 changes: 15 additions & 0 deletions go/internal/bitutils/bit_set_run_reader.go
@@ -344,3 +344,18 @@ func VisitSetBitRuns(bitmap []byte, bitmapOffset int64, length int64, visitFn VisitFn) error {
	}
	return nil
}

func VisitSetBitRunsNoErr(bitmap []byte, bitmapOffset int64, length int64, visitFn func(pos, length int64)) {
	if bitmap == nil {
		visitFn(0, length)
		return
	}
	rdr := NewSetBitRunReader(bitmap, bitmapOffset, length)
	for {
		run := rdr.NextRun()
		if run.Length == 0 {
			break
		}
		visitFn(run.Pos, run.Length)
	}
}
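For illustration only, since go/internal/bitutils is an internal package and can only be imported from within the arrow module itself: the contract of the new helper is that a nil bitmap counts as all-valid and yields a single run spanning the whole length, while a real bitmap yields one callback per contiguous run of set bits.

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/internal/bitutils"
)

func main() {
	// 0x0D = 0b00001101: bits 0, 2 and 3 are set
	bitutils.VisitSetBitRunsNoErr([]byte{0x0D}, 0, 8, func(pos, length int64) {
		fmt.Printf("run at %d, length %d\n", pos, length) // (0,1) then (2,2)
	})

	// nil bitmap: treated as all-valid, one run covering everything
	bitutils.VisitSetBitRunsNoErr(nil, 0, 8, func(pos, length int64) {
		fmt.Printf("run at %d, length %d\n", pos, length) // (0,8)
	})
}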
3 changes: 3 additions & 0 deletions go/parquet/file/column_reader.go
@@ -134,6 +134,8 @@ type columnChunkReader struct {
	// is set when an error is encountered
	err          error
	defLvlBuffer []int16

	// newDictionary is set when configureDict reads a new dictionary
	// page for this column chunk
	newDictionary bool
}

// NewColumnReader returns a column reader for the provided column initialized with the given pagereader that will
@@ -225,6 +227,7 @@ func (c *columnChunkReader) configureDict(page *DictionaryPage) error {
		return xerrors.New("parquet: dictionary index must be plain encoding")
	}

	c.newDictionary = true
	c.curDecoder = c.decoders[enc]
	return nil
}
47 changes: 38 additions & 9 deletions go/parquet/file/column_writer.go
@@ -52,6 +52,26 @@ type ColumnChunkWriter interface {
	TotalBytesWritten() int64
	// Properties returns the current WriterProperties in use for this writer
	Properties() *parquet.WriterProperties
	// CurrentEncoder returns the current encoder that is being used
	// to encode new data written to this column
	CurrentEncoder() encoding.TypedEncoder
	// FallbackToPlain forces a dictionary-encoded column writer to
	// fall back to plain encoding, first flushing out any data it has
	// and then switching the encoder to plain encoding from
	// here on out.
	//
	// This is called automatically if the dictionary reaches the
	// size limit in the writer properties or under specific conditions.
	//
	// Has no effect if the column is not currently dictionary encoded.
	FallbackToPlain()
	// PageStatistics returns the current page statistics for this
	// column writer. May be nil if stats are not enabled.
	PageStatistics() metadata.TypedStatistics
	// WriteDictIndices writes an arrow array of dictionary indices
	// to this column. This should only be called by pqarrow or
	// if you *really* know what you're doing.
	WriteDictIndices(arrow.Array, []int16, []int16) error

	LevelInfo() LevelInfo
	SetBitsBuffer(*memory.Buffer)
@@ -155,10 +175,11 @@ func newColumnWriterBase(metaData *metadata.ColumnChunkMetaDataBuilder, pager Pa
	return ret
}

-func (w *columnWriter) HasBitsBuffer() bool { return w.bitsBuffer != nil }
-func (w *columnWriter) SetBitsBuffer(buf *memory.Buffer) { w.bitsBuffer = buf }
-
-func (w *columnWriter) LevelInfo() LevelInfo { return w.levelInfo }
+func (w *columnWriter) CurrentEncoder() encoding.TypedEncoder { return w.currentEncoder }
+func (w *columnWriter) HasBitsBuffer() bool { return w.bitsBuffer != nil }
+func (w *columnWriter) SetBitsBuffer(buf *memory.Buffer) { w.bitsBuffer = buf }
+func (w *columnWriter) PageStatistics() metadata.TypedStatistics { return w.pageStatistics }
+func (w *columnWriter) LevelInfo() LevelInfo { return w.levelInfo }

func (w *columnWriter) Type() parquet.Type {
	return w.descr.PhysicalType()
@@ -231,7 +252,8 @@ func (w *columnWriter) commitWriteAndCheckPageLimit(numLevels, numValues int64) error {
	w.numBufferedValues += numLevels
	w.numDataValues += numValues

-	if w.currentEncoder.EstimatedDataEncodedSize() >= w.props.DataPageSize() {
+	enc := w.currentEncoder.EstimatedDataEncodedSize()
+	if enc >= w.props.DataPageSize() {
		return w.FlushCurrentPage()
	}
	return nil
@@ -618,15 +640,19 @@ func levelSliceOrNil(rep []int16, offset, batch int64) []int16 {
}

//lint:ignore U1000 maybeReplaceValidity
-func (w *ByteArrayColumnChunkWriter) maybeReplaceValidity(values arrow.Array, newNullCount int64) arrow.Array {
+func (w *columnWriter) maybeReplaceValidity(values arrow.Array, newNullCount int64) arrow.Array {
	if w.bitsBuffer == nil {
		values.Retain()
		return values
	}

-	buffers := values.Data().Buffers()
-	if len(buffers) == 0 {
+	if len(values.Data().Buffers()) == 0 {
		values.Retain()
		return values
	}

+	buffers := make([]*memory.Buffer, len(values.Data().Buffers()))
+	copy(buffers, values.Data().Buffers())
	// bitsBuffer should already be the offset slice of the validity bits
	// we want so we don't need to manually slice the validity buffer
	buffers[0] = w.bitsBuffer
@@ -635,5 +661,8 @@ func (w *ByteArrayColumnChunkWriter) maybeReplaceValidity(values arrow.Array, newNullCount int64) arrow.Array {
		data := values.Data()
		buffers[1] = memory.NewBufferBytes(data.Buffers()[1].Bytes()[data.Offset()*arrow.Int32SizeBytes : data.Len()*arrow.Int32SizeBytes])
	}
-	return array.MakeFromData(array.NewData(values.DataType(), values.Len(), buffers, nil, int(newNullCount), 0))
+
+	data := array.NewData(values.DataType(), values.Len(), buffers, nil, int(newNullCount), 0)
+	defer data.Release()
+	return array.MakeFromData(data)
}
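Putting the pieces together, these writer hooks exist so that pqarrow can write dictionary-encoded Arrow data straight to Parquet. A hedged end-to-end sketch (assuming the v12 module path and this commit's pqarrow support; the column writer falls back to plain encoding on its own if a dictionary page outgrows the configured limit):

package main

import (
	"os"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
	"github.com/apache/arrow/go/v12/parquet"
	"github.com/apache/arrow/go/v12/parquet/pqarrow"
)

func main() {
	mem := memory.DefaultAllocator
	dt := &arrow.DictionaryType{
		IndexType: arrow.PrimitiveTypes.Int32,
		ValueType: arrow.BinaryTypes.String,
	}
	schema := arrow.NewSchema([]arrow.Field{{Name: "c", Type: dt}}, nil)

	bldr := array.NewDictionaryBuilder(mem, dt).(*array.BinaryDictionaryBuilder)
	defer bldr.Release()
	for _, s := range []string{"a", "b", "a", "a"} {
		if err := bldr.AppendString(s); err != nil {
			panic(err)
		}
	}
	arr := bldr.NewDictionaryArray()
	defer arr.Release()

	tbl := array.NewTableFromSlice(schema, [][]arrow.Array{{arr}})
	defer tbl.Release()

	f, err := os.Create("dict.parquet")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// dictionary encoding is on by default; made explicit here
	props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(true))
	if err := pqarrow.WriteTable(tbl, f, tbl.NumRows(), props, pqarrow.DefaultWriterProps()); err != nil {
		panic(err)
	}
}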
